Merge branch 'release-20241113' into 'master'

Release 20241113

See merge request universal/infrastructure/backend/workflow-engine!11
This commit is contained in:
王粒 2024-11-13 02:49:33 +00:00
commit 3a73421c9b
19 changed files with 447 additions and 58 deletions

View File

@ -0,0 +1,45 @@
package cn.axzo.workflow.common.enums;
import cn.axzo.framework.rocketmq.Event;
/**
* 流程实例相关的 MQ 事件枚举定义
*
* @author wangli
* @since 2023/9/25 11:47
*/
public enum ElasticSearchEventEnum {
ELASTIC_SEARCH_SYNC("elastic-search", "elastic-search-sync", "同步实例数据到ES"),
;
private final String module;
private final String tag;
private final String desc;
private Event.EventCode eventCode;
ElasticSearchEventEnum(String module, String tag, String desc) {
this.eventCode = Event.EventCode.builder()
.module(module)
.name(tag)
.build();
this.module = module;
this.tag = tag;
this.desc = desc;
}
public String getModule() {
return module;
}
public String getTag() {
return tag;
}
public String getDesc() {
return desc;
}
public Event.EventCode getEventCode() {
return eventCode;
}
}

View File

@ -5,9 +5,9 @@ import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory;
import cn.axzo.workflow.core.engine.cmd.CustomCommandContextFactory;
import cn.axzo.workflow.core.engine.id.BasedNacosSnowflakeIdGenerator;
import cn.axzo.workflow.core.engine.interceptor.CustomRetryInterceptor;
import cn.axzo.workflow.core.engine.job.AsyncAbortProcessInstanceJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivityCallbackJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivityLeaveJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncAbortProcessInstanceJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivitySetAssigneeJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivityTriggerJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
@ -40,7 +40,6 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.lang.Nullable;
import java.time.Duration;
@ -70,7 +69,8 @@ public class FlowableConfiguration {
List<JobProcessor> jobProcessors,
NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties,
SupportRefreshProperties refreshProperties) {
SupportRefreshProperties refreshProperties,
List<ProcessExtConfigurer> configurers) {
return configuration -> {
configuration.setEnableHistoricTaskLogging(true);
configuration.setHistoryLevel(HistoryLevel.AUDIT);
@ -100,6 +100,7 @@ public class FlowableConfiguration {
configuration.addCustomJobHandler(new AsyncCountersignUserTaskJobHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncActivityLeaveJobHandler(bpmnProcessActivityService));
configuration.addCustomJobHandler(new AsyncActivityCallbackJobHandler());
configurers.forEach(i-> configuration.addCustomJobHandler(i.getJobHandler()));
// 异步任务异常重试时间间隔
configuration.setDefaultFailedJobWaitTime(30);
configuration.setAsyncFailedJobWaitTime(30);

View File

@ -0,0 +1,13 @@
package cn.axzo.workflow.core.conf;
import org.flowable.job.service.JobHandler;
/**
* 扩展加入自定义的任务处理器
*
* @author wangli
* @since 2024-11-04 10:58
*/
public interface ProcessExtConfigurer {
JobHandler getJobHandler();
}

View File

@ -36,7 +36,7 @@ public abstract class AbstractExecuteWithLockJobHandler extends AbstractJobHandl
executeInternal(job, configuration, variableScope, commandContext);
} else {
log.warn("get lock failed,processInstanceId:{},jobId:{}", processInstanceId, jobId);
throw new WorkflowEngineException(ASYNC_JOB_EXECUTION_ERROR);
throw new WorkflowEngineException(ASYNC_JOB_EXECUTION_ERROR, processInstanceId);
}
} finally {
releaseLock(processInstanceId, jobId);
@ -70,5 +70,5 @@ public abstract class AbstractExecuteWithLockJobHandler extends AbstractJobHandl
extAxPropertyService.delete(processInstanceId, jobId);
}
abstract void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext);
public abstract void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext);
}

View File

@ -27,7 +27,7 @@ public class AsyncAbortProcessInstanceJobHandler extends AbstractExecuteWithLock
}
@Override
void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
public void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
log.info("AsyncAbortProcessInstanceHandler executing...,jobInfo:{}", JSONUtil.toJsonStr(job));
log(job);
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);

View File

