diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java index 050335c2..ceab3700 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java @@ -173,6 +173,15 @@ public class PendingMessageBizConfig { @Getter private int todoTitleSearchMaxSize = 5000; + @Getter + private boolean requestReplayWindowEnabled = true; + + @Getter + private long requestReplayWindowExpireTimeInSeconds = 3600L; + + @Getter + private Set requestReplayWhitelistTemplateCodes = new HashSet<>(); + public boolean determineOldMsgStatCacheOn() { return isOldMsgStatCacheOn(); } diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java index 1a97dfaf..d50d0b4f 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java @@ -11,6 +11,7 @@ import cn.axzo.msg.center.common.utils.MiscUtils; import cn.axzo.msg.center.dal.CardDao; import cn.axzo.msg.center.domain.entity.Card; import cn.axzo.msg.center.domain.entity.MessageTemplateButtonV3; +import cn.axzo.msg.center.domain.enums.RequestReplayType; import cn.axzo.msg.center.message.domain.dto.TemplateModelV3; import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO; import cn.axzo.msg.center.message.service.card.broadcast.CardBroadcaster; @@ -18,6 +19,8 @@ import cn.axzo.msg.center.message.service.card.domain.CardGroup; import cn.axzo.msg.center.message.service.card.domain.CardSendModel; import cn.axzo.msg.center.message.service.card.log.CardLogger; import cn.axzo.msg.center.message.service.card.log.CardLoggers; +import cn.axzo.msg.center.message.service.replay.RequestInfo; +import cn.axzo.msg.center.message.service.replay.RequestReplayService; import cn.axzo.msg.center.nimpush.device.PushDeviceService; import cn.axzo.msg.center.nimpush.device.PushDeviceSnapshots; import cn.axzo.msg.center.service.ButtonV3; @@ -69,6 +72,7 @@ public class CardManager { private final CardParser cardParser; private final CardBroadcaster cardBroadcaster; private final CardProps cardProps; + private final RequestReplayService requestReplayService; private final ExecutorService executor = new ThreadPoolExecutor( 5, 15, 5L, TimeUnit.MINUTES, @@ -78,6 +82,16 @@ public class CardManager { // 校验参数 BizAssertions.assertNotNull(request.getSender(), "发送人不能为空"); BizAssertions.assertNotEmpty(request.getReceivers(), "接收人不能为空"); + RequestInfo requestInfo = new RequestInfo(); + requestInfo.setTemplateCode(request.getTemplateCode()); + requestInfo.setBizCode(request.getBizCode()); + requestInfo.setSubBizCode(request.getSubBizCode()); + requestInfo.setReplayType(RequestReplayType.IM_CARD_SEND); + requestInfo.setRequest(request); + return requestReplayService.run(requestInfo, new CardSendResponse(), () -> sendImpl(request)); + } + + private CardSendResponse sendImpl(CardSendRequest request) { CardTemplate cardTemplate = cardSupport.parseCardTemplate(cardSupport .ensureImChannelPresent(request.getTemplateCode()), request); // 主要逻辑 diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSupport.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSupport.java index 0dd97357..648ee049 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSupport.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSupport.java @@ -1,6 +1,5 @@ package cn.axzo.msg.center.message.service.card; -import cn.axzo.basics.common.exception.ServiceException; import cn.axzo.framework.jackson.utility.JSON; import cn.axzo.im.center.api.vo.PersonAccountAttribute; import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam; @@ -82,7 +81,6 @@ public class CardSupport { cardIdempotentDao.save(idempotent); } catch (DuplicateKeyException e) { log.warn("重复创建卡片, request={}", request); - throw new ServiceException(String.format("重复创建卡片: %s", idempotent)); } } diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestAcquireResult.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestAcquireResult.java new file mode 100644 index 00000000..4772e148 --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestAcquireResult.java @@ -0,0 +1,9 @@ +package cn.axzo.msg.center.message.service.replay; + +/** + * @author yanglin + */ +enum RequestAcquireResult { + NEW_REQUEST, + DUPLICATE_REQUEST, +} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestInfo.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestInfo.java new file mode 100644 index 00000000..6e7042f4 --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestInfo.java @@ -0,0 +1,24 @@ +package cn.axzo.msg.center.message.service.replay; + +import cn.axzo.msg.center.domain.enums.RequestReplayType; +import com.alibaba.fastjson.JSON; +import lombok.Getter; +import lombok.Setter; + +/** + * @author yanglin + */ +@Setter +@Getter +public class RequestInfo { + private String templateCode; + private String bizCode; + private String subBizCode; + private RequestReplayType replayType; + private Object request; + + @Override + public String toString() { + return JSON.toJSONString(this); + } +} diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestReplayService.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestReplayService.java new file mode 100644 index 00000000..ef4426e6 --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/replay/RequestReplayService.java @@ -0,0 +1,101 @@ +package cn.axzo.msg.center.message.service.replay; + +import cn.axzo.msg.center.common.utils.BizAssertions; +import cn.axzo.msg.center.common.utils.MD5; +import cn.axzo.msg.center.dal.RequestReplayDao; +import cn.axzo.msg.center.domain.entity.RequestReplay; +import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig; +import com.alibaba.fastjson.JSON; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.function.Supplier; + +/** + * @author yanglin + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RequestReplayService { + + private final RequestReplayDao requestReplayDao; + private final PendingMessageBizConfig cfg; + private final RedisTemplate redisTemplate; + + public T run(RequestInfo requestInfo, T fallbackValue, Supplier supplier) { + if (!isWindowEnabled(requestInfo)) + return supplier.get(); + if (acquire(requestInfo) == RequestAcquireResult.DUPLICATE_REQUEST) + return fallbackValue; + try { + return supplier.get(); + } catch (Exception e) { + release(requestInfo); + log.warn("run request error, requestInfo={}", requestInfo, e); + throw e; + } + } + + private RequestAcquireResult acquire(RequestInfo info) { + try { + return acquireRequestImpl(info); + } catch (Exception e) { + log.warn("acquire request error, requestInfo={}", info, e); + return RequestAcquireResult.NEW_REQUEST; + } + } + + private void release(RequestInfo requestInfo) { + if (!isWindowEnabled(requestInfo)) + return; + try { + BizAssertions.assertNotNull(requestInfo, "requestInfo is null"); + redisTemplate.delete(buildKey(hash(requestInfo))); + } catch (Exception e) { + log.warn("release request error, requestInfo={}", requestInfo, e); + } + } + + private RequestAcquireResult acquireRequestImpl(RequestInfo requestInfo) { + BizAssertions.assertNotNull(requestInfo, "info is null"); + BizAssertions.assertNotNull(requestInfo.getRequest(), "request is null"); + String hash = hash(requestInfo); + RequestReplay replay = new RequestReplay(); + replay.setTemplateCode(requestInfo.getTemplateCode()); + replay.setBizCode(requestInfo.getBizCode()); + replay.setSubBizCode(requestInfo.getSubBizCode()); + replay.setType(requestInfo.getReplayType()); + replay.setHash(hash); + replay.setRequest(JSON.parseObject(JSON.toJSONString(requestInfo.getRequest()))); + requestReplayDao.save(replay); + String key = buildKey(hash); + Duration timeout = Duration.ofSeconds(cfg.getRequestReplayWindowExpireTimeInSeconds()); + Boolean success = redisTemplate.opsForValue().setIfAbsent(key, "", timeout); + RequestAcquireResult acquireResult = Boolean.TRUE.equals(success) + ? RequestAcquireResult.NEW_REQUEST + : RequestAcquireResult.DUPLICATE_REQUEST; + if (acquireResult == RequestAcquireResult.DUPLICATE_REQUEST) + log.warn("duplicate request, requestInfo={}", requestInfo); + return acquireResult; + } + + private boolean isWindowEnabled(RequestInfo requestInfo) { + return cfg.isRequestReplayWindowEnabled() + && !cfg.getRequestReplayWhitelistTemplateCodes() + .contains(requestInfo.getTemplateCode()); + } + + private String hash(RequestInfo requestInfo) { + String jsonString = JSON.toJSONString(requestInfo.getRequest()); + return MD5.getMD5Code(jsonString); + } + + private static String buildKey(String hash) { + return String.format("msg-center:request-replay:%s", hash); + } + +} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/TodoManager.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/TodoManager.java index c586a6f4..79857dce 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/TodoManager.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/TodoManager.java @@ -13,11 +13,14 @@ import cn.axzo.msg.center.dal.TodoDao; import cn.axzo.msg.center.domain.entity.Todo; import cn.axzo.msg.center.domain.entity.TodoBusiness; import cn.axzo.msg.center.domain.entity.TodoHandoverMapping; +import cn.axzo.msg.center.domain.enums.RequestReplayType; import cn.axzo.msg.center.domain.persistence.BaseEntityExt; import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig; import cn.axzo.msg.center.message.domain.dto.TemplateModelV3; import cn.axzo.msg.center.message.domain.param.PendingMessagePushParam; import cn.axzo.msg.center.message.service.impl.v3.ModelV3Service; +import cn.axzo.msg.center.message.service.replay.RequestInfo; +import cn.axzo.msg.center.message.service.replay.RequestReplayService; import cn.axzo.msg.center.message.service.todo.TodoWithCardWrapper; import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoBroadcaster; import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoMqBroadcaster; @@ -101,6 +104,7 @@ public class TodoManager { private final TodoBroadcaster todoBroadcaster; private final TransactionTemplate transactionTemplate; private final TodoWithCardWrapper todoWithCardWrapper; + private final RequestReplayService requestReplayService; public List send(PendingMessagePushParam request) { TodoRequestContext ctx = TodoRequestContext.create("send", request.normalize()); @@ -109,6 +113,16 @@ public class TodoManager { } public List send(TodoRequestContext ctx, PendingMessagePushParam request) { + RequestInfo requestInfo = new RequestInfo(); + requestInfo.setTemplateCode(request.getTemplateCode()); + requestInfo.setBizCode(request.getBizCode()); + requestInfo.setSubBizCode(request.getSubBizCode()); + requestInfo.setReplayType(RequestReplayType.TODO_SEND); + requestInfo.setRequest(request); + return requestReplayService.run(requestInfo, Collections.emptyList(), () -> sendOrRetry(ctx, request)); + } + + private List sendOrRetry(TodoRequestContext ctx, PendingMessagePushParam request) { // 10 seconds at most for (int i = 0; i < 20; i++) { try { @@ -621,7 +635,7 @@ public class TodoManager { if (isAdvancedOrCompleted) { sendMqMessageOnPresetButtonPressed(ctx, request, todo); todoBroadcaster.fireTodoUpdates("presetButtonPressed", todo); - todoWithCardWrapper.fireCardWhenPresetButtonPressedByTodo(request, todo,isSyncCard); + todoWithCardWrapper.fireCardWhenPresetButtonPressedByTodo(request, todo, isSyncCard); // 如果不是重复发送, 就只记一条日志. 如果是重复发送, 就单独记录一条日志 if (!advanceResult.isAdvanced()) diff --git a/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/RequestReplayDao.java b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/RequestReplayDao.java new file mode 100644 index 00000000..c8e54a48 --- /dev/null +++ b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/RequestReplayDao.java @@ -0,0 +1,15 @@ +package cn.axzo.msg.center.dal; + +import cn.axzo.msg.center.dal.mapper.RequestReplayMapper; +import cn.axzo.msg.center.domain.entity.RequestReplay; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author yanglin + */ +@Slf4j +@Component +public class RequestReplayDao extends ServiceImpl { +} diff --git a/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/RequestReplayMapper.java b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/RequestReplayMapper.java new file mode 100644 index 00000000..c111e43f --- /dev/null +++ b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/RequestReplayMapper.java @@ -0,0 +1,10 @@ +package cn.axzo.msg.center.dal.mapper; + +import cn.axzo.msg.center.domain.entity.RequestReplay; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** + * @author yanglin + */ +public interface RequestReplayMapper extends BaseMapper { +} diff --git a/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/entity/RequestReplay.java b/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/entity/RequestReplay.java new file mode 100644 index 00000000..62b57d2e --- /dev/null +++ b/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/entity/RequestReplay.java @@ -0,0 +1,51 @@ +package cn.axzo.msg.center.domain.entity; + +import cn.axzo.msg.center.domain.enums.RequestReplayType; +import cn.axzo.msg.center.domain.persistence.BaseEntityExt; +import cn.axzo.msg.center.domain.utils.IgnorePropsJsonTypeHandler; +import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Getter; +import lombok.Setter; + +/** + * @author yanglin + */ +@Setter +@Getter +@TableName(value = "request_replay", autoResultMap = true) +public class RequestReplay extends BaseEntityExt { + + /** + * 模版编码 + */ + private String templateCode; + + /** + * 业务编码 + */ + private String bizCode; + + /** + * 子业务编码 + */ + private String subBizCode; + + /** + * 请求类型. TODO_SEND: 发送待办, IM_CARD_SEND: 发送IM卡片 + */ + private RequestReplayType type; + + /** + * 请求的hash值, 具体hash算法未指定 + */ + private String hash; + + /** + * 请求 + */ + @TableField(typeHandler = IgnorePropsJsonTypeHandler.class) + private JSONObject request; + +} \ No newline at end of file diff --git a/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/enums/RequestReplayType.java b/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/enums/RequestReplayType.java new file mode 100644 index 00000000..9e867e77 --- /dev/null +++ b/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/enums/RequestReplayType.java @@ -0,0 +1,9 @@ +package cn.axzo.msg.center.domain.enums; + +/** + * @author yanglin + */ +public enum RequestReplayType { + TODO_SEND, + IM_CARD_SEND +}