Merge branch 'feature/merged_all_req' into 'master'

Feature/merged all req

See merge request universal/framework/backend/axzo-framework-commons!94
This commit is contained in:
金海洋 2024-06-05 10:37:46 +00:00
commit 3ba3b5fb1c
5 changed files with 132 additions and 55 deletions

View File

@ -60,6 +60,13 @@ public class RespErrorCodeMappingProperties {
DEFAULT_NON_BASE_MAPPING.put("*", UNAVAILABLE_FOR_LEGAL_REASONS);
}
@Getter
private HttpStatus defaultNonBaseHttpStatus;
public void setDefaultNonBaseHttpStatus(HttpStatus defaultNonBaseHttpStatus) {
DEFAULT_NON_BASE_MAPPING.put("*", defaultNonBaseHttpStatus);
}
// 基础错误响应码 -> HTTP状态码
@NotNull
@Setter

View File

@ -15,6 +15,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
@ -38,6 +39,11 @@ public abstract class AbstractEventProducer<Meta> implements EventProducer<Meta>
this.defaultContext = defaultContext;
}
public BiConsumer<Event, Context<RocketMQEventProducer.RocketMQMessageMeta>> getRollbackHandler() {
return ((event, context) -> {
});
}
@Override
public void send(@NonNull Event event, @NonNull Context context) {
// XXX不要在send的时候修改event的值有副作用
@ -69,12 +75,20 @@ public abstract class AbstractEventProducer<Meta> implements EventProducer<Meta>
throw e;
}
};
Runnable rollbackRunnable = () -> {
try{
getRollbackHandler().accept(copiedEvent, copiedContext);
} catch (Exception e) {
// ignore
}
};
if (copiedContext.isTransactional()) {
// https://www.jianshu.com/p/59891ede5f90
runnable.run();
} else {
// 并发会导致事件时序出现问题. 所以串行执行
afterCommitExecutor.execute(() -> runnable.run());
afterCommitExecutor.executeAndRollback(() -> runnable.run(), ()-> rollbackRunnable.run());
}
}
@ -83,58 +97,14 @@ public abstract class AbstractEventProducer<Meta> implements EventProducer<Meta>
send(event, defaultContext);
}
/**
* stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html
* 保证在交易结束后被调用.
*/
private static class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter {
private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
@Override
public void send(@NonNull Event event, Map<String, String> addHeader) {
Map<String, String> headers = defaultContext.getHeaders();
headers.putAll(addHeader);
send(event, defaultContext);
}
public void execute(Runnable runnable) {
if (log.isDebugEnabled()) {
log.debug("Submitting new runnable {} to run after commit", runnable);
}
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
if (log.isDebugEnabled()) {
log.debug("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
}
runnable.run();
return;
}
List<Runnable> threadRunnables = RUNNABLES.get();
if (threadRunnables == null) {
threadRunnables = new ArrayList<>();
RUNNABLES.set(threadRunnables);
TransactionSynchronizationManager.registerSynchronization(this);
}
threadRunnables.add(runnable);
}
@Override
public void afterCommit() {
List<Runnable> threadRunnables = RUNNABLES.get();
if (log.isDebugEnabled()) {
log.info("Transaction successfully committed, executing {} runnables", threadRunnables.size());
}
for (int i = 0; i < threadRunnables.size(); i++) {
Runnable runnable = threadRunnables.get(i);
if (log.isDebugEnabled()) {
log.debug("Executing runnable {}", runnable);
}
try {
runnable.run();
} catch (RuntimeException e) {
log.error("Failed to execute runnable " + runnable, e);
}
}
}
@Override
public void afterCompletion(int status) {
if (log.isDebugEnabled()) {
log.debug("Transaction completed with status {}", status == TransactionSynchronization.STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
}
RUNNABLES.remove();
}
public AfterCommitExecutorImpl getAfterCommitExecutor() {
return afterCommitExecutor;
}
}

View File

@ -0,0 +1,90 @@
package cn.axzo.framework.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.ArrayList;
import java.util.List; /**
* stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html
* 保证在交易结束后被调用.
*/
@Slf4j
public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter {
private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
private static final ThreadLocal<List<Runnable>> ROLLBACK_RUNNABLES = new ThreadLocal<List<Runnable>>();
public void execute(Runnable runnable) {
if (log.isDebugEnabled()) {
log.debug("Submitting new runnable {} to run after commit", runnable);
}
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
if (log.isDebugEnabled()) {
log.debug("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
}
runnable.run();
return;
}
List<Runnable> threadRunnables = RUNNABLES.get();
if (threadRunnables == null) {
threadRunnables = new ArrayList<>();
RUNNABLES.set(threadRunnables);
TransactionSynchronizationManager.registerSynchronization(this);
}
threadRunnables.add(runnable);
}
public void executeAndRollback(Runnable runnable, Runnable rollbackRunnable) {
execute(runnable);
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<Runnable> runnables = ROLLBACK_RUNNABLES.get();
if (runnables == null) {
runnables = new ArrayList<>();
ROLLBACK_RUNNABLES.set(runnables);
}
runnables.add(rollbackRunnable);
}
}
@Override
public void afterCommit() {
List<Runnable> threadRunnables = RUNNABLES.get();
if (log.isDebugEnabled()) {
log.info("Transaction successfully committed, executing {} runnables", threadRunnables.size());
}
for (int i = 0; i < threadRunnables.size(); i++) {
Runnable runnable = threadRunnables.get(i);
if (log.isDebugEnabled()) {
log.debug("Executing runnable {}", runnable);
}
try {
runnable.run();
} catch (RuntimeException e) {
log.error("Failed to execute runnable " + runnable, e);
}
}
}
@Override
public void afterCompletion(int status) {
if (log.isDebugEnabled()) {
log.debug("Transaction completed with status {}", status == TransactionSynchronization.STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
}
if(status != TransactionSynchronization.STATUS_COMMITTED) {
for (int i = 0; i < ROLLBACK_RUNNABLES.get().size(); i++) {
Runnable runnable = ROLLBACK_RUNNABLES.get().get(i);
try{
runnable.run();
} catch (RuntimeException e) {
log.error("Failed to execute runnable at transaction ROLLBACK " + runnable, e);
}
}
}
RUNNABLES.remove();
}
public List<Runnable> getRunnables() {
return RUNNABLES.get();
}
}

View File

@ -29,6 +29,14 @@ public interface EventProducer<Meta> {
*/
void send(Event event);
/**
* 发送 event, 使用默认的 context,并在 context 中追加 header 属性
*
* @param event
* @param addHeaders
*/
void send(Event event, Map<String, String> addHeaders);
@Getter
@Builder(toBuilder = true)
@NoArgsConstructor

View File

@ -30,6 +30,7 @@ public class RocketMQEventProducer extends AbstractEventProducer {
* 一个扩展点.让外部可以在发送消息后做特殊逻辑处理. 比如发送消息统计, 消息元数据监控
*/
private BiConsumer<Event, Context<RocketMQMessageMeta>> sendCallback;
public static final String MQ_MESSAGE_ID = "messageId";
public RocketMQEventProducer(RocketMQTemplate rocketMQTemplate,
String defaultModule,
@ -73,9 +74,10 @@ public class RocketMQEventProducer extends AbstractEventProducer {
try {
// 同步发送超时时间支持在properties中设置
SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context));
log.info("====MQ PRODUCER SYNC====, context={}, message = {}, queueId = {}",
context, event.toPrettyJsonString(), sendResult.getMessageQueue().getQueueId());
log.info("====MQ PRODUCER SYNC====, context={}, message = {}, messageId = {} queueId = {}",
context, event.toPrettyJsonString(), sendResult.getMsgId(), sendResult.getMessageQueue().getQueueId());
if (sendCallback != null) {
context.getHeaders().put(MQ_MESSAGE_ID, sendResult.getMsgId());
sendCallback.accept(event, context);
}
} catch (Throwable e) {