Merge branch 'feature/REQ-3578'

This commit is contained in:
yanglin 2025-01-14 18:16:47 +08:00
commit 184363cbc3
12 changed files with 263 additions and 2 deletions

View File

@ -173,6 +173,15 @@ public class PendingMessageBizConfig {
@Getter @Getter
private int todoTitleSearchMaxSize = 5000; private int todoTitleSearchMaxSize = 5000;
@Getter
private boolean requestReplayWindowEnabled = true;
@Getter
private long requestReplayWindowExpireTimeInSeconds = 3600L;
@Getter
private Set<String> requestReplayWhitelistTemplateCodes = new HashSet<>();
public boolean determineOldMsgStatCacheOn() { public boolean determineOldMsgStatCacheOn() {
return isOldMsgStatCacheOn(); return isOldMsgStatCacheOn();
} }

View File

@ -11,6 +11,7 @@ import cn.axzo.msg.center.common.utils.MiscUtils;
import cn.axzo.msg.center.dal.CardDao; import cn.axzo.msg.center.dal.CardDao;
import cn.axzo.msg.center.domain.entity.Card; import cn.axzo.msg.center.domain.entity.Card;
import cn.axzo.msg.center.domain.entity.MessageTemplateButtonV3; import cn.axzo.msg.center.domain.entity.MessageTemplateButtonV3;
import cn.axzo.msg.center.domain.enums.RequestReplayType;
import cn.axzo.msg.center.message.domain.dto.TemplateModelV3; import cn.axzo.msg.center.message.domain.dto.TemplateModelV3;
import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO; import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO;
import cn.axzo.msg.center.message.service.card.broadcast.CardBroadcaster; import cn.axzo.msg.center.message.service.card.broadcast.CardBroadcaster;
@ -18,6 +19,8 @@ import cn.axzo.msg.center.message.service.card.domain.CardGroup;
import cn.axzo.msg.center.message.service.card.domain.CardSendModel; import cn.axzo.msg.center.message.service.card.domain.CardSendModel;
import cn.axzo.msg.center.message.service.card.log.CardLogger; import cn.axzo.msg.center.message.service.card.log.CardLogger;
import cn.axzo.msg.center.message.service.card.log.CardLoggers; import cn.axzo.msg.center.message.service.card.log.CardLoggers;
import cn.axzo.msg.center.message.service.replay.RequestInfo;
import cn.axzo.msg.center.message.service.replay.RequestReplayService;
import cn.axzo.msg.center.nimpush.device.PushDeviceService; import cn.axzo.msg.center.nimpush.device.PushDeviceService;
import cn.axzo.msg.center.nimpush.device.PushDeviceSnapshots; import cn.axzo.msg.center.nimpush.device.PushDeviceSnapshots;
import cn.axzo.msg.center.service.ButtonV3; import cn.axzo.msg.center.service.ButtonV3;
@ -69,6 +72,7 @@ public class CardManager {
private final CardParser cardParser; private final CardParser cardParser;
private final CardBroadcaster cardBroadcaster; private final CardBroadcaster cardBroadcaster;
private final CardProps cardProps; private final CardProps cardProps;
private final RequestReplayService requestReplayService;
private final ExecutorService executor = new ThreadPoolExecutor( private final ExecutorService executor = new ThreadPoolExecutor(
5, 15, 5, 15,
5L, TimeUnit.MINUTES, 5L, TimeUnit.MINUTES,
@ -78,6 +82,16 @@ public class CardManager {
// 校验参数 // 校验参数
BizAssertions.assertNotNull(request.getSender(), "发送人不能为空"); BizAssertions.assertNotNull(request.getSender(), "发送人不能为空");
BizAssertions.assertNotEmpty(request.getReceivers(), "接收人不能为空"); BizAssertions.assertNotEmpty(request.getReceivers(), "接收人不能为空");
RequestInfo requestInfo = new RequestInfo();
requestInfo.setTemplateCode(request.getTemplateCode());
requestInfo.setBizCode(request.getBizCode());
requestInfo.setSubBizCode(request.getSubBizCode());
requestInfo.setReplayType(RequestReplayType.IM_CARD_SEND);
requestInfo.setRequest(request);
return requestReplayService.run(requestInfo, () -> sendImpl(request));
}
private CardSendResponse sendImpl(CardSendRequest request) {
CardTemplate cardTemplate = cardSupport.parseCardTemplate(cardSupport CardTemplate cardTemplate = cardSupport.parseCardTemplate(cardSupport
.ensureImChannelPresent(request.getTemplateCode()), request); .ensureImChannelPresent(request.getTemplateCode()), request);
// 主要逻辑 // 主要逻辑

View File

@ -82,7 +82,7 @@ public class CardSupport {
cardIdempotentDao.save(idempotent); cardIdempotentDao.save(idempotent);
} catch (DuplicateKeyException e) { } catch (DuplicateKeyException e) {
log.warn("重复创建卡片, request={}", request); log.warn("重复创建卡片, request={}", request);
throw new ServiceException(String.format("重复创建卡片: %s", idempotent)); throw new ServiceException(403, String.format("重复创建卡片: %s", idempotent));
} }
} }

View File

@ -0,0 +1,9 @@
package cn.axzo.msg.center.message.service.replay;
/**
* @author yanglin
*/
enum RequestAcquireResult {
NEW_REQUEST,
DUPLICATE_REQUEST,
}

View File

@ -0,0 +1,24 @@
package cn.axzo.msg.center.message.service.replay;
import cn.axzo.msg.center.domain.enums.RequestReplayType;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
/**
* @author yanglin
*/
@Setter
@Getter
public class RequestInfo {
private String templateCode;
private String bizCode;
private String subBizCode;
private RequestReplayType replayType;
private Object request;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,105 @@
package cn.axzo.msg.center.message.service.replay;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.msg.center.common.utils.BizAssertions;
import cn.axzo.msg.center.common.utils.MD5;
import cn.axzo.msg.center.dal.RequestReplayDao;
import cn.axzo.msg.center.domain.entity.RequestReplay;
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.function.Supplier;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RequestReplayService {
private final RequestReplayDao requestReplayDao;
private final PendingMessageBizConfig cfg;
private final RedisTemplate<String, String> redisTemplate;
public <T> T run(RequestInfo requestInfo, Supplier<T> supplier) {
if (!isWindowEnabled(requestInfo))
return supplier.get();
if (acquire(requestInfo) == RequestAcquireResult.DUPLICATE_REQUEST)
throw new ServiceException(403, "重复请求: " + requestInfo);
try {
return supplier.get();
} catch (Exception e) {
release(requestInfo);
log.warn("run request error, requestInfo={}", requestInfo, e);
throw e;
}
}
private RequestAcquireResult acquire(RequestInfo info) {
try {
return acquireRequestImpl(info);
} catch (Exception e) {
log.warn("acquire request error, requestInfo={}", info, e);
return RequestAcquireResult.NEW_REQUEST;
}
}
private void release(RequestInfo requestInfo) {
if (!isWindowEnabled(requestInfo))
return;
try {
BizAssertions.assertNotNull(requestInfo, "requestInfo is null");
redisTemplate.delete(buildKey(hash(requestInfo)));
} catch (Exception e) {
log.warn("release request error, requestInfo={}", requestInfo, e);
}
}
private RequestAcquireResult acquireRequestImpl(RequestInfo requestInfo) {
BizAssertions.assertNotNull(requestInfo, "info is null");
BizAssertions.assertNotNull(requestInfo.getRequest(), "request is null");
String hash = hash(requestInfo);
RequestReplay replay = new RequestReplay();
replay.setTemplateCode(requestInfo.getTemplateCode());
replay.setBizCode(requestInfo.getBizCode());
replay.setSubBizCode(requestInfo.getSubBizCode());
replay.setType(requestInfo.getReplayType());
replay.setHash(hash);
replay.setRequest(JSON.parseObject(JSON.toJSONString(requestInfo.getRequest())));
try {
requestReplayDao.save(replay);
} catch (Exception e) {
log.warn("save request error, requestInfo={}", requestInfo, e);
}
Duration timeout = Duration.ofSeconds(cfg.getRequestReplayWindowExpireTimeInSeconds());
Boolean success = redisTemplate.opsForValue().setIfAbsent(buildKey(hash), "", timeout);
RequestAcquireResult acquireResult = Boolean.TRUE.equals(success)
? RequestAcquireResult.NEW_REQUEST
: RequestAcquireResult.DUPLICATE_REQUEST;
if (acquireResult == RequestAcquireResult.DUPLICATE_REQUEST)
log.warn("duplicate request, requestInfo={}", requestInfo);
return acquireResult;
}
private boolean isWindowEnabled(RequestInfo requestInfo) {
return cfg.isRequestReplayWindowEnabled()
&& !cfg.getRequestReplayWhitelistTemplateCodes()
.contains(requestInfo.getTemplateCode());
}
private String hash(RequestInfo requestInfo) {
String jsonString = JSON.toJSONString(requestInfo.getRequest());
return MD5.getMD5Code(jsonString);
}
private static String buildKey(String hash) {
return String.format("msg-center:request-replay:%s", hash);
}
}

View File

@ -13,11 +13,14 @@ import cn.axzo.msg.center.dal.TodoDao;
import cn.axzo.msg.center.domain.entity.Todo; import cn.axzo.msg.center.domain.entity.Todo;
import cn.axzo.msg.center.domain.entity.TodoBusiness; import cn.axzo.msg.center.domain.entity.TodoBusiness;
import cn.axzo.msg.center.domain.entity.TodoHandoverMapping; import cn.axzo.msg.center.domain.entity.TodoHandoverMapping;
import cn.axzo.msg.center.domain.enums.RequestReplayType;
import cn.axzo.msg.center.domain.persistence.BaseEntityExt; import cn.axzo.msg.center.domain.persistence.BaseEntityExt;
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig; import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
import cn.axzo.msg.center.message.domain.dto.TemplateModelV3; import cn.axzo.msg.center.message.domain.dto.TemplateModelV3;
import cn.axzo.msg.center.message.domain.param.PendingMessagePushParam; import cn.axzo.msg.center.message.domain.param.PendingMessagePushParam;
import cn.axzo.msg.center.message.service.impl.v3.ModelV3Service; import cn.axzo.msg.center.message.service.impl.v3.ModelV3Service;
import cn.axzo.msg.center.message.service.replay.RequestInfo;
import cn.axzo.msg.center.message.service.replay.RequestReplayService;
import cn.axzo.msg.center.message.service.todo.TodoWithCardWrapper; import cn.axzo.msg.center.message.service.todo.TodoWithCardWrapper;
import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoBroadcaster; import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoBroadcaster;
import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoMqBroadcaster; import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoMqBroadcaster;
@ -101,6 +104,7 @@ public class TodoManager {
private final TodoBroadcaster todoBroadcaster; private final TodoBroadcaster todoBroadcaster;
private final TransactionTemplate transactionTemplate; private final TransactionTemplate transactionTemplate;
private final TodoWithCardWrapper todoWithCardWrapper; private final TodoWithCardWrapper todoWithCardWrapper;
private final RequestReplayService requestReplayService;
public List<PushPendingMessageDTO> send(PendingMessagePushParam request) { public List<PushPendingMessageDTO> send(PendingMessagePushParam request) {
TodoRequestContext ctx = TodoRequestContext.create("send", request.normalize()); TodoRequestContext ctx = TodoRequestContext.create("send", request.normalize());
@ -109,6 +113,16 @@ public class TodoManager {
} }
public List<PushPendingMessageDTO> send(TodoRequestContext ctx, PendingMessagePushParam request) { public List<PushPendingMessageDTO> send(TodoRequestContext ctx, PendingMessagePushParam request) {
RequestInfo requestInfo = new RequestInfo();
requestInfo.setTemplateCode(request.getTemplateCode());
requestInfo.setBizCode(request.getBizCode());
requestInfo.setSubBizCode(request.getSubBizCode());
requestInfo.setReplayType(RequestReplayType.TODO_SEND);
requestInfo.setRequest(request);
return requestReplayService.run(requestInfo, () -> sendOrRetry(ctx, request));
}
private List<PushPendingMessageDTO> sendOrRetry(TodoRequestContext ctx, PendingMessagePushParam request) {
// 10 seconds at most // 10 seconds at most
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
try { try {
@ -621,7 +635,7 @@ public class TodoManager {
if (isAdvancedOrCompleted) { if (isAdvancedOrCompleted) {
sendMqMessageOnPresetButtonPressed(ctx, request, todo); sendMqMessageOnPresetButtonPressed(ctx, request, todo);
todoBroadcaster.fireTodoUpdates("presetButtonPressed", todo); todoBroadcaster.fireTodoUpdates("presetButtonPressed", todo);
todoWithCardWrapper.fireCardWhenPresetButtonPressedByTodo(request, todo,isSyncCard); todoWithCardWrapper.fireCardWhenPresetButtonPressedByTodo(request, todo, isSyncCard);
// 如果不是重复发送, 就只记一条日志. 如果是重复发送, 就单独记录一条日志 // 如果不是重复发送, 就只记一条日志. 如果是重复发送, 就单独记录一条日志
if (!advanceResult.isAdvanced()) if (!advanceResult.isAdvanced())

View File

@ -47,6 +47,7 @@ public class CardSendRequest implements CardContent {
/** /**
* 用于幂等的唯一编码. 可选, 需要做幂等时传入. 强烈建议传, 不然可能会产生重复的IM消息 * 用于幂等的唯一编码. 可选, 需要做幂等时传入. 强烈建议传, 不然可能会产生重复的IM消息
* <p>最大长度250
*/ */
private String idempotentCode; private String idempotentCode;

View File

@ -0,0 +1,15 @@
package cn.axzo.msg.center.dal;
import cn.axzo.msg.center.dal.mapper.RequestReplayMapper;
import cn.axzo.msg.center.domain.entity.RequestReplay;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author yanglin
*/
@Slf4j
@Component
public class RequestReplayDao extends ServiceImpl<RequestReplayMapper, RequestReplay> {
}

View File

@ -0,0 +1,10 @@
package cn.axzo.msg.center.dal.mapper;
import cn.axzo.msg.center.domain.entity.RequestReplay;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @author yanglin
*/
public interface RequestReplayMapper extends BaseMapper<RequestReplay> {
}

View File

@ -0,0 +1,51 @@
package cn.axzo.msg.center.domain.entity;
import cn.axzo.msg.center.domain.enums.RequestReplayType;
import cn.axzo.msg.center.domain.persistence.BaseEntityExt;
import cn.axzo.msg.center.domain.utils.IgnorePropsJsonTypeHandler;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Getter;
import lombok.Setter;
/**
* @author yanglin
*/
@Setter
@Getter
@TableName(value = "request_replay", autoResultMap = true)
public class RequestReplay extends BaseEntityExt<RequestReplay> {
/**
* 模版编码
*/
private String templateCode;
/**
* 业务编码
*/
private String bizCode;
/**
* 子业务编码
*/
private String subBizCode;
/**
* 请求类型. TODO_SEND: 发送待办, IM_CARD_SEND: 发送IM卡片
*/
private RequestReplayType type;
/**
* 请求的hash值, 具体hash算法未指定
*/
private String hash;
/**
* 请求
*/
@TableField(typeHandler = IgnorePropsJsonTypeHandler.class)
private JSONObject request;
}

View File

@ -0,0 +1,9 @@
package cn.axzo.msg.center.domain.enums;
/**
* @author yanglin
*/
public enum RequestReplayType {
TODO_SEND,
IM_CARD_SEND
}