diff --git a/pom.xml b/pom.xml index edd5ef0a2..5d8bc11b6 100644 --- a/pom.xml +++ b/pom.xml @@ -30,6 +30,7 @@ 3.26.0 7.10.2 2.0.0 + 2.5.0 @@ -48,6 +49,12 @@ ${axzo-dependencies.version} pom import + + + com.xuxueli + xxl-job-core + + io.github.openfeign @@ -161,6 +168,11 @@ org-api ${axzo-dependencies.org.version} + + com.xuxueli + xxl-job-core + ${xxl-job.version} + diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/CompletedProcessInstanceSyncEsJobHandler.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/CompletedProcessInstanceSyncEsJobHandler.java index cf09d1f2d..7c12649c1 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/CompletedProcessInstanceSyncEsJobHandler.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/CompletedProcessInstanceSyncEsJobHandler.java @@ -18,10 +18,8 @@ import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.google.common.collect.Lists; import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; -import com.xxl.job.core.log.XxlJobLogger; -import com.xxl.job.core.util.ShardingUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.flowable.common.engine.impl.interceptor.CommandExecutor; @@ -58,7 +56,7 @@ import static cn.axzo.workflow.server.controller.delegate.AbstractBpmnTaskAssign @Component @RequiredArgsConstructor @Slf4j -public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler { +public class CompletedProcessInstanceSyncEsJobHandler { private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService; private final AggregateProcessInstanceService aggregateProcessInstanceService; private final SpringProcessEngineConfiguration springProcessEngineConfiguration; @@ -69,14 +67,13 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler { @XxlJob("completedProcessInstanceSyncToEs") public ReturnT execute(String s) { // 分片参数 - ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); - if (Objects.nonNull(shardingVO) && shardingVO.getTotal() > 1) { - log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal()); - XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal()); - } + int shardIndex = XxlJobHelper.getShardIndex(); + int shardTotal = XxlJobHelper.getShardTotal(); + log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal); + XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal); log.debug("start exec finished process instance data sync... "); - XxlJobLogger.log("start exec finished process instance data sync... "); + XxlJobHelper.log("start exec finished process instance data sync... "); HistoricProcessInstanceSearchForEsDTO search = new HistoricProcessInstanceSearchForEsDTO(); search.setFinished(true); @@ -87,25 +84,25 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler { if (StringUtils.hasText(s)) { search = JSON.parseObject(s, HistoricProcessInstanceSearchForEsDTO.class); log.info("根据入参转换后的查询入参:{}", JSON.toJSONString(search)); - XxlJobLogger.log("根据入参转换后的查询入参:{}", JSON.toJSONString(search)); + XxlJobHelper.log("根据入参转换后的查询入参:{}", JSON.toJSONString(search)); } else { log.info("入参为空, 将以默认条件执行"); - XxlJobLogger.log("入参为空, 将以默认条件执行"); + XxlJobHelper.log("入参为空, 将以默认条件执行"); } } catch (Exception e) { log.warn("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型"); - XxlJobLogger.log("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型"); + XxlJobHelper.log("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型"); return ReturnT.FAIL; } // 开始同步完成的实例到 ES - DataSyncSummaryDTO summary = doSync(search, shardingVO.getTotal() > 1 ? shardingVO : null); + DataSyncSummaryDTO summary = doSync(search, shardIndex, shardTotal); // 执行完同步后的一些额外操作 afterSync(endTime); log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount()); - XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount()); + XxlJobHelper.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount()); return ReturnT.SUCCESS; } @@ -124,30 +121,29 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler { * 内部自动分页循环同步 * * @param search - * @param shardingVO 分片信息, 可能为 null * @return 应同步至 ES 的统计数据 */ - private DataSyncSummaryDTO doSync(HistoricProcessInstanceSearchForEsDTO search, ShardingUtil.ShardingVO shardingVO) { + private DataSyncSummaryDTO doSync(HistoricProcessInstanceSearchForEsDTO search, int shardIndex, int shardTotal) { Long totalProcessInstanceCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceTotalCount(search); log.info("查询到待同步实例维度数据量: {} 条", totalProcessInstanceCount); - XxlJobLogger.log("查询到待同步实例维度数据量: {} 条", totalProcessInstanceCount); + XxlJobHelper.log("查询到待同步实例维度数据量: {} 条", totalProcessInstanceCount); AtomicLong totalProcessTaskCount = new AtomicLong(0); IntStream.iterate(0, i -> i + search.getOverPageSize()).limit((totalProcessInstanceCount + search.getOverPageSize() - 1) / search.getOverPageSize()) .forEach(skipRows -> { log.info("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalProcessInstanceCount * 100), skipRows, skipRows + search.getOverPageSize()); - XxlJobLogger.log("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalProcessInstanceCount * 100), + XxlJobHelper.log("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalProcessInstanceCount * 100), skipRows, skipRows + search.getOverPageSize()); int pageNo = skipRows / search.getOverPageSize() + 1; List instances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, new Page(pageNo, search.getOverPageSize())); instances.parallelStream().filter(hpi -> { - if (Objects.nonNull(shardingVO)) { + if (shardIndex >= 0 && shardTotal >= 0) { if (hpi.getId().contains("-")) { // 最开始的实例编号是 UUID,无法取模,所以统一交给第一个节点处理 - return shardingVO.getIndex() == 0; + return shardIndex == 0; } - return Long.parseLong(hpi.getId().substring(12)) % shardingVO.getTotal() == shardingVO.getIndex(); + return Long.parseLong(hpi.getId().substring(12)) % shardTotal == shardIndex; } // 如果配置的非分片广播的方式,那么 shardingVO 将是 null return true; diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/DeadLetterJobRetryHandler.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/DeadLetterJobRetryHandler.java index ebae7878f..72ac7ae57 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/DeadLetterJobRetryHandler.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/DeadLetterJobRetryHandler.java @@ -6,7 +6,6 @@ import cn.axzo.workflow.core.service.BpmnProcessVariableService; import cn.hutool.json.JSONUtil; import com.alibaba.nacos.common.utils.StringUtils; import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.AllArgsConstructor; import lombok.Data; @@ -26,7 +25,7 @@ import static com.xxl.job.core.biz.model.ReturnT.FAIL_CODE; @Component @RequiredArgsConstructor @Slf4j -public class DeadLetterJobRetryHandler extends IJobHandler { +public class DeadLetterJobRetryHandler { @Resource private BpmnProcessVariableService variableService; @@ -34,7 +33,6 @@ public class DeadLetterJobRetryHandler extends IJobHandler { @Resource private BpmnProcessJobService jobService; - @Override @XxlJob("deadLetterJobRetry") public ReturnT execute(String param) { if (StringUtils.isBlank(param)) { diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/EsIndexOperationJobHandler.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/EsIndexOperationJobHandler.java index f25ebd5f1..b07e23343 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/EsIndexOperationJobHandler.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/EsIndexOperationJobHandler.java @@ -2,9 +2,8 @@ package cn.axzo.workflow.server.xxljob; import cn.axzo.workflow.es.service.EsProcessInstanceService; import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; -import com.xxl.job.core.log.XxlJobLogger; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -19,10 +18,9 @@ import org.springframework.util.StringUtils; @Component @RequiredArgsConstructor @Slf4j -public class EsIndexOperationJobHandler extends IJobHandler { +public class EsIndexOperationJobHandler { private final EsProcessInstanceService esProcessInstanceService; - @Override @XxlJob("esIndexOperation") public ReturnT execute(String param) { if (!StringUtils.hasText(param)) { @@ -38,10 +36,10 @@ public class EsIndexOperationJobHandler extends IJobHandler { */ private void deleteIndex() { log.info("开始删除父子文档索引..."); - XxlJobLogger.log("开始删除父子文档索引..."); + XxlJobHelper.log("开始删除父子文档索引..."); Boolean index = esProcessInstanceService.deleteIndex(); log.info("删除完成. 响应结果: {}", index); - XxlJobLogger.log("删除完成. 响应结果: {}", index); + XxlJobHelper.log("删除完成. 响应结果: {}", index); } /** @@ -49,10 +47,10 @@ public class EsIndexOperationJobHandler extends IJobHandler { */ private void createIndex() { log.info("开始创建父子文档索引..."); - XxlJobLogger.log("开始创建父子文档索引..."); + XxlJobHelper.log("开始创建父子文档索引..."); Boolean index = esProcessInstanceService.createIndex(); // 如果重复调用,避免报错,在创建的逻辑中,内置了判断是否存在指定索引,如果存在则不再创建,直接返回 false. log.info("创建完成. 响应结果: {}", index); - XxlJobLogger.log("创建完成. 响应结果: {}", index); + XxlJobHelper.log("创建完成. 响应结果: {}", index); } } diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/OperationDataJobHandler.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/OperationDataJobHandler.java index 912c306fd..14a127b58 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/OperationDataJobHandler.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/OperationDataJobHandler.java @@ -7,9 +7,8 @@ import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst; import cn.axzo.workflow.core.repository.mapper.CommonMapper; import cn.axzo.workflow.core.service.ExtAxHiTaskInstService; import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; -import com.xxl.job.core.log.XxlJobLogger; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -31,21 +30,20 @@ import static cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum.APPROV @Component @RequiredArgsConstructor @Slf4j -public class OperationDataJobHandler extends IJobHandler { +public class OperationDataJobHandler { @Resource private CommonMapper commonMapper; @Resource private ExtAxHiTaskInstService extAxHiTaskInstService; - @Override @XxlJob("executeDynamicSql") public ReturnT execute(String s) throws Exception { if (StringUtils.hasText(s)) { - XxlJobLogger.log("执行动态 sql"); + XxlJobHelper.log("执行动态 sql"); List> maps = commonMapper.executeDynamicSQL(s); - XxlJobLogger.log("result: {}", JSON.toJSONString(maps)); + XxlJobHelper.log("result: {}", JSON.toJSONString(maps)); } else { - XxlJobLogger.log("仅修复 extAxTaskInst 表数据"); + XxlJobHelper.log("仅修复 extAxTaskInst 表数据"); repairData(); } return ReturnT.SUCCESS; diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/OtherProcessInstanceSyncEsJobHandler.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/OtherProcessInstanceSyncEsJobHandler.java index 0f2633a45..f33683bdb 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/OtherProcessInstanceSyncEsJobHandler.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/OtherProcessInstanceSyncEsJobHandler.java @@ -14,10 +14,8 @@ import cn.hutool.core.util.NumberUtil; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.google.common.collect.Lists; import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; -import com.xxl.job.core.log.XxlJobLogger; -import com.xxl.job.core.util.ShardingUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.flowable.common.engine.impl.interceptor.CommandExecutor; @@ -52,7 +50,7 @@ import static cn.axzo.workflow.server.controller.delegate.AbstractBpmnTaskAssign @Component @RequiredArgsConstructor @Slf4j -public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler { +public class OtherProcessInstanceSyncEsJobHandler { private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService; private final AggregateProcessInstanceService aggregateProcessInstanceService; private final SpringProcessEngineConfiguration springProcessEngineConfiguration; @@ -63,28 +61,28 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler { @XxlJob("otherProcessInstanceSyncToEs") public ReturnT execute(String param) { // 分片参数 - ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); - if (Objects.nonNull(shardingVO) && shardingVO.getTotal() > 1) { - log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal()); - XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal()); - } + int shardIndex = XxlJobHelper.getShardIndex(); + int shardTotal = XxlJobHelper.getShardTotal(); + log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal); + XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal); + log.debug("start exec other process instance data sync... "); - XxlJobLogger.log("start exec other process instance data sync... "); + XxlJobHelper.log("start exec other process instance data sync... "); // 同步前的一些额外动作 Date endTime = beforeSync(); if (Objects.isNull(endTime)) { log.info("请先执行“流程实例数据同步 ES 第一步”, Bean Name:completedProcessInstanceSyncToEs。 待其执行完成后再执行该任务!"); - XxlJobLogger.log("请先执行“流程实例数据同步 ES 第一步”, Bean Name:completedProcessInstanceSyncToEs。 待其执行完成后再执行该任务!"); + XxlJobHelper.log("请先执行“流程实例数据同步 ES 第一步”, Bean Name:completedProcessInstanceSyncToEs。 待其执行完成后再执行该任务!"); return ReturnT.FAIL; } // 开始同步流程数据到 ES - DataSyncSummaryDTO summary = doSync(endTime, param, shardingVO.getTotal() > 1 ? shardingVO : null); + DataSyncSummaryDTO summary = doSync(endTime, param, shardIndex, shardTotal); log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount()); - XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount()); + XxlJobHelper.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount()); return ReturnT.SUCCESS; } @@ -99,10 +97,10 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler { * @param endTime * @param pageSizeStr */ - private DataSyncSummaryDTO doSync(Date endTime, String pageSizeStr, ShardingUtil.ShardingVO shardingVO) { + private DataSyncSummaryDTO doSync(Date endTime, String pageSizeStr, int shardIndex, int shardTotal) { Long totalCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceByUnfinishedAndAlterEndTimeTotalCount(endTime); log.info("查询到待同步实例维度数据量: {} 条", totalCount); - XxlJobLogger.log("查询到待同步实例维度数据量: {} 条", totalCount); + XxlJobHelper.log("查询到待同步实例维度数据量: {} 条", totalCount); AtomicInteger pageSize = new AtomicInteger(50); if (StringUtils.hasText(pageSizeStr)) { try { @@ -116,18 +114,18 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler { .forEach(skipRows -> { log.info("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalCount * 100), skipRows, skipRows + pageSize.get()); - XxlJobLogger.log("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalCount * 100), + XxlJobHelper.log("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalCount * 100), skipRows, skipRows + pageSize.get()); int pageNo = skipRows / pageSize.get() + 1; List instances = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceByUnfinishedAndAlterEndTime(endTime, new Page(pageNo, pageSize.get())); instances.parallelStream().filter(hpi -> { - if (Objects.nonNull(shardingVO)) { + if (shardIndex >= 0 && shardTotal >= 0) { if (hpi.getId().contains("-")) { // 最开始的实例编号是 UUID,无法取模,所以统一交给第一个节点处理 - return shardingVO.getIndex() == 0; + return shardIndex == 0; } - return Long.parseLong(hpi.getId().substring(12)) % shardingVO.getTotal() == shardingVO.getIndex(); + return Long.parseLong(hpi.getId().substring(12)) % shardTotal == shardIndex; } // 如果配置的非分片广播的方式,那么 shardingVO 将是 null return true; diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/SpecifyProcessInstanceSyncEsJobHandler.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/SpecifyProcessInstanceSyncEsJobHandler.java index 44c5efd10..81bfdd5cf 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/SpecifyProcessInstanceSyncEsJobHandler.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/SpecifyProcessInstanceSyncEsJobHandler.java @@ -13,9 +13,8 @@ import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService; import cn.hutool.core.util.NumberUtil; import com.google.common.collect.Lists; import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; -import com.xxl.job.core.log.XxlJobLogger; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.flowable.engine.history.HistoricProcessInstance; @@ -45,7 +44,7 @@ import static cn.axzo.workflow.server.controller.delegate.AbstractBpmnTaskAssign @Component @RequiredArgsConstructor @Slf4j -public class SpecifyProcessInstanceSyncEsJobHandler extends IJobHandler { +public class SpecifyProcessInstanceSyncEsJobHandler { private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService; private final AggregateProcessInstanceService aggregateProcessInstanceService; private final SpringProcessEngineConfiguration springProcessEngineConfiguration; @@ -56,13 +55,13 @@ public class SpecifyProcessInstanceSyncEsJobHandler extends IJobHandler { @XxlJob("specifyProcessInstanceSyncToEs") public ReturnT execute(String processInstanceId) { log.info("Sync specify processInstance id: {}", processInstanceId); - XxlJobLogger.log("Sync specify processInstance id: {}", processInstanceId); + XxlJobHelper.log("Sync specify processInstance id: {}", processInstanceId); // 开始同步流程数据到 ES DataSyncSummaryDTO summary = doSync(processInstanceId); log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount()); - XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount()); + XxlJobHelper.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount()); return ReturnT.SUCCESS; }