Merge remote-tracking branch 'origin/master' into feature/REQ-2300

This commit is contained in:
李昆鹏 2024-07-26 15:51:55 +08:00
commit 3934d0337f
12 changed files with 510 additions and 55 deletions

View File

@ -60,6 +60,13 @@ public class RespErrorCodeMappingProperties {
DEFAULT_NON_BASE_MAPPING.put("*", UNAVAILABLE_FOR_LEGAL_REASONS);
}
@Getter
private HttpStatus defaultNonBaseHttpStatus;
public void setDefaultNonBaseHttpStatus(HttpStatus defaultNonBaseHttpStatus) {
DEFAULT_NON_BASE_MAPPING.put("*", defaultNonBaseHttpStatus);
}
// 基础错误响应码 -> HTTP状态码
@NotNull
@Setter

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>axzo-common-loggings</artifactId>
<groupId>cn.axzo.framework.logging</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>axzo-common-trace</artifactId>
<name>Axzo Common trace</name>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,63 @@
package com.axzo.framework.trace.interceptor;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author xudawei
* @date 2024/04/23
* @desc 调用前从 MDC中获取上一步骤设置的traceId放置到 header中供下一个服务从header中获取trace_id(注意接口服务方接收到的header里trace_id是小写的)
*/
@Configuration
@Slf4j
public class FeignFillHeaderInterceptor implements RequestInterceptor {
private static final String CTX_LOG_ID = "ctxLogId";
private static final String X_REQUEST_ID = "x-request-id";
private static final String TRACE_ID = "traceId";
private static final String SPAN_ID = "spanId";
private static final String LOGIC_ID = "logicId";
@Override
public void apply(RequestTemplate requestTemplate) {
log.info("FeignFillHeaderInterceptor,traceId:{}", MDC.get(TRACE_ID));
requestTemplate.header(TRACE_ID, MDC.get(TRACE_ID));
requestTemplate.header(CTX_LOG_ID, MDC.get(TRACE_ID));
requestTemplate.header(X_REQUEST_ID, MDC.get(TRACE_ID));
requestTemplate.header(SPAN_ID, this.buildNextSpanId());
}
/**
* 下游的spanId:有本次spanId+logic组成
* logic默认从0开始每次调用下游时logic则自增1
*/
private String buildNextSpanId() {
//获取spanId
String spanId = this.fetchSpanId();
if (!StringUtils.hasText(MDC.get(LOGIC_ID))) {
MDC.put(LOGIC_ID, "0");
}
AtomicLong atomicLong = new AtomicLong(Long.parseLong(MDC.get(LOGIC_ID)));
Long nextSpanId = atomicLong.getAndIncrement();
MDC.put(LOGIC_ID, atomicLong.toString());
return spanId + "." + nextSpanId;
}
/**
* 获取spanId
*/
private String fetchSpanId() {
String spanId = MDC.get(SPAN_ID);
if (!StringUtils.hasText(spanId)) {
MDC.put(SPAN_ID, "0");
}
return spanId;
}
}

View File

