feat(REQ-3004) - 调整日志报告的逻辑
This commit is contained in:
parent
fea19159db
commit
611c0351d1
@ -75,6 +75,9 @@ public class SupportRefreshProperties {
|
|||||||
@Value("${workflow.useNewToAdminApi:false}")
|
@Value("${workflow.useNewToAdminApi:false}")
|
||||||
private Boolean useNewToAdminApi;
|
private Boolean useNewToAdminApi;
|
||||||
|
|
||||||
|
@Value("${workflow.sendDingTalk:true}")
|
||||||
|
private Boolean sendDingTalk;
|
||||||
|
|
||||||
@Value("${workflow.esSyncBatchSize:10}")
|
@Value("${workflow.esSyncBatchSize:10}")
|
||||||
private Integer esSyncBatchSize;
|
private Integer esSyncBatchSize;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package cn.axzo.workflow.server.common.aspectj;
|
package cn.axzo.workflow.server.common.aspectj;
|
||||||
|
|
||||||
import cn.axzo.workflow.core.common.event.ApiLogEvent;
|
import cn.axzo.workflow.core.common.event.ApiLogEvent;
|
||||||
|
import cn.axzo.workflow.core.conf.SupportRefreshProperties;
|
||||||
import cn.axzo.workflow.server.common.annotation.EnvConfig;
|
import cn.axzo.workflow.server.common.annotation.EnvConfig;
|
||||||
import cn.axzo.workflow.server.common.annotation.ErrorReporter;
|
import cn.axzo.workflow.server.common.annotation.ErrorReporter;
|
||||||
import cn.axzo.workflow.server.common.config.property.WorkflowProperties;
|
import cn.axzo.workflow.server.common.config.property.WorkflowProperties;
|
||||||
@ -47,8 +48,8 @@ import static cn.azxo.framework.common.constatns.Constants.CTX_LOG_ID_MDC;
|
|||||||
public class ErrorReportAspect implements Ordered {
|
public class ErrorReportAspect implements Ordered {
|
||||||
@Value("${spring.profiles.active}")
|
@Value("${spring.profiles.active}")
|
||||||
private String profile;
|
private String profile;
|
||||||
@Value("${workflow.sendDingTalk:true}")
|
@Resource
|
||||||
private Boolean sendDingTalk;
|
private SupportRefreshProperties refreshProperties;
|
||||||
@Resource
|
@Resource
|
||||||
private ApplicationEventPublisher applicationEventPublisher;
|
private ApplicationEventPublisher applicationEventPublisher;
|
||||||
@Resource
|
@Resource
|
||||||
@ -141,7 +142,7 @@ public class ErrorReportAspect implements Ordered {
|
|||||||
EnvConfig[] envConfigs = errorReporter.envConfig();
|
EnvConfig[] envConfigs = errorReporter.envConfig();
|
||||||
for (EnvConfig envConfig : envConfigs) {
|
for (EnvConfig envConfig : envConfigs) {
|
||||||
if (Arrays.asList(envConfig.profiles()).contains(profile)) {
|
if (Arrays.asList(envConfig.profiles()).contains(profile)) {
|
||||||
boolean filterSendDingTalk = sendDingTalk;
|
boolean filterSendDingTalk = refreshProperties.getSendDingTalk();
|
||||||
if (workflowProperties.getFilterSendDingTalk().contains(operation.summary())) {
|
if (workflowProperties.getFilterSendDingTalk().contains(operation.summary())) {
|
||||||
filterSendDingTalk = false;
|
filterSendDingTalk = false;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,6 +1,11 @@
|
|||||||
package cn.axzo.workflow.server.mq.inside.consumer;
|
package cn.axzo.workflow.server.mq.inside.consumer;
|
||||||
|
|
||||||
|
import cn.axzo.framework.domain.data.IdHelper;
|
||||||
|
import cn.axzo.framework.rocketmq.Event;
|
||||||
|
import cn.axzo.framework.rocketmq.utils.TraceUtils;
|
||||||
import cn.axzo.workflow.core.conf.SupportRefreshProperties;
|
import cn.axzo.workflow.core.conf.SupportRefreshProperties;
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import com.google.common.base.Strings;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||||
@ -9,12 +14,15 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|||||||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||||||
import org.apache.rocketmq.common.message.MessageExt;
|
import org.apache.rocketmq.common.message.MessageExt;
|
||||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||||
|
import org.slf4j.MDC;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 批量同步 ES
|
* 批量同步 ES
|
||||||
@ -42,10 +50,10 @@ public class ElasticSearchBatchSyncListener {
|
|||||||
consumer.subscribe(topic, "*");
|
consumer.subscribe(topic, "*");
|
||||||
consumer.setConsumeMessageBatchMaxSize(refreshProperties.getEsSyncBatchSize());
|
consumer.setConsumeMessageBatchMaxSize(refreshProperties.getEsSyncBatchSize());
|
||||||
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
|
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
|
||||||
log.warn("拉取消息条数:{}", msgs.size());
|
|
||||||
try {
|
try {
|
||||||
for (MessageExt msg : msgs) {
|
for (MessageExt msg : msgs) {
|
||||||
String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);
|
String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);
|
||||||
|
Event event = convertEvent(msg, msgBody);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||||||
@ -54,4 +62,33 @@ public class ElasticSearchBatchSyncListener {
|
|||||||
});
|
});
|
||||||
// consumer.start();
|
// consumer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Event convertEvent(MessageExt msg, String msgBody) {
|
||||||
|
try {
|
||||||
|
return JSONObject.parseObject(msgBody, Event.class);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("====MQ CONSUMER {} ===={}, msgId = {}, parse event error, event = {}",
|
||||||
|
getTraceId(msg), msg.getMsgId(), msgBody, e.getMessage(), e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTraceId(MessageExt msg) {
|
||||||
|
Map<String, String> headers = msg.getProperties();
|
||||||
|
MDC.put(TraceUtils.CTX_LOG_ID, headers.get(TraceUtils.TRACE_ID));
|
||||||
|
MDC.put(TraceUtils.TRACE_ID, headers.get(TraceUtils.TRACE_ID));
|
||||||
|
MDC.put(TraceUtils.TRACE_ID_IN_MDC, headers.get(TraceUtils.TRACE_ID));
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("received message, topic={}, headers={}", topic, headers);
|
||||||
|
}
|
||||||
|
String traceId = null;
|
||||||
|
if (!CollectionUtils.isEmpty(headers) && headers.containsKey(TraceUtils.TRACE_ID)) {
|
||||||
|
traceId = headers.get(TraceUtils.TRACE_ID);
|
||||||
|
}
|
||||||
|
if (Strings.isNullOrEmpty(traceId)) {
|
||||||
|
traceId = IdHelper.get32UUID();
|
||||||
|
}
|
||||||
|
TraceUtils.putTraceId(traceId);
|
||||||
|
return traceId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user