feat(REQ-2752) - 调整运行中的实例动作注册事务提交后执行的监听。

This commit is contained in:
wangli 2024-10-18 16:42:53 +08:00
parent 0bd59ecb9a
commit 0ef090dca7
5 changed files with 61 additions and 45 deletions

View File

@ -62,9 +62,12 @@ public class WorkflowEngineClientAutoConfiguration {
log.error("get version error: {}", e.getMessage(), e); log.error("get version error: {}", e.getMessage(), e);
} }
String serviceVersion = Objects.isNull(version) ? "1.2.0-SNAPSHOT" : version; String serviceVersion = Objects.isNull(version) ? "1.2.0" : version;
log.info("client current version: {}", serviceVersion); log.info("client current version: {}", serviceVersion);
return serviceVersion; return serviceVersion
.replaceAll("-SNAPSHOT", "")
.replaceAll("-RELEASE", "")
.trim();
} }
private String getVersionFromPod(URL location) throws URISyntaxException { private String getVersionFromPod(URL location) throws URISyntaxException {

View File

@ -100,7 +100,9 @@ public class AggregateProcessInstanceService {
// 实例纬度数据同步 ES // 实例纬度数据同步 ES
esProcessInstanceService.insert(ES_FIXED_ROUTING, processInstanceDocument); esProcessInstanceService.insert(ES_FIXED_ROUTING, processInstanceDocument);
String instanceVersion = String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121)); String instanceVersion = String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121))
.replaceAll("-SNAPSHOT","")
.replaceAll("-RELEASE", "");
DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion); DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion);
DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142); DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142);
List<ProcessTaskDocument> toEsProcessTaskDocuments = new ArrayList<>(); List<ProcessTaskDocument> toEsProcessTaskDocuments = new ArrayList<>();

View File

@ -4,14 +4,13 @@ import cn.axzo.workflow.core.common.context.ProcessOperationContext;
import cn.axzo.workflow.core.listener.AbstractBpmnEventListener; import cn.axzo.workflow.core.listener.AbstractBpmnEventListener;
import cn.axzo.workflow.core.listener.BpmnProcessEventListener; import cn.axzo.workflow.core.listener.BpmnProcessEventListener;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService; import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.axzo.workflow.server.controller.listener.tx.OnTxCommittedSyncToEsListener;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent; import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
import org.flowable.engine.HistoryService; import org.flowable.common.engine.impl.cfg.TransactionState;
import org.flowable.common.engine.impl.context.Context;
import org.flowable.engine.delegate.event.FlowableCancelledEvent; import org.flowable.engine.delegate.event.FlowableCancelledEvent;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -77,18 +76,7 @@ public class SyncToEsProcessEventListener extends AbstractBpmnEventListener<Proc
} }
private void syncToEs(String processInstanceId) { private void syncToEs(String processInstanceId) {
log.info("SyncToEsProcessEventListener processInstanceId:{}", processInstanceId); Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
new OnTxCommittedSyncToEsListener(aggregateProcessInstanceService, processInstanceId, null));
// 删除指定实例的父子文档
aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId);
log.info("delete document processInstanceId: {}", processInstanceId);
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
log.info("reInsert document processInstanceId: {}", processInstanceId);
} }
} }

View File

@ -3,26 +3,17 @@ package cn.axzo.workflow.server.controller.listener.task;
import cn.axzo.workflow.core.common.context.TaskOperationContext; import cn.axzo.workflow.core.common.context.TaskOperationContext;
import cn.axzo.workflow.core.listener.AbstractBpmnEventListener; import cn.axzo.workflow.core.listener.AbstractBpmnEventListener;
import cn.axzo.workflow.core.listener.BpmnTaskEventListener; import cn.axzo.workflow.core.listener.BpmnTaskEventListener;
import cn.axzo.workflow.es.service.EsProcessInstanceService;
import cn.axzo.workflow.es.service.EsProcessTaskService;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService; import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.axzo.workflow.es.service.impl.EsProcessTaskServiceImpl; import cn.axzo.workflow.server.controller.listener.tx.OnTxCommittedSyncToEsListener;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.HistoryService; import org.flowable.common.engine.impl.cfg.TransactionState;
import org.flowable.engine.history.HistoricProcessInstance; import org.flowable.common.engine.impl.context.Context;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.task.api.history.HistoricTaskInstance;
import org.flowable.task.service.delegate.DelegateTask; import org.flowable.task.service.delegate.DelegateTask;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
* 同步任务相关数据至 ES * 同步任务相关数据至 ES
* *
@ -48,18 +39,7 @@ public class SyncToEsTaskEvent_104_Listener extends AbstractBpmnEventListener<Ta
*/ */
@Override @Override
public void onAssigned(DelegateTask delegateTask) { public void onAssigned(DelegateTask delegateTask) {
log.info("SyncEsTaskEntityEventHandle onInitialized processInstanceId:{}, taskId: {}", delegateTask.getProcessInstanceId(), delegateTask.getId()); Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
new OnTxCommittedSyncToEsListener(aggregateProcessInstanceService, delegateTask.getProcessInstanceId(), delegateTask.getId()));
// 删除指定实例的父子文档
aggregateProcessInstanceService.deleteDocumentParentAndChild(delegateTask.getProcessInstanceId());
log.info("delete document processInstanceId: {}", delegateTask.getProcessInstanceId());
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(delegateTask.getProcessInstanceId())
.singleResult();
aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
log.info("reInsert document processInstanceId: {}", delegateTask.getProcessInstanceId());
} }
} }

View File

@ -0,0 +1,43 @@
package cn.axzo.workflow.server.controller.listener.tx;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.cfg.TransactionListener;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.HistoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
/**
* 引擎内的动作事务提交后执行同步 ES
*
* @author wangli
* @since 2024-10-18 16:34
*/
@Slf4j
@AllArgsConstructor
public class OnTxCommittedSyncToEsListener implements TransactionListener {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
private final String processInstanceId;
private final String taskId;
@Override
public void execute(CommandContext commandContext) {
log.info("SyncEsTaskEntityEventHandle onInitialized processInstanceId:{}, taskId: {}", processInstanceId, taskId);
// 删除指定实例的父子文档
aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId);
log.info("delete document processInstanceId: {}", processInstanceId);
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
log.info("reInsert document processInstanceId: {}", processInstanceId);
}
}