update(REQ-2516) - 调整 MQ 事件增加 processDefinitionKey 属性, 未测试

This commit is contained in:
wangli 2024-06-04 19:10:49 +08:00
parent 51435465f6
commit 38a099cb11
12 changed files with 71 additions and 26 deletions

View File

@ -31,6 +31,11 @@ public class MessagePushDTO implements Serializable {
*/
private String processInstanceId;
/**
* 业务 ID
*/
private String processDefinitionKey;
/**
* 流程任务 ID
*/

View File

@ -25,6 +25,8 @@ public interface MessagePushEvent extends FlowableEvent {
String getProcessDefinitionId();
String getProcessDefinitionKey();
String getCurrentTaskDefinitionKey();
String getTenantId();

View File

@ -26,21 +26,21 @@ import static cn.axzo.workflow.core.engine.event.MessagePushEventType.SMS;
public class MessagePushEventBuilder {
public static MessagePushEventImpl createEvent(MessagePushEventType type, List<BpmnTaskDelegateAssigner> assigners,
BpmnNoticeConf noticeConf, String processInstanceId,
BpmnNoticeConf noticeConf, String processInstanceId, String processDefinitionKey,
String tenantId, String taskId) {
switch (type) {
case NOTICE:
return createNoticeEvent(assigners, noticeConf, processInstanceId, tenantId, taskId);
return createNoticeEvent(assigners, noticeConf, processInstanceId, processDefinitionKey, tenantId, taskId);
case PENDING_PUSH:
throw new WorkflowEngineException(MESSAGE_PUSH_EVENT_BUILD_ERROR);
case PENDING_COMPLETE:
return createPendingCompleteEvent(assigners, noticeConf, processInstanceId, tenantId, taskId);
return createPendingCompleteEvent(assigners, noticeConf, processInstanceId, processDefinitionKey, tenantId, taskId);
case CARBON_COPY:
return createCarbonCopyEvent(assigners, noticeConf, processInstanceId, tenantId);
return createCarbonCopyEvent(assigners, noticeConf, processInstanceId, processDefinitionKey, tenantId);
case CARBON_COPY_COMPLETE:
return createCarbonCopyCompleteEvent(assigners, noticeConf, processInstanceId, tenantId);
return createCarbonCopyCompleteEvent(assigners, noticeConf, processInstanceId, processDefinitionKey, tenantId);
case SMS:
return createSmsEvent(assigners, noticeConf, processInstanceId, tenantId, taskId);
return createSmsEvent(assigners, noticeConf, processInstanceId, processDefinitionKey, tenantId, taskId);
default:
throw new WorkflowEngineException(MES_PUSH_OBJECT_BUILD_ERROR);
}
@ -48,9 +48,9 @@ public class MessagePushEventBuilder {
public static MessagePushEventImpl createNoticeEvent(List<BpmnTaskDelegateAssigner> assigners,
BpmnNoticeConf noticeConf
, String processInstanceId, String tenantId, String taskId) {
, String processInstanceId, String processDefinitionKey, String tenantId, String taskId) {
MessagePushEventImpl newEvent = new MessagePushEventImpl(NOTICE, assigners, noticeConf, processInstanceId,
tenantId, taskId);
processDefinitionKey, tenantId, taskId);
return newEvent;
}
@ -58,47 +58,55 @@ public class MessagePushEventBuilder {
BpmnNoticeConf noticeConf,
BpmnApproveConf processApproveConf,
String processInstanceId,
String processDefinitionId, String currentTaskDefinitionKey,
String processDefinitionId,
String processDefinitionKey,
String currentTaskDefinitionKey,
String tenantId, String taskId) {
MessagePushEventImpl newEvent = new MessagePushEventImpl(PENDING_PUSH, assigners, noticeConf, processApproveConf, processInstanceId,
processDefinitionId, currentTaskDefinitionKey, tenantId, taskId);
MessagePushEventImpl newEvent = new MessagePushEventImpl(PENDING_PUSH, assigners, noticeConf, processApproveConf,
processInstanceId, processDefinitionId, processDefinitionKey, currentTaskDefinitionKey, tenantId, taskId);
return newEvent;
}
public static MessagePushEventImpl createPendingCompleteEvent(List<BpmnTaskDelegateAssigner> assigners,
BpmnNoticeConf noticeConf, String processInstanceId,
String processDefinitionKey,
String tenantId, String taskId) {
MessagePushEventImpl newEvent = new MessagePushEventImpl(PENDING_COMPLETE, assigners, noticeConf,
processInstanceId,
processInstanceId, processDefinitionKey,
tenantId, taskId);
return newEvent;
}
public static MessagePushEvent createPendingRollbackEvent(String processInstanceId, String tenantId, String taskId, BpmnNoticeConf noticeConf) {
return new MessagePushEventImpl(PENDING_ROLLBACK, null, noticeConf, processInstanceId, tenantId, taskId);
public static MessagePushEvent createPendingRollbackEvent(String processInstanceId, String processDefinitionKey,
String tenantId, String taskId, BpmnNoticeConf noticeConf) {
return new MessagePushEventImpl(PENDING_ROLLBACK, null, noticeConf, processInstanceId, processDefinitionKey, tenantId, taskId);
}
public static MessagePushEventImpl createCarbonCopyEvent(List<BpmnTaskDelegateAssigner> assigners,
BpmnNoticeConf noticeConf, String processInstanceId,
String processDefinitionKey,
String tenantId) {
MessagePushEventImpl newEvent = new MessagePushEventImpl(CARBON_COPY, assigners, noticeConf,
processInstanceId, tenantId, null);
processInstanceId, processDefinitionKey, tenantId, null);
return newEvent;
}
public static MessagePushEventImpl createCarbonCopyCompleteEvent(List<BpmnTaskDelegateAssigner> assigners,
BpmnNoticeConf noticeConf, String processInstanceId,
String processDefinitionKey,
String tenantId) {
MessagePushEventImpl newEvent = new MessagePushEventImpl(CARBON_COPY_COMPLETE, assigners, noticeConf,
processInstanceId, tenantId, null);
processInstanceId, processDefinitionKey, tenantId, null);
return newEvent;
}
public static MessagePushEventImpl createSmsEvent(List<BpmnTaskDelegateAssigner> assigners,
BpmnNoticeConf noticeConf,
String processInstanceId, String tenantId, String taskId) {
String processInstanceId,
String processDefinitionKey,
String tenantId, String taskId) {
MessagePushEventImpl newEvent = new MessagePushEventImpl(SMS, assigners, noticeConf, processInstanceId,
tenantId, taskId);
processDefinitionKey, tenantId, taskId);
return newEvent;
}
}

View File

@ -25,6 +25,7 @@ public class MessagePushEventImpl implements MessagePushEvent {
private BpmnApproveConf processApproveConfig;
private String processInstanceId;
private String processDefinitionId;
private String processDefinitionKey;
private String currentTaskDefinitionKey;
private String tenantId;
private String taskId;
@ -48,11 +49,13 @@ public class MessagePushEventImpl implements MessagePushEvent {
* @param tenantId
* @param taskId
*/
public MessagePushEventImpl(FlowableEventType type, List<BpmnTaskDelegateAssigner> assigners, BpmnNoticeConf noticeConfig, String processInstanceId, String tenantId, String taskId) {
public MessagePushEventImpl(FlowableEventType type, List<BpmnTaskDelegateAssigner> assigners, BpmnNoticeConf noticeConfig,
String processInstanceId, String processDefinitionKey, String tenantId, String taskId) {
this.type = type;
this.assigners = assigners;
this.noticeConfig = noticeConfig;
this.processInstanceId = processInstanceId;
this.processDefinitionKey = processDefinitionKey;
this.tenantId = tenantId;
this.taskId = taskId;
}
@ -75,7 +78,7 @@ public class MessagePushEventImpl implements MessagePushEvent {
public MessagePushEventImpl(FlowableEventType type, List<BpmnTaskDelegateAssigner> assigners,
BpmnNoticeConf noticeConfig,
BpmnApproveConf processApproveConfig,
String processInstanceId, String processDefinitionId,
String processInstanceId, String processDefinitionId, String processDefinitionKey,
String currentTaskDefinitionKey, String tenantId, String taskId) {
this.type = type;
this.assigners = assigners;
@ -141,6 +144,15 @@ public class MessagePushEventImpl implements MessagePushEvent {
this.processDefinitionId = processDefinitionId;
}
@Override
public String getProcessDefinitionKey() {
return processDefinitionKey;
}
public void setProcessDefinitionKey(String processDefinitionKey) {
this.processDefinitionKey = processDefinitionKey;
}
@Override
public String getCurrentTaskDefinitionKey() {
return currentTaskDefinitionKey;

View File

@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_ACTIVITY_RELATION_ASSIGNEE_LIST_INFO_SNAPSHOT;
import static cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper.getCarbonCopyConfigs;
import static cn.axzo.workflow.core.listener.AbstractBpmnEventListener.parseProcessDefinitionKey;
/**
* 抄送功能的具体实现
@ -110,6 +111,7 @@ public class EngineCarbonCopyEventListener implements JavaDelegate {
MessagePushEventImpl event = MessagePushEventBuilder.createEvent(MessagePushEventType.CARBON_COPY,
carbonUsers, bpmnNoticeConf, execution.getProcessInstanceId(),
parseProcessDefinitionKey(execution.getProcessDefinitionId()),
execution.getTenantId(), getCarbonTaskId(execution));
eventDispatcher.dispatchEvent(event, processEngineConfiguration.getEngineCfgKey());
}

View File

@ -158,7 +158,8 @@ public class EngineProcessInstanceEventListener extends AbstractFlowableEngineEv
.execute(new CustomNoticeDestinationUserSelectorCmd(engineExecutionStartListener, historicTaskInstanceConverter,
serviceVersion, workspaceType, config.getNotice(), event.getProcessInstanceId(), assigner));
MessagePushEventImpl messagePushEvent = MessagePushEventBuilder.createEvent(MessagePushEventType.NOTICE,
assigners, config, processInstance.getProcessInstanceId(), processInstance.getTenantId(), null);
assigners, config, processInstance.getProcessInstanceId(), processInstance.getProcessDefinitionKey(),
processInstance.getTenantId(), null);
log.info("发送通知消息: {}", JSONUtil.toJsonStr(messagePushEvent));
eventDispatcher.dispatchEvent(messagePushEvent, processEngineConfiguration.getEngineCfgKey());
}

View File

@ -3,6 +3,7 @@ package cn.axzo.workflow.core.listener;
import cn.axzo.workflow.core.common.context.OperationContext;
import cn.hutool.json.JSONUtil;
import org.slf4j.MDC;
import org.springframework.util.StringUtils;
import static cn.azxo.framework.common.constatns.Constants.CTX_LOG_ID_MDC;
@ -33,4 +34,11 @@ public abstract class AbstractBpmnEventListener<T extends OperationContext> impl
protected String buildCacheKey(String apiUrl, Object request) {
return apiUrl + JSONUtil.toJsonStr(request);
}
public static String parseProcessDefinitionKey(String processDefinitionId) {
if (StringUtils.hasText(processDefinitionId)) {
return "";
}
return processDefinitionId.split(":")[0];
}
}

View File

@ -628,6 +628,7 @@ public class BpmnProcessTaskServiceImpl implements BpmnProcessTaskService {
MessagePushEventImpl event = MessagePushEventBuilder.createEvent(MessagePushEventType.valueOf(type),
Lists.newArrayList(assigner), noticeConfig.orElse(null),
processInstance.getProcessInstanceId(),
processInstance.getProcessDefinitionKey(),
processInstance.getTenantId(), task.getId());
event.setProcessInstanceId(processInstance.getProcessInstanceId());
event.setTenantId(processInstance.getTenantId());

View File

@ -31,6 +31,7 @@ import java.util.Optional;
import static cn.axzo.workflow.core.common.code.BpmnTaskRespCode.TASK_COMPLETE_FAIL_ASSIGN_NOT_SELF;
import static cn.axzo.workflow.core.common.code.BpmnTaskRespCode.TASK_COMPLETE_FAIL_NOT_EXISTS;
import static cn.axzo.workflow.core.common.code.BpmnTaskRespCode.TASK_HAS_BEEN_COMPLETE;
import static cn.axzo.workflow.core.listener.AbstractBpmnEventListener.parseProcessDefinitionKey;
import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER;
/**
@ -80,7 +81,9 @@ public class ApproveErrorReporterEventListener implements BpmnAsyncJobEventListe
Optional<BpmnNoticeConf> noticeConfig = BpmnMetaParserHelper.getNoticeConfig(process);
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
FlowableEventDispatcher eventDispatcher = processEngineConfiguration.getEventDispatcher();
MessagePushEvent event = MessagePushEventBuilder.createPendingRollbackEvent(jobEvent.getProcessInstanceId(), jobInfo.getTenantId(), dto.getTaskId(), noticeConfig.orElse(new BpmnNoticeConf()));
MessagePushEvent event = MessagePushEventBuilder.createPendingRollbackEvent(jobEvent.getProcessInstanceId(),
parseProcessDefinitionKey(jobEvent.getProcessDefinitionId()),
jobInfo.getTenantId(), dto.getTaskId(), noticeConfig.orElse(new BpmnNoticeConf()));
log.info("发送恢复待办的流程事件消息: taskDto:{},event:{}", JSONUtil.toJsonStr(dto), JSONUtil.toJsonStr(event));
eventDispatcher.dispatchEvent(event, processEngineConfiguration.getEngineCfgKey());
}

View File

@ -376,6 +376,7 @@ public class RocketMqMessagePushEventListener extends AbstractBpmnEventListener<
Object> variables) {
return new MessagePushDTO()
.setProcessInstanceId(event.getProcessInstanceId())
.setProcessDefinitionKey(event.getProcessDefinitionKey())
.setType(type)
.setTemplateId(templateId)
.setTaskId(event.getTaskId())

View File

@ -76,7 +76,7 @@ public class MessagePushProcessEventListener extends AbstractBpmnEventListener<P
optNoticeConfig.ifPresent(noticeConfig -> {
MessagePushEventImpl messagePushEvent =
MessagePushEventBuilder.createEvent(MessagePushEventType.PENDING_COMPLETE, null, noticeConfig,
event.getProcessInstanceId(), null, null);
event.getProcessInstanceId(), parseProcessDefinitionKey(event.getProcessDefinitionId()), null, null);
log.info("发送完成实例下所有待办的消息: {}", JSONUtil.toJsonStr(messagePushEvent));
@ -87,7 +87,7 @@ public class MessagePushProcessEventListener extends AbstractBpmnEventListener<P
}
MessagePushEventImpl carbonCopyCompleteEvent =
MessagePushEventBuilder.createEvent(MessagePushEventType.CARBON_COPY_COMPLETE, null, noticeConfig,
event.getProcessInstanceId(), null, null);
event.getProcessInstanceId(), parseProcessDefinitionKey(event.getProcessDefinitionId()), null, null);
eventDispatcher.dispatchEvent(carbonCopyCompleteEvent, processEngineConfiguration.getEngineCfgKey());
log.info("发送完成抄送的消息: {}", JSONUtil.toJsonStr(carbonCopyCompleteEvent));
}

View File

@ -114,7 +114,8 @@ public class MessagePushTaskEvent_103_Listener extends AbstractBpmnEventListener
BpmnMetaParserHelper.getNodePendingConfig(userTask).ifPresent(noticeConfig::setPending);
MessagePushEventImpl event = MessagePushEventBuilder.createEvent(MessagePushEventType.PENDING_COMPLETE,
null, noticeConfig, delegateTask.getProcessInstanceId(), null, delegateTask.getId());
null, noticeConfig, delegateTask.getProcessInstanceId(),
parseProcessDefinitionKey(delegateTask.getProcessDefinitionId()), null, delegateTask.getId());
log.info("发送完成待办的消息: {}, processInstanceId:{}", JSONUtil.toJsonStr(event), delegateTask.getProcessInstanceId());
eventDispatcher.dispatchEvent(event, processEngineConfiguration.getEngineCfgKey());
});
@ -150,7 +151,8 @@ public class MessagePushTaskEvent_103_Listener extends AbstractBpmnEventListener
noticeConf,
processApproveConf.orElse(new BpmnApproveConf()),
processInstance.getProcessInstanceId(),
processInstance.getProcessDefinitionId(), userTask.getId(),
processInstance.getProcessDefinitionId(),
processInstance.getProcessDefinitionKey(), userTask.getId(),
processInstance.getTenantId(), delegateTask.getId());
log.info("发送推送待办的消息: {}, processInstanceId:{}", JSONUtil.toJsonStr(event), event.getProcessInstanceId());
eventDispatcher.dispatchEvent(event, processEngineConfiguration.getEngineCfgKey());