@ -0,0 +1,27 @@
package cn.axzo.workflow.es.conf;
import cn.axzo.workflow.core.conf.ProcessExtConfigurer;
import cn.axzo.workflow.es.flowable.ext.AsyncElasticSearchSyncJobHandler;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import org.flowable.job.service.JobHandler;
import org.springframework.stereotype.Component;
/**
* ES 相关对 Flowable 扩展的配置
*
* @author wangli
* @since 2024-11-04 10:39
*/
@Component
public class FlowableElasticSearchConfiguration implements ProcessExtConfigurer {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
public FlowableElasticSearchConfiguration(AggregateProcessInstanceService aggregateProcessInstanceService) {
this.aggregateProcessInstanceService = aggregateProcessInstanceService;
}
@Override
public JobHandler getJobHandler() {
return new AsyncElasticSearchSyncJobHandler(aggregateProcessInstanceService);
}
}

View File

@ -0,0 +1,42 @@
package cn.axzo.workflow.es.flowable.ext;
import cn.axzo.workflow.core.engine.job.AbstractExecuteWithLockJobHandler;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.JobHandler;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.variable.api.delegate.VariableScope;
/**
* 同步 ElasticSearch 数据
*
* @author wangli
* @since 2024-11-04 10:09
*/
@Slf4j
public class AsyncElasticSearchSyncJobHandler extends AbstractExecuteWithLockJobHandler implements JobHandler {
public static final String TYPE = "async-es-sync";
private final AggregateProcessInstanceService aggregateProcessInstanceService;
public AsyncElasticSearchSyncJobHandler(AggregateProcessInstanceService aggregateProcessInstanceService) {
super();
this.aggregateProcessInstanceService = aggregateProcessInstanceService;
}
@Override
public String getType() {
return TYPE;
}
@Override
public void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
log.info("AsyncElasticSearchSyncJobHandler exec start...");
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
processEngineConfiguration.getCommandExecutor().execute(new CustomElasticSearchCmd(job.getProcessInstanceId(), aggregateProcessInstanceService));
}
}

View File

@ -0,0 +1,80 @@
package cn.axzo.workflow.es.flowable.ext;
import cn.axzo.workflow.core.engine.cmd.AbstractCommand;
import com.alibaba.fastjson.JSON;
import org.flowable.common.engine.impl.interceptor.CommandConfig;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
import org.flowable.engine.HistoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.JobService;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.impl.asyncexecutor.ResetExpiredJobsCmd;
import org.flowable.job.service.impl.persistence.entity.ExternalWorkerJobEntity;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* 自定义同步处理器
*
* @author wangli
* @since 2024-11-04 10:00
*/
public class CustomElasticSearchAsyncCmd extends AbstractCommand<Void> implements Serializable {
private final String processInstanceId;
public CustomElasticSearchAsyncCmd(String processInstanceId) {
this.processInstanceId = processInstanceId;
}
@Override
public String paramToJsonString() {
Map<String, Object> params = new HashMap<>();
params.put("processInstanceId", processInstanceId);
return JSON.toJSONString(params);
}
@Override
public Void executeInternal(CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance instance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
String jobId = startAsync(processEngineConfiguration, instance);
// // 重置任务因为上面的 cmd 和这个 cmd lock 对象不一致
JobServiceConfiguration jobServiceConfiguration = processEngineConfiguration.getJobServiceConfiguration();
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
CommandConfig commandConfig = new CommandConfig().transactionRequired();
commandExecutor.execute(commandConfig, new ResetExpiredJobsCmd(Collections.singletonList(jobId), jobServiceConfiguration.getJobEntityManager(), jobServiceConfiguration));
return null;
}
private String startAsync(ProcessEngineConfigurationImpl processEngineConfiguration, HistoricProcessInstance instance) {
JobService jobService = processEngineConfiguration.getJobServiceConfiguration().getJobService();
JobEntity job = jobService.createJob();
// 这里的 executionId 可为 null
job.setExecutionId(instance.getId());
job.setProcessInstanceId(instance.getId());
job.setProcessDefinitionId(instance.getProcessDefinitionId());
job.setElementId(AsyncElasticSearchSyncJobHandler.TYPE);
job.setElementName(instance.getName());
job.setJobHandlerType(AsyncElasticSearchSyncJobHandler.TYPE);
job.setTenantId(instance.getTenantId());
// 创建异步任务并调度
jobService.createAsyncJob(job, true);
jobService.scheduleAsyncJob(job);
return job.getId();
}
}

View File

