Merge remote-tracking branch 'origin/master' into feature/merge_2924_2752
This commit is contained in:
commit
a394c8a690
@ -135,7 +135,9 @@ public class ErrorReportAspect implements Ordered {
|
|||||||
*/
|
*/
|
||||||
@AfterThrowing(pointcut = "@within(errorReporter) && @annotation(operation)", throwing = "e", argNames = "joinPoint,errorReporter,operation,e")
|
@AfterThrowing(pointcut = "@within(errorReporter) && @annotation(operation)", throwing = "e", argNames = "joinPoint,errorReporter,operation,e")
|
||||||
public void doAfterThrowing(JoinPoint joinPoint, ErrorReporter errorReporter, Operation operation, Exception e) {
|
public void doAfterThrowing(JoinPoint joinPoint, ErrorReporter errorReporter, Operation operation, Exception e) {
|
||||||
log.info("ErrorReportAspect 记录异常信息: {}", e.getMessage());
|
if(log.isDebugEnabled()) {
|
||||||
|
log.debug("ErrorReportAspect 记录异常信息: {}", e.getMessage());
|
||||||
|
}
|
||||||
EnvConfig[] envConfigs = errorReporter.envConfig();
|
EnvConfig[] envConfigs = errorReporter.envConfig();
|
||||||
for (EnvConfig envConfig : envConfigs) {
|
for (EnvConfig envConfig : envConfigs) {
|
||||||
if (Arrays.asList(envConfig.profiles()).contains(profile)) {
|
if (Arrays.asList(envConfig.profiles()).contains(profile)) {
|
||||||
|
|||||||
@ -110,7 +110,9 @@ public class StarterBroadcastMQConfiguration {
|
|||||||
// 处理 properties 配置进行事件过滤
|
// 处理 properties 配置进行事件过滤
|
||||||
for (InnerMessageQueueHandleBeforeFilter filter : filters) {
|
for (InnerMessageQueueHandleBeforeFilter filter : filters) {
|
||||||
if (filter.doFilter(message)) {
|
if (filter.doFilter(message)) {
|
||||||
log.info("【{}】message has been filtered, messageId: {}", this.getClass().getSimpleName(), message.getMsgId());
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("【{}】message has been filtered, messageId: {}", this.getClass().getSimpleName(), message.getMsgId());
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -107,7 +107,9 @@ public class StarterRPCInvokeMQConfiguration {
|
|||||||
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getSendBeforeCallback() {
|
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getSendBeforeCallback() {
|
||||||
return (event, context) -> {
|
return (event, context) -> {
|
||||||
event.setEventId(IdHelper.get32UUID());
|
event.setEventId(IdHelper.get32UUID());
|
||||||
log.info("mq_send_Before: {}, uniqueId: {}", event.getShardingKey(), event.getEventId());
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("mq_send_Before: {}, uniqueId: {}", event.getShardingKey(), event.getEventId());
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -121,7 +123,9 @@ public class StarterRPCInvokeMQConfiguration {
|
|||||||
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getSendAfterCallback() {
|
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getSendAfterCallback() {
|
||||||
return (event, context) -> {
|
return (event, context) -> {
|
||||||
String messageId = context.getHeaders().get(MQ_MESSAGE_ID);
|
String messageId = context.getHeaders().get(MQ_MESSAGE_ID);
|
||||||
log.info("mq_send_after: {}, uniqueId: {}, messageId: {}", event.getShardingKey(), event.getEventId(), messageId);
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("mq_send_after: {}, uniqueId: {}, messageId: {}", event.getShardingKey(), event.getEventId(), messageId);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -134,7 +138,9 @@ public class StarterRPCInvokeMQConfiguration {
|
|||||||
*/
|
*/
|
||||||
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getTransactionRollbackHandler() {
|
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getTransactionRollbackHandler() {
|
||||||
return (event, context) -> {
|
return (event, context) -> {
|
||||||
log.info("mq_transaction_rollback: {}, uniqueId: {}", event.getShardingKey(), event.getEventId());
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("mq_transaction_rollback: {}, uniqueId: {}", event.getShardingKey(), event.getEventId());
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -156,7 +162,9 @@ public class StarterRPCInvokeMQConfiguration {
|
|||||||
if (eventWrapper.isHandled()) {
|
if (eventWrapper.isHandled()) {
|
||||||
// 只收集被App真正消费的消息.
|
// 只收集被App真正消费的消息.
|
||||||
Event event = eventWrapper.getEvent();
|
Event event = eventWrapper.getEvent();
|
||||||
log.info("WorkflowEngineStarter RPC MQ, handled event: {}", event.toPrettyJsonString());
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("WorkflowEngineStarter RPC MQ, handled event: {}", event.toPrettyJsonString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return new DefaultEventConsumer(applicationName + MODULE_NAME_SUFFIX, workflowEngineStarterEventHandlerRepository, callback);
|
return new DefaultEventConsumer(applicationName + MODULE_NAME_SUFFIX, workflowEngineStarterEventHandlerRepository, callback);
|
||||||
@ -185,7 +193,9 @@ public class StarterRPCInvokeMQConfiguration {
|
|||||||
@Override
|
@Override
|
||||||
public void onMessage(MessageExt message) {
|
public void onMessage(MessageExt message) {
|
||||||
if (filter.doFilter(message)) {
|
if (filter.doFilter(message)) {
|
||||||
log.info("【{}】message has been filtered, messageId: {}", this.getClass().getSimpleName(), message.getMsgId());
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("【{}】message has been filtered, messageId: {}", this.getClass().getSimpleName(), message.getMsgId());
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
super.onEvent(message, workflowEngineStarterEventConsumer);
|
super.onEvent(message, workflowEngineStarterEventConsumer);
|
||||||
|
|||||||
@ -104,7 +104,9 @@ public class WorkflowEngineStarterAutoConfiguration {
|
|||||||
private ExecuteInterceptor getFailInterceptor(WorkflowEngineStarterProperties starterProperties) {
|
private ExecuteInterceptor getFailInterceptor(WorkflowEngineStarterProperties starterProperties) {
|
||||||
BroadcastListenerProperties listenerRetry = starterProperties.getBroadcast();
|
BroadcastListenerProperties listenerRetry = starterProperties.getBroadcast();
|
||||||
FailHandleTypeEnum failHandleType = listenerRetry.getFailHandleType();
|
FailHandleTypeEnum failHandleType = listenerRetry.getFailHandleType();
|
||||||
log.info("workflow engine starter fail handle type : {}", failHandleType);
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("workflow engine starter fail handle type : {}", failHandleType);
|
||||||
|
}
|
||||||
switch (failHandleType) {
|
switch (failHandleType) {
|
||||||
case FAIL_BACK:
|
case FAIL_BACK:
|
||||||
// return new FailBackInterceptor();
|
// return new FailBackInterceptor();
|
||||||
|
|||||||
@ -244,8 +244,8 @@ public class ComplexInvokeClient implements Client {
|
|||||||
// at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
|
// at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
|
||||||
// at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:753)
|
// at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:753)
|
||||||
// 只设置为字符串会报错,恢复为设置成CommonResponse
|
// 只设置为字符串会报错,恢复为设置成CommonResponse
|
||||||
final ByteArrayInputStream inputStream = new ByteArrayInputStream(JSON.toJSONString(CommonResponse.success(HttpStatus.OK.value(), "Send MQ Success", null))
|
final ByteArrayInputStream inputStream = new ByteArrayInputStream(JSON.toJSONString(CommonResponse.success(HttpStatus.OK.value(), null, null))
|
||||||
.getBytes(UTF_8));
|
.getBytes(UTF_8));
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Integer length() {
|
public Integer length() {
|
||||||
|
|||||||
@ -72,7 +72,9 @@ final class WorkflowEngineStarterDecoder implements Decoder {
|
|||||||
wrappedType = ParameterizedTypeImpl.make(CommonResponse.class, new Type[]{type}, null);
|
wrappedType = ParameterizedTypeImpl.make(CommonResponse.class, new Type[]{type}, null);
|
||||||
}
|
}
|
||||||
Object decode = delegate.decode(response, wrappedType);
|
Object decode = delegate.decode(response, wrappedType);
|
||||||
log.info("workflow engine starter response :{}", JSON.toJSONString(decode));
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("workflow engine starter response :{}", JSON.toJSONString(decode));
|
||||||
|
}
|
||||||
if (decode instanceof CommonResponse) {
|
if (decode instanceof CommonResponse) {
|
||||||
CommonResponse<?> commonResponse = (CommonResponse<?>) decode;
|
CommonResponse<?> commonResponse = (CommonResponse<?>) decode;
|
||||||
if (response.status() == 202) {
|
if (response.status() == 202) {
|
||||||
|
|||||||
@ -61,8 +61,10 @@ class WorkflowEngineStarterInvocationHandler implements InvocationHandler {
|
|||||||
stopWatch.stop();
|
stopWatch.stop();
|
||||||
List<String> ignoreMethods = Lists.newArrayList("sync", "async");
|
List<String> ignoreMethods = Lists.newArrayList("sync", "async");
|
||||||
if (!ignoreMethods.contains(method.getName())) {
|
if (!ignoreMethods.contains(method.getName())) {
|
||||||
log.info("Workflow starter Method invoke record: {}.{}, args: {}, result: {} const: {} ms",
|
if (log.isDebugEnabled()) {
|
||||||
target.type().getSimpleName(), method.getName(), JSON.toJSONString(args), JSON.toJSONString(invoke), stopWatch.getTotalTimeMillis());
|
log.debug("Workflow starter Method invoke record: {}.{}, args: {}, result: {} const: {} ms",
|
||||||
|
target.type().getSimpleName(), method.getName(), JSON.toJSONString(args), JSON.toJSONString(invoke), stopWatch.getTotalTimeMillis());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return invoke;
|
return invoke;
|
||||||
|
|||||||
@ -35,7 +35,9 @@ public final class FailOverInterceptor extends AbstractListenerInterceptor {
|
|||||||
int failedAttempts = 0;
|
int failedAttempts = 0;
|
||||||
do {
|
do {
|
||||||
if (failedAttempts > 0) {
|
if (failedAttempts > 0) {
|
||||||
log.info("Waiting for {} ms before retrying the command. retryTimes: {}", waitTime, failedAttempts);
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Waiting for {} ms before retrying the command. retryTimes: {}", waitTime, failedAttempts);
|
||||||
|
}
|
||||||
waitBeforeRetry(waitTime);
|
waitBeforeRetry(waitTime);
|
||||||
waitTime *= waitIncreaseFactor;
|
waitTime *= waitIncreaseFactor;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -21,8 +21,8 @@ public final class LogInterceptor extends AbstractListenerInterceptor {
|
|||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("--- starting handle mq ---- ");
|
log.debug("--- starting handle mq ---- ");
|
||||||
}
|
}
|
||||||
if (log.isInfoEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.info("messageId: {}, eventCode: {}, messageBody: {}", context.getMsgId(), context.getEventCode().toString(), JSON.toJSONString(t));
|
log.debug("messageId: {}, eventCode: {}, messageBody: {}", context.getMsgId(), context.getEventCode().toString(), JSON.toJSONString(t));
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
getNext().execute(executor, consumer, context, t);
|
getNext().execute(executor, consumer, context, t);
|
||||||
|
|||||||
@ -59,7 +59,9 @@ public abstract class AbstractInnerWorkflowListener<H extends Ordered, F extends
|
|||||||
if (!CollectionUtils.isEmpty(businessListeners)) {
|
if (!CollectionUtils.isEmpty(businessListeners)) {
|
||||||
for (F filter : businessFilters) {
|
for (F filter : businessFilters) {
|
||||||
if (filter.doFilter(event, context, convert)) {
|
if (filter.doFilter(event, context, convert)) {
|
||||||
log.info("【{}】filtered message, messageId: {}", filter.getClass().getSimpleName(), context.getMsgId());
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("【{}】filtered message, messageId: {}", filter.getClass().getSimpleName(), context.getMsgId());
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user