fix - 调整运行时的实例同步 ES 的逻辑

This commit is contained in:
wangli 2024-11-04 11:12:26 +08:00
parent fcef179140
commit 771fc73ce7
11 changed files with 240 additions and 28 deletions

View File

@ -5,9 +5,9 @@ import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory;
import cn.axzo.workflow.core.engine.cmd.CustomCommandContextFactory;
import cn.axzo.workflow.core.engine.id.BasedNacosSnowflakeIdGenerator;
import cn.axzo.workflow.core.engine.interceptor.CustomRetryInterceptor;
import cn.axzo.workflow.core.engine.job.AsyncAbortProcessInstanceJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivityCallbackJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivityLeaveJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncAbortProcessInstanceJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivitySetAssigneeJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivityTriggerJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
@ -40,7 +40,6 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.lang.Nullable;
import java.time.Duration;
@ -70,7 +69,8 @@ public class FlowableConfiguration {
List<JobProcessor> jobProcessors,
NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties,
SupportRefreshProperties refreshProperties) {
SupportRefreshProperties refreshProperties,
List<ProcessExtConfigurer> configurers) {
return configuration -> {
configuration.setEnableHistoricTaskLogging(true);
configuration.setHistoryLevel(HistoryLevel.AUDIT);
@ -100,6 +100,7 @@ public class FlowableConfiguration {
configuration.addCustomJobHandler(new AsyncCountersignUserTaskJobHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncActivityLeaveJobHandler(bpmnProcessActivityService));
configuration.addCustomJobHandler(new AsyncActivityCallbackJobHandler());
configurers.forEach(i-> configuration.addCustomJobHandler(i.getJobHandler()));
// 异步任务异常重试时间间隔
configuration.setDefaultFailedJobWaitTime(30);
configuration.setAsyncFailedJobWaitTime(30);

View File

@ -0,0 +1,13 @@
package cn.axzo.workflow.core.conf;
import org.flowable.job.service.JobHandler;
/**
* TODO
*
* @author wangli
* @since 2024-11-04 10:58
*/
public interface ProcessExtConfigurer {
JobHandler getJobHandler();
}

View File

@ -70,5 +70,5 @@ public abstract class AbstractExecuteWithLockJobHandler extends AbstractJobHandl
extAxPropertyService.delete(processInstanceId, jobId);
}
abstract void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext);
public abstract void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext);
}

View File

@ -27,7 +27,7 @@ public class AsyncAbortProcessInstanceJobHandler extends AbstractExecuteWithLock
}
@Override
void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
public void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
log.info("AsyncAbortProcessInstanceHandler executing...,jobInfo:{}", JSONUtil.toJsonStr(job));
log(job);
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);

View File

@ -0,0 +1,32 @@
package cn.axzo.workflow.es.conf;
import cn.axzo.workflow.core.conf.ProcessExtConfigurer;
import cn.axzo.workflow.es.flowable.ext.AsyncElasticSearchSyncJobHandler;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import org.flowable.job.service.JobHandler;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.flowable.spring.boot.EngineConfigurationConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
* ES 相关对 Flowable 扩展的配置
*
* @author wangli
* @since 2024-11-04 10:39
*/
@Component
public class FlowableElasticSearchConfiguration implements ProcessExtConfigurer {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
public FlowableElasticSearchConfiguration(AggregateProcessInstanceService aggregateProcessInstanceService) {
this.aggregateProcessInstanceService = aggregateProcessInstanceService;
}
@Override
public JobHandler getJobHandler() {
return new AsyncElasticSearchSyncJobHandler(aggregateProcessInstanceService);
}
}

View File

@ -0,0 +1,44 @@
package cn.axzo.workflow.es.flowable.ext;
import cn.axzo.workflow.core.engine.job.AbstractExecuteWithLockJobHandler;
import cn.axzo.workflow.core.engine.job.AbstractJobHandler;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.JobHandler;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.variable.api.delegate.VariableScope;
/**
* 同步 ElasticSearch 数据
*
* @author wangli
* @since 2024-11-04 10:09
*/
@Slf4j
public class AsyncElasticSearchSyncJobHandler extends AbstractExecuteWithLockJobHandler implements JobHandler {
public static final String TYPE = "async-es-sync";
private final AggregateProcessInstanceService aggregateProcessInstanceService;
public AsyncElasticSearchSyncJobHandler(AggregateProcessInstanceService aggregateProcessInstanceService) {
super();
this.aggregateProcessInstanceService = aggregateProcessInstanceService;
}
@Override
public String getType() {
return TYPE;
}
@Override
public void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
log.info("AsyncElasticSearchSyncJobHandler exec start...");
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
String processInstanceId = JSONUtil.toBean(job.getCustomValues(), String.class);
processEngineConfiguration.getCommandExecutor().execute(new CustomElasticSearchCmd(processInstanceId, aggregateProcessInstanceService));
}
}

View File