@ -0,0 +1,58 @@
package cn.axzo.workflow.es.flowable.ext;
import cn.axzo.workflow.core.engine.cmd.AbstractCommand;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.HistoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 自定义同步处理器
*
* @author wangli
* @since 2024-11-04 10:00
*/
@Slf4j
public class CustomElasticSearchCmd extends AbstractCommand<Void> implements Serializable {
private final String processInstanceId;
private final AggregateProcessInstanceService aggregateProcessInstanceService;
public CustomElasticSearchCmd(String processInstanceId,
AggregateProcessInstanceService aggregateProcessInstanceService) {
this.processInstanceId = processInstanceId;
this.aggregateProcessInstanceService = aggregateProcessInstanceService;
}
@Override
public String paramToJsonString() {
Map<String, Object> params = new HashMap<>();
params.put("processInstanceId", processInstanceId);
return JSON.toJSONString(params);
}
@Override
public Void executeInternal(CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
// 删除指定实例的父子文档
log.info("delete document processInstanceId: {}", processInstanceId);
aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId);
log.info("reInsert document processInstanceId: {}", processInstanceId);
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size());
return null;
}
}

View File

@ -91,7 +91,7 @@ public class RequestHeaderContextInterceptor implements HandlerInterceptor {
log.info("repeatApi key: {}", cacheRepeatKey);
//success为true表示key不存在,执行成功,false表示key存在,执行失败
Boolean success = RedisUtils.trySetObject(cacheRepeatKey, "", Duration.ofSeconds(5));
Boolean success = RedisUtils.trySetObject(cacheRepeatKey, "", Duration.ofSeconds(60));
if (success) {
KEY_CACHE.set(cacheRepeatKey);
insert(extAxProperty, applicationName, clientVersion, manageableStatus);

View File

@ -1,21 +1,23 @@
package cn.axzo.workflow.server.controller.listener.process;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.workflow.common.enums.ElasticSearchEventEnum;
import cn.axzo.workflow.core.common.context.ProcessOperationContext;
import cn.axzo.workflow.core.listener.AbstractBpmnEventListener;
import cn.axzo.workflow.core.listener.BpmnProcessEventListener;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.axzo.workflow.server.controller.listener.tx.OnTxCommittedSyncToEsListener;
import cn.hutool.core.lang.UUID;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
import org.flowable.common.engine.impl.cfg.TransactionState;
import org.flowable.common.engine.impl.context.Context;
import org.flowable.engine.delegate.event.FlowableCancelledEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 流程实例结束后同步最新数据至 ES
*
@ -25,9 +27,11 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@Scope("prototype")
@AllArgsConstructor
public class SyncToEsProcessEventListener extends AbstractBpmnEventListener<ProcessOperationContext> implements BpmnProcessEventListener, Ordered {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
@Resource
private EventProducer<?> eventProducer;
@Value("${sendMq:true}")
private Boolean sendMQ;
@Override
public int getOrder() {
@ -76,9 +80,21 @@ public class SyncToEsProcessEventListener extends AbstractBpmnEventListener<Proc
syncToEs(event.getProcessInstanceId());
}
private void syncToEs(String processInstanceId) {
String uuid = UUID.fastUUID().toString();
Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
new OnTxCommittedSyncToEsListener(aggregateProcessInstanceService, processInstanceId, null, uuid));
public void syncToEs(String processInstanceId) {
sendMessageQueue(processInstanceId, ElasticSearchEventEnum.ELASTIC_SEARCH_SYNC);
}
private void sendMessageQueue(String processInstanceId, ElasticSearchEventEnum eventEnum) {
if (!sendMQ) {
return;
}
eventProducer.send(Event.builder()
.shardingKey(processInstanceId)
.eventCode(eventEnum.getEventCode())
.targetId(processInstanceId)
.targetType(processInstanceId)
.data(processInstanceId)
.build());
}
}

View File

@ -1,20 +1,21 @@
package cn.axzo.workflow.server.controller.listener.task;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.workflow.common.enums.ElasticSearchEventEnum;
import cn.axzo.workflow.core.common.context.TaskOperationContext;
import cn.axzo.workflow.core.listener.AbstractBpmnEventListener;
import cn.axzo.workflow.core.listener.BpmnTaskEventListener;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.axzo.workflow.server.controller.listener.tx.OnTxCommittedSyncToEsListener;
import cn.hutool.core.lang.UUID;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.cfg.TransactionState;
import org.flowable.common.engine.impl.context.Context;
import org.flowable.task.service.delegate.DelegateTask;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 同步任务相关数据至 ES
*
@ -24,9 +25,11 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@Scope("prototype")
@AllArgsConstructor
public class SyncToEsTaskEvent_104_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener, Ordered {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
@Resource
private EventProducer<?> eventProducer;
@Value("${sendMq:true}")
private Boolean sendMQ;
@Override
public int getOrder() {
@ -40,8 +43,20 @@ public class SyncToEsTaskEvent_104_Listener extends AbstractBpmnEventListener<Ta
*/
@Override
public void onAssigned(DelegateTask delegateTask) {
String uuid = UUID.fastUUID().toString();
Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
new OnTxCommittedSyncToEsListener(aggregateProcessInstanceService, delegateTask.getProcessInstanceId(), delegateTask.getId(), uuid));
sendMessageQueue(delegateTask.getProcessInstanceId(), ElasticSearchEventEnum.ELASTIC_SEARCH_SYNC);
}
private void sendMessageQueue(String processInstanceId, ElasticSearchEventEnum eventEnum) {
if (!sendMQ) {
return;
}
eventProducer.send(Event.builder()
.shardingKey(processInstanceId)
.eventCode(eventEnum.getEventCode())
.targetId(processInstanceId)
.targetType(processInstanceId)
.data(processInstanceId)
.build());
}
}

View File

@ -1,17 +1,12 @@
package cn.axzo.workflow.server.controller.listener.tx;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.axzo.workflow.es.flowable.ext.CustomElasticSearchAsyncCmd;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.cfg.TransactionListener;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.HistoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import java.util.List;
import org.springframework.scheduling.annotation.Async;
/**
* 引擎内的动作事务提交后执行同步 ES
@ -22,7 +17,6 @@ import java.util.List;
@Slf4j
@AllArgsConstructor
public class OnTxCommittedSyncToEsListener implements TransactionListener {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
private final String processInstanceId;
private final String taskId;
private final String uuid;
@ -31,17 +25,8 @@ public class OnTxCommittedSyncToEsListener implements TransactionListener {
public void execute(CommandContext commandContext) {
log.info("SyncEsTaskEntityEventHandle onInitialized uuid:{}, processInstanceId:{}, taskId: {}", uuid, processInstanceId, taskId);
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
// 删除指定实例的父子文档
log.info("delete document processInstanceId: {}", processInstanceId);
aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId);
log.info("reInsert document processInstanceId: {}", processInstanceId);
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size());
CommandContextUtil.getProcessEngineConfiguration(commandContext).getCommandExecutor()
.execute(new CustomElasticSearchAsyncCmd(processInstanceId));
}
}

View File

@ -16,6 +16,7 @@ import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceLog
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceMyPageReqVO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceQueryDTO;
import cn.axzo.workflow.common.model.request.bpmn.process.HistoricProcessInstanceSearchDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import cn.axzo.workflow.common.model.response.BpmPageResult;
import cn.axzo.workflow.common.model.response.bpmn.BatchOperationResultVO;
import cn.axzo.workflow.common.model.response.bpmn.process.BpmnProcessInstanceAdminPageItemVO;
@ -31,11 +32,13 @@ import cn.axzo.workflow.server.common.annotation.RepeatSubmit;
import cn.axzo.workflow.server.common.util.RpcExternalUtil;
import cn.axzo.workflow.server.controller.web.BasicPopulateAvatarController;
import cn.azxo.framework.common.model.CommonResponse;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.swagger.v3.oas.annotations.Operation;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.flowable.engine.history.HistoricProcessInstance;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
@ -53,9 +56,11 @@ import javax.annotation.Nullable;
import javax.annotation.Resource;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import static cn.azxo.framework.common.model.CommonResponse.success;
@ -346,9 +351,19 @@ public class BpmnProcessInstanceController extends BasicPopulateAvatarController
log.info("获取指定流程实例的日志 getProcessInstanceLog===>>>参数:{}", JSONUtil.toJsonStr(dto));
BpmnProcessInstanceLogVO log = bpmnProcessInstanceService.getProcessInstanceLog(dto);
parseSignatureUrl(log);
resetPersonInfo(log);
return success(log);
}
private void resetPersonInfo(BpmnProcessInstanceLogVO log) {
List<BpmnTaskDelegateAssigner> users = log.getTaskDetails().stream().map(BpmnTaskInstanceLogVO::getAssigneeSnapshot).filter(Objects::nonNull).filter(e -> NumberUtils.isNumber(e.getPersonId())).collect(Collectors.toList());
users.add(log.getInitiator());
populateUsersAvatar(users);
// users.stream().collect(Collectors.toMap(BpmnTaskDelegateAssigner::getPersonId, Function.identity(), (s,t)->s))
// .forEach();
}
private void parseSignatureUrl(BpmnProcessInstanceLogVO log) {
List<String> signUrls = log.getTaskDetails().stream()
.filter(i -> StringUtils.hasText(i.getTaskId()))

View File

@ -0,0 +1,43 @@
package cn.axzo.workflow.server.mq.inside;
import cn.axzo.framework.rocketmq.BaseListener;
import cn.axzo.framework.rocketmq.EventConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 监听引擎自己的广播事件
*
* @author wangli
* @since 2024-11-06 15:05
*/
@Configuration(proxyBeanMethods = false)
public class SelfBoradcastRocketConfiguration {
public static final String DEFAULT_EVENT = "topic_workflow_engine_";
@Component
@ConditionalOnProperty(name = "rocketmq.name-server")
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_workflow_engine_${spring.profiles.active}_consumer",
consumeMode = ConsumeMode.ORDERLY,
maxReconsumeTimes = 3,
nameServer = "${rocketmq.name-server}"
)
public static class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener<MessageExt> {
@Resource
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
super.onEvent(message, eventConsumer);
}
}
}

