Merge branch 'REQ-2324' of axzsource.com:universal/infrastructure/backend/msg-center-plat into REQ-2324
This commit is contained in:
commit
7f227a4e8d
@ -21,6 +21,11 @@
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.mybatis</groupId>
|
||||
<artifactId>mybatis</artifactId>
|
||||
<version>3.5.6</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
|
||||
@ -110,6 +110,43 @@ public class PendingMessageBizConfig {
|
||||
return dontConcatRouterParamsTemplateCodes == null || !dontConcatRouterParamsTemplateCodes.contains(templateCode);
|
||||
}
|
||||
|
||||
// !! msg-center MQ消息自产自消
|
||||
|
||||
@Getter
|
||||
private long msgCenterMqSelfConsumeMaxExecMs = 20000L;
|
||||
|
||||
// !! 老消息缓存
|
||||
|
||||
/**
|
||||
* 是否开启老消息走否走缓存
|
||||
*/
|
||||
@Getter
|
||||
private boolean oldMsgStatCacheOn = true;
|
||||
|
||||
@Getter
|
||||
private int oldMsgStatCacheDataExpireHours = 16;
|
||||
|
||||
@Getter
|
||||
private int oldMsgStateDataBackendUpdateBatchSize = 100;
|
||||
|
||||
// !! 待办分类统计缓存
|
||||
|
||||
/**
|
||||
* 待办分类统计是否走缓存
|
||||
*/
|
||||
@Getter
|
||||
private boolean nodeStatCacheOn = true;
|
||||
|
||||
@Getter
|
||||
private int nodeStatCacheMaxRequestNodeCodeSize = 10;
|
||||
|
||||
@Getter
|
||||
private int nodeStatCacheDataExpireHours = 8;
|
||||
|
||||
public boolean determineOldMsgStatCacheOn() {
|
||||
return isOldMsgStatCacheOn();
|
||||
}
|
||||
|
||||
public boolean hasMessageDetailStyle(String code) {
|
||||
return name2DetailStyle != null && name2DetailStyle.containsKey(code);
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ package cn.axzo.msg.center.inside.notices.service.impl;
|
||||
import cn.axzo.core.domain.PageResult;
|
||||
import cn.axzo.msg.center.api.enums.MsgStateEnum;
|
||||
import cn.axzo.msg.center.api.enums.MsgTypeEnum;
|
||||
import cn.axzo.msg.center.api.mq.SendMessageRecordMessage;
|
||||
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
|
||||
import cn.axzo.msg.center.api.request.MessageModuleStatisticReq;
|
||||
import cn.axzo.msg.center.api.request.MessagePageQueryReq;
|
||||
@ -34,6 +35,9 @@ import cn.axzo.msg.center.inside.notices.service.MessageRouterService;
|
||||
import cn.axzo.msg.center.inside.notices.service.RawMessageRecordService;
|
||||
import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam;
|
||||
import cn.axzo.msg.center.message.service.PendingMessageNewService;
|
||||
import cn.axzo.msg.center.mq.MqMessageRecord;
|
||||
import cn.axzo.msg.center.mq.MqMessageType;
|
||||
import cn.axzo.msg.center.mq.MqProducer;
|
||||
import cn.axzo.msg.center.service.dto.PersonDTO;
|
||||
import cn.axzo.msg.center.service.enums.AppTerminalTypeEnum;
|
||||
import cn.azxo.framework.common.model.Page;
|
||||
@ -99,7 +103,7 @@ public class MessageCoreServiceImpl implements MessageCoreService {
|
||||
@Resource
|
||||
private RawMessageRecordService rawMessageRecordService;
|
||||
@Resource
|
||||
private PendingMessageNewService pendingMessageNewService;
|
||||
private MqProducer mqProducer;
|
||||
|
||||
public MsgRouteTypeEnum getSystemType(String systemType) {
|
||||
/*String systemType = ContextInfoHolder.get().getSystemAndDeviceInfo().getSystemType();*/
|
||||
@ -355,13 +359,18 @@ public class MessageCoreServiceImpl implements MessageCoreService {
|
||||
// 阅读单条数据
|
||||
rawMessageRecordService.updatePersonMessageState(request.getPersonId(), srcStates, MsgStateEnum.HAVE_READ,
|
||||
request.getMsgId());
|
||||
return;
|
||||
} else {
|
||||
List<Long> moduleIds = messageModuleService.listByModuleName(request.getModuleName()).stream()
|
||||
.map(MessageModule::getId)
|
||||
.collect(Collectors.toList());
|
||||
rawMessageRecordService.updatePersonMessageState(request.getPersonId(), srcStates, MsgStateEnum.HAVE_READ,
|
||||
MsgTypeEnum.GENERAL_MESSAGE, null, moduleIds);
|
||||
}
|
||||
List<Long> moduleIds = messageModuleService.listByModuleName(request.getModuleName()).stream()
|
||||
.map(MessageModule::getId)
|
||||
.collect(Collectors.toList());
|
||||
rawMessageRecordService.updatePersonMessageState(request.getPersonId(), srcStates, MsgStateEnum.HAVE_READ,
|
||||
MsgTypeEnum.GENERAL_MESSAGE, null, moduleIds);
|
||||
SendMessageRecordMessage mqMessage = new SendMessageRecordMessage();
|
||||
mqMessage.setReadRequest(request);
|
||||
mqProducer.send(MqMessageRecord
|
||||
.builder(MqMessageType.OLD_MSG_SEND, mqMessage)
|
||||
.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -7,6 +7,7 @@ import cn.axzo.msg.center.api.enums.MsgRecordTerminalTypeEnum;
|
||||
import cn.axzo.msg.center.api.enums.MsgStateEnum;
|
||||
import cn.axzo.msg.center.api.enums.MsgTypeEnum;
|
||||
import cn.axzo.msg.center.api.enums.ReceiveTypeEnum;
|
||||
import cn.axzo.msg.center.api.mq.SendMessageRecordMessage;
|
||||
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
|
||||
import cn.axzo.msg.center.api.request.CmsReadMsgReq;
|
||||
import cn.axzo.msg.center.api.request.GeneralMessageReq;
|
||||
@ -49,6 +50,9 @@ import cn.axzo.msg.center.domain.request.InsideCmsReadMsgReq;
|
||||
import cn.axzo.msg.center.inside.notices.event.SendMessageEvent;
|
||||
import cn.axzo.msg.center.inside.notices.service.MessageRecordService;
|
||||
import cn.axzo.msg.center.message.service.GeneralMessageMapperService;
|
||||
import cn.axzo.msg.center.mq.MqMessageRecord;
|
||||
import cn.axzo.msg.center.mq.MqMessageType;
|
||||
import cn.axzo.msg.center.mq.MqProducer;
|
||||
import cn.axzo.msg.center.service.dto.IdentifyAndReceiveType;
|
||||
import cn.axzo.msg.center.service.dto.IdentityDTO;
|
||||
import cn.axzo.msg.center.utils.PersonIdentityUtil;
|
||||
@ -115,6 +119,8 @@ public class MessageRecordServiceImpl implements MessageRecordService {
|
||||
private MessageModuleDao messageModuleDao;
|
||||
@Resource
|
||||
private MessageRouterDao messageRouterDao;
|
||||
@Resource
|
||||
private MqProducer mqProducer;
|
||||
/*@Resource
|
||||
private IdentityProfileService identityProfileService;*/
|
||||
|
||||
@ -179,6 +185,11 @@ public class MessageRecordServiceImpl implements MessageRecordService {
|
||||
// 不影响原有消息通道
|
||||
pushMessages.addAll(messageRecords);
|
||||
});
|
||||
SendMessageRecordMessage mqMessage = new SendMessageRecordMessage();
|
||||
mqMessage.setSendRequest(message);
|
||||
mqProducer.send(MqMessageRecord
|
||||
.builder(MqMessageType.OLD_MSG_SEND, mqMessage)
|
||||
.build());
|
||||
if(pushAthena) {
|
||||
asyncPushAthena(message, messageTemplate.getAudioFileName(), messageModule.getModuleName(), pushMessages);
|
||||
}
|
||||
@ -432,6 +443,11 @@ public class MessageRecordServiceImpl implements MessageRecordService {
|
||||
} else {
|
||||
messageRecordDao.readMsg(request, req.getPersonId(), req.getIdentityId());
|
||||
}
|
||||
SendMessageRecordMessage mqMessage = new SendMessageRecordMessage();
|
||||
mqMessage.setCmsReadRequest(req);
|
||||
mqProducer.send(MqMessageRecord
|
||||
.builder(MqMessageType.OLD_MSG_SEND, mqMessage)
|
||||
.build());
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -69,7 +69,6 @@ public class RawMessageRecordServiceImpl implements RawMessageRecordService {
|
||||
|
||||
messageRecordDao.lambdaUpdate()
|
||||
.set(MessageRecord::getState, tgtState)
|
||||
.eq(MessageRecord::getPersonId, personId)
|
||||
.in(MessageRecord::getState, srcStates)
|
||||
.eq(MessageRecord::getId, msgId)
|
||||
.eq(MessageRecord::getIsDelete, YesNoEnum.NO.getCode())
|
||||
|
||||
@ -4,6 +4,9 @@ import cn.hutool.http.HttpUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
|
||||
/**
|
||||
* api文档: <a href="https://developer.umeng.com/docs/67966/detail/68343">api文档</a>
|
||||
*/
|
||||
@Slf4j
|
||||
public class PushClient {
|
||||
|
||||
|
||||
@ -50,6 +50,7 @@ public class GeneralMessageController implements GeneralMessageClient {
|
||||
|
||||
@Override
|
||||
public CommonResponse<Page<MessageNewRes>> pageQueryOldMessage(CmsMsgQueryReq request) {
|
||||
request.setLogRequest(true);
|
||||
return CommonResponse.success(generalMessageOldService.pageMsgInfo(request));
|
||||
}
|
||||
|
||||
|
||||
@ -9,10 +9,14 @@ import cn.axzo.msg.center.inside.notices.service.impl.TodoSearchService;
|
||||
import cn.axzo.msg.center.inside.notices.service.impl.v3.MessageRecordServiceV3;
|
||||
import cn.axzo.msg.center.message.domain.param.PendingMessagePushParam;
|
||||
import cn.axzo.msg.center.message.service.group.GroupTemplateService;
|
||||
import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCache;
|
||||
import cn.axzo.msg.center.message.service.todo.manage.TodoManager;
|
||||
import cn.axzo.msg.center.message.xxl.MigrateOldMsgHotDataJob;
|
||||
import cn.azxo.framework.common.model.CommonResponse;
|
||||
import com.alibaba.cloud.nacos.NacosConfigManager;
|
||||
import com.alibaba.cloud.nacos.parser.NacosDataParserHandler;
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.Sets;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
@ -42,7 +46,9 @@ public class PrivateMessageController {
|
||||
private final TodoSearchService todoSearchService;
|
||||
private final TodoManager todoManager;
|
||||
private final GroupTemplateService groupTemplateService;
|
||||
private final OldMsgStatCache oldMsgStatCache;
|
||||
private final NacosConfigManager nacosConfigManager;
|
||||
private final MigrateOldMsgHotDataJob migrateOldMsgHotDataJob;
|
||||
|
||||
@PostMapping("/sendPendingMessage")
|
||||
public Object sendPendingMessage(@RequestBody @Valid PendingMessagePushParam request) {
|
||||
@ -74,6 +80,11 @@ public class PrivateMessageController {
|
||||
return groupTemplateService.collectTemplateCodes(groupNodeCode);
|
||||
}
|
||||
|
||||
@PostMapping("/reloadOldMsgStat")
|
||||
public Object reloadOldMsgStat(@RequestParam("personId") Long personId) throws Exception {
|
||||
return oldMsgStatCache.reloadBackground(Sets.newHashSet(personId));
|
||||
}
|
||||
|
||||
@PostMapping("/getPersonIdByPhone")
|
||||
public Object getPersonIdByPhone(@RequestParam("phones") String phones) {
|
||||
List<String> phonesStr = Splitter.on(",")
|
||||
@ -107,4 +118,12 @@ public class PrivateMessageController {
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
@PostMapping("/migrateOldMsgHotData")
|
||||
public Object migrateOldMsgHotData(
|
||||
@RequestBody(required = false) MigrateOldMsgHotDataJob.Param param) {
|
||||
migrateOldMsgHotDataJob.tryExecute(param, true);
|
||||
return CommonResponse.success("submitted");
|
||||
}
|
||||
|
||||
}
|
||||
@ -54,6 +54,10 @@ public class MessageGroupNodeStatisticParam implements Serializable {
|
||||
*/
|
||||
private List<Long> workspaceIds;
|
||||
|
||||
public Long getPersonId() {
|
||||
return getOperator().getId();
|
||||
}
|
||||
|
||||
public static MessageGroupNodeStatisticParam from(PendingMessageStatisticRequest request) {
|
||||
MessageGroupNodeStatisticParam param = BeanConverter.convert(request, MessageGroupNodeStatisticParam.class);
|
||||
IdentityDTO identity = IdentityDTO.builder()
|
||||
|
||||
@ -52,10 +52,8 @@ public class GroupTemplateService {
|
||||
return new RootNodeWrapper(TreeBuilder.build(nodes, true));
|
||||
}
|
||||
|
||||
public List<ValueNode<NodeWrapper>> getTodoGroups(
|
||||
AppTerminalTypeEnum terminal, boolean leafOnly) {
|
||||
Set<Long> configuredIds = new HashSet<>(
|
||||
cfg.fetchMessageGroupTreeNodeIds(terminal));
|
||||
public List<ValueNode<NodeWrapper>> getTodoGroups(AppTerminalTypeEnum terminal) {
|
||||
Set<Long> configuredIds = new HashSet<>(cfg.fetchMessageGroupTreeNodeIds(terminal));
|
||||
ArrayList<ValueNode<NodeWrapper>> configuredNodes = new ArrayList<>();
|
||||
getGroupRoot().unwrap().walkDown(node -> {
|
||||
//noinspection unchecked
|
||||
@ -69,10 +67,12 @@ public class GroupTemplateService {
|
||||
configuredNodes.add(valueNode);
|
||||
return true;
|
||||
});
|
||||
if (!leafOnly)
|
||||
return configuredNodes;
|
||||
return configuredNodes;
|
||||
}
|
||||
|
||||
public List<ValueNode<NodeWrapper>> collectLeafNodes(List<ValueNode<NodeWrapper>> nodes) {
|
||||
List<ValueNode<NodeWrapper>> leafNodes = new ArrayList<>();
|
||||
for (ValueNode<NodeWrapper> configuredNode : configuredNodes) {
|
||||
for (ValueNode<NodeWrapper> configuredNode : nodes) {
|
||||
configuredNode.walkDown(node -> {
|
||||
//noinspection unchecked
|
||||
ValueNode<NodeWrapper> valueNode = node.tryCast(ValueNode.class);
|
||||
|
||||
@ -16,8 +16,11 @@ import cn.axzo.msg.center.inside.notices.service.MessageModuleService;
|
||||
import cn.axzo.msg.center.inside.notices.service.MessageRelationService;
|
||||
import cn.axzo.msg.center.message.service.GeneralMessageOldService;
|
||||
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
|
||||
import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCache;
|
||||
import cn.axzo.msg.center.service.dto.IdentityDTO;
|
||||
import cn.axzo.msg.center.service.dto.PersonDTO;
|
||||
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
|
||||
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
|
||||
import cn.axzo.msg.center.utils.PersonIdentityUtil;
|
||||
import cn.azxo.framework.common.model.Page;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
@ -26,7 +29,6 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
@ -45,6 +47,7 @@ public class GeneralMessageOldServiceImpl implements GeneralMessageOldService {
|
||||
private final MessageModuleService messageModuleService;
|
||||
private final MessageRelationService messageRelationService;
|
||||
private final MessageSendTwiceRecordService messageSendTwiceRecordService;
|
||||
private final OldMsgStatCache oldMsgStatCache;
|
||||
|
||||
@Override
|
||||
public int countUnread(PersonDTO person) {
|
||||
@ -58,16 +61,11 @@ public class GeneralMessageOldServiceImpl implements GeneralMessageOldService {
|
||||
public int countUnreadWithIdentities(Long personId, List<IdentityDTO> identities) {
|
||||
log.info("GeneralMessageOldServiceImpl#countUnreadWithIdentities. personId={}, identifies={}",
|
||||
personId, JSON.toJSONString(identities));
|
||||
if (CollectionUtils.isEmpty(identities)) {
|
||||
return 0;
|
||||
}
|
||||
List<Long> sendTwiceMsgIds = messageSendTwiceRecordService.listByPerson(personId);
|
||||
int count = 0;
|
||||
for (IdentityDTO identity : identities) {
|
||||
PersonDTO person = PersonDTO.from(personId, identity.getId(), identity.getType());
|
||||
count += countUnread(person, sendTwiceMsgIds);
|
||||
}
|
||||
return count;
|
||||
GeneralMessageOldDataStatisticRequest request = new GeneralMessageOldDataStatisticRequest();
|
||||
request.setPersonId(personId);
|
||||
request.setIdentities(identities);
|
||||
GeneralMessageOldDataStatisticResponse statResp = oldMsgStatCache.getCacheResponseOrReload(request);
|
||||
return statResp == null ? 0 : statResp.getUnreadCount();
|
||||
}
|
||||
|
||||
public int countUnread(Long personId, List<IdentityDTO> identities, List<Long> excludeMsgIds) {
|
||||
@ -104,7 +102,8 @@ public class GeneralMessageOldServiceImpl implements GeneralMessageOldService {
|
||||
|
||||
@Override
|
||||
public Page<MessageNewRes> pageMsgInfo(CmsMsgQueryReq request) {
|
||||
log.info("GeneralMessageOldServiceImpl#pageMsgInfo. request:{}", request);
|
||||
if (request.isLogRequest())
|
||||
log.info("GeneralMessageOldServiceImpl#pageMsgInfo. request:{}", request);
|
||||
if (CollectionUtils.isEmpty(request.determineIdentities())) {
|
||||
return Page.zero();
|
||||
}
|
||||
|
||||
@ -5,8 +5,6 @@ import cn.axzo.im.center.api.feign.MessageApi;
|
||||
import cn.axzo.im.center.api.vo.req.MessageInfo;
|
||||
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
|
||||
import cn.axzo.im.center.common.enums.AppTypeEnum;
|
||||
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
|
||||
import cn.axzo.msg.center.api.response.MessageNewRes;
|
||||
import cn.axzo.msg.center.common.exception.ServiceException;
|
||||
import cn.axzo.msg.center.common.utils.PlaceholderResolver;
|
||||
import cn.axzo.msg.center.dal.GeneralMessageRecordDao;
|
||||
@ -19,19 +17,18 @@ import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO;
|
||||
import cn.axzo.msg.center.message.service.GeneralMessageService;
|
||||
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
|
||||
import cn.axzo.msg.center.message.service.MessageTemplateNewService;
|
||||
import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCache;
|
||||
import cn.axzo.msg.center.service.dto.IdentityDTO;
|
||||
import cn.axzo.msg.center.service.dto.MessageCardContentItemDTO;
|
||||
import cn.axzo.msg.center.service.dto.PersonDTO;
|
||||
import cn.axzo.msg.center.service.enums.GeneralMessageStateEnum;
|
||||
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
|
||||
import cn.axzo.msg.center.service.enums.MessageCategoryEnum;
|
||||
import cn.axzo.msg.center.service.enums.PushTerminalEnum;
|
||||
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
|
||||
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
|
||||
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
|
||||
import cn.axzo.msg.center.utils.MessageRouterUtil;
|
||||
import cn.axzo.msg.center.utils.UUIDUtil;
|
||||
import cn.azxo.framework.common.model.Page;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
@ -48,7 +45,6 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@ -74,10 +70,10 @@ public class GeneralMessageServiceImpl implements GeneralMessageService {
|
||||
private final MessageApi messageApi;
|
||||
private final MessageSystemConfig messageSystemConfig;
|
||||
private final GeneralMessageRecordDao generalMessageRecordDao;
|
||||
private final GeneralMessageOldServiceImpl generalMessageOldService;
|
||||
private final MessageTemplateNewService messageTemplateNewService;
|
||||
private final MessageSendTwiceRecordService messageSendTwiceRecordService;
|
||||
private final MessageRouterUtil messageRouterUtil;
|
||||
private final OldMsgStatCache oldMsgStatCache;
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
|
||||
@ -94,20 +90,7 @@ public class GeneralMessageServiceImpl implements GeneralMessageService {
|
||||
|
||||
@Override
|
||||
public GeneralMessageOldDataStatisticResponse statisticOldData(GeneralMessageOldDataStatisticRequest request) {
|
||||
// 查询双发的消息记录
|
||||
List<Long> sendTwiceMsgIds = messageSendTwiceRecordService.listByPerson(request.getPersonId());
|
||||
// 分页查询最新一条数据
|
||||
Page<MessageNewRes> result = generalMessageOldService.pageMsgInfo(build(request, sendTwiceMsgIds));
|
||||
// 统计旧的未读普通消息数量
|
||||
int count = generalMessageOldService.countUnread(
|
||||
request.getPersonId(), request.determineIdentities(), sendTwiceMsgIds);
|
||||
// 编排组合成界面展示的数据结构
|
||||
MessageNewRes msg = CollectionUtils.isNotEmpty(result.getList()) ? result.getList().get(0) : null;
|
||||
return GeneralMessageOldDataStatisticResponse.builder()
|
||||
.unreadCount(count)
|
||||
.latestMsgSendTimestamp(Optional.ofNullable(msg).map(v -> v.getCreateAt().getTime()).orElse(null))
|
||||
.latestMsgContent(Optional.ofNullable(msg).map(MessageNewRes::getContent).orElse(null))
|
||||
.build();
|
||||
return oldMsgStatCache.getCacheResponseOrReload(request);
|
||||
}
|
||||
|
||||
private List<GeneralMessageRecord> buildMessageRecord(GeneralMessageSendRequest request, MessageTemplateDTO template) {
|
||||
@ -208,16 +191,4 @@ public class GeneralMessageServiceImpl implements GeneralMessageService {
|
||||
return PlaceholderResolver.getDefaultResolver().resolveByMap(string, params);
|
||||
}
|
||||
|
||||
private CmsMsgQueryReq build(GeneralMessageOldDataStatisticRequest request, List<Long> excludeMsgIds) {
|
||||
CmsMsgQueryReq req = new CmsMsgQueryReq();
|
||||
req.setMsgType(MessageCategoryEnum.GENERAL_MESSAGE.getCode());
|
||||
// 这里查询消息中心全部状态的数据
|
||||
req.setMsgStatus(0);
|
||||
req.setPage(1L);
|
||||
req.setPageSize(1L);
|
||||
req.setPersonId(request.getPersonId());
|
||||
req.setIdentities(request.determineIdentities());
|
||||
req.setExcludeMsgIds(excludeMsgIds);
|
||||
return req;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,32 @@
|
||||
package cn.axzo.msg.center.message.service.impl.oldmsg;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Data
|
||||
public class OldMsgCacheReloadJobParam {
|
||||
|
||||
private static final int DEFAULT_SYNC_DAY = 30;
|
||||
|
||||
private int syncDay;
|
||||
private int batchSize;
|
||||
|
||||
static OldMsgCacheReloadJobParam defaultParam() {
|
||||
OldMsgCacheReloadJobParam defaultParam = new OldMsgCacheReloadJobParam();
|
||||
defaultParam.setSyncDay(DEFAULT_SYNC_DAY);
|
||||
return defaultParam;
|
||||
}
|
||||
|
||||
int determineSyncDay() {
|
||||
return syncDay <= 0 ? DEFAULT_SYNC_DAY : syncDay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JSON.toJSONString(this);
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,52 @@
|
||||
package cn.axzo.msg.center.message.service.impl.oldmsg;
|
||||
|
||||
import cn.axzo.basics.profiles.api.UserProfileServiceApi;
|
||||
import cn.axzo.basics.profiles.dto.basic.IdentityProfileDto;
|
||||
import cn.axzo.msg.center.common.utils.BizAssertions;
|
||||
import cn.axzo.msg.center.service.dto.IdentityDTO;
|
||||
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class OldMsgIdentifyFilter {
|
||||
|
||||
private final UserProfileServiceApi userProfileServiceApi;
|
||||
|
||||
public OldMsgIdentifyInfo getIdentifyInfo(Long personId, boolean asManager) {
|
||||
Set<IdentityTypeEnum> identifiesFilter;
|
||||
// cms或者管理端
|
||||
if (asManager) {
|
||||
identifiesFilter = ImmutableSet.of(IdentityTypeEnum.PRACTITIONER, IdentityTypeEnum.WORKER_LEADER);
|
||||
}
|
||||
// 工人端
|
||||
else {
|
||||
identifiesFilter = ImmutableSet.of(IdentityTypeEnum.WORKER);
|
||||
}
|
||||
List<IdentityProfileDto> profiles = BizAssertions.assertResponse(userProfileServiceApi
|
||||
.getPersonIdentityProfile(personId), "获取身份信息失败, personId={}", personId);
|
||||
ArrayList<IdentityDTO> identities = new ArrayList<>();
|
||||
for (IdentityProfileDto profile : profiles) {
|
||||
IdentityTypeEnum identityType = IdentityTypeEnum
|
||||
.codeOf(profile.getIdentityType().getCode())
|
||||
.orElse(null);
|
||||
if (identityType == null || !identifiesFilter.contains(identityType)) {
|
||||
continue;
|
||||
}
|
||||
IdentityDTO identify = new IdentityDTO();
|
||||
identify.setId(profile.getId());
|
||||
identify.setType(identityType);
|
||||
identities.add(identify);
|
||||
}
|
||||
return new OldMsgIdentifyInfo(personId, identities);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,23 @@
|
||||
package cn.axzo.msg.center.message.service.impl.oldmsg;
|
||||
|
||||
import cn.axzo.msg.center.service.dto.IdentityDTO;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
public class OldMsgIdentifyInfo {
|
||||
private final Long personId;
|
||||
private final List<IdentityDTO> identities;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JSON.toJSONString(this);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,354 @@
|
||||
package cn.axzo.msg.center.message.service.impl.oldmsg;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.EventConsumer;
|
||||
import cn.axzo.framework.rocketmq.EventHandler;
|
||||
import cn.axzo.msg.center.api.mq.SendMessageRecordMessage;
|
||||
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
|
||||
import cn.axzo.msg.center.api.request.CmsReadMsgReq;
|
||||
import cn.axzo.msg.center.api.request.GeneralMessageReq;
|
||||
import cn.axzo.msg.center.api.request.MessageReadAllReq;
|
||||
import cn.axzo.msg.center.api.response.MessageNewRes;
|
||||
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
|
||||
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
|
||||
import cn.axzo.msg.center.message.service.impl.GeneralMessageOldServiceImpl;
|
||||
import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCacheValue.IdentityResponse;
|
||||
import cn.axzo.msg.center.message.service.impl.person.PersonIdInfo;
|
||||
import cn.axzo.msg.center.message.service.impl.person.PersonService;
|
||||
import cn.axzo.msg.center.mq.MqMessageType;
|
||||
import cn.axzo.msg.center.notices.common.enums.ReturnCodeEnum;
|
||||
import cn.axzo.msg.center.notices.common.exception.BizException;
|
||||
import cn.axzo.msg.center.service.dto.IdentityDTO;
|
||||
import cn.axzo.msg.center.service.enums.MessageCategoryEnum;
|
||||
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
|
||||
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
|
||||
import cn.azxo.framework.common.model.Page;
|
||||
import cn.hutool.core.date.StopWatch;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class OldMsgStatCache implements EventHandler, InitializingBean {
|
||||
|
||||
private final ApplicationContext applicationContext;
|
||||
private final MessageSendTwiceRecordService messageSendTwiceRecordService;
|
||||
private final OldMsgIdentifyFilter oldMsgIdentifyFilter;
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
private final PendingMessageBizConfig cfg;
|
||||
private final EventConsumer eventConsumer;
|
||||
private final PersonService personService;
|
||||
private final ForkJoinPool foregroundExecutor = new ForkJoinPool(15);
|
||||
private final ForkJoinPool backgroundExecutor = new ForkJoinPool(10);
|
||||
|
||||
public GeneralMessageOldDataStatisticResponse getCacheResponseOrReload(GeneralMessageOldDataStatisticRequest request) {
|
||||
if (!cfg.determineOldMsgStatCacheOn()) {
|
||||
return statisticOldDataImpl(request);
|
||||
}
|
||||
GeneralMessageOldDataStatisticResponse resp = getCacheResponse(request);
|
||||
if (resp != null) {
|
||||
resp.setReloadForeground(false);
|
||||
return resp;
|
||||
}
|
||||
GeneralMessageOldDataStatisticResponse response = reloadForeground(request);
|
||||
response.setReloadForeground(true);
|
||||
return response;
|
||||
}
|
||||
|
||||
private GeneralMessageOldDataStatisticResponse getCacheResponse(GeneralMessageOldDataStatisticRequest request) {
|
||||
OldMsgStatCacheValue cacheValue = getCacheValue(request.getPersonId());
|
||||
if (cacheValue == null) {
|
||||
log.info("oldMsgStat: cache miss, need to reload in foreground. request={}", request);
|
||||
return null;
|
||||
}
|
||||
GeneralMessageOldDataStatisticResponse response = cacheValue
|
||||
.findResponse(request.getIdentities())
|
||||
.orElse(null);
|
||||
if (response != null)
|
||||
return response;
|
||||
// 身份发生了变化, 而且异步刷新不及时
|
||||
log.info("oldMsgStat: identities cache miss, need to reload in foreground. request={}", request);
|
||||
return null;
|
||||
}
|
||||
|
||||
private GeneralMessageOldDataStatisticResponse reloadForeground(GeneralMessageOldDataStatisticRequest request) {
|
||||
log.info("oldMsgStat: reload cache for request. request={}", request);
|
||||
StopWatch stopWatch = new StopWatch(String.format(
|
||||
"%s-reloadForeground", getClass().getSimpleName()));
|
||||
stopWatch.start("initPersonCacheValueForeground");
|
||||
OldMsgStatCacheValue cacheValue = initPersonCacheValueForeground(request.getPersonId());
|
||||
stopWatch.stop();
|
||||
GeneralMessageOldDataStatisticResponse response = cacheValue
|
||||
.findResponse(request.getIdentities())
|
||||
.orElse(null);
|
||||
// 边界
|
||||
if (response == null) {
|
||||
log.info("oldMsgStat: identities cache missing? reload stat for request identities. request={}", request);
|
||||
stopWatch.start("initPersonCacheValueForeground - miss again?");
|
||||
IdentityResponse newIdentityResponse = getIdentityResponse(request.getPersonId(), request.getIdentities());
|
||||
stopWatch.stop();
|
||||
cacheValue.addIdentityResponse(newIdentityResponse);
|
||||
}
|
||||
setCacheValue(request.getPersonId(), cacheValue);
|
||||
log.info(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
|
||||
return cacheValue
|
||||
.findResponse(request.getIdentities())
|
||||
// should never happen
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
public Map<Long, OldMsgStatCacheValue> reloadBackground(Set<Long> personIds) throws Exception {
|
||||
log.info("oldMsgStat: reload cache for person in background. personIds={}", JSON.toJSONString(personIds));
|
||||
if (CollectionUtils.isEmpty(personIds)) {
|
||||
log.info("oldMsgStat: personIds is empty");
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
HashMap<Long, OldMsgStatCacheValue> personId2CacheValue = new HashMap<>();
|
||||
// 温柔点, 别一次性全丢到线程池
|
||||
int bachSize = cfg.getOldMsgStateDataBackendUpdateBatchSize();
|
||||
List<List<Long>> batches = Lists.partition(new ArrayList<>(personIds), bachSize);
|
||||
for (List<Long> batch : batches) {
|
||||
ArrayList<CompletableFuture<PersonAndCacheValue>> futures = new ArrayList<>();
|
||||
for (Long personId : batch) {
|
||||
futures.add(CompletableFuture.supplyAsync(() -> {
|
||||
OldMsgStatCacheValue cacheValue = initPersonCacheValueBackground(personId);
|
||||
setCacheValue(personId, cacheValue);
|
||||
return new PersonAndCacheValue(personId, cacheValue);
|
||||
}, backgroundExecutor));
|
||||
}
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||
for (CompletableFuture<PersonAndCacheValue> future : futures) {
|
||||
PersonAndCacheValue tmp = future.get();
|
||||
personId2CacheValue.put(tmp.getPersonId(), tmp.getCacheValue());
|
||||
}
|
||||
}
|
||||
return personId2CacheValue;
|
||||
}
|
||||
|
||||
private OldMsgStatCacheValue initPersonCacheValueForeground(Long personId) {
|
||||
log.info("initPersonCacheValueForeground. personId={}", personId);
|
||||
Function<Boolean, IdentityResponse> buildResponseFun = asManager -> {
|
||||
OldMsgIdentifyInfo identifyInfo = oldMsgIdentifyFilter.getIdentifyInfo(personId, asManager);
|
||||
return getIdentityResponse(personId, identifyInfo.getIdentities());
|
||||
};
|
||||
OldMsgStatCacheValue cacheValue = new OldMsgStatCacheValue();
|
||||
CompletableFuture<IdentityResponse> asManagerFuture = CompletableFuture
|
||||
.supplyAsync(() -> buildResponseFun.apply(true), foregroundExecutor);
|
||||
CompletableFuture<IdentityResponse> asWorkerFuture = CompletableFuture
|
||||
.supplyAsync(() -> buildResponseFun.apply(false), foregroundExecutor);
|
||||
CompletableFuture.allOf(asManagerFuture, asWorkerFuture).join();
|
||||
try {
|
||||
cacheValue.addIdentityResponse(asManagerFuture.get());
|
||||
cacheValue.addIdentityResponse(asWorkerFuture.get());
|
||||
} catch (Exception e) {
|
||||
log.warn("oldMsgStat: fail to stat old msg. personId={}", personId, e);
|
||||
throw new BizException(ReturnCodeEnum.SYSTEM_ERROR.getCode(), e);
|
||||
}
|
||||
return cacheValue;
|
||||
}
|
||||
|
||||
private OldMsgStatCacheValue initPersonCacheValueBackground(Long personId) {
|
||||
log.info("initPersonCacheValueBackground. personId={}", personId);
|
||||
Function<Boolean, IdentityResponse> buildResponseFun = asManager -> {
|
||||
OldMsgIdentifyInfo identifyInfo = oldMsgIdentifyFilter.getIdentifyInfo(personId, asManager);
|
||||
return getIdentityResponse(personId, identifyInfo.getIdentities());
|
||||
};
|
||||
OldMsgStatCacheValue cacheValue = new OldMsgStatCacheValue();
|
||||
cacheValue.addIdentityResponse(buildResponseFun.apply(true));
|
||||
cacheValue.addIdentityResponse(buildResponseFun.apply(false));
|
||||
return cacheValue;
|
||||
}
|
||||
|
||||
private IdentityResponse getIdentityResponse(Long personId, List<IdentityDTO> identities) {
|
||||
GeneralMessageOldDataStatisticRequest request = new GeneralMessageOldDataStatisticRequest();
|
||||
request.setPersonId(personId);
|
||||
request.setIdentities(identities);
|
||||
GeneralMessageOldDataStatisticResponse response = statisticOldDataImpl(request);
|
||||
IdentityResponse identityResponse = new IdentityResponse();
|
||||
identityResponse.setResponse(response);
|
||||
identityResponse.setIdentities(new HashSet<>(identities));
|
||||
return identityResponse;
|
||||
}
|
||||
|
||||
// !! cache helper
|
||||
|
||||
private void setCacheValue(Long personId, OldMsgStatCacheValue cacheValue) {
|
||||
cacheValue.clear();
|
||||
log.info("set cache value for person. personId={}", personId);
|
||||
long secondsDelta = TimeUnit.HOURS.toSeconds(cfg.getOldMsgStatCacheDataExpireHours());
|
||||
// add some random delta seconds, range: 0-600 seconds
|
||||
secondsDelta += new SecureRandom().nextInt(10 * 60);
|
||||
|
||||
String redisKey = buildCacheKey(personId);
|
||||
String redisValue = JSON.toJSONString(cacheValue);
|
||||
stringRedisTemplate.opsForValue().set(redisKey, redisValue, secondsDelta, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private OldMsgStatCacheValue getCacheValue(Long personId) {
|
||||
String redisKey = buildCacheKey(personId);
|
||||
String json = stringRedisTemplate.opsForValue().get(redisKey);
|
||||
if (StringUtils.isBlank(json)) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return JSON.parseObject(json, OldMsgStatCacheValue.class);
|
||||
} catch (Exception e) {
|
||||
// 人为改了缓存, 而且没有改对
|
||||
log.warn("fail to parse cache value, will reload. personId={}, cacheValue={}", personId, json, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static String buildCacheKey(Long personId) {
|
||||
return String.format("msg-center:old-msg-stat:v1:%d", personId);
|
||||
}
|
||||
|
||||
// !! 以下为原有的统计逻辑
|
||||
|
||||
private GeneralMessageOldDataStatisticResponse statisticOldDataImpl(GeneralMessageOldDataStatisticRequest request) {
|
||||
if (request.determineIdentities().isEmpty()) {
|
||||
return GeneralMessageOldDataStatisticResponse.builder()
|
||||
.unreadCount(0)
|
||||
.latestMsgSendTimestamp(null)
|
||||
.latestMsgContent(null)
|
||||
.build();
|
||||
}
|
||||
// 查询双发的消息记录
|
||||
List<Long> sendTwiceMsgIds = messageSendTwiceRecordService.listByPerson(request.getPersonId());
|
||||
// 打破循环依赖, 避免大量改造 (反正快下线了)
|
||||
GeneralMessageOldServiceImpl generalMessageOldService = applicationContext
|
||||
.getBean(GeneralMessageOldServiceImpl.class);
|
||||
// 分页查询最新一条数据
|
||||
Page<MessageNewRes> result = generalMessageOldService.pageMsgInfo(build(request, sendTwiceMsgIds));
|
||||
// 统计旧的未读普通消息数量
|
||||
int count = generalMessageOldService.countUnread(
|
||||
request.getPersonId(), request.determineIdentities(), sendTwiceMsgIds);
|
||||
// 编排组合成界面展示的数据结构
|
||||
MessageNewRes msg = CollectionUtils.isNotEmpty(result.getList()) ? result.getList().get(0) : null;
|
||||
return GeneralMessageOldDataStatisticResponse.builder()
|
||||
.unreadCount(count)
|
||||
.latestMsgSendTimestamp(Optional.ofNullable(msg).map(v -> v.getCreateAt().getTime()).orElse(null))
|
||||
.latestMsgContent(Optional.ofNullable(msg).map(MessageNewRes::getContent).orElse(null))
|
||||
.build();
|
||||
}
|
||||
|
||||
private CmsMsgQueryReq build(GeneralMessageOldDataStatisticRequest request, List<Long> excludeMsgIds) {
|
||||
CmsMsgQueryReq req = new CmsMsgQueryReq();
|
||||
req.setMsgType(MessageCategoryEnum.GENERAL_MESSAGE.getCode());
|
||||
// 这里查询消息中心全部状态的数据
|
||||
req.setMsgStatus(0);
|
||||
req.setPage(1L);
|
||||
req.setPageSize(1L);
|
||||
req.setPersonId(request.getPersonId());
|
||||
req.setIdentities(request.determineIdentities());
|
||||
req.setExcludeMsgIds(excludeMsgIds);
|
||||
req.setLogRequest(false);
|
||||
return req;
|
||||
}
|
||||
|
||||
// !! 处理mq消息, 异步更新缓存
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
eventConsumer.registerHandler(MqMessageType.OLD_MSG_SEND.getEventCode(), this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(Event event, EventConsumer.Context context) {
|
||||
log.info("oldMsgStat: start - handle mq event, event={}", JSON.toJSONString(event));
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
long start = System.currentTimeMillis();
|
||||
handleMqMessage(event);
|
||||
long end = System.currentTimeMillis();
|
||||
log.info("oldMsgStat: end - handle mq event, used={}ms, event={}", end - start, JSON.toJSONString(event));
|
||||
} catch (Exception e) {
|
||||
log.warn("oldMsgStat: error - handle mq event, event={}", JSON.toJSONString(event));
|
||||
}
|
||||
}
|
||||
|
||||
private void handleMqMessage(Event event) throws Exception {
|
||||
SendMessageRecordMessage mqMessage = event.normalizedData(SendMessageRecordMessage.class);
|
||||
GeneralMessageReq sendRequest = mqMessage.getSendRequest();
|
||||
MessageReadAllReq readRequest = mqMessage.getReadRequest();
|
||||
CmsReadMsgReq cmsReadRequest = mqMessage.getCmsReadRequest();
|
||||
if (sendRequest != null) {
|
||||
log.info("handle mq event - driven by sendRequest={}", sendRequest);
|
||||
handleSendRequestEvent(sendRequest);
|
||||
} else if (readRequest != null) {
|
||||
log.info("handle mq event - driven by readRequest={}", readRequest);
|
||||
handleReadRequest(readRequest);
|
||||
} else if (cmsReadRequest != null) {
|
||||
log.info("handle mq event - driven by cmsReadRequest={}", cmsReadRequest);
|
||||
handleCmsReadRequest(cmsReadRequest);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleSendRequestEvent(GeneralMessageReq sendRequest) throws Exception {
|
||||
Map<Long, Long> knownIdentityId2PersonId = sendRequest.getToldIdPersonIdMap();
|
||||
if (knownIdentityId2PersonId == null)
|
||||
knownIdentityId2PersonId = new HashMap<>();
|
||||
HashSet<Long> personIds = new HashSet<>(knownIdentityId2PersonId.values());
|
||||
Set<Long> receiverIdentityIds = sendRequest.getToId();
|
||||
if (receiverIdentityIds == null)
|
||||
receiverIdentityIds = Collections.emptySet();
|
||||
HashSet<Long> toQueryPersonIdIdentityIds = new HashSet<>();
|
||||
for (Long identityId : receiverIdentityIds) {
|
||||
if (!knownIdentityId2PersonId.containsKey(identityId))
|
||||
toQueryPersonIdIdentityIds.add(identityId);
|
||||
}
|
||||
if (!toQueryPersonIdIdentityIds.isEmpty()) {
|
||||
PersonIdInfo personIdInfo = personService.getPersonIdsByIdentities(toQueryPersonIdIdentityIds);
|
||||
personIds.addAll(personIdInfo.getPersonIds());
|
||||
}
|
||||
StopWatch stopWatch = new StopWatch("reloadBackground");
|
||||
stopWatch.start("handleSendRequestEvent");
|
||||
reloadBackground(personIds);
|
||||
stopWatch.stop();
|
||||
log.info(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
private void handleReadRequest(MessageReadAllReq readRequest) throws Exception {
|
||||
StopWatch stopWatch = new StopWatch("reloadBackground");
|
||||
stopWatch.start("handleReadRequest");
|
||||
reloadBackground(Sets.newHashSet(readRequest.getPersonId()));
|
||||
stopWatch.stop();
|
||||
log.info(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
private void handleCmsReadRequest(CmsReadMsgReq cmsReadRequest) throws Exception {
|
||||
StopWatch stopWatch = new StopWatch("reloadBackground");
|
||||
stopWatch.start("handleCmsReadRequest");
|
||||
reloadBackground(Sets.newHashSet(cmsReadRequest.getPersonId()));
|
||||
stopWatch.stop();
|
||||
log.info(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,100 @@
|
||||
package cn.axzo.msg.center.message.service.impl.oldmsg;
|
||||
|
||||
import cn.axzo.msg.center.dal.mapper.MessageRecordMapper;
|
||||
import cn.axzo.msg.center.domain.entity.MessageRecord;
|
||||
import cn.axzo.msg.center.message.service.impl.person.PersonService;
|
||||
import cn.hutool.core.date.StopWatch;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.IJobHandler;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.joda.time.DateTime;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static cn.axzo.msg.center.inside.notices.utils.Queries.query;
|
||||
import static java.util.stream.Collectors.toSet;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class OldMsgStatCacheReloadJob extends IJobHandler {
|
||||
|
||||
private final MessageRecordMapper messageRecordMapper;
|
||||
private final PersonService personService;
|
||||
private final OldMsgStatCache oldMsgStatCache;
|
||||
|
||||
@Override
|
||||
@XxlJob("oldMsgStatCacheReloadJob")
|
||||
public ReturnT<String> execute(String param) throws Exception {
|
||||
try {
|
||||
log.info("start - running job using param: {}", param);
|
||||
executeImpl(param);
|
||||
log.info("end - running job using param: {}", param);
|
||||
return ReturnT.SUCCESS;
|
||||
} catch (Exception e) {
|
||||
log.warn("run job error, param={}", param, e);
|
||||
return ReturnT.FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
private void executeImpl(String param) throws Exception {
|
||||
OldMsgCacheReloadJobParam jobParam = StringUtils.isBlank(param)
|
||||
? OldMsgCacheReloadJobParam.defaultParam()
|
||||
: JSON.parseObject(param, OldMsgCacheReloadJobParam.class);
|
||||
Date sinceDate = DateTime.now().minusDays(jobParam.determineSyncDay()).toDate();
|
||||
StopWatch stopWatch = new StopWatch("reloadBackground driven by job");
|
||||
Set<Long> reloadedPersonIds = new HashSet<>();
|
||||
Iterable<Page<MessageRecord>> pages = messageRecordMapper
|
||||
.iterateBath(query(MessageRecord.class)
|
||||
.ge(MessageRecord::getCreateAt, sinceDate)
|
||||
// 先刷最近的
|
||||
.orderByDesc(MessageRecord::getId));
|
||||
for (Page<MessageRecord> page : pages) {
|
||||
stopWatch.start("reload stat, records size=" + page.getRecords().size());
|
||||
reloadStat(reloadedPersonIds, page);
|
||||
stopWatch.stop();
|
||||
}
|
||||
log.info("end sync old msg stat. reloaded personIds size={}", reloadedPersonIds.size());
|
||||
log.info(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
private void reloadStat(Set<Long> reloadedPersonIds, Page<MessageRecord> page) throws Exception {
|
||||
log.info("start sync old msg stat. page size={}, page current={}, total pages={}",
|
||||
page.getSize(), page.getCurrent(), page.getPages());
|
||||
List<MessageRecord> records = page.getRecords();
|
||||
Set<Long> maybeReloadPersonIds = new HashSet<>();
|
||||
// 传了personId的
|
||||
records.stream()
|
||||
.map(MessageRecord::getPersonId)
|
||||
.filter(personId -> personId != 0L)
|
||||
.forEach(maybeReloadPersonIds::add);
|
||||
// 没有传personId的
|
||||
Set<Long> identityIds = records.stream()
|
||||
.filter(record -> record.getPersonId() == 0L)
|
||||
.map(MessageRecord::getToId)
|
||||
.collect(toSet());
|
||||
if (!identityIds.isEmpty()) {
|
||||
List<Long> fetchedPersonIds = personService.getPersonIdsByIdentities(identityIds).getPersonIds();
|
||||
maybeReloadPersonIds.addAll(fetchedPersonIds);
|
||||
}
|
||||
maybeReloadPersonIds.removeAll(reloadedPersonIds);
|
||||
if (!maybeReloadPersonIds.isEmpty()) {
|
||||
oldMsgStatCache.reloadBackground(maybeReloadPersonIds);
|
||||
reloadedPersonIds.addAll(maybeReloadPersonIds);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,56 @@
|
||||
package cn.axzo.msg.center.message.service.impl.oldmsg;
|
||||
|
||||
import cn.axzo.msg.center.service.dto.IdentityDTO;
|
||||
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
|
||||
import lombok.Data;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Data
|
||||
public class OldMsgStatCacheValue {
|
||||
|
||||
private List<IdentityResponse> identityResponses;
|
||||
|
||||
public Optional<GeneralMessageOldDataStatisticResponse> findResponse(Collection<IdentityDTO> identities) {
|
||||
if (CollectionUtils.isEmpty(identityResponses))
|
||||
return Optional.empty();
|
||||
return identityResponses.stream()
|
||||
.filter(response -> response.isIdentitiesMatch(identities))
|
||||
.map(IdentityResponse::getResponse)
|
||||
.findFirst();
|
||||
}
|
||||
|
||||
public void addIdentityResponse(IdentityResponse identityResponse) {
|
||||
if (identityResponses == null)
|
||||
identityResponses = new ArrayList<>();
|
||||
identityResponses.add(identityResponse);
|
||||
}
|
||||
|
||||
void clear() {
|
||||
if (identityResponses == null) return;
|
||||
for (IdentityResponse identityResponse : identityResponses) {
|
||||
if (identityResponse.response != null)
|
||||
identityResponse.response.setReloadForeground(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class IdentityResponse {
|
||||
private GeneralMessageOldDataStatisticResponse response;
|
||||
private Set<IdentityDTO> identities;
|
||||
|
||||
public boolean isIdentitiesMatch(Collection<IdentityDTO> identities) {
|
||||
// 比较时忽略顺序
|
||||
return this.identities.equals(new HashSet<>(identities));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,20 @@
|
||||
package cn.axzo.msg.center.message.service.impl.oldmsg;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
public class PersonAndCacheValue {
|
||||
private final Long personId;
|
||||
private final OldMsgStatCacheValue cacheValue;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JSON.toJSONString(this);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,25 @@
|
||||
package cn.axzo.msg.center.message.service.impl.person;
|
||||
|
||||
import cn.axzo.basics.profiles.dto.basic.IdentityProfileDto;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
public class PersonIdInfo {
|
||||
|
||||
private final List<IdentityProfileDto> identityProfiles;
|
||||
|
||||
public List<Long> getPersonIds() {
|
||||
return identityProfiles.stream()
|
||||
.map(i -> i.getPersonProfile().getId())
|
||||
.distinct()
|
||||
.collect(toList());
|
||||
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,47 @@
|
||||
package cn.axzo.msg.center.message.service.impl.person;
|
||||
|
||||
import cn.axzo.basics.profiles.api.UserProfileServiceApi;
|
||||
import cn.axzo.basics.profiles.common.enums.IdentityType;
|
||||
import cn.axzo.basics.profiles.dto.basic.IdentityProfileDto;
|
||||
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
|
||||
import cn.axzo.msg.center.common.utils.BizAssertions;
|
||||
import cn.azxo.framework.common.model.CommonResponse;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class PersonService {
|
||||
|
||||
private final UserProfileServiceApi userProfileServiceApi;
|
||||
|
||||
public PersonIdInfo getPersonIdsByIdentities(Collection<Long> identityIds) {
|
||||
log.info("start - querying person ids by identity ids: {}", JSON.toJSONString(identityIds));
|
||||
ArrayList<Long> uniqueIdentityIds = new ArrayList<>(new HashSet<>(identityIds));
|
||||
CommonResponse<List<IdentityProfileDto>> resp = userProfileServiceApi.getIdentitiesByIdSet(
|
||||
uniqueIdentityIds, IdentityType.NOT_SUPPORT.getCode());
|
||||
log.info("end - querying person ids by identity ids, request={}, response={}",
|
||||
JSON.toJSONString(identityIds), JSON.toJSONString(resp));
|
||||
BizAssertions.assertResponse(resp);
|
||||
return new PersonIdInfo(resp.getData());
|
||||
}
|
||||
|
||||
public Long getPersonIdByPhone(String phone) {
|
||||
CommonResponse<PersonProfileDto> profileResp = userProfileServiceApi
|
||||
.getUnionPersonProfile(null, phone);
|
||||
PersonProfileDto personProfile = BizAssertions.assertResponse(profileResp, "未找根据手机找到人员");
|
||||
BizAssertions.assertNotNull(personProfile, "未找根据手机找到人员");
|
||||
return personProfile.getId();
|
||||
}
|
||||
}
|
||||
@ -1,5 +1,6 @@
|
||||
package cn.axzo.msg.center.message.service.todo;
|
||||
|
||||
import cn.axzo.basics.common.exception.ServiceException;
|
||||
import cn.axzo.maokai.api.util.Ref;
|
||||
import cn.axzo.maokai.api.vo.response.tree.ValueNode;
|
||||
import cn.axzo.msg.center.common.enums.TableIsDeleteEnum;
|
||||
@ -14,6 +15,7 @@ import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam;
|
||||
import cn.axzo.msg.center.message.service.group.GroupTemplateService;
|
||||
import cn.axzo.msg.center.message.service.group.NodeWrapper;
|
||||
import cn.axzo.msg.center.message.service.impl.PendingMessageNewServiceImpl;
|
||||
import cn.axzo.msg.center.message.service.todo.cache.NodeStatCache;
|
||||
import cn.axzo.msg.center.message.service.todo.mybatis.CollectSQLInterceptor;
|
||||
import cn.axzo.msg.center.message.service.todo.pagequery.PageQuerySort;
|
||||
import cn.axzo.msg.center.message.service.todo.queryanalyze.SimpleAnalyzer;
|
||||
@ -44,6 +46,8 @@ import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@ -67,6 +71,8 @@ public class TodoRangeQueryService {
|
||||
private final TodoRespBuilder todoRespBuilder;
|
||||
private final AnalysisConfig analysisConfig;
|
||||
private final GroupTemplateService groupTemplateService;
|
||||
private final NodeStatCache nodeStatCache;
|
||||
private final ForkJoinPool statExecutor = new ForkJoinPool(20);
|
||||
|
||||
// !! page query
|
||||
|
||||
@ -219,14 +225,17 @@ public class TodoRangeQueryService {
|
||||
if (!request.determineGroupNodeCodes().isEmpty())
|
||||
return groupTemplateService.collectTemplateCodes(request.determineGroupNodeCodes());
|
||||
// 获取可见的模版时, 不用定位到叶子节点
|
||||
List<ValueNode<NodeWrapper>> nodes = groupTemplateService
|
||||
.getTodoGroups(request.getAppTerminalType(), false);
|
||||
List<ValueNode<NodeWrapper>> nodes = groupTemplateService.getTodoGroups(request.getAppTerminalType());
|
||||
return groupTemplateService.collectTemplateCodes(nodes);
|
||||
}
|
||||
|
||||
// !! stat
|
||||
|
||||
public PendingMessageStatisticResponseV2 countStatGrouped(MessageGroupNodeStatisticParam request) {
|
||||
return nodeStatCache.getCacheResponseOrReload(request, () -> countStatGroupedImpl(request));
|
||||
}
|
||||
|
||||
private PendingMessageStatisticResponseV2 countStatGroupedImpl(MessageGroupNodeStatisticParam request) {
|
||||
List<ValueNode<NodeWrapper>> nodes = determineStatNodes(request);
|
||||
Function<ValueNode<NodeWrapper>, GroupStat> nodeStatFun = valueNode -> {
|
||||
int executableCount = countStatByNode(request, valueNode, TodoType.EXECUTABLE);
|
||||
@ -239,20 +248,44 @@ public class TodoRangeQueryService {
|
||||
groupStat.setStat(new Stat(executableCount, copiedToMeCount));
|
||||
return groupStat;
|
||||
};
|
||||
PendingMessageStatisticResponseV2 resp = new PendingMessageStatisticResponseV2();
|
||||
List<GroupStat> stats = nodes.parallelStream().map(nodeStatFun).collect(toList());
|
||||
for (GroupStat stat : stats)
|
||||
resp.addGroupStat(stat);
|
||||
resp.setTotalStat();
|
||||
return resp;
|
||||
try {
|
||||
PendingMessageStatisticResponseV2 resp = new PendingMessageStatisticResponseV2();
|
||||
// 先提交任务
|
||||
CompletableFuture<Integer> uniqueExecutableCountFuture = CompletableFuture.supplyAsync(
|
||||
() -> countStatByNodes(request, nodes, TodoType.EXECUTABLE), statExecutor);
|
||||
CompletableFuture<Integer> uniqueCopiedToMeCountFuture = CompletableFuture.supplyAsync(
|
||||
() -> countStatByNodes(request, nodes, TodoType.COPIED_TO_ME), statExecutor);
|
||||
// 用一个偏大的线程池, IO密集型任务, commonPool不够用
|
||||
// 提交任务并获取结果
|
||||
List<GroupStat> stats = statExecutor
|
||||
.submit(() -> nodes
|
||||
.parallelStream()
|
||||
.map(nodeStatFun)
|
||||
.collect(toList()))
|
||||
.get();
|
||||
resp.addGroupStats(stats);
|
||||
// 一个端可能会有重复分类
|
||||
resp.setTotalStat();
|
||||
CompletableFuture.allOf(uniqueExecutableCountFuture, uniqueCopiedToMeCountFuture).join();
|
||||
// 一个端不包含重复分类
|
||||
int uniqueExecutableCount = uniqueExecutableCountFuture.get();
|
||||
int uniqueCopiedToMeCount = uniqueCopiedToMeCountFuture.get();
|
||||
resp.setUniqueTotalStat(new Stat(uniqueExecutableCount, uniqueCopiedToMeCount));
|
||||
return resp;
|
||||
} catch (Exception e) {
|
||||
log.warn("分类统计异常, request={}", request, e);
|
||||
throw new ServiceException("分类统计, 请稍后重试");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* app首页
|
||||
*/
|
||||
public Integer countStat(MessageGroupNodeStatisticParam request) {
|
||||
List<ValueNode<NodeWrapper>> nodes = determineStatNodes(request);
|
||||
return countStatByNodes(request, nodes, TodoType.EXECUTABLE);
|
||||
PendingMessageStatisticResponseV2 nodeStat = nodeStatCache
|
||||
.getCacheResponseOrReload(request, () -> countStatGroupedImpl(request));
|
||||
Stat totalStat = nodeStat.getUniqueTotalStat();
|
||||
return totalStat == null ? 0 : totalStat.getPendingCount();
|
||||
}
|
||||
|
||||
private int countStatByNode(MessageGroupNodeStatisticParam request,
|
||||
@ -268,7 +301,7 @@ public class TodoRangeQueryService {
|
||||
OuInfo ouInfo = OuInfo.create(request.getOuId(), request.getTerminalType(), todoType);
|
||||
LambdaQueryWrapper<Todo> query = todoQuery(ouInfo)
|
||||
.in(Todo::getTemplateCode, templateCodes)
|
||||
.eq(Todo::getExecutorPersonId, request.getOperator().getId())
|
||||
.eq(Todo::getExecutorPersonId, request.getPersonId())
|
||||
.in(CollectionUtils.isNotEmpty(request.getWorkspaceIds()), Todo::getOrgId, request.getWorkspaceIds())
|
||||
.and(todoType == TodoType.EXECUTABLE, nested -> nested
|
||||
.eq(Todo::getType, TodoType.EXECUTABLE)
|
||||
@ -280,10 +313,18 @@ public class TodoRangeQueryService {
|
||||
}
|
||||
|
||||
public List<ValueNode<NodeWrapper>> determineStatNodes(MessageGroupNodeStatisticParam request) {
|
||||
if (CollectionUtils.isNotEmpty(request.getGroupNodeCodes()))
|
||||
return groupTemplateService.getGroupRoot().getNodes(request.getGroupNodeCodes());
|
||||
Collection<String> nodeCodes = request.getGroupNodeCodes();
|
||||
if (nodeCodes == null) {
|
||||
nodeCodes = Collections.emptyList();
|
||||
}
|
||||
nodeCodes = nodeCodes.stream()
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.collect(toList());
|
||||
List<ValueNode<NodeWrapper>> nodes = CollectionUtils.isNotEmpty(nodeCodes)
|
||||
? groupTemplateService.getGroupRoot().getNodes(nodeCodes)
|
||||
: groupTemplateService.getTodoGroups(request.getTerminalType());
|
||||
// 只返回给前端叶子节点的信息, 前端不需要层级信息
|
||||
return groupTemplateService.getTodoGroups(request.getTerminalType(), true);
|
||||
return groupTemplateService.collectLeafNodes(nodes);
|
||||
}
|
||||
|
||||
// helper
|
||||
|
||||
129
inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/cache/NodeStatCache.java
vendored
Normal file
129
inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/cache/NodeStatCache.java
vendored
Normal file
@ -0,0 +1,129 @@
|
||||
package cn.axzo.msg.center.message.service.todo.cache;
|
||||
|
||||
import cn.axzo.msg.center.dal.mapper.MessageBaseTemplateMapper;
|
||||
import cn.axzo.msg.center.dal.mapper.TodoMapper;
|
||||
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
|
||||
import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam;
|
||||
import cn.axzo.msg.center.service.pending.response.PendingMessageStatisticResponseV2;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.security.SecureRandom;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class NodeStatCache {
|
||||
|
||||
private final MessageBaseTemplateMapper messageBaseTemplateMapper;
|
||||
private final TodoMapper todoMapper;
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
private final PendingMessageBizConfig cfg;
|
||||
|
||||
public PendingMessageStatisticResponseV2 getCacheResponseOrReload(
|
||||
MessageGroupNodeStatisticParam request, Supplier<PendingMessageStatisticResponseV2> loader) {
|
||||
if (!cfg.isNodeStatCacheOn()) {
|
||||
return loader.get();
|
||||
}
|
||||
Collection<String> nodeCodes = request.getGroupNodeCodes();
|
||||
if (CollectionUtils.isNotEmpty(nodeCodes) && nodeCodes.size() > cfg.getNodeStatCacheMaxRequestNodeCodeSize()) {
|
||||
log.info("超过了允许缓存的最大请求节点编码数量,不使用缓存, 直接从数据库获取. request={}", request);
|
||||
return loader.get();
|
||||
}
|
||||
Date templateLatestUpdateTime = messageBaseTemplateMapper.getTemplateLatestUpdateTime();
|
||||
Date todoLatestUpdateTime = todoMapper.getTodoLatestUpdateTime(request.getPersonId());
|
||||
NodeStatCacheValue cacheValue = getCacheValue(request);
|
||||
boolean reloaded = false;
|
||||
if (cacheValue == null || cacheValue.shouldReload(templateLatestUpdateTime, todoLatestUpdateTime)) {
|
||||
log.info("reloading node stat data, request={}", request);
|
||||
PendingMessageStatisticResponseV2 response = loader.get();
|
||||
cacheValue = new NodeStatCacheValue();
|
||||
cacheValue.setResponse(response);
|
||||
cacheValue.setTemplateLatestUpdateTime(templateLatestUpdateTime);
|
||||
cacheValue.setTodoLatestUpdateTime(todoLatestUpdateTime);
|
||||
setCacheValue(request, cacheValue);
|
||||
reloaded = true;
|
||||
}
|
||||
PendingMessageStatisticResponseV2 copy = new PendingMessageStatisticResponseV2();
|
||||
BeanUtils.copyProperties(cacheValue.getResponse(), copy);
|
||||
copy.setCacheReloaded(reloaded);
|
||||
return copy;
|
||||
}
|
||||
|
||||
// !! cache helper
|
||||
|
||||
private NodeStatCacheValue getCacheValue(MessageGroupNodeStatisticParam request) {
|
||||
String redisKey = buildCacheKey(request);
|
||||
String json = stringRedisTemplate.opsForValue().get(redisKey);
|
||||
if (StringUtils.isBlank(json)) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return JSON.parseObject(json, NodeStatCacheValue.class);
|
||||
} catch (Exception e) {
|
||||
// 人为改了缓存, 而且没有改对
|
||||
log.warn("fail to parse cache value, will reload. request={}, cacheValue={}", request, json, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private void setCacheValue(MessageGroupNodeStatisticParam request, NodeStatCacheValue cacheValue) {
|
||||
log.info("set cache value for request. request={}", request);
|
||||
cacheValue.getResponse().setCacheReloaded(null);
|
||||
long secondsDelta = TimeUnit.HOURS.toSeconds(cfg.getNodeStatCacheDataExpireHours());
|
||||
// add some random delta seconds, range: 0-600 seconds
|
||||
secondsDelta += new SecureRandom().nextInt(10 * 60);
|
||||
|
||||
String redisKey = buildCacheKey(request);
|
||||
String redisValue = JSON.toJSONString(cacheValue);
|
||||
stringRedisTemplate.opsForValue().set(redisKey, redisValue, secondsDelta, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private static String buildCacheKey(MessageGroupNodeStatisticParam request) {
|
||||
StringBuffer buf = new StringBuffer();
|
||||
buf.append("msg-center:node-stat:v3");
|
||||
Consumer<Object> appender = value -> {
|
||||
if (value != null) {
|
||||
buf.append(":").append(value);
|
||||
}
|
||||
};
|
||||
appender.accept(request.getPersonId());
|
||||
appender.accept(request.getOuId());
|
||||
appender.accept(request.getTerminalType());
|
||||
Collection<String> nodeCodes = request.getGroupNodeCodes();
|
||||
if (CollectionUtils.isNotEmpty(nodeCodes)) {
|
||||
// 排序避免编码顺序和重复编码的影响
|
||||
nodeCodes.stream()
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.distinct()
|
||||
.sorted()
|
||||
.forEach(appender);
|
||||
}
|
||||
List<Long> workspaceIds = request.getWorkspaceIds();
|
||||
if (CollectionUtils.isNotEmpty(workspaceIds)) {
|
||||
workspaceIds.stream()
|
||||
.filter(Objects::nonNull)
|
||||
.distinct()
|
||||
.sorted()
|
||||
.forEach(appender);
|
||||
}
|
||||
return buf.toString();
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,37 @@
|
||||
package cn.axzo.msg.center.message.service.todo.cache;
|
||||
|
||||
import cn.axzo.msg.center.service.pending.response.PendingMessageStatisticResponseV2;
|
||||
import cn.axzo.msg.center.utils.DateFormatUtil;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Data
|
||||
class NodeStatCacheValue {
|
||||
|
||||
private Date templateLatestUpdateTime;
|
||||
private Date todoLatestUpdateTime;
|
||||
private String templateLatestUpdateTimeStr;
|
||||
private String todoLatestUpdateTimeStr;
|
||||
private PendingMessageStatisticResponseV2 response;
|
||||
|
||||
boolean shouldReload(Date templateLatestUpdateTime, Date todoLatestUpdateTime) {
|
||||
return !Objects.equals(this.templateLatestUpdateTime, templateLatestUpdateTime)
|
||||
|| !Objects.equals(this.todoLatestUpdateTime, todoLatestUpdateTime);
|
||||
}
|
||||
|
||||
public void setTemplateLatestUpdateTime(Date templateLatestUpdateTime) {
|
||||
this.templateLatestUpdateTime = templateLatestUpdateTime;
|
||||
this.templateLatestUpdateTimeStr = DateFormatUtil.toReadableString(templateLatestUpdateTime);
|
||||
}
|
||||
|
||||
public void setTodoLatestUpdateTime(Date todoLatestUpdateTime) {
|
||||
this.todoLatestUpdateTime = todoLatestUpdateTime;
|
||||
this.todoLatestUpdateTimeStr = DateFormatUtil.toReadableString(todoLatestUpdateTime);
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,183 @@
|
||||
package cn.axzo.msg.center.message.service.todo.mybatis;
|
||||
|
||||
import com.alibaba.druid.sql.ast.SQLExpr;
|
||||
import com.alibaba.druid.sql.ast.SQLStatement;
|
||||
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLExprTableSource;
|
||||
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement;
|
||||
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
|
||||
import com.alibaba.druid.sql.visitor.SQLASTVisitor;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.ibatis.cache.CacheKey;
|
||||
import org.apache.ibatis.executor.Executor;
|
||||
import org.apache.ibatis.mapping.BoundSql;
|
||||
import org.apache.ibatis.mapping.MappedStatement;
|
||||
import org.apache.ibatis.mapping.ParameterMapping;
|
||||
import org.apache.ibatis.mapping.SqlCommandType;
|
||||
import org.apache.ibatis.mapping.SqlSource;
|
||||
import org.apache.ibatis.plugin.Interceptor;
|
||||
import org.apache.ibatis.plugin.Intercepts;
|
||||
import org.apache.ibatis.plugin.Invocation;
|
||||
import org.apache.ibatis.plugin.Signature;
|
||||
import org.apache.ibatis.session.Configuration;
|
||||
import org.apache.ibatis.session.ResultHandler;
|
||||
import org.apache.ibatis.session.RowBounds;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Intercepts({
|
||||
@Signature(type = Executor.class, method = "update",
|
||||
args = {MappedStatement.class, Object.class}),
|
||||
@Signature(type = Executor.class, method = "query",
|
||||
args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}),
|
||||
@Signature(type = Executor.class, method = "query",
|
||||
args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class, CacheKey.class, BoundSql.class}),
|
||||
})
|
||||
public class ReplaceInterceptor implements Interceptor {
|
||||
|
||||
private static final ThreadLocal<Replacement> LOCAL = new ThreadLocal<>();
|
||||
|
||||
public static void enable(Replacement replacement) {
|
||||
LOCAL.set(replacement);
|
||||
}
|
||||
|
||||
public static void disable() {
|
||||
LOCAL.remove();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object intercept(Invocation invocation) throws Throwable {
|
||||
Replacement replacement = LOCAL.get();
|
||||
if (replacement == null)
|
||||
return invocation.proceed();
|
||||
final Object[] args = invocation.getArgs();
|
||||
MappedStatement ms = (MappedStatement) args[0];
|
||||
SqlCommandType commandType = ms.getSqlCommandType();
|
||||
BoundSql boundSql = ms.getBoundSql(args[1]);
|
||||
SQLStatement stmt;
|
||||
MySqlStatementParser parser = new MySqlStatementParser(boundSql.getSql());
|
||||
if (commandType == SqlCommandType.UPDATE) {
|
||||
stmt = parser.parseUpdateStatement();
|
||||
} else if (commandType == SqlCommandType.SELECT) {
|
||||
stmt = parser.parseSelect();
|
||||
} else if (commandType == SqlCommandType.INSERT) {
|
||||
stmt = parser.parseInsert();
|
||||
} else if (commandType == SqlCommandType.DELETE) {
|
||||
stmt = parser.parseDeleteStatement();
|
||||
} else {
|
||||
return invocation.proceed();
|
||||
}
|
||||
if (stmt instanceof MySqlInsertStatement && replacement.isInsertPreserveId()) {
|
||||
maybeInsertPreserveId((MySqlInsertStatement) stmt, boundSql.getParameterObject());
|
||||
}
|
||||
stmt.accept(new ReplaceVisitor(replacement));
|
||||
String newSql = stmt.toString();
|
||||
args[0] = copyMappedStatement(ms, parameterObject -> new CustomBoundSql(
|
||||
ms.getConfiguration(), newSql, boundSql.getParameterMappings(), parameterObject, boundSql));
|
||||
return invocation.proceed();
|
||||
}
|
||||
|
||||
private void maybeInsertPreserveId(MySqlInsertStatement stmt, Object parameterObject) {
|
||||
if (stmt.getColumns().isEmpty() || stmt.getValues() == null) {
|
||||
return;
|
||||
}
|
||||
List<SQLExpr> columns = stmt.getColumns();
|
||||
String idColumn = columns.stream()
|
||||
.filter(column -> column instanceof SQLIdentifierExpr)
|
||||
.map(SQLIdentifierExpr.class::cast)
|
||||
.map(SQLIdentifierExpr::getName)
|
||||
.filter(name -> name.equals("id"))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
if (idColumn != null) return;
|
||||
Number idValue = getIdValue(parameterObject);
|
||||
if (idValue == null) return;
|
||||
columns.add(new SQLIdentifierExpr("id"));
|
||||
stmt.getValues().addValue(idValue.longValue());
|
||||
}
|
||||
|
||||
private Number getIdValue(Object parameterObject) {
|
||||
Field field = ReflectionUtils.findField(parameterObject.getClass(), "id");
|
||||
if (field == null) return null;
|
||||
field.setAccessible(true);
|
||||
Object value = ReflectionUtils.getField(field, parameterObject);
|
||||
return value instanceof Number ? (Number) value : null;
|
||||
}
|
||||
|
||||
@RequiredArgsConstructor
|
||||
private static class ReplaceVisitor implements SQLASTVisitor {
|
||||
|
||||
final Replacement replacement;
|
||||
|
||||
@Override
|
||||
public boolean visit(SQLExprTableSource x) {
|
||||
SQLExpr expr = x.getExpr();
|
||||
if (expr instanceof SQLIdentifierExpr) {
|
||||
String tableName = ((SQLIdentifierExpr) expr).getName();
|
||||
if (tableName.equals(replacement.getSrcTable())) {
|
||||
x.replace(expr, new SQLIdentifierExpr(replacement.getDestTable()));
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class CustomBoundSql extends BoundSql {
|
||||
|
||||
final BoundSql delegate;
|
||||
|
||||
CustomBoundSql(Configuration configuration, String sql, List<ParameterMapping> parameterMappings,
|
||||
Object parameterObject, BoundSql delegate) {
|
||||
super(configuration, sql, parameterMappings, parameterObject);
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasAdditionalParameter(String name) {
|
||||
if (super.hasAdditionalParameter(name))
|
||||
return true;
|
||||
return delegate.hasAdditionalParameter(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAdditionalParameter(String name) {
|
||||
Object parameter = super.getAdditionalParameter(name);
|
||||
if (parameter != null)
|
||||
return parameter;
|
||||
return delegate.getAdditionalParameter(name);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private MappedStatement copyMappedStatement(MappedStatement ms, SqlSource sqlSource) {
|
||||
MappedStatement.Builder builder =
|
||||
new MappedStatement.Builder(ms.getConfiguration(), ms.getId(), sqlSource, ms.getSqlCommandType());
|
||||
builder.resource(ms.getResource());
|
||||
builder.fetchSize(ms.getFetchSize());
|
||||
builder.statementType(ms.getStatementType());
|
||||
builder.keyGenerator(ms.getKeyGenerator());
|
||||
if (ms.getKeyProperties() != null && ms.getKeyProperties().length != 0) {
|
||||
StringBuilder keyProperties = new StringBuilder();
|
||||
for (String keyProperty : ms.getKeyProperties()) {
|
||||
keyProperties.append(keyProperty).append(",");
|
||||
}
|
||||
keyProperties.delete(keyProperties.length() - 1, keyProperties.length());
|
||||
builder.keyProperty(keyProperties.toString());
|
||||
}
|
||||
builder.timeout(ms.getTimeout());
|
||||
builder.parameterMap(ms.getParameterMap());
|
||||
builder.resultMaps(ms.getResultMaps());
|
||||
builder.resultSetType(ms.getResultSetType());
|
||||
builder.cache(ms.getCache());
|
||||
builder.flushCacheRequired(ms.isFlushCacheRequired());
|
||||
builder.useCache(ms.isUseCache());
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,30 @@
|
||||
package cn.axzo.msg.center.message.service.todo.mybatis;
|
||||
|
||||
import lombok.AccessLevel;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Getter
|
||||
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
|
||||
public enum Replacement {
|
||||
|
||||
TO_MESSAGE_RECORD_COLD("message_record", "message_record_cold", true)
|
||||
|
||||
;
|
||||
|
||||
private final String srcTable;
|
||||
private final String destTable;
|
||||
private final boolean insertPreserveId;
|
||||
|
||||
public void run(Runnable runnable) {
|
||||
ReplaceInterceptor.enable(this);
|
||||
try {
|
||||
runnable.run();
|
||||
} finally {
|
||||
ReplaceInterceptor.disable();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -12,6 +12,11 @@ import org.springframework.context.annotation.Configuration;
|
||||
@Configuration
|
||||
public class SqlInterceptorConfig {
|
||||
|
||||
@Bean
|
||||
ReplaceInterceptor replaceTableInterceptor() {
|
||||
return new ReplaceInterceptor();
|
||||
}
|
||||
|
||||
@Bean
|
||||
CollectSQLInterceptor sqlCollectInterceptor() {
|
||||
return new CollectSQLInterceptor();
|
||||
|
||||
@ -0,0 +1,130 @@
|
||||
package cn.axzo.msg.center.message.xxl;
|
||||
|
||||
import cn.axzo.basics.common.exception.ServiceException;
|
||||
import cn.axzo.msg.center.dal.mapper.MessageRecordMapper;
|
||||
import cn.axzo.msg.center.domain.entity.MessageRecord;
|
||||
import cn.axzo.msg.center.domain.persistence.BaseEntity;
|
||||
import cn.axzo.msg.center.inside.notices.utils.FunctionalTransactionTemplate;
|
||||
import cn.axzo.msg.center.message.service.todo.mybatis.ReplaceInterceptor;
|
||||
import cn.axzo.msg.center.message.service.todo.mybatis.Replacement;
|
||||
import cn.axzo.msg.center.utils.DateFormatUtil;
|
||||
import cn.axzo.msg.center.utils.JSONObjectUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.taobao.api.internal.util.NamedThreadFactory;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.IJobHandler;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import lombok.Data;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.joda.time.DateTime;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import static cn.axzo.msg.center.inside.notices.utils.Queries.query;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class MigrateOldMsgHotDataJob extends IJobHandler {
|
||||
|
||||
private final MessageRecordMapper messageRecordMapper;
|
||||
private final FunctionalTransactionTemplate transactionTemplate;
|
||||
private final ExecutorService executor = Executors.newSingleThreadExecutor(
|
||||
new NamedThreadFactory("migrate-old-msg-hot-data-job"));
|
||||
private volatile boolean isRunning = false;
|
||||
|
||||
@Override
|
||||
@XxlJob("migrateOldMsgHotDataJob")
|
||||
public ReturnT<String> execute(String paramStr) throws Exception {
|
||||
log.info("start - run job with param={}", paramStr);
|
||||
try {
|
||||
Param param = JSONObjectUtil.parseObject(paramStr, Param.class);
|
||||
tryExecute(param, false);
|
||||
log.info("end - run job with param={}", param);
|
||||
return ReturnT.SUCCESS;
|
||||
} catch (Exception e) {
|
||||
log.warn("job failed. param={}", paramStr, e);
|
||||
return ReturnT.FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
public void tryExecute(@Nullable Param param, boolean async) {
|
||||
synchronized (this) {
|
||||
if (isRunning) throw new ServiceException("job is running, please wait");
|
||||
isRunning = true;
|
||||
}
|
||||
Runnable task = () -> {
|
||||
try {
|
||||
executeImpl(param == null ? new Param() : param);
|
||||
} finally {
|
||||
isRunning = false;
|
||||
}
|
||||
};
|
||||
if (async) {
|
||||
executor.execute(task);
|
||||
} else {
|
||||
task.run();
|
||||
}
|
||||
}
|
||||
|
||||
private void executeImpl(Param param) {
|
||||
Date until = DateTime.now().minusDays(param.daysAgo).toDate();
|
||||
Iterable<Page<MessageRecord>> pages = messageRecordMapper
|
||||
.iterateBathStickyPage(query(MessageRecord.class)
|
||||
.le(MessageRecord::getCreateAt, until)
|
||||
.orderByAsc(MessageRecord::getId));
|
||||
log.info("migrating records before/equal date: {} and it's {} days ago",
|
||||
DateFormatUtil.toReadableString(until), param.daysAgo);
|
||||
int currentPage = 0, totalPage = 0, totalRecordCount = 0;
|
||||
for (Page<MessageRecord> page : pages) {
|
||||
++currentPage;
|
||||
// 由于会删除数据, 总页数会变化, 所以只取第一次的总页数才是对的
|
||||
if (totalPage == 0) totalPage = (int) page.getPages();
|
||||
log.info("migrating page={}, total page={}", currentPage, totalPage);
|
||||
for (List<MessageRecord> batch : Lists.partition(page.getRecords(), param.saveBatch)) {
|
||||
totalRecordCount += batch.size();
|
||||
transactionTemplate.exec(() -> migrateRecords(batch));
|
||||
log.info("current total migrated record count={}", totalRecordCount);
|
||||
}
|
||||
}
|
||||
log.info("final total migrated record count={}", totalRecordCount);
|
||||
}
|
||||
|
||||
private void migrateRecords(List<MessageRecord> records) {
|
||||
ReplaceInterceptor.enable(Replacement.TO_MESSAGE_RECORD_COLD);
|
||||
try {
|
||||
messageRecordMapper.batchInsertWithId(records);
|
||||
} catch (Exception e) {
|
||||
List<Long> ids = records.stream()
|
||||
.map(BaseEntity::getId).collect(toList());
|
||||
log.warn("migrate records failed, recordIds={}, records={}",
|
||||
JSON.toJSONString(ids), JSON.toJSONString(records), e);
|
||||
throw e;
|
||||
} finally {
|
||||
ReplaceInterceptor.disable();
|
||||
}
|
||||
List<Long> ids = records.stream()
|
||||
.map(MessageRecord::getId)
|
||||
.collect(toList());
|
||||
messageRecordMapper.deleteBatchIds(ids);
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class Param {
|
||||
private int daysAgo = 180;
|
||||
private int saveBatch = 200;
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,5 +1,6 @@
|
||||
package cn.axzo.msg.center.mq;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
@ -12,9 +13,16 @@ import lombok.RequiredArgsConstructor;
|
||||
public enum MqMessageType {
|
||||
|
||||
TODO_PRESET_BUTTON_PRESSED("msg-center-todo", "msg-center-todo-preset-button-pressed", "预设按钮被点击"),
|
||||
OLD_MSG_SEND("msg-center-old-msg", "old-msg-send", "发送旧消息");
|
||||
|
||||
;
|
||||
private final String model;
|
||||
private final String tag;
|
||||
private final String desc;
|
||||
|
||||
public Event.EventCode getEventCode() {
|
||||
return Event.EventCode.builder()
|
||||
.module(model)
|
||||
.name(tag)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@ -1,7 +1,6 @@
|
||||
package cn.axzo.msg.center.mq;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.Event.EventCode;
|
||||
import cn.axzo.framework.rocketmq.EventProducer;
|
||||
import cn.axzo.msg.center.api.mq.MqMessage;
|
||||
import cn.axzo.msg.center.common.utils.BizAssertions;
|
||||
@ -25,12 +24,11 @@ public class MqProducer {
|
||||
log.info("开始 - 发送消息. messageRecord={}", record);
|
||||
try {
|
||||
sendImpl(record);
|
||||
log.info("结束 - 发送消息. messageRecord={}", record);
|
||||
} catch (Exception e) {
|
||||
log.warn("异常 - 发送消息. messageRecord={}", record, e);
|
||||
// 由调用方决定怎么处理异常
|
||||
throw e;
|
||||
} finally {
|
||||
log.info("结束 - 发送消息. messageRecord={}", record);
|
||||
}
|
||||
}
|
||||
|
||||
@ -40,7 +38,7 @@ public class MqProducer {
|
||||
.shardingKey(record.getShardingKey())
|
||||
.targetId(record.getTargetId())
|
||||
.targetType(messageType.getModel())
|
||||
.eventCode(new EventCode(messageType.getModel(), messageType.getTag()))
|
||||
.eventCode(messageType.getEventCode())
|
||||
.eventModule(messageType.getModel())
|
||||
.eventName(messageType.getTag())
|
||||
.operatorId(record.getOperatorId())
|
||||
|
||||
@ -8,19 +8,28 @@ import cn.axzo.framework.rocketmq.EventProducer;
|
||||
import cn.axzo.framework.rocketmq.EventProducer.Context;
|
||||
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
|
||||
import cn.axzo.framework.rocketmq.RocketMQEventProducer.RocketMQMessageMeta;
|
||||
import cn.axzo.framework.rocketmq.utils.TraceUtils;
|
||||
import cn.axzo.msg.center.api.mq.MqMessage;
|
||||
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.slf4j.MDC;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
@ -77,6 +86,52 @@ public class RocketMQConfig {
|
||||
}
|
||||
}
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@RocketMQMessageListener(topic = "topic_msg_center_${spring.profiles.active}",
|
||||
consumerGroup = "GID_topic_msg_center_self_consume_${spring.profiles.active}",
|
||||
consumeMode = ConsumeMode.ORDERLY,
|
||||
nameServer = "${rocketmq.name-server}"
|
||||
)
|
||||
public static class MsgCenterListener extends BaseListener implements RocketMQListener<MessageExt> {
|
||||
|
||||
private final EventConsumer eventConsumer;
|
||||
private final PendingMessageBizConfig cfg;
|
||||
|
||||
@Override
|
||||
public void onMessage(MessageExt message) {
|
||||
onEvent(message, eventConsumer);
|
||||
}
|
||||
|
||||
/**
|
||||
* 拷贝下来改一下, 设置最大处理时间
|
||||
*/
|
||||
public void onEvent(MessageExt message, EventConsumer eventConsumer) {
|
||||
String topic = message.getTopic();
|
||||
String value = new String(message.getBody());
|
||||
Map<String, String> headers = message.getProperties();
|
||||
MDC.put(TraceUtils.CTX_LOG_ID, headers.get(TraceUtils.TRACE_ID));
|
||||
MDC.put(TraceUtils.TRACE_ID, headers.get(TraceUtils.TRACE_ID));
|
||||
MDC.put(TraceUtils.TRACE_ID_IN_MDC, headers.get(TraceUtils.TRACE_ID));
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("received message, topic={}, headers={}, value={}", topic, headers, value);
|
||||
}
|
||||
|
||||
//当前消息所在分区的lag. 而不是整个topic
|
||||
Long partitionLag = Optional.ofNullable(message.getProperties().get(MessageConst.PROPERTY_MAX_OFFSET))
|
||||
.map(e -> Long.parseLong(e) - message.getQueueOffset()).orElse(0L);
|
||||
|
||||
eventConsumer.onEvent(value, EventConsumer.Context.builder()
|
||||
.msgId(message.getMsgId())
|
||||
.ext(ImmutableMap.of("topic", topic))
|
||||
.headers(Maps.transformValues(headers, header -> Optional.ofNullable(header).map(String::getBytes).orElse(new byte[] {})))
|
||||
.lagSupplier(() -> partitionLag)
|
||||
.maxAllowElapsedMillis(cfg.getMsgCenterMqSelfConsumeMaxExecMs())
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
EventHandlerRepository eventHandlerRepository() {
|
||||
return new EventHandlerRepository((ex, logText) -> {
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package cn.axzo.msg.center.utils;
|
||||
|
||||
import cn.axzo.msg.center.common.utils.ReflectionUtils;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.AccessLevel;
|
||||
@ -35,6 +36,19 @@ public final class JSONObjectUtil {
|
||||
return JSON.parseObject(str);
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析JSON字符串,若字符串格式不正确,抛异常
|
||||
*
|
||||
* @param str 待解析的字符串
|
||||
* @return JSONObject
|
||||
*/
|
||||
public static <T> T parseObject(String str, Class<T> type) {
|
||||
if (StringUtils.isBlank(str)) {
|
||||
return ReflectionUtils.newInstance(type);
|
||||
}
|
||||
return JSON.parseObject(str, type);
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析JSON字符串,若字符串格式不正确,抛异常
|
||||
*
|
||||
|
||||
@ -0,0 +1,20 @@
|
||||
package cn.axzo.msg.center.api.mq;
|
||||
|
||||
import cn.axzo.msg.center.api.request.CmsReadMsgReq;
|
||||
import cn.axzo.msg.center.api.request.GeneralMessageReq;
|
||||
import cn.axzo.msg.center.api.request.MessageReadAllReq;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class SendMessageRecordMessage extends MqMessage implements Serializable {
|
||||
private GeneralMessageReq sendRequest;
|
||||
private MessageReadAllReq readRequest;
|
||||
private CmsReadMsgReq cmsReadRequest;
|
||||
}
|
||||
@ -107,4 +107,5 @@ public class CmsMsgQueryReq extends PageRequest {
|
||||
@Deprecated
|
||||
private Long identityId;
|
||||
|
||||
private boolean logRequest = true;
|
||||
}
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package cn.axzo.msg.center.api.request;
|
||||
|
||||
import cn.axzo.msg.center.api.enums.BizTypeEnum;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.Data;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
@ -29,4 +30,8 @@ public class CmsReadMsgReq {
|
||||
*/
|
||||
private transient BizTypeEnum bizTypeEnum = BizTypeEnum.CONSTRUCTION;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JSON.toJSONString(this);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,7 +2,7 @@ package cn.axzo.msg.center.api.request;
|
||||
|
||||
import cn.axzo.msg.center.api.enums.MsgRecordTerminalTypeEnum;
|
||||
import cn.axzo.msg.center.api.enums.ReceiveTypeEnum;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
@ -78,5 +78,10 @@ public class GeneralMessageReq extends AbstractMessage implements Serializable {
|
||||
* appClient: e.g. cmp cm
|
||||
*/
|
||||
public String appClient;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JSON.toJSONString(this);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -40,6 +40,19 @@ public class IdentityDTO implements Serializable {
|
||||
return Objects.nonNull(id) && Objects.nonNull(type);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
IdentityDTO that = (IdentityDTO) o;
|
||||
return Objects.equals(id, that.id) && type == that.type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, type);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JSON.toJSONString(this);
|
||||
|
||||
@ -38,6 +38,8 @@ public class GeneralMessageOldDataStatisticResponse implements Serializable {
|
||||
*/
|
||||
private String latestMsgContent;
|
||||
|
||||
private Boolean reloadForeground;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JSON.toJSONString(this);
|
||||
|
||||
@ -28,15 +28,26 @@ public class PendingMessageStatisticResponseV2 implements Serializable {
|
||||
private static final long serialVersionUID = 972200410809186003L;
|
||||
|
||||
/**
|
||||
* 统计情况总数 (各分类相加)
|
||||
* 统计总数 (各分类相加)
|
||||
*/
|
||||
private Stat totalStat;
|
||||
|
||||
/**
|
||||
* 统计总数 (一个端分类配置重复了也没有关系)
|
||||
*/
|
||||
private Stat uniqueTotalStat;
|
||||
|
||||
/**
|
||||
* 分类情况
|
||||
*/
|
||||
private List<GroupStat> groupStats = Collections.synchronizedList(new ArrayList<>());
|
||||
|
||||
private Boolean cacheReloaded;
|
||||
|
||||
public void addGroupStats(List<GroupStat> groupStats) {
|
||||
this.groupStats.addAll(groupStats);
|
||||
}
|
||||
|
||||
public void addGroupStat(GroupStat groupStat) {
|
||||
this.groupStats.add(groupStat);
|
||||
}
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
package cn.axzo.msg.center.common.utils;
|
||||
|
||||
import cn.axzo.basics.common.exception.ServiceException;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Modifier;
|
||||
@ -15,6 +17,14 @@ import java.util.function.Predicate;
|
||||
*/
|
||||
public final class ReflectionUtils {
|
||||
|
||||
public static <T> T newInstance(Class<T> type) {
|
||||
try {
|
||||
return type.newInstance();
|
||||
} catch (Exception e) {
|
||||
throw new ServiceException(String.format("初始化失败. type=%s", type.getName()), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取所有field字段,包含父类继承的
|
||||
|
||||
@ -4,6 +4,9 @@ import cn.axzo.msg.center.domain.entity.MessageBaseTemplate;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* @description
|
||||
* 消息基础模板Mapper
|
||||
@ -13,4 +16,8 @@ import org.apache.ibatis.annotations.Mapper;
|
||||
*/
|
||||
@Mapper
|
||||
public interface MessageBaseTemplateMapper extends BaseMapper<MessageBaseTemplate> {
|
||||
|
||||
@Nullable
|
||||
Date getTemplateLatestUpdateTime();
|
||||
|
||||
}
|
||||
|
||||
@ -9,8 +9,7 @@ import cn.axzo.msg.center.domain.dto.MsgStatisticsDTO;
|
||||
import cn.axzo.msg.center.domain.dto.UpdateReadDTO;
|
||||
import cn.axzo.msg.center.domain.entity.MessageRecord;
|
||||
import cn.axzo.msg.center.service.dto.IdentifyAndReceiveType;
|
||||
import cn.axzo.msg.center.service.dto.IdentityDTO;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import cn.axzo.msg.center.util.IterableMapper;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
|
||||
@ -23,8 +22,11 @@ import java.util.List;
|
||||
* @since 2022-03-28 14:59:16
|
||||
*/
|
||||
/*@Mapper*/
|
||||
public interface MessageRecordMapper extends BaseMapper<MessageRecord>{
|
||||
public interface MessageRecordMapper extends IterableMapper<MessageRecord> {
|
||||
|
||||
void batchInsertWithId(@Param("records") List<MessageRecord> records);
|
||||
|
||||
void deleteRecordsPhysically(@Param("ids") List<Long> ids);
|
||||
|
||||
List<MsgStatisticsDTO> statisticsMsg(@Param("bizType") Integer bizType,
|
||||
@Param("personId") Long personId,
|
||||
|
||||
@ -5,6 +5,7 @@ import cn.axzo.msg.center.domain.entity.Todo;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
@ -21,4 +22,7 @@ public interface TodoMapper extends BaseMapper<Todo> {
|
||||
|
||||
List<Long> getMigratedBusinessId();
|
||||
|
||||
}
|
||||
@Nullable
|
||||
Date getTodoLatestUpdateTime(@Param("executorPersonId") Long executorPersonId);
|
||||
|
||||
}
|
||||
@ -0,0 +1,47 @@
|
||||
package cn.axzo.msg.center.util;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
public interface IterableMapper<T> extends BaseMapper<T> {
|
||||
|
||||
// 注意自己的分页插件有没有限制分页的最大条数
|
||||
int DEFAULT_BATCH_SIZE = 2000;
|
||||
|
||||
default Iterable<Page<T>> iterateBath(LambdaQueryWrapper<T> query) {
|
||||
return RecordIterable.iterateBath(this, DEFAULT_BATCH_SIZE, query);
|
||||
}
|
||||
|
||||
default Iterable<Page<T>> iterateBath(long pageSize, LambdaQueryWrapper<T> query) {
|
||||
return RecordIterable.iterateBath(this, pageSize, query);
|
||||
}
|
||||
|
||||
default Iterable<T> iterateElement(LambdaQueryWrapper<T> query) {
|
||||
return RecordIterable.iterateElement(this, DEFAULT_BATCH_SIZE, query);
|
||||
}
|
||||
|
||||
default Iterable<T> iterateElement(long pageSize, LambdaQueryWrapper<T> query) {
|
||||
return RecordIterable.iterateElement(this, pageSize, query);
|
||||
}
|
||||
|
||||
default Iterable<Page<T>> iterateBathStickyPage(LambdaQueryWrapper<T> query) {
|
||||
return RecordIterable.iterateBathStickyPage(this, DEFAULT_BATCH_SIZE, query);
|
||||
}
|
||||
|
||||
default Iterable<Page<T>> iterateBathStickyPage(long pageSize, LambdaQueryWrapper<T> query) {
|
||||
return RecordIterable.iterateBathStickyPage(this, pageSize, query);
|
||||
}
|
||||
|
||||
default Iterable<T> iterateElementStickyPage(LambdaQueryWrapper<T> query) {
|
||||
return RecordIterable.iterateElementStickyPage(this, DEFAULT_BATCH_SIZE, query);
|
||||
}
|
||||
|
||||
default Iterable<T> iterateElementStickyPage(long pageSize, LambdaQueryWrapper<T> query) {
|
||||
return RecordIterable.iterateElementStickyPage(this, pageSize, query);
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,127 @@
|
||||
package cn.axzo.msg.center.util;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
/**
|
||||
* 不要直接使用这个类, 用 {@link IterableMapper}
|
||||
*
|
||||
* @author yanglin
|
||||
*/
|
||||
class RecordIterable {
|
||||
|
||||
static <T, M extends BaseMapper<T>> Iterable<Page<T>> iterateBath(
|
||||
M mapper, long pageSize, LambdaQueryWrapper<T> query) {
|
||||
return () -> new BatchIterator<>(
|
||||
mapper, pageSize, query, PageStrategy.ADVANCE_CURRENT_PAGE);
|
||||
}
|
||||
|
||||
static <T, M extends BaseMapper<T>> Iterable<T> iterateElement(
|
||||
M mapper, long pageSize, LambdaQueryWrapper<T> query) {
|
||||
return () -> new ElementIterator<>(new BatchIterator<>(
|
||||
mapper, pageSize, query, PageStrategy.ADVANCE_CURRENT_PAGE));
|
||||
}
|
||||
|
||||
static <T, M extends BaseMapper<T>> Iterable<Page<T>> iterateBathStickyPage(
|
||||
M mapper, long pageSize, LambdaQueryWrapper<T> query) {
|
||||
return () -> new BatchIterator<>(
|
||||
mapper, pageSize, query, PageStrategy.STICKY_PAGE);
|
||||
}
|
||||
|
||||
static <T, M extends BaseMapper<T>> Iterable<T> iterateElementStickyPage(
|
||||
M mapper, long pageSize, LambdaQueryWrapper<T> query) {
|
||||
return () -> new ElementIterator<>(new BatchIterator<>(
|
||||
mapper, pageSize, query, PageStrategy.STICKY_PAGE));
|
||||
}
|
||||
|
||||
private static class ElementIterator<T, M extends BaseMapper<T>> implements Iterator<T> {
|
||||
|
||||
private final BatchIterator<T, M> pageIterator;
|
||||
private Iterator<T> elementIterator;
|
||||
|
||||
ElementIterator(BatchIterator<T, M> pageIterator) {
|
||||
this.pageIterator = pageIterator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
maybeAdvance();
|
||||
return elementIterator.hasNext() || pageIterator.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public T next() {
|
||||
if (!hasNext())
|
||||
throw new NoSuchElementException();
|
||||
return elementIterator.next();
|
||||
}
|
||||
|
||||
private void maybeAdvance() {
|
||||
if (elementIterator == null || !elementIterator.hasNext() && pageIterator.hasNext()) {
|
||||
elementIterator = pageIterator.hasNext()
|
||||
? pageIterator.next().getRecords().iterator()
|
||||
: Collections.emptyIterator();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class BatchIterator<T, M extends BaseMapper<T>> implements Iterator<Page<T>> {
|
||||
|
||||
private final M mapper;
|
||||
private final LambdaQueryWrapper<T> query;
|
||||
private final PageStrategy pageStrategy;
|
||||
private final Page<T> page;
|
||||
private boolean pageConsumed = false;
|
||||
|
||||
BatchIterator(M mapper, long pageSize, LambdaQueryWrapper<T> query, PageStrategy pageStrategy) {
|
||||
this.mapper = mapper;
|
||||
this.query = query;
|
||||
this.pageStrategy = pageStrategy;
|
||||
this.page = new Page<>();
|
||||
this.page.setSize(pageSize);
|
||||
this.page.setCurrent(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
maybeFetchNextPage();
|
||||
// 1. 很明显的还有下一页
|
||||
// 2. 看当前页的数据以及当前页的消费情况 (第一页和最后一页)
|
||||
return page.getCurrent() < page.getPages() || (!pageConsumed && !page.getRecords().isEmpty());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Page<T> next() {
|
||||
if (!hasNext())
|
||||
throw new NoSuchElementException();
|
||||
pageConsumed = true;
|
||||
Page<T> copy = new Page<>();
|
||||
// 不能暴露内部状态. 如果外部改动了内部的page, 会影响到下次迭代
|
||||
BeanUtils.copyProperties(page, copy);
|
||||
return copy;
|
||||
}
|
||||
|
||||
private void maybeFetchNextPage() {
|
||||
// 还没有获取过数据或者当前页数据已经消费, 就获取新page数据
|
||||
if (page.getCurrent() == 0 || (pageConsumed && page.hasNext())) {
|
||||
if (pageStrategy == PageStrategy.ADVANCE_CURRENT_PAGE)
|
||||
page.setCurrent(page.getCurrent() + 1);
|
||||
else
|
||||
page.setCurrent(1);
|
||||
mapper.selectPage(page, query);
|
||||
pageConsumed = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private enum PageStrategy {
|
||||
ADVANCE_CURRENT_PAGE, STICKY_PAGE
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,9 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||
<mapper namespace="cn.axzo.msg.center.dal.mapper.MessageBaseTemplateMapper">
|
||||
|
||||
<select id="getTemplateLatestUpdateTime" resultType="date">
|
||||
SELECT max(update_at) FROM message_base_template
|
||||
</select>
|
||||
|
||||
</mapper>
|
||||
@ -2,6 +2,30 @@
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||
<mapper namespace="cn.axzo.msg.center.dal.mapper.MessageRecordMapper">
|
||||
|
||||
<insert id="batchInsertWithId">
|
||||
INSERT INTO message_record(
|
||||
id, `from_id`, `to_id`, `person_id`, `type`, `relation_id`, `module_id`, `title`, `content`, `receive_type`, `terminal_type`,
|
||||
`terminal_id`, `terminal_name`, `state`, `biz_id`, `router_params`, `extra`, `retry_count`, `callback_url`, `fail_cause`,
|
||||
`event_source`, `tenant_id`, `is_delete`, `create_at`, `update_at`, `old_type_id`
|
||||
)
|
||||
VALUES
|
||||
<foreach collection="records" item="i" separator=",">
|
||||
(
|
||||
#{i.id}, #{i.fromId}, #{i.toId}, #{i.personId}, COALESCE(#{i.type}, 0), #{i.relationId}, #{i.moduleId}, #{i.title}, #{i.content}, COALESCE(#{i.receiveType}, 0), COALESCE(#{i.terminalType}, 0),
|
||||
#{i.terminalId}, #{i.terminalName}, COALESCE(#{i.state}, 0), #{i.bizId}, #{i.routerParams}, #{i.extra}, #{i.retryCount}, #{i.callbackUrl}, #{i.failCause},
|
||||
#{i.eventSource}, #{i.tenantId}, #{i.isDelete}, #{i.createAt}, #{i.updateAt}, #{i.oldTypeId}
|
||||
)
|
||||
</foreach>
|
||||
</insert>
|
||||
|
||||
<delete id="deleteRecordsPhysically">
|
||||
DELETE FROM message_record
|
||||
WHERE id IN
|
||||
<foreach collection="ids" item="id" separator="," open="(" close=")">
|
||||
#{id}
|
||||
</foreach>
|
||||
</delete>
|
||||
|
||||
<select id="statisticsMsg"
|
||||
resultType="cn.axzo.msg.center.domain.dto.MsgStatisticsDTO">
|
||||
select type,
|
||||
|
||||
@ -16,5 +16,9 @@
|
||||
AND record_ext -> '$.isMigratedFromPendingMessage' = true
|
||||
</select>
|
||||
|
||||
<select id="getTodoLatestUpdateTime" resultType="date">
|
||||
SELECT max(update_at) FROM todo WHERE executor_person_id = #{executorPersonId}
|
||||
</select>
|
||||
|
||||
</mapper>
|
||||
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
<include resource="logback/logback-axzo.xml" />
|
||||
<!-- 覆盖开发环境日志配置 -->
|
||||
<springProfile name="local,dev">
|
||||
<logger name="cn.axzo" level="DEBUG" />
|
||||
<logger name="cn.axzo" level="INFO" />
|
||||
</springProfile>
|
||||
|
||||
<appender name="XXL-JOB" class="cn.axzo.msg.center.notices.client.util.XxlAppender">
|
||||
|
||||
@ -194,9 +194,6 @@ public class SmsManager extends BaseManager implements SmsGateway {
|
||||
request.setParams(param);
|
||||
request.setRequestNo(UUID.randomUUID().toString());
|
||||
request.setInternalObj(MnsType.VERIFY_CODE);
|
||||
HashMap<String, Object> params = new HashMap<>();
|
||||
params.put("code", code);
|
||||
request.setParams(params);
|
||||
messageService.sendMessage(request);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,50 @@
|
||||
package cn.axzo.msg.center.dal.mapper;
|
||||
|
||||
import cn.axzo.msg.center.MsgCenterApplication;
|
||||
import cn.axzo.msg.center.dal.MessageRecordDao;
|
||||
import cn.axzo.msg.center.domain.entity.MessageRecord;
|
||||
import cn.axzo.msg.center.message.service.todo.mybatis.Replacement;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import static cn.axzo.msg.center.inside.notices.utils.Queries.query;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@SpringBootTest(classes = MsgCenterApplication.class)
|
||||
@RequiredArgsConstructor(onConstructor_ = @Autowired)
|
||||
class MessageRecordMapperTest {
|
||||
|
||||
private final MessageRecordMapper messageRecordMapper;
|
||||
private final MessageRecordDao messageRecordDao;
|
||||
|
||||
@Test
|
||||
void tryMigrate() {
|
||||
Iterable<Page<MessageRecord>> pages = messageRecordMapper
|
||||
.iterateBath(10, query(MessageRecord.class)
|
||||
.orderByAsc(MessageRecord::getId));
|
||||
|
||||
Iterator<Page<MessageRecord>> iterator = pages.iterator();
|
||||
// 试试前5页数据,测试一下
|
||||
for (int i = 0; i < 5 && iterator.hasNext(); i++) {
|
||||
Page<MessageRecord> page = iterator.next();
|
||||
Replacement.TO_MESSAGE_RECORD_COLD.run(() -> {
|
||||
List<MessageRecord> records = page.getRecords();
|
||||
List<Long> ids = records.stream()
|
||||
.map(MessageRecord::getId)
|
||||
.collect(toList());
|
||||
messageRecordMapper.deleteRecordsPhysically(ids);
|
||||
messageRecordDao.saveBatch(records);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,24 @@
|
||||
package cn.axzo.msg.center.message.service.impl.oldmsg;
|
||||
|
||||
import cn.axzo.msg.center.MsgCenterApplication;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@SpringBootTest(classes = MsgCenterApplication.class)
|
||||
@RequiredArgsConstructor(onConstructor_ = @Autowired)
|
||||
class OldMsgStatCacheReloadJobTest {
|
||||
private final OldMsgStatCacheReloadJob oldMsgStatCacheReloadJob;
|
||||
|
||||
@Test
|
||||
void foo() throws Exception {
|
||||
oldMsgStatCacheReloadJob.execute(null);
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,24 @@
|
||||
package cn.axzo.msg.center.message.xxl;
|
||||
|
||||
import cn.axzo.msg.center.MsgCenterApplication;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.Commit;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@SpringBootTest(classes = MsgCenterApplication.class)
|
||||
@RequiredArgsConstructor(onConstructor_ = @Autowired)
|
||||
class MigrateOldMsgHotDataJobTest {
|
||||
|
||||
private final MigrateOldMsgHotDataJob migrateOldMsgHotDataJob;
|
||||
|
||||
@Test @Commit
|
||||
void foo() throws Exception {
|
||||
migrateOldMsgHotDataJob.execute("{ \"daysAgo\": 180, \"saveBatch\": 200 }\n");
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,34 @@
|
||||
package cn.axzo.msg.center.util;
|
||||
|
||||
import cn.axzo.msg.center.MsgCenterApplication;
|
||||
import cn.axzo.msg.center.dal.mapper.MessageRecordMapper;
|
||||
import cn.axzo.msg.center.domain.entity.MessageRecord;
|
||||
import cn.axzo.msg.center.inside.notices.utils.Queries;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
|
||||
import static cn.axzo.msg.center.inside.notices.utils.Queries.query;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@SpringBootTest(classes = MsgCenterApplication.class)
|
||||
@RequiredArgsConstructor(onConstructor_ = @Autowired)
|
||||
class RecordIterableTest {
|
||||
|
||||
private final MessageRecordMapper messageRecordMapper;
|
||||
|
||||
@Test
|
||||
void foo() {
|
||||
Iterable<MessageRecord> records = messageRecordMapper
|
||||
.iterateElement(1, query(MessageRecord.class)
|
||||
.eq(MessageRecord::getId, 3));
|
||||
for (MessageRecord record : records) {
|
||||
System.out.println("record=" + record);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user