From 611c0351d13d77f1ae3cea651b8f2aabe6383f45 Mon Sep 17 00:00:00 2001 From: wangli <274027703@qq.com> Date: Thu, 21 Nov 2024 09:47:02 +0800 Subject: [PATCH] =?UTF-8?q?feat(REQ-3004)=20-=20=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E6=8A=A5=E5=91=8A=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/conf/SupportRefreshProperties.java | 3 ++ .../common/aspectj/ErrorReportAspect.java | 7 ++-- .../ElasticSearchBatchSyncListener.java | 39 ++++++++++++++++++- 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/SupportRefreshProperties.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/SupportRefreshProperties.java index e087425a4..0ce9d8beb 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/SupportRefreshProperties.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/SupportRefreshProperties.java @@ -75,6 +75,9 @@ public class SupportRefreshProperties { @Value("${workflow.useNewToAdminApi:false}") private Boolean useNewToAdminApi; + @Value("${workflow.sendDingTalk:true}") + private Boolean sendDingTalk; + @Value("${workflow.esSyncBatchSize:10}") private Integer esSyncBatchSize; } diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/aspectj/ErrorReportAspect.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/aspectj/ErrorReportAspect.java index 2c7a4f033..9619ce85a 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/aspectj/ErrorReportAspect.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/aspectj/ErrorReportAspect.java @@ -1,6 +1,7 @@ package cn.axzo.workflow.server.common.aspectj; import cn.axzo.workflow.core.common.event.ApiLogEvent; +import cn.axzo.workflow.core.conf.SupportRefreshProperties; import cn.axzo.workflow.server.common.annotation.EnvConfig; import cn.axzo.workflow.server.common.annotation.ErrorReporter; import cn.axzo.workflow.server.common.config.property.WorkflowProperties; @@ -47,8 +48,8 @@ import static cn.azxo.framework.common.constatns.Constants.CTX_LOG_ID_MDC; public class ErrorReportAspect implements Ordered { @Value("${spring.profiles.active}") private String profile; - @Value("${workflow.sendDingTalk:true}") - private Boolean sendDingTalk; + @Resource + private SupportRefreshProperties refreshProperties; @Resource private ApplicationEventPublisher applicationEventPublisher; @Resource @@ -141,7 +142,7 @@ public class ErrorReportAspect implements Ordered { EnvConfig[] envConfigs = errorReporter.envConfig(); for (EnvConfig envConfig : envConfigs) { if (Arrays.asList(envConfig.profiles()).contains(profile)) { - boolean filterSendDingTalk = sendDingTalk; + boolean filterSendDingTalk = refreshProperties.getSendDingTalk(); if (workflowProperties.getFilterSendDingTalk().contains(operation.summary())) { filterSendDingTalk = false; } diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/consumer/ElasticSearchBatchSyncListener.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/consumer/ElasticSearchBatchSyncListener.java index 8e80a01e5..e1a6b307b 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/consumer/ElasticSearchBatchSyncListener.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/consumer/ElasticSearchBatchSyncListener.java @@ -1,6 +1,11 @@ package cn.axzo.workflow.server.mq.inside.consumer; +import cn.axzo.framework.domain.data.IdHelper; +import cn.axzo.framework.rocketmq.Event; +import cn.axzo.framework.rocketmq.utils.TraceUtils; import cn.axzo.workflow.core.conf.SupportRefreshProperties; +import com.alibaba.fastjson.JSONObject; +import com.google.common.base.Strings; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -9,12 +14,15 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; +import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; +import java.util.Map; /** * 批量同步 ES @@ -42,10 +50,10 @@ public class ElasticSearchBatchSyncListener { consumer.subscribe(topic, "*"); consumer.setConsumeMessageBatchMaxSize(refreshProperties.getEsSyncBatchSize()); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { - log.warn("拉取消息条数:{}", msgs.size()); try { for (MessageExt msg : msgs) { String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8); + Event event = convertEvent(msg, msgBody); } } catch (Exception e) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; @@ -54,4 +62,33 @@ public class ElasticSearchBatchSyncListener { }); // consumer.start(); } + + private Event convertEvent(MessageExt msg, String msgBody) { + try { + return JSONObject.parseObject(msgBody, Event.class); + } catch (Exception e) { + log.error("====MQ CONSUMER {} ===={}, msgId = {}, parse event error, event = {}", + getTraceId(msg), msg.getMsgId(), msgBody, e.getMessage(), e); + throw e; + } + } + + public String getTraceId(MessageExt msg) { + Map headers = msg.getProperties(); + MDC.put(TraceUtils.CTX_LOG_ID, headers.get(TraceUtils.TRACE_ID)); + MDC.put(TraceUtils.TRACE_ID, headers.get(TraceUtils.TRACE_ID)); + MDC.put(TraceUtils.TRACE_ID_IN_MDC, headers.get(TraceUtils.TRACE_ID)); + if (log.isDebugEnabled()) { + log.debug("received message, topic={}, headers={}", topic, headers); + } + String traceId = null; + if (!CollectionUtils.isEmpty(headers) && headers.containsKey(TraceUtils.TRACE_ID)) { + traceId = headers.get(TraceUtils.TRACE_ID); + } + if (Strings.isNullOrEmpty(traceId)) { + traceId = IdHelper.get32UUID(); + } + TraceUtils.putTraceId(traceId); + return traceId; + } }