Merge branch 'feature/REQ-1073' into 'master'

Feature/req 1073

See merge request universal/infrastructure/backend/msg-center-plat!34
This commit is contained in:
罗福 2023-09-25 12:54:35 +00:00
commit 19a8083c25
17 changed files with 1203 additions and 4 deletions

View File

@ -0,0 +1,27 @@
package cn.axzo.msg.center.inside.notices.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
/**
* @description
* 消息的系统配置
* @author cold_blade
* @date 2023/8/31
* @version 1.0
*/
@Getter
@Setter
@RefreshScope
@Configuration
public class MessageSystemConfig {
/**
* 消息记录的冷热隔离线-1表示不进行冷热隔离
*/
@Value("${message.common.record.divide-days:-1}")
private Integer dataDivideDays;
}

View File

@ -0,0 +1,127 @@
package cn.axzo.msg.center.inside.notices.config;
import cn.axzo.msg.center.api.enums.MsgTempBizCategoryEnum;
import cn.axzo.msg.center.api.response.PendingMessageTemporarilyTypeRes;
import com.alibaba.fastjson.JSON;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @description
* 待办消息的相关业务类配置
* @author cold_blade
* @date 2023/8/31
* @version 1.0
*/
@Setter
@RefreshScope
@Configuration
@ConfigurationProperties(prefix = "message.pending")
public class PendingMessageBizConfig {
private Map<MsgTempBizCategoryEnum, List<Long>> category;
private Map<MsgTempBizCategoryEnum, Integer> categoryOrder;
private Map<MsgTempBizCategoryEnum, String> categoryIcon;
private Map<MsgTempBizCategoryEnum, String> categoryJumpUrl;
public List<MsgTempBizCategoryConfig> tempCategoriesConfig() {
return Arrays.stream(MsgTempBizCategoryEnum.values())
.filter(e -> category.containsKey(e))
.map(this::build)
.filter(MsgTempBizCategoryConfig::isValid)
.collect(Collectors.toList());
}
private MsgTempBizCategoryConfig build(MsgTempBizCategoryEnum category) {
return MsgTempBizCategoryConfig.build(category, this.category.get(category),
categoryOrder.get(category),
categoryIcon.get(category),
categoryJumpUrl.get(category));
}
@Setter
@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public static class MsgTempBizCategoryConfig implements Comparable<MsgTempBizCategoryConfig> {
/**
* 消息临时分类的枚举
*/
private MsgTempBizCategoryEnum category;
/**
* 消息临时分类映射的关联关系id集合
*/
private Set<Long> configRelationIds;
/**
* 消息临时分类页面展示顺序
*/
private Integer sortOrder;
/**
* 图标地址
*/
private String iconUrl;
/**
* 指定的跳转地址
*/
private String jumpUrl;
public static MsgTempBizCategoryConfig build(MsgTempBizCategoryEnum category, List<Long> configRelationIds,
Integer sortOrder, String iconUrl, String jumpUrl) {
// 去掉首尾的无效空格
jumpUrl = Optional.ofNullable(jumpUrl).map(String::trim).orElse(null);
return new MsgTempBizCategoryConfig(category, new HashSet<>(configRelationIds), sortOrder, iconUrl, jumpUrl);
}
public PendingMessageTemporarilyTypeRes toTemporarilyType() {
PendingMessageTemporarilyTypeRes type = new PendingMessageTemporarilyTypeRes();
type.setCategoryCode(this.category.name());
type.setCategoryDesc(this.category.getDesc());
type.setIconUrl(this.iconUrl);
return type;
}
public boolean isValid() {
return CollectionUtils.isNotEmpty(configRelationIds)
&& Objects.nonNull(sortOrder)
&& StringUtils.isNotBlank(iconUrl);
}
public boolean match(Long relationId) {
return isValid() && configRelationIds.contains(relationId);
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
@Override
public int compareTo(@Nullable MsgTempBizCategoryConfig tgt) {
if (Objects.isNull(tgt)) {
// 默认自然排序.null对象做兼容
return 1;
}
return sortOrder.compareTo(tgt.sortOrder);
}
}
}

View File

@ -0,0 +1,51 @@
package cn.axzo.msg.center.inside.notices.controller.pending;
import cn.axzo.msg.center.api.PendingMessageServiceApi;
import cn.axzo.msg.center.api.request.PendingMessagePageReq;
import cn.axzo.msg.center.api.request.PendingMessageTemporarilyStatisticReq;
import cn.axzo.msg.center.api.response.PendingMessageStatisticRes;
import cn.axzo.msg.center.api.response.PendingMessageTemporarilyStatisticRes;
import cn.axzo.msg.center.api.response.PendingMessageTemporarilyTypeRes;
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
import cn.axzo.msg.center.inside.notices.service.PendingMessageService;
import cn.azxo.framework.common.model.CommonResponse;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
/**
* @description
* 待办消息的controller
* 由于需要给二方服务提供SDK为了防止URL的重复定义这里选择实现SDK的API接口
*
* @author cold_blade
* @date 2023/8/30
* @version 1.0
*/
@RestController
public class PendingMessageStatisticController implements PendingMessageServiceApi {
@Resource
private PendingMessageService pendingMessageService;
@Resource
private PendingMessageBizConfig pendingMessageBizConfig;
@Override
public CommonResponse<List<PendingMessageTemporarilyStatisticRes>> temporarilyStatistic(
PendingMessageTemporarilyStatisticReq request) {
return CommonResponse.success(pendingMessageService.temporarilyStatistic(request));
}
@Override
public CommonResponse<PendingMessageStatisticRes> temporarilyPage(PendingMessagePageReq request) {
return CommonResponse.success(pendingMessageService.temporarilyPage(request));
}
@Override
public CommonResponse<List<PendingMessageTemporarilyTypeRes>> listTemporarilyTypes() {
return CommonResponse.success(pendingMessageService.listTemporarilyTypes());
}
}

View File

@ -0,0 +1,43 @@
package cn.axzo.msg.center.inside.notices.service;
import cn.axzo.msg.center.api.request.PendingMessagePageReq;
import cn.axzo.msg.center.api.request.PendingMessageTemporarilyStatisticReq;
import cn.axzo.msg.center.api.response.PendingMessageStatisticRes;
import cn.axzo.msg.center.api.response.PendingMessageTemporarilyStatisticRes;
import cn.axzo.msg.center.api.response.PendingMessageTemporarilyTypeRes;
import java.util.List;
/**
* 代办类型的消息相关功能接口
*
* @author cold_blade
* @version 1.0
* @date 2023/8/30
*/
public interface PendingMessageService {
/**
* 统计代办类消息
*
* @param request 统计请求的入参
* @return 符合条件的代办数量
*/
List<PendingMessageTemporarilyStatisticRes> temporarilyStatistic(PendingMessageTemporarilyStatisticReq request);
/**
* 根据消息的临时的业务分类分页查询个人相关的待办消息
*
* @param request 分页查询的入参
* @return 符合条件的待办消息
*/
PendingMessageStatisticRes temporarilyPage(PendingMessagePageReq request);
/**
* 列表查询待办消息临时分类
* Notice: 该接口是一个临时解决方案不建议其它业务场景采用该接口
*
* @return 待办消息临时分类的列表
*/
List<PendingMessageTemporarilyTypeRes> listTemporarilyTypes();
}

View File

@ -0,0 +1,444 @@
package cn.axzo.msg.center.inside.notices.service.impl;
import cn.axzo.msg.center.api.enums.MsgStateEnum;
import cn.axzo.msg.center.api.enums.MsgTempBizCategoryEnum;
import cn.axzo.msg.center.api.enums.MsgTypeEnum;
import cn.axzo.msg.center.api.enums.ReceiveTypeEnum;
import cn.axzo.msg.center.api.request.PendingMessagePageReq;
import cn.axzo.msg.center.api.request.PendingMessageTemporarilyStatisticReq;
import cn.axzo.msg.center.api.response.MessageRouterInfoRes;
import cn.axzo.msg.center.api.response.PendingMessageBriefRes;
import cn.axzo.msg.center.api.response.PendingMessageStatisticRes;
import cn.axzo.msg.center.api.response.PendingMessageTemporarilyStatisticRes;
import cn.axzo.msg.center.api.response.PendingMessageTemporarilyTypeRes;
import cn.axzo.msg.center.common.utils.BeanConvertUtils;
import cn.axzo.msg.center.common.utils.PlaceholderResolver;
import cn.axzo.msg.center.dal.MessageRecordDao;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.domain.entity.MessageRouter;
import cn.axzo.msg.center.domain.enums.MsgRouteTypeEnum;
import cn.axzo.msg.center.domain.enums.NativeTypeEnum;
import cn.axzo.msg.center.domain.enums.YesNoEnum;
import cn.axzo.msg.center.inside.notices.config.MessageSystemConfig;
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig.MsgTempBizCategoryConfig;
import cn.axzo.msg.center.inside.notices.service.MessageRouterService;
import cn.axzo.msg.center.inside.notices.service.PendingMessageService;
import cn.axzo.msg.center.notices.common.enums.ReturnCodeEnum;
import cn.axzo.msg.center.notices.common.exception.BizException;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.google.common.collect.Lists;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
@Service
@RequiredArgsConstructor
public class PendingMessageServiceImpl implements PendingMessageService {
private static final List<MsgStateEnum> SEND_SUCCESSFULLY_STATES = Lists.newArrayList(
MsgStateEnum.HAS_BEEN_SENT,
MsgStateEnum.RECEIVED
);
private static final List<MsgStateEnum> ALL_WITHOUT_UNSENT = Lists.newArrayList(
MsgStateEnum.HAS_BEEN_SENT,
MsgStateEnum.RECEIVED,
MsgStateEnum.COMPLETE
);
private static final List<ReceiveTypeEnum> SPECIFY_IDENTIFIES = Lists.newArrayList(
ReceiveTypeEnum.CM_WORKER,
ReceiveTypeEnum.CM_LEADER
);
private final ThreadFactory statisticTaskThreadFactory = ThreadFactoryBuilder.create()
.setDaemon(true)
.setNameFormat("MODULE_STATISTIC_TASK_%d")
.get();
private final ExecutorService stateStatisticTaskThreadPool = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024), statisticTaskThreadFactory);
private final MessageRecordDao messageRecordDao;
private final MessageSystemConfig messageSystemConfig;
private final MessageRouterService messageRouterService;
private final PendingMessageBizConfig pendingMessageBizConfig;
@Override
public List<PendingMessageTemporarilyStatisticRes> temporarilyStatistic(PendingMessageTemporarilyStatisticReq request) {
// 参数检测
checkParam(request);
// 获取数据冷热隔离的分界
final LocalDateTime queryFrom = getQueryFromTime();
final List<MsgTempBizCategoryConfig> categoryConfigs = pendingMessageBizConfig.tempCategoriesConfig();
if (CollectionUtils.isEmpty(categoryConfigs)) {
log.warn("the nacos config is lost......");
return Collections.emptyList();
}
try {
return categoryConfigs.stream()
// 自然排序,前端仅做渲染,展示顺序的工作由后端配置
.sorted()
.map(e -> statisticAndPickFirst(request, e, SPECIFY_IDENTIFIES, queryFrom))
.collect(Collectors.toList());
} catch (Exception e) {
log.error("broke out some exception while statistic pending message.", e);
throw new BizException(ReturnCodeEnum.SYSTEM_ERROR, "system exception.");
}
}
@Override
public PendingMessageStatisticRes temporarilyPage(PendingMessagePageReq request) {
IPage<MessageRecord> page = request.toPage();
// 获取数据冷热隔离的分界
final LocalDateTime queryFrom = getQueryFromTime();
// 消息临时分类的配置
final List<MsgTempBizCategoryConfig> categoryConfigs = pendingMessageBizConfig.tempCategoriesConfig();
// FIXME: 这里由于是临时接口不具有普适性这里是链路上传参的问题的临时解决方案 {@see ReceiveTypeEnum#CM_WORKER}
request.setIdentifyTypes(SPECIFY_IDENTIFIES);
// FIXME: 这里前端可以不传临时的业务分类标识若不传的话查询配置指定的几个分类的合集
Set<Long> relationIds = fetchRelationIdFromRequestOrConfig(request, categoryConfigs);
if (CollectionUtils.isEmpty(relationIds)) {
log.warn("the tempBizCategory is invalid. {}", request.getTempBizCategory());
return PendingMessageStatisticRes.builder()
.totalElements(0L)
.pageNum(page.getCurrent())
.pageSize(page.getSize())
.unCompleteCnt(0)
.completeCnt(0)
.briefResList(Collections.emptyList())
.build();
}
// 这里对前端未传入参的情况做一个兜底
Collection<MsgStateEnum> msgStates = CollectionUtils.isEmpty(request.getStates()) ? ALL_WITHOUT_UNSENT :
request.getStates();
String unCompleteStateCntKey = "unCompleteCnt";
String completeStateCntKey = "completeCnt";
List<StateStatisticModule> modules = Lists.newArrayList(
StateStatisticModule.build(unCompleteStateCntKey, SEND_SUCCESSFULLY_STATES),
StateStatisticModule.build(completeStateCntKey, Collections.singletonList(MsgStateEnum.COMPLETE))
);
Map<String, Integer> stateStatisticMap = statistic(modules, relationIds, request, queryFrom);
// 分页查询message_record的数据
page = page(request.getPersonId(), request.getIdentifyId(), request.getIdentifyTypes(), relationIds,
msgStates, queryFrom, page);
if (CollectionUtils.isEmpty(page.getRecords())) {
// 列表为空的场景
return PendingMessageStatisticRes.builder()
.totalElements(page.getTotal())
.pageNum(page.getCurrent())
.pageSize(page.getSize())
.unCompleteCnt(stateStatisticMap.get(unCompleteStateCntKey))
.completeCnt(stateStatisticMap.get(completeStateCntKey))
.briefResList(Collections.emptyList())
.build();
}
// 查询消息对应的路由集
Map<Long, MessageRouterInfoRes> routerMap = listRouters(page.getRecords(), request.getSystemType());
// 原始的数据库模型数据转化为页面展示的数据模型
List<PendingMessageBriefRes> msgRecordBriefs = page.getRecords().stream()
.map(e -> convert(e, categoryConfigs, routerMap))
.collect(Collectors.toList());
// 列表为空的场景
return PendingMessageStatisticRes.builder()
.totalElements(page.getTotal())
.pageNum(page.getCurrent())
.pageSize(page.getSize())
.unCompleteCnt(stateStatisticMap.get(unCompleteStateCntKey))
.completeCnt(stateStatisticMap.get(completeStateCntKey))
.briefResList(msgRecordBriefs)
.build();
}
@Override
public List<PendingMessageTemporarilyTypeRes> listTemporarilyTypes() {
return pendingMessageBizConfig.tempCategoriesConfig().stream()
.sorted()
.map(MsgTempBizCategoryConfig::toTemporarilyType)
.collect(Collectors.toList());
}
private PendingMessageTemporarilyStatisticRes statisticAndPickFirst(PendingMessageTemporarilyStatisticReq request,
MsgTempBizCategoryConfig categoryConfig,
Collection<ReceiveTypeEnum> identifyTypes,
LocalDateTime begin) {
Set<Long> relationIds = categoryConfig.getConfigRelationIds();
// 统计数量
Integer count = doStatistic(request, relationIds, identifyTypes, begin);
PendingMessageTemporarilyStatisticRes res = new PendingMessageTemporarilyStatisticRes();
res.setUnCompletedCount(count);
res.setCategoryCode(categoryConfig.getCategory().name());
res.setCategoryDesc(categoryConfig.getCategory().getDesc());
res.setIconUrl(categoryConfig.getIconUrl());
if (StringUtils.isNotBlank(categoryConfig.getJumpUrl())) {
// 分类指定了具体的跳转地址,则优先使用其指定的地址这里指定为原生APP
res.setRouter(new MessageRouterInfoRes(NativeTypeEnum.H5.getMessage(), categoryConfig.getJumpUrl(), ""));
} else if (1 == count) {
// 这里仅统计已发送的待办
MessageRecord record = selectOne(request.getPersonId(), request.getIdentifyId(), identifyTypes,
relationIds, SEND_SUCCESSFULLY_STATES, begin);
if (Objects.nonNull(record)) {
res.setMessageId(record.getId());
res.setRouter(getRouter(record, request.getSystemType()));
}
}
return res;
}
private Integer doStatistic(PendingMessageTemporarilyStatisticReq request, Collection<Long> relationIds,
Collection<ReceiveTypeEnum> identifyTypes, LocalDateTime begin) {
return doStatistic(request.getPersonId(), request.getIdentifyId(), relationIds, identifyTypes,
SEND_SUCCESSFULLY_STATES, begin);
}
private void checkParam(PendingMessageTemporarilyStatisticReq request) {
if (Objects.isNull(request)
|| Objects.isNull(request.getIdentifyId())) {
log.error("the param is invalid. {}", request);
throw new BizException(ReturnCodeEnum.INVALID_PARAMETER);
}
}
private LocalDateTime getQueryFromTime() {
return Optional.ofNullable(messageSystemConfig.getDataDivideDays())
.filter(e -> e > 0)
.map(e -> LocalDate.now().minusDays(e).atStartOfDay())
.orElse(null);
}
private Set<Long> fetchRelationIdFromRequestOrConfig(PendingMessagePageReq request,
List<MsgTempBizCategoryConfig> categoryConfigs) {
MsgTempBizCategoryEnum category = request.getTempBizCategory();
if (Objects.isNull(category)) {
// 若前端未指定具体的分类,则取配置的所有的分类
return categoryConfigs.stream()
.flatMap(e -> e.getConfigRelationIds().stream())
.collect(Collectors.toSet());
}
return categoryConfigs.stream()
.filter(e -> e.getCategory().equals(category))
.flatMap(e -> e.getConfigRelationIds().stream())
.collect(Collectors.toSet());
}
private PendingMessageBriefRes convert(MessageRecord record, List<MsgTempBizCategoryConfig> categoryConfigs,
Map<Long, MessageRouterInfoRes> routerMap) {
PendingMessageBriefRes msgBrief = BeanConvertUtils.copyBean(record, PendingMessageBriefRes.class);
msgBrief.setMsgId(record.getId());
msgBrief.setTempBizCategory(getTempCategory(record.getRelationId(), categoryConfigs));
msgBrief.setRouter(routerMap.get(record.getId()));
msgBrief.setCreateTimestamp(Optional.ofNullable(record.getCreateAt()).map(Date::getTime).orElse(0L));
return msgBrief;
}
private MsgTempBizCategoryEnum getTempCategory(Long relationId, List<MsgTempBizCategoryConfig> categoryConfigs) {
return categoryConfigs.stream()
.filter(e -> e.match(relationId))
.map(MsgTempBizCategoryConfig::getCategory)
.findFirst()
.orElse(MsgTempBizCategoryEnum.PENDING_TASK);
}
private MessageRouterInfoRes getRouter(MessageRecord msgRecord, String systemType) {
return listRouters(Lists.newArrayList(msgRecord), systemType).get(msgRecord.getId());
}
private Map<Long, MessageRouterInfoRes> listRouters(Collection<MessageRecord> msgRecords, String systemType) {
final Set<Long> relationIds = msgRecords.stream().map(MessageRecord::getRelationId).collect(Collectors.toSet());
Map<Long, List<MessageRouter>> rawRouterMap = messageRouterService.listRoutersByRelationIds(relationIds).stream()
.collect(Collectors.groupingBy(MessageRouter::getRelationId));
final Map<Long, MessageRouterInfoRes> resRouterMap = new HashMap<>();
msgRecords.stream()
.filter(e -> rawRouterMap.containsKey(e.getRelationId()))
.forEach(e -> {
MessageRouterInfoRes router = pickAndParseMessageRouter(rawRouterMap.get(e.getRelationId()),
systemType, e.getRouterParams());
resRouterMap.put(e.getId(), router);
});
return resRouterMap;
}
private MessageRouterInfoRes pickAndParseMessageRouter(List<MessageRouter> rawRouters, String systemType, String routerParam) {
MsgRouteTypeEnum routeType = getSystemType(systemType);
return pickAndParseMessageRouter(rawRouters, routeType, routerParam);
}
private MessageRouterInfoRes pickAndParseMessageRouter(List<MessageRouter> messageRouters, MsgRouteTypeEnum msgRouteTypeEnum, String routerParam) {
String rawRouterUrl = "";
Integer routerType = null;
if (messageRouters.size() == 1) {
NativeTypeEnum nativeTypeEnum = NativeTypeEnum.getByCode(messageRouters.get(0).getRouterType().getCode());
if (Objects.nonNull(nativeTypeEnum)) {
rawRouterUrl = messageRouters.get(0).getRouterUrl();
routerType = nativeTypeEnum.getMessage();
}
} else {
//不止一条 获取当前系统对应路由
List<MessageRouter> routers = messageRouters.stream()
.filter(k -> k.getRouterType().equals(msgRouteTypeEnum))
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(routers)) {
NativeTypeEnum nativeTypeEnum = NativeTypeEnum.getByCode(routers.get(0).getRouterType().getCode());
if (nativeTypeEnum != null) {
rawRouterUrl = routers.get(0).getRouterUrl();
routerType = nativeTypeEnum.getMessage();
}
} else {
//优先取H5
List<MessageRouter> routerH5 = messageRouters.stream()
.filter(k -> k.getRouterType().equals(MsgRouteTypeEnum.WEBVIEW))
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(routerH5)) {
NativeTypeEnum nativeTypeEnum = NativeTypeEnum.getByCode(routerH5.get(0).getRouterType().getCode());
if (nativeTypeEnum != null) {
rawRouterUrl = routerH5.get(0).getRouterUrl();
routerType = nativeTypeEnum.getMessage();
}
}
//再取小程序
List<MessageRouter> routerUni = messageRouters.stream()
.filter(k -> k.getRouterType().equals(MsgRouteTypeEnum.MINI_PROGRAM)).collect(
Collectors.toList());
if (CollectionUtils.isNotEmpty(routerUni)) {
NativeTypeEnum nativeTypeEnum = NativeTypeEnum.getByCode(routerUni.get(0).getRouterType().getCode());
if (nativeTypeEnum != null) {
rawRouterUrl = routerUni.get(0).getRouterUrl();
routerType = nativeTypeEnum.getMessage();
}
}
}
}
PlaceholderResolver resolver = PlaceholderResolver.getDefaultResolver();
if (Objects.nonNull(routerType)) {
String routerUrl = resolver.resolveByMap(rawRouterUrl,
isRouterParamValid(routerParam) ? JSON.parseObject(routerParam) : Collections.emptyMap());
return new MessageRouterInfoRes(routerType, routerUrl, routerParam);
}
return null;
}
private boolean isRouterParamValid(String routerParam) {
return StringUtils.isNotEmpty(routerParam)
&& !"null".equals(routerParam);
}
private MsgRouteTypeEnum getSystemType(String systemType) {
/*String systemType = ContextInfoHolder.get().getSystemAndDeviceInfo().getSystemType();*/
//判断是否为手机端调用
if (MsgRouteTypeEnum.IOS.getMessage().equals(systemType)) {
return MsgRouteTypeEnum.IOS;
}
if (MsgRouteTypeEnum.ANDROID.getMessage().equals(systemType)) {
return MsgRouteTypeEnum.ANDROID;
}
if (MsgRouteTypeEnum.WECHAT_MINI_PROGRAM.getMessage().equals(systemType)) {
return MsgRouteTypeEnum.WECHAT_MINI_PROGRAM;
}
//都不是表明确定web端调用
return MsgRouteTypeEnum.WEB;
}
private MessageRecord selectOne(Long personId, Long identifyId, Collection<ReceiveTypeEnum> identifyTypes,
Collection<Long> relationIds, Collection<MsgStateEnum> states,
LocalDateTime queryFrom) {
List<MessageRecord> records = messageRecordDao.lambdaQuery()
.eq(MessageRecord::getType, MsgTypeEnum.PENDING_MESSAGE)
.eq(Objects.nonNull(personId), MessageRecord::getPersonId, personId)
.eq(Objects.nonNull(identifyId), MessageRecord::getToId, identifyId)
.in(CollectionUtils.isNotEmpty(identifyTypes), MessageRecord::getReceiveType, identifyTypes)
.in(CollectionUtils.isNotEmpty(states), MessageRecord::getState, states)
.in(CollectionUtils.isNotEmpty(relationIds), MessageRecord::getRelationId, relationIds)
.ge(Objects.nonNull(queryFrom), MessageRecord::getCreateAt, queryFrom)
.eq(MessageRecord::getIsDelete, YesNoEnum.NO.getCode())
.orderByDesc(MessageRecord::getCreateAt)
.last("LIMIT 1")
.list();
return CollectionUtils.isEmpty(records) ? null : records.get(0);
}
private IPage<MessageRecord> page(Long personId, Long identifyId, Collection<ReceiveTypeEnum> identifyTypes,
Collection<Long> relationIds, Collection<MsgStateEnum> states,
LocalDateTime queryFrom, IPage<MessageRecord> page) {
return messageRecordDao.lambdaQuery()
.eq(MessageRecord::getType, MsgTypeEnum.PENDING_MESSAGE)
.eq(Objects.nonNull(personId), MessageRecord::getPersonId, personId)
.eq(Objects.nonNull(identifyId), MessageRecord::getToId, identifyId)
.in(CollectionUtils.isNotEmpty(identifyTypes), MessageRecord::getReceiveType, identifyTypes)
.in(CollectionUtils.isNotEmpty(states), MessageRecord::getState, states)
.in(CollectionUtils.isNotEmpty(relationIds), MessageRecord::getRelationId, relationIds)
.ge(Objects.nonNull(queryFrom), MessageRecord::getCreateAt, queryFrom)
.eq(MessageRecord::getIsDelete, YesNoEnum.NO.getCode())
.orderByAsc(MessageRecord::getState)
.orderByDesc(MessageRecord::getCreateAt)
.page(page);
}
private Map<String, Integer> statistic(Collection<StateStatisticModule> stateStatisticModules,
Collection<Long> relationIds,
PendingMessagePageReq request, LocalDateTime queryFrom) {
final Map<String, Integer> resultMap = new ConcurrentHashMap<>();
CompletableFuture[] asyncStatisticArray = stateStatisticModules.stream()
.map(e -> CompletableFuture.runAsync(() -> {
Integer cnt = doStatistic(request.getPersonId(), request.getIdentifyId(),
relationIds, request.getIdentifyTypes(), e.states, queryFrom);
resultMap.put(e.stateKey, cnt);
},
stateStatisticTaskThreadPool))
.toArray(CompletableFuture[]::new);
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(asyncStatisticArray);
completableFuture.join();
return resultMap;
}
private Integer doStatistic(Long personId, Long identifyId, Collection<Long> relationIds,
Collection<ReceiveTypeEnum> identifyTypes,
Collection<MsgStateEnum> msgStates, LocalDateTime begin) {
return messageRecordDao.lambdaQuery()
.eq(MessageRecord::getType, MsgTypeEnum.PENDING_MESSAGE)
.eq(Objects.nonNull(personId), MessageRecord::getPersonId, personId)
.eq(Objects.nonNull(identifyId), MessageRecord::getToId, identifyId)
.in(CollectionUtils.isNotEmpty(identifyTypes), MessageRecord::getReceiveType, identifyTypes)
.in(MessageRecord::getState, msgStates)
.in(CollectionUtils.isNotEmpty(relationIds), MessageRecord::getRelationId, relationIds)
.ge(Objects.nonNull(begin), MessageRecord::getCreateAt, begin)
.eq(MessageRecord::getIsDelete, YesNoEnum.NO.getCode())
.count();
}
@AllArgsConstructor
private static class StateStatisticModule {
private String stateKey;
private Collection<MsgStateEnum> states;
static StateStatisticModule build(String stateKey, Collection<MsgStateEnum> states) {
return new StateStatisticModule(stateKey, states);
}
}
}

