diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/service/JobLogProcessor.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/service/JobLogProcessor.java index 3774fcc75..cc6282efc 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/service/JobLogProcessor.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/service/JobLogProcessor.java @@ -1,6 +1,7 @@ package cn.axzo.workflow.core.engine.job.service; import cn.axzo.workflow.common.constant.LogFieldConstants; +import cn.axzo.workflow.core.engine.job.utils.AsyncJobUtils; import cn.azxo.framework.common.constatns.Constants; import cn.hutool.json.JSONUtil; import lombok.Data; @@ -16,15 +17,13 @@ import org.springframework.stereotype.Component; @Component public class JobLogProcessor implements JobProcessor { - private static final String WRAPPED_PREFIX = "@@wrapped@@"; - @Override public void process(JobProcessorContext jobProcessorContext) { if (jobProcessorContext.getPhase() == JobProcessorContext.Phase.BEFORE_EXECUTE) { AbstractJobEntity jobEntity = jobProcessorContext.getJobEntity(); String customValues = jobEntity.getCustomValues(); - if (isWrappedCustomValues(customValues)) { - WrappedJobCustomValuesInfo bean = convert2WrappedCustomValuesInfo(customValues); + if (AsyncJobUtils.isWrappedCustomValues(customValues)) { + AsyncJobUtils.WrappedJobCustomValuesInfo bean = AsyncJobUtils.convert2WrappedCustomValuesInfo(customValues); if (bean != null) { //原始的值重新设置回去 String originalCustomValues = bean.getCustomValues(); @@ -40,42 +39,9 @@ public class JobLogProcessor implements JobProcessor { } else if (jobProcessorContext.getPhase() == JobProcessorContext.Phase.BEFORE_CREATE) {//持久化之前做处理 AbstractJobEntity jobEntity = jobProcessorContext.getJobEntity(); String customValues = jobEntity.getCustomValues(); - jobEntity.setCustomValues(getWrappedCustomValues(customValues)); + jobEntity.setCustomValues(AsyncJobUtils.getWrappedCustomValues(customValues)); } } - private String getWrappedCustomValues(String originalCustomValues) { - WrappedJobCustomValuesInfo jobLogInfo = new WrappedJobCustomValuesInfo(); - String traceId = MDC.get(LogFieldConstants.X_REQUEST_ID); - String cxtLogId = MDC.get(Constants.CTX_LOG_ID_MDC); - jobLogInfo.setCtxLogId(cxtLogId); - jobLogInfo.setXRequestId(traceId); - jobLogInfo.setCustomValues(originalCustomValues); - String jsonStr = JSONUtil.toJsonStr(jobLogInfo); - return WRAPPED_PREFIX + jsonStr; - } - private boolean isWrappedCustomValues(String customValues) { - return customValues != null && !customValues.trim().isEmpty() && customValues.startsWith(WRAPPED_PREFIX); - } - - private WrappedJobCustomValuesInfo convert2WrappedCustomValuesInfo(String wrappedCustomValues) { - WrappedJobCustomValuesInfo bean = null; - if (isWrappedCustomValues(wrappedCustomValues)) { - try { - String str = StringUtils.substring(wrappedCustomValues, WRAPPED_PREFIX.length()); - bean = JSONUtil.toBean(str, WrappedJobCustomValuesInfo.class); - } catch (Exception e) { - log.error(e.getMessage(), e); - } - } - return bean; - } - - @Data - public static final class WrappedJobCustomValuesInfo { - private String xRequestId; - private String ctxLogId; - private String customValues; - } } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/utils/AsyncJobUtils.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/utils/AsyncJobUtils.java new file mode 100644 index 000000000..a85244fb4 --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/utils/AsyncJobUtils.java @@ -0,0 +1,51 @@ +package cn.axzo.workflow.core.engine.job.utils; + +import cn.axzo.workflow.common.constant.LogFieldConstants; +import cn.azxo.framework.common.constatns.Constants; +import cn.hutool.json.JSONUtil; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.flowable.job.service.impl.persistence.entity.JobEntity; +import org.slf4j.MDC; + +@Slf4j +public class AsyncJobUtils { + + private static final String WRAPPED_PREFIX = "@@wrapped@@"; + + public static String getWrappedCustomValues(String originalCustomValues) { + WrappedJobCustomValuesInfo jobLogInfo = new WrappedJobCustomValuesInfo(); + String traceId = MDC.get(LogFieldConstants.X_REQUEST_ID); + String cxtLogId = MDC.get(Constants.CTX_LOG_ID_MDC); + jobLogInfo.setCtxLogId(cxtLogId); + jobLogInfo.setXRequestId(traceId); + jobLogInfo.setCustomValues(originalCustomValues); + String jsonStr = JSONUtil.toJsonStr(jobLogInfo); + return WRAPPED_PREFIX + jsonStr; + } + + public static boolean isWrappedCustomValues(String customValues) { + return customValues != null && !customValues.trim().isEmpty() && customValues.startsWith(WRAPPED_PREFIX); + } + + public static WrappedJobCustomValuesInfo convert2WrappedCustomValuesInfo(String wrappedCustomValues) { + WrappedJobCustomValuesInfo bean = null; + if (isWrappedCustomValues(wrappedCustomValues)) { + try { + String str = StringUtils.substring(wrappedCustomValues, WRAPPED_PREFIX.length()); + bean = JSONUtil.toBean(str, WrappedJobCustomValuesInfo.class); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + return bean; + } + + @Data + public static final class WrappedJobCustomValuesInfo { + private String xRequestId; + private String ctxLogId; + private String customValues; + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineAsyncJobEventListener.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineAsyncJobEventListener.java index 9862b016a..49dc85d13 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineAsyncJobEventListener.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineAsyncJobEventListener.java @@ -1,11 +1,19 @@ package cn.axzo.workflow.core.engine.listener; +import cn.axzo.workflow.common.constant.LogFieldConstants; +import cn.axzo.workflow.core.engine.job.utils.AsyncJobUtils; import cn.axzo.workflow.core.listener.BpmnAsyncJobEventListener; +import cn.azxo.framework.common.constatns.Constants; +import cn.hutool.json.JSONUtil; import com.google.common.collect.ImmutableSet; import lombok.extern.slf4j.Slf4j; import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType; import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.flowable.common.engine.impl.event.FlowableEngineEventImpl; +import org.flowable.common.engine.impl.event.FlowableEntityEventImpl; import org.flowable.engine.delegate.event.AbstractFlowableEngineEventListener; +import org.flowable.job.service.impl.persistence.entity.AbstractJobEntity; +import org.slf4j.MDC; import org.springframework.beans.factory.ObjectProvider; import org.springframework.stereotype.Component; import org.springframework.util.StopWatch; @@ -48,6 +56,9 @@ public class EngineAsyncJobEventListener extends AbstractFlowableEngineEventList StopWatch stopWatch = new StopWatch("EngineAsyncJobEventListener"); stopWatch.start("async-job-event-listener"); if (ASYNC_JOB_EVENTS.contains(flowableEvent.getType())) { + if (flowableEvent instanceof FlowableEngineEventImpl) { + resolveJobInfoBeforeOperate((FlowableEntityEventImpl) flowableEvent); + } getOrderedListeners().forEach(i -> { if (i.support((FlowableEngineEventType) flowableEvent.getType())) { i.notify(flowableEvent); @@ -63,4 +74,23 @@ public class EngineAsyncJobEventListener extends AbstractFlowableEngineEventList asyncJobListeners.ifAvailable(orderListeners::addAll); return orderListeners; } + + private void resolveJobInfoBeforeOperate(FlowableEntityEventImpl flowableEvent) { + if (flowableEvent == null) { + return; + } + Object entity = flowableEvent.getEntity(); + log.info("resolve job info,eventInfo:{},jobEntity:{}", JSONUtil.toJsonStr(flowableEvent), entity); + if (entity instanceof AbstractJobEntity) { + AbstractJobEntity jobInfo = (AbstractJobEntity) entity; + String customValues = jobInfo.getCustomValues(); + if (AsyncJobUtils.isWrappedCustomValues(customValues)) { + AsyncJobUtils.WrappedJobCustomValuesInfo wrappedInfo = AsyncJobUtils.convert2WrappedCustomValuesInfo(customValues); + String originalCustomValues = wrappedInfo.getCustomValues(); + jobInfo.setCustomValues(originalCustomValues); + MDC.put(LogFieldConstants.X_REQUEST_ID, wrappedInfo.getXRequestId()); + MDC.put(Constants.CTX_LOG_ID_MDC, wrappedInfo.getCtxLogId()); + } + } + } }