update(REQ-2324) - 解决 MQ 发送方丢消息的问题

This commit is contained in:
wangli 2024-05-24 00:12:11 +08:00
parent a8c0ba88e3
commit ae8ef48277
17 changed files with 51 additions and 20 deletions

View File

@ -3,10 +3,10 @@ package cn.axzo.workflow.core.common.event;
import cn.axzo.workflow.core.conf.SupportRefreshProperties; import cn.axzo.workflow.core.conf.SupportRefreshProperties;
import cn.axzo.workflow.core.repository.entity.ExtAxMqLog; import cn.axzo.workflow.core.repository.entity.ExtAxMqLog;
import cn.axzo.workflow.core.service.ExtAxMqLogService; import cn.axzo.workflow.core.service.ExtAxMqLogService;
import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -29,7 +29,6 @@ public class MqLogListener implements ApplicationListener<MqLogEvent> {
@SneakyThrows @SneakyThrows
@Override @Override
@Async
public void onApplicationEvent(MqLogEvent event) { public void onApplicationEvent(MqLogEvent event) {
log.info("receive mq log event"); log.info("receive mq log event");
if (!refreshProperties.getMqLogEnable()) { if (!refreshProperties.getMqLogEnable()) {
@ -62,17 +61,20 @@ public class MqLogListener implements ApplicationListener<MqLogEvent> {
} }
private void delete(ExtAxMqLog event) { private void delete(ExtAxMqLog event) {
log.info("delete mq log"); log.info("delete mq log, event: {}", JSONUtil.toJsonStr(event));
event.setDeleteThreadName(Thread.currentThread().getName());
mqLogService.delete(event); mqLogService.delete(event);
} }
private void update(ExtAxMqLog event) { private void update(ExtAxMqLog event) {
log.info("update mq log"); log.info("update mq log, event: {}", JSONUtil.toJsonStr(event));
event.setUpdateThreadName(Thread.currentThread().getName());
mqLogService.update(event); mqLogService.update(event);
} }
private void insert(ExtAxMqLog event) { private void insert(ExtAxMqLog event) {
log.info("insert mq log"); log.info("insert mq log, event: {}", JSONUtil.toJsonStr(event));
event.setInsertThreadName(Thread.currentThread().getName());
mqLogService.insert(event); mqLogService.insert(event);
} }

View File

@ -27,7 +27,6 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
import static cn.axzo.framework.rocketmq.RocketMQEventProducer.MQ_MESSAGE_ID; import static cn.axzo.framework.rocketmq.RocketMQEventProducer.MQ_MESSAGE_ID;
import static cn.axzo.workflow.common.constant.BpmnConstants.MQ_UNIQUE_ID;
/** /**
* RocketMQ 全局配置 * RocketMQ 全局配置
@ -83,13 +82,12 @@ public class RocketMqEventConfiguration {
*/ */
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getSendBeforeCallback() { private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getSendBeforeCallback() {
return (event, context) -> { return (event, context) -> {
String uniqueId = IdUtil.fastSimpleUUID(); event.setEventId(IdUtil.simpleUUID());
context.getHeaders().put(MQ_UNIQUE_ID, uniqueId); MqLogEvent mqLogEvent = new MqLogEvent(event.getEventId(), null,
MqLogEvent mqLogEvent = new MqLogEvent(uniqueId, null,
event.getEventName(), event.getShardingKey(), event.getEventName(), event.getShardingKey(),
event.toPrettyJsonString(), TraceUtils.getOrCreateTraceId(), event.toPrettyJsonString(), TraceUtils.getOrCreateTraceId(),
MqLogEventType.INSERT); MqLogEventType.INSERT);
log.info("mq_send_Before: {}", event.getShardingKey()); log.info("mq_send_Before: {}, uniqueId: {}", event.getShardingKey(), event.getEventId());
applicationEventPublisher.publishEvent(mqLogEvent); applicationEventPublisher.publishEvent(mqLogEvent);
}; };
} }
@ -103,13 +101,12 @@ public class RocketMqEventConfiguration {
*/ */
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getSendAfterCallback() { private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getSendAfterCallback() {
return (event, context) -> { return (event, context) -> {
String uniqueId = context.getHeaders().get(MQ_UNIQUE_ID);
String messageId = context.getHeaders().get(MQ_MESSAGE_ID); String messageId = context.getHeaders().get(MQ_MESSAGE_ID);
MqLogEvent mqLogEvent = new MqLogEvent(uniqueId, messageId, MqLogEvent mqLogEvent = new MqLogEvent(event.getEventId(), messageId,
event.getEventName(), event.getShardingKey(), event.getEventName(), event.getShardingKey(),
event.toPrettyJsonString(), TraceUtils.getOrCreateTraceId(), event.toPrettyJsonString(), TraceUtils.getOrCreateTraceId(),
MqLogEventType.UPDATE); MqLogEventType.UPDATE);
log.info("mq_send_callback: {}", event.getShardingKey()); log.info("mq_send_after: {}, uniqueId: {}, messageId: {}", event.getShardingKey(), event.getEventId(), messageId);
applicationEventPublisher.publishEvent(mqLogEvent); applicationEventPublisher.publishEvent(mqLogEvent);
}; };
} }
@ -123,11 +120,10 @@ public class RocketMqEventConfiguration {
*/ */
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getTransactionRollbackHandler() { private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getTransactionRollbackHandler() {
return (event, context) -> { return (event, context) -> {
String uniqueId = context.getHeaders().get(MQ_UNIQUE_ID); MqLogEvent mqLog = new MqLogEvent(event.getEventId(), null, event.getEventName(),
MqLogEvent mqLog = new MqLogEvent(uniqueId, null, event.getEventName(),
event.getShardingKey(), event.toPrettyJsonString(), TraceUtils.getOrCreateTraceId(), event.getShardingKey(), event.toPrettyJsonString(), TraceUtils.getOrCreateTraceId(),
MqLogEventType.DELETE); MqLogEventType.DELETE);
log.info("mq_transaction_rollback: {}", event.getShardingKey()); log.info("mq_transaction_rollback: {}, uniqueId: {}", event.getShardingKey(), event.getEventId());
applicationEventPublisher.publishEvent(mqLog); applicationEventPublisher.publishEvent(mqLog);
}; };
} }

View File

@ -55,7 +55,7 @@ public class EngineAsyncJobEventListener extends AbstractFlowableEngineEventList
}); });
} }
stopWatch.stop(); stopWatch.stop();
log.info("StopWatch '" + stopWatch.getLastTaskName() + "': running time = " + stopWatch.getTotalTimeSeconds()); log.info("StopWatch '{}': running time = {} 's", stopWatch.getLastTaskName(), stopWatch.getTotalTimeSeconds());
} }
private List<BpmnAsyncJobEventListener> getOrderedListeners() { private List<BpmnAsyncJobEventListener> getOrderedListeners() {

View File

@ -58,7 +58,7 @@ public class EngineTaskEventListener implements TaskListener {
} }
}); });
stopWatch.stop(); stopWatch.stop();
log.info("StopWatch '" + stopWatch.getLastTaskName() + "': running time = " + stopWatch.getTotalTimeSeconds() + " s, processInstanceId:{}", delegateTask.getProcessInstanceId()); log.info("StopWatch '{}': running time = {} 's, processInstanceId:{}", stopWatch.getLastTaskName(), stopWatch.getTotalTimeSeconds(), delegateTask.getProcessInstanceId());
} }

