diff --git a/axzo-common-autoconfigure/src/main/java/cn/axzo/framework/autoconfigure/web/exception/RespErrorCodeMappingProperties.java b/axzo-common-autoconfigure/src/main/java/cn/axzo/framework/autoconfigure/web/exception/RespErrorCodeMappingProperties.java index 9d53f28..21148c9 100644 --- a/axzo-common-autoconfigure/src/main/java/cn/axzo/framework/autoconfigure/web/exception/RespErrorCodeMappingProperties.java +++ b/axzo-common-autoconfigure/src/main/java/cn/axzo/framework/autoconfigure/web/exception/RespErrorCodeMappingProperties.java @@ -60,6 +60,13 @@ public class RespErrorCodeMappingProperties { DEFAULT_NON_BASE_MAPPING.put("*", UNAVAILABLE_FOR_LEGAL_REASONS); } + @Getter + private HttpStatus defaultNonBaseHttpStatus; + + public void setDefaultNonBaseHttpStatus(HttpStatus defaultNonBaseHttpStatus) { + DEFAULT_NON_BASE_MAPPING.put("*", defaultNonBaseHttpStatus); + } + // 基础错误响应码 -> HTTP状态码 @NotNull @Setter diff --git a/axzo-common-loggings/axzo-common-trace/pom.xml b/axzo-common-loggings/axzo-common-trace/pom.xml new file mode 100644 index 0000000..c7efbb1 --- /dev/null +++ b/axzo-common-loggings/axzo-common-trace/pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + + axzo-common-loggings + cn.axzo.framework.logging + 1.0.0-SNAPSHOT + + axzo-common-trace + Axzo Common trace + + + + org.springframework.boot + spring-boot-starter-web + + + + cn.hutool + hutool-all + + + + io.github.openfeign + feign-httpclient + + + + com.alibaba + fastjson + + + + + diff --git a/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/interceptor/FeignFillHeaderInterceptor.java b/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/interceptor/FeignFillHeaderInterceptor.java new file mode 100644 index 0000000..1cc8550 --- /dev/null +++ b/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/interceptor/FeignFillHeaderInterceptor.java @@ -0,0 +1,63 @@ +package com.axzo.framework.trace.interceptor; + +import feign.RequestInterceptor; +import feign.RequestTemplate; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.StringUtils; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * @author xudawei + * @date 2024/04/23 + * @desc 调用前从 MDC中获取上一步骤设置的traceId,放置到 header中,供下一个服务从header中获取trace_id(注意,接口服务方接收到的header里trace_id是小写的) + */ +@Configuration +@Slf4j +public class FeignFillHeaderInterceptor implements RequestInterceptor { + private static final String CTX_LOG_ID = "ctxLogId"; + private static final String X_REQUEST_ID = "x-request-id"; + private static final String TRACE_ID = "traceId"; + private static final String SPAN_ID = "spanId"; + private static final String LOGIC_ID = "logicId"; + + @Override + public void apply(RequestTemplate requestTemplate) { + log.info("FeignFillHeaderInterceptor,traceId:{}", MDC.get(TRACE_ID)); + requestTemplate.header(TRACE_ID, MDC.get(TRACE_ID)); + requestTemplate.header(CTX_LOG_ID, MDC.get(TRACE_ID)); + requestTemplate.header(X_REQUEST_ID, MDC.get(TRACE_ID)); + requestTemplate.header(SPAN_ID, this.buildNextSpanId()); + } + + /** + * 下游的spanId:有本次spanId+logic组成 + * logic默认从0开始,每次调用下游时logic则自增1 + */ + private String buildNextSpanId() { + //获取spanId + String spanId = this.fetchSpanId(); + if (!StringUtils.hasText(MDC.get(LOGIC_ID))) { + MDC.put(LOGIC_ID, "0"); + } + + AtomicLong atomicLong = new AtomicLong(Long.parseLong(MDC.get(LOGIC_ID))); + Long nextSpanId = atomicLong.getAndIncrement(); + MDC.put(LOGIC_ID, atomicLong.toString()); + return spanId + "." + nextSpanId; + } + + /** + * 获取spanId + */ + private String fetchSpanId() { + String spanId = MDC.get(SPAN_ID); + if (!StringUtils.hasText(spanId)) { + MDC.put(SPAN_ID, "0"); + } + return spanId; + } +} + diff --git a/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/interceptor/TraceSpanIdFilter.java b/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/interceptor/TraceSpanIdFilter.java new file mode 100644 index 0000000..a026874 --- /dev/null +++ b/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/interceptor/TraceSpanIdFilter.java @@ -0,0 +1,97 @@ +package com.axzo.framework.trace.interceptor; + +import cn.hutool.core.util.StrUtil; +import com.axzo.framework.trace.util.ExceptionUtil; +import com.axzo.framework.trace.wrapper.BodyReaderHttpServletRequestWrapper; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; +import org.springframework.web.filter.OncePerRequestFilter; + +import javax.servlet.FilterChain; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.UUID; + +/** + * 设置日志中traceId-MDC中设置traceId + */ +@Slf4j +@ConditionalOnProperty(prefix = "axzo.log.common.trace", name = "enable", havingValue = "true", matchIfMissing = true) +@Component +@Order(-99999) +public class TraceSpanIdFilter extends OncePerRequestFilter { + + private static final String TRACE_ID = "traceId"; + private static final String CTX_LOG_ID = "ctxLogId"; + private static final String X_REQUEST_ID = "x-request-id"; + private static final String SPAN_ID = "spanId"; + + + + @Override + protected void doFilterInternal(@NonNull HttpServletRequest request, + @NonNull HttpServletResponse response, + @NonNull FilterChain filterChain) throws ServletException, IOException { + if ((StringUtils.hasText(request.getContentType()) && request.getContentType().toLowerCase().startsWith("multipart/form-data"))) { + this.doFilter(filterChain, request, response); + } else { + // wrapper + BodyReaderHttpServletRequestWrapper bodyRequest = new BodyReaderHttpServletRequestWrapper( + request); + this.doFilter(filterChain, bodyRequest, response); + } + } + + private void doFilter(FilterChain filterChain, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + // trace id 补充 + ExceptionUtil.ignoreException(() -> setTraceId(request, response), null); + //do + try { + filterChain.doFilter(request, response); + } finally { + MDC.clear(); + } + } + + + private void setTraceId(@NonNull HttpServletRequest request, + @NonNull HttpServletResponse response) { + + String traceId = this.fetchTraceId(request); + //set + MDC.put(TRACE_ID, traceId); + MDC.put(CTX_LOG_ID, traceId); + + MDC.put(SPAN_ID, this.fetchSpanId(request)); + response.setHeader(TRACE_ID, traceId); + } + + /** + * 获取traceId + */ + private String fetchTraceId(HttpServletRequest request) { + //header: ctxLogId -> traceId -> x-request-id + String traceId = StrUtil.blankToDefault(request.getHeader(CTX_LOG_ID), + StrUtil.blankToDefault(request.getHeader(TRACE_ID), + request.getHeader(X_REQUEST_ID))); + // blank to new + return StrUtil.blankToDefault(traceId, UUID.randomUUID().toString().replaceAll("-", "")); + } + + /** + * 获取spanId + */ + private String fetchSpanId(HttpServletRequest request) { + // blank to new + String spanId = StrUtil.blankToDefault(request.getHeader(SPAN_ID), "0"); + return spanId; + } +} + diff --git a/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/util/ExceptionUtil.java b/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/util/ExceptionUtil.java new file mode 100644 index 0000000..517fa37 --- /dev/null +++ b/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/util/ExceptionUtil.java @@ -0,0 +1,28 @@ +package com.axzo.framework.trace.util; + + +import java.util.function.Consumer; + +/** + * @author tanjie@axzo.cn + * @date 2021/12/13 14:10 + */ +public class ExceptionUtil { + + /** + * 忽略错误 + * @param supplier + * @param catchConsumer + */ + public static void ignoreException(Runnable supplier, Consumer catchConsumer) { + try { + supplier.run(); + } catch (Exception e) { + if (null != catchConsumer) { + catchConsumer.accept(e); + } + + } + + } +} diff --git a/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/util/RequestUtil.java b/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/util/RequestUtil.java new file mode 100644 index 0000000..4164c29 --- /dev/null +++ b/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/util/RequestUtil.java @@ -0,0 +1,58 @@ +package com.axzo.framework.trace.util; + +import org.springframework.web.context.request.RequestAttributes; +import org.springframework.web.context.request.RequestContextHolder; +import org.springframework.web.context.request.ServletRequestAttributes; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * @author TanJ + * @date 2021/5/24 + */ +public class RequestUtil { + + + /** + * getRequest + * + * @return + */ + public static HttpServletRequest getRequest() { + RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); + if (requestAttributes!=null) { + return + ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()) + .getRequest(); + } + + return null; + } + + + /** + * getResponse + * + * @return + */ + public static HttpServletResponse getResponse() { + RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); + if (null != requestAttributes) { + return + ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()) + .getResponse(); + } + return null; + } + + + /** + * setInheritableHolder + */ + public static void setInheritableHolder() { + RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(), true); + } + + +} diff --git a/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/wrapper/BodyReaderHttpServletRequestWrapper.java b/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/wrapper/BodyReaderHttpServletRequestWrapper.java new file mode 100644 index 0000000..5f252ab --- /dev/null +++ b/axzo-common-loggings/axzo-common-trace/src/main/java/com/axzo/framework/trace/wrapper/BodyReaderHttpServletRequestWrapper.java @@ -0,0 +1,95 @@ +package com.axzo.framework.trace.wrapper; + +import lombok.SneakyThrows; + +import javax.servlet.ReadListener; +import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; +import javax.servlet.ServletRequest; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletRequestWrapper; +import javax.servlet.http.Part; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.util.Collection; + +/** + * 自定义request + * + * @author TanJ + */ +public class BodyReaderHttpServletRequestWrapper extends HttpServletRequestWrapper { + + private final byte[] body; + + private final HttpServletRequest request; + + @SneakyThrows + public BodyReaderHttpServletRequestWrapper(HttpServletRequest request) throws IOException { + super(request); + this.request = request; +// request.getParameterMap(); + String sessionStream = getBodyString(request); + body = sessionStream.getBytes(Charset.forName("UTF-8")); + } + + + + @Override + public Collection getParts() throws IOException, ServletException { + return request.getParts(); + } + + private String getBodyString(ServletRequest request) throws IOException { + StringBuilder sb = new StringBuilder(); + InputStream ins = request.getInputStream(); + + try (BufferedReader isr = new BufferedReader( + new InputStreamReader(ins, Charset.forName("UTF-8")));) { + String line = ""; + while ((line = isr.readLine()) != null) { + sb.append(line); + } + } catch (IOException e) { + throw e; + } + return sb.toString(); + } + + @Override + public BufferedReader getReader() throws IOException { + return new BufferedReader(new InputStreamReader(getInputStream())); + } + + @Override + public ServletInputStream getInputStream() throws IOException { + + final ByteArrayInputStream bais = new ByteArrayInputStream(body); + + return new ServletInputStream() { + + @Override + public int read() throws IOException { + return bais.read(); + } + + @Override + public boolean isFinished() { + return bais.available() == 0; + } + + @Override + public boolean isReady() { + return true; + } + + @Override + public void setReadListener(ReadListener readListener) { + } + }; + } +} \ No newline at end of file diff --git a/axzo-common-loggings/pom.xml b/axzo-common-loggings/pom.xml index f54dbc4..81b55f1 100644 --- a/axzo-common-loggings/pom.xml +++ b/axzo-common-loggings/pom.xml @@ -18,5 +18,6 @@ log4j2-starter logback-starter + axzo-common-trace \ No newline at end of file diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AbstractEventProducer.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AbstractEventProducer.java index e4deab7..2831890 100644 --- a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AbstractEventProducer.java +++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AbstractEventProducer.java @@ -15,6 +15,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.function.BiConsumer; @@ -38,6 +39,11 @@ public abstract class AbstractEventProducer implements EventProducer this.defaultContext = defaultContext; } + public BiConsumer> getRollbackHandler() { + return ((event, context) -> { + }); + } + @Override public void send(@NonNull Event event, @NonNull Context context) { // XXX:不要在send的时候修改event的值,有副作用。 @@ -69,12 +75,20 @@ public abstract class AbstractEventProducer implements EventProducer throw e; } }; + + Runnable rollbackRunnable = () -> { + try{ + getRollbackHandler().accept(copiedEvent, copiedContext); + } catch (Exception e) { + // ignore + } + }; if (copiedContext.isTransactional()) { // https://www.jianshu.com/p/59891ede5f90 runnable.run(); } else { // 并发会导致事件时序出现问题. 所以串行执行 - afterCommitExecutor.execute(() -> runnable.run()); + afterCommitExecutor.executeAndRollback(() -> runnable.run(), ()-> rollbackRunnable.run()); } } @@ -83,58 +97,14 @@ public abstract class AbstractEventProducer implements EventProducer send(event, defaultContext); } - /** - * stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html - * 保证在交易结束后被调用. - */ - private static class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter { - private static final ThreadLocal> RUNNABLES = new ThreadLocal>(); + @Override + public void send(@NonNull Event event, Map addHeader) { + Map headers = defaultContext.getHeaders(); + headers.putAll(addHeader); + send(event, defaultContext); + } - public void execute(Runnable runnable) { - if (log.isDebugEnabled()) { - log.debug("Submitting new runnable {} to run after commit", runnable); - } - if (!TransactionSynchronizationManager.isSynchronizationActive()) { - if (log.isDebugEnabled()) { - log.debug("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable); - } - runnable.run(); - return; - } - List threadRunnables = RUNNABLES.get(); - if (threadRunnables == null) { - threadRunnables = new ArrayList<>(); - RUNNABLES.set(threadRunnables); - TransactionSynchronizationManager.registerSynchronization(this); - } - threadRunnables.add(runnable); - } - - @Override - public void afterCommit() { - List threadRunnables = RUNNABLES.get(); - if (log.isDebugEnabled()) { - log.info("Transaction successfully committed, executing {} runnables", threadRunnables.size()); - } - for (int i = 0; i < threadRunnables.size(); i++) { - Runnable runnable = threadRunnables.get(i); - if (log.isDebugEnabled()) { - log.debug("Executing runnable {}", runnable); - } - try { - runnable.run(); - } catch (RuntimeException e) { - log.error("Failed to execute runnable " + runnable, e); - } - } - } - - @Override - public void afterCompletion(int status) { - if (log.isDebugEnabled()) { - log.debug("Transaction completed with status {}", status == TransactionSynchronization.STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK"); - } - RUNNABLES.remove(); - } + public AfterCommitExecutorImpl getAfterCommitExecutor() { + return afterCommitExecutor; } } diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AfterCommitExecutorImpl.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AfterCommitExecutorImpl.java new file mode 100644 index 0000000..5817136 --- /dev/null +++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/AfterCommitExecutorImpl.java @@ -0,0 +1,90 @@ +package cn.axzo.framework.rocketmq; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +import java.util.ArrayList; +import java.util.List; /** + * stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html + * 保证在交易结束后被调用. + */ +@Slf4j +public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter { + private static final ThreadLocal> RUNNABLES = new ThreadLocal>(); + private static final ThreadLocal> ROLLBACK_RUNNABLES = new ThreadLocal>(); + + public void execute(Runnable runnable) { + if (log.isDebugEnabled()) { + log.debug("Submitting new runnable {} to run after commit", runnable); + } + if (!TransactionSynchronizationManager.isSynchronizationActive()) { + if (log.isDebugEnabled()) { + log.debug("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable); + } + runnable.run(); + return; + } + List threadRunnables = RUNNABLES.get(); + if (threadRunnables == null) { + threadRunnables = new ArrayList<>(); + RUNNABLES.set(threadRunnables); + TransactionSynchronizationManager.registerSynchronization(this); + } + threadRunnables.add(runnable); + } + + public void executeAndRollback(Runnable runnable, Runnable rollbackRunnable) { + execute(runnable); + if (TransactionSynchronizationManager.isSynchronizationActive()) { + List runnables = ROLLBACK_RUNNABLES.get(); + if (runnables == null) { + runnables = new ArrayList<>(); + ROLLBACK_RUNNABLES.set(runnables); + } + runnables.add(rollbackRunnable); + } + } + + @Override + public void afterCommit() { + List threadRunnables = RUNNABLES.get(); + if (log.isDebugEnabled()) { + log.info("Transaction successfully committed, executing {} runnables", threadRunnables.size()); + } + for (int i = 0; i < threadRunnables.size(); i++) { + Runnable runnable = threadRunnables.get(i); + if (log.isDebugEnabled()) { + log.debug("Executing runnable {}", runnable); + } + try { + runnable.run(); + } catch (RuntimeException e) { + log.error("Failed to execute runnable " + runnable, e); + } + } + } + + @Override + public void afterCompletion(int status) { + if (log.isDebugEnabled()) { + log.debug("Transaction completed with status {}", status == TransactionSynchronization.STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK"); + } + if(status != TransactionSynchronization.STATUS_COMMITTED) { + for (int i = 0; i < ROLLBACK_RUNNABLES.get().size(); i++) { + Runnable runnable = ROLLBACK_RUNNABLES.get().get(i); + try{ + runnable.run(); + } catch (RuntimeException e) { + log.error("Failed to execute runnable at transaction ROLLBACK " + runnable, e); + } + } + } + RUNNABLES.remove(); + } + + public List getRunnables() { + return RUNNABLES.get(); + } +} diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/EventProducer.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/EventProducer.java index ade5f16..8cdb4c5 100644 --- a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/EventProducer.java +++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/EventProducer.java @@ -29,6 +29,14 @@ public interface EventProducer { */ void send(Event event); + /** + * 发送 event, 使用默认的 context,并在 context 中追加 header 属性 + * + * @param event + * @param addHeaders + */ + void send(Event event, Map addHeaders); + @Getter @Builder(toBuilder = true) @NoArgsConstructor diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java index 406b209..ec37fee 100644 --- a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java +++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java @@ -30,6 +30,7 @@ public class RocketMQEventProducer extends AbstractEventProducer { * 一个扩展点.让外部可以在发送消息后做特殊逻辑处理. 比如发送消息统计, 消息元数据监控 */ private BiConsumer> sendCallback; + public static final String MQ_MESSAGE_ID = "messageId"; public RocketMQEventProducer(RocketMQTemplate rocketMQTemplate, String defaultModule, @@ -73,9 +74,10 @@ public class RocketMQEventProducer extends AbstractEventProducer { try { // 同步发送超时时间支持在properties中设置 SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context)); - log.info("====MQ PRODUCER SYNC====, context={}, message = {}, queueId = {}", - context, event.toPrettyJsonString(), sendResult.getMessageQueue().getQueueId()); + log.info("====MQ PRODUCER SYNC====, context={}, message = {}, messageId = {} queueId = {}", + context, event.toPrettyJsonString(), sendResult.getMsgId(), sendResult.getMessageQueue().getQueueId()); if (sendCallback != null) { + context.getHeaders().put(MQ_MESSAGE_ID, sendResult.getMsgId()); sendCallback.accept(event, context); } } catch (Throwable e) {