From d0511ed4ed5c76bc988b11215610a33d187e657a Mon Sep 17 00:00:00 2001
From: wangli <274027703@qq.com>
Date: Wed, 15 May 2024 16:08:12 +0800
Subject: [PATCH] =?UTF-8?q?feat(REQ-2324):=20=E8=B0=83=E6=95=B4=20MQ=20?=
=?UTF-8?q?=E7=B1=BB=E8=B7=AF=E5=BE=84,=20=E6=96=B0=E5=A2=9E=E4=BA=8B?=
=?UTF-8?q?=E5=8A=A1=E5=9B=9E=E6=BB=9A=E7=9A=84=E5=9B=9E=E8=B0=83=E9=92=A9?=
=?UTF-8?q?=E5=AD=90,=20=E5=90=8C=E6=97=B6=E5=9C=A8=E5=8F=91=E9=80=81?=
=?UTF-8?q?=E6=88=90=E5=8A=9F=E7=9A=84=E5=9B=9E=E8=B0=83=E9=92=A9=E5=AD=90?=
=?UTF-8?q?=E4=B8=AD,=E6=96=B0=E5=A2=9E=E8=BF=94=E5=9B=9E=20MQ=20=E7=9A=84?=
=?UTF-8?q?=20MessageID?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../rocketmq/AbstractEventProducer.java | 70 ++++-----------
.../rocketmq/AfterCommitExecutorImpl.java | 90 +++++++++++++++++++
.../rocketmq/RocketMQEventProducer.java | 6 +-
3 files changed, 110 insertions(+), 56 deletions(-)
create mode 100644 axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AfterCommitExecutorImpl.java
diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AbstractEventProducer.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AbstractEventProducer.java
index e4deab7..c74117c 100644
--- a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AbstractEventProducer.java
+++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AbstractEventProducer.java
@@ -38,6 +38,11 @@ public abstract class AbstractEventProducer implements EventProducer
this.defaultContext = defaultContext;
}
+ public BiConsumer> getRollbackHandler() {
+ return ((event, context) -> {
+ });
+ }
+
@Override
public void send(@NonNull Event event, @NonNull Context context) {
// XXX:不要在send的时候修改event的值,有副作用。
@@ -69,12 +74,20 @@ public abstract class AbstractEventProducer implements EventProducer
throw e;
}
};
+
+ Runnable rollbackRunnable = () -> {
+ try{
+ getRollbackHandler().accept(copiedEvent, copiedContext);
+ } catch (Exception e) {
+ // ignore
+ }
+ };
if (copiedContext.isTransactional()) {
// https://www.jianshu.com/p/59891ede5f90
runnable.run();
} else {
// 并发会导致事件时序出现问题. 所以串行执行
- afterCommitExecutor.execute(() -> runnable.run());
+ afterCommitExecutor.executeAndRollback(() -> runnable.run(), ()-> rollbackRunnable.run());
}
}
@@ -83,58 +96,7 @@ public abstract class AbstractEventProducer implements EventProducer
send(event, defaultContext);
}
- /**
- * stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html
- * 保证在交易结束后被调用.
- */
- private static class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter {
- private static final ThreadLocal> RUNNABLES = new ThreadLocal>();
-
- public void execute(Runnable runnable) {
- if (log.isDebugEnabled()) {
- log.debug("Submitting new runnable {} to run after commit", runnable);
- }
- if (!TransactionSynchronizationManager.isSynchronizationActive()) {
- if (log.isDebugEnabled()) {
- log.debug("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
- }
- runnable.run();
- return;
- }
- List threadRunnables = RUNNABLES.get();
- if (threadRunnables == null) {
- threadRunnables = new ArrayList<>();
- RUNNABLES.set(threadRunnables);
- TransactionSynchronizationManager.registerSynchronization(this);
- }
- threadRunnables.add(runnable);
- }
-
- @Override
- public void afterCommit() {
- List threadRunnables = RUNNABLES.get();
- if (log.isDebugEnabled()) {
- log.info("Transaction successfully committed, executing {} runnables", threadRunnables.size());
- }
- for (int i = 0; i < threadRunnables.size(); i++) {
- Runnable runnable = threadRunnables.get(i);
- if (log.isDebugEnabled()) {
- log.debug("Executing runnable {}", runnable);
- }
- try {
- runnable.run();
- } catch (RuntimeException e) {
- log.error("Failed to execute runnable " + runnable, e);
- }
- }
- }
-
- @Override
- public void afterCompletion(int status) {
- if (log.isDebugEnabled()) {
- log.debug("Transaction completed with status {}", status == TransactionSynchronization.STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
- }
- RUNNABLES.remove();
- }
+ public AfterCommitExecutorImpl getAfterCommitExecutor() {
+ return afterCommitExecutor;
}
}
diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AfterCommitExecutorImpl.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AfterCommitExecutorImpl.java
new file mode 100644
index 0000000..5817136
--- /dev/null
+++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AfterCommitExecutorImpl.java
@@ -0,0 +1,90 @@
+package cn.axzo.framework.rocketmq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.transaction.support.TransactionSynchronization;
+import org.springframework.transaction.support.TransactionSynchronizationAdapter;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
+
+import java.util.ArrayList;
+import java.util.List; /**
+ * stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html
+ * 保证在交易结束后被调用.
+ */
+@Slf4j
+public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter {
+ private static final ThreadLocal> RUNNABLES = new ThreadLocal>();
+ private static final ThreadLocal> ROLLBACK_RUNNABLES = new ThreadLocal>();
+
+ public void execute(Runnable runnable) {
+ if (log.isDebugEnabled()) {
+ log.debug("Submitting new runnable {} to run after commit", runnable);
+ }
+ if (!TransactionSynchronizationManager.isSynchronizationActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
+ }
+ runnable.run();
+ return;
+ }
+ List threadRunnables = RUNNABLES.get();
+ if (threadRunnables == null) {
+ threadRunnables = new ArrayList<>();
+ RUNNABLES.set(threadRunnables);
+ TransactionSynchronizationManager.registerSynchronization(this);
+ }
+ threadRunnables.add(runnable);
+ }
+
+ public void executeAndRollback(Runnable runnable, Runnable rollbackRunnable) {
+ execute(runnable);
+ if (TransactionSynchronizationManager.isSynchronizationActive()) {
+ List runnables = ROLLBACK_RUNNABLES.get();
+ if (runnables == null) {
+ runnables = new ArrayList<>();
+ ROLLBACK_RUNNABLES.set(runnables);
+ }
+ runnables.add(rollbackRunnable);
+ }
+ }
+
+ @Override
+ public void afterCommit() {
+ List threadRunnables = RUNNABLES.get();
+ if (log.isDebugEnabled()) {
+ log.info("Transaction successfully committed, executing {} runnables", threadRunnables.size());
+ }
+ for (int i = 0; i < threadRunnables.size(); i++) {
+ Runnable runnable = threadRunnables.get(i);
+ if (log.isDebugEnabled()) {
+ log.debug("Executing runnable {}", runnable);
+ }
+ try {
+ runnable.run();
+ } catch (RuntimeException e) {
+ log.error("Failed to execute runnable " + runnable, e);
+ }
+ }
+ }
+
+ @Override
+ public void afterCompletion(int status) {
+ if (log.isDebugEnabled()) {
+ log.debug("Transaction completed with status {}", status == TransactionSynchronization.STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
+ }
+ if(status != TransactionSynchronization.STATUS_COMMITTED) {
+ for (int i = 0; i < ROLLBACK_RUNNABLES.get().size(); i++) {
+ Runnable runnable = ROLLBACK_RUNNABLES.get().get(i);
+ try{
+ runnable.run();
+ } catch (RuntimeException e) {
+ log.error("Failed to execute runnable at transaction ROLLBACK " + runnable, e);
+ }
+ }
+ }
+ RUNNABLES.remove();
+ }
+
+ public List getRunnables() {
+ return RUNNABLES.get();
+ }
+}
diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java
index 406b209..ec37fee 100644
--- a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java
+++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java
@@ -30,6 +30,7 @@ public class RocketMQEventProducer extends AbstractEventProducer {
* 一个扩展点.让外部可以在发送消息后做特殊逻辑处理. 比如发送消息统计, 消息元数据监控
*/
private BiConsumer> sendCallback;
+ public static final String MQ_MESSAGE_ID = "messageId";
public RocketMQEventProducer(RocketMQTemplate rocketMQTemplate,
String defaultModule,
@@ -73,9 +74,10 @@ public class RocketMQEventProducer extends AbstractEventProducer {
try {
// 同步发送超时时间支持在properties中设置
SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context));
- log.info("====MQ PRODUCER SYNC====, context={}, message = {}, queueId = {}",
- context, event.toPrettyJsonString(), sendResult.getMessageQueue().getQueueId());
+ log.info("====MQ PRODUCER SYNC====, context={}, message = {}, messageId = {} queueId = {}",
+ context, event.toPrettyJsonString(), sendResult.getMsgId(), sendResult.getMessageQueue().getQueueId());
if (sendCallback != null) {
+ context.getHeaders().put(MQ_MESSAGE_ID, sendResult.getMsgId());
sendCallback.accept(event, context);
}
} catch (Throwable e) {