@ -0,0 +1,70 @@
package cn.axzo.workflow.es.flowable.ext;
import cn.axzo.workflow.core.engine.cmd.AbstractCommand;
import com.alibaba.fastjson.JSON;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.HistoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.JobService;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
* 自定义同步处理器
*
* @author wangli
* @since 2024-11-04 10:00
*/
public class CustomElasticSearchAsyncCmd extends AbstractCommand<Void> implements Serializable {
private final String processInstanceId;
public CustomElasticSearchAsyncCmd(String processInstanceId) {
this.processInstanceId = processInstanceId;
}
@Override
public String paramToJsonString() {
Map<String, Object> params = new HashMap<>();
params.put("processInstanceId", processInstanceId);
return JSON.toJSONString(params);
}
@Override
public Void executeInternal(CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance instance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
startAsync(processEngineConfiguration, instance);
return null;
}
private void startAsync(ProcessEngineConfigurationImpl processEngineConfiguration, HistoricProcessInstance instance) {
JobService jobService = processEngineConfiguration.getJobServiceConfiguration().getJobService();
JobEntity job = jobService.createJob();
// 这里的 executionId 可为 null
job.setExecutionId(instance.getId());
job.setProcessInstanceId(instance.getId());
job.setProcessDefinitionId(instance.getProcessDefinitionId());
job.setElementId(AsyncElasticSearchSyncJobHandler.TYPE);
job.setElementName(instance.getName());
job.setJobHandlerType(AsyncElasticSearchSyncJobHandler.TYPE);
job.setTenantId(instance.getTenantId());
// 携带自定义的数据
job.setCustomValues(processInstanceId);
// 创建异步任务并调度
jobService.createAsyncJob(job, false);
jobService.scheduleAsyncJob(job);
}
}

View File

@ -0,0 +1,58 @@
package cn.axzo.workflow.es.flowable.ext;
import cn.axzo.workflow.core.engine.cmd.AbstractCommand;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.HistoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 自定义同步处理器
*
* @author wangli
* @since 2024-11-04 10:00
*/
@Slf4j
public class CustomElasticSearchCmd extends AbstractCommand<Void> implements Serializable {
private final String processInstanceId;
private final AggregateProcessInstanceService aggregateProcessInstanceService;
public CustomElasticSearchCmd(String processInstanceId,
AggregateProcessInstanceService aggregateProcessInstanceService) {
this.processInstanceId = processInstanceId;
this.aggregateProcessInstanceService = aggregateProcessInstanceService;
}
@Override
public String paramToJsonString() {
Map<String, Object> params = new HashMap<>();
params.put("processInstanceId", processInstanceId);
return JSON.toJSONString(params);
}
@Override
public Void executeInternal(CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
// 删除指定实例的父子文档
log.info("delete document processInstanceId: {}", processInstanceId);
aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId);
log.info("reInsert document processInstanceId: {}", processInstanceId);
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size());
return null;
}
}

View File

@ -27,7 +27,6 @@ import org.springframework.stereotype.Component;
@Scope("prototype")
@AllArgsConstructor
public class SyncToEsProcessEventListener extends AbstractBpmnEventListener<ProcessOperationContext> implements BpmnProcessEventListener, Ordered {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
@Override
public int getOrder() {
@ -79,6 +78,6 @@ public class SyncToEsProcessEventListener extends AbstractBpmnEventListener<Proc
private void syncToEs(String processInstanceId) {
String uuid = UUID.fastUUID().toString();
Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
new OnTxCommittedSyncToEsListener(aggregateProcessInstanceService, processInstanceId, null, uuid));
new OnTxCommittedSyncToEsListener(processInstanceId, null, uuid));
}
}

View File

@ -26,7 +26,6 @@ import org.springframework.stereotype.Component;
@Scope("prototype")
@AllArgsConstructor
public class SyncToEsTaskEvent_104_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener, Ordered {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
@Override
public int getOrder() {
@ -42,6 +41,6 @@ public class SyncToEsTaskEvent_104_Listener extends AbstractBpmnEventListener<Ta
public void onAssigned(DelegateTask delegateTask) {
String uuid = UUID.fastUUID().toString();
Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
new OnTxCommittedSyncToEsListener(aggregateProcessInstanceService, delegateTask.getProcessInstanceId(), delegateTask.getId(), uuid));
new OnTxCommittedSyncToEsListener(delegateTask.getProcessInstanceId(), delegateTask.getId(), uuid));
}
}

View File

@ -1,18 +1,12 @@
package cn.axzo.workflow.server.controller.listener.tx;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.axzo.workflow.es.flowable.ext.CustomElasticSearchAsyncCmd;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.cfg.TransactionListener;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.HistoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import java.util.List;
/**
* 引擎内的动作事务提交后执行同步 ES
*
@ -22,7 +16,6 @@ import java.util.List;
@Slf4j
@AllArgsConstructor
public class OnTxCommittedSyncToEsListener implements TransactionListener {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
private final String processInstanceId;
private final String taskId;
private final String uuid;
@ -31,17 +24,20 @@ public class OnTxCommittedSyncToEsListener implements TransactionListener {
public void execute(CommandContext commandContext) {
log.info("SyncEsTaskEntityEventHandle onInitialized uuid:{}, processInstanceId:{}, taskId: {}", uuid, processInstanceId, taskId);
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
// 删除指定实例的父子文档
log.info("delete document processInstanceId: {}", processInstanceId);
aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId);
log.info("reInsert document processInstanceId: {}", processInstanceId);
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size());
// ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
// HistoryService historyService = processEngineConfiguration.getHistoryService();
// HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
// .processInstanceId(processInstanceId)
// .singleResult();
// // 删除指定实例的父子文档
// log.info("delete document processInstanceId: {}", processInstanceId);
// aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId);
// log.info("reInsert document processInstanceId: {}", processInstanceId);
// List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
// log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size());
CommandContextUtil.getProcessEngineConfiguration().getCommandExecutor()
.execute(new CustomElasticSearchAsyncCmd(processInstanceId));
}
}