REQ-2318: 待办状态变更时发送MQ消息通知其它业务方

This commit is contained in:
yanglin 2024-06-04 10:52:37 +08:00
parent 69b797add6
commit 7706b15206
12 changed files with 370 additions and 215 deletions

View File

@ -17,6 +17,8 @@ import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
import cn.axzo.msg.center.message.domain.param.PendingMessagePushParam;
import cn.axzo.msg.center.message.service.MessageTemplateNewService;
import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoMqBroadcaster;
import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoPullBroadcaster;
import cn.axzo.msg.center.message.service.todo.manage.event.HandoverEvent;
import cn.axzo.msg.center.message.service.todo.manage.event.NewTodoEvent;
import cn.axzo.msg.center.mq.MqMessageRecord;
@ -79,7 +81,8 @@ public class TodoManager {
private final TodoDao todoDao;
private final TodoRecordBuilder todoRecordBuilder;
private final TodoLogger todoLogger;
private final PullTodoBroadcaster pullTodoBroadcaster;
private final TodoPullBroadcaster todoPullBroadcaster;
private final TodoMqBroadcaster todoMqBroadcaster;
private final MessageTemplateNewService messageTemplateNewService;
private final MqProducer mqProducer;
private final ErrorAssembler errorAssembler;
@ -147,7 +150,8 @@ public class TodoManager {
Set<Long> executorPersonIds = todos.stream()
.map(Todo::getExecutorPersonId)
.collect(Collectors.<Long>toSet());
pullTodoBroadcaster.fireTodoChanged(executorPersonIds);
todoPullBroadcaster.fireTodoChanged(executorPersonIds);
todoMqBroadcaster.fireTodoCreated("send", todos);
// 记录日志
// @formatter:off
ctx.addLogContent("templateTitle", template.getTitle())
@ -215,6 +219,8 @@ public class TodoManager {
destTodo.setExecutorName(request.determineToPersonName());
}
todoDao.saveBatch(destTodos);
todoMqBroadcaster.fireTodoUpdated("handover", srcTodos);
todoMqBroadcaster.fireTodoCreated("handover", destTodos);
// build handover mappings
List<TodoHandoverMapping> mappings = new ArrayList<>();
for (int i = 0; i < srcTodos.size(); i++) {
@ -481,6 +487,7 @@ public class TodoManager {
// 支持重复发mq消息
if (isAdvancedOrCompleted) {
sendMqMessageOnPresetButtonPressed(ctx, request, todo);
todoMqBroadcaster.fireTodoUpdated("presetButtonPressed", todo);
// 如果不是重复发送, 就只记一条日志. 如果是重复发送, 就单独记录一条日志
if (!advanceResult.isAdvanced())
todoLogger.logTodoUpdated(ctx, todo);
@ -496,26 +503,19 @@ public class TodoManager {
*/
private void sendMqMessageOnPresetButtonPressed(
TodoRequestContext ctx, PresetButtonPressedRequest request, Todo todo) {
TodoBusiness business = todoBusinessDao.getById(todo.getTodoBusinessId());
PresetButtonPressedMessage message = new PresetButtonPressedMessage();
BeanUtils.copyProperties(business, message);
BeanUtils.copyProperties(todo, message);
message.setBtnPressedRequestNo(ctx.getRequestNo());
message.setPresetButtonType(request.getPresetButtonType());
message.setPromoterOuId(business.getOuId());
message.setPromoterWorkspaceId(business.getOrgId());
message.setPromoterWorkspaceName(business.getOrgName());
message.setExecutorOuId(todo.getOuId());
message.setExecutorWorkspaceId(todo.getOrgId());
message.setExecutorWorkspaceName(todo.getOrgName());
String shardingKey = String.format("%s:%s", todo.getTemplateCode(), todo.getBizCode());
ctx.addLogContent("shardingKey", shardingKey);
TodoBusinesses businesses = todoBusinessDao.getBusinesses(todo);
message.setTodoInfo(todoMqBroadcaster.createTodoInfo(businesses, todo));
ctx.addLogContent("shardingKey", todo.getTemplateCode());
try {
mqProducer.send(MqMessageRecord
.builder(MqMessageType.TODO_PRESET_BUTTON_PRESSED, message)
.targetId(todo.getId())
.messageKey(todo.getId())
.operatorId(request.getOperatorId())
.shardingKey(shardingKey)
.shardingKey(todo.getTemplateCode())
.build());
ctx.addLogContent("sendMqMessage", ImmutableMap.of("isSuccess", "true"));
} catch (Exception e) {
@ -589,12 +589,20 @@ public class TodoManager {
Set<Long> executorPersonIds = advanceTodos.todos.stream()
.map(Todo::getExecutorPersonId)
.collect(toSet());
pullTodoBroadcaster.fireTodoChanged(executorPersonIds);
todoPullBroadcaster.fireTodoChanged(executorPersonIds);
}
if (updated)
if (updated) {
ctx.addLogContent("stateAdvanced", true);
else
Set<Long> updatedTodoIds = advanceTodos.todos.stream()
.map(BaseEntityExt::getId)
.collect(toSet());
if (!updatedTodoIds.isEmpty()) {
List<Todo> updatedTodos = todoDao.listByIds(updatedTodoIds);
todoMqBroadcaster.fireTodoUpdated(ctx.getName(), updatedTodos);
}
} else {
advanceFailLogger.get();
}
return new StateAdvanceResult(updated, advanceTodos.business, advanceTodos.todos);
}

View File

@ -0,0 +1,79 @@
package cn.axzo.msg.center.message.service.todo.manage.broadcast;
import cn.axzo.basics.common.BeanMapper;
import cn.axzo.msg.center.api.mq.TodoInfo;
import cn.axzo.msg.center.api.mq.TodoUpdateMessage;
import cn.axzo.msg.center.dal.TodoBusinessDao;
import cn.axzo.msg.center.dal.TodoBusinesses;
import cn.axzo.msg.center.domain.entity.Todo;
import cn.axzo.msg.center.domain.entity.TodoBusiness;
import cn.axzo.msg.center.mq.MqMessageRecord;
import cn.axzo.msg.center.mq.MqMessageType;
import cn.axzo.msg.center.mq.MqProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
public class TodoMqBroadcaster {
private final MqProducer mqProducer;
private final TodoBusinessDao todoBusinessDao;
public void fireTodoCreated(String operation, List<Todo> todos) {
fireTodoUpdated(operation, todos);
}
public void fireTodoUpdated(String operation, List<Todo> todos) {
TodoBusinesses businesses = todoBusinessDao.getBusinesses(todos);
for (Todo todo : todos)
fireTodoUpdated(operation, businesses, todo);
}
public void fireTodoUpdated(String operation, Todo todo) {
TodoBusinesses businesses = todoBusinessDao.getBusinesses(todo);
fireTodoUpdated(operation, businesses, todo);
}
private void fireTodoUpdated(String operation, TodoBusinesses businesses, Todo todo) {
TodoUpdateMessage message = new TodoUpdateMessage();
message.setOperation(operation);
message.setUpdatedTodo(createTodoInfo(businesses, todo));
mqProducer.send(MqMessageRecord
.builder(MqMessageType.TODO_STATE_UPDATE, message)
.messageKey(todo.getId())
.shardingKey(todo.getTemplateCode())
.tagInfo(todo.getTemplateCode())
.build());
}
public TodoInfo createTodoInfo(TodoBusinesses businesses, Todo todo) {
TodoInfo todoInfo = BeanMapper.copyBean(todo, TodoInfo.class);
todoInfo.setTodoId(todo.getId());
todoInfo.setTodoBusinessId(todo.getTodoBusinessId());
todoInfo.setExecutorOuId(todo.getOuId());
todoInfo.setExecutorWorkspaceId(todo.getOrgId());
todoInfo.setExecutorWorkspaceName(todo.getOrgName());
TodoBusiness business = businesses.findBusiness(todo).orElse(null);
if (business != null) {
todoInfo.setBizFinalState(business.getBizFinalState());
todoInfo.setBizFlag(business.getBizFlag());
todoInfo.setPromoterType(business.getPromoterType());
todoInfo.setPromoterPersonId(business.getPromoterPersonId());
todoInfo.setPromoterName(business.getPromoterName());
todoInfo.setPromoterId(business.getPromoterId());
todoInfo.setPromoterOuId(business.getOuId());
todoInfo.setPromoterWorkspaceId(business.getOrgId());
todoInfo.setPromoterWorkspaceName(business.getOrgName());
todoInfo.setBizCategory(business.getBizCategory());
todoInfo.setDeadline(business.getDeadline());
}
return todoInfo;
}
}

View File

@ -1,4 +1,4 @@
package cn.axzo.msg.center.message.service.todo.manage;
package cn.axzo.msg.center.message.service.todo.manage.broadcast;
import cn.axzo.im.center.api.feign.MessageApi;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RequiredArgsConstructor
class PullTodoBroadcaster implements DisposableBean {
public class TodoPullBroadcaster implements DisposableBean {
private final MessageApi messageApi;
private final ExecutorService executor = new ThreadPoolExecutor(
@ -37,7 +37,7 @@ class PullTodoBroadcaster implements DisposableBean {
/**
* 可以在事务中调用, 无论事务是否提交, 让终端拉多一次消息也没有影响
*/
void fireTodoChanged(Collection<Long> personIds) {
public void fireTodoChanged(Collection<Long> personIds) {
try {
executor.execute(() -> personIds.forEach(this::fireTodoChangedImpl));
} catch (Exception e) {

View File

@ -40,9 +40,9 @@ public class YoumengTemplateClient {
private final MessageBaseTemplateMapper messageBaseTemplateMapper;
private final ExecutorService executor = new ThreadPoolExecutor(
15, 30,
30, 50,
10, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(3000),
new ArrayBlockingQueue<>(10000),
new NamedThreadFactory(getClass().getSimpleName()));
public void asyncSend(Long templateId, List<YoumengPush> pushes) {

View File

@ -12,10 +12,11 @@ public class MqMessageRecord<Message extends MqMessage> {
private MqMessageType messageType;
private Message message;
private String targetId;
private String operatorId;
private String operatorType;
private String shardingKey;
private String tagInfo;
private String messageKey;
private MqMessageRecord() {}
@ -32,10 +33,11 @@ public class MqMessageRecord<Message extends MqMessage> {
public static class MqMessageRecordBuilder<Message extends MqMessage> {
private final MqMessageType messageType;
private final Message message;
private String targetId;
private String operatorId;
private String operatorType;
private String shardingKey;
private String tagInfo;
private String messageKey;
private MqMessageRecordBuilder(MqMessageType messageType, Message message) {
this.message = message;
@ -47,16 +49,6 @@ public class MqMessageRecord<Message extends MqMessage> {
return this;
}
public MqMessageRecordBuilder<Message> targetId(String targetId) {
this.targetId = targetId;
return this;
}
public MqMessageRecordBuilder<Message> targetId(Long targetId) {
this.targetId = String.valueOf(targetId);
return this;
}
public MqMessageRecordBuilder<Message> operatorId(Long operatorId) {
this.operatorId = String.valueOf(operatorId);
return this;
@ -72,14 +64,25 @@ public class MqMessageRecord<Message extends MqMessage> {
return this;
}
public MqMessageRecordBuilder<Message> tagInfo(String tagInfo) {
this.tagInfo = tagInfo;
return this;
}
public MqMessageRecordBuilder<Message> messageKey(Object key) {
this.messageKey = String.valueOf(key);
return this;
}
public MqMessageRecord<Message> build() {
MqMessageRecord<Message> record = new MqMessageRecord<>();
record.messageType = messageType;
record.message = message;
record.targetId = targetId;
record.operatorId = operatorId;
record.operatorType = operatorType;
record.shardingKey = shardingKey;
record.tagInfo = tagInfo;
record.messageKey = messageKey;
return record;
}
}

View File

@ -4,6 +4,7 @@ import cn.axzo.framework.rocketmq.Event;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
/**
* @author yanglin
@ -12,17 +13,25 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public enum MqMessageType {
TODO_PRESET_BUTTON_PRESSED("msg-center-todo", "msg-center-todo-preset-button-pressed", "预设按钮被点击"),
OLD_MSG_SEND("msg-center-old-msg", "old-msg-send", "发送旧消息");
OLD_MSG_SEND("old-msg", "old-msg-send", "发送旧消息"),
TODO_PRESET_BUTTON_PRESSED("todo", "todo-preset-button-pressed", "预设按钮被点击"),
TODO_STATE_UPDATE("todo", "todo-state-update", "待办状态变更(创建)");
private final String model;
private final String tag;
private final String eventModel;
private final String eventName;
private final String desc;
public Event.EventCode getEventCode() {
return Event.EventCode.builder()
.module(model)
.name(tag)
.module(eventModel)
.name(eventName)
.build();
}
public String getTag(String tagInfo) {
if (StringUtils.isBlank(tagInfo))
return eventName;
return eventName + "-" + tagInfo;
}
}

View File

@ -36,11 +36,11 @@ public class MqProducer {
MqMessageType messageType = record.getMessageType();
producer.send(Event.builder()
.shardingKey(record.getShardingKey())
.targetId(record.getTargetId())
.targetType(messageType.getModel())
.targetId(record.getMessageKey())
.targetType(messageType.getEventModel())
.eventCode(messageType.getEventCode())
.eventModule(messageType.getModel())
.eventName(messageType.getTag())
.eventModule(messageType.getEventModel())
.eventName(messageType.getTag(record.getTagInfo()))
.operatorId(record.getOperatorId())
.operatorType(record.getOperatorType())
.data(record.getMessage())

View File

@ -1,18 +1,10 @@
package cn.axzo.msg.center.api.mq;
import cn.axzo.msg.center.service.enums.BizCategoryEnum;
import cn.axzo.msg.center.service.enums.BizFinalStateEnum;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import cn.axzo.msg.center.service.enums.PendingMessageStateEnum;
import cn.axzo.msg.center.service.enums.PresetButtonType;
import cn.axzo.msg.center.service.enums.TodoType;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
import java.util.Date;
/**
* @author yanglin
@ -30,164 +22,9 @@ public class PresetButtonPressedMessage extends MqMessage implements Serializabl
* REJECT: 待办驳回
*/
private PresetButtonType presetButtonType;
/**
* 消息的唯一标识
*/
private String identityCode;
/**
* 源模板编码. 比如抄送, 保存审批流的待办模版code
* 待办信息
*/
private String srcTemplateCode;
/**
* 待办内容模板编码. 比如抄送, 保存审批流的待办模版code或抄送的待办模版code
*/
private String templateCode;
/**
* 关联业务编码
*/
private String bizCode;
/**
* 流程类待办的流程结点编码
*/
private String subBizCode;
/**
* 待办类别. EXECUTABLE: 可执行的; COPIED_TO_ME: 抄送给我的
*/
private TodoType type;
/**
* 待办状态
*/
private PendingMessageStateEnum state;
/**
* 执行者的自然人ID
*/
private Long executorPersonId;
/**
* 执行者ID
*/
private Long executorId;
/**
* 执行者姓名
*/
private String executorName;
/**
* 请求批次号
*/
private String requestNo;
/**
* 执行者身份
*/
private IdentityTypeEnum executorType;
/**
* 消息所属组织类型
*/
private OrganizationTypeEnum orgType;
/**
* 业务描述eg:流程结点描述
*/
private String bizDesc;
/**
* 业务标签
*/
private String bizFlag;
/**
* 消息标题
*/
private String title;
/**
* 消息内容
*/
private String content;
/**
* 发起者ID
*/
private Long promoterId;
/**
* 发起者的自然人ID
*/
private Long promoterPersonId;
/**
* 发起者姓名
*/
private String promoterName;
/**
* 发起者身份 WORKER:工人,WORKER_LEADER:班组长,PRACTITIONER:从业人员,REGULATOR:监管人员,OPERATOR:运营人员
*/
private IdentityTypeEnum promoterType;
/**
* 业务状态
*/
private BizFinalStateEnum bizFinalState;
/**
* 业务类型
*/
private BizCategoryEnum bizCategory;
/**
* 业务扩展参数
*/
private JSONObject bizExtParam;
/**
* 路由参数
*/
private JSONObject routerParams;
/**
* 待办的截止时间
*/
private Date deadline;
/**
* 执行人单位id
*/
private Long executorOuId;
/**
* 执行人工作台id
*/
private Long executorWorkspaceId;
/**
* 执行人工作台名称
*/
private String executorWorkspaceName;
/**
* 发起人单位id
*/
private Long promoterOuId;
/**
* 发起人工作台id
*/
private Long promoterWorkspaceId;
/**
* 发起人工作台名称
*/
private String promoterWorkspaceName;
private TodoInfo todoInfo;
}

View File

@ -0,0 +1,186 @@
package cn.axzo.msg.center.api.mq;
import cn.axzo.msg.center.service.enums.BizCategoryEnum;
import cn.axzo.msg.center.service.enums.BizFinalStateEnum;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import cn.axzo.msg.center.service.enums.PendingMessageStateEnum;
import cn.axzo.msg.center.service.enums.TodoType;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import java.util.Date;
/**
* @author yanglin
*/
@Data
public class TodoInfo {
/**
* 待办id
*/
private Long todoId;
/**
* 待办发起角度信息id
*/
private Long todoBusinessId;
/**
* 消息的唯一标识
*/
private String identityCode;
/**
* 源模板编码. 比如抄送, 保存审批流的待办模版code
*/
private String srcTemplateCode;
/**
* 待办内容模板编码. 比如抄送, 保存审批流的待办模版code或抄送的待办模版code
*/
private String templateCode;
/**
* 关联业务编码
*/
private String bizCode;
/**
* 流程类待办的流程结点编码
*/
private String subBizCode;
/**
* 待办类别. EXECUTABLE: 可执行的; COPIED_TO_ME: 抄送给我的
*/
private TodoType type;
/**
* 待办状态
*/
private PendingMessageStateEnum state;
/**
* 执行者的自然人ID
*/
private Long executorPersonId;
/**
* 执行者ID
*/
private Long executorId;
/**
* 执行者姓名
*/
private String executorName;
/**
* 请求批次号
*/
private String requestNo;
/**
* 执行者身份
*/
private IdentityTypeEnum executorType;
/**
* 消息所属组织类型
*/
private OrganizationTypeEnum orgType;
/**
* 业务描述eg:流程结点描述
*/
private String bizDesc;
/**
* 业务标签
*/
private String bizFlag;
/**
* 消息标题
*/
private String title;
/**
* 消息内容
*/
private String content;
/**
* 发起者ID
*/
private Long promoterId;
/**
* 发起者的自然人ID
*/
private Long promoterPersonId;
/**
* 发起者姓名
*/
private String promoterName;
/**
* 发起者身份 WORKER:工人,WORKER_LEADER:班组长,PRACTITIONER:从业人员,REGULATOR:监管人员,OPERATOR:运营人员
*/
private IdentityTypeEnum promoterType;
/**
* 业务状态
*/
private BizFinalStateEnum bizFinalState;
/**
* 业务类型
*/
private BizCategoryEnum bizCategory;
/**
* 业务扩展参数
*/
private JSONObject bizExtParam;
/**
* 路由参数
*/
private JSONObject routerParams;
/**
* 待办的截止时间
*/
private Date deadline;
/**
* 执行人单位id
*/
private Long executorOuId;
/**
* 执行人工作台id
*/
private Long executorWorkspaceId;
/**
* 执行人工作台名称
*/
private String executorWorkspaceName;
/**
* 发起人单位id
*/
private Long promoterOuId;
/**
* 发起人工作台id
*/
private Long promoterWorkspaceId;
/**
* 发起人工作台名称
*/
private String promoterWorkspaceName;
}

View File

@ -0,0 +1,20 @@
package cn.axzo.msg.center.api.mq;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author yanglin
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class TodoUpdateMessage extends MqMessage {
/**
* 什么操作导致这次变化
*/
private String operation;
/**
* 待办信息
*/
private TodoInfo updatedTodo;
}

View File

@ -18,13 +18,21 @@ import java.util.Objects;
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public enum PendingMessageStateEnum {
// !! 抄送相关状态
CREATED(0, "创建"),
UNSENT(1, "未发送"),
HAS_BEEN_SENT(2, "代办"),
COMPLETED(5, "已办"),
RETRACT(6, "已撤回"),
DELETED(7, "已删除"),
READ(8, "已读"),
// !! 正常待办相关状态
HAS_BEEN_SENT(2, "待处理"),
COMPLETED(5, "已处理"),
RETRACT(6, "已撤回"),
// !! 未使用, 不删除避免影响使用方
UNSENT(1, "未发送"),
DELETED(7, "已删除"),
;
private final Integer code;

View File

@ -11,6 +11,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@ -41,6 +42,10 @@ public class TodoBusinessDao extends ServiceImpl<TodoBusinessMapper, TodoBusines
.update();
}
public TodoBusinesses getBusinesses(Todo todo) {
return getBusinesses(Collections.singletonList(todo));
}
public TodoBusinesses getBusinesses(List<Todo> todos) {
if (CollectionUtils.isEmpty(todos))
return new TodoBusinesses(emptyList(), emptyList());