Merge remote-tracking branch 'origin/master'

# Conflicts:
#	pom.xml
This commit is contained in:
wangli 2025-05-07 13:54:36 +08:00
commit 8c6cd3e75e
7 changed files with 63 additions and 64 deletions

12
pom.xml
View File

@ -30,6 +30,7 @@
<javaparse.version>3.26.0</javaparse.version>
<elasticsearch.version>7.10.2</elasticsearch.version>
<easy-es.version>2.0.0</easy-es.version>
<xxl-job.version>2.5.0</xxl-job.version>
</properties>
<dependencyManagement>
@ -48,6 +49,12 @@
<version>${axzo-dependencies.version}</version>
<type>pom</type>
<scope>import</scope>
<exclusions>
<exclusion>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
@ -161,6 +168,11 @@
<artifactId>org-api</artifactId>
<version>${axzo-dependencies.org.version}</version>
</dependency>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${xxl-job.version}</version>
</dependency>
<dependency>
<groupId>cn.axzo.nanopart</groupId>
<artifactId>doc-api</artifactId>

View File

@ -18,10 +18,8 @@ import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
@ -58,7 +56,7 @@ import static cn.axzo.workflow.server.controller.delegate.AbstractBpmnTaskAssign
@Component
@RequiredArgsConstructor
@Slf4j
public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
public class CompletedProcessInstanceSyncEsJobHandler {
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
private final AggregateProcessInstanceService aggregateProcessInstanceService;
private final SpringProcessEngineConfiguration springProcessEngineConfiguration;
@ -69,14 +67,13 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
@XxlJob("completedProcessInstanceSyncToEs")
public ReturnT<String> execute(String s) {
// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
if (Objects.nonNull(shardingVO) && shardingVO.getTotal() > 1) {
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
}
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
log.debug("start exec finished process instance data sync... ");
XxlJobLogger.log("start exec finished process instance data sync... ");
XxlJobHelper.log("start exec finished process instance data sync... ");
HistoricProcessInstanceSearchForEsDTO search = new HistoricProcessInstanceSearchForEsDTO();
search.setFinished(true);
@ -87,25 +84,25 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
if (StringUtils.hasText(s)) {
search = JSON.parseObject(s, HistoricProcessInstanceSearchForEsDTO.class);
log.info("根据入参转换后的查询入参:{}", JSON.toJSONString(search));
XxlJobLogger.log("根据入参转换后的查询入参:{}", JSON.toJSONString(search));
XxlJobHelper.log("根据入参转换后的查询入参:{}", JSON.toJSONString(search));
} else {
log.info("入参为空, 将以默认条件执行");
XxlJobLogger.log("入参为空, 将以默认条件执行");
XxlJobHelper.log("入参为空, 将以默认条件执行");
}
} catch (Exception e) {
log.warn("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型");
XxlJobLogger.log("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型");
XxlJobHelper.log("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型");
return ReturnT.FAIL;
}
// 开始同步完成的实例到 ES
DataSyncSummaryDTO summary = doSync(search, shardingVO.getTotal() > 1 ? shardingVO : null);
DataSyncSummaryDTO summary = doSync(search, shardIndex, shardTotal);
// 执行完同步后的一些额外操作
afterSync(endTime);
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
XxlJobHelper.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
return ReturnT.SUCCESS;
}
@ -124,30 +121,29 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
* 内部自动分页循环同步
*
* @param search
* @param shardingVO 分片信息, 可能为 null
* @return 应同步至 ES 的统计数据
*/
private DataSyncSummaryDTO doSync(HistoricProcessInstanceSearchForEsDTO search, ShardingUtil.ShardingVO shardingVO) {
private DataSyncSummaryDTO doSync(HistoricProcessInstanceSearchForEsDTO search, int shardIndex, int shardTotal) {
Long totalProcessInstanceCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceTotalCount(search);
log.info("查询到待同步实例维度数据量: {} 条", totalProcessInstanceCount);
XxlJobLogger.log("查询到待同步实例维度数据量: {} 条", totalProcessInstanceCount);
XxlJobHelper.log("查询到待同步实例维度数据量: {} 条", totalProcessInstanceCount);
AtomicLong totalProcessTaskCount = new AtomicLong(0);
IntStream.iterate(0, i -> i + search.getOverPageSize()).limit((totalProcessInstanceCount + search.getOverPageSize() - 1) / search.getOverPageSize())
.forEach(skipRows -> {
log.info("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalProcessInstanceCount * 100),
skipRows, skipRows + search.getOverPageSize());
XxlJobLogger.log("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalProcessInstanceCount * 100),
XxlJobHelper.log("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalProcessInstanceCount * 100),
skipRows, skipRows + search.getOverPageSize());
int pageNo = skipRows / search.getOverPageSize() + 1;
List<HistoricProcessInstance> instances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, new Page<HistoricProcessInstance>(pageNo, search.getOverPageSize()));
instances.parallelStream().filter(hpi -> {
if (Objects.nonNull(shardingVO)) {
if (shardIndex >= 0 && shardTotal >= 0) {
if (hpi.getId().contains("-")) {
// 最开始的实例编号是 UUID无法取模所以统一交给第一个节点处理
return shardingVO.getIndex() == 0;
return shardIndex == 0;
}
return Long.parseLong(hpi.getId().substring(12)) % shardingVO.getTotal() == shardingVO.getIndex();
return Long.parseLong(hpi.getId().substring(12)) % shardTotal == shardIndex;
}
// 如果配置的非分片广播的方式那么 shardingVO 将是 null
return true;

View File

@ -6,7 +6,6 @@ import cn.axzo.workflow.core.service.BpmnProcessVariableService;
import cn.hutool.json.JSONUtil;
import com.alibaba.nacos.common.utils.StringUtils;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.AllArgsConstructor;
import lombok.Data;
@ -26,7 +25,7 @@ import static com.xxl.job.core.biz.model.ReturnT.FAIL_CODE;
@Component
@RequiredArgsConstructor
@Slf4j
public class DeadLetterJobRetryHandler extends IJobHandler {
public class DeadLetterJobRetryHandler {
@Resource
private BpmnProcessVariableService variableService;
@ -34,7 +33,6 @@ public class DeadLetterJobRetryHandler extends IJobHandler {
@Resource
private BpmnProcessJobService jobService;
@Override
@XxlJob("deadLetterJobRetry")
public ReturnT<String> execute(String param) {
if (StringUtils.isBlank(param)) {

View File

@ -2,9 +2,8 @@ package cn.axzo.workflow.server.xxljob;
import cn.axzo.workflow.es.service.EsProcessInstanceService;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -19,10 +18,9 @@ import org.springframework.util.StringUtils;
@Component
@RequiredArgsConstructor
@Slf4j
public class EsIndexOperationJobHandler extends IJobHandler {
public class EsIndexOperationJobHandler {
private final EsProcessInstanceService esProcessInstanceService;
@Override
@XxlJob("esIndexOperation")
public ReturnT<String> execute(String param) {
if (!StringUtils.hasText(param)) {
@ -38,10 +36,10 @@ public class EsIndexOperationJobHandler extends IJobHandler {
*/
private void deleteIndex() {
log.info("开始删除父子文档索引...");
XxlJobLogger.log("开始删除父子文档索引...");
XxlJobHelper.log("开始删除父子文档索引...");
Boolean index = esProcessInstanceService.deleteIndex();
log.info("删除完成. 响应结果: {}", index);
XxlJobLogger.log("删除完成. 响应结果: {}", index);
XxlJobHelper.log("删除完成. 响应结果: {}", index);
}
/**
@ -49,10 +47,10 @@ public class EsIndexOperationJobHandler extends IJobHandler {
*/
private void createIndex() {
log.info("开始创建父子文档索引...");
XxlJobLogger.log("开始创建父子文档索引...");
XxlJobHelper.log("开始创建父子文档索引...");
Boolean index = esProcessInstanceService.createIndex();
// 如果重复调用,避免报错,在创建的逻辑中,内置了判断是否存在指定索引,如果存在则不再创建,直接返回 false.
log.info("创建完成. 响应结果: {}", index);
XxlJobLogger.log("创建完成. 响应结果: {}", index);
XxlJobHelper.log("创建完成. 响应结果: {}", index);
}
}

View File

@ -7,9 +7,8 @@ import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
import cn.axzo.workflow.core.repository.mapper.CommonMapper;
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -31,21 +30,20 @@ import static cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum.APPROV
@Component
@RequiredArgsConstructor
@Slf4j
public class OperationDataJobHandler extends IJobHandler {
public class OperationDataJobHandler {
@Resource
private CommonMapper commonMapper;
@Resource
private ExtAxHiTaskInstService extAxHiTaskInstService;
@Override
@XxlJob("executeDynamicSql")
public ReturnT<String> execute(String s) throws Exception {
if (StringUtils.hasText(s)) {
XxlJobLogger.log("执行动态 sql");
XxlJobHelper.log("执行动态 sql");
List<Map<String, Object>> maps = commonMapper.executeDynamicSQL(s);
XxlJobLogger.log("result: {}", JSON.toJSONString(maps));
XxlJobHelper.log("result: {}", JSON.toJSONString(maps));
} else {
XxlJobLogger.log("仅修复 extAxTaskInst 表数据");
XxlJobHelper.log("仅修复 extAxTaskInst 表数据");
repairData();
}
return ReturnT.SUCCESS;

View File

@ -14,10 +14,8 @@ import cn.hutool.core.util.NumberUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
@ -52,7 +50,7 @@ import static cn.axzo.workflow.server.controller.delegate.AbstractBpmnTaskAssign
@Component
@RequiredArgsConstructor
@Slf4j
public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
public class OtherProcessInstanceSyncEsJobHandler {
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
private final AggregateProcessInstanceService aggregateProcessInstanceService;
private final SpringProcessEngineConfiguration springProcessEngineConfiguration;
@ -63,28 +61,28 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
@XxlJob("otherProcessInstanceSyncToEs")
public ReturnT<String> execute(String param) {
// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
if (Objects.nonNull(shardingVO) && shardingVO.getTotal() > 1) {
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
}
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
log.debug("start exec other process instance data sync... ");
XxlJobLogger.log("start exec other process instance data sync... ");
XxlJobHelper.log("start exec other process instance data sync... ");
// 同步前的一些额外动作
Date endTime = beforeSync();
if (Objects.isNull(endTime)) {
log.info("请先执行“流程实例数据同步 ES 第一步”, Bean NamecompletedProcessInstanceSyncToEs。 待其执行完成后再执行该任务!");
XxlJobLogger.log("请先执行“流程实例数据同步 ES 第一步”, Bean NamecompletedProcessInstanceSyncToEs。 待其执行完成后再执行该任务!");
XxlJobHelper.log("请先执行“流程实例数据同步 ES 第一步”, Bean NamecompletedProcessInstanceSyncToEs。 待其执行完成后再执行该任务!");
return ReturnT.FAIL;
}
// 开始同步流程数据到 ES
DataSyncSummaryDTO summary = doSync(endTime, param, shardingVO.getTotal() > 1 ? shardingVO : null);
DataSyncSummaryDTO summary = doSync(endTime, param, shardIndex, shardTotal);
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
XxlJobHelper.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
return ReturnT.SUCCESS;
}
@ -99,10 +97,10 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
* @param endTime
* @param pageSizeStr
*/
private DataSyncSummaryDTO doSync(Date endTime, String pageSizeStr, ShardingUtil.ShardingVO shardingVO) {
private DataSyncSummaryDTO doSync(Date endTime, String pageSizeStr, int shardIndex, int shardTotal) {
Long totalCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceByUnfinishedAndAlterEndTimeTotalCount(endTime);
log.info("查询到待同步实例维度数据量: {} 条", totalCount);
XxlJobLogger.log("查询到待同步实例维度数据量: {} 条", totalCount);
XxlJobHelper.log("查询到待同步实例维度数据量: {} 条", totalCount);
AtomicInteger pageSize = new AtomicInteger(50);
if (StringUtils.hasText(pageSizeStr)) {
try {
@ -116,18 +114,18 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
.forEach(skipRows -> {
log.info("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalCount * 100),
skipRows, skipRows + pageSize.get());
XxlJobLogger.log("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalCount * 100),
XxlJobHelper.log("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalCount * 100),
skipRows, skipRows + pageSize.get());
int pageNo = skipRows / pageSize.get() + 1;
List<HistoricProcessInstance> instances = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceByUnfinishedAndAlterEndTime(endTime, new Page<HistoricProcessInstance>(pageNo, pageSize.get()));
instances.parallelStream().filter(hpi -> {
if (Objects.nonNull(shardingVO)) {
if (shardIndex >= 0 && shardTotal >= 0) {
if (hpi.getId().contains("-")) {
// 最开始的实例编号是 UUID无法取模所以统一交给第一个节点处理
return shardingVO.getIndex() == 0;
return shardIndex == 0;
}
return Long.parseLong(hpi.getId().substring(12)) % shardingVO.getTotal() == shardingVO.getIndex();
return Long.parseLong(hpi.getId().substring(12)) % shardTotal == shardIndex;
}
// 如果配置的非分片广播的方式那么 shardingVO 将是 null
return true;

View File

@ -13,9 +13,8 @@ import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.hutool.core.util.NumberUtil;
import com.google.common.collect.Lists;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.history.HistoricProcessInstance;
@ -45,7 +44,7 @@ import static cn.axzo.workflow.server.controller.delegate.AbstractBpmnTaskAssign
@Component
@RequiredArgsConstructor
@Slf4j
public class SpecifyProcessInstanceSyncEsJobHandler extends IJobHandler {
public class SpecifyProcessInstanceSyncEsJobHandler {
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
private final AggregateProcessInstanceService aggregateProcessInstanceService;
private final SpringProcessEngineConfiguration springProcessEngineConfiguration;
@ -56,13 +55,13 @@ public class SpecifyProcessInstanceSyncEsJobHandler extends IJobHandler {
@XxlJob("specifyProcessInstanceSyncToEs")
public ReturnT<String> execute(String processInstanceId) {
log.info("Sync specify processInstance id: {}", processInstanceId);
XxlJobLogger.log("Sync specify processInstance id: {}", processInstanceId);
XxlJobHelper.log("Sync specify processInstance id: {}", processInstanceId);
// 开始同步流程数据到 ES
DataSyncSummaryDTO summary = doSync(processInstanceId);
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
XxlJobHelper.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
return ReturnT.SUCCESS;
}