REQ-3201: fix bugs

This commit is contained in:
yanglin 2024-12-18 16:12:45 +08:00
parent cfc57b91e7
commit 01fec6ab88
7 changed files with 313 additions and 39 deletions

View File

@ -7,6 +7,7 @@ import cn.axzo.im.center.api.vo.req.UpdateMessageRequest;
import cn.axzo.im.center.api.vo.resp.MessageTaskResp;
import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse;
import cn.axzo.msg.center.common.utils.BizAssertions;
import cn.axzo.msg.center.common.utils.MiscUtils;
import cn.axzo.msg.center.dal.CardDao;
import cn.axzo.msg.center.dal.CardIdempotentDao;
import cn.axzo.msg.center.domain.entity.Card;
@ -41,6 +42,10 @@ import org.springframework.transaction.support.TransactionTemplate;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author yanglin
@ -60,6 +65,10 @@ public class CardManager {
private final CardParser cardParser;
private final CardBroadcaster cardBroadcaster;
private final CardProps cardProps;
private final ExecutorService executor = new ThreadPoolExecutor(
5, 15,
5L, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(2000));
public CardSendResponse send(CardSendRequest request) {
// 校验参数
@ -78,7 +87,9 @@ public class CardManager {
.reloadAndLogCards("send:enqueue");
});
PushDeviceSnapshots deviceSnapshots = pushDeviceService.createDeviceSnapshots();
CardSendExecutor batchExecutor = new CardSendExecutor(executor);
for (CardGroup group : sendModel.getCardGroups()) {
batchExecutor.submit(() -> {
SendTemplateMessageParam imRequest = cardSupport.buildImSendRequest(
sendModel, group, deviceSnapshots);
CardLogger groupLogger = cardLoggers.createLogger(requestContext, group.getCards());
@ -89,6 +100,16 @@ public class CardManager {
cardDao.setSendSuccess(group.getCards(), imResponse);
groupLogger.reloadAndLogCards("send:success");
});
} catch (Exception e) {
log.warn("发送IM消息失败, deleteCardsWhenSendFail={}, request={}",
cardProps.isDeleteCardsWhenSendFail(), request, e);
groupLogger.reloadAndLogCards("send:fail", e);
throw e;
}
});
}
try {
batchExecutor.awaitTermination(5, TimeUnit.MINUTES);
} catch (Exception e) {
log.warn("发送IM消息失败, deleteCardsWhenSendFail={}, request={}",
cardProps.isDeleteCardsWhenSendFail(), request, e);
@ -98,10 +119,8 @@ public class CardManager {
request.getTemplateCode(), request.getIdempotentCode());
cardDao.deleteCards(sendModel.getCards());
}
groupLogger.reloadAndLogCards("send:fail", e);
});
throw e;
}
throw MiscUtils.wrapException(e);
}
CardSendResponse response = new CardSendResponse();
if (request.isReturnCards()) {

View File

@ -0,0 +1,55 @@
package cn.axzo.msg.center.message.service.card;
import cn.axzo.msg.center.utils.AsyncRunTasks;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author yanglin
*/
@Slf4j
class CardSendExecutor {
private final AsyncRunTasks asyncTasks;
private final AtomicReference<Exception> error = new AtomicReference<>();
private int submitCount = 0;
CardSendExecutor(ExecutorService executor) {
this.asyncTasks = new AsyncRunTasks(executor);
}
void submit(Runnable sender) {
submitCount++;
if (submitCount <= 3) {
sender.run();
return;
}
log.info("using async card send, submitBatchSize={}", submitCount);
asyncTasks.runAsync(() -> {
Exception exception = error.get();
if (exception != null) {
log.warn("其它分组发送消息失败, 放弃当前分组");
return;
}
try {
sender.run();
} catch (Exception e) {
log.warn("发送消息失败", e);
error.compareAndSet(null, e);
}
});
}
void awaitTermination(long timeout, TimeUnit unit) throws Exception {
asyncTasks.awaitTermination(timeout, unit);
Exception exception = error.get();
if (exception != null) {
log.warn("发送消息失败", exception);
throw exception;
}
}
}

View File

@ -3,6 +3,7 @@ package cn.axzo.msg.center.message.service.todo.mybatis;
import cn.axzo.msg.center.inside.notices.config.DiagnosisProps;
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
import cn.axzo.msg.center.message.service.todo.mybatis.rowcountwarn.RowCountInterceptor;
import cn.axzo.msg.center.util.DeleteAwareInterceptor;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -31,6 +32,11 @@ public class SqlInterceptorConfig {
return new CollectSQLInterceptor();
}
@Bean
DeleteAwareInterceptor deleteAwareInterceptor() {
return new DeleteAwareInterceptor();
}
@Bean
public BeautifulPaginationInterceptor paginationInterceptor2() {
return new BeautifulPaginationInterceptor();

View File

@ -1,27 +1,14 @@
package cn.axzo.msg.center.common.utils;
import java.util.concurrent.TimeUnit;
import cn.axzo.basics.common.exception.ServiceException;
/**
* @author yanglin
*/
public class MiscUtils {
public static void sleepQuietly(long sleepTime, TimeUnit unit) {
long deadline = System.currentTimeMillis() + unit.toMillis(sleepTime);
long waitMs = deadline - System.currentTimeMillis();
boolean isInterrupted = false;
while (waitMs > 0) {
try {
Thread.sleep(waitMs);
} catch (InterruptedException e) {
isInterrupted = true;
}
waitMs = deadline - System.currentTimeMillis();
}
if (isInterrupted) {
Thread.currentThread().interrupt();
}
public static RuntimeException wrapException(Exception e) {
return e instanceof ServiceException ? (ServiceException) e : new ServiceException(e);
}
}

View File

@ -21,6 +21,10 @@
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>

View File

@ -6,6 +6,7 @@ import cn.axzo.msg.center.dal.mapper.CardMapper;
import cn.axzo.msg.center.domain.entity.Card;
import cn.axzo.msg.center.domain.persistence.BaseEntityExt;
import cn.axzo.msg.center.service.pending.request.CardStateInfo;
import cn.axzo.msg.center.util.DeleteAwareInterceptor;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
@ -80,7 +81,7 @@ public class CardDao extends ServiceImpl<CardMapper, Card> {
public List<Card> reloadCards(List<Card> cards) {
if (CollectionUtils.isEmpty(cards))
return Collections.emptyList();
return getBaseMapper().getByIds(collectCardIds(cards));
return DeleteAwareInterceptor.execute(()-> listByIds(collectCardIds(cards)));
}
public void deleteCards(List<Card> cards) {

View File

@ -0,0 +1,202 @@
package cn.axzo.msg.center.util;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlUpdateStatement;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.alibaba.druid.sql.visitor.SQLASTVisitor;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.ParameterMapping;
import org.apache.ibatis.mapping.SqlCommandType;
import org.apache.ibatis.mapping.SqlSource;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.plugin.Intercepts;
import org.apache.ibatis.plugin.Invocation;
import org.apache.ibatis.plugin.Signature;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* 把select和update语句中mybatis-plus自动添加到where条件里面的is_delete条件去掉(替换成常量表达式)
*
* @author yanglin
*/
@Intercepts(value = {
@Signature(type = Executor.class, method = "query",
args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}),
@Signature(type = Executor.class, method = "update",
args = {MappedStatement.class, Object.class})
})
public class DeleteAwareInterceptor implements Interceptor {
private static final ThreadLocal<Boolean> ENABLE = new ThreadLocal<>();
/**
* 不建议此种使用方式容易忘记把 {@link DeleteAwareInterceptor#disableDeleteAware} 写到finally代码块中
* 建议使用 {@link DeleteAwareInterceptor#execute}
*/
@Deprecated
public static void enableDeleteAware() {
ENABLE.set(Boolean.TRUE);
}
/**
* 不建议此种使用方式容易忘记把 {@link DeleteAwareInterceptor#disableDeleteAware} 写到finally代码块中
* 建议使用 {@link DeleteAwareInterceptor#execute}
*/
@Deprecated
public static void disableDeleteAware() {
ENABLE.remove();
}
public static <T> T execute(Supplier<T> supplier) {
try {
enableDeleteAware();
return supplier.get();
} finally {
disableDeleteAware();
}
}
public static void execute(Runnable runnable) {
try {
enableDeleteAware();
runnable.run();
} finally {
disableDeleteAware();
}
}
public static boolean isDeleteAware() {
Boolean enabled = ENABLE.get();
return enabled != null && enabled;
}
@Override
public Object intercept(Invocation invocation) throws Throwable {
if (!isDeleteAware()) {
return invocation.proceed();
}
final Object[] args = invocation.getArgs();
MappedStatement ms = (MappedStatement) args[0];
SqlCommandType commandType = ms.getSqlCommandType();
BoundSql boundSql = ms.getBoundSql(args[1]);
SQLStatement stmt;
if (commandType == SqlCommandType.UPDATE) {
stmt = new MySqlStatementParser(boundSql.getSql()).parseUpdateStatement();
} else if (commandType == SqlCommandType.SELECT) {
stmt = new MySqlStatementParser(boundSql.getSql()).parseSelect();
} else {
return invocation.proceed();
}
stmt.accept(new ReplaceIsDeleteExprVisitor());
String sqlWithoutIsDelete = stmt.toString();
args[0] = copyMappedStatement(ms, parameterObject -> new CustomBoundSql(
ms.getConfiguration(), sqlWithoutIsDelete, boundSql.getParameterMappings(), parameterObject, boundSql));
return invocation.proceed();
}
private static class ReplaceIsDeleteExprVisitor implements SQLASTVisitor {
@Override
public void endVisit(SQLBinaryOpExpr x) {
// 不是 is_delete = 0/1 的形式
if (!(x.getLeft() instanceof SQLIdentifierExpr)
|| !(x.getRight() instanceof SQLIntegerExpr)) {
return;
}
// 字段名不叫 is_delete
if (!((SQLIdentifierExpr) x.getLeft()).getName().equalsIgnoreCase("is_delete")) {
return;
}
if (x.getParent() instanceof MySqlSelectQueryBlock) {
((MySqlSelectQueryBlock) x.getParent()).removeCondition(x);
} else if (x.getParent() instanceof MySqlUpdateStatement) {
((MySqlUpdateStatement) x.getParent()).removeCondition(x);
} else if (x.getParent() instanceof SQLBinaryOpExpr) {
SQLBinaryOpExpr parent = (SQLBinaryOpExpr) x.getParent();
Consumer<SQLExpr> replaceFun = newExpr -> {
if (parent.getLeft() == x) {
parent.setLeft(newExpr);
} else if (parent.getRight() == x) {
parent.setRight(newExpr);
}
};
// 下面就取巧设置了, 可以少写很多代码
if (parent.getOperator() == SQLBinaryOperator.BooleanAnd) {
replaceFun.accept(SQLUtils.toMySqlExpr("1 = 1"));
} else if (parent.getOperator() == SQLBinaryOperator.BooleanOr) {
replaceFun.accept(SQLUtils.toMySqlExpr("0 = 1"));
}
}
}
}
private static class CustomBoundSql extends BoundSql {
final BoundSql delegate;
CustomBoundSql(Configuration configuration, String sql, List<ParameterMapping> parameterMappings,
Object parameterObject, BoundSql delegate) {
super(configuration, sql, parameterMappings, parameterObject);
this.delegate = delegate;
}
@Override
public boolean hasAdditionalParameter(String name) {
if (super.hasAdditionalParameter(name))
return true;
return delegate.hasAdditionalParameter(name);
}
@Override
public Object getAdditionalParameter(String name) {
Object parameter = super.getAdditionalParameter(name);
if (parameter != null)
return parameter;
return delegate.getAdditionalParameter(name);
}
}
private MappedStatement copyMappedStatement(MappedStatement ms, SqlSource sqlSource) {
MappedStatement.Builder builder =
new MappedStatement.Builder(ms.getConfiguration(), ms.getId(), sqlSource, ms.getSqlCommandType());
builder.resource(ms.getResource());
builder.fetchSize(ms.getFetchSize());
builder.statementType(ms.getStatementType());
builder.keyGenerator(ms.getKeyGenerator());
if (ms.getKeyProperties() != null && ms.getKeyProperties().length != 0) {
StringBuilder keyProperties = new StringBuilder();
for (String keyProperty : ms.getKeyProperties()) {
keyProperties.append(keyProperty).append(",");
}
keyProperties.delete(keyProperties.length() - 1, keyProperties.length());
builder.keyProperty(keyProperties.toString());
}
builder.timeout(ms.getTimeout());
builder.parameterMap(ms.getParameterMap());
builder.resultMaps(ms.getResultMaps());
builder.resultSetType(ms.getResultSetType());
builder.cache(ms.getCache());
builder.flushCacheRequired(ms.isFlushCacheRequired());
builder.useCache(ms.isUseCache());
return builder.build();
}
}