From 01fec6ab884eb4662dea0e3ae4d42bf4a8b2e8ac Mon Sep 17 00:00:00 2001 From: yanglin Date: Wed, 18 Dec 2024 16:12:45 +0800 Subject: [PATCH] REQ-3201: fix bugs --- .../message/service/card/CardManager.java | 63 ++++-- .../service/card/CardSendExecutor.java | 55 +++++ .../todo/mybatis/SqlInterceptorConfig.java | 6 + .../msg/center/common/utils/MiscUtils.java | 19 +- msg-center-dal/pom.xml | 4 + .../java/cn/axzo/msg/center/dal/CardDao.java | 3 +- .../center/util/DeleteAwareInterceptor.java | 202 ++++++++++++++++++ 7 files changed, 313 insertions(+), 39 deletions(-) create mode 100644 inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSendExecutor.java create mode 100644 msg-center-dal/src/main/java/cn/axzo/msg/center/util/DeleteAwareInterceptor.java diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java index abfa5779..55be0d3f 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java @@ -7,6 +7,7 @@ import cn.axzo.im.center.api.vo.req.UpdateMessageRequest; import cn.axzo.im.center.api.vo.resp.MessageTaskResp; import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse; import cn.axzo.msg.center.common.utils.BizAssertions; +import cn.axzo.msg.center.common.utils.MiscUtils; import cn.axzo.msg.center.dal.CardDao; import cn.axzo.msg.center.dal.CardIdempotentDao; import cn.axzo.msg.center.domain.entity.Card; @@ -41,6 +42,10 @@ import org.springframework.transaction.support.TransactionTemplate; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * @author yanglin @@ -60,6 +65,10 @@ public class CardManager { private final CardParser cardParser; private final CardBroadcaster cardBroadcaster; private final CardProps cardProps; + private final ExecutorService executor = new ThreadPoolExecutor( + 5, 15, + 5L, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(2000)); public CardSendResponse send(CardSendRequest request) { // 校验参数 @@ -78,30 +87,40 @@ public class CardManager { .reloadAndLogCards("send:enqueue"); }); PushDeviceSnapshots deviceSnapshots = pushDeviceService.createDeviceSnapshots(); + CardSendExecutor batchExecutor = new CardSendExecutor(executor); for (CardGroup group : sendModel.getCardGroups()) { - SendTemplateMessageParam imRequest = cardSupport.buildImSendRequest( - sendModel, group, deviceSnapshots); - CardLogger groupLogger = cardLoggers.createLogger(requestContext, group.getCards()); - try { - MessageTaskResp imResponse = BizAssertions.assertResponse( - messageApi.sendTemplateMessageAsync(imRequest)); - execTransactional(() -> { - cardDao.setSendSuccess(group.getCards(), imResponse); - groupLogger.reloadAndLogCards("send:success"); - }); - } catch (Exception e) { - log.warn("发送IM消息失败, deleteCardsWhenSendFail={}, request={}", - cardProps.isDeleteCardsWhenSendFail(), request, e); - execTransactional(() -> { - if (cardProps.isDeleteCardsWhenSendFail()) { - cardIdempotentDao.deleteIdempotent(request.getAppCode(), - request.getTemplateCode(), request.getIdempotentCode()); - cardDao.deleteCards(sendModel.getCards()); - } + batchExecutor.submit(() -> { + SendTemplateMessageParam imRequest = cardSupport.buildImSendRequest( + sendModel, group, deviceSnapshots); + CardLogger groupLogger = cardLoggers.createLogger(requestContext, group.getCards()); + try { + MessageTaskResp imResponse = BizAssertions.assertResponse( + messageApi.sendTemplateMessageAsync(imRequest)); + execTransactional(() -> { + cardDao.setSendSuccess(group.getCards(), imResponse); + groupLogger.reloadAndLogCards("send:success"); + }); + } catch (Exception e) { + log.warn("发送IM消息失败, deleteCardsWhenSendFail={}, request={}", + cardProps.isDeleteCardsWhenSendFail(), request, e); groupLogger.reloadAndLogCards("send:fail", e); - }); - throw e; - } + throw e; + } + }); + } + try { + batchExecutor.awaitTermination(5, TimeUnit.MINUTES); + } catch (Exception e) { + log.warn("发送IM消息失败, deleteCardsWhenSendFail={}, request={}", + cardProps.isDeleteCardsWhenSendFail(), request, e); + execTransactional(() -> { + if (cardProps.isDeleteCardsWhenSendFail()) { + cardIdempotentDao.deleteIdempotent(request.getAppCode(), + request.getTemplateCode(), request.getIdempotentCode()); + cardDao.deleteCards(sendModel.getCards()); + } + }); + throw MiscUtils.wrapException(e); } CardSendResponse response = new CardSendResponse(); if (request.isReturnCards()) { diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSendExecutor.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSendExecutor.java new file mode 100644 index 00000000..5c72228a --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSendExecutor.java @@ -0,0 +1,55 @@ +package cn.axzo.msg.center.message.service.card; + +import cn.axzo.msg.center.utils.AsyncRunTasks; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * @author yanglin + */ +@Slf4j +class CardSendExecutor { + + private final AsyncRunTasks asyncTasks; + private final AtomicReference error = new AtomicReference<>(); + private int submitCount = 0; + + CardSendExecutor(ExecutorService executor) { + this.asyncTasks = new AsyncRunTasks(executor); + } + + void submit(Runnable sender) { + submitCount++; + if (submitCount <= 3) { + sender.run(); + return; + } + log.info("using async card send, submitBatchSize={}", submitCount); + asyncTasks.runAsync(() -> { + Exception exception = error.get(); + if (exception != null) { + log.warn("其它分组发送消息失败, 放弃当前分组"); + return; + } + try { + sender.run(); + } catch (Exception e) { + log.warn("发送消息失败", e); + error.compareAndSet(null, e); + } + }); + } + + void awaitTermination(long timeout, TimeUnit unit) throws Exception { + asyncTasks.awaitTermination(timeout, unit); + Exception exception = error.get(); + if (exception != null) { + log.warn("发送消息失败", exception); + throw exception; + } + } + +} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/mybatis/SqlInterceptorConfig.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/mybatis/SqlInterceptorConfig.java index a86a123e..fddd8fa3 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/mybatis/SqlInterceptorConfig.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/mybatis/SqlInterceptorConfig.java @@ -3,6 +3,7 @@ package cn.axzo.msg.center.message.service.todo.mybatis; import cn.axzo.msg.center.inside.notices.config.DiagnosisProps; import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig; import cn.axzo.msg.center.message.service.todo.mybatis.rowcountwarn.RowCountInterceptor; +import cn.axzo.msg.center.util.DeleteAwareInterceptor; import org.springframework.beans.factory.ListableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -31,6 +32,11 @@ public class SqlInterceptorConfig { return new CollectSQLInterceptor(); } + @Bean + DeleteAwareInterceptor deleteAwareInterceptor() { + return new DeleteAwareInterceptor(); + } + @Bean public BeautifulPaginationInterceptor paginationInterceptor2() { return new BeautifulPaginationInterceptor(); diff --git a/msg-center-common/src/main/java/cn/axzo/msg/center/common/utils/MiscUtils.java b/msg-center-common/src/main/java/cn/axzo/msg/center/common/utils/MiscUtils.java index af1caaa9..52bf2cf9 100644 --- a/msg-center-common/src/main/java/cn/axzo/msg/center/common/utils/MiscUtils.java +++ b/msg-center-common/src/main/java/cn/axzo/msg/center/common/utils/MiscUtils.java @@ -1,27 +1,14 @@ package cn.axzo.msg.center.common.utils; -import java.util.concurrent.TimeUnit; +import cn.axzo.basics.common.exception.ServiceException; /** * @author yanglin */ public class MiscUtils { - public static void sleepQuietly(long sleepTime, TimeUnit unit) { - long deadline = System.currentTimeMillis() + unit.toMillis(sleepTime); - long waitMs = deadline - System.currentTimeMillis(); - boolean isInterrupted = false; - while (waitMs > 0) { - try { - Thread.sleep(waitMs); - } catch (InterruptedException e) { - isInterrupted = true; - } - waitMs = deadline - System.currentTimeMillis(); - } - if (isInterrupted) { - Thread.currentThread().interrupt(); - } + public static RuntimeException wrapException(Exception e) { + return e instanceof ServiceException ? (ServiceException) e : new ServiceException(e); } } \ No newline at end of file diff --git a/msg-center-dal/pom.xml b/msg-center-dal/pom.xml index 4766cae1..57efd578 100644 --- a/msg-center-dal/pom.xml +++ b/msg-center-dal/pom.xml @@ -21,6 +21,10 @@ + + com.alibaba + druid + mysql mysql-connector-java diff --git a/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/CardDao.java b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/CardDao.java index 2100846d..ded2ebf5 100644 --- a/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/CardDao.java +++ b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/CardDao.java @@ -6,6 +6,7 @@ import cn.axzo.msg.center.dal.mapper.CardMapper; import cn.axzo.msg.center.domain.entity.Card; import cn.axzo.msg.center.domain.persistence.BaseEntityExt; import cn.axzo.msg.center.service.pending.request.CardStateInfo; +import cn.axzo.msg.center.util.DeleteAwareInterceptor; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; @@ -80,7 +81,7 @@ public class CardDao extends ServiceImpl { public List reloadCards(List cards) { if (CollectionUtils.isEmpty(cards)) return Collections.emptyList(); - return getBaseMapper().getByIds(collectCardIds(cards)); + return DeleteAwareInterceptor.execute(()-> listByIds(collectCardIds(cards))); } public void deleteCards(List cards) { diff --git a/msg-center-dal/src/main/java/cn/axzo/msg/center/util/DeleteAwareInterceptor.java b/msg-center-dal/src/main/java/cn/axzo/msg/center/util/DeleteAwareInterceptor.java new file mode 100644 index 00000000..c3bb7bd6 --- /dev/null +++ b/msg-center-dal/src/main/java/cn/axzo/msg/center/util/DeleteAwareInterceptor.java @@ -0,0 +1,202 @@ +package cn.axzo.msg.center.util; + +import com.alibaba.druid.sql.SQLUtils; +import com.alibaba.druid.sql.ast.SQLExpr; +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr; +import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator; +import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; +import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlUpdateStatement; +import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; +import com.alibaba.druid.sql.visitor.SQLASTVisitor; +import org.apache.ibatis.executor.Executor; +import org.apache.ibatis.mapping.BoundSql; +import org.apache.ibatis.mapping.MappedStatement; +import org.apache.ibatis.mapping.ParameterMapping; +import org.apache.ibatis.mapping.SqlCommandType; +import org.apache.ibatis.mapping.SqlSource; +import org.apache.ibatis.plugin.Interceptor; +import org.apache.ibatis.plugin.Intercepts; +import org.apache.ibatis.plugin.Invocation; +import org.apache.ibatis.plugin.Signature; +import org.apache.ibatis.session.Configuration; +import org.apache.ibatis.session.ResultHandler; +import org.apache.ibatis.session.RowBounds; + +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * 把select和update语句中mybatis-plus自动添加到where条件里面的is_delete条件去掉(替换成常量表达式) + * + * @author yanglin + */ +@Intercepts(value = { + @Signature(type = Executor.class, method = "query", + args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}), + @Signature(type = Executor.class, method = "update", + args = {MappedStatement.class, Object.class}) +}) +public class DeleteAwareInterceptor implements Interceptor { + + private static final ThreadLocal ENABLE = new ThreadLocal<>(); + + /** + * 不建议此种使用方式,容易忘记把 {@link DeleteAwareInterceptor#disableDeleteAware} 写到finally代码块中 + * 建议使用 {@link DeleteAwareInterceptor#execute} + */ + @Deprecated + public static void enableDeleteAware() { + ENABLE.set(Boolean.TRUE); + } + + /** + * 不建议此种使用方式,容易忘记把 {@link DeleteAwareInterceptor#disableDeleteAware} 写到finally代码块中 + * 建议使用 {@link DeleteAwareInterceptor#execute} + */ + @Deprecated + public static void disableDeleteAware() { + ENABLE.remove(); + } + + public static T execute(Supplier supplier) { + try { + enableDeleteAware(); + return supplier.get(); + } finally { + disableDeleteAware(); + } + } + + public static void execute(Runnable runnable) { + try { + enableDeleteAware(); + runnable.run(); + } finally { + disableDeleteAware(); + } + } + + public static boolean isDeleteAware() { + Boolean enabled = ENABLE.get(); + return enabled != null && enabled; + } + + @Override + public Object intercept(Invocation invocation) throws Throwable { + if (!isDeleteAware()) { + return invocation.proceed(); + } + + final Object[] args = invocation.getArgs(); + MappedStatement ms = (MappedStatement) args[0]; + SqlCommandType commandType = ms.getSqlCommandType(); + BoundSql boundSql = ms.getBoundSql(args[1]); + SQLStatement stmt; + if (commandType == SqlCommandType.UPDATE) { + stmt = new MySqlStatementParser(boundSql.getSql()).parseUpdateStatement(); + } else if (commandType == SqlCommandType.SELECT) { + stmt = new MySqlStatementParser(boundSql.getSql()).parseSelect(); + } else { + return invocation.proceed(); + } + stmt.accept(new ReplaceIsDeleteExprVisitor()); + String sqlWithoutIsDelete = stmt.toString(); + args[0] = copyMappedStatement(ms, parameterObject -> new CustomBoundSql( + ms.getConfiguration(), sqlWithoutIsDelete, boundSql.getParameterMappings(), parameterObject, boundSql)); + return invocation.proceed(); + } + + private static class ReplaceIsDeleteExprVisitor implements SQLASTVisitor { + + @Override + public void endVisit(SQLBinaryOpExpr x) { + // 不是 is_delete = 0/1 的形式 + if (!(x.getLeft() instanceof SQLIdentifierExpr) + || !(x.getRight() instanceof SQLIntegerExpr)) { + return; + } + // 字段名不叫 is_delete + if (!((SQLIdentifierExpr) x.getLeft()).getName().equalsIgnoreCase("is_delete")) { + return; + } + if (x.getParent() instanceof MySqlSelectQueryBlock) { + ((MySqlSelectQueryBlock) x.getParent()).removeCondition(x); + } else if (x.getParent() instanceof MySqlUpdateStatement) { + ((MySqlUpdateStatement) x.getParent()).removeCondition(x); + } else if (x.getParent() instanceof SQLBinaryOpExpr) { + SQLBinaryOpExpr parent = (SQLBinaryOpExpr) x.getParent(); + Consumer replaceFun = newExpr -> { + if (parent.getLeft() == x) { + parent.setLeft(newExpr); + } else if (parent.getRight() == x) { + parent.setRight(newExpr); + } + }; + // 下面就取巧设置了, 可以少写很多代码 + if (parent.getOperator() == SQLBinaryOperator.BooleanAnd) { + replaceFun.accept(SQLUtils.toMySqlExpr("1 = 1")); + } else if (parent.getOperator() == SQLBinaryOperator.BooleanOr) { + replaceFun.accept(SQLUtils.toMySqlExpr("0 = 1")); + } + } + } + + } + + private static class CustomBoundSql extends BoundSql { + + final BoundSql delegate; + + CustomBoundSql(Configuration configuration, String sql, List parameterMappings, + Object parameterObject, BoundSql delegate) { + super(configuration, sql, parameterMappings, parameterObject); + this.delegate = delegate; + } + + @Override + public boolean hasAdditionalParameter(String name) { + if (super.hasAdditionalParameter(name)) + return true; + return delegate.hasAdditionalParameter(name); + } + + @Override + public Object getAdditionalParameter(String name) { + Object parameter = super.getAdditionalParameter(name); + if (parameter != null) + return parameter; + return delegate.getAdditionalParameter(name); + } + + } + + private MappedStatement copyMappedStatement(MappedStatement ms, SqlSource sqlSource) { + MappedStatement.Builder builder = + new MappedStatement.Builder(ms.getConfiguration(), ms.getId(), sqlSource, ms.getSqlCommandType()); + builder.resource(ms.getResource()); + builder.fetchSize(ms.getFetchSize()); + builder.statementType(ms.getStatementType()); + builder.keyGenerator(ms.getKeyGenerator()); + if (ms.getKeyProperties() != null && ms.getKeyProperties().length != 0) { + StringBuilder keyProperties = new StringBuilder(); + for (String keyProperty : ms.getKeyProperties()) { + keyProperties.append(keyProperty).append(","); + } + keyProperties.delete(keyProperties.length() - 1, keyProperties.length()); + builder.keyProperty(keyProperties.toString()); + } + builder.timeout(ms.getTimeout()); + builder.parameterMap(ms.getParameterMap()); + builder.resultMaps(ms.getResultMaps()); + builder.resultSetType(ms.getResultSetType()); + builder.cache(ms.getCache()); + builder.flushCacheRequired(ms.isFlushCacheRequired()); + builder.useCache(ms.isUseCache()); + return builder.build(); + } + +} \ No newline at end of file