feat(REQ-2752) - ES 数据同步增加流程实例的发起人信息

This commit is contained in:
wangli 2024-10-14 18:17:07 +08:00
parent df4e13f305
commit ae83141701
5 changed files with 67 additions and 9 deletions

View File

@ -70,6 +70,11 @@ public class ProcessInstanceDocumentVO {
*/
private String businessStatus;
/**
* 发起人姓名
*/
private String initiatorName;
/**
* 实例对应的流程引擎服务端迭代版本
*/

View File

@ -91,6 +91,12 @@ public class ProcessInstanceDocument {
@IndexField(fieldType = FieldType.KEYWORD)
private String businessStatus;
/**
* 发起人姓名
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String initiatorName;
/**
* 实例对应的流程引擎服务端迭代版本
*/

View File

@ -2,6 +2,7 @@ package cn.axzo.workflow.es.service.aggregation;
import cn.axzo.basics.common.BeanMapper;
import cn.axzo.workflow.common.enums.WorkspaceType;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import cn.axzo.workflow.common.model.request.es.InstanceSearchReqDTO;
import cn.axzo.workflow.common.model.response.BpmPageResult;
import cn.axzo.workflow.common.model.response.es.ProcessInstanceDocumentVO;
@ -30,8 +31,10 @@ import java.util.Objects;
import static cn.axzo.workflow.common.constant.BpmnConstants.ES_FIXED_ROUTING;
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_121;
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142;
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_INITIATOR;
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_AGENT;
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_WORKSPACE_TYPE;
import static cn.axzo.workflow.common.constant.BpmnConstants.OLD_INTERNAL_INITIATOR;
import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VERSION;
import static cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum.PROCESSING;
@ -70,9 +73,13 @@ public class AggregateProcessInstanceService {
Map<String, Object> variables = hpi.getProcessVariables();
if (CollectionUtils.isEmpty(variables)) {
variables.putAll(bpmnProcessInstanceForEsService.queryInstanceVariables(hpi.getId(),
Lists.newArrayList(WORKFLOW_ENGINE_VERSION, INTERNAL_PROCESS_WORKSPACE_TYPE, INTERNAL_PROCESS_AGENT)));
Lists.newArrayList(WORKFLOW_ENGINE_VERSION, INTERNAL_PROCESS_WORKSPACE_TYPE, INTERNAL_PROCESS_AGENT, INTERNAL_INITIATOR, OLD_INTERNAL_INITIATOR)));
}
BpmnTaskDelegateAssigner initiator = BpmnTaskDelegateAssigner.toObjectCompatible(variables.getOrDefault(INTERNAL_INITIATOR, variables.getOrDefault(OLD_INTERNAL_INITIATOR, null)));
if (Objects.nonNull(initiator)) {
processInstanceDocument.setInitiatorName(initiator.getAssignerName());
}
processInstanceDocument.setWorkflowEngineVersion(String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121)));
processInstanceDocument.setProcessCategoryType(WorkspaceType.getType((Integer) variables.getOrDefault(INTERNAL_PROCESS_WORKSPACE_TYPE, WorkspaceType.UN_KNOW.getCode())).getDesc());
processInstanceDocument.setAgent((Boolean) variables.getOrDefault(INTERNAL_PROCESS_AGENT, false));

View File

