From a9583014698990b4c9f2d50e69d29e90536c5d75 Mon Sep 17 00:00:00 2001 From: lilong Date: Wed, 27 Mar 2024 20:43:28 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20(REQ-2129)=E5=A2=9E=E5=8A=A0push?= =?UTF-8?q?=E6=B6=88=E6=81=AFhandler?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nanopart-server/pom.xml | 5 + .../cn/axzo/nanopart/NanopartApplication.java | 3 + .../config/RocketMQEventConfiguration.java | 90 +++++++++ .../cn/axzo/nanopart/config/XxlJobConfig.java | 69 +++++++ op/op-server/pom.xml | 10 + .../controller/OpMessageConfigController.java | 19 ++ .../server/controller/PrivateController.java | 10 + .../server/domain/OpMessageConfig.java | 6 + .../server/event/outer/EventTypeEnum.java | 31 +++ .../outer/PushYouMengMessageHandler.java | 184 ++++++++++++++++++ .../payload/MessageHistoryUpdatedPayload.java | 176 +++++++++++++++++ .../service/OpMessageConfigService.java | 3 + .../server/xxljob/SendMessageJob.java | 18 +- 13 files changed, 616 insertions(+), 8 deletions(-) create mode 100644 nanopart-server/src/main/java/cn/axzo/nanopart/config/RocketMQEventConfiguration.java create mode 100644 op/op-server/src/main/java/cn/axzo/nanopart/server/event/outer/EventTypeEnum.java create mode 100644 op/op-server/src/main/java/cn/axzo/nanopart/server/event/outer/PushYouMengMessageHandler.java create mode 100644 op/op-server/src/main/java/cn/axzo/nanopart/server/event/payload/MessageHistoryUpdatedPayload.java diff --git a/nanopart-server/pom.xml b/nanopart-server/pom.xml index a49b945d..dc5ecf22 100644 --- a/nanopart-server/pom.xml +++ b/nanopart-server/pom.xml @@ -118,6 +118,11 @@ cn.axzo.basics basics-profiles-api + + + cn.axzo.framework.rocketmq + axzo-common-rocketmq + diff --git a/nanopart-server/src/main/java/cn/axzo/nanopart/NanopartApplication.java b/nanopart-server/src/main/java/cn/axzo/nanopart/NanopartApplication.java index 21e1ae73..521bb1b8 100644 --- a/nanopart-server/src/main/java/cn/axzo/nanopart/NanopartApplication.java +++ b/nanopart-server/src/main/java/cn/axzo/nanopart/NanopartApplication.java @@ -1,10 +1,12 @@ package cn.axzo.nanopart; +import cn.axzo.nanopart.config.RocketMQEventConfiguration; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.context.annotation.EnableAspectJAutoProxy; +import org.springframework.context.annotation.Import; @MapperScan(value = {"cn.axzo.**.mapper"}) @SpringBootApplication @@ -12,6 +14,7 @@ import org.springframework.context.annotation.EnableAspectJAutoProxy; "cn.axzo" }) @EnableAspectJAutoProxy() +@Import(RocketMQEventConfiguration.class) public class NanopartApplication { public static void main(String[] args) { SpringApplication.run(NanopartApplication.class, args); diff --git a/nanopart-server/src/main/java/cn/axzo/nanopart/config/RocketMQEventConfiguration.java b/nanopart-server/src/main/java/cn/axzo/nanopart/config/RocketMQEventConfiguration.java new file mode 100644 index 00000000..31ff6a9e --- /dev/null +++ b/nanopart-server/src/main/java/cn/axzo/nanopart/config/RocketMQEventConfiguration.java @@ -0,0 +1,90 @@ +package cn.axzo.nanopart.config; + +import cn.axzo.framework.rocketmq.BaseListener; +import cn.axzo.framework.rocketmq.DefaultEventConsumer; +import cn.axzo.framework.rocketmq.EventConsumer; +import cn.axzo.framework.rocketmq.EventHandlerRepository; +import cn.axzo.framework.rocketmq.EventProducer; +import cn.axzo.framework.rocketmq.RocketMQEventProducer; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.util.function.Consumer; + +/** + * @Author: liyong.tian + * @Date: 2023/7/25 14:43 + * @Description: + */ +@Slf4j +public class RocketMQEventConfiguration { + + @Value("${spring.application.name}") + private String appName; + + @Value("${topic}") + private String topic; + + @Bean + public RocketMQTemplate ser(){ + return new RocketMQTemplate(); + } + @Bean + EventProducer eventProducer(RocketMQTemplate rocketMQTemplate) { + return new RocketMQEventProducer(rocketMQTemplate, + "nanopart", + appName, + EventProducer.Context.builder() + .meta(RocketMQEventProducer.RocketMQMessageMeta.builder() + .topic(topic) + .build()) + .build(), + null + ); + } + + @Bean + EventConsumer eventConsumer(EventHandlerRepository eventHandlerRepository) { + Consumer callback = (eventWrapper) -> { + if (eventWrapper.isHandled()) { + // 只收集被App真正消费的消息. + //String topic = (String) eventWrapper.getExt().get(EVENT_TOPIC_KEY); + + } + }; + return new DefaultEventConsumer(appName, eventHandlerRepository, callback); + } + + @Slf4j + @Component + @RocketMQMessageListener(topic = "topic_im_center_${spring.profiles.active}", + consumerGroup = "GID_topic_im_center_${spring.application.name}_${spring.profiles.active}", + consumeMode = ConsumeMode.ORDERLY, + nameServer = "${rocketmq.name-server}" + ) + public static class DefaultListener extends BaseListener implements RocketMQListener { + + @Autowired + private EventConsumer eventConsumer; + + @Override + public void onMessage(MessageExt message) { + super.onEvent(message, eventConsumer); + } + } + + @Bean + EventHandlerRepository eventHandlerRepository() { + return new EventHandlerRepository((ex, logText) -> { + log.warn("MQ, handle warning {}", logText, ex); + }); + } +} diff --git a/nanopart-server/src/main/java/cn/axzo/nanopart/config/XxlJobConfig.java b/nanopart-server/src/main/java/cn/axzo/nanopart/config/XxlJobConfig.java index e69de29b..21788baa 100644 --- a/nanopart-server/src/main/java/cn/axzo/nanopart/config/XxlJobConfig.java +++ b/nanopart-server/src/main/java/cn/axzo/nanopart/config/XxlJobConfig.java @@ -0,0 +1,69 @@ +package cn.axzo.nanopart.config; + +import cn.azxo.framework.common.logger.JobLoggerTemplate; +import cn.azxo.framework.common.service.JobParamResolver; +import com.xxl.job.core.executor.impl.XxlJobSpringExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * xxl-job config + * + * @author xuxueli 2017-04-28 + */ +@Configuration +public class XxlJobConfig { + + private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); + + // @Value("http://dev-xxl-job.axzo.cn/xxl-job-admin") + @Value("${xxl.job.admin.addresses}") + private String adminAddresses; + + @Value("${xxl.job.executor.appname}") + private String appName; + + @Value("") + private String ip; + + @Value("${xxl.job.executor.port}") + private int port; + + // @Value("${xxl.job.accessToken}") + @Value("") + private String accessToken; + + @Value("") + private String logPath; + + @Value("-1") + private int logRetentionDays; + + @Bean + public XxlJobSpringExecutor xxlJobExecutor() { + logger.info(">>>>>>>>>>> xxl-job config init."); + XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); + xxlJobSpringExecutor.setAdminAddresses(adminAddresses); + xxlJobSpringExecutor.setAppname(appName); + xxlJobSpringExecutor.setIp(ip); + xxlJobSpringExecutor.setPort(port); + xxlJobSpringExecutor.setAccessToken(accessToken); + xxlJobSpringExecutor.setLogPath(logPath); + xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); + + return xxlJobSpringExecutor; + } + + @Bean("jobParamResolver") + public JobParamResolver jobParamResolver() { + return new JobParamResolver(); + } + + @Bean("jobLoggerTemplate") + public JobLoggerTemplate jobLoggerTemplate() { + return new JobLoggerTemplate(); + } +} diff --git a/op/op-server/pom.xml b/op/op-server/pom.xml index cd5e2826..040cbe95 100644 --- a/op/op-server/pom.xml +++ b/op/op-server/pom.xml @@ -85,5 +85,15 @@ xxl-job-core + + cn.axzo.framework.rocketmq + axzo-common-rocketmq + + + + cn.axzo.msgcenter + msg-center-api + 1.0.1-SNAPSHOT + diff --git a/op/op-server/src/main/java/cn/axzo/nanopart/server/controller/OpMessageConfigController.java b/op/op-server/src/main/java/cn/axzo/nanopart/server/controller/OpMessageConfigController.java index 4fed418e..35276af6 100644 --- a/op/op-server/src/main/java/cn/axzo/nanopart/server/controller/OpMessageConfigController.java +++ b/op/op-server/src/main/java/cn/axzo/nanopart/server/controller/OpMessageConfigController.java @@ -6,6 +6,7 @@ import cn.axzo.nanopart.server.domain.OpMessageConfig; import cn.axzo.nanopart.server.service.OpMessageConfigService; import cn.axzo.op.api.OpMessageConfigApi; import cn.axzo.pokonyan.exception.Aassert; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -64,10 +65,25 @@ public class OpMessageConfigController implements OpMessageConfigApi { createOpMessageConfigParam.setSubmitPersonId(param.getOperatePersonId()); createOpMessageConfigParam.setCreatePersonId(param.getOperatePersonId()); createOpMessageConfigParam.setStatus(OpMessageConfig.Status.PENDING.name()); + createOpMessageConfigParam.setPlanStartTime(resolvePlanStartTime(param.getReceiveData(), param.getPlanStartTime())); + OpMessageConfig opMessageConfig = opMessageConfigService.create(createOpMessageConfigParam); return ApiResult.ok(to(opMessageConfig)); } + private Date resolvePlanStartTime(JSONObject receiveData, Date planStartTime) { + if (planStartTime != null) { + return planStartTime; + } + if (receiveData == null) { + return null; + } + if (receiveData.toJavaObject(OpMessageConfig.ReceiveData.class).isNowStrategy()) { + return new Date(); + } + return null; + } + @Override public ApiResult delete(DeleteOpMessageConfigParam param) { OpMessageConfigService.DeleteOpMessageConfigParam deleteOpMessageConfigParam = OpMessageConfigService.DeleteOpMessageConfigParam.builder() @@ -83,6 +99,7 @@ public class OpMessageConfigController implements OpMessageConfigApi { OpMessageConfigService.UpdateOpMessageConfigParam updateOpMessageConfigParam = OpMessageConfigService.UpdateOpMessageConfigParam.builder().build(); BeanUtils.copyProperties(param, updateOpMessageConfigParam); updateOpMessageConfigParam.setUpdatePersonId(param.getOperatePersonId()); + updateOpMessageConfigParam.setPlanStartTime(resolvePlanStartTime(param.getReceiveData(), param.getPlanStartTime())); OpMessageConfig opMessageConfig = opMessageConfigService.update(updateOpMessageConfigParam); return ApiResult.ok(to(opMessageConfig)); @@ -104,6 +121,7 @@ public class OpMessageConfigController implements OpMessageConfigApi { OpMessageConfigService.UpdateOpMessageConfigParam updateOpMessageConfigParam = OpMessageConfigService.UpdateOpMessageConfigParam.builder().build(); BeanUtils.copyProperties(req, updateOpMessageConfigParam); updateOpMessageConfigParam.setUpdatePersonId(req.getOperatePersonId()); + updateOpMessageConfigParam.setPlanStartTime(resolvePlanStartTime(req.getReceiveData(), req.getPlanStartTime())); OpMessageConfig opMessageConfig = opMessageConfigService.update(updateOpMessageConfigParam); return ApiResult.ok(to(opMessageConfig)); @@ -119,6 +137,7 @@ public class OpMessageConfigController implements OpMessageConfigApi { updateOpMessageConfigParam.setUpdatePersonId(req.getOperatePersonId()); updateOpMessageConfigParam.setSubmitPersonId(req.getOperatePersonId()); updateOpMessageConfigParam.setSubmitTime(new Date()); + updateOpMessageConfigParam.setPlanStartTime(resolvePlanStartTime(req.getReceiveData(), req.getPlanStartTime())); OpMessageConfig opMessageConfig = opMessageConfigService.update(updateOpMessageConfigParam); return ApiResult.ok(to(opMessageConfig)); diff --git a/op/op-server/src/main/java/cn/axzo/nanopart/server/controller/PrivateController.java b/op/op-server/src/main/java/cn/axzo/nanopart/server/controller/PrivateController.java index e656e06f..a2eaeb99 100644 --- a/op/op-server/src/main/java/cn/axzo/nanopart/server/controller/PrivateController.java +++ b/op/op-server/src/main/java/cn/axzo/nanopart/server/controller/PrivateController.java @@ -1,5 +1,7 @@ package cn.axzo.nanopart.server.controller; +import cn.axzo.framework.rocketmq.Event; +import cn.axzo.nanopart.server.event.outer.PushYouMengMessageHandler; import cn.axzo.nanopart.server.xxljob.SendMessageJob; import com.alibaba.fastjson.JSONObject; import org.springframework.beans.factory.annotation.Autowired; @@ -14,9 +16,17 @@ public class PrivateController { @Autowired private SendMessageJob sendMessageJob; + @Autowired + private PushYouMengMessageHandler pushYouMengMessageHandler; @PostMapping("/send-message/job/run") public Object runSendMessage(@RequestBody SendMessageJob.SendMessageParam param) throws Exception { return sendMessageJob.execute(JSONObject.toJSONString(param)); } + + @PostMapping("/you-meng/push") + public Object pushYouMeng(@RequestBody Event param) throws Exception { + pushYouMengMessageHandler.onEvent(param, null); + return null; + } } diff --git a/op/op-server/src/main/java/cn/axzo/nanopart/server/domain/OpMessageConfig.java b/op/op-server/src/main/java/cn/axzo/nanopart/server/domain/OpMessageConfig.java index d33a690a..eb452cda 100644 --- a/op/op-server/src/main/java/cn/axzo/nanopart/server/domain/OpMessageConfig.java +++ b/op/op-server/src/main/java/cn/axzo/nanopart/server/domain/OpMessageConfig.java @@ -208,6 +208,10 @@ public class OpMessageConfig { * 接收渠道平台:CMP(B端: APP(管理端)+ CMS)、CM(C端:APP(工人端)) */ private List platforms; + + public boolean isNowStrategy() { + return Objects.equals("now", this.getStrategy()); + } } @Data @@ -288,6 +292,8 @@ public class OpMessageConfig { */ private String voiceFile; + private String voiceFileName; + /** * 提醒方式:voice(声音)、vibrate(震动) */ diff --git a/op/op-server/src/main/java/cn/axzo/nanopart/server/event/outer/EventTypeEnum.java b/op/op-server/src/main/java/cn/axzo/nanopart/server/event/outer/EventTypeEnum.java new file mode 100644 index 00000000..a8eae015 --- /dev/null +++ b/op/op-server/src/main/java/cn/axzo/nanopart/server/event/outer/EventTypeEnum.java @@ -0,0 +1,31 @@ +package cn.axzo.nanopart.server.event.outer; + +import cn.axzo.framework.rocketmq.Event; +import lombok.Getter; + +/** + * @Classname EventTypeEnum + * @Date 2021/2/7 6:05 下午 + * @Created by lilong + */ +@Getter +public enum EventTypeEnum { + + MESSAGE_HISTORY_UPDATED("message-history", "message-history-updated", "发送记录修改") + ; + + EventTypeEnum(String model, String name, String desc) { + this.eventCode = Event.EventCode.builder() + .module(model) + .name(name) + .build(); + this.model = model; + this.name = name; + this.desc = desc; + } + + private String model; + private String name; + private String desc; + private Event.EventCode eventCode; +} diff --git a/op/op-server/src/main/java/cn/axzo/nanopart/server/event/outer/PushYouMengMessageHandler.java b/op/op-server/src/main/java/cn/axzo/nanopart/server/event/outer/PushYouMengMessageHandler.java new file mode 100644 index 00000000..df2bd05a --- /dev/null +++ b/op/op-server/src/main/java/cn/axzo/nanopart/server/event/outer/PushYouMengMessageHandler.java @@ -0,0 +1,184 @@ +package cn.axzo.nanopart.server.event.outer; + +import cn.axzo.framework.rocketmq.Event; +import cn.axzo.framework.rocketmq.EventConsumer; +import cn.axzo.framework.rocketmq.EventHandler; +import cn.axzo.msg.center.api.MessagePushApi; +import cn.axzo.msg.center.api.request.MsgBody4Guest; +import cn.axzo.nanopart.server.domain.OpMessageConfig; +import cn.axzo.nanopart.server.event.payload.MessageHistoryUpdatedPayload; +import cn.axzo.nanopart.server.service.OpMessageConfigService; +import cn.axzo.nanopart.server.xxljob.SendMessageJob; +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static cn.axzo.nanopart.server.event.payload.MessageHistoryUpdatedPayload.PLATFORM_ROUTER_TYPE; + +@Slf4j +@Component +public class PushYouMengMessageHandler implements EventHandler, InitializingBean { + + @Autowired + private EventConsumer eventConsumer; + @Autowired + private OpMessageConfigService opMessageConfigService; + @Autowired + private MessagePushApi messagePushApi; + + /** + * 按照这个顺序解析routerType + */ + private static final List ROUTER_TYPE_SORTED = Lists.newArrayList( + MessageHistoryUpdatedPayload.IOS_PLATFORM, + MessageHistoryUpdatedPayload.ANDROID_PLATFORM, + MessageHistoryUpdatedPayload.WEBVIEW_PLATFORM, + MessageHistoryUpdatedPayload.MINI_PROGRAM_PLATFORM, + MessageHistoryUpdatedPayload.WECHAT_MINI_PROGRAM_PLATFORM); + + /** + * 按照这个顺序解析IOS的url + */ + private static final List IOS_URL_SORTED = Lists.newArrayList( + MessageHistoryUpdatedPayload.IOS_PLATFORM, + MessageHistoryUpdatedPayload.WEBVIEW_PLATFORM, + MessageHistoryUpdatedPayload.MINI_PROGRAM_PLATFORM, + MessageHistoryUpdatedPayload.WECHAT_MINI_PROGRAM_PLATFORM); + + /** + * 按照这个顺序解析ANDROID的url + */ + private static final List ANDROID_URL_SORTED = Lists.newArrayList( + MessageHistoryUpdatedPayload.ANDROID_PLATFORM, + MessageHistoryUpdatedPayload.WEBVIEW_PLATFORM, + MessageHistoryUpdatedPayload.MINI_PROGRAM_PLATFORM, + MessageHistoryUpdatedPayload.WECHAT_MINI_PROGRAM_PLATFORM); + + + @Override + public void onEvent(Event event, EventConsumer.Context context) { + log.info("begin push-handler rocketmq event: {}", event); + MessageHistoryUpdatedPayload payload = event.normalizedData(MessageHistoryUpdatedPayload.class); + if (Objects.isNull(payload) || StringUtils.isBlank(payload.getNewMessageHistory().getBizId())) { + return; + } + + if (!payload.getNewMessageHistory().getBizId().startsWith(SendMessageJob.BIZ_ID_PREFIX)) { + log.info("push-handler 非op-message的消息"); + return; + } + + if (!payload.isSuccess()) { + log.info("push-handler is not success"); + return; + } + + if (payload.getNewMessageHistory().getImMessageTaskId() == null) { + log.info("push-handler imMessageTaskId is null"); + return; + } + + Optional opMessageConfigDTO = opMessageConfigService.page(OpMessageConfigService.PageOpMessageConfigParam.builder() + .imMessageTaskId(payload.getNewMessageHistory().getImMessageTaskId()) + .build()) + .getRecords() + .stream() + .findFirst(); + if (!opMessageConfigDTO.isPresent()) { + log.info("push-handler imMessageTaskId is not found,{}", payload.getNewMessageHistory().getImMessageTaskId()); + return; + } + + if (opMessageConfigDTO.get().getPushData() == null) { + log.info("push-handler, opMessageConfig:{},未配置push信息", opMessageConfigDTO.get().getId()); + return; + } + OpMessageConfig.PushData pushData = opMessageConfigDTO.get().getPushData().toJavaObject(OpMessageConfig.PushData.class); + + if (BooleanUtils.isNotTrue(pushData.isSwitchOn())) { + log.info("push-handler, opMessageConfig:{},push开关未打开", opMessageConfigDTO.get().getId()); + return; + } + + MessageHistoryUpdatedPayload.MessageHistory newMessageHistory = payload.getNewMessageHistory(); + MessageHistoryUpdatedPayload.MessageBody messageBody = newMessageHistory.resolveMessageBody(); + messagePushApi.sendPushMessage(MsgBody4Guest.builder() + .ty(0) + .f("0") + .appClient(newMessageHistory.getAppType()) + .m(messageBody.getMsgContent()) + .m3(newMessageHistory.getReceivePersonId()) + .m2(resolveM2(messageBody, pushData)) + // 为了兼容老版本,保证老版本的工人端能收到push,所以新的push都alias都是person + .t("not_identity" + newMessageHistory.getReceivePersonId()) + .ouId(newMessageHistory.getReceiveOuId()) + .build()); + + log.info("end push-handler rocketmq event: {}", event); + } + + private String resolveM2(MessageHistoryUpdatedPayload.MessageBody messageBody, + OpMessageConfig.PushData pushData) { + JSONObject jsonObject = new JSONObject() + .fluentPut("t", messageBody.getMsgHeader()) + .fluentPut("type", 0); + if (StringUtils.isNotBlank(pushData.getVoiceFile())) { + jsonObject.fluentPut("audio", pushData.getVoiceFile()); + } + + List actionPaths = messageBody.resolveMsgBody().getCardDetailButton().getActionPaths(); + if (CollectionUtils.isNotEmpty(actionPaths)) { + resolveRouter(jsonObject, actionPaths); + } + return jsonObject.toJSONString(); + } + + /** + * + * @param jsonObject + * @param actionPaths + */ + private void resolveRouter(JSONObject jsonObject, List actionPaths) { + + Map actionPathMap = actionPaths.stream() + .collect(Collectors.toMap(MessageHistoryUpdatedPayload.ActionPath::getPlatform, Function.identity())); + Optional routerType = ROUTER_TYPE_SORTED.stream() + .map(actionPathMap::get) + .filter(Objects::nonNull) + .findFirst(); + routerType.ifPresent(actionPath -> jsonObject.fluentPut("rt", PLATFORM_ROUTER_TYPE.get(actionPath.getPlatform()))); + + Optional ios = IOS_URL_SORTED.stream() + .map(actionPathMap::get) + .filter(Objects::nonNull) + .findFirst(); + // IOS的跳转URL + ios.ifPresent(actionPath -> jsonObject.fluentPut("ir", actionPath.getUrl())); + + Optional android = ANDROID_URL_SORTED.stream() + .map(actionPathMap::get) + .filter(Objects::nonNull) + .findFirst(); + // ANDROID的跳转URL + android.ifPresent(actionPath -> jsonObject.fluentPut("ar", actionPath.getUrl())); + + } + + @Override + public void afterPropertiesSet() throws Exception { + eventConsumer.registerHandler(EventTypeEnum.MESSAGE_HISTORY_UPDATED.getEventCode(), this); + } +} diff --git a/op/op-server/src/main/java/cn/axzo/nanopart/server/event/payload/MessageHistoryUpdatedPayload.java b/op/op-server/src/main/java/cn/axzo/nanopart/server/event/payload/MessageHistoryUpdatedPayload.java new file mode 100644 index 00000000..3b4c24b7 --- /dev/null +++ b/op/op-server/src/main/java/cn/axzo/nanopart/server/event/payload/MessageHistoryUpdatedPayload.java @@ -0,0 +1,176 @@ +package cn.axzo.nanopart.server.event.payload; + +import cn.axzo.im.center.common.enums.AppTypeEnum; +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Maps; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class MessageHistoryUpdatedPayload implements Serializable { + + public static final String IOS_PLATFORM = "IOS"; + public static final String ANDROID_PLATFORM = "ANDROID"; + public static final String WEBVIEW_PLATFORM = "WEBVIEW"; + public static final String MINI_PROGRAM_PLATFORM = "MINI_PROGRAM"; + public static final String WECHAT_MINI_PROGRAM_PLATFORM = "WECHAT_MINI_PROGRAM"; + + public static final Map PLATFORM_ROUTER_TYPE = Maps.newHashMap(); + static { + PLATFORM_ROUTER_TYPE.put(IOS_PLATFORM, 2); + PLATFORM_ROUTER_TYPE.put(ANDROID_PLATFORM, 2); + PLATFORM_ROUTER_TYPE.put(WEBVIEW_PLATFORM, 3); + PLATFORM_ROUTER_TYPE.put(MINI_PROGRAM_PLATFORM, 1); + PLATFORM_ROUTER_TYPE.put(WECHAT_MINI_PROGRAM_PLATFORM, 5); + } + + private MessageHistory newMessageHistory; + private MessageHistory oldMessageHistory; + + public boolean isSuccess() { + return !Objects.equals(newMessageHistory.getStatus(), oldMessageHistory.getStatus()) + && Objects.equals(newMessageHistory.getStatus(), MessageHistory.Status.SUCCEED.name()); + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class MessageHistory implements Serializable { + + private static final long serialVersionUID = 1L; + + private Long id; + + /** + * 上游业务请求ID + */ + private String bizId; + + /** + * 普通用户,通过appType包装 + * 包装以后进行账户注册 + */ + private String messageId; + + /** + * 发送者IM账户 + */ + private String fromAccount; + + /** + * 发送者IM账户 + */ + private String toAccount; + + /** + * 终端类型 + * + * @see AppTypeEnum + */ + private String appType; + + /** + * channel 网易云信 + */ + private String channel; + + private String messageBody; + + private String result; + + private Long imMessageTaskId; + + private String receivePersonId; + + private Long receiveOuId; + + private String status; + + private Integer isDelete; + + private Date createAt; + + private Date updateAt; + + public enum Status { + PENDING, + SUCCEED, + FAILED, + ; + } + + public MessageBody resolveMessageBody() { + return JSONObject.parseObject(messageBody, MessageBody.class); + } + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class MessageBody { + + /** + * 模板消息:Template、聊天消息:Chat、通知消息:Notify + */ + private String msgType; + + /** + * 消息标题 + */ + private String msgHeader; + + /** + * 消息内容 + */ + private String msgContent; + + /** + * 消息通知结构体 + */ + private String msgBody; + + public MsgBody resolveMsgBody() { + return JSONObject.parseObject(msgBody, MsgBody.class); + } + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class MsgBody { + private CardDetailButton cardDetailButton; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class CardDetailButton { + private List actionPaths; + + + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class ActionPath { + private String platform; + + private String url; + } +} diff --git a/op/op-server/src/main/java/cn/axzo/nanopart/server/service/OpMessageConfigService.java b/op/op-server/src/main/java/cn/axzo/nanopart/server/service/OpMessageConfigService.java index 42e0d8df..25b0d0b4 100644 --- a/op/op-server/src/main/java/cn/axzo/nanopart/server/service/OpMessageConfigService.java +++ b/op/op-server/src/main/java/cn/axzo/nanopart/server/service/OpMessageConfigService.java @@ -160,6 +160,9 @@ public interface OpMessageConfigService extends IService { @CriteriaField(field = "contentType", operator = Operator.EQ) private String contentType; + @CriteriaField(field = "imMessageTaskId", operator = Operator.EQ) + private Long imMessageTaskId; + @CriteriaField(ignore = true) private List receivePlatforms;; diff --git a/op/op-server/src/main/java/cn/axzo/nanopart/server/xxljob/SendMessageJob.java b/op/op-server/src/main/java/cn/axzo/nanopart/server/xxljob/SendMessageJob.java index e8986964..162fceae 100644 --- a/op/op-server/src/main/java/cn/axzo/nanopart/server/xxljob/SendMessageJob.java +++ b/op/op-server/src/main/java/cn/axzo/nanopart/server/xxljob/SendMessageJob.java @@ -37,6 +37,7 @@ import java.util.stream.Collectors; @RequiredArgsConstructor public class SendMessageJob extends IJobHandler { + public static final String BIZ_ID_PREFIX = "op-message"; @Autowired private OpMessageConfigService opMessageConfigService; @Autowired @@ -96,14 +97,15 @@ public class SendMessageJob extends IJobHandler { List appTypes = BooleanUtils.isTrue(isAllPerson) ? receiveData.getPlatforms().stream().map(AppTypeEnum::valueOf).collect(Collectors.toList()) : null; MessageTaskResp messageTask = messageApi.sendMessageAsync(AsyncSendMessageParam.builder() - .sendImAccount(opMessageConfig.getSendImAccount()) - .receivePersons(resolveReceivePerson(receiveData)) - .allPerson(isAllPerson) - .appTypes(appTypes) - .msgHeader(opMessageConfig.getTitle()) - .msgContent(opMessageConfig.getContent()) - .jumpData(resolveJumpData(opMessageConfig)) - .cardBannerUrl(opMessageConfig.getCoverImg()) + .bizId(String.format("%s:%s", BIZ_ID_PREFIX, opMessageConfig.getId())) + .sendImAccount(opMessageConfig.getSendImAccount()) + .receivePersons(resolveReceivePerson(receiveData)) + .allPerson(isAllPerson) + .appTypes(appTypes) + .msgHeader(opMessageConfig.getTitle()) + .msgContent(opMessageConfig.getContent()) + .jumpData(resolveJumpData(opMessageConfig)) + .cardBannerUrl(opMessageConfig.getCoverImg()) .build()).getData(); opMessageConfigService.update(OpMessageConfigService.UpdateOpMessageConfigParam.builder()