feat(REQ-2752) - 调整运行时的数据同步 ES 的逻辑

This commit is contained in:
wangli 2024-10-16 16:53:12 +08:00
parent f70a7d5b22
commit 6ad025c642
3 changed files with 59 additions and 57 deletions

View File

@ -1,56 +0,0 @@
package cn.axzo.workflow.listener;
import cn.axzo.workflow.core.engine.listener.entity.EntityEventHandle;
import cn.axzo.workflow.es.service.EsProcessInstanceService;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.HistoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.task.service.impl.persistence.entity.TaskEntity;
import org.springframework.stereotype.Component;
/**
* 用于处理非结束的实例的数据同步至 ES
*
* @author wangli
* @since 2024-09-06 00:02
*/
@Slf4j
@Component
@AllArgsConstructor
public class SyncEsTaskEntityEventHandle implements EntityEventHandle<TaskEntity> {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
private final EsProcessInstanceService esProcessInstanceService;
@Override
public boolean support(Object entity) {
return entity instanceof TaskEntity;
}
@Override
public TaskEntity convert(Object entity) {
return (TaskEntity) entity;
}
@Override
public void onInitialized(TaskEntity taskEntity) {
log.info("SyncEsTaskEntityEventHandle onInitialized processInstanceId:{}, taskId: {}", taskEntity.getProcessInstanceId(), taskEntity.getId());
esProcessInstanceService.delete(taskEntity.getProcessInstanceId());
log.info("delete document processInstanceId: {}", taskEntity.getProcessInstanceId());
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(taskEntity.getProcessInstanceId())
.singleResult();
aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
log.info("reInsert document processInstanceId: {}", taskEntity.getProcessInstanceId());
}
}

View File

@ -34,7 +34,7 @@ import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_TASK_RELAT
@Component
@Scope("prototype")
@AllArgsConstructor
public class SnapshotBpmnTaskTaskEvent_100_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener, Ordered {
public class SnapshotBpmnTaskEvent_100_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener, Ordered {
@Override
public int getOrder() {
return Integer.MIN_VALUE + 100;

View File

@ -0,0 +1,58 @@
package cn.axzo.workflow.server.controller.listener.task;
import cn.axzo.workflow.core.common.context.TaskOperationContext;
import cn.axzo.workflow.core.listener.AbstractBpmnEventListener;
import cn.axzo.workflow.core.listener.BpmnTaskEventListener;
import cn.axzo.workflow.es.service.EsProcessInstanceService;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.HistoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.task.service.delegate.DelegateTask;
import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
/**
* 同步任务相关数据至 ES
*
* @author wangli
* @since 2024-10-16 16:50
*/
@Slf4j
@Component
@Scope("prototype")
@AllArgsConstructor
public class SyncToEsTaskEvent_104_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener, Ordered {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
private final EsProcessInstanceService esProcessInstanceService;
@Override
public int getOrder() {
return Integer.MIN_VALUE + 104;
}
/**
* 用户任务已指派审核人
*
* @param delegateTask
*/
@Override
public void onAssigned(DelegateTask delegateTask) {
log.info("SyncEsTaskEntityEventHandle onInitialized processInstanceId:{}, taskId: {}", delegateTask.getProcessInstanceId(), delegateTask.getId());
esProcessInstanceService.delete(delegateTask.getProcessInstanceId());
log.info("delete document processInstanceId: {}", delegateTask.getProcessInstanceId());
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(delegateTask.getProcessInstanceId())
.singleResult();
aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
log.info("reInsert document processInstanceId: {}", delegateTask.getProcessInstanceId());
}
}