methodCache = new HashMap<>();
- public WorkflowEngineStarterRetryEventListener(EventConsumer eventConsumer, Environment environment) {
+ public WorkflowEngineStarterRetryEventListener(EventConsumer eventConsumer, Environment environment, WorkflowCoreService workflowCoreService) {
this.eventConsumer = eventConsumer;
+ this.environment = environment;
this.currentApplicationName = environment.getProperty("spring.application.name");
+ this.workflowCoreService = workflowCoreService;
+ parseWorkflowCoreService();
+ }
+
+ private void parseWorkflowCoreService() {
+ Class coreService = (Class) workflowCoreService.getClass().getGenericInterfaces()[0];
+ FeignClient feignClient = AnnotationUtils.findAnnotation(coreService, FeignClient.class);
+ if (Objects.isNull(feignClient)) {
+ throw new IllegalStateException("WorkflowCoreService 配置错误,没有找到 FeignClient 注解");
+ }
+ String parsedFeignContextUrl = resolveExpression(feignClient.url());
+
+ Method[] methods = coreService.getDeclaredMethods();
+ for (Method method : methods) {
+ methodCache.put(method.getName(), method);
+ }
+ }
+
+ private String resolveExpression(String expression) {
+ // 假设表达式格式是 ${property:defaultValue}
+ if (expression != null && expression.startsWith("${") && expression.endsWith("}")) {
+ String content = expression.substring(2, expression.length() - 1); // 去除 ${ 和 }
+ String[] parts = content.split(":", 2); // 分割属性和默认值
+ String property = parts[0];
+ String defaultValue = parts.length > 1 ? parts[1] : null;
+ // 尝试从环境中获取属性值,如果未找到则使用默认值
+ return environment.getProperty(property, defaultValue != null ? defaultValue : "");
+ }
+ // 如果表达式格式不匹配,直接返回原表达式或处理错误情况
+ return expression;
}
public void init() {
eventConsumer.registerHandler(WorkflowEngineEventEnum.WORKFLOW_ENGINE_STARTER.getEventCode(currentApplicationName), this);
}
+ @SneakyThrows
@Override
public void onEvent(Event event, EventConsumer.Context context) {
log.info("WorkflowEngineClientRetryEventListener onEvent: {}", event.toPrettyJsonString());
+ WorkflowEngineStarterRpcInvokeDTO dto = event.normalizedData(WorkflowEngineStarterRpcInvokeDTO.class);
+
+ Method method = methodCache.getOrDefault(dto.getMethodName(), null);
+ if (Objects.isNull(method)) {
+ throw new IllegalStateException("找不到方法:" + dto.getMethodName());
+ }
+ try {
+ Object invoke = method.invoke(workflowCoreService, JSON.parseObject((String) dto.getBody(), BpmnProcessInstanceCreateDTO.class));
+ log.info("Event Invoke Result: {}", JSON.toJSONString(invoke));
+ } catch (Exception e) {
+ log.error("Event Invoke Exception: {}", e.getMessage());
+ }
}
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/service/WorkflowEngineServiceFacade.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/service/WorkflowEngineServiceFacade.java
deleted file mode 100644
index f5a2528c6..000000000
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/service/WorkflowEngineServiceFacade.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package cn.axzo.workflow.starter.service;
-
-import cn.axzo.framework.rocketmq.EventProducer;
-import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceAbortDTO;
-import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCancelDTO;
-import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCreateDTO;
-import cn.axzo.workflow.common.model.response.bpmn.process.BpmnProcessInstanceVO;
-import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
-import cn.axzo.workflow.starter.common.exception.CreateProcessInstanceException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * 流程引擎服务的 Starter 对使用方暴露的核心入口
- *
- * @author wangli
- * @since 2024/5/21 16:53
- */
-public class WorkflowEngineServiceFacade {
- private final Logger log = LoggerFactory.getLogger(WorkflowEngineServiceFacade.class);
- private final EventProducer workflowEngineClientEventProducer;
- private final WorkflowEngineStarterProperties workflowEngineStarterProperties;
-
- public WorkflowEngineServiceFacade(EventProducer workflowEngineClientEventProducer,
- WorkflowEngineStarterProperties workflowEngineStarterProperties) {
- this.workflowEngineClientEventProducer = workflowEngineClientEventProducer;
- this.workflowEngineStarterProperties = workflowEngineStarterProperties;
- }
-
- /**
- * 同步创建流程实例
- *
- * 同步调用方式实现,使用方需要主动关注接口是否调用成功
- *
- * @param dto 创建流程实例的参数
- * @return 返回流程实例 ID
- * @throws {@link CreateProcessInstanceException} 创建流程实例失败时抛出异常
- */
- public String createProcessInstanceSync(BpmnProcessInstanceCreateDTO dto) {
- return null;
- }
-
- /**
- * 异步创建流程实例
- *
- * 异步调用方式实现,内部使用 MQ 解耦,使用方如需要获取流程实例编号信息,则通过监听 MQ 消息即可获取
- */
- public void createProcessInstance(BpmnProcessInstanceCreateDTO dto) {
-// asynchronousService.createProcessInstance(dto);
- }
-
- public void cancelProcessInstance(BpmnProcessInstanceCancelDTO dto) {
- }
-
- public void abortProcessInstance(BpmnProcessInstanceAbortDTO dto) {
- }
-
- public void batchAbortProcessInstances(List dtos) {
- }
-
- public BpmnProcessInstanceVO getProcessInstanceSync(String processInstanceId) {
- return null;
- }
-
- public Map getProcessVariablesSync() {
- return null;
- }
-}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/service/impl/AsynchronousService.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/service/impl/AsynchronousService.java
deleted file mode 100644
index 07fafdcf7..000000000
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/service/impl/AsynchronousService.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package cn.axzo.workflow.starter.service.impl;
-
-import cn.axzo.workflow.starter.listener.ProcessListener;
-import cn.axzo.workflow.starter.mq.retry.producer.RpcInvokeEventProducer;
-
-/**
- * 异步调用服务
- *
- * 该类应该被动态代理实现,在调用方法发送 MQ 事件前,判断接口是否被标记 {@code @Management} 注解,
- * 如果被标记且没有配置 {@code workflow.engine.client.manageable} 属性,则不允许调用,否则通过
- * {@link RpcInvokeEventProducer} 发送 MQ 事件后,再调用同步服务进行真实的 RPC 调用
- *
- * @author wangli
- * @since 2024/5/22 10:55
- */
-public interface AsynchronousService extends ProcessListener {
-
-}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/service/impl/SynchronousService.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/service/impl/SynchronousService.java
deleted file mode 100644
index c975ec4c3..000000000
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/service/impl/SynchronousService.java
+++ /dev/null
@@ -1,526 +0,0 @@
-package cn.axzo.workflow.starter.service.impl;
-
-import cn.axzo.workflow.client.feign.bpmn.ProcessActivityApi;
-import cn.axzo.workflow.client.feign.bpmn.ProcessDefinitionApi;
-import cn.axzo.workflow.client.feign.bpmn.ProcessInstanceApi;
-import cn.axzo.workflow.client.feign.bpmn.ProcessJobApi;
-import cn.axzo.workflow.client.feign.bpmn.ProcessModelApi;
-import cn.axzo.workflow.client.feign.bpmn.ProcessTaskApi;
-import cn.axzo.workflow.client.feign.bpmn.ProcessVariableApi;
-import cn.axzo.workflow.client.feign.manage.ProcessCategoryApi;
-import cn.axzo.workflow.client.feign.manage.ProcessConfigApi;
-import cn.axzo.workflow.common.enums.BpmnFlowNodeType;
-import cn.axzo.workflow.common.model.request.bpmn.BpmnButtonMetaInfo;
-import cn.axzo.workflow.common.model.request.bpmn.RestBpmnProcessVariable;
-import cn.axzo.workflow.common.model.request.bpmn.definition.BpmnProcessDefinitionUpdateDTO;
-import cn.axzo.workflow.common.model.request.bpmn.model.BpmnModelCreateDTO;
-import cn.axzo.workflow.common.model.request.bpmn.model.BpmnModelSearchDTO;
-import cn.axzo.workflow.common.model.request.bpmn.model.BpmnModelUpdateDTO;
-import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessDefinitionPageDTO;
-import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceAbortDTO;
-import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceAdminPageReqVO;
-import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCancelDTO;
-import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCarbonCopyDTO;
-import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCheckApproverDTO;
-import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCreateDTO;
-import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCreateWithFormDTO;
-import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceMyPageReqVO;
-import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceQueryDTO;
-import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
-import cn.axzo.workflow.common.model.request.bpmn.task.BpmnRobotTaskCompleteDTO;
-import cn.axzo.workflow.common.model.request.bpmn.task.BpmnRobotTaskCreateDTO;
-import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskAttachmentDTO;
-import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskAuditDTO;
-import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskCommentDTO;
-import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskCountersignDTO;
-import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskPageSearchDTO;
-import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskRemindDTO;
-import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskTransferDTO;
-import cn.axzo.workflow.common.model.request.category.CategoryConfigCreateDTO;
-import cn.axzo.workflow.common.model.request.category.CategoryConfigSearchDTO;
-import cn.axzo.workflow.common.model.request.category.CategoryCreateDTO;
-import cn.axzo.workflow.common.model.request.category.CategorySearchDTO;
-import cn.axzo.workflow.common.model.request.category.CategoryUpdateDTO;
-import cn.axzo.workflow.common.model.response.BpmPageResult;
-import cn.axzo.workflow.common.model.response.bpmn.BatchOperationResultVO;
-import cn.axzo.workflow.common.model.response.bpmn.model.BpmnModelDetailVO;
-import cn.axzo.workflow.common.model.response.bpmn.model.BpmnModelExtVO;
-import cn.axzo.workflow.common.model.response.bpmn.process.BpmnProcessDefinitionVO;
-import cn.axzo.workflow.common.model.response.bpmn.process.BpmnProcessInstanceAdminPageItemVO;
-import cn.axzo.workflow.common.model.response.bpmn.process.BpmnProcessInstancePageItemVO;
-import cn.axzo.workflow.common.model.response.bpmn.process.BpmnProcessInstanceVO;
-import cn.axzo.workflow.common.model.response.bpmn.process.ProcessNodeDetailVO;
-import cn.axzo.workflow.common.model.response.bpmn.task.BpmnHistoricTaskInstanceGroupVO;
-import cn.axzo.workflow.common.model.response.bpmn.task.BpmnHistoricTaskInstanceVO;
-import cn.axzo.workflow.common.model.response.bpmn.task.BpmnTaskDonePageItemVO;
-import cn.axzo.workflow.common.model.response.bpmn.task.BpmnTaskInstanceVO;
-import cn.axzo.workflow.common.model.response.bpmn.task.BpmnTaskTodoPageItemVO;
-import cn.axzo.workflow.common.model.response.category.CategoryConfigItemVO;
-import cn.axzo.workflow.common.model.response.category.CategoryItemVO;
-import cn.azxo.framework.common.model.CommonResponse;
-import com.alibaba.fastjson.JSON;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpStatus;
-import org.springframework.util.StopWatch;
-
-import javax.annotation.Nullable;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-/**
- * 同步调用服务
- *
processActivityApi.trigger(triggerId, async), "触发任务失败", triggerId, async);
- }
-
-
- public Boolean setAssignee(BpmnActivitySetAssigneeDTO dto) {
- return null;
- }
-
-
- public BpmPageResult getProcessDefinitionPage(BpmnProcessDefinitionPageDTO dto) {
- return null;
- }
-
-
- public Boolean updateProcessDefinition(BpmnProcessDefinitionUpdateDTO dto) {
- return null;
- }
-
-
- public BpmnProcessDefinitionVO getProcessDefinition(String processDefinitionId) {
- return null;
- }
-
-
- public BpmnProcessDefinitionVO getProcessDefinitionByDeploymentId(String deploymentId) {
- return null;
- }
-
-
- public BpmnProcessDefinitionVO getActiveProcessDefinitionByKey(String key) {
- return null;
- }
-
-
- public Boolean getActiveProcessDefinitionByKey(String processDefinitionId, Integer state) {
- return null;
- }
-
-
- public String getActiveProcessDefinitionId(String tenantId, String category) {
- return null;
- }
-
-
- public BpmnModelUpdateDTO getActiveProcessDefinitionJsonModel(String modelId, String category, String tenantId) {
- return null;
- }
-
-
- public Void delete(String deploymentId, Boolean cascade) {
- return null;
- }
-
-
- public BpmPageResult getAllProcessInstancePage(BpmnProcessInstanceAdminPageReqVO dto) {
- return null;
- }
-
-
- public BpmPageResult getMyProcessInstancePage(BpmnProcessInstanceMyPageReqVO dto) {
- return null;
- }
-
-
- public String createProcessInstance(BpmnProcessInstanceCreateDTO dto) {
- return null;
- }
-
-
- public String createProcessInstanceWith(BpmnProcessInstanceCreateWithFormDTO dto) {
- return null;
- }
-
-
- public Boolean cancelProcessInstance(BpmnProcessInstanceCancelDTO dto) {
- return null;
- }
-
-
- public Boolean abortProcessInstance(BpmnProcessInstanceAbortDTO dto) {
- return null;
- }
-
-
- public BatchOperationResultVO batchAbortProcessInstance(List dtos) {
- return null;
- }
-
-
- public Boolean carbonCopyProcessInstance(BpmnProcessInstanceCarbonCopyDTO dto) {
- return null;
- }
-
-
- public BpmnProcessInstanceVO getProcessInstanceVO(BpmnProcessInstanceQueryDTO dto) {
- return null;
- }
-
-
- public Boolean updateProcessStatus(String processDefinitionId, Integer status) {
- return null;
- }
-
-
- public ObjectNode processInstanceGraphical(String processInstanceId, @Nullable String tenantId) {
- return null;
- }
-
-
- public List processInstanceNodeForecast(String processInstanceId, @Nullable String tenantId) {
- return null;
- }
-
-
- public List processInstanceFilterNodeForecast(String processInstanceId, @Nullable String tenantId, Boolean allNode, @Nullable List nodeDefinitionKeys) {
- return null;
- }
-
-
- public Map getProcessVariables(String processInstanceId, @Nullable String tenantId) {
- return null;
- }
-
-
- public List getTenantIds() {
- return null;
- }
-
-
- public Boolean checkInstanceApprover(BpmnProcessInstanceCheckApproverDTO dto) {
- return null;
- }
-
-
- public Void executeDeadLetterJobAction(String jobId, String procInstId) {
- return null;
- }
-
-
- public BpmPageResult page(BpmnModelSearchDTO dto) {
- return null;
- }
-
-
- public String create(BpmnModelCreateDTO dto) {
- return null;
- }
-
-
- public BpmnModelDetailVO getById(String processModelId, String tenantId) {
- return null;
- }
-
-
- public BpmnModelDetailVO getByKey(String processModelKey, String tenantId) {
- return null;
- }
-
-
- public BpmnModelExtVO getModelExt(String modelId) {
- return null;
- }
-
-
- public String update(BpmnModelUpdateDTO dto) {
- return null;
- }
-
-
- public String deployById(String processModelId, String modelTenantId, String operator) {
- return null;
- }
-
-
- public String deployByKey(String processModelKey, String modelTenantId, String operator) {
- return null;
- }
-
-
- public Void unDeployById(String processModelId, String tenantId, String operator) {
- return null;
- }
-
-
- public Void deleteById(String processModelId, String tenantId) {
- return null;
- }
-
-
- public Void deleteByKey(String processModelKey, String tenantId) {
- return null;
- }
-
-
- public Void changeStatus(String modelId, Integer status, String operator) {
- return null;
- }
-
-
- public List getModelCategoryList() {
- return null;
- }
-
-
- public BpmPageResult getTodoTaskPage(BpmnTaskPageSearchDTO dto) {
- return null;
- }
-
-
- public BpmPageResult getDoneTaskPage(BpmnTaskPageSearchDTO dto) {
- return null;
- }
-
-
- public List getTaskListFlatByProcessInstanceId(String processInstanceId, @Nullable String tenantId) {
- return null;
- }
-
-
- public List getTaskListGroupByProcessInstanceId(String processInstanceId, @Nullable String tenantId) {
- return null;
- }
-
-
- public List getActiveTasksByProcessInstanceId(String processInstanceId, String tenantId) {
- return null;
- }
-
-
- public Boolean approveTask(BpmnTaskAuditDTO dto) {
- return null;
- }
-
-
- public BatchOperationResultVO batchApproveTask(List dtos) {
- return null;
- }
-
-
- public Boolean rejectTask(BpmnTaskAuditDTO dto) {
- return null;
- }
-
-
- public BatchOperationResultVO batchRejectTask(List dtos) {
- return null;
- }
-
-
- public Boolean transferTask(BpmnTaskTransferDTO dto) {
- return null;
- }
-
-
- public BatchOperationResultVO batchTransferTask(List dtos) {
- return null;
- }
-
-
- public Boolean commentTask(BpmnTaskCommentDTO dto) {
- return null;
- }
-
-
- public Void addAttachment(BpmnTaskAttachmentDTO dto) {
- return null;
- }
-
-
- public Boolean countersignTask(BpmnTaskCountersignDTO dto) {
- return null;
- }
-
-
- public Boolean remindTask(BpmnTaskRemindDTO dto) {
- return null;
- }
-
-
- public String createRobotTask(BpmnRobotTaskCreateDTO dto) {
- return null;
- }
-
-
- public Boolean completeRobotTask(BpmnRobotTaskCompleteDTO dto) {
- return null;
- }
-
-
- public String findTaskIdByInstanceIdAndPersonId(String processInstanceId, String personId) {
- return null;
- }
-
-
- public Map findTaskIdByInstanceIdsAndPersonId(List processInstanceIds, String personId, BpmnFlowNodeType... filterTypes) {
- return null;
- }
-
-
- public Void createVariable(String executionId, RestBpmnProcessVariable restVariable) {
- return null;
- }
-
-
- public Void updateVariable(String executionId, RestBpmnProcessVariable restVariable) {
- return null;
- }
-
-
- public Void deleteVariables(String executionId, String variableNames, String scope) {
- return null;
- }
-
-
- public CategoryItemVO get(Long id) {
- return null;
- }
-
-
- public List getByIds(List ids) {
- return null;
- }
-
-
- public List getByValues(List values) {
- return null;
- }
-
-
- public CategoryItemVO create(CategoryCreateDTO req) {
- return null;
- }
-
-
- public CategoryItemVO update(CategoryUpdateDTO dto) {
- return null;
- }
-
-
- public Boolean delete(Long id) {
- return null;
- }
-
-
- public Boolean updateState(Long id, Boolean state) {
- return null;
- }
-
-
- public List list(CategorySearchDTO dto) {
- return null;
- }
-
-
- public BpmPageResult search(CategorySearchDTO dto) {
- return null;
- }
-
-
- public Boolean createConfig(CategoryConfigCreateDTO dto) {
- return null;
- }
-
-
- public Boolean deleteConfig(Long id) {
- return null;
- }
-
-
- public BpmPageResult configSearch(CategoryConfigSearchDTO dto) {
- return null;
- }
-
-
- public Boolean updateCategoryConfigType(Long id, String configType) {
- return null;
- }
-
-
- public Boolean checkCategoryStatus(Long tenantId, String categoryCode) {
- return null;
- }
-
-
- public List getDefaultButtons() {
- return null;
- }
-
- private T parseResult(Supplier> supplier, String operationType, Object... args) {
- return parseResult(supplier, operationType, null, args);
- }
-
- private T parseResult(Supplier> supplier, String operationType, Consumer> consumer, Object... args) {
- log.info("{}-Param: {}", operationType, JSON.toJSONString(args));
- CommonResponse response = printLatency(supplier, operationType);
- log.info("{}-Result: {}", operationType, JSON.toJSONString(response));
- if (!Objects.equals(HttpStatus.OK.value(), response.getCode())) {
- if (consumer != null) {
- consumer.accept(response);
- } else {
- throw new RuntimeException(response.getMsg());
- }
- }
- return response.getData();
- }
-
- public R printLatency(Supplier function, String optType) {
- StopWatch stopWatch = new StopWatch(optType);
- stopWatch.start(optType);
- R r = function.get();
- stopWatch.stop();
- log.info(stopWatch.shortSummary());
- return r;
- }
-}
From d07e580e7b0cacd2c9272240922d44fc0024f156 Mon Sep 17 00:00:00 2001
From: wangli <274027703@qq.com>
Date: Thu, 30 May 2024 18:39:37 +0800
Subject: [PATCH 034/210] =?UTF-8?q?update(REQ-2516)=20-=20review=20?=
=?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=8A=9F=E8=83=BD?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../starter/WorkflowEngineStarterAutoConfiguration.java | 7 +++----
.../starter/mq/execute/interceptor/ExecutorInvoker.java | 2 +-
.../starter/mq/execute/interceptor/LogInterceptor.java | 2 +-
.../starter/mq/execute/interceptor/RetryInterceptor.java | 4 ++--
4 files changed, 7 insertions(+), 8 deletions(-)
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java
index 37165542d..9cbc0ccbc 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java
@@ -8,7 +8,6 @@ import cn.axzo.workflow.starter.mq.execute.interceptor.ExecuteInterceptor;
import cn.axzo.workflow.starter.mq.execute.interceptor.ExecutorInvoker;
import cn.axzo.workflow.starter.mq.execute.interceptor.LogInterceptor;
import cn.axzo.workflow.starter.mq.execute.interceptor.RetryInterceptor;
-import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -39,9 +38,9 @@ public class WorkflowEngineStarterAutoConfiguration {
List interceptors = new ArrayList<>();
interceptors.add(new LogInterceptor());
interceptors.add(new RetryInterceptor());
- if (!CollectionUtils.isEmpty(additionalInterceptors)) {
- additionalInterceptors.forEach(interceptor -> interceptor.setNext(interceptor));
- }
+// if (!CollectionUtils.isEmpty(additionalInterceptors)) {
+// additionalInterceptors.forEach(interceptor -> interceptor.setNext(interceptor));
+// }
interceptors.add(new ExecutorInvoker());
return new ListenerExecutorImpl(initInterceptorChain(interceptors));
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/ExecutorInvoker.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/ExecutorInvoker.java
index fadcb9df5..5cd92acee 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/ExecutorInvoker.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/ExecutorInvoker.java
@@ -4,7 +4,7 @@ import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
import java.util.function.Consumer;
-public class ExecutorInvoker extends AbstractListenerInterceptor {
+public final class ExecutorInvoker extends AbstractListenerInterceptor {
@Override
public void execute(ListenerExecutor executor, Consumer consumer, T t) {
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/LogInterceptor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/LogInterceptor.java
index 7961f0f73..4e4649870 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/LogInterceptor.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/LogInterceptor.java
@@ -7,7 +7,7 @@ import org.springframework.util.StopWatch;
import java.util.function.Consumer;
-public class LogInterceptor extends AbstractListenerInterceptor {
+public final class LogInterceptor extends AbstractListenerInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(LogInterceptor.class);
@Override
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/RetryInterceptor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/RetryInterceptor.java
index 9758b5aaf..65e4d1d54 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/RetryInterceptor.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/RetryInterceptor.java
@@ -11,7 +11,7 @@ import java.util.function.Consumer;
@Getter
@Setter
-public class RetryInterceptor extends AbstractListenerInterceptor {
+public final class RetryInterceptor extends AbstractListenerInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(RetryInterceptor.class);
@@ -39,7 +39,7 @@ public class RetryInterceptor extends AbstractListenerInterceptor {
throw new ServiceException(numOfRetries + " retries failed with Exception. Giving up.");
}
- protected void waitBeforeRetry(long waitTime) {
+ private void waitBeforeRetry(long waitTime) {
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
From 953296194b0fb5e4c4f533ff995f44ab4f2a8c51 Mon Sep 17 00:00:00 2001
From: wangli <274027703@qq.com>
Date: Thu, 30 May 2024 19:04:04 +0800
Subject: [PATCH 035/210] =?UTF-8?q?update(REQ-2516)=20-=20=E8=B0=83?=
=?UTF-8?q?=E6=95=B4=20RPC=20=E5=8A=A8=E4=BD=9C=20MQ=20=E4=BA=8B=E4=BB=B6?=
=?UTF-8?q?=E7=9B=91=E5=90=AC=E5=99=A8=E7=9A=84=20Bean=20=E6=B3=A8?=
=?UTF-8?q?=E5=86=8C=E6=96=B9=E5=BC=8F?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../workflow/starter/RocketMQConfiguration.java | 2 +-
.../WorkflowEngineStarterRetryEventListener.java | 13 +++++++------
2 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java
index 2ad4b1d6a..197b42e5d 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java
@@ -137,7 +137,7 @@ public class RocketMQConfiguration {
}
}
- @Bean(initMethod = "init")
+ @Bean
public WorkflowEngineStarterRetryEventListener workflowEngineClientRetryEventListener(EventConsumer eventConsumer,
Environment environment,
WorkflowCoreService workflowCoreService) {
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java
index 4b09f8a6c..695cf6564 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java
@@ -11,6 +11,7 @@ import com.alibaba.fastjson.JSON;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.Environment;
@@ -26,7 +27,7 @@ import java.util.Objects;
* @author wangli
* @since 2024/5/21 16:28
*/
-public class WorkflowEngineStarterRetryEventListener implements EventHandler {
+public class WorkflowEngineStarterRetryEventListener implements EventHandler, InitializingBean {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterRetryEventListener.class);
private final EventConsumer eventConsumer;
private final Environment environment;
@@ -70,10 +71,6 @@ public class WorkflowEngineStarterRetryEventListener implements EventHandler {
return expression;
}
- public void init() {
- eventConsumer.registerHandler(WorkflowEngineEventEnum.WORKFLOW_ENGINE_STARTER.getEventCode(currentApplicationName), this);
- }
-
@SneakyThrows
@Override
public void onEvent(Event event, EventConsumer.Context context) {
@@ -82,7 +79,7 @@ public class WorkflowEngineStarterRetryEventListener implements EventHandler {
Method method = methodCache.getOrDefault(dto.getMethodName(), null);
if (Objects.isNull(method)) {
- throw new IllegalStateException("找不到方法:" + dto.getMethodName());
+ throw new IllegalStateException("Not found method:" + dto.getMethodName());
}
try {
Object invoke = method.invoke(workflowCoreService, JSON.parseObject((String) dto.getBody(), BpmnProcessInstanceCreateDTO.class));
@@ -92,4 +89,8 @@ public class WorkflowEngineStarterRetryEventListener implements EventHandler {
}
}
+ @Override
+ public void afterPropertiesSet() {
+ eventConsumer.registerHandler(WorkflowEngineEventEnum.WORKFLOW_ENGINE_STARTER.getEventCode(currentApplicationName), this);
+ }
}
From c78897392c5c0df62383e652c62c0ee1976f2771 Mon Sep 17 00:00:00 2001
From: wangli
Date: Thu, 30 May 2024 23:42:27 +0800
Subject: [PATCH 036/210] =?UTF-8?q?update(REQ-2516)=20-=20=E5=AE=8C?=
=?UTF-8?q?=E5=96=84=20MQ=20=E6=B6=88=E8=B4=B9=E8=80=85=E5=9C=A8=E6=9C=AC?=
=?UTF-8?q?=E5=9C=B0=E4=B8=8E=E6=9C=8D=E5=8A=A1=E5=99=A8=E4=B9=8B=E9=97=B4?=
=?UTF-8?q?=E5=90=AF=E5=8A=A8=E8=83=BD=E4=BA=92=E4=B8=8D=E5=BD=B1=E5=93=8D?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../starter/RocketMQConfiguration.java | 4 ++
.../NonContainerEnvironmentCondition.java | 67 +++++++++++++++++++
2 files changed, 71 insertions(+)
create mode 100644 workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/condition/NonContainerEnvironmentCondition.java
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java
index 197b42e5d..ca0e6c612 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java
@@ -7,6 +7,7 @@ import cn.axzo.framework.rocketmq.EventHandlerRepository;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
import cn.axzo.workflow.starter.api.WorkflowCoreService;
+import cn.axzo.workflow.starter.common.condition.NonContainerEnvironmentCondition;
import cn.axzo.workflow.starter.mq.retry.consumer.WorkflowEngineStarterRetryEventListener;
import cn.axzo.workflow.starter.mq.retry.producer.RpcInvokeEventProducer;
import org.apache.rocketmq.common.message.MessageExt;
@@ -19,6 +20,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
@@ -106,6 +108,7 @@ public class RocketMQConfiguration {
}
@Component
+ @Conditional(NonContainerEnvironmentCondition.class)
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_workflow_engine_${MY_POD_NAMESPACE:debugging}_consumer",
consumeMode = ConsumeMode.ORDERLY,
@@ -122,6 +125,7 @@ public class RocketMQConfiguration {
}
@Component
+ @Conditional(NonContainerEnvironmentCondition.class)
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_workflow_engine_starter_${MY_POD_NAMESPACE:debugging}_consumer",
consumeMode = ConsumeMode.CONCURRENTLY,
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/condition/NonContainerEnvironmentCondition.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/condition/NonContainerEnvironmentCondition.java
new file mode 100644
index 000000000..622fb40ae
--- /dev/null
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/condition/NonContainerEnvironmentCondition.java
@@ -0,0 +1,67 @@
+package cn.axzo.workflow.starter.common.condition;
+
+import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Condition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.core.env.ConfigurableEnvironment;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+import org.springframework.util.StringUtils;
+
+import java.util.Objects;
+
+/**
+ * 用于处理 MQ 的消费者, 在本地启动或在容器中启动时, 能自主控制是否并入统一的消费组
+ *
+ * 可查看 {@link WorkflowEngineStarterProperties#localConsumer} 属性, 来了解本来的用途,
+ * 特别需要注意的是: Starter 是结合 K8S 的 命名空间(namespace) 来处理的. 如果公司在删减/变更
+ * 环境后,可能会导致 {@link NonContainerEnvironmentCondition#mappingNamespace(String)}
+ * 方法异常.
+ *
+ * @author wangli
+ * @since 2024/5/30 22:19
+ */
+public class NonContainerEnvironmentCondition implements Condition {
+
+ private static final Logger log = LoggerFactory.getLogger(NonContainerEnvironmentCondition.class);
+
+ private static final String MY_POD_NAMESPACE = "MY_POD_NAMESPACE";
+
+ @Override
+ public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
+ ConfigurableEnvironment environment = (ConfigurableEnvironment) context.getEnvironment();
+ // 优先外部化配置
+ Boolean localCustomer = environment.getProperty("workflow.engine.starter.local-consumer", Boolean.class);
+ if (Objects.isNull(localCustomer)) {
+ // 获取默认值
+ localCustomer = new WorkflowEngineStarterProperties().getLocalConsumer();
+ }
+ log.info("workflow engine starter local consumer status: {}", localCustomer);
+
+ String myPodNamespace = environment.getProperty(MY_POD_NAMESPACE);
+ if (localCustomer && !StringUtils.hasText(myPodNamespace)) {
+ environment.getSystemProperties().put(MY_POD_NAMESPACE, mappingNamespace(environment.getProperty("spring.profiles.active")));
+ }
+
+ return true;
+ }
+
+ private String mappingNamespace(String namespace) {
+ switch (namespace) {
+ case "dev":
+ case "local":
+ return "java-dev";
+ case "test":
+ return "test";
+ case "pre":
+ return "pre";
+ case "live":
+ return "live";
+ case "master":
+ return "pro";
+ default:
+ throw new IllegalArgumentException(String.format("namespace %s is not supported", namespace));
+ }
+ }
+}
From 44d31536c36d5618eac0d72030a4adfdc5645899 Mon Sep 17 00:00:00 2001
From: wangli
Date: Thu, 30 May 2024 23:42:45 +0800
Subject: [PATCH 037/210] =?UTF-8?q?update(REQ-2516)=20-=20=E5=AE=8C?=
=?UTF-8?q?=E5=96=84=E4=BB=A3=E7=A0=81=E6=B3=A8=E9=87=8A?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../workflow/starter/WorkflowEngineStarterProperties.java | 8 +++++++-
.../feign/ext/WorkflowEngineStarterInvocationHandler.java | 4 ++--
.../WorkflowEngineStarterInvocationHandlerFactory.java | 2 +-
3 files changed, 10 insertions(+), 4 deletions(-)
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
index 099bd18b8..f6ed6d5d5 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
@@ -20,11 +20,17 @@ public class WorkflowEngineStarterProperties {
/**
* 本地启动是否消费容器的广播事件
+ *
+ * 默认 false, 本地启动应用时, 将创建消息组名称中含有"debugging"的消费组.
+ * 否则, 本地启动应用时, 将与容器环境中所有实例共同消费广播事件.
*/
private Boolean localConsumer = false;
/**
- * 方法调用默认采用的模式
+ * WorkflowCoreService 类中所有方法调用时默认采用的模式
+ *
+ * 如果是同步调用,则直接通过普通 FeignClient 进行调用,
+ * 否则将通过 MQ 将 RPC 调用进行解耦
*/
private RpcInvokeModeEnum invokeMode = SYNC;
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterInvocationHandler.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterInvocationHandler.java
index 583b5b96c..b8bc5a5c5 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterInvocationHandler.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterInvocationHandler.java
@@ -9,12 +9,12 @@ import java.lang.reflect.Proxy;
import java.util.Map;
/**
- * TODO
+ * Workflow Engine Starter Core Service Invocation Handler
*
* @author wangli
* @since 2024/5/29 22:58
*/
-public class WorkflowEngineStarterInvocationHandler implements InvocationHandler {
+class WorkflowEngineStarterInvocationHandler implements InvocationHandler {
private final Target target;
private final Map dispatch;
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterInvocationHandlerFactory.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterInvocationHandlerFactory.java
index 028e6d66f..26a133e30 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterInvocationHandlerFactory.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterInvocationHandlerFactory.java
@@ -8,7 +8,7 @@ import java.lang.reflect.Method;
import java.util.Map;
/**
- * TODO
+ * 用于生成 InvocationHandler
*
* @author wangli
* @since 2024/5/29 22:56
From b333344ab73ac4f9ac74530d0bc424624b713e0e Mon Sep 17 00:00:00 2001
From: wangli
Date: Thu, 30 May 2024 23:43:11 +0800
Subject: [PATCH 038/210] =?UTF-8?q?update(REQ-2516)=20-=20=E4=B8=B4?=
=?UTF-8?q?=E6=97=B6=E6=B3=A8=E9=87=8A,=20=E9=81=BF=E5=85=8D=E5=90=AF?=
=?UTF-8?q?=E5=8A=A8=E5=BC=82=E5=B8=B8?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../mq/broadcast/consumer/InnerActivityEventListener.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java
index fc3be696d..2983c2859 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java
@@ -11,7 +11,6 @@ import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
@@ -21,7 +20,7 @@ import java.util.function.Consumer;
* @author wangli
* @since 2024/5/21 15:51
*/
-@Component
+//@Component
public class InnerActivityEventListener implements WorkflowListener {
private final Logger log = LoggerFactory.getLogger(InnerActivityEventListener.class);
public final static List SUPPORTED_EVENT_CODES = ImmutableList.of(
From bfe7819565bebb3a812bd8416dc5058b5458ca42 Mon Sep 17 00:00:00 2001
From: wangli
Date: Thu, 30 May 2024 23:45:43 +0800
Subject: [PATCH 039/210] =?UTF-8?q?update(REQ-2516)=20-=20=E5=AE=8C?=
=?UTF-8?q?=E5=96=84=E6=B3=A8=E9=87=8A?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../workflow/starter/WorkflowEngineStarterProperties.java | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
index f6ed6d5d5..b14bf5989 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
@@ -34,6 +34,10 @@ public class WorkflowEngineStarterProperties {
*/
private RpcInvokeModeEnum invokeMode = SYNC;
+ /**
+ * 该属性还不确定能否实现
+ */
+ @Deprecated
private Boolean autoAck = false;
public Boolean getManageable() {
From 96e277c85e6341e042724d9ea41beace81c0fa0a Mon Sep 17 00:00:00 2001
From: yangqicheng
Date: Fri, 31 May 2024 10:23:29 +0800
Subject: [PATCH 040/210] =?UTF-8?q?update=20-=20REQ-2516-=E6=B5=81?=
=?UTF-8?q?=E7=A8=8B=E6=8E=A5=E5=8F=A3=E7=BB=A7=E6=89=BForder=E6=8E=A5?=
=?UTF-8?q?=E5=8F=A3=E5=A2=9E=E5=8A=A0=E6=8E=92=E5=BA=8F=EF=BC=8C=E8=B0=83?=
=?UTF-8?q?=E6=95=B4Listener=E7=94=9F=E6=88=90bean=E6=96=B9=E5=BC=8F?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../starter/RocketMQConfiguration.java | 13 +++++++++--
.../listener/MessageNotificationListener.java | 3 ++-
.../listener/ProcessActivityListener.java | 15 +++++--------
.../listener/ProcessInstanceListener.java | 4 +++-
.../starter/listener/ProcessListener.java | 4 +++-
.../starter/listener/ProcessTaskListener.java | 4 +++-
.../consumer/InnerActivityEventListener.java | 4 +++-
.../consumer/InnerNoticeEventListener.java | 5 +++++
.../WorkflowEngineBroadcastEventListener.java | 22 +++++++------------
9 files changed, 44 insertions(+), 30 deletions(-)
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java
index 2ad4b1d6a..1d6430fbd 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java
@@ -7,6 +7,8 @@ import cn.axzo.framework.rocketmq.EventHandlerRepository;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
import cn.axzo.workflow.starter.api.WorkflowCoreService;
+import cn.axzo.workflow.starter.mq.broadcast.consumer.WorkflowEngineBroadcastEventListener;
+import cn.axzo.workflow.starter.mq.broadcast.consumer.WorkflowListener;
import cn.axzo.workflow.starter.mq.retry.consumer.WorkflowEngineStarterRetryEventListener;
import cn.axzo.workflow.starter.mq.retry.producer.RpcInvokeEventProducer;
import org.apache.rocketmq.common.message.MessageExt;
@@ -16,6 +18,7 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
@@ -25,6 +28,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
+import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
@@ -111,7 +115,7 @@ public class RocketMQConfiguration {
consumeMode = ConsumeMode.ORDERLY,
nameServer = "${rocketmq.name-server}"
)
- public class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener {
+ public static class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener {
@Resource
private EventConsumer eventConsumer;
@@ -127,7 +131,7 @@ public class RocketMQConfiguration {
consumeMode = ConsumeMode.CONCURRENTLY,
nameServer = "${rocketmq.name-server}"
)
- public class WorkflowEngineClientRetryConsumer extends BaseListener implements RocketMQListener {
+ public static class WorkflowEngineClientRetryConsumer extends BaseListener implements RocketMQListener {
@Resource
private EventConsumer eventConsumer;
@@ -144,4 +148,9 @@ public class RocketMQConfiguration {
return new WorkflowEngineStarterRetryEventListener(eventConsumer, environment, workflowCoreService);
}
+ @Bean(initMethod = "init")
+ public WorkflowEngineBroadcastEventListener WorkflowEngineBroadcastEventListener(EventConsumer eventConsumer, ObjectProvider> listeners) {
+ return new WorkflowEngineBroadcastEventListener(eventConsumer, listeners.getIfAvailable());
+ }
+
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/MessageNotificationListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/MessageNotificationListener.java
index d3ea96dfc..23a92d09d 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/MessageNotificationListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/MessageNotificationListener.java
@@ -1,6 +1,7 @@
package cn.axzo.workflow.starter.listener;
import cn.axzo.workflow.common.model.response.mq.MessagePushDTO;
+import org.springframework.core.Ordered;
/**
* TODO
@@ -8,7 +9,7 @@ import cn.axzo.workflow.common.model.response.mq.MessagePushDTO;
* @author wangli
* @since 2024/5/27 16:25
*/
-public interface MessageNotificationListener {
+public interface MessageNotificationListener extends Ordered {
void pushNotice(MessagePushDTO messagePushDTO);
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessActivityListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessActivityListener.java
index 78d55f7d9..f4c9f77a3 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessActivityListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessActivityListener.java
@@ -1,42 +1,39 @@
package cn.axzo.workflow.starter.listener;
import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO;
+import org.springframework.core.Ordered;
/**
* @author wangli
* @since 2024/5/27 16:25
*/
-public interface ProcessActivityListener {
+public interface ProcessActivityListener extends Ordered {
/**
* 节点已启动
*
* @param activityDTO 入参
*/
- default void onStart(ProcessActivityDTO activityDTO) {
- }
+ void onStart(ProcessActivityDTO activityDTO);
/**
* 节点等待业务指定审批人
*
* @param activityDTO 入参
*/
- default void onWaitAssignee(ProcessActivityDTO activityDTO) {
- }
+ void onWaitAssignee(ProcessActivityDTO activityDTO);
/**
* 节点已完成
*
* @param activityDTO 入参
*/
- default void onTake(ProcessActivityDTO activityDTO) {
- }
+ void onTake(ProcessActivityDTO activityDTO);
/**
* 节点已取消
*
* @param activityDTO 入参
*/
- default void onEnd(ProcessActivityDTO activityDTO) {
- }
+ void onEnd(ProcessActivityDTO activityDTO);
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessInstanceListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessInstanceListener.java
index 4643d4e0a..c4f5112bd 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessInstanceListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessInstanceListener.java
@@ -1,12 +1,14 @@
package cn.axzo.workflow.starter.listener;
+import org.springframework.core.Ordered;
+
/**
* TODO
*
* @author wangli
* @since 2024/5/27 16:20
*/
-public interface ProcessInstanceListener extends ProcessListener {
+public interface ProcessInstanceListener extends Ordered {
void created();
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessListener.java
index 71ddc7706..92aaa9c83 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessListener.java
@@ -1,12 +1,14 @@
package cn.axzo.workflow.starter.listener;
+import org.springframework.core.Ordered;
+
/**
* TODO
*
* @author wangli
* @since 2024/5/27 16:26
*/
-public interface ProcessListener {
+public interface ProcessListener extends Ordered {
/**
* 入参来源于配置
*
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessTaskListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessTaskListener.java
index 54754c930..f3d956326 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessTaskListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessTaskListener.java
@@ -1,10 +1,12 @@
package cn.axzo.workflow.starter.listener;
+import org.springframework.core.Ordered;
+
/**
* TODO
*
* @author wangli
* @since 2024/5/27 16:21
*/
-public interface ProcessTaskListener {
+public interface ProcessTaskListener extends Ordered {
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java
index fc3be696d..7973fd8e7 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java
@@ -10,6 +10,7 @@ import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@@ -32,13 +33,14 @@ public class InnerActivityEventListener implements WorkflowListener {
);
@Autowired
- private List activityListeners;
+ private ObjectProvider> activityListenersProvider;
@Autowired
private ListenerExecutor listenerExecutor;
@Override
public void handEvent(Event event, EventConsumer.Context context) {
+ List activityListeners = activityListenersProvider.getIfAvailable();
if (!CollectionUtils.isEmpty(activityListeners)) {
ProcessActivityDTO activityDTO = JSON.parseObject(event.getData().toString(), ProcessActivityDTO.class);
ProcessActivityEventEnum type = activityDTO.getType();
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerNoticeEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerNoticeEventListener.java
index 1508fed01..76233d96a 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerNoticeEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerNoticeEventListener.java
@@ -30,6 +30,11 @@ public class InnerNoticeEventListener {
public static void main(String[] args) {
MessageNotificationListener listener = new MessageNotificationListener() {
+ @Override
+ public int getOrder() {
+ return 0;
+ }
+
@Override
public void pushNotice(MessagePushDTO messagePushDTO) {
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java
index 8c5741f5e..feecf0ba1 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java
@@ -7,24 +7,26 @@ import cn.axzo.workflow.common.model.response.mq.MessagePushDTO;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
/**
- * TODO
- *
* @author wangli
* @since 2024/5/21 15:50
*/
-@Component
-public class WorkflowEngineBroadcastEventListener implements EventHandler, InitializingBean {
+public class WorkflowEngineBroadcastEventListener implements EventHandler {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineBroadcastEventListener.class);
private final EventConsumer eventConsumer;
private final List workflowListeners;
+ public void init() {
+ eventConsumer.registerHandlers(InnerActivityEventListener.SUPPORTED_EVENT_CODES, this);
+ eventConsumer.registerHandlers(InnerInstanceEventListener.SUPPORTED_EVENT_CODES, this);
+ eventConsumer.registerHandlers(InnerNoticeEventListener.SUPPORTED_EVENT_CODES, this);
+ eventConsumer.registerHandlers(InnerTaskEventListener.SUPPORTED_EVENT_CODES, this);
+ }
+
public WorkflowEngineBroadcastEventListener(EventConsumer eventConsumer, List workflowListeners) {
this.eventConsumer = eventConsumer;
this.workflowListeners = workflowListeners;
@@ -43,12 +45,4 @@ public class WorkflowEngineBroadcastEventListener implements EventHandler, Initi
}
}
}
-
- @Override
- public void afterPropertiesSet() throws Exception {
- eventConsumer.registerHandlers(InnerActivityEventListener.SUPPORTED_EVENT_CODES, this);
- eventConsumer.registerHandlers(InnerInstanceEventListener.SUPPORTED_EVENT_CODES, this);
- eventConsumer.registerHandlers(InnerNoticeEventListener.SUPPORTED_EVENT_CODES, this);
- eventConsumer.registerHandlers(InnerTaskEventListener.SUPPORTED_EVENT_CODES, this);
- }
}
From a347cc45fe15c23a1cb1a0014c031ad9f10fab4b Mon Sep 17 00:00:00 2001
From: wangli <274027703@qq.com>
Date: Fri, 31 May 2024 14:25:36 +0800
Subject: [PATCH 041/210] =?UTF-8?q?update(REQ-2516)=20-=20=E5=AE=8C?=
=?UTF-8?q?=E6=95=B4=E6=B5=8B=E8=AF=95=E6=95=B4=E4=B8=AA=20RPC=20=E5=90=8C?=
=?UTF-8?q?=E5=BC=82=E6=AD=A5=E6=A8=A1=E5=BC=8F=EF=BC=8C=E4=B8=94=E6=94=AF?=
=?UTF-8?q?=E6=8C=81=E9=80=9A=E8=BF=87=20Properties=20=E9=85=8D=E7=BD=AE?=
=?UTF-8?q?=E8=B0=83=E6=95=B4=E9=BB=98=E8=AE=A4=E5=90=8C=E5=BC=82=E6=AD=A5?=
=?UTF-8?q?=E6=A8=A1=E5=BC=8F=E3=80=82?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../common/enums/WorkflowEngineEventEnum.java | 2 +-
.../server/controller/web/TestController.java | 13 ++-
...orkflowEngineStarterAutoConfiguration.java | 2 +-
.../WorkflowEngineStarterProperties.java | 4 +-
...owEngineStarterRocketMQConfiguration.java} | 97 ++++++++++++-------
.../starter/common/util/ThreadUtil.java | 4 +
.../feign/ext/ComplexInvokeClient.java | 69 +++++++++++--
...rkflowEngineStarterRetryEventListener.java | 13 ++-
8 files changed, 151 insertions(+), 53 deletions(-)
rename workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/{RocketMQConfiguration.java => WorkflowEngineStarterRocketMQConfiguration.java} (69%)
diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/WorkflowEngineEventEnum.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/WorkflowEngineEventEnum.java
index a74dcd261..b03fb9847 100644
--- a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/WorkflowEngineEventEnum.java
+++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/WorkflowEngineEventEnum.java
@@ -39,7 +39,7 @@ public enum WorkflowEngineEventEnum {
}
public String getTag() {
- return tag;
+ return eventCode.getName();
}
public String getDesc() {
diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/TestController.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/TestController.java
index a6cc9eafc..96dc07b76 100644
--- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/TestController.java
+++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/TestController.java
@@ -131,13 +131,20 @@ public class TestController {
}
@GetMapping("/test5")
- public CommonResponse test5() {
+ public CommonResponse test5(@RequestParam(required = false) Boolean sync) {
BpmnProcessInstanceCreateDTO dto = new BpmnProcessInstanceCreateDTO();
dto.setProcessDefinitionKey("1");
dto.setCooperationOrg(new CooperationOrgDTO());
dto.setBusinessKey("businessKey");
dto.setInitiator(new BpmnTaskDelegateAssigner());
- workflowCoreService.async().createProcessInstance(dto);
- return CommonResponse.success(true);
+ if (Objects.nonNull(sync)) {
+ if (sync) {
+ workflowCoreService.sync();
+ } else {
+ workflowCoreService.async();
+ }
+ }
+ CommonResponse processInstance = workflowCoreService.createProcessInstance(dto);
+ return processInstance;
}
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java
index 9cbc0ccbc..dc966c518 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java
@@ -28,7 +28,7 @@ import java.util.List;
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(WorkflowEngineStarterProperties.class)
@EnableFeignClients(clients = {WorkflowCoreService.class})
-@Import(RocketMQConfiguration.class)
+@Import(WorkflowEngineStarterRocketMQConfiguration.class)
public class WorkflowEngineStarterAutoConfiguration {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterAutoConfiguration.class);
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
index b14bf5989..e0bb3db3c 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
@@ -3,7 +3,7 @@ package cn.axzo.workflow.starter;
import cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum;
import org.springframework.boot.context.properties.ConfigurationProperties;
-import static cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum.SYNC;
+import static cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum.ASYNC;
/**
* Workflow Engine Starter Properties
@@ -32,7 +32,7 @@ public class WorkflowEngineStarterProperties {
* 如果是同步调用,则直接通过普通 FeignClient 进行调用,
* 否则将通过 MQ 将 RPC 调用进行解耦
*/
- private RpcInvokeModeEnum invokeMode = SYNC;
+ private RpcInvokeModeEnum invokeMode = ASYNC;
/**
* 该属性还不确定能否实现
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterRocketMQConfiguration.java
similarity index 69%
rename from workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java
rename to workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterRocketMQConfiguration.java
index ca0e6c612..5eda00fbf 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/RocketMQConfiguration.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterRocketMQConfiguration.java
@@ -2,6 +2,7 @@ package cn.axzo.workflow.starter;
import cn.axzo.framework.rocketmq.BaseListener;
import cn.axzo.framework.rocketmq.DefaultEventConsumer;
+import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.framework.rocketmq.EventHandlerRepository;
import cn.axzo.framework.rocketmq.EventProducer;
@@ -17,8 +18,9 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
@@ -37,8 +39,8 @@ import java.util.function.Consumer;
* @since 2024/5/30 14:05
*/
@Configuration(proxyBeanMethods = false)
-public class RocketMQConfiguration {
- private final Logger log = LoggerFactory.getLogger(RocketMQConfiguration.class);
+public class WorkflowEngineStarterRocketMQConfiguration {
+ private final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterRocketMQConfiguration.class);
private static final String DEFAULT_MODULE = "workflowEngine";
private static final String DEFAULT_EVENT = "topic_workflow_engine_";
@@ -48,6 +50,50 @@ public class RocketMQConfiguration {
@Value("${spring.profiles.active:dev}")
private String activeProfile;
+ //================================= Workflow Engine Broadcast MQ =================================//
+ @Bean
+ @ConditionalOnMissingBean(EventHandlerRepository.class)
+ public EventHandlerRepository eventHandlerRepository() {
+ return new EventHandlerRepository((ex, logText) -> {
+ log.warn("MQ, handle warning {}", logText, ex);
+ if (Objects.nonNull(ex)) {
+ throw new RuntimeException(ex);
+ }
+ });
+ }
+
+ @Bean
+ @ConditionalOnMissingBean(EventProducer.class)
+ public EventConsumer eventConsumer(EventHandlerRepository eventHandlerRepository) {
+ Consumer callback = eventWrapper -> {
+ if (eventWrapper.isHandled()) {
+ // 只收集被App真正消费的消息.
+ //String topic = (String) eventWrapper.getExt().get(EVENT_TOPIC_KEY);
+ }
+ };
+ return new DefaultEventConsumer(applicationName, eventHandlerRepository, callback);
+ }
+
+ @Component
+ @Conditional(NonContainerEnvironmentCondition.class)
+ @RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
+ consumerGroup = "GID_${spring.application.name}_workflow_engine_${MY_POD_NAMESPACE:debugging}_consumer",
+ consumeMode = ConsumeMode.ORDERLY,
+ nameServer = "${rocketmq.name-server}"
+ )
+ public class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener {
+ @Resource(name = "eventConsumer")
+ private EventConsumer eventConsumer;
+
+ @Override
+ public void onMessage(MessageExt message) {
+ super.onEvent(message, eventConsumer);
+ }
+ }
+
+
+ //======================================= RPC Invoke MQ ========================================//
+
/**
* 客户端 RPC Retry 事件生产者
*
@@ -58,7 +104,7 @@ public class RocketMQConfiguration {
public EventProducer workflowEngineClientEventProducer(RocketMQTemplate rocketMQTemplate) {
return new RocketMQEventProducer(rocketMQTemplate,
DEFAULT_MODULE,
- applicationName,
+ applicationName + "Starter",
EventProducer.Context.builder()
.meta(RocketMQEventProducer.RocketMQMessageMeta.builder()
.topic(DEFAULT_EVENT + activeProfile)
@@ -79,14 +125,14 @@ public class RocketMQConfiguration {
}
@Bean
- public RpcInvokeEventProducer rpcInvokeEventProducer(EventProducer workflowEngineClientEventProducer,
+ public RpcInvokeEventProducer rpcInvokeEventProducer(@Qualifier("workflowEngineClientEventProducer") EventProducer workflowEngineClientEventProducer,
Environment environment) {
return new RpcInvokeEventProducer(workflowEngineClientEventProducer, environment);
}
@Bean
- @ConditionalOnClass(EventHandlerRepository.class)
- public EventHandlerRepository eventHandlerRepository() {
+ @ConditionalOnMissingBean(name = "workflowEngineStarterEventHandlerRepository")
+ public EventHandlerRepository workflowEngineStarterEventHandlerRepository() {
return new EventHandlerRepository((ex, logText) -> {
log.warn("MQ, handle warning {}", logText, ex);
if (Objects.nonNull(ex)) {
@@ -96,32 +142,16 @@ public class RocketMQConfiguration {
}
@Bean
- @ConditionalOnClass(EventConsumer.class)
- public EventConsumer eventConsumer(EventHandlerRepository eventHandlerRepository) {
+ @ConditionalOnMissingBean(name = "workflowEngineStarterEventConsumer")
+ public EventConsumer workflowEngineStarterEventConsumer(@Qualifier("workflowEngineStarterEventHandlerRepository") EventHandlerRepository workflowEngineStarterEventHandlerRepository) {
Consumer callback = eventWrapper -> {
if (eventWrapper.isHandled()) {
// 只收集被App真正消费的消息.
- //String topic = (String) eventWrapper.getExt().get(EVENT_TOPIC_KEY);
+ Event event = eventWrapper.getEvent();
+ log.info("MQ, handled event: {}", event.toPrettyJsonString());
}
};
- return new DefaultEventConsumer(applicationName, eventHandlerRepository, callback);
- }
-
- @Component
- @Conditional(NonContainerEnvironmentCondition.class)
- @RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
- consumerGroup = "GID_${spring.application.name}_workflow_engine_${MY_POD_NAMESPACE:debugging}_consumer",
- consumeMode = ConsumeMode.ORDERLY,
- nameServer = "${rocketmq.name-server}"
- )
- public class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener {
- @Resource
- private EventConsumer eventConsumer;
-
- @Override
- public void onMessage(MessageExt message) {
- super.onEvent(message, eventConsumer);
- }
+ return new DefaultEventConsumer(applicationName + "Starter", workflowEngineStarterEventHandlerRepository, callback);
}
@Component
@@ -129,23 +159,24 @@ public class RocketMQConfiguration {
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_workflow_engine_starter_${MY_POD_NAMESPACE:debugging}_consumer",
consumeMode = ConsumeMode.CONCURRENTLY,
+ replyTimeout = 10000,
nameServer = "${rocketmq.name-server}"
)
public class WorkflowEngineClientRetryConsumer extends BaseListener implements RocketMQListener {
- @Resource
- private EventConsumer eventConsumer;
+ @Resource(name = "workflowEngineStarterEventConsumer")
+ private EventConsumer workflowEngineStarterEventConsumer;
@Override
public void onMessage(MessageExt message) {
- super.onEvent(message, eventConsumer);
+ super.onEvent(message, workflowEngineStarterEventConsumer);
}
}
@Bean
- public WorkflowEngineStarterRetryEventListener workflowEngineClientRetryEventListener(EventConsumer eventConsumer,
+ public WorkflowEngineStarterRetryEventListener workflowEngineClientRetryEventListener(@Qualifier("workflowEngineStarterEventConsumer") EventConsumer workflowEngineStarterEventConsumer,
Environment environment,
WorkflowCoreService workflowCoreService) {
- return new WorkflowEngineStarterRetryEventListener(eventConsumer, environment, workflowCoreService);
+ return new WorkflowEngineStarterRetryEventListener(workflowEngineStarterEventConsumer, environment, workflowCoreService);
}
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/util/ThreadUtil.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/util/ThreadUtil.java
index bccb06b78..59acedc08 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/util/ThreadUtil.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/util/ThreadUtil.java
@@ -19,4 +19,8 @@ public class ThreadUtil {
public static RpcInvokeModeEnum get() {
return threadLocal.get();
}
+
+ public static void clear() {
+ threadLocal.remove();
+ }
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java
index a8d375314..d5b72a718 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java
@@ -6,6 +6,8 @@ import cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum;
import cn.axzo.workflow.starter.common.exception.WorkflowEngineStarterException;
import cn.axzo.workflow.starter.common.util.ThreadUtil;
import cn.axzo.workflow.starter.mq.retry.producer.RpcInvokeEventProducer;
+import cn.azxo.framework.common.model.CommonResponse;
+import com.alibaba.fastjson.JSON;
import feign.Client;
import feign.Request;
import feign.Response;
@@ -13,7 +15,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
@@ -52,21 +59,43 @@ public class ComplexInvokeClient implements Client {
@Override
public Response execute(Request request, Request.Options options) throws IOException {
- log.info("ComplexInvokeClient execute");
+ log.info("ComplexInvokeClient execute...");
RpcInvokeModeEnum currentInvokeModeEnum = ThreadUtil.get();
if (Objects.isNull(currentInvokeModeEnum)) {
+ log.info("Without calling the async or sync methods of the WorkflowCoreService, the default configuration in StarterProperties will be loaded.");
currentInvokeModeEnum = getDefaultInvokeMode(request);
}
+ log.info("final InvokeMode : {}", currentInvokeModeEnum);
if (Objects.equals(SYNC, currentInvokeModeEnum)) {
+ log.info("[sync] invoke...");
+ ThreadUtil.clear();
return feignClient.execute(request, options);
}
Map> headers = request.headers();
headers.forEach((k, v) -> log.info("ComplexInvokeClient header: {} = {}", k, v));
- // TODO 发送 RPC 调用动作的 MQ 事件
+ asyncInvoke(request);
+
+
+ return Response.builder()
+ .status(HttpStatus.OK.value())
+ .reason(HttpStatus.OK.getReasonPhrase())
+ .headers(headers)
+ .request(request)
+ .body(body)
+ .build();
+ }
+
+ /**
+ * 发送 RPC 调用动作的 MQ 事件
+ *
+ * @param request
+ */
+ private void asyncInvoke(Request request) {
+ log.info("[async] invoke...");
WorkflowEngineStarterRpcInvokeDTO event = new WorkflowEngineStarterRpcInvokeDTO();
event.setClassName(request.requestTemplate().feignTarget().type().getName());
event.setMethodName(request.requestTemplate().methodMetadata().method().getName());
@@ -78,12 +107,6 @@ public class ComplexInvokeClient implements Client {
event.setOriginUrl(request.url());
event.setBody(new String(request.body(), StandardCharsets.UTF_8));
rpcInvokeEventProducer.send(WORKFLOW_ENGINE_STARTER, event);
-
- return Response.builder()
- .status(HttpStatus.OK.value())
- .body(new byte[0])
- .request(request)
- .build();
}
private RpcInvokeModeEnum getDefaultInvokeMode(Request request) {
@@ -96,4 +119,34 @@ public class ComplexInvokeClient implements Client {
}
return RpcInvokeModeEnum.valueOf(invokeModel.iterator().next());
}
+
+ static Response.Body body = new Response.Body() {
+ final ByteArrayInputStream inputStream = new ByteArrayInputStream(JSON.toJSONString(CommonResponse.success("Send MQ Success"))
+ .getBytes(StandardCharsets.UTF_8));
+
+ @Override
+ public Integer length() {
+ return null;
+ }
+
+ @Override
+ public boolean isRepeatable() {
+ return false;
+ }
+
+ @Override
+ public InputStream asInputStream() throws IOException {
+ return inputStream;
+ }
+
+ @Override
+ public Reader asReader(Charset charset) throws IOException {
+ return new InputStreamReader(asInputStream(), charset);
+ }
+
+ @Override
+ public void close() throws IOException {
+ inputStream.close();
+ }
+ };
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java
index 695cf6564..a3643747d 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java
@@ -29,14 +29,16 @@ import java.util.Objects;
*/
public class WorkflowEngineStarterRetryEventListener implements EventHandler, InitializingBean {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterRetryEventListener.class);
- private final EventConsumer eventConsumer;
+ private final EventConsumer workflowEngineStarterEventConsumer;
private final Environment environment;
private final String currentApplicationName;
private final WorkflowCoreService workflowCoreService;
private final Map methodCache = new HashMap<>();
- public WorkflowEngineStarterRetryEventListener(EventConsumer eventConsumer, Environment environment, WorkflowCoreService workflowCoreService) {
- this.eventConsumer = eventConsumer;
+ public WorkflowEngineStarterRetryEventListener(EventConsumer workflowEngineStarterEventConsumer,
+ Environment environment,
+ WorkflowCoreService workflowCoreService) {
+ this.workflowEngineStarterEventConsumer = workflowEngineStarterEventConsumer;
this.environment = environment;
this.currentApplicationName = environment.getProperty("spring.application.name");
this.workflowCoreService = workflowCoreService;
@@ -82,15 +84,16 @@ public class WorkflowEngineStarterRetryEventListener implements EventHandler, In
throw new IllegalStateException("Not found method:" + dto.getMethodName());
}
try {
+ workflowCoreService.sync();
Object invoke = method.invoke(workflowCoreService, JSON.parseObject((String) dto.getBody(), BpmnProcessInstanceCreateDTO.class));
log.info("Event Invoke Result: {}", JSON.toJSONString(invoke));
} catch (Exception e) {
- log.error("Event Invoke Exception: {}", e.getMessage());
+ log.error("Event Invoke Exception: {}", e.getMessage(), e);
}
}
@Override
public void afterPropertiesSet() {
- eventConsumer.registerHandler(WorkflowEngineEventEnum.WORKFLOW_ENGINE_STARTER.getEventCode(currentApplicationName), this);
+ workflowEngineStarterEventConsumer.registerHandler(WorkflowEngineEventEnum.WORKFLOW_ENGINE_STARTER.getEventCode(currentApplicationName), this);
}
}
From 504f26239fe34494841ba3b06bbc11868cb269fd Mon Sep 17 00:00:00 2001
From: yangqicheng
Date: Fri, 31 May 2024 14:27:22 +0800
Subject: [PATCH 042/210] =?UTF-8?q?update=20-=20REQ-2516-=E6=89=A7?=
=?UTF-8?q?=E8=A1=8CListener=E5=A2=9E=E5=8A=A0=E5=BC=82=E5=B8=B8=E5=A4=84?=
=?UTF-8?q?=E7=90=86=E7=AD=96=E7=95=A5?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
...orkflowEngineStarterAutoConfiguration.java | 26 +++++-
.../WorkflowEngineStarterProperties.java | 56 +++++++++++++
.../common/enums/FailHandleTypeEnum.java | 23 ++++++
.../WorkflowListenerExeException.java | 41 ++++++++++
.../listener/ProcessInstanceListener.java | 46 +++++++++--
.../consumer/InnerActivityEventListener.java | 3 +-
.../consumer/InnerInstanceEventListener.java | 80 ++++++++++++++++---
.../WorkflowEngineBroadcastEventListener.java | 5 ++
.../interceptor/FailBackInterceptor.java | 20 +++++
.../interceptor/FailFastInterceptor.java | 25 ++++++
...erceptor.java => FailOverInterceptor.java} | 30 ++++---
11 files changed, 326 insertions(+), 29 deletions(-)
create mode 100644 workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/enums/FailHandleTypeEnum.java
create mode 100644 workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/exception/WorkflowListenerExeException.java
create mode 100644 workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/FailBackInterceptor.java
create mode 100644 workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/FailFastInterceptor.java
rename workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/{RetryInterceptor.java => FailOverInterceptor.java} (62%)
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java
index 9cbc0ccbc..97976a8de 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java
@@ -4,10 +4,13 @@ import cn.axzo.framework.domain.ServiceException;
import cn.axzo.workflow.starter.api.WorkflowCoreService;
import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
import cn.axzo.workflow.starter.mq.execute.ListenerExecutorImpl;
+import cn.axzo.workflow.starter.common.enums.FailHandleTypeEnum;
import cn.axzo.workflow.starter.mq.execute.interceptor.ExecuteInterceptor;
import cn.axzo.workflow.starter.mq.execute.interceptor.ExecutorInvoker;
+import cn.axzo.workflow.starter.mq.execute.interceptor.FailBackInterceptor;
+import cn.axzo.workflow.starter.mq.execute.interceptor.FailFastInterceptor;
+import cn.axzo.workflow.starter.mq.execute.interceptor.FailOverInterceptor;
import cn.axzo.workflow.starter.mq.execute.interceptor.LogInterceptor;
-import cn.axzo.workflow.starter.mq.execute.interceptor.RetryInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -34,10 +37,12 @@ public class WorkflowEngineStarterAutoConfiguration {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterAutoConfiguration.class);
@Bean
- public ListenerExecutor initListenerExecutor(List additionalInterceptors) {
+ public ListenerExecutor initListenerExecutor(WorkflowEngineStarterProperties starterProperties,
+ List additionalInterceptors) {
+ FailHandleTypeEnum failHandleType = starterProperties.getFailHandleType();
List interceptors = new ArrayList<>();
interceptors.add(new LogInterceptor());
- interceptors.add(new RetryInterceptor());
+ interceptors.add(getFailInterceptor(starterProperties, failHandleType));
// if (!CollectionUtils.isEmpty(additionalInterceptors)) {
// additionalInterceptors.forEach(interceptor -> interceptor.setNext(interceptor));
// }
@@ -45,6 +50,21 @@ public class WorkflowEngineStarterAutoConfiguration {
return new ListenerExecutorImpl(initInterceptorChain(interceptors));
}
+ private ExecuteInterceptor getFailInterceptor(WorkflowEngineStarterProperties starterProperties,
+ FailHandleTypeEnum failHandleType) {
+ if (failHandleType != null) {
+ switch (failHandleType) {
+ case FAIL_BACK:
+ return new FailBackInterceptor();
+ case FAIL_FAST:
+ return new FailFastInterceptor();
+ case FAIL_OVER:
+ return new FailOverInterceptor(starterProperties.getNumOfRetries(), starterProperties.getWaitTimeInMs(), starterProperties.getWaitIncreaseFactor());
+ }
+ }
+ return new FailOverInterceptor(starterProperties.getNumOfRetries(), starterProperties.getWaitTimeInMs(), starterProperties.getWaitIncreaseFactor());
+ }
+
public ExecuteInterceptor initInterceptorChain(List chain) {
if (chain == null || chain.isEmpty()) {
throw new ServiceException("invalid command interceptor chain configuration: " + chain);
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
index b14bf5989..617028cc7 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
@@ -1,5 +1,6 @@
package cn.axzo.workflow.starter;
+import cn.axzo.workflow.starter.common.enums.FailHandleTypeEnum;
import cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -34,6 +35,29 @@ public class WorkflowEngineStarterProperties {
*/
private RpcInvokeModeEnum invokeMode = SYNC;
+ /**
+ * 失败处理策略:
+ * 1、FAIL_OVER, 当前listener执行出错,忽略继续往下执行,可配置重试相关参数,不抛出异常(默认策略)
+ * 2、FAIL_FAST, 快速失败,出错直接抛出异常,listener不再往下执行
+ * 3、FAIL_BACK, 失败自动恢复,在后台记录失败的消息,并按照一定的策略后期再进行重试,目前暂不支持
+ */
+ private FailHandleTypeEnum failHandleType = FailHandleTypeEnum.FAIL_OVER;
+
+ /**
+ * 自动重试次数
+ */
+ private int numOfRetries = 3;
+
+ /**
+ * 初始等待时间,单位:毫秒
+ */
+ private int waitTimeInMs = 50;
+
+ /**
+ * 重试累乘因子
+ */
+ private int waitIncreaseFactor = 3;
+
/**
* 该属性还不确定能否实现
*/
@@ -71,4 +95,36 @@ public class WorkflowEngineStarterProperties {
public void setAutoAck(Boolean autoAck) {
this.autoAck = autoAck;
}
+
+ public FailHandleTypeEnum getFailHandleType() {
+ return failHandleType;
+ }
+
+ public void setFailHandleType(FailHandleTypeEnum failHandleType) {
+ this.failHandleType = failHandleType;
+ }
+
+ public int getNumOfRetries() {
+ return numOfRetries;
+ }
+
+ public void setNumOfRetries(int numOfRetries) {
+ this.numOfRetries = numOfRetries;
+ }
+
+ public int getWaitTimeInMs() {
+ return waitTimeInMs;
+ }
+
+ public void setWaitTimeInMs(int waitTimeInMs) {
+ this.waitTimeInMs = waitTimeInMs;
+ }
+
+ public int getWaitIncreaseFactor() {
+ return waitIncreaseFactor;
+ }
+
+ public void setWaitIncreaseFactor(int waitIncreaseFactor) {
+ this.waitIncreaseFactor = waitIncreaseFactor;
+ }
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/enums/FailHandleTypeEnum.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/enums/FailHandleTypeEnum.java
new file mode 100644
index 000000000..0f6d64e32
--- /dev/null
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/enums/FailHandleTypeEnum.java
@@ -0,0 +1,23 @@
+package cn.axzo.workflow.starter.common.enums;
+
+
+import lombok.Getter;
+
+/**
+ * 异常处理策略,执行客户端自定义Listener异常处理策略,默认未FAIL_OVER
+ */
+@Getter
+public enum FailHandleTypeEnum {
+ FAIL_OVER("fail_over", "当前listener执行出错,忽略继续往下执行,可配置重试相关参数,不抛出异常"),
+ FAIL_FAST("fail_fast", "快速失败,出错直接抛出异常,listener不再往下执行"),
+ FAIL_BACK("fail_back", "失败自动恢复,在后台记录失败的消息,并按照一定的策略后期再进行重试,目前暂不支持"),
+ ;
+ private final String code;
+ private final String description;
+
+ FailHandleTypeEnum(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/exception/WorkflowListenerExeException.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/exception/WorkflowListenerExeException.java
new file mode 100644
index 000000000..5c6ec1fcc
--- /dev/null
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/exception/WorkflowListenerExeException.java
@@ -0,0 +1,41 @@
+package cn.axzo.workflow.starter.common.exception;
+
+public class WorkflowListenerExeException extends RuntimeException {
+ private int code;
+
+ public WorkflowListenerExeException(String message) {
+ super(message);
+ }
+
+ public WorkflowListenerExeException() {
+ super();
+ }
+
+ public WorkflowListenerExeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public WorkflowListenerExeException(Throwable cause) {
+ super(cause);
+ }
+
+ public WorkflowListenerExeException(int code) {
+ super();
+ this.code = code;
+ }
+
+ public WorkflowListenerExeException(int code, String message, Throwable cause) {
+ super(message, cause);
+ this.code = code;
+ }
+
+ public WorkflowListenerExeException(int code, String message) {
+ super(message);
+ this.code = code;
+ }
+
+ public WorkflowListenerExeException(int code, Throwable cause) {
+ super(cause);
+ this.code = code;
+ }
+}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessInstanceListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessInstanceListener.java
index c4f5112bd..145f422d9 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessInstanceListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessInstanceListener.java
@@ -1,19 +1,55 @@
package cn.axzo.workflow.starter.listener;
+import cn.axzo.workflow.common.model.response.mq.ProcessInstanceDTO;
import org.springframework.core.Ordered;
/**
- * TODO
- *
* @author wangli
* @since 2024/5/27 16:20
*/
public interface ProcessInstanceListener extends Ordered {
- void created();
+ /**
+ * 流程实例创建成功后回调
+ *
+ * @param processInstanceDTO
+ */
+ void onCreated(ProcessInstanceDTO processInstanceDTO);
- void completed();
+ /**
+ * 流程实例开始运行后回调
+ *
+ * @param processInstanceDTO
+ */
+ void onStarted(ProcessInstanceDTO processInstanceDTO);
- void deleted();
+ /**
+ * 流程实例被撤回后回调
+ *
+ * @param processInstanceDTO
+ */
+ void onCancelled(ProcessInstanceDTO processInstanceDTO);
+ /**
+ * 流程实例被驳回后回调
+ *
+ * @param processInstanceDTO
+ */
+ void onRejected(ProcessInstanceDTO processInstanceDTO);
+
+ /**
+ * 流程实例被中止后回调
+ *
+ * @param event
+ */
+ void onAborted(ProcessInstanceDTO event);
+
+ /**
+ * 流程实例运行完成后回调
+ *
+ * 注意: 完成只是说明流程实例已停止运行
+ *
+ * @param event
+ */
+ void onCompleted(ProcessInstanceDTO event);
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java
index d0a6ec2ea..7973fd8e7 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java
@@ -12,6 +12,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
@@ -21,7 +22,7 @@ import java.util.function.Consumer;
* @author wangli
* @since 2024/5/21 15:51
*/
-//@Component
+@Component
public class InnerActivityEventListener implements WorkflowListener {
private final Logger log = LoggerFactory.getLogger(InnerActivityEventListener.class);
public final static List SUPPORTED_EVENT_CODES = ImmutableList.of(
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerInstanceEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerInstanceEventListener.java
index 4caf3b922..8cfd748eb 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerInstanceEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerInstanceEventListener.java
@@ -1,27 +1,89 @@
package cn.axzo.workflow.starter.mq.broadcast.consumer;
import cn.axzo.framework.rocketmq.Event;
+import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.common.enums.ProcessInstanceEventEnum;
+import cn.axzo.workflow.common.model.response.mq.ProcessInstanceDTO;
+import cn.axzo.workflow.starter.listener.ProcessInstanceListener;
+import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
+import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
import java.util.List;
+import java.util.function.Consumer;
+
+import static cn.axzo.workflow.common.enums.ProcessInstanceEventEnum.PROCESS_INSTANCE_ABORTED;
+import static cn.axzo.workflow.common.enums.ProcessInstanceEventEnum.PROCESS_INSTANCE_CANCELLED;
+import static cn.axzo.workflow.common.enums.ProcessInstanceEventEnum.PROCESS_INSTANCE_COMPLETED;
+import static cn.axzo.workflow.common.enums.ProcessInstanceEventEnum.PROCESS_INSTANCE_CREATED;
+import static cn.axzo.workflow.common.enums.ProcessInstanceEventEnum.PROCESS_INSTANCE_REJECTED;
+import static cn.axzo.workflow.common.enums.ProcessInstanceEventEnum.PROCESS_INSTANCE_STARTED;
/**
- * TODO
- *
* @author wangli
* @since 2024/5/21 15:51
*/
-public class InnerInstanceEventListener {
+@Component
+public class InnerInstanceEventListener implements WorkflowListener {
private final Logger log = LoggerFactory.getLogger(InnerInstanceEventListener.class);
public final static List SUPPORTED_EVENT_CODES = ImmutableList.of(
- ProcessInstanceEventEnum.PROCESS_INSTANCE_CREATED.getEventCode(),
- ProcessInstanceEventEnum.PROCESS_INSTANCE_STARTED.getEventCode(),
- ProcessInstanceEventEnum.PROCESS_INSTANCE_CANCELLED.getEventCode(),
- ProcessInstanceEventEnum.PROCESS_INSTANCE_REJECTED.getEventCode(),
- ProcessInstanceEventEnum.PROCESS_INSTANCE_ABORTED.getEventCode(),
- ProcessInstanceEventEnum.PROCESS_INSTANCE_COMPLETED.getEventCode()
+ PROCESS_INSTANCE_CREATED.getEventCode(),
+ PROCESS_INSTANCE_STARTED.getEventCode(),
+ PROCESS_INSTANCE_CANCELLED.getEventCode(),
+ PROCESS_INSTANCE_REJECTED.getEventCode(),
+ PROCESS_INSTANCE_ABORTED.getEventCode(),
+ PROCESS_INSTANCE_COMPLETED.getEventCode()
);
+
+ @Autowired
+ private ObjectProvider> activityListenersProvider;
+
+ @Autowired
+ private ListenerExecutor listenerExecutor;
+
+ @Override
+ public void handEvent(Event event, EventConsumer.Context context) {
+ List instanceListeners = activityListenersProvider.getIfAvailable();
+ if (!CollectionUtils.isEmpty(instanceListeners)) {
+ ProcessInstanceDTO instanceDTO = JSON.parseObject(event.getData().toString(), ProcessInstanceDTO.class);
+ ProcessInstanceEventEnum type = instanceDTO.getType();
+ for (ProcessInstanceListener instanceListener : instanceListeners) {
+ Consumer consumer = null;
+ switch (type) {
+ case PROCESS_INSTANCE_CREATED:
+ consumer = instanceListener::onCreated;
+ break;
+ case PROCESS_INSTANCE_STARTED:
+ consumer = instanceListener::onStarted;
+ break;
+ case PROCESS_INSTANCE_CANCELLED:
+ consumer = instanceListener::onCancelled;
+ break;
+ case PROCESS_INSTANCE_ABORTED:
+ consumer = instanceListener::onAborted;
+ break;
+ case PROCESS_INSTANCE_COMPLETED:
+ consumer = instanceListener::onCompleted;
+ break;
+ case PROCESS_INSTANCE_REJECTED:
+ consumer = instanceListener::onRejected;
+ break;
+ default:
+ log.warn("unknown process activity event type: {}", type);
+ }
+ listenerExecutor.execute(consumer, instanceDTO);
+ }
+ }
+ }
+
+ @Override
+ public boolean support(Event event) {
+ return event != null && SUPPORTED_EVENT_CODES.contains(event.getEventCode());
+ }
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java
index feecf0ba1..f1c4956c9 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java
@@ -7,6 +7,7 @@ import cn.axzo.workflow.common.model.response.mq.MessagePushDTO;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Objects;
@@ -39,6 +40,10 @@ public class WorkflowEngineBroadcastEventListener implements EventHandler {
log.warn("AllMessagePushEventHandler MessagePushDTO is null");
return;
}
+ if (CollectionUtils.isEmpty(workflowListeners)) {
+ log.warn("AllWorkflowListener is null,msg:{}", JSON.toJSONString(event));
+ return;
+ }
for (WorkflowListener workflowListener : workflowListeners) {
if (workflowListener.support(event)) {
workflowListener.handEvent(event, context);
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/FailBackInterceptor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/FailBackInterceptor.java
new file mode 100644
index 000000000..fca7356d0
--- /dev/null
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/FailBackInterceptor.java
@@ -0,0 +1,20 @@
+package cn.axzo.workflow.starter.mq.execute.interceptor;
+
+import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
+
+import java.util.function.Consumer;
+
+/**
+ * 失败自动恢复,在后台记录失败的消息,并按照一定的策略后期再进行重试,目前暂不支持
+ */
+public final class FailBackInterceptor extends AbstractListenerInterceptor {
+ @Override
+ public void execute(ListenerExecutor executor, Consumer consumer, T t) {
+ throw new UnsupportedOperationException("FailBackInterceptor");
+ }
+
+ @Override
+ public void setNext(ExecuteInterceptor next) {
+ throw new UnsupportedOperationException("FailBackInterceptor");
+ }
+}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/FailFastInterceptor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/FailFastInterceptor.java
new file mode 100644
index 000000000..657898731
--- /dev/null
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/FailFastInterceptor.java
@@ -0,0 +1,25 @@
+package cn.axzo.workflow.starter.mq.execute.interceptor;
+
+import cn.axzo.workflow.starter.common.exception.WorkflowListenerExeException;
+import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
+
+import java.util.function.Consumer;
+
+/**
+ * 快速失败,出错直接抛出异常,listener不再往下执行
+ */
+public final class FailFastInterceptor extends AbstractListenerInterceptor {
+
+ @Override
+ public void execute(ListenerExecutor executor, Consumer consumer, T t) {
+ try {
+ getNext().execute(executor, consumer, t);
+ } catch (Exception e) {
+ throw new WorkflowListenerExeException(
+ "Failed to invoke the method "
+// + methodName
+ + ". Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
+ }
+ }
+
+}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/RetryInterceptor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/FailOverInterceptor.java
similarity index 62%
rename from workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/RetryInterceptor.java
rename to workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/FailOverInterceptor.java
index 65e4d1d54..bd130890b 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/RetryInterceptor.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/interceptor/FailOverInterceptor.java
@@ -1,23 +1,30 @@
package cn.axzo.workflow.starter.mq.execute.interceptor;
-import cn.axzo.framework.domain.ServiceException;
import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
-import lombok.Getter;
-import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Consumer;
-@Getter
-@Setter
-public final class RetryInterceptor extends AbstractListenerInterceptor {
+/**
+ * 当前listener执行出错,忽略继续往下执行,可配置重试相关参数,不抛出异常
+ */
+public final class FailOverInterceptor extends AbstractListenerInterceptor {
- private static final Logger LOGGER = LoggerFactory.getLogger(RetryInterceptor.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(FailOverInterceptor.class);
- protected int numOfRetries = 3;
- protected int waitTimeInMs = 50;
- protected int waitIncreaseFactor = 5;
+ private int numOfRetries = 3;
+ private int waitTimeInMs = 50;
+ private int waitIncreaseFactor = 5;
+
+ public FailOverInterceptor() {
+ }
+
+ public FailOverInterceptor(int numOfRetries, int waitTimeInMs, int waitIncreaseFactor) {
+ this.numOfRetries = numOfRetries;
+ this.waitTimeInMs = waitTimeInMs;
+ this.waitIncreaseFactor = waitIncreaseFactor;
+ }
@Override
public void execute(ListenerExecutor executor, Consumer consumer, T t) {
@@ -31,12 +38,13 @@ public final class RetryInterceptor extends AbstractListenerInterceptor {
}
try {
getNext().execute(executor, consumer, t);
+ return;
} catch (Exception e) {
LOGGER.info("Caught exception: {}", e.getMessage(), e);
}
failedAttempts++;
} while (failedAttempts <= numOfRetries);
- throw new ServiceException(numOfRetries + " retries failed with Exception. Giving up.");
+// throw new ServiceException(numOfRetries + " retries failed with Exception. Giving up.");
}
private void waitBeforeRetry(long waitTime) {
From d934bd42dfff63e53d1343fb02235c8808d570b4 Mon Sep 17 00:00:00 2001
From: wangli <274027703@qq.com>
Date: Fri, 31 May 2024 14:40:22 +0800
Subject: [PATCH 043/210] =?UTF-8?q?update(REQ-2516)=20-=20=E8=A7=A3?=
=?UTF-8?q?=E5=86=B3=20MQ=20=E7=9A=84=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6?=
=?UTF-8?q?=E5=86=B2=E7=AA=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
...flowEngineStarterRocketMQConfiguration.java | 17 ++++++++++++-----
.../WorkflowEngineBroadcastEventListener.java | 18 ++++++++++--------
2 files changed, 22 insertions(+), 13 deletions(-)
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterRocketMQConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterRocketMQConfiguration.java
index aae8fff50..949794a62 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterRocketMQConfiguration.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterRocketMQConfiguration.java
@@ -8,9 +8,9 @@ import cn.axzo.framework.rocketmq.EventHandlerRepository;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
import cn.axzo.workflow.starter.api.WorkflowCoreService;
+import cn.axzo.workflow.starter.common.condition.NonContainerEnvironmentCondition;
import cn.axzo.workflow.starter.mq.broadcast.consumer.WorkflowEngineBroadcastEventListener;
import cn.axzo.workflow.starter.mq.broadcast.consumer.WorkflowListener;
-import cn.axzo.workflow.starter.common.condition.NonContainerEnvironmentCondition;
import cn.axzo.workflow.starter.mq.retry.consumer.WorkflowEngineStarterRetryEventListener;
import cn.axzo.workflow.starter.mq.retry.producer.RpcInvokeEventProducer;
import org.apache.rocketmq.common.message.MessageExt;
@@ -72,7 +72,8 @@ public class WorkflowEngineStarterRocketMQConfiguration {
Consumer callback = eventWrapper -> {
if (eventWrapper.isHandled()) {
// 只收集被App真正消费的消息.
- //String topic = (String) eventWrapper.getExt().get(EVENT_TOPIC_KEY);
+ Event event = eventWrapper.getEvent();
+ log.info("WorkflowEngine Broadcast MQ, handled event: {}", event.toPrettyJsonString());
}
};
return new DefaultEventConsumer(applicationName, eventHandlerRepository, callback);
@@ -85,7 +86,7 @@ public class WorkflowEngineStarterRocketMQConfiguration {
consumeMode = ConsumeMode.ORDERLY,
nameServer = "${rocketmq.name-server}"
)
- public class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener {
+ public static class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener {
@Resource(name = "eventConsumer")
private EventConsumer eventConsumer;
@@ -95,6 +96,12 @@ public class WorkflowEngineStarterRocketMQConfiguration {
}
}
+ @Bean
+ public WorkflowEngineBroadcastEventListener workflowEngineBroadcastEventListener(@Qualifier("eventConsumer") EventConsumer eventConsumer,
+ ObjectProvider> listeners) {
+ return new WorkflowEngineBroadcastEventListener(eventConsumer, listeners.getIfAvailable());
+ }
+
//======================================= RPC Invoke MQ ========================================//
@@ -152,7 +159,7 @@ public class WorkflowEngineStarterRocketMQConfiguration {
if (eventWrapper.isHandled()) {
// 只收集被App真正消费的消息.
Event event = eventWrapper.getEvent();
- log.info("MQ, handled event: {}", event.toPrettyJsonString());
+ log.info("WorkflowEngineStarter RPC MQ, handled event: {}", event.toPrettyJsonString());
}
};
return new DefaultEventConsumer(applicationName + "Starter", workflowEngineStarterEventHandlerRepository, callback);
@@ -166,7 +173,7 @@ public class WorkflowEngineStarterRocketMQConfiguration {
replyTimeout = 10000,
nameServer = "${rocketmq.name-server}"
)
- public class WorkflowEngineClientRetryConsumer extends BaseListener implements RocketMQListener {
+ public static class WorkflowEngineClientRetryConsumer extends BaseListener implements RocketMQListener {
@Resource(name = "workflowEngineStarterEventConsumer")
private EventConsumer workflowEngineStarterEventConsumer;
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java
index f1c4956c9..59fa2012d 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java
@@ -7,6 +7,7 @@ import cn.axzo.workflow.common.model.response.mq.MessagePushDTO;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.CollectionUtils;
import java.util.List;
@@ -16,18 +17,11 @@ import java.util.Objects;
* @author wangli
* @since 2024/5/21 15:50
*/
-public class WorkflowEngineBroadcastEventListener implements EventHandler {
+public class WorkflowEngineBroadcastEventListener implements EventHandler, InitializingBean {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineBroadcastEventListener.class);
private final EventConsumer eventConsumer;
private final List workflowListeners;
- public void init() {
- eventConsumer.registerHandlers(InnerActivityEventListener.SUPPORTED_EVENT_CODES, this);
- eventConsumer.registerHandlers(InnerInstanceEventListener.SUPPORTED_EVENT_CODES, this);
- eventConsumer.registerHandlers(InnerNoticeEventListener.SUPPORTED_EVENT_CODES, this);
- eventConsumer.registerHandlers(InnerTaskEventListener.SUPPORTED_EVENT_CODES, this);
- }
-
public WorkflowEngineBroadcastEventListener(EventConsumer eventConsumer, List workflowListeners) {
this.eventConsumer = eventConsumer;
this.workflowListeners = workflowListeners;
@@ -50,4 +44,12 @@ public class WorkflowEngineBroadcastEventListener implements EventHandler {
}
}
}
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ eventConsumer.registerHandlers(InnerActivityEventListener.SUPPORTED_EVENT_CODES, this);
+ eventConsumer.registerHandlers(InnerInstanceEventListener.SUPPORTED_EVENT_CODES, this);
+ eventConsumer.registerHandlers(InnerNoticeEventListener.SUPPORTED_EVENT_CODES, this);
+ eventConsumer.registerHandlers(InnerTaskEventListener.SUPPORTED_EVENT_CODES, this);
+ }
}
From 3cfb68470ff102fba4667bc05c35920c01280b39 Mon Sep 17 00:00:00 2001
From: yangqicheng
Date: Fri, 31 May 2024 16:48:57 +0800
Subject: [PATCH 044/210] =?UTF-8?q?update=20-=20REQ-2516-=E6=8C=89?=
=?UTF-8?q?=E7=85=A7=E9=A1=BA=E5=BA=8F=E6=89=A7=E8=A1=8C=E5=AE=A2=E6=88=B7?=
=?UTF-8?q?=E7=AB=AF=E8=87=AA=E5=AE=9A=E4=B9=89listener?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../listener/MessageNotificationListener.java | 39 +++++++++++-
.../starter/listener/ProcessTaskListener.java | 27 ++++++++-
.../consumer/AbstractWorkflowListener.java | 33 +++++++++++
.../consumer/InnerActivityEventListener.java | 15 +----
.../consumer/InnerInstanceEventListener.java | 15 +----
.../consumer/InnerNoticeEventListener.java | 58 +++++++++++++++++-
.../consumer/InnerTaskEventListener.java | 59 ++++++++++++++++---
7 files changed, 209 insertions(+), 37 deletions(-)
create mode 100644 workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/AbstractWorkflowListener.java
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/MessageNotificationListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/MessageNotificationListener.java
index 23a92d09d..b0bcfaa81 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/MessageNotificationListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/MessageNotificationListener.java
@@ -4,22 +4,59 @@ import cn.axzo.workflow.common.model.response.mq.MessagePushDTO;
import org.springframework.core.Ordered;
/**
- * TODO
+ * 消息事件对应mq消息,按照ordered由小到大顺序执行
*
* @author wangli
* @since 2024/5/27 16:25
*/
public interface MessageNotificationListener extends Ordered {
+ /**
+ * 站内信推送
+ *
+ * @param messagePushDTO
+ */
void pushNotice(MessagePushDTO messagePushDTO);
+ /**
+ * 待办推送
+ *
+ * @param messagePushDTO
+ */
void pushPending(MessagePushDTO messagePushDTO);
+ /**
+ * 完成待办
+ *
+ * @param messagePushDTO
+ */
void completePending(MessagePushDTO messagePushDTO);
+ /**
+ * 审批失败,恢复待办
+ *
+ * @param messagePushDTO
+ */
void rollbackPending(MessagePushDTO messagePushDTO);
+ /**
+ * 抄送流程
+ *
+ * @param messagePushDTO
+ */
void carbonCopy(MessagePushDTO messagePushDTO);
+ /**
+ * 完成抄送
+ *
+ * @param messagePushDTO
+ */
void carbonCopyComplete(MessagePushDTO messagePushDTO);
+
+ /**
+ * 短信推送
+ *
+ * @param messagePushDTO
+ */
+ void pushSms(MessagePushDTO messagePushDTO);
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessTaskListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessTaskListener.java
index f3d956326..9e7e00dc8 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessTaskListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/listener/ProcessTaskListener.java
@@ -1,12 +1,35 @@
package cn.axzo.workflow.starter.listener;
+import cn.axzo.workflow.common.model.response.mq.ProcessTaskDTO;
import org.springframework.core.Ordered;
/**
- * TODO
- *
* @author wangli
* @since 2024/5/27 16:21
*/
public interface ProcessTaskListener extends Ordered {
+
+ /**
+ * 用户任务已指派审核人
+ */
+ void onAssigned(ProcessTaskDTO taskDTO);
+
+ /**
+ * 用户任务已创建,未指派审核人
+ */
+ void onCreated(ProcessTaskDTO taskDTO);
+
+ /**
+ * 用户任务已通过
+ *
+ * 仅审核通过一个用户任务时触发, 如果任务是驳回了, 则直接走实例撤回事件
+ */
+ void onCompleted(ProcessTaskDTO taskDTO);
+
+ /**
+ * 用户任务已删除
+ *
+ * 删除不代表驳回或拒绝,因为通过也会走该事件
+ */
+ void onDeleted(ProcessTaskDTO taskDTO);
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/AbstractWorkflowListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/AbstractWorkflowListener.java
new file mode 100644
index 000000000..37ca7510b
--- /dev/null
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/AbstractWorkflowListener.java
@@ -0,0 +1,33 @@
+package cn.axzo.workflow.starter.mq.broadcast.consumer;
+
+import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.Ordered;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class AbstractWorkflowListener implements WorkflowListener {
+
+ @Autowired
+ private ObjectProvider> instanceListenersProvider;
+
+ @Autowired
+ private ListenerExecutor listenerExecutor;
+
+
+ protected List getCustomListeners() {
+ List list = instanceListenersProvider.getIfAvailable();
+ if (list == null) {
+ list = new ArrayList();
+ }
+ return list.stream().sorted(Comparator.comparingInt(Ordered::getOrder)).collect(Collectors.toList());
+ }
+
+ protected ListenerExecutor getListenerExecutor() {
+ return listenerExecutor;
+ }
+}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java
index 7973fd8e7..62a539ca4 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerActivityEventListener.java
@@ -5,13 +5,10 @@ import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.common.enums.ProcessActivityEventEnum;
import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO;
import cn.axzo.workflow.starter.listener.ProcessActivityListener;
-import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.ObjectProvider;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@@ -23,7 +20,7 @@ import java.util.function.Consumer;
* @since 2024/5/21 15:51
*/
@Component
-public class InnerActivityEventListener implements WorkflowListener {
+public class InnerActivityEventListener extends AbstractWorkflowListener {
private final Logger log = LoggerFactory.getLogger(InnerActivityEventListener.class);
public final static List SUPPORTED_EVENT_CODES = ImmutableList.of(
ProcessActivityEventEnum.PROCESS_ACTIVITY_START.getEventCode(),
@@ -32,15 +29,9 @@ public class InnerActivityEventListener implements WorkflowListener {
ProcessActivityEventEnum.PROCESS_ACTIVITY_END.getEventCode()
);
- @Autowired
- private ObjectProvider> activityListenersProvider;
-
- @Autowired
- private ListenerExecutor listenerExecutor;
-
@Override
public void handEvent(Event event, EventConsumer.Context context) {
- List activityListeners = activityListenersProvider.getIfAvailable();
+ List activityListeners = getCustomListeners();
if (!CollectionUtils.isEmpty(activityListeners)) {
ProcessActivityDTO activityDTO = JSON.parseObject(event.getData().toString(), ProcessActivityDTO.class);
ProcessActivityEventEnum type = activityDTO.getType();
@@ -62,7 +53,7 @@ public class InnerActivityEventListener implements WorkflowListener {
default:
log.warn("unknown process activity event type: {}", type);
}
- listenerExecutor.execute(consumer, activityDTO);
+ getListenerExecutor().execute(consumer, activityDTO);
}
}
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerInstanceEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerInstanceEventListener.java
index 8cfd748eb..8150efead 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerInstanceEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerInstanceEventListener.java
@@ -5,13 +5,10 @@ import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.common.enums.ProcessInstanceEventEnum;
import cn.axzo.workflow.common.model.response.mq.ProcessInstanceDTO;
import cn.axzo.workflow.starter.listener.ProcessInstanceListener;
-import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.ObjectProvider;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@@ -30,7 +27,7 @@ import static cn.axzo.workflow.common.enums.ProcessInstanceEventEnum.PROCESS_INS
* @since 2024/5/21 15:51
*/
@Component
-public class InnerInstanceEventListener implements WorkflowListener {
+public class InnerInstanceEventListener extends AbstractWorkflowListener {
private final Logger log = LoggerFactory.getLogger(InnerInstanceEventListener.class);
public final static List SUPPORTED_EVENT_CODES = ImmutableList.of(
PROCESS_INSTANCE_CREATED.getEventCode(),
@@ -41,15 +38,9 @@ public class InnerInstanceEventListener implements WorkflowListener {
PROCESS_INSTANCE_COMPLETED.getEventCode()
);
- @Autowired
- private ObjectProvider> activityListenersProvider;
-
- @Autowired
- private ListenerExecutor listenerExecutor;
-
@Override
public void handEvent(Event event, EventConsumer.Context context) {
- List instanceListeners = activityListenersProvider.getIfAvailable();
+ List instanceListeners = getCustomListeners();
if (!CollectionUtils.isEmpty(instanceListeners)) {
ProcessInstanceDTO instanceDTO = JSON.parseObject(event.getData().toString(), ProcessInstanceDTO.class);
ProcessInstanceEventEnum type = instanceDTO.getType();
@@ -77,7 +68,7 @@ public class InnerInstanceEventListener implements WorkflowListener {
default:
log.warn("unknown process activity event type: {}", type);
}
- listenerExecutor.execute(consumer, instanceDTO);
+ getListenerExecutor().execute(consumer, instanceDTO);
}
}
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerNoticeEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerNoticeEventListener.java
index 76233d96a..634a81777 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerNoticeEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerNoticeEventListener.java
@@ -1,22 +1,26 @@
package cn.axzo.workflow.starter.mq.broadcast.consumer;
import cn.axzo.framework.rocketmq.Event;
+import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.common.enums.ProcessMessagePushEventEnum;
import cn.axzo.workflow.common.model.response.mq.MessagePushDTO;
import cn.axzo.workflow.starter.listener.MessageNotificationListener;
+import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
import java.util.List;
+import java.util.function.Consumer;
/**
- * TODO
- *
* @author wangli
* @since 2024/5/21 15:53
*/
-public class InnerNoticeEventListener {
+@Component
+public class InnerNoticeEventListener extends AbstractWorkflowListener {
private final static Logger log = LoggerFactory.getLogger(InnerNoticeEventListener.class);
public final static List SUPPORTED_EVENT_CODES = ImmutableList.of(
ProcessMessagePushEventEnum.PROCESS_PUSH_NOTICE.getEventCode(),
@@ -28,6 +32,49 @@ public class InnerNoticeEventListener {
ProcessMessagePushEventEnum.PROCESS_PUSH_SMS.getEventCode()
);
+ @Override
+ public void handEvent(Event event, EventConsumer.Context context) {
+ List instanceListeners = getCustomListeners();
+ if (!CollectionUtils.isEmpty(instanceListeners)) {
+ MessagePushDTO instanceDTO = JSON.parseObject(event.getData().toString(), MessagePushDTO.class);
+ ProcessMessagePushEventEnum type = instanceDTO.getType();
+ for (MessageNotificationListener noticeListener : instanceListeners) {
+ Consumer consumer = null;
+ switch (type) {
+ case PROCESS_PUSH_NOTICE:
+ consumer = noticeListener::pushNotice;
+ break;
+ case PROCESS_PUSH_PENDING:
+ consumer = noticeListener::pushPending;
+ break;
+ case PROCESS_PUSH_PENDING_COMPLETE:
+ consumer = noticeListener::completePending;
+ break;
+ case PROCESS_PUSH_PENDING_ROLLBACK:
+ consumer = noticeListener::rollbackPending;
+ break;
+ case PROCESS_CARBON_COPY:
+ consumer = noticeListener::carbonCopy;
+ break;
+ case PROCESS_CARBON_COPY_COMPLETE:
+ consumer = noticeListener::carbonCopyComplete;
+ break;
+ case PROCESS_PUSH_SMS:
+ consumer = noticeListener::pushSms;
+ break;
+ default:
+ log.warn("unknown message event type: {}", type);
+ }
+ getListenerExecutor().execute(consumer, instanceDTO);
+ }
+ }
+ }
+
+ @Override
+ public boolean support(Event event) {
+ return event != null && SUPPORTED_EVENT_CODES.contains(event.getEventCode());
+ }
+
public static void main(String[] args) {
MessageNotificationListener listener = new MessageNotificationListener() {
@Override
@@ -64,6 +111,11 @@ public class InnerNoticeEventListener {
}
+ @Override
+ public void pushSms(MessagePushDTO messagePushDTO) {
+
+ }
+
public int hashCode() {
return super.hashCode();
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerTaskEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerTaskEventListener.java
index c9d7aa6de..7b8d37665 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerTaskEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerTaskEventListener.java
@@ -1,25 +1,70 @@
package cn.axzo.workflow.starter.mq.broadcast.consumer;
import cn.axzo.framework.rocketmq.Event;
+import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.common.enums.ProcessTaskEventEnum;
+import cn.axzo.workflow.common.model.response.mq.ProcessTaskDTO;
+import cn.axzo.workflow.starter.listener.ProcessTaskListener;
+import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
import java.util.List;
+import java.util.function.Consumer;
+
+import static cn.axzo.workflow.common.enums.ProcessTaskEventEnum.PROCESS_TASK_ASSIGNED;
+import static cn.axzo.workflow.common.enums.ProcessTaskEventEnum.PROCESS_TASK_COMPLETED;
+import static cn.axzo.workflow.common.enums.ProcessTaskEventEnum.PROCESS_TASK_CREATED;
+import static cn.axzo.workflow.common.enums.ProcessTaskEventEnum.PROCESS_TASK_DELETED;
/**
- * TODO
- *
* @author wangli
* @since 2024/5/21 15:51
*/
-public class InnerTaskEventListener {
+@Component
+public class InnerTaskEventListener extends AbstractWorkflowListener {
private final Logger log = LoggerFactory.getLogger(InnerTaskEventListener.class);
public final static List SUPPORTED_EVENT_CODES = ImmutableList.of(
- ProcessTaskEventEnum.PROCESS_TASK_CREATED.getEventCode(),
- ProcessTaskEventEnum.PROCESS_TASK_ASSIGNED.getEventCode(),
- ProcessTaskEventEnum.PROCESS_TASK_COMPLETED.getEventCode(),
- ProcessTaskEventEnum.PROCESS_TASK_DELETED.getEventCode()
+ PROCESS_TASK_CREATED.getEventCode(),
+ PROCESS_TASK_ASSIGNED.getEventCode(),
+ PROCESS_TASK_COMPLETED.getEventCode(),
+ PROCESS_TASK_DELETED.getEventCode()
);
+
+ @Override
+ public void handEvent(Event event, EventConsumer.Context context) {
+ List taskListeners = getCustomListeners();
+ if (!CollectionUtils.isEmpty(taskListeners)) {
+ ProcessTaskDTO taskDTO = JSON.parseObject(event.getData().toString(), ProcessTaskDTO.class);
+ ProcessTaskEventEnum type = taskDTO.getType();
+ for (ProcessTaskListener taskListener : taskListeners) {
+ Consumer consumer = null;
+ switch (type) {
+ case PROCESS_TASK_CREATED:
+ consumer = taskListener::onCreated;
+ break;
+ case PROCESS_TASK_COMPLETED:
+ consumer = taskListener::onCompleted;
+ break;
+ case PROCESS_TASK_ASSIGNED:
+ consumer = taskListener::onAssigned;
+ break;
+ case PROCESS_TASK_DELETED:
+ consumer = taskListener::onDeleted;
+ break;
+ default:
+ log.warn("unknown task event type: {}", type);
+ }
+ getListenerExecutor().execute(consumer, taskDTO);
+ }
+ }
+ }
+
+ @Override
+ public boolean support(Event event) {
+ return event != null && SUPPORTED_EVENT_CODES.contains(event.getEventCode());
+ }
}
From db30025235dacec9008bacaed706c751cd7991dd Mon Sep 17 00:00:00 2001
From: wangli <274027703@qq.com>
Date: Fri, 31 May 2024 17:18:11 +0800
Subject: [PATCH 045/210] =?UTF-8?q?update(REQ-2516)=20-=20=E8=B0=83?=
=?UTF-8?q?=E6=95=B4=E4=B8=80=E4=BA=9B=E4=BB=A3=E7=A0=81=E9=80=BB=E8=BE=91?=
=?UTF-8?q?=EF=BC=8C=E5=B9=B6=E5=AE=8C=E5=96=84=E6=B3=A8=E9=87=8A?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
...orkflowEngineStarterAutoConfiguration.java | 39 +++++++------------
.../WorkflowEngineStarterProperties.java | 3 +-
.../WorkflowEngineBroadcastEventListener.java | 5 ++-
.../mq/execute/ListenerExecutorImpl.java | 18 +++++++--
...rkflowEngineStarterRetryEventListener.java | 2 +-
5 files changed, 35 insertions(+), 32 deletions(-)
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java
index 9a1ce11da..7c0b1f5b7 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java
@@ -1,10 +1,9 @@
package cn.axzo.workflow.starter;
-import cn.axzo.framework.domain.ServiceException;
import cn.axzo.workflow.starter.api.WorkflowCoreService;
+import cn.axzo.workflow.starter.common.enums.FailHandleTypeEnum;
import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
import cn.axzo.workflow.starter.mq.execute.ListenerExecutorImpl;
-import cn.axzo.workflow.starter.common.enums.FailHandleTypeEnum;
import cn.axzo.workflow.starter.mq.execute.interceptor.ExecuteInterceptor;
import cn.axzo.workflow.starter.mq.execute.interceptor.ExecutorInvoker;
import cn.axzo.workflow.starter.mq.execute.interceptor.FailBackInterceptor;
@@ -39,40 +38,28 @@ public class WorkflowEngineStarterAutoConfiguration {
@Bean
public ListenerExecutor initListenerExecutor(WorkflowEngineStarterProperties starterProperties,
List additionalInterceptors) {
- FailHandleTypeEnum failHandleType = starterProperties.getFailHandleType();
List interceptors = new ArrayList<>();
interceptors.add(new LogInterceptor());
- interceptors.add(getFailInterceptor(starterProperties, failHandleType));
+ interceptors.add(getFailInterceptor(starterProperties));
// if (!CollectionUtils.isEmpty(additionalInterceptors)) {
// additionalInterceptors.forEach(interceptor -> interceptor.setNext(interceptor));
// }
interceptors.add(new ExecutorInvoker());
- return new ListenerExecutorImpl(initInterceptorChain(interceptors));
+ return new ListenerExecutorImpl(interceptors);
}
- private ExecuteInterceptor getFailInterceptor(WorkflowEngineStarterProperties starterProperties,
- FailHandleTypeEnum failHandleType) {
- if (failHandleType != null) {
- switch (failHandleType) {
- case FAIL_BACK:
- return new FailBackInterceptor();
- case FAIL_FAST:
- return new FailFastInterceptor();
- case FAIL_OVER:
- return new FailOverInterceptor(starterProperties.getNumOfRetries(), starterProperties.getWaitTimeInMs(), starterProperties.getWaitIncreaseFactor());
- }
+ private ExecuteInterceptor getFailInterceptor(WorkflowEngineStarterProperties starterProperties) {
+ FailHandleTypeEnum failHandleType = starterProperties.getFailHandleType();
+ switch (failHandleType) {
+ case FAIL_BACK:
+ return new FailBackInterceptor();
+ case FAIL_FAST:
+ return new FailFastInterceptor();
+ case FAIL_OVER:
+ default:
+ return new FailOverInterceptor(starterProperties.getNumOfRetries(), starterProperties.getWaitTimeInMs(), starterProperties.getWaitIncreaseFactor());
}
- return new FailOverInterceptor(starterProperties.getNumOfRetries(), starterProperties.getWaitTimeInMs(), starterProperties.getWaitIncreaseFactor());
}
- public ExecuteInterceptor initInterceptorChain(List chain) {
- if (chain == null || chain.isEmpty()) {
- throw new ServiceException("invalid command interceptor chain configuration: " + chain);
- }
- for (int i = 0; i < chain.size() - 1; i++) {
- chain.get(i).setNext(chain.get(i + 1));
- }
- return chain.get(0);
- }
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
index 7c58c2228..bce4016b5 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java
@@ -4,6 +4,7 @@ import cn.axzo.workflow.starter.common.enums.FailHandleTypeEnum;
import cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum;
import org.springframework.boot.context.properties.ConfigurationProperties;
+import static cn.axzo.workflow.starter.common.enums.FailHandleTypeEnum.FAIL_OVER;
import static cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum.ASYNC;
/**
@@ -41,7 +42,7 @@ public class WorkflowEngineStarterProperties {
* 2、FAIL_FAST, 快速失败,出错直接抛出异常,listener不再往下执行
* 3、FAIL_BACK, 失败自动恢复,在后台记录失败的消息,并按照一定的策略后期再进行重试,目前暂不支持
*/
- private FailHandleTypeEnum failHandleType = FailHandleTypeEnum.FAIL_OVER;
+ private FailHandleTypeEnum failHandleType = FAIL_OVER;
/**
* 自动重试次数
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java
index 59fa2012d..3ecfdf008 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java
@@ -14,6 +14,8 @@ import java.util.List;
import java.util.Objects;
/**
+ * 流程引擎服务广播的事件监听器
+ *
* @author wangli
* @since 2024/5/21 15:50
*/
@@ -29,6 +31,7 @@ public class WorkflowEngineBroadcastEventListener implements EventHandler, Initi
@Override
public void onEvent(Event event, EventConsumer.Context context) {
+ // TODO 针对事件,区别不同的模型对象处理
MessagePushDTO messagePushDTO = JSON.parseObject(event.getData().toString(), MessagePushDTO.class);
if (Objects.isNull(messagePushDTO)) {
log.warn("AllMessagePushEventHandler MessagePushDTO is null");
@@ -46,7 +49,7 @@ public class WorkflowEngineBroadcastEventListener implements EventHandler, Initi
}
@Override
- public void afterPropertiesSet() throws Exception {
+ public void afterPropertiesSet() {
eventConsumer.registerHandlers(InnerActivityEventListener.SUPPORTED_EVENT_CODES, this);
eventConsumer.registerHandlers(InnerInstanceEventListener.SUPPORTED_EVENT_CODES, this);
eventConsumer.registerHandlers(InnerNoticeEventListener.SUPPORTED_EVENT_CODES, this);
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/ListenerExecutorImpl.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/ListenerExecutorImpl.java
index ef39ac4eb..ec177f726 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/ListenerExecutorImpl.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/execute/ListenerExecutorImpl.java
@@ -1,19 +1,31 @@
package cn.axzo.workflow.starter.mq.execute;
+import cn.axzo.framework.domain.ServiceException;
import cn.axzo.workflow.starter.mq.execute.interceptor.ExecuteInterceptor;
+import java.util.List;
import java.util.function.Consumer;
-public class ListenerExecutorImpl implements ListenerExecutor {
+public final class ListenerExecutorImpl implements ListenerExecutor {
private final ExecuteInterceptor firstExecuteInterceptor;
- public ListenerExecutorImpl(ExecuteInterceptor firstExecuteInterceptor) {
- this.firstExecuteInterceptor = firstExecuteInterceptor;
+ public ListenerExecutorImpl(List chain) {
+ this.firstExecuteInterceptor = initInterceptorChain(chain);
}
@Override
public void execute(Consumer command, T t) {
firstExecuteInterceptor.execute(this, command, t);
}
+
+ private ExecuteInterceptor initInterceptorChain(List chain) {
+ if (chain == null || chain.isEmpty()) {
+ throw new ServiceException("invalid command interceptor chain configuration: " + chain);
+ }
+ for (int i = 0; i < chain.size() - 1; i++) {
+ chain.get(i).setNext(chain.get(i + 1));
+ }
+ return chain.get(0);
+ }
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java
index a3643747d..1160006c4 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java
@@ -22,7 +22,7 @@ import java.util.Map;
import java.util.Objects;
/**
- * RPC 动作事件的 MQ 消费者
+ * RPC 动作事件的 MQ 消费者(集成业务系统中的自产自销)
*
* @author wangli
* @since 2024/5/21 16:28
From 45094aef326e11b4f334ea2e874e2ebd36ad29ea Mon Sep 17 00:00:00 2001
From: wangli <274027703@qq.com>
Date: Fri, 31 May 2024 17:19:25 +0800
Subject: [PATCH 046/210] =?UTF-8?q?update(REQ-2516)=20-=20=E6=96=B0?=
=?UTF-8?q?=E5=A2=9E=E8=87=AA=E5=AE=9A=E4=B9=89=E7=9A=84=20Decoder?=
=?UTF-8?q?=EF=BC=8C=20=E7=94=A8=E6=9D=A5=E5=A4=84=E7=90=86=E6=89=80?=
=?UTF-8?q?=E6=9C=89=20API=20=E5=86=85=E9=83=A8=E6=8A=9B=E5=87=BA=E7=9A=84?=
=?UTF-8?q?=E5=90=84=E7=B1=BB=E5=BC=82=E5=B8=B8?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../server/controller/web/TestController.java | 9 ++-
.../starter/api/WorkflowCoreService.java | 3 +-
.../ext/WorkflowEngineStarterDecoder.java | 63 +++++++++++++++++++
...rkflowEngineStarterFeignConfiguration.java | 10 +++
4 files changed, 81 insertions(+), 4 deletions(-)
create mode 100644 workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterDecoder.java
diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/TestController.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/TestController.java
index 96dc07b76..82e0d6a7b 100644
--- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/TestController.java
+++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/TestController.java
@@ -144,7 +144,12 @@ public class TestController {
workflowCoreService.async();
}
}
- CommonResponse processInstance = workflowCoreService.createProcessInstance(dto);
- return processInstance;
+ String processInstance = workflowCoreService.createProcessInstance(dto);
+ return CommonResponse.success(processInstance);
+ }
+
+ @GetMapping("/test6")
+ public CommonResponse test6() {
+ return CommonResponse.success("1");
}
}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/api/WorkflowCoreService.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/api/WorkflowCoreService.java
index d3a2420b4..626aa989b 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/api/WorkflowCoreService.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/api/WorkflowCoreService.java
@@ -3,7 +3,6 @@ package cn.axzo.workflow.starter.api;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCreateDTO;
import cn.axzo.workflow.starter.common.util.ThreadUtil;
import cn.axzo.workflow.starter.feign.ext.WorkflowEngineStarterFeignConfiguration;
-import cn.azxo.framework.common.model.CommonResponse;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
@@ -24,7 +23,7 @@ import static cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum.SYNC;
public interface WorkflowCoreService {
@PostMapping("/api/process/instance/create")
- CommonResponse createProcessInstance(@Validated @RequestBody BpmnProcessInstanceCreateDTO dto);
+ String createProcessInstance(@Validated @RequestBody BpmnProcessInstanceCreateDTO dto);
default WorkflowCoreService sync() {
ThreadUtil.set(SYNC);
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterDecoder.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterDecoder.java
new file mode 100644
index 000000000..9ba061501
--- /dev/null
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterDecoder.java
@@ -0,0 +1,63 @@
+package cn.axzo.workflow.starter.feign.ext;
+
+import cn.azxo.framework.common.model.CommonResponse;
+import feign.Response;
+import feign.Util;
+import feign.codec.Decoder;
+
+import java.io.IOException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Workflow Engine Starter Complex Invoke Client Decoder
+ *
+ * @author wangli
+ * @since 2024/5/31 14:55
+ */
+public final class WorkflowEngineStarterDecoder implements Decoder {
+ final Decoder delegate;
+
+ public WorkflowEngineStarterDecoder(Decoder delegate) {
+ Objects.requireNonNull(delegate, "Decoder must not be null. ");
+ this.delegate = delegate;
+ }
+
+ @Override
+ public Object decode(Response response, Type type) throws IOException {
+ if (!isOptional(type)) {
+ return convert(response, type);
+ }
+ if (response.status() == 404 || response.status() == 204) {
+ return Optional.empty();
+ }
+ Type enclosedType = Util.resolveLastTypeParameter(type, Optional.class);
+ return Optional.ofNullable(convert(response, enclosedType));
+ }
+
+ static boolean isOptional(Type type) {
+ if (!(type instanceof ParameterizedType)) {
+ return false;
+ }
+ ParameterizedType parameterizedType = (ParameterizedType) type;
+ return parameterizedType.getRawType().equals(Optional.class);
+ }
+
+ /**
+ * 这里做返回数据的解析,并处理引擎返回的一些正常的业务异常
+ *
+ * @param response
+ * @param type
+ * @param
+ * @return
+ */
+ T convert(Response response, Type type) throws IOException {
+ Object decode = delegate.decode(response, type);
+ if (decode instanceof CommonResponse) {
+
+ }
+ return null;
+ }
+}
diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterFeignConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterFeignConfiguration.java
index 2935f4e34..63792779b 100644
--- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterFeignConfiguration.java
+++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterFeignConfiguration.java
@@ -5,7 +5,12 @@ import cn.axzo.workflow.starter.mq.retry.producer.RpcInvokeEventProducer;
import feign.Client;
import feign.RequestInterceptor;
import feign.Target;
+import feign.codec.Decoder;
+import org.springframework.beans.factory.ObjectFactory;
+import org.springframework.boot.autoconfigure.http.HttpMessageConverters;
import org.springframework.cloud.openfeign.FeignBuilderCustomizer;
+import org.springframework.cloud.openfeign.support.ResponseEntityDecoder;
+import org.springframework.cloud.openfeign.support.SpringDecoder;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
@@ -30,6 +35,11 @@ public class WorkflowEngineStarterFeignConfiguration {
return new ComplexInvokeClient(starterProperties, rpcInvokeEventProducer, feignClient);
}
+ @Bean
+ public Decoder workflowEngineStarterDecoder(ObjectFactory