@ -14,6 +14,7 @@ import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
@ -24,6 +25,7 @@ import org.springframework.util.StringUtils;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
@ -48,6 +50,13 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
@XxlJob("completedProcessInstanceSyncToEs")
public ReturnT<String> execute(String s) {
// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
if (Objects.nonNull(shardingVO) && shardingVO.getTotal() > 1) {
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
}
log.debug("start exec finished process instance data sync... ");
XxlJobLogger.log("start exec finished process instance data sync... ");
@ -72,7 +81,7 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
}
// 开始同步完成的实例到 ES
DataSyncSummaryDTO summary = doSync(search);
DataSyncSummaryDTO summary = doSync(search, shardingVO.getTotal() > 1 ? shardingVO : null);
// 执行完同步后的一些额外操作
afterSync(endTime);
@ -97,9 +106,10 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
* 内部自动分页循环同步
*
* @param search
* @param shardingVO 分片信息, 可能为 null
* @return 应同步至 ES 的统计数据
*/
private DataSyncSummaryDTO doSync(HistoricProcessInstanceSearchForEsDTO search) {
private DataSyncSummaryDTO doSync(HistoricProcessInstanceSearchForEsDTO search, ShardingUtil.ShardingVO shardingVO) {
Long totalProcessInstanceCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceTotalCount(search);
log.info("查询到待同步实例维度数据量: {} 条", totalProcessInstanceCount);
XxlJobLogger.log("查询到待同步实例维度数据量: {} 条", totalProcessInstanceCount);
@ -112,7 +122,18 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
skipRows, skipRows + search.getOverPageSize());
int pageNo = skipRows / search.getOverPageSize() + 1;
List<HistoricProcessInstance> instances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, new Page<HistoricProcessInstance>(pageNo, search.getOverPageSize()));
instances.forEach(hpi -> {
instances.parallelStream().filter(hpi -> {
if (Objects.nonNull(shardingVO)) {
if (hpi.getId().contains("-")) {
// 最开始的实例编号是 UUID无法取模所以统一交给第一个节点处理
return shardingVO.getIndex() == 0;
}
return Long.parseLong(hpi.getId().substring(12)) % shardingVO.getTotal() == shardingVO.getIndex();
}
// 如果配置的非分片广播的方式那么 shardingVO 将是 null
return true;
}).forEach(hpi -> {
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(hpi);
totalProcessTaskCount.getAndSet(totalProcessTaskCount.get() + processTaskDocuments.size());
});

View File

@ -10,6 +10,7 @@ import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
@ -42,20 +43,27 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
private final SpringProcessEngineConfiguration springProcessEngineConfiguration;
@XxlJob("otherProcessInstanceSyncToEs")
public ReturnT<String> execute(String s) {
public ReturnT<String> execute(String param) {
// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
if (Objects.nonNull(shardingVO) && shardingVO.getTotal() > 1) {
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
}
log.debug("start exec other process instance data sync... ");
XxlJobLogger.log("start exec other process instance data sync... ");
// 同步前的一些额外动作
Date endTime = beforeSync();
if(Objects.isNull(endTime)){
if (Objects.isNull(endTime)) {
log.info("请先执行“流程实例数据同步 ES 第一步”, Bean NamecompletedProcessInstanceSyncToEs。 待其执行完成后再执行该任务!");
XxlJobLogger.log("请先执行“流程实例数据同步 ES 第一步”, Bean NamecompletedProcessInstanceSyncToEs。 待其执行完成后再执行该任务!");
return ReturnT.FAIL;
}
// 开始同步流程数据到 ES
DataSyncSummaryDTO summary = doSync(endTime, s);
DataSyncSummaryDTO summary = doSync(endTime, param, shardingVO.getTotal() > 1 ? shardingVO : null);
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
@ -73,7 +81,7 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
* @param endTime
* @param pageSizeStr
*/
private DataSyncSummaryDTO doSync(Date endTime, String pageSizeStr) {
private DataSyncSummaryDTO doSync(Date endTime, String pageSizeStr, ShardingUtil.ShardingVO shardingVO) {
Long totalCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceByUnfinishedAndAlterEndTimeTotalCount(endTime);
log.info("查询到待同步实例维度数据量: {} 条", totalCount);
XxlJobLogger.log("查询到待同步实例维度数据量: {} 条", totalCount);
@ -94,7 +102,18 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
skipRows, skipRows + pageSize.get());
int pageNo = skipRows / pageSize.get() + 1;
List<HistoricProcessInstance> instances = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceByUnfinishedAndAlterEndTime(endTime, new Page<HistoricProcessInstance>(pageNo, pageSize.get()));
instances.forEach(hpi -> {
instances.parallelStream().filter(hpi -> {
if (Objects.nonNull(shardingVO)) {
if (hpi.getId().contains("-")) {
// 最开始的实例编号是 UUID无法取模所以统一交给第一个节点处理
return shardingVO.getIndex() == 0;
}
return Long.parseLong(hpi.getId().substring(12)) % shardingVO.getTotal() == shardingVO.getIndex();
}
// 如果配置的非分片广播的方式那么 shardingVO 将是 null
return true;
}).forEach(hpi -> {
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(hpi);
totalProcessTaskCount.getAndSet(totalProcessTaskCount.get() + processTaskDocuments.size());
});