View File

@ -0,0 +1,52 @@
package cn.axzo.workflow.server.mq.inside.consumer;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.framework.rocketmq.EventHandler;
import cn.axzo.workflow.common.enums.ElasticSearchEventEnum;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.HistoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* ES 数据同步处理
*
* @author wangli
* @since 2024-11-06 15:11
*/
@Slf4j
@Component
@AllArgsConstructor
public class ElasticSearchSyncListener implements EventHandler, InitializingBean {
private final EventConsumer eventConsumer;
private final HistoryService historyService;
private final AggregateProcessInstanceService aggregateProcessInstanceService;
@Override
public void onEvent(Event event, EventConsumer.Context context) {
String processInstanceId = event.normalizedData(String.class);
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
// 删除指定实例的父子文档
log.info("delete document processInstanceId: {}", processInstanceId);
aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId);
log.info("reInsert document processInstanceId: {}", processInstanceId);
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size());
}
@Override
public void afterPropertiesSet() {
eventConsumer.registerHandler(ElasticSearchEventEnum.ELASTIC_SEARCH_SYNC.getEventCode(), this);
}
}

View File

@ -1,10 +1,10 @@
package cn.axzo.workflow.server.outside.mq;
package cn.axzo.workflow.server.mq.outside;
import cn.axzo.framework.rocketmq.BaseListener;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
import cn.axzo.workflow.server.outside.mq.producer.DingtalkSendProducer;
import cn.axzo.workflow.server.mq.outside.producer.DingtalkSendProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
@ -61,11 +61,11 @@ public class DingtalkRocketConfiguration {
@Component
@ConditionalOnProperty(name = "rocketmq.name-server")
@RocketMQMessageListener(topic = "topic_third_party_sync_event_${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_riven_consumer",
consumerGroup = "GID_${spring.application.name}_riven_${spring.profiles.active}_consumer",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorType = SelectorType.TAG,
selectorExpression = "riven-dingtalk-receive",
maxReconsumeTimes = 0,
maxReconsumeTimes = 3,
nameServer = "${rocketmq.name-server}"
)
public static class ReplyMessageRocketConsumer extends BaseListener implements RocketMQListener<MessageExt> {

View File

@ -1,4 +1,4 @@
package cn.axzo.workflow.server.outside.mq.consumer;
package cn.axzo.workflow.server.mq.outside.consumer;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
@ -7,15 +7,12 @@ import cn.axzo.framework.rocketmq.EventHandler;
//import cn.axzo.riven.client.model.DingtalkReceiveMqModel;
//import cn.axzo.riven.client.model.DingtalkSendMqModel;
//import cn.axzo.riven.client.model.SampleText;
import cn.axzo.workflow.server.outside.mq.producer.DingtalkSendProducer;
import com.alibaba.fastjson.JSON;
import cn.axzo.workflow.server.mq.outside.producer.DingtalkSendProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
/**
* 监听钉钉群消息的事件

View File

@ -1,4 +1,4 @@
package cn.axzo.workflow.server.outside.mq.producer;
package cn.axzo.workflow.server.mq.outside.producer;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.RocketMQEventProducer;