Merge branch 'master' into feature/REQ-3502
# Conflicts: # inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java # inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSupport.java # inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/TodoManager.java
This commit is contained in:
commit
c6633be946
@ -182,6 +182,15 @@ public class PendingMessageBizConfig {
|
||||
@Getter
|
||||
private String workflowIMChannelAppMinVersionCm = "";
|
||||
|
||||
@Getter
|
||||
private boolean requestReplayWindowEnabled = true;
|
||||
|
||||
@Getter
|
||||
private long requestReplayWindowExpireTimeInSeconds = 3600L;
|
||||
|
||||
@Getter
|
||||
private Set<String> requestReplayWhitelistTemplateCodes = new HashSet<>();
|
||||
|
||||
public boolean determineOldMsgStatCacheOn() {
|
||||
return isOldMsgStatCacheOn();
|
||||
}
|
||||
|
||||
@ -11,6 +11,7 @@ import cn.axzo.msg.center.common.utils.MiscUtils;
|
||||
import cn.axzo.msg.center.dal.CardDao;
|
||||
import cn.axzo.msg.center.domain.entity.Card;
|
||||
import cn.axzo.msg.center.domain.entity.MessageTemplateButtonV3;
|
||||
import cn.axzo.msg.center.domain.enums.RequestReplayType;
|
||||
import cn.axzo.msg.center.message.domain.dto.TemplateModelV3;
|
||||
import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO;
|
||||
import cn.axzo.msg.center.message.service.card.broadcast.CardBroadcaster;
|
||||
@ -18,6 +19,8 @@ import cn.axzo.msg.center.message.service.card.domain.CardGroup;
|
||||
import cn.axzo.msg.center.message.service.card.domain.CardSendModel;
|
||||
import cn.axzo.msg.center.message.service.card.log.CardLogger;
|
||||
import cn.axzo.msg.center.message.service.card.log.CardLoggers;
|
||||
import cn.axzo.msg.center.message.service.replay.RequestInfo;
|
||||
import cn.axzo.msg.center.message.service.replay.RequestReplayService;
|
||||
import cn.axzo.msg.center.nimpush.device.PushDeviceService;
|
||||
import cn.axzo.msg.center.nimpush.device.PushDeviceSnapshots;
|
||||
import cn.axzo.msg.center.service.ButtonV3;
|
||||
@ -68,6 +71,7 @@ public class CardManager {
|
||||
private final CardParser cardParser;
|
||||
private final CardBroadcaster cardBroadcaster;
|
||||
private final CardProps cardProps;
|
||||
private final RequestReplayService requestReplayService;
|
||||
private final ExecutorService executor = new ThreadPoolExecutor(
|
||||
5, 15,
|
||||
5L, TimeUnit.MINUTES,
|
||||
@ -77,6 +81,16 @@ public class CardManager {
|
||||
// 校验参数
|
||||
BizAssertions.assertNotNull(request.getSender(), "发送人不能为空");
|
||||
BizAssertions.assertNotEmpty(request.getReceivers(), "接收人不能为空");
|
||||
RequestInfo requestInfo = new RequestInfo();
|
||||
requestInfo.setTemplateCode(request.getTemplateCode());
|
||||
requestInfo.setBizCode(request.getBizCode());
|
||||
requestInfo.setSubBizCode(request.getSubBizCode());
|
||||
requestInfo.setReplayType(RequestReplayType.IM_CARD_SEND);
|
||||
requestInfo.setRequest(request);
|
||||
return requestReplayService.run(requestInfo, () -> sendImpl(request));
|
||||
}
|
||||
|
||||
private CardSendResponse sendImpl(CardSendRequest request) {
|
||||
CardTemplate cardTemplate = cardSupport.parseCardTemplate(cardSupport
|
||||
.ensureImChannelPresent(request.getTemplateCode()), request);
|
||||
// 主要逻辑
|
||||
|
||||
@ -85,7 +85,7 @@ public class CardSupport {
|
||||
cardIdempotentDao.save(idempotent);
|
||||
} catch (DuplicateKeyException e) {
|
||||
log.warn("重复创建卡片, request={}", request);
|
||||
throw new CardIdempotentException(String.format("重复创建卡片: %s", idempotent));
|
||||
throw new CardIdempotentException(403, String.format("重复创建卡片: %s", idempotent));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,9 @@
|
||||
package cn.axzo.msg.center.message.service.replay;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
enum RequestAcquireResult {
|
||||
NEW_REQUEST,
|
||||
DUPLICATE_REQUEST,
|
||||
}
|
||||
@ -0,0 +1,24 @@
|
||||
package cn.axzo.msg.center.message.service.replay;
|
||||
|
||||
import cn.axzo.msg.center.domain.enums.RequestReplayType;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Setter
|
||||
@Getter
|
||||
public class RequestInfo {
|
||||
private String templateCode;
|
||||
private String bizCode;
|
||||
private String subBizCode;
|
||||
private RequestReplayType replayType;
|
||||
private Object request;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JSON.toJSONString(this);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,105 @@
|
||||
package cn.axzo.msg.center.message.service.replay;
|
||||
|
||||
import cn.axzo.basics.common.exception.ServiceException;
|
||||
import cn.axzo.msg.center.common.utils.BizAssertions;
|
||||
import cn.axzo.msg.center.common.utils.MD5;
|
||||
import cn.axzo.msg.center.dal.RequestReplayDao;
|
||||
import cn.axzo.msg.center.domain.entity.RequestReplay;
|
||||
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class RequestReplayService {
|
||||
|
||||
private final RequestReplayDao requestReplayDao;
|
||||
private final PendingMessageBizConfig cfg;
|
||||
private final RedisTemplate<String, String> redisTemplate;
|
||||
|
||||
public <T> T run(RequestInfo requestInfo, Supplier<T> supplier) {
|
||||
if (!isWindowEnabled(requestInfo))
|
||||
return supplier.get();
|
||||
if (acquire(requestInfo) == RequestAcquireResult.DUPLICATE_REQUEST)
|
||||
throw new ServiceException(403, "重复请求: " + requestInfo);
|
||||
try {
|
||||
return supplier.get();
|
||||
} catch (Exception e) {
|
||||
release(requestInfo);
|
||||
log.warn("run request error, requestInfo={}", requestInfo, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private RequestAcquireResult acquire(RequestInfo info) {
|
||||
try {
|
||||
return acquireRequestImpl(info);
|
||||
} catch (Exception e) {
|
||||
log.warn("acquire request error, requestInfo={}", info, e);
|
||||
return RequestAcquireResult.NEW_REQUEST;
|
||||
}
|
||||
}
|
||||
|
||||
private void release(RequestInfo requestInfo) {
|
||||
if (!isWindowEnabled(requestInfo))
|
||||
return;
|
||||
try {
|
||||
BizAssertions.assertNotNull(requestInfo, "requestInfo is null");
|
||||
redisTemplate.delete(buildKey(hash(requestInfo)));
|
||||
} catch (Exception e) {
|
||||
log.warn("release request error, requestInfo={}", requestInfo, e);
|
||||
}
|
||||
}
|
||||
|
||||
private RequestAcquireResult acquireRequestImpl(RequestInfo requestInfo) {
|
||||
BizAssertions.assertNotNull(requestInfo, "info is null");
|
||||
BizAssertions.assertNotNull(requestInfo.getRequest(), "request is null");
|
||||
String hash = hash(requestInfo);
|
||||
RequestReplay replay = new RequestReplay();
|
||||
replay.setTemplateCode(requestInfo.getTemplateCode());
|
||||
replay.setBizCode(requestInfo.getBizCode());
|
||||
replay.setSubBizCode(requestInfo.getSubBizCode());
|
||||
replay.setType(requestInfo.getReplayType());
|
||||
replay.setHash(hash);
|
||||
replay.setRequest(JSON.parseObject(JSON.toJSONString(requestInfo.getRequest())));
|
||||
try {
|
||||
requestReplayDao.save(replay);
|
||||
} catch (Exception e) {
|
||||
log.warn("save request error, requestInfo={}", requestInfo, e);
|
||||
}
|
||||
Duration timeout = Duration.ofSeconds(cfg.getRequestReplayWindowExpireTimeInSeconds());
|
||||
Boolean success = redisTemplate.opsForValue().setIfAbsent(buildKey(hash), "", timeout);
|
||||
RequestAcquireResult acquireResult = Boolean.TRUE.equals(success)
|
||||
? RequestAcquireResult.NEW_REQUEST
|
||||
: RequestAcquireResult.DUPLICATE_REQUEST;
|
||||
if (acquireResult == RequestAcquireResult.DUPLICATE_REQUEST)
|
||||
log.warn("duplicate request, requestInfo={}", requestInfo);
|
||||
return acquireResult;
|
||||
}
|
||||
|
||||
private boolean isWindowEnabled(RequestInfo requestInfo) {
|
||||
return cfg.isRequestReplayWindowEnabled()
|
||||
&& !cfg.getRequestReplayWhitelistTemplateCodes()
|
||||
.contains(requestInfo.getTemplateCode());
|
||||
}
|
||||
|
||||
private String hash(RequestInfo requestInfo) {
|
||||
String jsonString = JSON.toJSONString(requestInfo.getRequest());
|
||||
return MD5.getMD5Code(jsonString);
|
||||
}
|
||||
|
||||
private static String buildKey(String hash) {
|
||||
return String.format("msg-center:request-replay:%s", hash);
|
||||
}
|
||||
|
||||
}
|
||||
@ -13,11 +13,15 @@ import cn.axzo.msg.center.dal.TodoDao;
|
||||
import cn.axzo.msg.center.domain.entity.Todo;
|
||||
import cn.axzo.msg.center.domain.entity.TodoBusiness;
|
||||
import cn.axzo.msg.center.domain.entity.TodoHandoverMapping;
|
||||
import cn.axzo.msg.center.domain.enums.RequestReplayType;
|
||||
import cn.axzo.msg.center.domain.persistence.BaseEntityExt;
|
||||
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
|
||||
import cn.axzo.msg.center.message.domain.dto.TemplateModelV3;
|
||||
import cn.axzo.msg.center.message.domain.param.PendingMessagePushParam;
|
||||
import cn.axzo.msg.center.message.service.impl.v3.ModelV3Service;
|
||||
import cn.axzo.msg.center.message.service.replay.RequestInfo;
|
||||
import cn.axzo.msg.center.message.service.replay.RequestReplayService;
|
||||
import cn.axzo.msg.center.message.service.todo.TodoWithCardWrapper;
|
||||
import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoBroadcaster;
|
||||
import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoMqBroadcaster;
|
||||
import cn.axzo.msg.center.message.service.todo.manage.event.HandoverEvent;
|
||||
@ -95,6 +99,7 @@ public class TodoManager {
|
||||
private final ApplicationContext applicationContext;
|
||||
private final TodoBroadcaster todoBroadcaster;
|
||||
private final TransactionTemplate transactionTemplate;
|
||||
private final RequestReplayService requestReplayService;
|
||||
|
||||
public List<PushPendingMessageDTO> send(PendingMessagePushParam request) {
|
||||
TodoRequestContext ctx = TodoRequestContext.create("send", request.normalize());
|
||||
@ -103,6 +108,16 @@ public class TodoManager {
|
||||
}
|
||||
|
||||
public List<PushPendingMessageDTO> send(TodoRequestContext ctx, PendingMessagePushParam request) {
|
||||
RequestInfo requestInfo = new RequestInfo();
|
||||
requestInfo.setTemplateCode(request.getTemplateCode());
|
||||
requestInfo.setBizCode(request.getBizCode());
|
||||
requestInfo.setSubBizCode(request.getSubBizCode());
|
||||
requestInfo.setReplayType(RequestReplayType.TODO_SEND);
|
||||
requestInfo.setRequest(request);
|
||||
return requestReplayService.run(requestInfo, () -> sendOrRetry(ctx, request));
|
||||
}
|
||||
|
||||
private List<PushPendingMessageDTO> sendOrRetry(TodoRequestContext ctx, PendingMessagePushParam request) {
|
||||
// 10 seconds at most
|
||||
for (int i = 0; i < 20; i++) {
|
||||
try {
|
||||
|
||||
@ -47,6 +47,7 @@ public class CardSendRequest implements CardContent {
|
||||
|
||||
/**
|
||||
* 用于幂等的唯一编码. 可选, 需要做幂等时传入. 强烈建议传, 不然可能会产生重复的IM消息
|
||||
* <p>最大长度250
|
||||
*/
|
||||
private String idempotentCode;
|
||||
|
||||
|
||||
@ -0,0 +1,15 @@
|
||||
package cn.axzo.msg.center.dal;
|
||||
|
||||
import cn.axzo.msg.center.dal.mapper.RequestReplayMapper;
|
||||
import cn.axzo.msg.center.domain.entity.RequestReplay;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RequestReplayDao extends ServiceImpl<RequestReplayMapper, RequestReplay> {
|
||||
}
|
||||
@ -0,0 +1,10 @@
|
||||
package cn.axzo.msg.center.dal.mapper;
|
||||
|
||||
import cn.axzo.msg.center.domain.entity.RequestReplay;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
public interface RequestReplayMapper extends BaseMapper<RequestReplay> {
|
||||
}
|
||||
@ -0,0 +1,51 @@
|
||||
package cn.axzo.msg.center.domain.entity;
|
||||
|
||||
import cn.axzo.msg.center.domain.enums.RequestReplayType;
|
||||
import cn.axzo.msg.center.domain.persistence.BaseEntityExt;
|
||||
import cn.axzo.msg.center.domain.utils.IgnorePropsJsonTypeHandler;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.baomidou.mybatisplus.annotation.TableField;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Setter
|
||||
@Getter
|
||||
@TableName(value = "request_replay", autoResultMap = true)
|
||||
public class RequestReplay extends BaseEntityExt<RequestReplay> {
|
||||
|
||||
/**
|
||||
* 模版编码
|
||||
*/
|
||||
private String templateCode;
|
||||
|
||||
/**
|
||||
* 业务编码
|
||||
*/
|
||||
private String bizCode;
|
||||
|
||||
/**
|
||||
* 子业务编码
|
||||
*/
|
||||
private String subBizCode;
|
||||
|
||||
/**
|
||||
* 请求类型. TODO_SEND: 发送待办, IM_CARD_SEND: 发送IM卡片
|
||||
*/
|
||||
private RequestReplayType type;
|
||||
|
||||
/**
|
||||
* 请求的hash值, 具体hash算法未指定
|
||||
*/
|
||||
private String hash;
|
||||
|
||||
/**
|
||||
* 请求
|
||||
*/
|
||||
@TableField(typeHandler = IgnorePropsJsonTypeHandler.class)
|
||||
private JSONObject request;
|
||||
|
||||
}
|
||||
@ -0,0 +1,9 @@
|
||||
package cn.axzo.msg.center.domain.enums;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
public enum RequestReplayType {
|
||||
TODO_SEND,
|
||||
IM_CARD_SEND
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user