View File

@ -15,6 +15,7 @@ import cn.axzo.msg.center.domain.entity.MessageRouter;
import cn.axzo.msg.center.domain.enums.MsgRouteTypeEnum;
import cn.axzo.msg.center.domain.enums.NativeTypeEnum;
import cn.axzo.msg.center.domain.enums.YesNoEnum;
import cn.axzo.msg.center.inside.notices.config.MessageSystemConfig;
import cn.axzo.msg.center.inside.notices.service.MessageModuleService;
import cn.axzo.msg.center.inside.notices.service.MessageRouterService;
import cn.axzo.msg.center.inside.notices.service.RawMessageRecordService;
@ -48,18 +49,16 @@ import java.util.stream.Collectors;
* @author cold_blade
* @date 2023/9/13
* @version 1.0
* TODO: [cold_blade]待优化
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class RawMessageRecordServiceImpl implements RawMessageRecordService {
private static final int HOT_DATA_SAVE_DAYS = 90;
private final MessageRecordDao messageRecordDao;
private final MessageModuleService messageModuleService;
private final MessageRouterService messageRouterService;
private final MessageSystemConfig messageSystemConfig;
@Override
public void updatePersonMessageState(Long personId, Collection<MsgStateEnum> srcStates, MsgStateEnum tgtState,
@ -229,7 +228,10 @@ public class RawMessageRecordServiceImpl implements RawMessageRecordService {
}
private LocalDateTime getQueryFromTime() {
return LocalDate.now().minusDays(HOT_DATA_SAVE_DAYS).atStartOfDay();
return Optional.ofNullable(messageSystemConfig.getDataDivideDays())
.filter(e -> e > 0)
.map(e -> LocalDate.now().minusDays(e).atStartOfDay())
.orElseGet(() -> LocalDate.now().minusYears(10).atStartOfDay());
}
private MessageDetailRes convert(MessageRecord msgRecord, MessageRouterDTO router, MsgModuleDTO module) {
@ -240,6 +242,7 @@ public class RawMessageRecordServiceImpl implements RawMessageRecordService {
if (Objects.nonNull(router)) {
res.setRouteUrl(router.getUrl());
res.setRouteType(router.getType());
res.setRouterParam(msgRecord.getRouterParams());
}
res.setModuleName(module.getMsgModuleName());
res.setModuleIcon(module.getMsgModuleIcon());

View File

@ -0,0 +1,59 @@
package cn.axzo.msg.center.api;
import cn.axzo.msg.center.api.fallback.PendingMessageServiceApiFallBack;
import cn.axzo.msg.center.api.request.PendingMessagePageReq;
import cn.axzo.msg.center.api.request.PendingMessageTemporarilyStatisticReq;
import cn.axzo.msg.center.api.response.PendingMessageStatisticRes;
import cn.axzo.msg.center.api.response.PendingMessageTemporarilyStatisticRes;
import cn.axzo.msg.center.api.response.PendingMessageTemporarilyTypeRes;
import cn.azxo.framework.common.model.CommonResponse;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import javax.validation.Valid;
import java.util.List;
/**
* 代办类型的消息相关接口
*
* @author cold_blade
* @version 1.0
* @date 2023/8/30
*/
@Component
@FeignClient(name = "msg-center", url = "${msg-center.serviceUrl:http://msg-center:8080}",
fallback = PendingMessageServiceApiFallBack.class)
public interface PendingMessageServiceApi {
/**
* 统计待办消息数量
* Notice: 该接口是一个临时解决方案不建议其它业务场景采用该接口
*
* @param request 统计请求的入参
* @return 符合条件的待办消息数量
*/
@PostMapping("api/message/pending-msg/temporary-statistic")
CommonResponse<List<PendingMessageTemporarilyStatisticRes>> temporarilyStatistic(
@RequestBody @Valid PendingMessageTemporarilyStatisticReq request);
/**
* 根据消息的临时的业务分类分页查询个人相关的待办消息
* Notice: 该接口是一个临时解决方案不建议其它业务场景采用该接口
*
* @param request 分页查询的入参
* @return 符合条件的待办消息
*/
@PostMapping("api/message/pending-msg/temporary-page")
CommonResponse<PendingMessageStatisticRes> temporarilyPage(@RequestBody @Valid PendingMessagePageReq request);
/**
* 列表查询待办消息临时分类
* Notice: 该接口是一个临时解决方案不建议其它业务场景采用该接口
*
* @return 待办消息临时分类的列表
*/
@PostMapping("api/message/pending-msg/temporary-type/list")
CommonResponse<List<PendingMessageTemporarilyTypeRes>> listTemporarilyTypes();
}

View File

@ -0,0 +1,32 @@
package cn.axzo.msg.center.api.enums;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @description
* 消息的临时业务分类
* @author cold_blade
* @date 2023/8/31
* @version 1.0
*/
@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public enum MsgTempBizCategoryEnum {
/**
* 待接收任务
*/
PENDING_TASK("待接受任务"),
/**
* 待确认记工
*/
TO_BE_CONFIRMED_WORK_RECORD("待确认记工"),
/**
* 待确认借支
*/
TO_BE_CONFIRMED_LEND_RECORD("待确认借支");
private final String desc;
}

View File

@ -0,0 +1,46 @@
package cn.axzo.msg.center.api.fallback;
import cn.axzo.msg.center.api.PendingMessageServiceApi;
import cn.axzo.msg.center.api.request.PendingMessagePageReq;
import cn.axzo.msg.center.api.request.PendingMessageTemporarilyStatisticReq;
import cn.axzo.msg.center.api.response.PendingMessageStatisticRes;
import cn.axzo.msg.center.api.response.PendingMessageTemporarilyStatisticRes;
import cn.axzo.msg.center.api.response.PendingMessageTemporarilyTypeRes;
import cn.azxo.framework.common.model.CommonResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @description
* 待办消息接口请求失败时的降级处理
*
* @author cold_blade
* @date 2023/8/30
* @version 1.0
*/
@Slf4j
@Component
public class PendingMessageServiceApiFallBack implements PendingMessageServiceApi {
@Override
public CommonResponse<List<PendingMessageTemporarilyStatisticRes>> temporarilyStatistic(
PendingMessageTemporarilyStatisticReq request) {
log.error("调用统计待办消息的接口时发送异常...... request={}", request);
return CommonResponse.error("调用统计待办消息的接口时发送异常");
}
@Override
public CommonResponse<PendingMessageStatisticRes> temporarilyPage(PendingMessagePageReq request) {
log.error("调用分页查询待办消息的接口时发送异常...... request={}", request);
return CommonResponse.error("调用分页查询待办消息的接口时发送异常");
}
@Override
public CommonResponse<List<PendingMessageTemporarilyTypeRes>> listTemporarilyTypes() {
log.error("调用列表查询待办消息临时分类的接口时发送异常......");
return CommonResponse.error("调用列表查询待办消息临时分类的接口时发送异常");
}
}

View File

@ -0,0 +1,57 @@
package cn.axzo.msg.center.api.request;
import cn.axzo.basics.common.page.PageRequest;
import cn.axzo.msg.center.api.enums.MsgStateEnum;
import cn.axzo.msg.center.api.enums.MsgTempBizCategoryEnum;
import cn.axzo.msg.center.api.enums.ReceiveTypeEnum;
import lombok.Getter;
import lombok.Setter;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
import java.util.Collection;
/**
* @description
* 待办消息的分页查询
* @author cold_blade
* @date 2023/9/1
* @version 1.0
*/
@Setter
@Getter
public class PendingMessagePageReq extends PageRequest implements Serializable {
private static final long serialVersionUID = -6786973059210532683L;
/**
* 自然人id
*/
@NotNull(message = "personId is required")
private Long personId;
/**
* 身份标识
*/
@NotNull(message = "identifyId is required")
private Long identifyId;
/**
* 身份类型枚举集合
*/
private Collection<ReceiveTypeEnum> identifyTypes;
/**
* 消息的临时业务分类
*/
private MsgTempBizCategoryEnum tempBizCategory;
/**
* 消息状态,待办仅关注以下三种状态:
* 1. {@link MsgStateEnum#UNSENT}
* 2. {@link MsgStateEnum#HAS_BEEN_SENT}
* 3. {@link MsgStateEnum#COMPLETE}
*/
private Collection<MsgStateEnum> states;
/**
* 系统类型
*/
@NotNull(message = "systemType is required")
private String systemType;
}

View File

@ -0,0 +1,46 @@
package cn.axzo.msg.center.api.request;
import cn.axzo.msg.center.api.enums.ReceiveTypeEnum;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
import java.util.Collection;
/**
* @description
* 代办消息的统计请求入参数据模型
*
* @author cold_blade
* @version 1.0
* @date 2023/8/30
*/
@Setter
@Getter
public class PendingMessageTemporarilyStatisticReq implements Serializable {
private static final long serialVersionUID = -4634535558851288288L;
/**
* 自然人id
*/
@NotNull(message = "personId is required")
private Long personId;
/**
* 身份标识
*/
@NotNull(message = "identifyId is required")
private Long identifyId;
/**
* 系统类型
*/
@NotNull(message = "systemType is required")
private String systemType;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -81,6 +81,11 @@ public class MessageDetailRes implements Serializable {
*/
private Integer routeType;
/**
* 路由参数
*/
private String routerParam;
@Override
public String toString() {
return JSON.toJSONString(this);

View File

@ -0,0 +1,37 @@
package cn.axzo.msg.center.api.response;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.io.Serializable;
/**
* @description
* 消息路由描述
* @author cold_blade
* @date 2023/9/4
* @version 1.0
*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class MessageRouterInfoRes implements Serializable {
private static final long serialVersionUID = -5885492691812198526L;
/**
* 路由类型(0=null 1=uniapp 2=native 3=H5 4=web 5=WeChat_MP)"
*/
private Integer routerType;
/**
* 路由地址{0} %d
*/
private String routerUrl;
/**
* 路由参数
*/
private String routerParam;
}

View File

@ -0,0 +1,74 @@
package cn.axzo.msg.center.api.response;
import cn.axzo.msg.center.api.enums.MsgRecordTerminalTypeEnum;
import cn.axzo.msg.center.api.enums.MsgStateEnum;
import cn.axzo.msg.center.api.enums.MsgTempBizCategoryEnum;
import cn.axzo.msg.center.api.enums.ReceiveTypeEnum;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
/**
* @description
* 待办消息的基本信息
* @author cold_blade
* @date 2023/8/31
* @version 1.0
*/
@Setter
@Getter
public class PendingMessageBriefRes implements Serializable {
private static final long serialVersionUID = -856618872444412554L;
/**
* 消息id
*/
private Long msgId;
/**
* 消息标题
*/
private String title;
/**
* 消息内容
*/
private String content;
/**
* 接收者类型身份类型
*/
private ReceiveTypeEnum receiveType;
/**
* 消息关联的终端类型企业/项目
*/
private MsgRecordTerminalTypeEnum terminalType;
/**
* 消息关联的终端名称企业名/项目名
*/
private String terminalName;
/**
* 消息时间戳
*/
private Long createTimestamp;
/**
* 消息状态,待办仅关注以下三种状态:
* 1. {@link MsgStateEnum#UNSENT}
* 2. {@link MsgStateEnum#HAS_BEEN_SENT}
* 3. {@link MsgStateEnum#COMPLETE}
*/
private MsgStateEnum state;
/**
* 消息的临时业务分类
*/
private MsgTempBizCategoryEnum tempBizCategory;
/**
* 消息路由数据模型
*/
private MessageRouterInfoRes router;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,58 @@
package cn.axzo.msg.center.api.response;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.io.Serializable;
import java.util.List;
/**
* @description
*
* @author cold_blade
* @date 2023/9/18
* @version 1.0
*/
@Setter
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PendingMessageStatisticRes implements Serializable {
private static final long serialVersionUID = -3864039207092979609L;
/**
* 总数
*/
private Long totalElements;
/**
* 当前页码
*/
private Long pageNum;
/**
* 每页展示的条数
*/
private Long pageSize;
/**
* 待办数量
*/
private Integer unCompleteCnt;
/**
* 已完成数量
*/
private Integer completeCnt;
/**
* 分页查询的数量
*/
private List<PendingMessageBriefRes> briefResList;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,51 @@
package cn.axzo.msg.center.api.response;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
/**
* @description
* 代办消息的统计请求出参数据模型
* @author cold_blade
* @date 2023/9/4
* @version 1.0
*/
@Setter
@Getter
public class PendingMessageTemporarilyStatisticRes implements Serializable {
private static final long serialVersionUID = -8249938191986600399L;
/**
* 消息的临时业务分类编码
*/
private String categoryCode;
/**
* 消息的临时业务分类描述
*/
private String categoryDesc;
/**
* 图标地址
*/
private String iconUrl;
/**
* 待办消息未处理总量
*/
private Integer unCompletedCount;
/**
* 消息id,当且仅当数量为 {@code 1} 的时候才会赋值
*/
private Long messageId;
/**
* 消息路由数据模型,当且仅当数量为 {@code 1} 的时候才会赋值
*/
private MessageRouterInfoRes router;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,39 @@
package cn.axzo.msg.center.api.response;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
/**
* @description
* 代办消息的临时分类请求出参数据模型
* @author cold_blade
* @date 2023/9/4
* @version 1.0
*/
@Setter
@Getter
public class PendingMessageTemporarilyTypeRes implements Serializable {
private static final long serialVersionUID = -1092099603390107573L;
/**
* 消息的临时业务分类编码
*/
private String categoryCode;
/**
* 消息的临时业务分类描述
*/
private String categoryDesc;
/**
* 图标地址
*/
private String iconUrl;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}