@ -0,0 +1,97 @@
package com.axzo.framework.trace.interceptor;
import cn.hutool.core.util.StrUtil;
import com.axzo.framework.trace.util.ExceptionUtil;
import com.axzo.framework.trace.wrapper.BodyReaderHttpServletRequestWrapper;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.filter.OncePerRequestFilter;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.UUID;
/**
* 设置日志中traceId-MDC中设置traceId
*/
@Slf4j
@ConditionalOnProperty(prefix = "axzo.log.common.trace", name = "enable", havingValue = "true", matchIfMissing = true)
@Component
@Order(-99999)
public class TraceSpanIdFilter extends OncePerRequestFilter {
private static final String TRACE_ID = "traceId";
private static final String CTX_LOG_ID = "ctxLogId";
private static final String X_REQUEST_ID = "x-request-id";
private static final String SPAN_ID = "spanId";
@Override
protected void doFilterInternal(@NonNull HttpServletRequest request,
@NonNull HttpServletResponse response,
@NonNull FilterChain filterChain) throws ServletException, IOException {
if ((StringUtils.hasText(request.getContentType()) && request.getContentType().toLowerCase().startsWith("multipart/form-data"))) {
this.doFilter(filterChain, request, response);
} else {
// wrapper
BodyReaderHttpServletRequestWrapper bodyRequest = new BodyReaderHttpServletRequestWrapper(
request);
this.doFilter(filterChain, bodyRequest, response);
}
}
private void doFilter(FilterChain filterChain, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// trace id 补充
ExceptionUtil.ignoreException(() -> setTraceId(request, response), null);
//do
try {
filterChain.doFilter(request, response);
} finally {
MDC.clear();
}
}
private void setTraceId(@NonNull HttpServletRequest request,
@NonNull HttpServletResponse response) {
String traceId = this.fetchTraceId(request);
//set
MDC.put(TRACE_ID, traceId);
MDC.put(CTX_LOG_ID, traceId);
MDC.put(SPAN_ID, this.fetchSpanId(request));
response.setHeader(TRACE_ID, traceId);
}
/**
* 获取traceId
*/
private String fetchTraceId(HttpServletRequest request) {
//header: ctxLogId -> traceId -> x-request-id
String traceId = StrUtil.blankToDefault(request.getHeader(CTX_LOG_ID),
StrUtil.blankToDefault(request.getHeader(TRACE_ID),
request.getHeader(X_REQUEST_ID)));
// blank to new
return StrUtil.blankToDefault(traceId, UUID.randomUUID().toString().replaceAll("-", ""));
}
/**
* 获取spanId
*/
private String fetchSpanId(HttpServletRequest request) {
// blank to new
String spanId = StrUtil.blankToDefault(request.getHeader(SPAN_ID), "0");
return spanId;
}
}

View File

@ -0,0 +1,28 @@
package com.axzo.framework.trace.util;
import java.util.function.Consumer;
/**
* @author tanjie@axzo.cn
* @date 2021/12/13 14:10
*/
public class ExceptionUtil {
/**
* 忽略错误
* @param supplier
* @param catchConsumer
*/
public static void ignoreException(Runnable supplier, Consumer<Exception> catchConsumer) {
try {
supplier.run();
} catch (Exception e) {
if (null != catchConsumer) {
catchConsumer.accept(e);
}
}
}
}

View File

@ -0,0 +1,58 @@
package com.axzo.framework.trace.util;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* @author TanJ
* @date 2021/5/24
*/
public class RequestUtil {
/**
* getRequest
*
* @return
*/
public static HttpServletRequest getRequest() {
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
if (requestAttributes!=null) {
return
((ServletRequestAttributes) RequestContextHolder.getRequestAttributes())
.getRequest();
}
return null;
}
/**
* getResponse
*
* @return
*/
public static HttpServletResponse getResponse() {
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
if (null != requestAttributes) {
return
((ServletRequestAttributes) RequestContextHolder.getRequestAttributes())
.getResponse();
}
return null;
}
/**
* setInheritableHolder
*/
public static void setInheritableHolder() {
RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(), true);
}
}

View File

@ -0,0 +1,95 @@
package com.axzo.framework.trace.wrapper;
import lombok.SneakyThrows;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletRequest;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
import javax.servlet.http.Part;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.Collection;
/**
* 自定义request
*
* @author TanJ
*/
public class BodyReaderHttpServletRequestWrapper extends HttpServletRequestWrapper {
private final byte[] body;
private final HttpServletRequest request;
@SneakyThrows
public BodyReaderHttpServletRequestWrapper(HttpServletRequest request) throws IOException {
super(request);
this.request = request;
// request.getParameterMap();
String sessionStream = getBodyString(request);
body = sessionStream.getBytes(Charset.forName("UTF-8"));
}
@Override
public Collection<Part> getParts() throws IOException, ServletException {
return request.getParts();
}
private String getBodyString(ServletRequest request) throws IOException {
StringBuilder sb = new StringBuilder();
InputStream ins = request.getInputStream();
try (BufferedReader isr = new BufferedReader(
new InputStreamReader(ins, Charset.forName("UTF-8")));) {
String line = "";
while ((line = isr.readLine()) != null) {
sb.append(line);
}
} catch (IOException e) {
throw e;
}
return sb.toString();
}
@Override
public BufferedReader getReader() throws IOException {
return new BufferedReader(new InputStreamReader(getInputStream()));
}
@Override
public ServletInputStream getInputStream() throws IOException {
final ByteArrayInputStream bais = new ByteArrayInputStream(body);
return new ServletInputStream() {
@Override
public int read() throws IOException {
return bais.read();
}
@Override
public boolean isFinished() {
return bais.available() == 0;
}
@Override
public boolean isReady() {
return true;
}
@Override
public void setReadListener(ReadListener readListener) {
}
};
}
}