View File

@ -14,6 +14,7 @@ import org.flowable.engine.RuntimeService;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil; import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.task.service.delegate.DelegateTask; import org.flowable.task.service.delegate.DelegateTask;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Objects; import java.util.Objects;
@ -33,6 +34,7 @@ import static cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum.PROCES
*/ */
@Slf4j @Slf4j
@Component @Component
@Scope("prototype")
@AllArgsConstructor @AllArgsConstructor
public class InternalExtAxTaskInstEvent_lo_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener { public class InternalExtAxTaskInstEvent_lo_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener {
private final RuntimeService runtimeService; private final RuntimeService runtimeService;

View File

@ -46,4 +46,12 @@ public class ExtAxMqLog extends BaseEntity<ExtAxMqLog> {
* 链路追踪ID * 链路追踪ID
*/ */
private String traceId; private String traceId;
/**
* 线程名称
*/
private String insertThreadName;
private String updateThreadName;
private String deleteThreadName;
} }

View File

@ -21,6 +21,7 @@ import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.impl.persistence.entity.ExecutionEntityImpl; import org.flowable.engine.impl.persistence.entity.ExecutionEntityImpl;
import org.flowable.engine.runtime.ProcessInstance; import org.flowable.engine.runtime.ProcessInstance;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -50,6 +51,7 @@ import static cn.axzo.workflow.common.enums.ProcessActivityEventEnum.PROCESS_ACT
*/ */
@Slf4j @Slf4j
@Component @Component
@Scope("prototype")
public class RocketMqBpmActivityEventListener extends AbstractBpmnEventListener<ActivityOperationContext> implements BpmnActivityEventListener, Ordered { public class RocketMqBpmActivityEventListener extends AbstractBpmnEventListener<ActivityOperationContext> implements BpmnActivityEventListener, Ordered {
@Resource @Resource
private RuntimeService runtimeService; private RuntimeService runtimeService;

View File

@ -20,6 +20,7 @@ import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil; import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.engine.impl.util.ProcessDefinitionUtil; import org.flowable.engine.impl.util.ProcessDefinitionUtil;
import org.flowable.job.api.JobInfo; import org.flowable.job.api.JobInfo;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Arrays; import java.util.Arrays;
@ -37,6 +38,7 @@ import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventT
*/ */
@Slf4j @Slf4j
@Component @Component
@Scope("prototype")
public class ApproveErrorReporterEventListener implements BpmnAsyncJobEventListener { public class ApproveErrorReporterEventListener implements BpmnAsyncJobEventListener {
private final List<String> IGNORE_ERROR_CODES = Arrays.asList( private final List<String> IGNORE_ERROR_CODES = Arrays.asList(

View File

@ -9,6 +9,7 @@ import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.common.engine.impl.event.FlowableEntityExceptionEventImpl; import org.flowable.common.engine.impl.event.FlowableEntityExceptionEventImpl;
import org.flowable.job.api.JobInfo; import org.flowable.job.api.JobInfo;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Objects; import java.util.Objects;
@ -23,6 +24,7 @@ import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventT
*/ */
@Slf4j @Slf4j
@Component @Component
@Scope("prototype")
public class ErrorReporterEventListener implements BpmnAsyncJobEventListener { public class ErrorReporterEventListener implements BpmnAsyncJobEventListener {
@Value("${spring.profiles.active}") @Value("${spring.profiles.active}")
private String profile; private String profile;

View File

@ -25,6 +25,7 @@ import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.engine.impl.util.ProcessDefinitionUtil; import org.flowable.engine.impl.util.ProcessDefinitionUtil;
import org.flowable.task.api.history.HistoricTaskInstance; import org.flowable.task.api.history.HistoricTaskInstance;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -87,6 +88,7 @@ import static cn.axzo.workflow.common.enums.ProcessMessagePushEventEnum.PROCESS_
*/ */
@Slf4j @Slf4j
@Component @Component
@Scope("prototype")
public class RocketMqMessagePushEventListener extends AbstractBpmnEventListener<NoticeOperationContext> implements BpmnMessagePushEventListener, Ordered { public class RocketMqMessagePushEventListener extends AbstractBpmnEventListener<NoticeOperationContext> implements BpmnMessagePushEventListener, Ordered {
@Resource @Resource
private EventProducer<?> eventProducer; private EventProducer<?> eventProducer;

View File

@ -19,6 +19,7 @@ import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.engine.impl.util.ProcessDefinitionUtil; import org.flowable.engine.impl.util.ProcessDefinitionUtil;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@ -34,6 +35,7 @@ import java.util.Optional;
*/ */
@Slf4j @Slf4j
@Component @Component
@Scope("prototype")
@RefreshScope @RefreshScope
public class MessagePushProcessEventListener extends AbstractBpmnEventListener<ProcessOperationContext> implements BpmnProcessEventListener, Ordered { public class MessagePushProcessEventListener extends AbstractBpmnEventListener<ProcessOperationContext> implements BpmnProcessEventListener, Ordered {
@Value("${workflow.carbonCopyTemplateCode}") @Value("${workflow.carbonCopyTemplateCode}")

View File

@ -20,6 +20,7 @@ import org.flowable.engine.delegate.event.impl.FlowableProcessCancelledEventImpl
import org.flowable.engine.impl.persistence.entity.ExecutionEntityImpl; import org.flowable.engine.impl.persistence.entity.ExecutionEntityImpl;
import org.flowable.engine.repository.Deployment; import org.flowable.engine.repository.Deployment;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -46,6 +47,7 @@ import static cn.axzo.workflow.common.enums.ProcessInstanceEventEnum.PROCESS_INS
*/ */
@Slf4j @Slf4j
@Component @Component
@Scope("prototype")
public class RocketMqBpmnProcessEventListener extends AbstractBpmnEventListener<ProcessOperationContext> implements BpmnProcessEventListener, Ordered { public class RocketMqBpmnProcessEventListener extends AbstractBpmnEventListener<ProcessOperationContext> implements BpmnProcessEventListener, Ordered {
@Resource @Resource
private RuntimeService runtimeService; private RuntimeService runtimeService;

View File

@ -31,6 +31,7 @@ import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.JobService; import org.flowable.job.service.JobService;
import org.flowable.job.service.impl.persistence.entity.JobEntity; import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.task.service.delegate.DelegateTask; import org.flowable.task.service.delegate.DelegateTask;
import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@ -67,6 +68,7 @@ import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.REJEC
*/ */
@Slf4j @Slf4j
@Component @Component
@Scope("prototype")
@AllArgsConstructor @AllArgsConstructor
public class AutoOperatorEvent_101_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener, Ordered { public class AutoOperatorEvent_101_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener, Ordered {
@Override @Override

View File

@ -24,6 +24,7 @@ import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.engine.impl.util.ProcessDefinitionUtil; import org.flowable.engine.impl.util.ProcessDefinitionUtil;
import org.flowable.engine.runtime.ProcessInstance; import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.task.service.delegate.DelegateTask; import org.flowable.task.service.delegate.DelegateTask;
import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -50,6 +51,7 @@ import static cn.axzo.workflow.core.engine.event.BizSpecifyAssigneeEventType.ADD
*/ */
@Slf4j @Slf4j
@Component @Component
@Scope("prototype")
@AllArgsConstructor @AllArgsConstructor
public class MessagePushTaskEvent_103_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener, Ordered { public class MessagePushTaskEvent_103_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener, Ordered {
@Override @Override

View File

@ -17,6 +17,7 @@ import org.flowable.engine.repository.Deployment;
import org.flowable.engine.runtime.ProcessInstance; import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.task.service.delegate.DelegateTask; import org.flowable.task.service.delegate.DelegateTask;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@ -43,6 +44,7 @@ import static cn.axzo.workflow.common.enums.ProcessTaskEventEnum.PROCESS_TASK_DE
*/ */
@Slf4j @Slf4j
@Component @Component
@Scope("prototype")
public class RocketMqBpmnTaskEvent_102_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener, Ordered { public class RocketMqBpmnTaskEvent_102_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener, Ordered {
@Override @Override
public int getOrder() { public int getOrder() {

View File

@ -10,6 +10,7 @@ import org.apache.commons.collections4.ListUtils;
import org.flowable.engine.RuntimeService; import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService; import org.flowable.engine.TaskService;
import org.flowable.task.service.delegate.DelegateTask; import org.flowable.task.service.delegate.DelegateTask;
import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -31,6 +32,7 @@ import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_TASK_RELAT
*/ */
@Slf4j @Slf4j
@Component @Component
@Scope("prototype")
@AllArgsConstructor @AllArgsConstructor
public class SnapshotBpmnTaskTaskEvent_100_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener, Ordered { public class SnapshotBpmnTaskTaskEvent_100_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener, Ordered {
@Override @Override

View File

@ -1,9 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<configuration> <configuration>
<property name="LOGBACK_ROLLINGPOLICY_MAX_FILE_SIZE" value="20MB"/>
<!-- 导入安心筑全局日志配置 --> <!-- 导入安心筑全局日志配置 -->
<include resource="logback/logback-axzo.xml"/> <include resource="logback/logback-axzo.xml"/>
<!-- 覆盖开发环境日志配置 --> <!-- 覆盖开发环境日志配置 -->
<springProfile name="local,dev"> <springProfile name="local,dev">
<logger name="cn.axzo" level="DEBUG"/> <logger name="cn.axzo" level="DEBUG">
<appender-ref ref="ASYNC"/>
</logger>
</springProfile> </springProfile>
</configuration> </configuration>