diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java index 9d3cad3e..6643b65a 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java @@ -182,6 +182,15 @@ public class PendingMessageBizConfig { @Getter private String workflowIMChannelAppMinVersionCm = ""; + @Getter + private boolean requestReplayWindowEnabled = true; + + @Getter + private long requestReplayWindowExpireTimeInSeconds = 3600L; + + @Getter + private Set requestReplayWhitelistTemplateCodes = new HashSet<>(); + public boolean determineOldMsgStatCacheOn() { return isOldMsgStatCacheOn(); } diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java index 1dd5f126..eec63ed1 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java @@ -11,6 +11,7 @@ import cn.axzo.msg.center.common.utils.MiscUtils; import cn.axzo.msg.center.dal.CardDao; import cn.axzo.msg.center.domain.entity.Card; import cn.axzo.msg.center.domain.entity.MessageTemplateButtonV3; +import cn.axzo.msg.center.domain.enums.RequestReplayType; import cn.axzo.msg.center.message.domain.dto.TemplateModelV3; import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO; import cn.axzo.msg.center.message.service.card.broadcast.CardBroadcaster; @@ -18,6 +19,8 @@ import cn.axzo.msg.center.message.service.card.domain.CardGroup; import cn.axzo.msg.center.message.service.card.domain.CardSendModel; import cn.axzo.msg.center.message.service.card.log.CardLogger; import cn.axzo.msg.center.message.service.card.log.CardLoggers; +import cn.axzo.msg.center.message.service.replay.RequestInfo; +import cn.axzo.msg.center.message.service.replay.RequestReplayService; import cn.axzo.msg.center.nimpush.device.PushDeviceService; import cn.axzo.msg.center.nimpush.device.PushDeviceSnapshots; import cn.axzo.msg.center.service.ButtonV3; @@ -68,6 +71,7 @@ public class CardManager { private final CardParser cardParser; private final CardBroadcaster cardBroadcaster; private final CardProps cardProps; + private final RequestReplayService requestReplayService; private final ExecutorService executor = new ThreadPoolExecutor( 5, 15, 5L, TimeUnit.MINUTES, @@ -77,6 +81,16 @@ public class CardManager { // 校验参数 BizAssertions.assertNotNull(request.getSender(), "发送人不能为空"); BizAssertions.assertNotEmpty(request.getReceivers(), "接收人不能为空"); + RequestInfo requestInfo = new RequestInfo(); + requestInfo.setTemplateCode(request.getTemplateCode()); + requestInfo.setBizCode(request.getBizCode()); + requestInfo.setSubBizCode(request.getSubBizCode()); + requestInfo.setReplayType(RequestReplayType.IM_CARD_SEND); + requestInfo.setRequest(request); + return requestReplayService.run(requestInfo, () -> sendImpl(request)); + } + + private CardSendResponse sendImpl(CardSendRequest request) { CardTemplate cardTemplate = cardSupport.parseCardTemplate(cardSupport .ensureImChannelPresent(request.getTemplateCode()), request); // 主要逻辑 diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSupport.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSupport.java index 627f2f3f..a69a9423 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSupport.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSupport.java @@ -85,7 +85,7 @@ public class CardSupport { cardIdempotentDao.save(idempotent); } catch (DuplicateKeyException e) { log.warn("重复创建卡片, request={}", request); - throw new CardIdempotentException(String.format("重复创建卡片: %s", idempotent)); + throw new CardIdempotentException(403, String.format("重复创建卡片: %s", idempotent)); } } diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestAcquireResult.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestAcquireResult.java new file mode 100644 index 00000000..4772e148 --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestAcquireResult.java @@ -0,0 +1,9 @@ +package cn.axzo.msg.center.message.service.replay; + +/** + * @author yanglin + */ +enum RequestAcquireResult { + NEW_REQUEST, + DUPLICATE_REQUEST, +} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestInfo.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestInfo.java new file mode 100644 index 00000000..6e7042f4 --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestInfo.java @@ -0,0 +1,24 @@ +package cn.axzo.msg.center.message.service.replay; + +import cn.axzo.msg.center.domain.enums.RequestReplayType; +import com.alibaba.fastjson.JSON; +import lombok.Getter; +import lombok.Setter; + +/** + * @author yanglin + */ +@Setter +@Getter +public class RequestInfo { + private String templateCode; + private String bizCode; + private String subBizCode; + private RequestReplayType replayType; + private Object request; + + @Override + public String toString() { + return JSON.toJSONString(this); + } +} diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestReplayService.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestReplayService.java new file mode 100644 index 00000000..f321073a --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestReplayService.java @@ -0,0 +1,105 @@ +package cn.axzo.msg.center.message.service.replay; + +import cn.axzo.basics.common.exception.ServiceException; +import cn.axzo.msg.center.common.utils.BizAssertions; +import cn.axzo.msg.center.common.utils.MD5; +import cn.axzo.msg.center.dal.RequestReplayDao; +import cn.axzo.msg.center.domain.entity.RequestReplay; +import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig; +import com.alibaba.fastjson.JSON; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.function.Supplier; + +/** + * @author yanglin + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RequestReplayService { + + private final RequestReplayDao requestReplayDao; + private final PendingMessageBizConfig cfg; + private final RedisTemplate redisTemplate; + + public T run(RequestInfo requestInfo, Supplier supplier) { + if (!isWindowEnabled(requestInfo)) + return supplier.get(); + if (acquire(requestInfo) == RequestAcquireResult.DUPLICATE_REQUEST) + throw new ServiceException(403, "重复请求: " + requestInfo); + try { + return supplier.get(); + } catch (Exception e) { + release(requestInfo); + log.warn("run request error, requestInfo={}", requestInfo, e); + throw e; + } + } + + private RequestAcquireResult acquire(RequestInfo info) { + try { + return acquireRequestImpl(info); + } catch (Exception e) { + log.warn("acquire request error, requestInfo={}", info, e); + return RequestAcquireResult.NEW_REQUEST; + } + } + + private void release(RequestInfo requestInfo) { + if (!isWindowEnabled(requestInfo)) + return; + try { + BizAssertions.assertNotNull(requestInfo, "requestInfo is null"); + redisTemplate.delete(buildKey(hash(requestInfo))); + } catch (Exception e) { + log.warn("release request error, requestInfo={}", requestInfo, e); + } + } + + private RequestAcquireResult acquireRequestImpl(RequestInfo requestInfo) { + BizAssertions.assertNotNull(requestInfo, "info is null"); + BizAssertions.assertNotNull(requestInfo.getRequest(), "request is null"); + String hash = hash(requestInfo); + RequestReplay replay = new RequestReplay(); + replay.setTemplateCode(requestInfo.getTemplateCode()); + replay.setBizCode(requestInfo.getBizCode()); + replay.setSubBizCode(requestInfo.getSubBizCode()); + replay.setType(requestInfo.getReplayType()); + replay.setHash(hash); + replay.setRequest(JSON.parseObject(JSON.toJSONString(requestInfo.getRequest()))); + try { + requestReplayDao.save(replay); + } catch (Exception e) { + log.warn("save request error, requestInfo={}", requestInfo, e); + } + Duration timeout = Duration.ofSeconds(cfg.getRequestReplayWindowExpireTimeInSeconds()); + Boolean success = redisTemplate.opsForValue().setIfAbsent(buildKey(hash), "", timeout); + RequestAcquireResult acquireResult = Boolean.TRUE.equals(success) + ? RequestAcquireResult.NEW_REQUEST + : RequestAcquireResult.DUPLICATE_REQUEST; + if (acquireResult == RequestAcquireResult.DUPLICATE_REQUEST) + log.warn("duplicate request, requestInfo={}", requestInfo); + return acquireResult; + } + + private boolean isWindowEnabled(RequestInfo requestInfo) { + return cfg.isRequestReplayWindowEnabled() + && !cfg.getRequestReplayWhitelistTemplateCodes() + .contains(requestInfo.getTemplateCode()); + } + + private String hash(RequestInfo requestInfo) { + String jsonString = JSON.toJSONString(requestInfo.getRequest()); + return MD5.getMD5Code(jsonString); + } + + private static String buildKey(String hash) { + return String.format("msg-center:request-replay:%s", hash); + } + +} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/TodoManager.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/TodoManager.java index 73527b8d..fe30259b 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/TodoManager.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/TodoManager.java @@ -13,11 +13,15 @@ import cn.axzo.msg.center.dal.TodoDao; import cn.axzo.msg.center.domain.entity.Todo; import cn.axzo.msg.center.domain.entity.TodoBusiness; import cn.axzo.msg.center.domain.entity.TodoHandoverMapping; +import cn.axzo.msg.center.domain.enums.RequestReplayType; import cn.axzo.msg.center.domain.persistence.BaseEntityExt; import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig; import cn.axzo.msg.center.message.domain.dto.TemplateModelV3; import cn.axzo.msg.center.message.domain.param.PendingMessagePushParam; import cn.axzo.msg.center.message.service.impl.v3.ModelV3Service; +import cn.axzo.msg.center.message.service.replay.RequestInfo; +import cn.axzo.msg.center.message.service.replay.RequestReplayService; +import cn.axzo.msg.center.message.service.todo.TodoWithCardWrapper; import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoBroadcaster; import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoMqBroadcaster; import cn.axzo.msg.center.message.service.todo.manage.event.HandoverEvent; @@ -95,6 +99,7 @@ public class TodoManager { private final ApplicationContext applicationContext; private final TodoBroadcaster todoBroadcaster; private final TransactionTemplate transactionTemplate; + private final RequestReplayService requestReplayService; public List send(PendingMessagePushParam request) { TodoRequestContext ctx = TodoRequestContext.create("send", request.normalize()); @@ -103,6 +108,16 @@ public class TodoManager { } public List send(TodoRequestContext ctx, PendingMessagePushParam request) { + RequestInfo requestInfo = new RequestInfo(); + requestInfo.setTemplateCode(request.getTemplateCode()); + requestInfo.setBizCode(request.getBizCode()); + requestInfo.setSubBizCode(request.getSubBizCode()); + requestInfo.setReplayType(RequestReplayType.TODO_SEND); + requestInfo.setRequest(request); + return requestReplayService.run(requestInfo, () -> sendOrRetry(ctx, request)); + } + + private List sendOrRetry(TodoRequestContext ctx, PendingMessagePushParam request) { // 10 seconds at most for (int i = 0; i < 20; i++) { try { diff --git a/msg-center-api/src/main/java/cn/axzo/msg/center/service/pending/request/CardSendRequest.java b/msg-center-api/src/main/java/cn/axzo/msg/center/service/pending/request/CardSendRequest.java index 8e4e85b3..a026bc59 100644 --- a/msg-center-api/src/main/java/cn/axzo/msg/center/service/pending/request/CardSendRequest.java +++ b/msg-center-api/src/main/java/cn/axzo/msg/center/service/pending/request/CardSendRequest.java @@ -47,6 +47,7 @@ public class CardSendRequest implements CardContent { /** * 用于幂等的唯一编码. 可选, 需要做幂等时传入. 强烈建议传, 不然可能会产生重复的IM消息 + *

最大长度250 */ private String idempotentCode; diff --git a/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/RequestReplayDao.java b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/RequestReplayDao.java new file mode 100644 index 00000000..c8e54a48 --- /dev/null +++ b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/RequestReplayDao.java @@ -0,0 +1,15 @@ +package cn.axzo.msg.center.dal; + +import cn.axzo.msg.center.dal.mapper.RequestReplayMapper; +import cn.axzo.msg.center.domain.entity.RequestReplay; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author yanglin + */ +@Slf4j +@Component +public class RequestReplayDao extends ServiceImpl { +} diff --git a/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/RequestReplayMapper.java b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/RequestReplayMapper.java new file mode 100644 index 00000000..c111e43f --- /dev/null +++ b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/RequestReplayMapper.java @@ -0,0 +1,10 @@ +package cn.axzo.msg.center.dal.mapper; + +import cn.axzo.msg.center.domain.entity.RequestReplay; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** + * @author yanglin + */ +public interface RequestReplayMapper extends BaseMapper { +} diff --git a/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/entity/RequestReplay.java b/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/entity/RequestReplay.java new file mode 100644 index 00000000..62b57d2e --- /dev/null +++ b/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/entity/RequestReplay.java @@ -0,0 +1,51 @@ +package cn.axzo.msg.center.domain.entity; + +import cn.axzo.msg.center.domain.enums.RequestReplayType; +import cn.axzo.msg.center.domain.persistence.BaseEntityExt; +import cn.axzo.msg.center.domain.utils.IgnorePropsJsonTypeHandler; +import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Getter; +import lombok.Setter; + +/** + * @author yanglin + */ +@Setter +@Getter +@TableName(value = "request_replay", autoResultMap = true) +public class RequestReplay extends BaseEntityExt { + + /** + * 模版编码 + */ + private String templateCode; + + /** + * 业务编码 + */ + private String bizCode; + + /** + * 子业务编码 + */ + private String subBizCode; + + /** + * 请求类型. TODO_SEND: 发送待办, IM_CARD_SEND: 发送IM卡片 + */ + private RequestReplayType type; + + /** + * 请求的hash值, 具体hash算法未指定 + */ + private String hash; + + /** + * 请求 + */ + @TableField(typeHandler = IgnorePropsJsonTypeHandler.class) + private JSONObject request; + +} \ No newline at end of file diff --git a/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/enums/RequestReplayType.java b/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/enums/RequestReplayType.java new file mode 100644 index 00000000..9e867e77 --- /dev/null +++ b/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/enums/RequestReplayType.java @@ -0,0 +1,9 @@ +package cn.axzo.msg.center.domain.enums; + +/** + * @author yanglin + */ +public enum RequestReplayType { + TODO_SEND, + IM_CARD_SEND +}