feat: REQ-2129 消费消息发送mq进行push操作
This commit is contained in:
parent
6481b75d88
commit
2d68cb5171
@ -0,0 +1,31 @@
|
||||
package cn.axzo.msg.center.event.outer;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* @Classname EventTypeEnum
|
||||
* @Date 2021/2/7 6:05 下午
|
||||
* @Created by lilong
|
||||
*/
|
||||
@Getter
|
||||
public enum EventTypeEnum {
|
||||
|
||||
MESSAGE_HISTORY_UPDATED("message-history", "message-history-updated", "发送记录修改")
|
||||
;
|
||||
|
||||
EventTypeEnum(String model, String name, String desc) {
|
||||
this.eventCode = Event.EventCode.builder()
|
||||
.module(model)
|
||||
.name(name)
|
||||
.build();
|
||||
this.model = model;
|
||||
this.name = name;
|
||||
this.desc = desc;
|
||||
}
|
||||
|
||||
private String model;
|
||||
private String name;
|
||||
private String desc;
|
||||
private Event.EventCode eventCode;
|
||||
}
|
||||
@ -0,0 +1,191 @@
|
||||
package cn.axzo.msg.center.event.outer;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.EventConsumer;
|
||||
import cn.axzo.framework.rocketmq.EventHandler;
|
||||
import cn.axzo.msg.center.api.request.MsgBody4Guest;
|
||||
import cn.axzo.msg.center.dal.MessageRecordV3Dao;
|
||||
import cn.axzo.msg.center.domain.entity.MessageBaseTemplate;
|
||||
import cn.axzo.msg.center.domain.entity.MessageRecordV3;
|
||||
import cn.axzo.msg.center.domain.enums.MsgRouteTypeEnum;
|
||||
import cn.axzo.msg.center.domain.enums.NativeTypeEnum;
|
||||
import cn.axzo.msg.center.event.payload.MessageHistoryUpdatedPayload;
|
||||
import cn.axzo.msg.center.inside.notices.service.IYouMengMessageService;
|
||||
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
|
||||
import cn.axzo.msg.center.message.service.MessageTemplateNewService;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.BooleanUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class PushYouMengMessageHandler implements EventHandler, InitializingBean {
|
||||
|
||||
@Autowired
|
||||
private EventConsumer eventConsumer;
|
||||
@Autowired
|
||||
private MessageRecordV3Dao messageRecordV3Dao;
|
||||
@Autowired
|
||||
private MessageTemplateNewService messageTemplateNewService;
|
||||
@Autowired
|
||||
private IYouMengMessageService youMengMessageService;
|
||||
|
||||
@Value("${spring.application.name:msg-center}")
|
||||
private String applicationName;
|
||||
|
||||
/**
|
||||
* 按照这个顺序解析routerType
|
||||
*/
|
||||
private static final List<MsgRouteTypeEnum> ROUTER_TYPE_SORTED = Lists.newArrayList(
|
||||
MsgRouteTypeEnum.IOS,
|
||||
MsgRouteTypeEnum.ANDROID,
|
||||
MsgRouteTypeEnum.WEBVIEW,
|
||||
MsgRouteTypeEnum.MINI_PROGRAM,
|
||||
MsgRouteTypeEnum.WECHAT_MINI_PROGRAM);
|
||||
|
||||
/**
|
||||
* 按照这个顺序解析IOS的url
|
||||
*/
|
||||
private static final List<MsgRouteTypeEnum> IOS_URL_SORTED = Lists.newArrayList(
|
||||
MsgRouteTypeEnum.IOS,
|
||||
MsgRouteTypeEnum.WEBVIEW,
|
||||
MsgRouteTypeEnum.MINI_PROGRAM,
|
||||
MsgRouteTypeEnum.WECHAT_MINI_PROGRAM);
|
||||
|
||||
/**
|
||||
* 按照这个顺序解析ANDROID的url
|
||||
*/
|
||||
private static final List<MsgRouteTypeEnum> ANDROID_URL_SORTED = Lists.newArrayList(
|
||||
MsgRouteTypeEnum.ANDROID,
|
||||
MsgRouteTypeEnum.WEBVIEW,
|
||||
MsgRouteTypeEnum.MINI_PROGRAM,
|
||||
MsgRouteTypeEnum.WECHAT_MINI_PROGRAM);
|
||||
|
||||
|
||||
@Override
|
||||
public void onEvent(Event event, EventConsumer.Context context) {
|
||||
log.info("begin push-handler rocketmq event: {}", event);
|
||||
MessageHistoryUpdatedPayload payload = event.normalizedData(MessageHistoryUpdatedPayload.class);
|
||||
if (Objects.isNull(payload)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!Objects.equals(payload.getNewMessageHistory().getBizId(), applicationName)) {
|
||||
log.info("push-handler 非msg-center的消息");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!payload.isSuccess()) {
|
||||
log.info("push-handler is not success");
|
||||
return;
|
||||
}
|
||||
|
||||
Optional<MessageRecordV3> messageRecordV3s = messageRecordV3Dao.lambdaQuery()
|
||||
.eq(MessageRecordV3::getImMsgId, payload.getNewMessageHistory().getImMessageTaskId())
|
||||
.list()
|
||||
.stream()
|
||||
.findFirst();
|
||||
if (!messageRecordV3s.isPresent()) {
|
||||
log.info("push-handler, 非当前业务的im消息");
|
||||
return;
|
||||
}
|
||||
|
||||
MessageRecordV3 messageRecordV3 = messageRecordV3s.get();
|
||||
Optional<MessageTemplateDTO> messageTemplateDTO = messageTemplateNewService.queryEnableTemplateByCode(messageRecordV3.getTemplateCode());
|
||||
if (!messageTemplateDTO.isPresent()) {
|
||||
log.info("push-handler, 模板code:{},不存在", messageRecordV3.getTemplateCode());
|
||||
return;
|
||||
}
|
||||
|
||||
if (messageTemplateDTO.get().getPushData() == null) {
|
||||
log.info("push-handler, 模板code:{},未配置push信息", messageRecordV3.getTemplateCode());
|
||||
return;
|
||||
}
|
||||
|
||||
MessageBaseTemplate.PushData pushData = messageTemplateDTO.get().getPushData().toJavaObject(MessageBaseTemplate.PushData.class);
|
||||
|
||||
if (BooleanUtils.isNotTrue(pushData.isSwitchOn())) {
|
||||
log.info("push-handler, 模板code:{},push开关未打开", messageRecordV3.getTemplateCode());
|
||||
return;
|
||||
}
|
||||
MessageHistoryUpdatedPayload.MessageHistory newMessageHistory = payload.getNewMessageHistory();
|
||||
MessageHistoryUpdatedPayload.MessageBody messageBody = newMessageHistory.resolveMessageBody();
|
||||
youMengMessageService.sendPushMessage(MsgBody4Guest.builder()
|
||||
.ty(0)
|
||||
.f("0")
|
||||
.appClient(newMessageHistory.getAppType())
|
||||
.m(messageRecordV3.getContent())
|
||||
.m3(newMessageHistory.getReceivePersonId())
|
||||
.m2(resolveM2(messageBody, pushData))
|
||||
.t(newMessageHistory.getReceivePersonId())
|
||||
.build());
|
||||
|
||||
log.info("end push-handler rocketmq event: {}", event);
|
||||
}
|
||||
|
||||
private String resolveM2(MessageHistoryUpdatedPayload.MessageBody messageBody,
|
||||
MessageBaseTemplate.PushData pushData) {
|
||||
JSONObject jsonObject = new JSONObject()
|
||||
.fluentPut("t", messageBody.getMsgHeader())
|
||||
.fluentPut("type", 0);
|
||||
if (StringUtils.isNotBlank(pushData.getVoiceFile())) {
|
||||
jsonObject.fluentPut("audio", pushData.getVoiceFile());
|
||||
}
|
||||
|
||||
List<MessageHistoryUpdatedPayload.ActionPath> actionPaths = messageBody.resolveMsgBody().getCardDetailButton().getActionPaths();
|
||||
if (CollectionUtils.isNotEmpty(actionPaths)) {
|
||||
resolveRouter(jsonObject, actionPaths);
|
||||
}
|
||||
return jsonObject.toJSONString();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param jsonObject
|
||||
* @param actionPaths
|
||||
*/
|
||||
private void resolveRouter(JSONObject jsonObject, List<MessageHistoryUpdatedPayload.ActionPath> actionPaths) {
|
||||
|
||||
Map<MsgRouteTypeEnum, MessageHistoryUpdatedPayload.ActionPath> actionPathMap = actionPaths.stream()
|
||||
.collect(Collectors.toMap(MessageHistoryUpdatedPayload.ActionPath::getPlatform, Function.identity()));
|
||||
Optional<MessageHistoryUpdatedPayload.ActionPath> routerType = ROUTER_TYPE_SORTED.stream()
|
||||
.map(actionPathMap::get)
|
||||
.filter(Objects::nonNull)
|
||||
.findFirst();
|
||||
routerType.ifPresent(actionPath -> jsonObject.fluentPut("rt", NativeTypeEnum.getByCode(actionPath.getPlatform().getCode()).getMessage()));
|
||||
|
||||
Optional<MessageHistoryUpdatedPayload.ActionPath> ios = IOS_URL_SORTED.stream()
|
||||
.map(actionPathMap::get)
|
||||
.filter(Objects::nonNull)
|
||||
.findFirst();
|
||||
// IOS的跳转URL
|
||||
ios.ifPresent(actionPath -> jsonObject.fluentPut("ir", actionPath.getUrl()));
|
||||
|
||||
Optional<MessageHistoryUpdatedPayload.ActionPath> android = ANDROID_URL_SORTED.stream()
|
||||
.map(actionPathMap::get)
|
||||
.filter(Objects::nonNull)
|
||||
.findFirst();
|
||||
// ANDROID的跳转URL
|
||||
android.ifPresent(actionPath -> jsonObject.fluentPut("ar", actionPath.getUrl()));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
eventConsumer.registerHandler(EventTypeEnum.MESSAGE_HISTORY_UPDATED.getEventCode(), this);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,160 @@
|
||||
package cn.axzo.msg.center.event.payload;
|
||||
|
||||
import cn.axzo.im.center.common.enums.AppTypeEnum;
|
||||
import cn.axzo.msg.center.domain.enums.MsgRouteTypeEnum;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class MessageHistoryUpdatedPayload implements Serializable {
|
||||
|
||||
private MessageHistory newMessageHistory;
|
||||
private MessageHistory oldMessageHistory;
|
||||
|
||||
public boolean isSuccess() {
|
||||
return !Objects.equals(newMessageHistory.getStatus(), oldMessageHistory.getStatus())
|
||||
&& Objects.equals(newMessageHistory.getStatus(), MessageHistory.Status.SUCCEED.name());
|
||||
}
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public static class MessageHistory implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private Long id;
|
||||
|
||||
/**
|
||||
* 上游业务请求ID
|
||||
*/
|
||||
private String bizId;
|
||||
|
||||
/**
|
||||
* 普通用户,通过appType包装
|
||||
* 包装以后进行账户注册
|
||||
*/
|
||||
private String messageId;
|
||||
|
||||
/**
|
||||
* 发送者IM账户
|
||||
*/
|
||||
private String fromAccount;
|
||||
|
||||
/**
|
||||
* 发送者IM账户
|
||||
*/
|
||||
private String toAccount;
|
||||
|
||||
/**
|
||||
* 终端类型
|
||||
*
|
||||
* @see AppTypeEnum
|
||||
*/
|
||||
private String appType;
|
||||
|
||||
/**
|
||||
* channel 网易云信
|
||||
*/
|
||||
private String channel;
|
||||
|
||||
private String messageBody;
|
||||
|
||||
private String result;
|
||||
|
||||
private Long imMessageTaskId;
|
||||
|
||||
private String receivePersonId;
|
||||
|
||||
private Long receiveOuId;
|
||||
|
||||
private String status;
|
||||
|
||||
private Integer isDelete;
|
||||
|
||||
private Date createAt;
|
||||
|
||||
private Date updateAt;
|
||||
|
||||
public enum Status {
|
||||
PENDING,
|
||||
SUCCEED,
|
||||
FAILED,
|
||||
;
|
||||
}
|
||||
|
||||
public MessageBody resolveMessageBody() {
|
||||
return JSONObject.parseObject(messageBody, MessageBody.class);
|
||||
}
|
||||
}
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public static class MessageBody {
|
||||
|
||||
/**
|
||||
* 模板消息:Template、聊天消息:Chat、通知消息:Notify
|
||||
*/
|
||||
private String msgType;
|
||||
|
||||
/**
|
||||
* 消息标题
|
||||
*/
|
||||
private String msgHeader;
|
||||
|
||||
/**
|
||||
* 消息内容
|
||||
*/
|
||||
private String msgContent;
|
||||
|
||||
/**
|
||||
* 消息通知结构体
|
||||
*/
|
||||
private String msgBody;
|
||||
|
||||
public MsgBody resolveMsgBody() {
|
||||
return JSONObject.parseObject(msgBody, MsgBody.class);
|
||||
}
|
||||
}
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public static class MsgBody {
|
||||
private CardDetailButton cardDetailButton;
|
||||
}
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public static class CardDetailButton {
|
||||
private List<ActionPath> actionPaths;
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public static class ActionPath {
|
||||
private MsgRouteTypeEnum platform;
|
||||
|
||||
private String url;
|
||||
}
|
||||
}
|
||||
@ -1,5 +1,7 @@
|
||||
package cn.axzo.msg.center.message.controller;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.msg.center.event.outer.PushYouMengMessageHandler;
|
||||
import cn.axzo.msg.center.message.xxl.UpdateOuIdJob;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@ -14,9 +16,18 @@ public class PrivateJobController {
|
||||
|
||||
@Autowired
|
||||
private UpdateOuIdJob updateOuIdJob;
|
||||
@Autowired
|
||||
private PushYouMengMessageHandler pushYouMengMessageHandler;
|
||||
|
||||
@PostMapping("private/ou/update")
|
||||
public Object updateOuIdJob(@RequestBody @Valid UpdateOuIdJob.UpdateOuIdParam req) throws Exception {
|
||||
return updateOuIdJob.execute(JSONObject.toJSONString(req));
|
||||
}
|
||||
|
||||
@PostMapping("private/you-meng/push")
|
||||
public Object pushYouMeng(@RequestBody @Valid Event event) throws Exception {
|
||||
|
||||
pushYouMengMessageHandler.onEvent(event,null);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -6,6 +6,7 @@ import cn.axzo.msg.center.service.enums.MessageCategoryEnum;
|
||||
import cn.axzo.msg.center.service.enums.PushTerminalEnum;
|
||||
import cn.axzo.msg.center.utils.JSONObjectUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
@ -72,6 +73,8 @@ public class MessageTemplateDTO implements Serializable {
|
||||
*/
|
||||
private String minAppVersion;
|
||||
|
||||
private JSONObject pushData;
|
||||
|
||||
public static MessageTemplateDTO from(MessageBaseTemplate baseTemplate, MessageTemplateRouterDTO msgTemplateRouter) {
|
||||
// 业务详情展示策略
|
||||
return MessageTemplateDTO.builder()
|
||||
@ -85,6 +88,7 @@ public class MessageTemplateDTO implements Serializable {
|
||||
.msgTemplateRouter(msgTemplateRouter)
|
||||
.pushTerminals(JSON.parseArray(baseTemplate.getPushTerminal(), PushTerminalEnum.class))
|
||||
.minAppVersion(baseTemplate.getMinAppVersion())
|
||||
.pushData(baseTemplate.getPushData())
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,87 @@
|
||||
package cn.axzo.msg.center.mq;
|
||||
|
||||
import cn.axzo.msg.center.api.mq.MqMessage;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Getter
|
||||
public class MqMessageRecord<Message extends MqMessage> {
|
||||
|
||||
private MqMessageType messageType;
|
||||
private Message message;
|
||||
private String targetId;
|
||||
private String operatorId;
|
||||
private String operatorType;
|
||||
private String shardingKey;
|
||||
|
||||
private MqMessageRecord() {}
|
||||
|
||||
public static <Message extends MqMessage> MqMessageRecordBuilder<Message>
|
||||
builder(MqMessageType messageType, Message message) {
|
||||
return new MqMessageRecordBuilder<>(messageType, message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JSON.toJSONString(this);
|
||||
}
|
||||
|
||||
public static class MqMessageRecordBuilder<Message extends MqMessage> {
|
||||
private final MqMessageType messageType;
|
||||
private final Message message;
|
||||
private String targetId;
|
||||
private String operatorId;
|
||||
private String operatorType;
|
||||
private String shardingKey;
|
||||
|
||||
private MqMessageRecordBuilder(MqMessageType messageType, Message message) {
|
||||
this.message = message;
|
||||
this.messageType = messageType;
|
||||
}
|
||||
|
||||
public MqMessageRecordBuilder<Message> shardingKey(String shardingKey) {
|
||||
this.shardingKey = shardingKey;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MqMessageRecordBuilder<Message> targetId(String targetId) {
|
||||
this.targetId = targetId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MqMessageRecordBuilder<Message> targetId(Long targetId) {
|
||||
this.targetId = String.valueOf(targetId);
|
||||
return this;
|
||||
}
|
||||
|
||||
public MqMessageRecordBuilder<Message> operatorId(Long operatorId) {
|
||||
this.operatorId = String.valueOf(operatorId);
|
||||
return this;
|
||||
}
|
||||
|
||||
public MqMessageRecordBuilder<Message> operatorId(String operatorId) {
|
||||
this.operatorId = operatorId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MqMessageRecordBuilder<Message> operatorType(String operatorType) {
|
||||
this.operatorType = operatorType;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MqMessageRecord<Message> build() {
|
||||
MqMessageRecord<Message> record = new MqMessageRecord<>();
|
||||
record.messageType = messageType;
|
||||
record.message = message;
|
||||
record.targetId = targetId;
|
||||
record.operatorId = operatorId;
|
||||
record.operatorType = operatorType;
|
||||
record.shardingKey = shardingKey;
|
||||
return record;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,20 @@
|
||||
package cn.axzo.msg.center.mq;
|
||||
|
||||
import lombok.AccessLevel;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Getter
|
||||
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
|
||||
public enum MqMessageType {
|
||||
|
||||
TODO_PRESET_BUTTON_PRESSED("msg-center-todo", "msg-center-todo-preset-button-pressed", "预设按钮被点击"),
|
||||
|
||||
;
|
||||
private final String model;
|
||||
private final String tag;
|
||||
private final String desc;
|
||||
}
|
||||
@ -0,0 +1,52 @@
|
||||
package cn.axzo.msg.center.mq;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.Event.EventCode;
|
||||
import cn.axzo.framework.rocketmq.EventProducer;
|
||||
import cn.axzo.msg.center.api.mq.MqMessage;
|
||||
import cn.axzo.msg.center.common.utils.BizAssertions;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class MqProducer {
|
||||
|
||||
private final EventProducer<MqMessage> producer;
|
||||
|
||||
public <Message extends MqMessage> void send(MqMessageRecord<Message> record) {
|
||||
BizAssertions.assertNotNull(record.getMessageType(), "消息类型不能为空");
|
||||
BizAssertions.assertNotNull(record.getMessage(), "消息体不能为空");
|
||||
log.info("开始 - 发送消息. messageRecord={}", record);
|
||||
try {
|
||||
sendImpl(record);
|
||||
} catch (Exception e) {
|
||||
log.warn("异常 - 发送消息. messageRecord={}", record, e);
|
||||
// 由调用方决定怎么处理异常
|
||||
throw e;
|
||||
} finally {
|
||||
log.info("结束 - 发送消息. messageRecord={}", record);
|
||||
}
|
||||
}
|
||||
|
||||
private <Message extends MqMessage> void sendImpl(MqMessageRecord<Message> record) {
|
||||
MqMessageType messageType = record.getMessageType();
|
||||
producer.send(Event.builder()
|
||||
.shardingKey(record.getShardingKey())
|
||||
.targetId(record.getTargetId())
|
||||
.targetType(messageType.getModel())
|
||||
.eventCode(new EventCode(messageType.getModel(), messageType.getTag()))
|
||||
.eventModule(messageType.getModel())
|
||||
.eventName(messageType.getTag())
|
||||
.operatorId(record.getOperatorId())
|
||||
.operatorType(record.getOperatorType())
|
||||
.data(record.getMessage())
|
||||
.build());
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,87 @@
|
||||
package cn.axzo.msg.center.mq;
|
||||
|
||||
import cn.axzo.framework.rocketmq.BaseListener;
|
||||
import cn.axzo.framework.rocketmq.DefaultEventConsumer;
|
||||
import cn.axzo.framework.rocketmq.EventConsumer;
|
||||
import cn.axzo.framework.rocketmq.EventHandlerRepository;
|
||||
import cn.axzo.framework.rocketmq.EventProducer;
|
||||
import cn.axzo.framework.rocketmq.EventProducer.Context;
|
||||
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
|
||||
import cn.axzo.framework.rocketmq.RocketMQEventProducer.RocketMQMessageMeta;
|
||||
import cn.axzo.msg.center.api.mq.MqMessage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Slf4j
|
||||
@Configuration
|
||||
public class RocketMQConfig {
|
||||
|
||||
public static final String APP_NAME = "MSG-CENTER";
|
||||
|
||||
@Value("topic_msg_center_${spring.profiles.active}")
|
||||
private String topic;
|
||||
|
||||
@Bean
|
||||
EventProducer<MqMessage> eventProducer(RocketMQTemplate rocketMQTemplate) {
|
||||
RocketMQMessageMeta topicInfo = RocketMQMessageMeta.builder()
|
||||
.topic(topic)
|
||||
.build();
|
||||
Context<RocketMQMessageMeta> context = Context.<RocketMQMessageMeta>builder()
|
||||
.meta(topicInfo)
|
||||
.build();
|
||||
//noinspection unchecked
|
||||
return new RocketMQEventProducer(rocketMQTemplate, APP_NAME, APP_NAME, context, null);
|
||||
}
|
||||
|
||||
@Bean
|
||||
EventConsumer eventConsumer(EventHandlerRepository eventHandlerRepository) {
|
||||
Consumer<EventConsumer.EventWrapper> callback = (eventWrapper) -> {
|
||||
if (eventWrapper.isHandled()) {
|
||||
// 只收集被App真正消费的消息.
|
||||
//String topic = (String) eventWrapper.getExt().get(EVENT_TOPIC_KEY);
|
||||
|
||||
}
|
||||
};
|
||||
return new DefaultEventConsumer(APP_NAME, eventHandlerRepository, callback);
|
||||
}
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(topic = "topic_im_center_${spring.profiles.active}",
|
||||
consumerGroup = "GID_topic_im_center_${spring.application.name}_${spring.profiles.active}",
|
||||
consumeMode = ConsumeMode.ORDERLY,
|
||||
nameServer = "${rocketmq.name-server}"
|
||||
)
|
||||
public static class ImCenterListener extends BaseListener implements RocketMQListener<MessageExt> {
|
||||
|
||||
@Autowired
|
||||
private EventConsumer eventConsumer;
|
||||
|
||||
@Override
|
||||
public void onMessage(MessageExt message) {
|
||||
super.onEvent(message, eventConsumer);
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
EventHandlerRepository eventHandlerRepository() {
|
||||
return new EventHandlerRepository((ex, logText) -> {
|
||||
log.warn("MQ, handle warning {}", logText, ex);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,33 @@
|
||||
package cn.axzo.msg.center.api.mq;
|
||||
|
||||
import cn.hutool.core.lang.UUID;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Setter
|
||||
@Getter
|
||||
public abstract class MqMessage implements Serializable {
|
||||
|
||||
/**
|
||||
* 消息唯一id
|
||||
*/
|
||||
private String messageId = UUID.randomUUID().toString();
|
||||
|
||||
/**
|
||||
* 消息发送时间
|
||||
*/
|
||||
private Date messageSendTime = new Date();
|
||||
|
||||
/**
|
||||
* 消息发送时间, 字符串格式
|
||||
*/
|
||||
private String messageSendTimeStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(messageSendTime);
|
||||
|
||||
}
|
||||
@ -0,0 +1,185 @@
|
||||
package cn.axzo.msg.center.api.mq;
|
||||
|
||||
import cn.axzo.msg.center.service.enums.BizCategoryEnum;
|
||||
import cn.axzo.msg.center.service.enums.BizFinalStateEnum;
|
||||
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
|
||||
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
|
||||
import cn.axzo.msg.center.service.enums.PendingMessageStateEnum;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.baomidou.mybatisplus.annotation.TableField;
|
||||
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class PresetButtonPressedMessage extends MqMessage implements Serializable {
|
||||
/**
|
||||
* 请求批次号
|
||||
*/
|
||||
private String btnPressedRequestNo;
|
||||
|
||||
/**
|
||||
* 消息的唯一标识
|
||||
*/
|
||||
private String identityCode;
|
||||
|
||||
/**
|
||||
* 源模板编码. 比如抄送, 保存审批流的待办模版code
|
||||
*/
|
||||
private String srcTemplateCode;
|
||||
|
||||
/**
|
||||
* 待办内容模板编码. 比如抄送, 保存审批流的待办模版code或抄送的待办模版code
|
||||
*/
|
||||
private String templateCode;
|
||||
|
||||
/**
|
||||
* 关联业务编码
|
||||
*/
|
||||
private String bizCode;
|
||||
|
||||
/**
|
||||
* 流程类待办的流程结点编码
|
||||
*/
|
||||
private String subBizCode;
|
||||
|
||||
/**
|
||||
* 待办状态
|
||||
*/
|
||||
private PendingMessageStateEnum state;
|
||||
|
||||
/**
|
||||
* 执行者的自然人ID
|
||||
*/
|
||||
private Long executorPersonId;
|
||||
|
||||
/**
|
||||
* 执行者ID
|
||||
*/
|
||||
private Long executorId;
|
||||
|
||||
/**
|
||||
* 执行者姓名
|
||||
*/
|
||||
private String executorName;
|
||||
|
||||
/**
|
||||
* 请求批次号
|
||||
*/
|
||||
private String requestNo;
|
||||
|
||||
/**
|
||||
* 执行者身份
|
||||
*/
|
||||
private IdentityTypeEnum executorType;
|
||||
|
||||
/**
|
||||
* 消息所属组织类型
|
||||
*/
|
||||
private OrganizationTypeEnum orgType;
|
||||
|
||||
/**
|
||||
* 业务描述eg:流程结点描述
|
||||
*/
|
||||
private String bizDesc;
|
||||
|
||||
/**
|
||||
* 业务标签
|
||||
*/
|
||||
private String bizFlag;
|
||||
|
||||
/**
|
||||
* 消息标题
|
||||
*/
|
||||
private String title;
|
||||
|
||||
/**
|
||||
* 消息内容
|
||||
*/
|
||||
private String content;
|
||||
|
||||
/**
|
||||
* 发起者ID
|
||||
*/
|
||||
private Long promoterId;
|
||||
|
||||
/**
|
||||
* 发起者的自然人ID
|
||||
*/
|
||||
private Long promoterPersonId;
|
||||
|
||||
/**
|
||||
* 发起者姓名
|
||||
*/
|
||||
private String promoterName;
|
||||
|
||||
/**
|
||||
* 发起者身份 WORKER:工人,WORKER_LEADER:班组长,PRACTITIONER:从业人员,REGULATOR:监管人员,OPERATOR:运营人员
|
||||
*/
|
||||
private IdentityTypeEnum promoterType;
|
||||
|
||||
/**
|
||||
* 业务状态
|
||||
*/
|
||||
private BizFinalStateEnum bizFinalState;
|
||||
|
||||
/**
|
||||
* 业务类型
|
||||
*/
|
||||
private BizCategoryEnum bizCategory;
|
||||
|
||||
/**
|
||||
* 业务扩展参数
|
||||
*/
|
||||
@TableField(typeHandler = FastjsonTypeHandler.class)
|
||||
private JSONObject bizExtParam;
|
||||
|
||||
/**
|
||||
* 路由参数
|
||||
*/
|
||||
@TableField(typeHandler = FastjsonTypeHandler.class)
|
||||
private JSONObject routerParams;
|
||||
|
||||
/**
|
||||
* 待办的截止时间
|
||||
*/
|
||||
private Date deadline;
|
||||
|
||||
/**
|
||||
* 执行人单位id
|
||||
*/
|
||||
private Long executorOuId;
|
||||
|
||||
/**
|
||||
* 执行人工作台id
|
||||
*/
|
||||
private Long executorWorkspaceId;
|
||||
|
||||
/**
|
||||
* 执行人工作台名称
|
||||
*/
|
||||
private String executorWorkspaceName;
|
||||
|
||||
/**
|
||||
* 发起人单位id
|
||||
*/
|
||||
private Long promoterOuId;
|
||||
|
||||
/**
|
||||
* 发起人工作台id
|
||||
*/
|
||||
private Long promoterWorkspaceId;
|
||||
|
||||
/**
|
||||
* 发起人工作台名称
|
||||
*/
|
||||
private String promoterWorkspaceName;
|
||||
|
||||
}
|
||||
@ -2,7 +2,10 @@
|
||||
package cn.axzo.msg.center.api.request;
|
||||
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
@ -12,6 +15,9 @@ import lombok.experimental.Accessors;
|
||||
*/
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Accessors(chain = true)
|
||||
public class MsgBody4Guest {
|
||||
/**
|
||||
|
||||
@ -8,7 +8,11 @@ import com.alibaba.fastjson.JSONObject;
|
||||
import com.baomidou.mybatisplus.annotation.TableField;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.Setter;
|
||||
|
||||
import java.io.Serializable;
|
||||
@ -22,7 +26,7 @@ import java.io.Serializable;
|
||||
*/
|
||||
@Setter
|
||||
@Getter
|
||||
@TableName("message_base_template")
|
||||
@TableName(value = "message_base_template", autoResultMap = true)
|
||||
public class MessageBaseTemplate extends BaseEntityExt<MessageBaseTemplate> implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = -880409106378455813L;
|
||||
@ -85,4 +89,33 @@ public class MessageBaseTemplate extends BaseEntityExt<MessageBaseTemplate> impl
|
||||
public String toString() {
|
||||
return JSON.toJSONString(this);
|
||||
}
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public static class PushData {
|
||||
|
||||
private boolean switchOn;
|
||||
|
||||
/**
|
||||
* 声音文件
|
||||
*/
|
||||
private String voiceFile;
|
||||
|
||||
/**
|
||||
* 提醒方式:voice(声音)、vibrate(震动)
|
||||
*/
|
||||
private String ability;
|
||||
|
||||
/**
|
||||
* push类型:system(系统消息)、op(运营消息)
|
||||
*/
|
||||
private String type;
|
||||
|
||||
/**
|
||||
* 声音类型:custom(自定义)、system(系统
|
||||
*/
|
||||
private String voiceType;
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user