feat(xxljob) - 升级 xxl-job
This commit is contained in:
parent
2470fbc167
commit
ddbdb0d509
12
pom.xml
12
pom.xml
@ -30,6 +30,7 @@
|
||||
<javaparse.version>3.26.0</javaparse.version>
|
||||
<elasticsearch.version>7.10.2</elasticsearch.version>
|
||||
<easy-es.version>2.0.0</easy-es.version>
|
||||
<xxl-job.version>2.5.0</xxl-job.version>
|
||||
</properties>
|
||||
|
||||
<dependencyManagement>
|
||||
@ -48,6 +49,12 @@
|
||||
<version>${axzo-dependencies.version}</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.xuxueli</groupId>
|
||||
<artifactId>xxl-job-core</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.openfeign</groupId>
|
||||
@ -161,6 +168,11 @@
|
||||
<artifactId>org-api</artifactId>
|
||||
<version>${axzo-dependencies.org.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.xuxueli</groupId>
|
||||
<artifactId>xxl-job-core</artifactId>
|
||||
<version>${xxl-job.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
|
||||
@ -18,10 +18,8 @@ import com.alibaba.fastjson.JSON;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.IJobHandler;
|
||||
import com.xxl.job.core.context.XxlJobHelper;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import com.xxl.job.core.log.XxlJobLogger;
|
||||
import com.xxl.job.core.util.ShardingUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
|
||||
@ -58,7 +56,7 @@ import static cn.axzo.workflow.server.controller.delegate.AbstractBpmnTaskAssign
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
public class CompletedProcessInstanceSyncEsJobHandler {
|
||||
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
|
||||
private final AggregateProcessInstanceService aggregateProcessInstanceService;
|
||||
private final SpringProcessEngineConfiguration springProcessEngineConfiguration;
|
||||
@ -69,14 +67,13 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
@XxlJob("completedProcessInstanceSyncToEs")
|
||||
public ReturnT<String> execute(String s) {
|
||||
// 分片参数
|
||||
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
|
||||
if (Objects.nonNull(shardingVO) && shardingVO.getTotal() > 1) {
|
||||
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
|
||||
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
|
||||
}
|
||||
int shardIndex = XxlJobHelper.getShardIndex();
|
||||
int shardTotal = XxlJobHelper.getShardTotal();
|
||||
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
|
||||
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
|
||||
|
||||
log.debug("start exec finished process instance data sync... ");
|
||||
XxlJobLogger.log("start exec finished process instance data sync... ");
|
||||
XxlJobHelper.log("start exec finished process instance data sync... ");
|
||||
|
||||
HistoricProcessInstanceSearchForEsDTO search = new HistoricProcessInstanceSearchForEsDTO();
|
||||
search.setFinished(true);
|
||||
@ -87,25 +84,25 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
if (StringUtils.hasText(s)) {
|
||||
search = JSON.parseObject(s, HistoricProcessInstanceSearchForEsDTO.class);
|
||||
log.info("根据入参转换后的查询入参:{}", JSON.toJSONString(search));
|
||||
XxlJobLogger.log("根据入参转换后的查询入参:{}", JSON.toJSONString(search));
|
||||
XxlJobHelper.log("根据入参转换后的查询入参:{}", JSON.toJSONString(search));
|
||||
} else {
|
||||
log.info("入参为空, 将以默认条件执行");
|
||||
XxlJobLogger.log("入参为空, 将以默认条件执行");
|
||||
XxlJobHelper.log("入参为空, 将以默认条件执行");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型");
|
||||
XxlJobLogger.log("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型");
|
||||
XxlJobHelper.log("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型");
|
||||
return ReturnT.FAIL;
|
||||
}
|
||||
|
||||
// 开始同步完成的实例到 ES
|
||||
DataSyncSummaryDTO summary = doSync(search, shardingVO.getTotal() > 1 ? shardingVO : null);
|
||||
DataSyncSummaryDTO summary = doSync(search, shardIndex, shardTotal);
|
||||
|
||||
// 执行完同步后的一些额外操作
|
||||
afterSync(endTime);
|
||||
|
||||
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
|
||||
XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
|
||||
XxlJobHelper.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
|
||||
return ReturnT.SUCCESS;
|
||||
}
|
||||
|
||||
@ -124,30 +121,29 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
* 内部自动分页循环同步
|
||||
*
|
||||
* @param search
|
||||
* @param shardingVO 分片信息, 可能为 null
|
||||
* @return 应同步至 ES 的统计数据
|
||||
*/
|
||||
private DataSyncSummaryDTO doSync(HistoricProcessInstanceSearchForEsDTO search, ShardingUtil.ShardingVO shardingVO) {
|
||||
private DataSyncSummaryDTO doSync(HistoricProcessInstanceSearchForEsDTO search, int shardIndex, int shardTotal) {
|
||||
Long totalProcessInstanceCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceTotalCount(search);
|
||||
log.info("查询到待同步实例维度数据量: {} 条", totalProcessInstanceCount);
|
||||
XxlJobLogger.log("查询到待同步实例维度数据量: {} 条", totalProcessInstanceCount);
|
||||
XxlJobHelper.log("查询到待同步实例维度数据量: {} 条", totalProcessInstanceCount);
|
||||
AtomicLong totalProcessTaskCount = new AtomicLong(0);
|
||||
IntStream.iterate(0, i -> i + search.getOverPageSize()).limit((totalProcessInstanceCount + search.getOverPageSize() - 1) / search.getOverPageSize())
|
||||
.forEach(skipRows -> {
|
||||
log.info("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalProcessInstanceCount * 100),
|
||||
skipRows, skipRows + search.getOverPageSize());
|
||||
XxlJobLogger.log("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalProcessInstanceCount * 100),
|
||||
XxlJobHelper.log("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalProcessInstanceCount * 100),
|
||||
skipRows, skipRows + search.getOverPageSize());
|
||||
int pageNo = skipRows / search.getOverPageSize() + 1;
|
||||
List<HistoricProcessInstance> instances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, new Page<HistoricProcessInstance>(pageNo, search.getOverPageSize()));
|
||||
|
||||
instances.parallelStream().filter(hpi -> {
|
||||
if (Objects.nonNull(shardingVO)) {
|
||||
if (shardIndex >= 0 && shardTotal >= 0) {
|
||||
if (hpi.getId().contains("-")) {
|
||||
// 最开始的实例编号是 UUID,无法取模,所以统一交给第一个节点处理
|
||||
return shardingVO.getIndex() == 0;
|
||||
return shardIndex == 0;
|
||||
}
|
||||
return Long.parseLong(hpi.getId().substring(12)) % shardingVO.getTotal() == shardingVO.getIndex();
|
||||
return Long.parseLong(hpi.getId().substring(12)) % shardTotal == shardIndex;
|
||||
}
|
||||
// 如果配置的非分片广播的方式,那么 shardingVO 将是 null
|
||||
return true;
|
||||
|
||||
@ -6,7 +6,6 @@ import cn.axzo.workflow.core.service.BpmnProcessVariableService;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.IJobHandler;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
@ -26,7 +25,7 @@ import static com.xxl.job.core.biz.model.ReturnT.FAIL_CODE;
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class DeadLetterJobRetryHandler extends IJobHandler {
|
||||
public class DeadLetterJobRetryHandler {
|
||||
|
||||
@Resource
|
||||
private BpmnProcessVariableService variableService;
|
||||
@ -34,7 +33,6 @@ public class DeadLetterJobRetryHandler extends IJobHandler {
|
||||
@Resource
|
||||
private BpmnProcessJobService jobService;
|
||||
|
||||
@Override
|
||||
@XxlJob("deadLetterJobRetry")
|
||||
public ReturnT<String> execute(String param) {
|
||||
if (StringUtils.isBlank(param)) {
|
||||
|
||||
@ -2,9 +2,8 @@ package cn.axzo.workflow.server.xxljob;
|
||||
|
||||
import cn.axzo.workflow.es.service.EsProcessInstanceService;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.IJobHandler;
|
||||
import com.xxl.job.core.context.XxlJobHelper;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import com.xxl.job.core.log.XxlJobLogger;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
@ -19,10 +18,9 @@ import org.springframework.util.StringUtils;
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class EsIndexOperationJobHandler extends IJobHandler {
|
||||
public class EsIndexOperationJobHandler {
|
||||
private final EsProcessInstanceService esProcessInstanceService;
|
||||
|
||||
@Override
|
||||
@XxlJob("esIndexOperation")
|
||||
public ReturnT<String> execute(String param) {
|
||||
if (!StringUtils.hasText(param)) {
|
||||
@ -38,10 +36,10 @@ public class EsIndexOperationJobHandler extends IJobHandler {
|
||||
*/
|
||||
private void deleteIndex() {
|
||||
log.info("开始删除父子文档索引...");
|
||||
XxlJobLogger.log("开始删除父子文档索引...");
|
||||
XxlJobHelper.log("开始删除父子文档索引...");
|
||||
Boolean index = esProcessInstanceService.deleteIndex();
|
||||
log.info("删除完成. 响应结果: {}", index);
|
||||
XxlJobLogger.log("删除完成. 响应结果: {}", index);
|
||||
XxlJobHelper.log("删除完成. 响应结果: {}", index);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -49,10 +47,10 @@ public class EsIndexOperationJobHandler extends IJobHandler {
|
||||
*/
|
||||
private void createIndex() {
|
||||
log.info("开始创建父子文档索引...");
|
||||
XxlJobLogger.log("开始创建父子文档索引...");
|
||||
XxlJobHelper.log("开始创建父子文档索引...");
|
||||
Boolean index = esProcessInstanceService.createIndex();
|
||||
// 如果重复调用,避免报错,在创建的逻辑中,内置了判断是否存在指定索引,如果存在则不再创建,直接返回 false.
|
||||
log.info("创建完成. 响应结果: {}", index);
|
||||
XxlJobLogger.log("创建完成. 响应结果: {}", index);
|
||||
XxlJobHelper.log("创建完成. 响应结果: {}", index);
|
||||
}
|
||||
}
|
||||
|
||||
@ -7,9 +7,8 @@ import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
|
||||
import cn.axzo.workflow.core.repository.mapper.CommonMapper;
|
||||
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.IJobHandler;
|
||||
import com.xxl.job.core.context.XxlJobHelper;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import com.xxl.job.core.log.XxlJobLogger;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
@ -31,21 +30,20 @@ import static cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum.APPROV
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class OperationDataJobHandler extends IJobHandler {
|
||||
public class OperationDataJobHandler {
|
||||
@Resource
|
||||
private CommonMapper commonMapper;
|
||||
@Resource
|
||||
private ExtAxHiTaskInstService extAxHiTaskInstService;
|
||||
|
||||
@Override
|
||||
@XxlJob("executeDynamicSql")
|
||||
public ReturnT<String> execute(String s) throws Exception {
|
||||
if (StringUtils.hasText(s)) {
|
||||
XxlJobLogger.log("执行动态 sql");
|
||||
XxlJobHelper.log("执行动态 sql");
|
||||
List<Map<String, Object>> maps = commonMapper.executeDynamicSQL(s);
|
||||
XxlJobLogger.log("result: {}", JSON.toJSONString(maps));
|
||||
XxlJobHelper.log("result: {}", JSON.toJSONString(maps));
|
||||
} else {
|
||||
XxlJobLogger.log("仅修复 extAxTaskInst 表数据");
|
||||
XxlJobHelper.log("仅修复 extAxTaskInst 表数据");
|
||||
repairData();
|
||||
}
|
||||
return ReturnT.SUCCESS;
|
||||
|
||||
@ -14,10 +14,8 @@ import cn.hutool.core.util.NumberUtil;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.IJobHandler;
|
||||
import com.xxl.job.core.context.XxlJobHelper;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import com.xxl.job.core.log.XxlJobLogger;
|
||||
import com.xxl.job.core.util.ShardingUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
|
||||
@ -52,7 +50,7 @@ import static cn.axzo.workflow.server.controller.delegate.AbstractBpmnTaskAssign
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
public class OtherProcessInstanceSyncEsJobHandler {
|
||||
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
|
||||
private final AggregateProcessInstanceService aggregateProcessInstanceService;
|
||||
private final SpringProcessEngineConfiguration springProcessEngineConfiguration;
|
||||
@ -63,28 +61,28 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
@XxlJob("otherProcessInstanceSyncToEs")
|
||||
public ReturnT<String> execute(String param) {
|
||||
// 分片参数
|
||||
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
|
||||
if (Objects.nonNull(shardingVO) && shardingVO.getTotal() > 1) {
|
||||
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
|
||||
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
|
||||
}
|
||||
int shardIndex = XxlJobHelper.getShardIndex();
|
||||
int shardTotal = XxlJobHelper.getShardTotal();
|
||||
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
|
||||
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
|
||||
|
||||
|
||||
log.debug("start exec other process instance data sync... ");
|
||||
XxlJobLogger.log("start exec other process instance data sync... ");
|
||||
XxlJobHelper.log("start exec other process instance data sync... ");
|
||||
|
||||
// 同步前的一些额外动作
|
||||
Date endTime = beforeSync();
|
||||
if (Objects.isNull(endTime)) {
|
||||
log.info("请先执行“流程实例数据同步 ES 第一步”, Bean Name:completedProcessInstanceSyncToEs。 待其执行完成后再执行该任务!");
|
||||
XxlJobLogger.log("请先执行“流程实例数据同步 ES 第一步”, Bean Name:completedProcessInstanceSyncToEs。 待其执行完成后再执行该任务!");
|
||||
XxlJobHelper.log("请先执行“流程实例数据同步 ES 第一步”, Bean Name:completedProcessInstanceSyncToEs。 待其执行完成后再执行该任务!");
|
||||
return ReturnT.FAIL;
|
||||
}
|
||||
|
||||
// 开始同步流程数据到 ES
|
||||
DataSyncSummaryDTO summary = doSync(endTime, param, shardingVO.getTotal() > 1 ? shardingVO : null);
|
||||
DataSyncSummaryDTO summary = doSync(endTime, param, shardIndex, shardTotal);
|
||||
|
||||
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
|
||||
XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
|
||||
XxlJobHelper.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
|
||||
return ReturnT.SUCCESS;
|
||||
}
|
||||
|
||||
@ -99,10 +97,10 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
* @param endTime
|
||||
* @param pageSizeStr
|
||||
*/
|
||||
private DataSyncSummaryDTO doSync(Date endTime, String pageSizeStr, ShardingUtil.ShardingVO shardingVO) {
|
||||
private DataSyncSummaryDTO doSync(Date endTime, String pageSizeStr, int shardIndex, int shardTotal) {
|
||||
Long totalCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceByUnfinishedAndAlterEndTimeTotalCount(endTime);
|
||||
log.info("查询到待同步实例维度数据量: {} 条", totalCount);
|
||||
XxlJobLogger.log("查询到待同步实例维度数据量: {} 条", totalCount);
|
||||
XxlJobHelper.log("查询到待同步实例维度数据量: {} 条", totalCount);
|
||||
AtomicInteger pageSize = new AtomicInteger(50);
|
||||
if (StringUtils.hasText(pageSizeStr)) {
|
||||
try {
|
||||
@ -116,18 +114,18 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
.forEach(skipRows -> {
|
||||
log.info("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalCount * 100),
|
||||
skipRows, skipRows + pageSize.get());
|
||||
XxlJobLogger.log("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalCount * 100),
|
||||
XxlJobHelper.log("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalCount * 100),
|
||||
skipRows, skipRows + pageSize.get());
|
||||
int pageNo = skipRows / pageSize.get() + 1;
|
||||
List<HistoricProcessInstance> instances = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceByUnfinishedAndAlterEndTime(endTime, new Page<HistoricProcessInstance>(pageNo, pageSize.get()));
|
||||
|
||||
instances.parallelStream().filter(hpi -> {
|
||||
if (Objects.nonNull(shardingVO)) {
|
||||
if (shardIndex >= 0 && shardTotal >= 0) {
|
||||
if (hpi.getId().contains("-")) {
|
||||
// 最开始的实例编号是 UUID,无法取模,所以统一交给第一个节点处理
|
||||
return shardingVO.getIndex() == 0;
|
||||
return shardIndex == 0;
|
||||
}
|
||||
return Long.parseLong(hpi.getId().substring(12)) % shardingVO.getTotal() == shardingVO.getIndex();
|
||||
return Long.parseLong(hpi.getId().substring(12)) % shardTotal == shardIndex;
|
||||
}
|
||||
// 如果配置的非分片广播的方式,那么 shardingVO 将是 null
|
||||
return true;
|
||||
|
||||
@ -13,9 +13,8 @@ import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
|
||||
import cn.hutool.core.util.NumberUtil;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.IJobHandler;
|
||||
import com.xxl.job.core.context.XxlJobHelper;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import com.xxl.job.core.log.XxlJobLogger;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
@ -45,7 +44,7 @@ import static cn.axzo.workflow.server.controller.delegate.AbstractBpmnTaskAssign
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class SpecifyProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
public class SpecifyProcessInstanceSyncEsJobHandler {
|
||||
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
|
||||
private final AggregateProcessInstanceService aggregateProcessInstanceService;
|
||||
private final SpringProcessEngineConfiguration springProcessEngineConfiguration;
|
||||
@ -56,13 +55,13 @@ public class SpecifyProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
@XxlJob("specifyProcessInstanceSyncToEs")
|
||||
public ReturnT<String> execute(String processInstanceId) {
|
||||
log.info("Sync specify processInstance id: {}", processInstanceId);
|
||||
XxlJobLogger.log("Sync specify processInstance id: {}", processInstanceId);
|
||||
XxlJobHelper.log("Sync specify processInstance id: {}", processInstanceId);
|
||||
|
||||
// 开始同步流程数据到 ES
|
||||
DataSyncSummaryDTO summary = doSync(processInstanceId);
|
||||
|
||||
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
|
||||
XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
|
||||
XxlJobHelper.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
|
||||
return ReturnT.SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user