REQ-2405: almost there

This commit is contained in:
yanglin 2024-04-25 18:19:42 +08:00
parent 77fcf015e0
commit 789b755c5c
21 changed files with 412 additions and 78 deletions

View File

@ -101,7 +101,7 @@ public class PendingMessageBizConfig {
private boolean formatExecSQL;
@Getter
private int oldMsgStatDataExpireHours = 6;
private int oldMsgStatCacheDataExpireHours = 8;
@Getter
private int oldMsgStateDataBackendUpdateBatchSize = 100;
@ -109,6 +109,12 @@ public class PendingMessageBizConfig {
@Getter
private long msgCenterMqSelfConsumeMaxExecMs = 20000L;
@Getter
private int nodeStatCacheMaxRequestNodeCodeSize = 10;
@Getter
private int nodeStatCacheDataExpireHours = 8;
public boolean hasMessageDetailStyle(String code) {
return name2DetailStyle != null && name2DetailStyle.containsKey(code);
}

View File

@ -3,6 +3,7 @@ package cn.axzo.msg.center.inside.notices.service.impl;
import cn.axzo.core.domain.PageResult;
import cn.axzo.msg.center.api.enums.MsgStateEnum;
import cn.axzo.msg.center.api.enums.MsgTypeEnum;
import cn.axzo.msg.center.api.mq.SendMessageRecordMessage;
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
import cn.axzo.msg.center.api.request.MessageModuleStatisticReq;
import cn.axzo.msg.center.api.request.MessagePageQueryReq;
@ -34,6 +35,9 @@ import cn.axzo.msg.center.inside.notices.service.MessageRouterService;
import cn.axzo.msg.center.inside.notices.service.RawMessageRecordService;
import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam;
import cn.axzo.msg.center.message.service.PendingMessageNewService;
import cn.axzo.msg.center.mq.MqMessageRecord;
import cn.axzo.msg.center.mq.MqMessageType;
import cn.axzo.msg.center.mq.MqProducer;
import cn.axzo.msg.center.service.dto.PersonDTO;
import cn.axzo.msg.center.service.enums.AppTerminalTypeEnum;
import cn.azxo.framework.common.model.Page;
@ -99,7 +103,7 @@ public class MessageCoreServiceImpl implements MessageCoreService {
@Resource
private RawMessageRecordService rawMessageRecordService;
@Resource
private PendingMessageNewService pendingMessageNewService;
private MqProducer mqProducer;
public MsgRouteTypeEnum getSystemType(String systemType) {
/*String systemType = ContextInfoHolder.get().getSystemAndDeviceInfo().getSystemType();*/
@ -355,13 +359,18 @@ public class MessageCoreServiceImpl implements MessageCoreService {
// 阅读单条数据
rawMessageRecordService.updatePersonMessageState(request.getPersonId(), srcStates, MsgStateEnum.HAVE_READ,
request.getMsgId());
return;
} else {
List<Long> moduleIds = messageModuleService.listByModuleName(request.getModuleName()).stream()
.map(MessageModule::getId)
.collect(Collectors.toList());
rawMessageRecordService.updatePersonMessageState(request.getPersonId(), srcStates, MsgStateEnum.HAVE_READ,
MsgTypeEnum.GENERAL_MESSAGE, null, moduleIds);
}
List<Long> moduleIds = messageModuleService.listByModuleName(request.getModuleName()).stream()
.map(MessageModule::getId)
.collect(Collectors.toList());
rawMessageRecordService.updatePersonMessageState(request.getPersonId(), srcStates, MsgStateEnum.HAVE_READ,
MsgTypeEnum.GENERAL_MESSAGE, null, moduleIds);
SendMessageRecordMessage mqMessage = new SendMessageRecordMessage();
mqMessage.setReadRequest(request);
mqProducer.send(MqMessageRecord
.builder(MqMessageType.OLD_MSG_SEND, mqMessage)
.build());
}
@Override

View File

@ -186,7 +186,7 @@ public class MessageRecordServiceImpl implements MessageRecordService {
pushMessages.addAll(messageRecords);
});
SendMessageRecordMessage mqMessage = new SendMessageRecordMessage();
mqMessage.setRequest(message);
mqMessage.setSendRequest(message);
mqProducer.send(MqMessageRecord
.builder(MqMessageType.OLD_MSG_SEND, mqMessage)
.build());

View File

@ -49,6 +49,10 @@ public class MessageGroupNodeStatisticParam implements Serializable {
*/
private Boolean withIdentify;
public Long getPersonId() {
return getOperator().getId();
}
public static MessageGroupNodeStatisticParam from(PendingMessageStatisticRequest request) {
MessageGroupNodeStatisticParam param = BeanConverter.convert(request, MessageGroupNodeStatisticParam.class);
IdentityDTO identity = IdentityDTO.builder()

View File

@ -9,8 +9,7 @@ import lombok.Data;
@Data
public class OldMsgCacheReloadJobParam {
private static final int DEFAULT_SYNC_DAY = 60;
private static final int DEFAULT_BATCH_SIZE = 1000;
private static final int DEFAULT_SYNC_DAY = 30;
private int syncDay;
private int batchSize;
@ -18,7 +17,6 @@ public class OldMsgCacheReloadJobParam {
static OldMsgCacheReloadJobParam defaultParam() {
OldMsgCacheReloadJobParam defaultParam = new OldMsgCacheReloadJobParam();
defaultParam.setSyncDay(DEFAULT_SYNC_DAY);
defaultParam.setBatchSize(DEFAULT_BATCH_SIZE);
return defaultParam;
}
@ -26,10 +24,6 @@ public class OldMsgCacheReloadJobParam {
return syncDay <= 0 ? DEFAULT_SYNC_DAY : syncDay;
}
int determineBatchSize() {
return batchSize <= 0 ? DEFAULT_BATCH_SIZE : batchSize;
}
@Override
public String toString() {
return JSON.toJSONString(this);

View File

@ -6,11 +6,12 @@ import cn.axzo.framework.rocketmq.EventHandler;
import cn.axzo.msg.center.api.mq.SendMessageRecordMessage;
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.api.request.MessageReadAllReq;
import cn.axzo.msg.center.api.response.MessageNewRes;
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
import cn.axzo.msg.center.message.service.impl.GeneralMessageOldServiceImpl;
import cn.axzo.msg.center.message.service.impl.oldmsg.CacheValue.IdentityResponse;
import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCacheValue.IdentityResponse;
import cn.axzo.msg.center.message.service.impl.person.PersonIdInfo;
import cn.axzo.msg.center.message.service.impl.person.PersonService;
import cn.axzo.msg.center.mq.MqMessageType;
@ -24,6 +25,7 @@ import cn.azxo.framework.common.model.Page;
import cn.hutool.core.date.StopWatch;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
@ -62,7 +64,7 @@ public class OldMsgStatCache implements EventHandler, InitializingBean {
private final PendingMessageBizConfig cfg;
private final EventConsumer eventConsumer;
private final PersonService personService;
private final ForkJoinPool foregroundExecutor = new ForkJoinPool(20);
private final ForkJoinPool foregroundExecutor = new ForkJoinPool(15);
private final ForkJoinPool backgroundExecutor = new ForkJoinPool(10);
public GeneralMessageOldDataStatisticResponse getCacheResponseOrReload(GeneralMessageOldDataStatisticRequest request) {
@ -71,7 +73,7 @@ public class OldMsgStatCache implements EventHandler, InitializingBean {
}
private GeneralMessageOldDataStatisticResponse getCacheResponse(GeneralMessageOldDataStatisticRequest request) {
CacheValue cacheValue = getCacheValue(request.getPersonId());
OldMsgStatCacheValue cacheValue = getCacheValue(request.getPersonId());
if (cacheValue == null) {
log.info("oldMsgStat: cache miss, need to reload in foreground. request={}", request);
return null;
@ -91,7 +93,7 @@ public class OldMsgStatCache implements EventHandler, InitializingBean {
StopWatch stopWatch = new StopWatch(String.format(
"%s-reloadForeground", getClass().getSimpleName()));
stopWatch.start("initPersonCacheValueForeground");
CacheValue cacheValue = initPersonCacheValueForeground(request.getPersonId());
OldMsgStatCacheValue cacheValue = initPersonCacheValueForeground(request.getPersonId());
stopWatch.stop();
GeneralMessageOldDataStatisticResponse response = cacheValue
.findResponse(request.getIdentities())
@ -112,13 +114,13 @@ public class OldMsgStatCache implements EventHandler, InitializingBean {
.orElse(null);
}
public Map<Long, CacheValue> reloadBackground(Set<Long> personIds) throws Exception {
public Map<Long, OldMsgStatCacheValue> reloadBackground(Set<Long> personIds) throws Exception {
log.info("oldMsgStat: reload cache for person in background. personIds={}", JSON.toJSONString(personIds));
if (CollectionUtils.isEmpty(personIds)) {
log.info("oldMsgStat: personIds is empty");
return Collections.emptyMap();
}
HashMap<Long, CacheValue> personId2CacheValue = new HashMap<>();
HashMap<Long, OldMsgStatCacheValue> personId2CacheValue = new HashMap<>();
// 温柔点, 别一次性全丢到线程池
int bachSize = cfg.getOldMsgStateDataBackendUpdateBatchSize();
List<List<Long>> batches = Lists.partition(new ArrayList<>(personIds), bachSize);
@ -126,7 +128,7 @@ public class OldMsgStatCache implements EventHandler, InitializingBean {
ArrayList<CompletableFuture<PersonAndCacheValue>> futures = new ArrayList<>();
for (Long personId : batch) {
futures.add(CompletableFuture.supplyAsync(() -> {
CacheValue cacheValue = initPersonCacheValueBackground(personId);
OldMsgStatCacheValue cacheValue = initPersonCacheValueBackground(personId);
setCacheValue(personId, cacheValue);
return new PersonAndCacheValue(personId, cacheValue);
}, backgroundExecutor));
@ -140,13 +142,13 @@ public class OldMsgStatCache implements EventHandler, InitializingBean {
return personId2CacheValue;
}
private CacheValue initPersonCacheValueForeground(Long personId) {
private OldMsgStatCacheValue initPersonCacheValueForeground(Long personId) {
log.info("initPersonCacheValueForeground. personId={}", personId);
Function<Boolean, IdentityResponse> buildResponseFun = asManager -> {
OldMsgIdentifyInfo identifyInfo = oldMsgIdentifyFilter.getIdentifyInfo(personId, asManager);
return getIdentityResponse(personId, identifyInfo.getIdentities());
};
CacheValue cacheValue = new CacheValue();
OldMsgStatCacheValue cacheValue = new OldMsgStatCacheValue();
CompletableFuture<IdentityResponse> asManagerFuture = CompletableFuture
.supplyAsync(() -> buildResponseFun.apply(true), foregroundExecutor);
CompletableFuture<IdentityResponse> asWorkerFuture = CompletableFuture
@ -162,13 +164,13 @@ public class OldMsgStatCache implements EventHandler, InitializingBean {
return cacheValue;
}
private CacheValue initPersonCacheValueBackground(Long personId) {
private OldMsgStatCacheValue initPersonCacheValueBackground(Long personId) {
log.info("initPersonCacheValueBackground. personId={}", personId);
Function<Boolean, IdentityResponse> buildResponseFun = asManager -> {
OldMsgIdentifyInfo identifyInfo = oldMsgIdentifyFilter.getIdentifyInfo(personId, asManager);
return getIdentityResponse(personId, identifyInfo.getIdentities());
};
CacheValue cacheValue = new CacheValue();
OldMsgStatCacheValue cacheValue = new OldMsgStatCacheValue();
cacheValue.addIdentityResponse(buildResponseFun.apply(true));
cacheValue.addIdentityResponse(buildResponseFun.apply(false));
return cacheValue;
@ -187,10 +189,10 @@ public class OldMsgStatCache implements EventHandler, InitializingBean {
// !! cache helper
private void setCacheValue(Long personId, CacheValue cacheValue) {
private void setCacheValue(Long personId, OldMsgStatCacheValue cacheValue) {
log.info("set cache value for person. personId={}", personId);
long secondsDelta = TimeUnit.HOURS.toSeconds(cfg.getOldMsgStatDataExpireHours());
// with random seconds delta, 0-600 seconds
long secondsDelta = TimeUnit.HOURS.toSeconds(cfg.getOldMsgStatCacheDataExpireHours());
// add some random delta seconds, range: 0-600 seconds
secondsDelta += new SecureRandom().nextInt(10 * 60);
String redisKey = buildCacheKey(personId);
@ -198,10 +200,10 @@ public class OldMsgStatCache implements EventHandler, InitializingBean {
stringRedisTemplate.opsForValue().set(redisKey, redisValue, secondsDelta, TimeUnit.SECONDS);
}
private CacheValue getCacheValue(Long personId) {
private OldMsgStatCacheValue getCacheValue(Long personId) {
String redisKey = buildCacheKey(personId);
String json = stringRedisTemplate.opsForValue().get(redisKey);
return StringUtils.isBlank(json) ? null : JSON.parseObject(json, CacheValue.class);
return StringUtils.isBlank(json) ? null : JSON.parseObject(json, OldMsgStatCacheValue.class);
}
private static String buildCacheKey(Long personId) {
@ -273,12 +275,23 @@ public class OldMsgStatCache implements EventHandler, InitializingBean {
private void handleMqMessage(Event event) throws Exception {
SendMessageRecordMessage mqMessage = event.normalizedData(SendMessageRecordMessage.class);
GeneralMessageReq request = mqMessage.getRequest();
Map<Long, Long> knownIdentityId2PersonId = request.getToldIdPersonIdMap();
GeneralMessageReq sendRequest = mqMessage.getSendRequest();
MessageReadAllReq readRequest = mqMessage.getReadRequest();
if (sendRequest != null) {
log.info("handle mq event - driven by sendRequest={}", sendRequest);
handleSendRequestEvent(sendRequest);
} else if (readRequest != null) {
log.info("handle mq event - driven by readRequest={}", readRequest);
handleReadRequest(readRequest);
}
}
private void handleSendRequestEvent(GeneralMessageReq sendRequest) throws Exception {
Map<Long, Long> knownIdentityId2PersonId = sendRequest.getToldIdPersonIdMap();
if (knownIdentityId2PersonId == null)
knownIdentityId2PersonId = new HashMap<>();
HashSet<Long> personIds = new HashSet<>(knownIdentityId2PersonId.values());
Set<Long> receiverIdentityIds = request.getToId();
Set<Long> receiverIdentityIds = sendRequest.getToId();
if (receiverIdentityIds == null)
receiverIdentityIds = Collections.emptySet();
HashSet<Long> toQueryPersonIdIdentityIds = new HashSet<>();
@ -290,11 +303,19 @@ public class OldMsgStatCache implements EventHandler, InitializingBean {
PersonIdInfo personIdInfo = personService.getPersonIdsByIdentities(toQueryPersonIdIdentityIds);
personIds.addAll(personIdInfo.getPersonIds());
}
StopWatch stopWatch = new StopWatch("reloadBackground driven by mq event");
stopWatch.start("reloadBackground");
StopWatch stopWatch = new StopWatch("reloadBackground");
stopWatch.start("handleSendRequestEvent");
reloadBackground(personIds);
stopWatch.stop();
log.info(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
}
private void handleReadRequest(MessageReadAllReq readRequest) throws Exception {
StopWatch stopWatch = new StopWatch("reloadBackground");
stopWatch.start("handleReadRequest");
reloadBackground(Sets.newHashSet(readRequest.getPersonId()));
stopWatch.stop();
log.info(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
}
}

View File

@ -57,10 +57,10 @@ public class OldMsgStatCacheReloadJob extends IJobHandler {
Date sinceDate = DateTime.now().minusDays(jobParam.determineSyncDay()).toDate();
StopWatch stopWatch = new StopWatch("reloadBackground driven by job");
Set<Long> reloadedPersonIds = new HashSet<>();
Iterable<Page<MessageRecord>> pages = messageRecordMapper.iterateBath(
jobParam.determineBatchSize(), query(MessageRecord.class)
Iterable<Page<MessageRecord>> pages = messageRecordMapper
.iterateBath(query(MessageRecord.class)
.ge(MessageRecord::getCreateAt, sinceDate)
// 刷最近的
// 刷最近的
.orderByDesc(MessageRecord::getId));
for (Page<MessageRecord> page : pages) {
stopWatch.start("reload stat, records size=" + page.getRecords().size());

View File

@ -16,7 +16,7 @@ import java.util.Set;
* @author yanglin
*/
@Data
public class CacheValue {
public class OldMsgStatCacheValue {
private List<IdentityResponse> identityResponses;

View File

@ -11,7 +11,7 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class PersonAndCacheValue {
private final Long personId;
private final CacheValue cacheValue;
private final OldMsgStatCacheValue cacheValue;
@Override
public String toString() {

View File

@ -1,5 +1,6 @@
package cn.axzo.msg.center.message.service.todo;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.maokai.api.util.Ref;
import cn.axzo.maokai.api.vo.response.tree.ValueNode;
import cn.axzo.msg.center.common.enums.TableIsDeleteEnum;
@ -14,6 +15,7 @@ import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam;
import cn.axzo.msg.center.message.service.group.GroupTemplateService;
import cn.axzo.msg.center.message.service.group.NodeWrapper;
import cn.axzo.msg.center.message.service.impl.PendingMessageNewServiceImpl;
import cn.axzo.msg.center.message.service.todo.cache.NodeStatCache;
import cn.axzo.msg.center.message.service.todo.mybatis.CollectSQLInterceptor;
import cn.axzo.msg.center.message.service.todo.pagequery.PageQuerySort;
import cn.axzo.msg.center.message.service.todo.queryanalyze.SimpleAnalyzer;
@ -44,6 +46,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.function.Supplier;
@ -67,6 +70,8 @@ public class TodoRangeQueryService {
private final TodoRespBuilder todoRespBuilder;
private final AnalysisConfig analysisConfig;
private final GroupTemplateService groupTemplateService;
private final NodeStatCache nodeStatCache;
private final ForkJoinPool statExecutor = new ForkJoinPool(20);
// !! page query
@ -224,6 +229,10 @@ public class TodoRangeQueryService {
// !! stat
public PendingMessageStatisticResponseV2 countStatGrouped(MessageGroupNodeStatisticParam request) {
return nodeStatCache.getCacheResponseOrReload(request, () -> countStatGroupedImpl(request));
}
private PendingMessageStatisticResponseV2 countStatGroupedImpl(MessageGroupNodeStatisticParam request) {
List<ValueNode<NodeWrapper>> nodes = determineStatNodes(request);
Function<ValueNode<NodeWrapper>, GroupStat> nodeStatFun = valueNode -> {
int executableCount = countStatByNode(request, valueNode, TodoType.EXECUTABLE);
@ -236,20 +245,30 @@ public class TodoRangeQueryService {
groupStat.setStat(new Stat(executableCount, copiedToMeCount));
return groupStat;
};
PendingMessageStatisticResponseV2 resp = new PendingMessageStatisticResponseV2();
List<GroupStat> stats = nodes.parallelStream().map(nodeStatFun).collect(toList());
for (GroupStat stat : stats)
resp.addGroupStat(stat);
resp.setTotalStat();
return resp;
try {
PendingMessageStatisticResponseV2 resp = new PendingMessageStatisticResponseV2();
// 用一个偏大的线程池, IO密集型任务, commonPool不够用
List<GroupStat> stats = statExecutor
.submit(() -> nodes.parallelStream().map(nodeStatFun).collect(toList()))
.get();
for (GroupStat stat : stats)
resp.addGroupStat(stat);
resp.setTotalStat();
return resp;
} catch (Exception e) {
log.warn("分类统计异常, request={}", request, e);
throw new ServiceException("分类统计, 请稍后重试");
}
}
/**
* app首页
*/
public Integer countStat(MessageGroupNodeStatisticParam request) {
List<ValueNode<NodeWrapper>> nodes = determineStatNodes(request);
return countStatByNodes(request, nodes, TodoType.EXECUTABLE);
PendingMessageStatisticResponseV2 nodeStat = nodeStatCache
.getCacheResponseOrReload(request, () -> countStatGroupedImpl(request));
Stat totalStat = nodeStat.getTotalStat();
return totalStat == null ? 0 : totalStat.getPendingCount();
}
private int countStatByNode(MessageGroupNodeStatisticParam request,
@ -265,7 +284,7 @@ public class TodoRangeQueryService {
OuInfo ouInfo = OuInfo.create(request.getOuId(), request.getTerminalType(), todoType);
LambdaQueryWrapper<Todo> query = todoQuery(ouInfo)
.in(Todo::getTemplateCode, templateCodes)
.eq(Todo::getExecutorPersonId, request.getOperator().getId())
.eq(Todo::getExecutorPersonId, request.getPersonId())
.and(todoType == TodoType.EXECUTABLE, nested -> nested
.eq(Todo::getType, TodoType.EXECUTABLE)
.eq(Todo::getState, PendingMessageStateEnum.HAS_BEEN_SENT))
@ -276,8 +295,15 @@ public class TodoRangeQueryService {
}
public List<ValueNode<NodeWrapper>> determineStatNodes(MessageGroupNodeStatisticParam request) {
List<ValueNode<NodeWrapper>> nodes = CollectionUtils.isNotEmpty(request.getGroupNodeCodes())
? groupTemplateService.getGroupRoot().getNodes(request.getGroupNodeCodes())
Collection<String> nodeCodes = request.getGroupNodeCodes();
if (nodeCodes == null) {
nodeCodes = Collections.emptyList();
}
nodeCodes = nodeCodes.stream()
.filter(StringUtils::isNotBlank)
.collect(toList());
List<ValueNode<NodeWrapper>> nodes = CollectionUtils.isNotEmpty(nodeCodes)
? groupTemplateService.getGroupRoot().getNodes(nodeCodes)
: groupTemplateService.getTodoGroups(request.getTerminalType());
// 只返回给前端叶子节点的信息, 前端不需要层级信息
return groupTemplateService.collectLeafNodes(nodes);

View File

@ -0,0 +1,99 @@
package cn.axzo.msg.center.message.service.todo.cache;
import cn.axzo.msg.center.dal.mapper.MessageBaseTemplateMapper;
import cn.axzo.msg.center.dal.mapper.TodoMapper;
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam;
import cn.axzo.msg.center.service.pending.response.PendingMessageStatisticResponseV2;
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.security.SecureRandom;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class NodeStatCache {
private final MessageBaseTemplateMapper messageBaseTemplateMapper;
private final TodoMapper todoMapper;
private final StringRedisTemplate stringRedisTemplate;
private final PendingMessageBizConfig cfg;
public PendingMessageStatisticResponseV2 getCacheResponseOrReload(
MessageGroupNodeStatisticParam request, Supplier<PendingMessageStatisticResponseV2> loader) {
Collection<String> nodeCodes = request.getGroupNodeCodes();
if (CollectionUtils.isNotEmpty(nodeCodes) && nodeCodes.size() > cfg.getNodeStatCacheMaxRequestNodeCodeSize()) {
log.info("超过了允许缓存的最大请求节点编码数量,不使用缓存, 直接从数据库获取. request={}", request);
return loader.get();
}
Date templateLatestUpdateTime = messageBaseTemplateMapper.getTemplateLatestUpdateTime();
Date todoLatestUpdateTime = todoMapper.getTodoLatestUpdateTime(request.getPersonId());
NodeStatCacheValue cacheValue = getCacheValue(request);
if (cacheValue == null || cacheValue.shouldReload(templateLatestUpdateTime, todoLatestUpdateTime)) {
log.info("reloading node stat data, request={}", request);
PendingMessageStatisticResponseV2 response = loader.get();
cacheValue = new NodeStatCacheValue();
cacheValue.setResponse(response);
cacheValue.setTemplateLatestUpdateTime(templateLatestUpdateTime);
cacheValue.setTodoLatestUpdateTime(todoLatestUpdateTime);
setCacheValue(request, cacheValue);
}
return cacheValue.getResponse();
}
// !! cache helper
private NodeStatCacheValue getCacheValue(MessageGroupNodeStatisticParam request) {
String redisKey = buildCacheKey(request);
String json = stringRedisTemplate.opsForValue().get(redisKey);
return StringUtils.isBlank(json) ? null : JSON.parseObject(json, NodeStatCacheValue.class);
}
private void setCacheValue(MessageGroupNodeStatisticParam request, NodeStatCacheValue cacheValue) {
log.info("set cache value for request. request={}", request);
long secondsDelta = TimeUnit.HOURS.toSeconds(cfg.getNodeStatCacheDataExpireHours());
// add some random delta seconds, range: 0-600 seconds
secondsDelta += new SecureRandom().nextInt(10 * 60);
String redisKey = buildCacheKey(request);
String redisValue = JSON.toJSONString(cacheValue);
stringRedisTemplate.opsForValue().set(redisKey, redisValue, secondsDelta, TimeUnit.SECONDS);
}
private static String buildCacheKey(MessageGroupNodeStatisticParam request) {
StringBuffer buf = new StringBuffer();
buf.append("msg_center:node_stat:v1");
Consumer<Object> appender = value -> {
if (value != null)
buf.append(":").append(value);
};
appender.accept(request.getPersonId());
appender.accept(request.getOuId());
appender.accept(request.getTerminalType());
Collection<String> nodeCodes = request.getGroupNodeCodes();
if (CollectionUtils.isNotEmpty(nodeCodes)) {
// 排序避免编码顺序和重复编码的影响
nodeCodes.stream()
.filter(StringUtils::isNotBlank)
.distinct()
.sorted()
.forEach(appender);
}
return buf.toString();
}
}

View File

@ -0,0 +1,24 @@
package cn.axzo.msg.center.message.service.todo.cache;
import cn.axzo.msg.center.service.pending.response.PendingMessageStatisticResponseV2;
import lombok.Data;
import java.util.Date;
import java.util.Objects;
/**
* @author yanglin
*/
@Data
class NodeStatCacheValue {
private Date templateLatestUpdateTime;
private Date todoLatestUpdateTime;
private PendingMessageStatisticResponseV2 response;
boolean shouldReload(Date templateLatestUpdateTime, Date todoLatestUpdateTime) {
return !Objects.equals(this.templateLatestUpdateTime, templateLatestUpdateTime)
|| !Objects.equals(this.todoLatestUpdateTime, todoLatestUpdateTime);
}
}

View File

@ -1,6 +1,7 @@
package cn.axzo.msg.center.api.mq;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.api.request.MessageReadAllReq;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -12,5 +13,6 @@ import java.io.Serializable;
@Data
@EqualsAndHashCode(callSuper = true)
public class SendMessageRecordMessage extends MqMessage implements Serializable {
private GeneralMessageReq request;
private GeneralMessageReq sendRequest;
private MessageReadAllReq readRequest;
}

View File

@ -4,6 +4,9 @@ import cn.axzo.msg.center.domain.entity.MessageBaseTemplate;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import javax.annotation.Nullable;
import java.util.Date;
/**
* @description
* 消息基础模板Mapper
@ -13,4 +16,8 @@ import org.apache.ibatis.annotations.Mapper;
*/
@Mapper
public interface MessageBaseTemplateMapper extends BaseMapper<MessageBaseTemplate> {
@Nullable
Date getTemplateLatestUpdateTime();
}

View File

@ -5,6 +5,7 @@ import cn.axzo.msg.center.domain.entity.Todo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
import javax.annotation.Nullable;
import java.util.Date;
import java.util.List;
@ -21,4 +22,7 @@ public interface TodoMapper extends BaseMapper<Todo> {
List<Long> getMigratedBusinessId();
@Nullable
Date getTodoLatestUpdateTime(@Param("executorPersonId") Long executorPersonId);
}

View File

@ -9,55 +9,121 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
*/
public interface IterableMapper<T> extends BaseMapper<T> {
int DEFAULT_BATCH_SIZE = 2000;
/**
* 遍历数据
* <p>对查询出的数据的操作<b>肯定不会</b>更新query中查询条件包含的字段
* foreach遍历数据. 默认批量大小
*
* <p>对查询出的数据的操作<b>肯定不会</b>更新query查询条件包含的字段
*/
default Iterable<Page<T>> iterateBath(LambdaQueryWrapper<T> query) {
return RecordIterable.iterateBath(this, DEFAULT_BATCH_SIZE, query);
}
/**
* foreach遍历数据
*
* <p>对查询出的数据的操作<b>肯定不会</b>更新query查询条件包含的字段
*/
default Iterable<Page<T>> iterateBath(long pageSize, LambdaQueryWrapper<T> query) {
return RecordIterable.iterateBath(this, pageSize, query);
}
/**
* 遍历数据
* <p>对查询出的数据的操作<b>肯定不会</b>更新query中查询条件包含的字段
* foreach遍历数据. 默认批量大小
* <p>对查询出的数据的操作<b>肯定不会</b>更新query查询条件包含的字段
*/
default Iterable<T> iterateElement(LambdaQueryWrapper<T> query) {
return RecordIterable.iterateElement(this, DEFAULT_BATCH_SIZE, query);
}
/**
* foreach遍历数据
* <p>对查询出的数据的操作<b>肯定不会</b>更新query查询条件包含的字段
*/
default Iterable<T> iterateElement(long pageSize, LambdaQueryWrapper<T> query) {
return RecordIterable.iterateElement(this, pageSize, query);
}
/**
* 遍历数据
* <p>对查询出的数据的操作<b>肯定会</b>更新query中查询条件包含的字段
* <p>
* foreach遍历数据. 默认批量大小
*
* <p>对查询出的数据的操作<b>肯定会</b>更新query查询条件包含的字段
* <blockquote><pre>
* LambdaQueryWrapper<MessageRecord> query = Queries.query(MessageRecord.class)
* .eq(MessageRecord::getWorkspaceId, workspaceId)
* // 通过状态查询
* .eq(MessageRecord::getState, MsgStateEnum.UNSENT);
* for(Page<T> page : iterateBathDoUpdateQueryFields(1000, query)) {
* for(T record : page.getRecords()) {
* for(Page<MessageRecord> page : iterateBathDoUpdateQueryFields(1000, query)) {
* for(MessageRecord record : page.getRecords()) {
* // 更新状态
* record.setState(MsgStateEnum.HAS_BEEN_SENT);
* messageRecordMapper.updateById(record);
* }
* }
* </pre></blockquote>
*/
default Iterable<Page<T>> iterateBathDoUpdateQueryFields(LambdaQueryWrapper<T> query) {
return RecordIterable.iterateBathDoUpdateQueryFields(this, DEFAULT_BATCH_SIZE, query);
}
/**
* foreach遍历数据
*
* <p>对查询出的数据的操作<b>肯定会</b>更新query查询条件包含的字段
* <blockquote><pre>
* LambdaQueryWrapper<MessageRecord> query = Queries.query(MessageRecord.class)
* .eq(MessageRecord::getWorkspaceId, workspaceId)
* // 通过状态查询
* .eq(MessageRecord::getState, MsgStateEnum.UNSENT);
* for(Page<MessageRecord> page : iterateBathDoUpdateQueryFields(1000, query)) {
* for(MessageRecord record : page.getRecords()) {
* // 更新状态
* record.setState(MsgStateEnum.HAS_BEEN_SENT);
* messageRecordMapper.updateById(record);
* }
* }
* </pre></blockquote>
*/
default Iterable<Page<T>> iterateBathDoUpdateQueryFields(long pageSize, LambdaQueryWrapper<T> query) {
return RecordIterable.iterateBathDoUpdateQueryFields(this, pageSize, query);
}
/**
* 遍历数据
* <p>对查询出的数据的操作<b>肯定会</b>更新query中查询条件包含的字段
* <p>
* foreach遍历数据. 默认批量大小
*
* <p>对查询出的数据的操作<b>肯定会</b>更新query查询条件包含的字段
* <blockquote><pre>
* LambdaQueryWrapper<MessageRecord> query = Queries.query(MessageRecord.class)
* .eq(MessageRecord::getWorkspaceId, workspaceId)
* // 通过状态查询
* .eq(MessageRecord::getState, MsgStateEnum.UNSENT);
* for(T record : iterateElementDoUpdateQueryFields(1000, query)) {
* for(MessageRecord record : iterateElementDoUpdateQueryFields(1000, query)) {
* // 更新状态
* record.setState(MsgStateEnum.HAS_BEEN_SENT);
* messageRecordMapper.updateById(record);
* }
* </pre></blockquote>
*/
default Iterable<T> iterateElementDoUpdateQueryFields(LambdaQueryWrapper<T> query) {
return RecordIterable.iterateElementDoUpdateQueryFields(this, DEFAULT_BATCH_SIZE, query);
}
/**
* foreach遍历数据
*
* <p>对查询出的数据的操作<b>肯定会</b>更新query查询条件包含的字段
* <blockquote><pre>
* LambdaQueryWrapper<MessageRecord> query = Queries.query(MessageRecord.class)
* .eq(MessageRecord::getWorkspaceId, workspaceId)
* // 通过状态查询
* .eq(MessageRecord::getState, MsgStateEnum.UNSENT);
* for(MessageRecord record : iterateElementDoUpdateQueryFields(1000, query)) {
* // 更新状态
* record.setState(MsgStateEnum.HAS_BEEN_SENT);
* messageRecordMapper.updateById(record);
* }
* </pre></blockquote>
*/
default Iterable<T> iterateElementDoUpdateQueryFields(long pageSize, LambdaQueryWrapper<T> query) {
return RecordIterable.iterateElementDoUpdateQueryFields(this, pageSize, query);

View File

@ -3,12 +3,15 @@ package cn.axzo.msg.center.util;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.beans.BeanUtils;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* 不要直接使用这个类, {@link IterableMapper}
*
* @author yanglin
*/
class RecordIterable {
@ -26,13 +29,13 @@ class RecordIterable {
static <T, M extends BaseMapper<T>> Iterable<Page<T>> iterateBathDoUpdateQueryFields(
M mapper, long pageSize, LambdaQueryWrapper<T> query) {
return () -> new BatchIterator<>(mapper, pageSize, query, PageStrategy.WILL_UPDATE_QUERY_FIELDS);
return () -> new BatchIterator<>(mapper, pageSize, query, PageStrategy.DO_UPDATE_QUERY_FIELDS);
}
static <T, M extends BaseMapper<T>> Iterable<T> iterateElementDoUpdateQueryFields(
M mapper, long pageSize, LambdaQueryWrapper<T> query) {
return () -> new ElementIterator<>(
new BatchIterator<>(mapper, pageSize, query, PageStrategy.WILL_UPDATE_QUERY_FIELDS));
new BatchIterator<>(mapper, pageSize, query, PageStrategy.DO_UPDATE_QUERY_FIELDS));
}
private static class ElementIterator<T, M extends BaseMapper<T>> implements Iterator<T> {
@ -58,10 +61,11 @@ class RecordIterable {
}
private void maybeAdvance() {
if (current == null || !current.hasNext() && batch.hasNext())
if (current == null || !current.hasNext() && batch.hasNext()) {
current = batch.hasNext()
? batch.next().getRecords().iterator()
: Collections.emptyIterator();
}
}
}
@ -87,7 +91,7 @@ class RecordIterable {
public boolean hasNext() {
maybeFetchNextPage();
// 1. 很明显的还有下一页
// 2. 看当前页的数据消费情况 (第一页和最后一页)
// 2. 看当前页的数据以及当前页的消费情况 (第一页和最后一页)
return page.getCurrent() < page.getPages() || (!pageConsumed && !page.getRecords().isEmpty());
}
@ -97,11 +101,8 @@ class RecordIterable {
throw new NoSuchElementException();
pageConsumed = true;
Page<T> copy = new Page<>();
copy.setRecords(page.getRecords());
copy.setTotal(page.getTotal());
copy.setSize(page.getSize());
copy.setCurrent(page.getCurrent());
copy.setOrders(page.orders());
// 不能暴露内部状态. 如果外部改动了内部的page, 会影响到下次迭代
BeanUtils.copyProperties(page, copy);
return copy;
}
@ -119,6 +120,6 @@ class RecordIterable {
}
private enum PageStrategy {
ADVANCE_CURRENT_PAGE, WILL_UPDATE_QUERY_FIELDS
ADVANCE_CURRENT_PAGE, DO_UPDATE_QUERY_FIELDS
}
}

View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.axzo.msg.center.dal.mapper.MessageBaseTemplateMapper">
<select id="getTemplateLatestUpdateTime" resultType="date">
SELECT max(update_at) FROM message_base_template
</select>
</mapper>

View File

@ -16,5 +16,9 @@
AND record_ext -> '$.isMigratedFromPendingMessage' = true
</select>
<select id="getTodoLatestUpdateTime" resultType="date">
SELECT max(update_at) FROM todo WHERE executor_person_id = #{executorPersonId}
</select>
</mapper>

View File

@ -0,0 +1,24 @@
package cn.axzo.msg.center.message.service.impl.oldmsg;
import cn.axzo.msg.center.MsgCenterApplication;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.junit.jupiter.api.Assertions.*;
/**
* @author yanglin
*/
@SpringBootTest(classes = MsgCenterApplication.class)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class OldMsgStatCacheReloadJobTest {
private final OldMsgStatCacheReloadJob oldMsgStatCacheReloadJob;
@Test
void foo() throws Exception {
oldMsgStatCacheReloadJob.execute(null);
}
}

View File

@ -0,0 +1,34 @@
package cn.axzo.msg.center.util;
import cn.axzo.msg.center.MsgCenterApplication;
import cn.axzo.msg.center.dal.mapper.MessageRecordMapper;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.inside.notices.utils.Queries;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static cn.axzo.msg.center.inside.notices.utils.Queries.query;
import static org.junit.jupiter.api.Assertions.*;
/**
* @author yanglin
*/
@SpringBootTest(classes = MsgCenterApplication.class)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class RecordIterableTest {
private final MessageRecordMapper messageRecordMapper;
@Test
void foo() {
Iterable<MessageRecord> records = messageRecordMapper
.iterateElement(1, query(MessageRecord.class)
.eq(MessageRecord::getId, 3));
for (MessageRecord record : records) {
System.out.println("record=" + record);
}
}
}