diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/consumer/ElasticSearchBatchSyncListener.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/consumer/ElasticSearchBatchSyncListener.java index e1a6b307b..74c86e4e8 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/consumer/ElasticSearchBatchSyncListener.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/consumer/ElasticSearchBatchSyncListener.java @@ -4,6 +4,8 @@ import cn.axzo.framework.domain.data.IdHelper; import cn.axzo.framework.rocketmq.Event; import cn.axzo.framework.rocketmq.utils.TraceUtils; import cn.axzo.workflow.core.conf.SupportRefreshProperties; +import cn.axzo.workflow.es.model.ProcessTaskDocument; +import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService; import com.alibaba.fastjson.JSONObject; import com.google.common.base.Strings; import lombok.SneakyThrows; @@ -14,6 +16,8 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; +import org.flowable.engine.HistoryService; +import org.flowable.engine.history.HistoricProcessInstance; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -22,7 +26,9 @@ import org.springframework.util.CollectionUtils; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; -import java.util.Map; +import java.util.*; + +import static cn.axzo.workflow.common.enums.ElasticSearchEventEnum.ELASTIC_SEARCH_SYNC; /** * 批量同步 ES @@ -41,6 +47,11 @@ public class ElasticSearchBatchSyncListener { private String consumerGroup; @Value("topic_workflow_engine_${spring.profiles.active}") private String topic; + @Resource + private HistoryService historyService; + @Resource + private AggregateProcessInstanceService aggregateProcessInstanceService; + @SneakyThrows @PostConstruct public void init() { @@ -49,18 +60,39 @@ public class ElasticSearchBatchSyncListener { consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe(topic, "*"); consumer.setConsumeMessageBatchMaxSize(refreshProperties.getEsSyncBatchSize()); + consumer.setConsumeThreadMax(1); + consumer.setConsumeThreadMin(1); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + log.warn("batch get msg size: {}", msgs.size()); + if(CollectionUtils.isEmpty(msgs)) { + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } try { + HashSet idSet = new HashSet<>(); for (MessageExt msg : msgs) { String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8); Event event = convertEvent(msg, msgBody); + if (support(event)) { + String processInstanceId = event.normalizedData(String.class); + idSet.add(processInstanceId); + log.info("batch sync es processIds: {}", processInstanceId); + } } + idSet.forEach(this::sync); } catch (Exception e) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); -// consumer.start(); + consumer.start(); + } + + private Boolean support(Event event) { + if (Objects.isNull(event)) { + return false; + } + return Objects.equals(event.getEventModule(), ELASTIC_SEARCH_SYNC.getModule()) + && Objects.equals(event.getEventName(), ELASTIC_SEARCH_SYNC.getTag()); } private Event convertEvent(MessageExt msg, String msgBody) { @@ -91,4 +123,16 @@ public class ElasticSearchBatchSyncListener { TraceUtils.putTraceId(traceId); return traceId; } + + private void sync(String processInstanceId) { + HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery() + .processInstanceId(processInstanceId) + .singleResult(); + // 删除指定实例的父子文档 + log.info("delete document processInstanceId: {}", processInstanceId); + aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId); + log.info("reInsert document processInstanceId: {}", processInstanceId); + List processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null); + log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size()); + } }