feat(REQ-3004) - 优化审批实例数据同步 ES 的逻辑

This commit is contained in:
wangli 2024-11-21 09:26:03 +08:00
parent 7f74794c5a
commit fea19159db
3 changed files with 69 additions and 9 deletions

View File

@ -74,4 +74,7 @@ public class SupportRefreshProperties {
*/ */
@Value("${workflow.useNewToAdminApi:false}") @Value("${workflow.useNewToAdminApi:false}")
private Boolean useNewToAdminApi; private Boolean useNewToAdminApi;
@Value("${workflow.esSyncBatchSize:10}")
private Integer esSyncBatchSize;
} }

View File

@ -18,18 +18,18 @@ import javax.annotation.Resource;
* @author wangli * @author wangli
* @since 2024-11-06 15:05 * @since 2024-11-06 15:05
*/ */
@Configuration(proxyBeanMethods = false) //@Configuration(proxyBeanMethods = false)
public class SelfBoradcastRocketConfiguration { public class SelfBoradcastRocketConfiguration {
public static final String DEFAULT_EVENT = "topic_workflow_engine_"; public static final String DEFAULT_EVENT = "topic_workflow_engine_";
@Component // @Component
@ConditionalOnProperty(name = "rocketmq.name-server") // @ConditionalOnProperty(name = "rocketmq.name-server")
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}", // @RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_workflow_engine_${spring.profiles.active}_consumer", // consumerGroup = "GID_${spring.application.name}_workflow_engine_${spring.profiles.active}_consumer",
consumeMode = ConsumeMode.ORDERLY, // consumeMode = ConsumeMode.ORDERLY,
maxReconsumeTimes = 3, // maxReconsumeTimes = 3,
nameServer = "${rocketmq.name-server}" // nameServer = "${rocketmq.name-server}"
) // )
public static class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener<MessageExt> { public static class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener<MessageExt> {
@Resource @Resource
private EventConsumer eventConsumer; private EventConsumer eventConsumer;

View File

@ -0,0 +1,57 @@
package cn.axzo.workflow.server.mq.inside.consumer;
import cn.axzo.workflow.core.conf.SupportRefreshProperties;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
* 批量同步 ES
*
* @author wangli
* @since 2024-11-20 18:06
*/
@Component
@Slf4j
public class ElasticSearchBatchSyncListener {
@Resource
private SupportRefreshProperties refreshProperties;
@Resource
private RocketMQProperties rocketMQProperties;
@Value("GID_${spring.application.name}_workflow_engine_${spring.profiles.active}_consumer")
private String consumerGroup;
@Value("topic_workflow_engine_${spring.profiles.active}")
private String topic;
@SneakyThrows
@PostConstruct
public void init() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(topic, "*");
consumer.setConsumeMessageBatchMaxSize(refreshProperties.getEsSyncBatchSize());
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
log.warn("拉取消息条数:{}", msgs.size());
try {
for (MessageExt msg : msgs) {
String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);
}
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// consumer.start();
}
}