feat(REQ-3004) - 调整 ES 同步测试为批量模式

This commit is contained in:
wangli 2024-11-21 10:31:07 +08:00
parent 611c0351d1
commit cc879142d7

View File

@ -4,6 +4,8 @@ import cn.axzo.framework.domain.data.IdHelper;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.utils.TraceUtils;
import cn.axzo.workflow.core.conf.SupportRefreshProperties;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
import lombok.SneakyThrows;
@ -14,6 +16,8 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.flowable.engine.HistoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@ -22,7 +26,9 @@ import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.*;
import static cn.axzo.workflow.common.enums.ElasticSearchEventEnum.ELASTIC_SEARCH_SYNC;
/**
* 批量同步 ES
@ -41,6 +47,11 @@ public class ElasticSearchBatchSyncListener {
private String consumerGroup;
@Value("topic_workflow_engine_${spring.profiles.active}")
private String topic;
@Resource
private HistoryService historyService;
@Resource
private AggregateProcessInstanceService aggregateProcessInstanceService;
@SneakyThrows
@PostConstruct
public void init() {
@ -49,18 +60,39 @@ public class ElasticSearchBatchSyncListener {
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(topic, "*");
consumer.setConsumeMessageBatchMaxSize(refreshProperties.getEsSyncBatchSize());
consumer.setConsumeThreadMax(1);
consumer.setConsumeThreadMin(1);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
log.warn("batch get msg size: {}", msgs.size());
if(CollectionUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
try {
HashSet<String> idSet = new HashSet<>();
for (MessageExt msg : msgs) {
String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);
Event event = convertEvent(msg, msgBody);
if (support(event)) {
String processInstanceId = event.normalizedData(String.class);
idSet.add(processInstanceId);
log.info("batch sync es processIds: {}", processInstanceId);
}
}
idSet.forEach(this::sync);
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// consumer.start();
consumer.start();
}
/**
 * Decides whether a consumed message should trigger an ES re-sync.
 *
 * <p>Only events whose module and name both match the
 * {@code ELASTIC_SEARCH_SYNC} enum entry are accepted; a {@code null}
 * event (e.g. an unparseable message body from {@code convertEvent})
 * is rejected.
 *
 * @param event the event decoded from the RocketMQ message body; may be {@code null}
 * @return {@code true} if the event is an ES-sync event, {@code false} otherwise
 */
private boolean support(Event event) {
    // A predicate never yields null, so return the primitive boolean
    // rather than the boxed Boolean wrapper.
    if (Objects.isNull(event)) {
        return false;
    }
    return Objects.equals(event.getEventModule(), ELASTIC_SEARCH_SYNC.getModule())
            && Objects.equals(event.getEventName(), ELASTIC_SEARCH_SYNC.getTag());
}
private Event convertEvent(MessageExt msg, String msgBody) {
@ -91,4 +123,16 @@ public class ElasticSearchBatchSyncListener {
TraceUtils.putTraceId(traceId);
return traceId;
}
/**
 * Re-synchronizes a single process instance into Elasticsearch:
 * deletes the existing parent/child documents for the instance, then
 * re-inserts them from the Flowable history store.
 *
 * @param processInstanceId the Flowable historic process instance id to re-sync
 */
private void sync(String processInstanceId) {
    HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
            .processInstanceId(processInstanceId)
            .singleResult();
    // Delete the parent/child documents for this instance first. This is
    // done even when the instance no longer exists in history, so stale
    // documents of a removed instance are still cleaned up.
    log.info("delete document processInstanceId: {}", processInstanceId);
    aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId);
    // singleResult() returns null when no matching instance exists; without
    // this guard the null would flow into syncProcessInstance, likely NPE,
    // be caught by the listener's broad catch, and cause the same message
    // to be redelivered (RECONSUME_LATER) forever.
    if (Objects.isNull(historicProcessInstance)) {
        log.warn("historic process instance not found, skip reInsert, processInstanceId: {}", processInstanceId);
        return;
    }
    log.info("reInsert document processInstanceId: {}", processInstanceId);
    List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
    log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size());
}
}