View File

@ -18,5 +18,6 @@
<modules>
<module>log4j2-starter</module>
<module>logback-starter</module>
<module>axzo-common-trace</module>
</modules>
</project>

View File

@ -15,6 +15,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
@ -38,6 +39,11 @@ public abstract class AbstractEventProducer<Meta> implements EventProducer<Meta>
this.defaultContext = defaultContext;
}
public BiConsumer<Event, Context<RocketMQEventProducer.RocketMQMessageMeta>> getRollbackHandler() {
return ((event, context) -> {
});
}
@Override
public void send(@NonNull Event event, @NonNull Context context) {
// XXX不要在send的时候修改event的值有副作用
@ -69,12 +75,20 @@ public abstract class AbstractEventProducer<Meta> implements EventProducer<Meta>
throw e;
}
};
Runnable rollbackRunnable = () -> {
try{
getRollbackHandler().accept(copiedEvent, copiedContext);
} catch (Exception e) {
// ignore
}
};
if (copiedContext.isTransactional()) {
// https://www.jianshu.com/p/59891ede5f90
runnable.run();
} else {
// 并发会导致事件时序出现问题. 所以串行执行
afterCommitExecutor.execute(() -> runnable.run());
afterCommitExecutor.executeAndRollback(() -> runnable.run(), ()-> rollbackRunnable.run());
}
}
@ -83,58 +97,14 @@ public abstract class AbstractEventProducer<Meta> implements EventProducer<Meta>
send(event, defaultContext);
}
/**
* stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html
* 保证在交易结束后被调用.
*/
private static class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter {
private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
public void execute(Runnable runnable) {
if (log.isDebugEnabled()) {
log.debug("Submitting new runnable {} to run after commit", runnable);
}
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
if (log.isDebugEnabled()) {
log.debug("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
}
runnable.run();
return;
}
List<Runnable> threadRunnables = RUNNABLES.get();
if (threadRunnables == null) {
threadRunnables = new ArrayList<>();
RUNNABLES.set(threadRunnables);
TransactionSynchronizationManager.registerSynchronization(this);
}
threadRunnables.add(runnable);
}
@Override
public void afterCommit() {
List<Runnable> threadRunnables = RUNNABLES.get();
if (log.isDebugEnabled()) {
log.info("Transaction successfully committed, executing {} runnables", threadRunnables.size());
}
for (int i = 0; i < threadRunnables.size(); i++) {
Runnable runnable = threadRunnables.get(i);
if (log.isDebugEnabled()) {
log.debug("Executing runnable {}", runnable);
}
try {
runnable.run();
} catch (RuntimeException e) {
log.error("Failed to execute runnable " + runnable, e);
}
}
public void send(@NonNull Event event, Map<String, String> addHeader) {
Map<String, String> headers = defaultContext.getHeaders();
headers.putAll(addHeader);
send(event, defaultContext);
}
@Override
public void afterCompletion(int status) {
if (log.isDebugEnabled()) {
log.debug("Transaction completed with status {}", status == TransactionSynchronization.STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
}
RUNNABLES.remove();
}
public AfterCommitExecutorImpl getAfterCommitExecutor() {
return afterCommitExecutor;
}
}

View File

@ -0,0 +1,90 @@
package cn.axzo.framework.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.ArrayList;
import java.util.List; /**
* stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html
* 保证在交易结束后被调用.
*/
@Slf4j
public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter {
private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
private static final ThreadLocal<List<Runnable>> ROLLBACK_RUNNABLES = new ThreadLocal<List<Runnable>>();
public void execute(Runnable runnable) {
if (log.isDebugEnabled()) {
log.debug("Submitting new runnable {} to run after commit", runnable);
}
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
if (log.isDebugEnabled()) {
log.debug("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
}
runnable.run();
return;
}
List<Runnable> threadRunnables = RUNNABLES.get();
if (threadRunnables == null) {
threadRunnables = new ArrayList<>();
RUNNABLES.set(threadRunnables);
TransactionSynchronizationManager.registerSynchronization(this);
}
threadRunnables.add(runnable);
}
public void executeAndRollback(Runnable runnable, Runnable rollbackRunnable) {
execute(runnable);
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<Runnable> runnables = ROLLBACK_RUNNABLES.get();
if (runnables == null) {
runnables = new ArrayList<>();
ROLLBACK_RUNNABLES.set(runnables);
}
runnables.add(rollbackRunnable);
}
}
@Override
public void afterCommit() {
List<Runnable> threadRunnables = RUNNABLES.get();
if (log.isDebugEnabled()) {
log.info("Transaction successfully committed, executing {} runnables", threadRunnables.size());
}
for (int i = 0; i < threadRunnables.size(); i++) {
Runnable runnable = threadRunnables.get(i);
if (log.isDebugEnabled()) {
log.debug("Executing runnable {}", runnable);
}
try {
runnable.run();
} catch (RuntimeException e) {
log.error("Failed to execute runnable " + runnable, e);
}
}
}
@Override
public void afterCompletion(int status) {
if (log.isDebugEnabled()) {
log.debug("Transaction completed with status {}", status == TransactionSynchronization.STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
}
if(status != TransactionSynchronization.STATUS_COMMITTED) {
for (int i = 0; i < ROLLBACK_RUNNABLES.get().size(); i++) {
Runnable runnable = ROLLBACK_RUNNABLES.get().get(i);
try{
runnable.run();
} catch (RuntimeException e) {
log.error("Failed to execute runnable at transaction ROLLBACK " + runnable, e);
}
}
}
RUNNABLES.remove();
}
public List<Runnable> getRunnables() {
return RUNNABLES.get();
}
}

View File

@ -29,6 +29,14 @@ public interface EventProducer<Meta> {
*/
void send(Event event);
/**
* 发送 event, 使用默认的 context,并在 context 中追加 header 属性
*
* @param event
* @param addHeaders
*/
void send(Event event, Map<String, String> addHeaders);
@Getter
@Builder(toBuilder = true)
@NoArgsConstructor

View File

@ -30,6 +30,7 @@ public class RocketMQEventProducer extends AbstractEventProducer {
* 一个扩展点.让外部可以在发送消息后做特殊逻辑处理. 比如发送消息统计, 消息元数据监控
*/
private BiConsumer<Event, Context<RocketMQMessageMeta>> sendCallback;
public static final String MQ_MESSAGE_ID = "messageId";
public RocketMQEventProducer(RocketMQTemplate rocketMQTemplate,
String defaultModule,
@ -73,9 +74,10 @@ public class RocketMQEventProducer extends AbstractEventProducer {
try {
// 同步发送超时时间支持在properties中设置
SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context));
log.info("====MQ PRODUCER SYNC====, context={}, message = {}, queueId = {}",
context, event.toPrettyJsonString(), sendResult.getMessageQueue().getQueueId());
log.info("====MQ PRODUCER SYNC====, context={}, message = {}, messageId = {} queueId = {}",
context, event.toPrettyJsonString(), sendResult.getMsgId(), sendResult.getMessageQueue().getQueueId());
if (sendCallback != null) {
context.getHeaders().put(MQ_MESSAGE_ID, sendResult.getMsgId());
sendCallback.accept(event, context);
}
} catch (Throwable e) {