feat(REQ-2324): 调整 MQ 类路径, 新增事务回滚的回调钩子, 同时在发送成功的回调钩子中,新增返回 MQ 的 MessageID

This commit is contained in:
wangli 2024-05-15 16:08:12 +08:00
parent a1fed917e7
commit d0511ed4ed
3 changed files with 110 additions and 56 deletions

View File

@ -38,6 +38,11 @@ public abstract class AbstractEventProducer<Meta> implements EventProducer<Meta>
this.defaultContext = defaultContext;
}
public BiConsumer<Event, Context<RocketMQEventProducer.RocketMQMessageMeta>> getRollbackHandler() {
return ((event, context) -> {
});
}
@Override
public void send(@NonNull Event event, @NonNull Context context) {
// XXX不要在send的时候修改event的值有副作用
@ -69,12 +74,20 @@ public abstract class AbstractEventProducer<Meta> implements EventProducer<Meta>
throw e;
}
};
Runnable rollbackRunnable = () -> {
try{
getRollbackHandler().accept(copiedEvent, copiedContext);
} catch (Exception e) {
// ignore
}
};
if (copiedContext.isTransactional()) {
// https://www.jianshu.com/p/59891ede5f90
runnable.run();
} else {
// 并发会导致事件时序出现问题. 所以串行执行
afterCommitExecutor.execute(() -> runnable.run());
afterCommitExecutor.executeAndRollback(() -> runnable.run(), ()-> rollbackRunnable.run());
}
}
@ -83,58 +96,7 @@ public abstract class AbstractEventProducer<Meta> implements EventProducer<Meta>
send(event, defaultContext);
}
/**
* stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html
* 保证在交易结束后被调用.
*/
private static class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter {
private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
public void execute(Runnable runnable) {
if (log.isDebugEnabled()) {
log.debug("Submitting new runnable {} to run after commit", runnable);
}
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
if (log.isDebugEnabled()) {
log.debug("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
}
runnable.run();
return;
}
List<Runnable> threadRunnables = RUNNABLES.get();
if (threadRunnables == null) {
threadRunnables = new ArrayList<>();
RUNNABLES.set(threadRunnables);
TransactionSynchronizationManager.registerSynchronization(this);
}
threadRunnables.add(runnable);
}
@Override
public void afterCommit() {
List<Runnable> threadRunnables = RUNNABLES.get();
if (log.isDebugEnabled()) {
log.info("Transaction successfully committed, executing {} runnables", threadRunnables.size());
}
for (int i = 0; i < threadRunnables.size(); i++) {
Runnable runnable = threadRunnables.get(i);
if (log.isDebugEnabled()) {
log.debug("Executing runnable {}", runnable);
}
try {
runnable.run();
} catch (RuntimeException e) {
log.error("Failed to execute runnable " + runnable, e);
}
}
}
@Override
public void afterCompletion(int status) {
if (log.isDebugEnabled()) {
log.debug("Transaction completed with status {}", status == TransactionSynchronization.STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
}
RUNNABLES.remove();
}
public AfterCommitExecutorImpl getAfterCommitExecutor() {
return afterCommitExecutor;
}
}

View File

@ -0,0 +1,90 @@
package cn.axzo.framework.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.ArrayList;
import java.util.List; /**
* stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html
* 保证在交易结束后被调用.
*/
@Slf4j
public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter {
private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
private static final ThreadLocal<List<Runnable>> ROLLBACK_RUNNABLES = new ThreadLocal<List<Runnable>>();
public void execute(Runnable runnable) {
if (log.isDebugEnabled()) {
log.debug("Submitting new runnable {} to run after commit", runnable);
}
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
if (log.isDebugEnabled()) {
log.debug("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
}
runnable.run();
return;
}
List<Runnable> threadRunnables = RUNNABLES.get();
if (threadRunnables == null) {
threadRunnables = new ArrayList<>();
RUNNABLES.set(threadRunnables);
TransactionSynchronizationManager.registerSynchronization(this);
}
threadRunnables.add(runnable);
}
public void executeAndRollback(Runnable runnable, Runnable rollbackRunnable) {
execute(runnable);
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<Runnable> runnables = ROLLBACK_RUNNABLES.get();
if (runnables == null) {
runnables = new ArrayList<>();
ROLLBACK_RUNNABLES.set(runnables);
}
runnables.add(rollbackRunnable);
}
}
@Override
public void afterCommit() {
List<Runnable> threadRunnables = RUNNABLES.get();
if (log.isDebugEnabled()) {
log.info("Transaction successfully committed, executing {} runnables", threadRunnables.size());
}
for (int i = 0; i < threadRunnables.size(); i++) {
Runnable runnable = threadRunnables.get(i);
if (log.isDebugEnabled()) {
log.debug("Executing runnable {}", runnable);
}
try {
runnable.run();
} catch (RuntimeException e) {
log.error("Failed to execute runnable " + runnable, e);
}
}
}
@Override
public void afterCompletion(int status) {
if (log.isDebugEnabled()) {
log.debug("Transaction completed with status {}", status == TransactionSynchronization.STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
}
if(status != TransactionSynchronization.STATUS_COMMITTED) {
for (int i = 0; i < ROLLBACK_RUNNABLES.get().size(); i++) {
Runnable runnable = ROLLBACK_RUNNABLES.get().get(i);
try{
runnable.run();
} catch (RuntimeException e) {
log.error("Failed to execute runnable at transaction ROLLBACK " + runnable, e);
}
}
}
RUNNABLES.remove();
}
public List<Runnable> getRunnables() {
return RUNNABLES.get();
}
}

View File

@ -30,6 +30,7 @@ public class RocketMQEventProducer extends AbstractEventProducer {
* 一个扩展点.让外部可以在发送消息后做特殊逻辑处理. 比如发送消息统计, 消息元数据监控
*/
private BiConsumer<Event, Context<RocketMQMessageMeta>> sendCallback;
public static final String MQ_MESSAGE_ID = "messageId";
public RocketMQEventProducer(RocketMQTemplate rocketMQTemplate,
String defaultModule,
@ -73,9 +74,10 @@ public class RocketMQEventProducer extends AbstractEventProducer {
try {
// 同步发送超时时间支持在properties中设置
SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context));
log.info("====MQ PRODUCER SYNC====, context={}, message = {}, queueId = {}",
context, event.toPrettyJsonString(), sendResult.getMessageQueue().getQueueId());
log.info("====MQ PRODUCER SYNC====, context={}, message = {}, messageId = {} queueId = {}",
context, event.toPrettyJsonString(), sendResult.getMsgId(), sendResult.getMessageQueue().getQueueId());
if (sendCallback != null) {
context.getHeaders().put(MQ_MESSAGE_ID, sendResult.getMsgId());
sendCallback.accept(event, context);
}
} catch (Throwable e) {