feat(REQ-3004) - 移除一些无用的代码
This commit is contained in:
parent
6a99b1180c
commit
070b873adb
@ -1,27 +0,0 @@
|
||||
package cn.axzo.workflow.es.conf;
|
||||
|
||||
import cn.axzo.workflow.core.conf.ProcessExtConfigurer;
|
||||
import cn.axzo.workflow.es.flowable.ext.AsyncElasticSearchSyncJobHandler;
|
||||
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
|
||||
import org.flowable.job.service.JobHandler;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* ES 相关对 Flowable 扩展的配置
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-11-04 10:39
|
||||
*/
|
||||
@Component
|
||||
public class FlowableElasticSearchConfiguration implements ProcessExtConfigurer {
|
||||
private final AggregateProcessInstanceService aggregateProcessInstanceService;
|
||||
|
||||
public FlowableElasticSearchConfiguration(AggregateProcessInstanceService aggregateProcessInstanceService) {
|
||||
this.aggregateProcessInstanceService = aggregateProcessInstanceService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JobHandler getJobHandler() {
|
||||
return new AsyncElasticSearchSyncJobHandler(aggregateProcessInstanceService);
|
||||
}
|
||||
}
|
||||
@ -1,42 +0,0 @@
|
||||
package cn.axzo.workflow.es.flowable.ext;
|
||||
|
||||
import cn.axzo.workflow.core.engine.job.AbstractExecuteWithLockJobHandler;
|
||||
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.impl.interceptor.CommandContext;
|
||||
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
|
||||
import org.flowable.engine.impl.util.CommandContextUtil;
|
||||
import org.flowable.job.service.JobHandler;
|
||||
import org.flowable.job.service.impl.persistence.entity.JobEntity;
|
||||
import org.flowable.variable.api.delegate.VariableScope;
|
||||
|
||||
/**
|
||||
* 同步 ElasticSearch 数据
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-11-04 10:09
|
||||
*/
|
||||
@Slf4j
|
||||
public class AsyncElasticSearchSyncJobHandler extends AbstractExecuteWithLockJobHandler implements JobHandler {
|
||||
public static final String TYPE = "async-es-sync";
|
||||
private final AggregateProcessInstanceService aggregateProcessInstanceService;
|
||||
|
||||
public AsyncElasticSearchSyncJobHandler(AggregateProcessInstanceService aggregateProcessInstanceService) {
|
||||
super();
|
||||
this.aggregateProcessInstanceService = aggregateProcessInstanceService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
|
||||
log.info("AsyncElasticSearchSyncJobHandler exec start...");
|
||||
ProcessEngineConfigurationImpl processEngineConfiguration =
|
||||
CommandContextUtil.getProcessEngineConfiguration(commandContext);
|
||||
processEngineConfiguration.getCommandExecutor().execute(new CustomElasticSearchCmd(job.getProcessInstanceId(), aggregateProcessInstanceService));
|
||||
}
|
||||
}
|
||||
@ -1,80 +0,0 @@
|
||||
package cn.axzo.workflow.es.flowable.ext;
|
||||
|
||||
import cn.axzo.workflow.core.engine.cmd.AbstractCommand;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import org.flowable.common.engine.impl.interceptor.CommandConfig;
|
||||
import org.flowable.common.engine.impl.interceptor.CommandContext;
|
||||
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
|
||||
import org.flowable.engine.HistoryService;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
|
||||
import org.flowable.engine.impl.util.CommandContextUtil;
|
||||
import org.flowable.job.service.JobService;
|
||||
import org.flowable.job.service.JobServiceConfiguration;
|
||||
import org.flowable.job.service.impl.asyncexecutor.ResetExpiredJobsCmd;
|
||||
import org.flowable.job.service.impl.persistence.entity.ExternalWorkerJobEntity;
|
||||
import org.flowable.job.service.impl.persistence.entity.JobEntity;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 自定义同步处理器
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-11-04 10:00
|
||||
*/
|
||||
public class CustomElasticSearchAsyncCmd extends AbstractCommand<Void> implements Serializable {
|
||||
private final String processInstanceId;
|
||||
|
||||
public CustomElasticSearchAsyncCmd(String processInstanceId) {
|
||||
this.processInstanceId = processInstanceId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String paramToJsonString() {
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put("processInstanceId", processInstanceId);
|
||||
return JSON.toJSONString(params);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void executeInternal(CommandContext commandContext) {
|
||||
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
|
||||
|
||||
HistoryService historyService = processEngineConfiguration.getHistoryService();
|
||||
HistoricProcessInstance instance = historyService.createHistoricProcessInstanceQuery()
|
||||
.processInstanceId(processInstanceId)
|
||||
.singleResult();
|
||||
|
||||
String jobId = startAsync(processEngineConfiguration, instance);
|
||||
|
||||
// // 重置任务,因为上面的 cmd 和这个 cmd 的 lock 对象不一致
|
||||
JobServiceConfiguration jobServiceConfiguration = processEngineConfiguration.getJobServiceConfiguration();
|
||||
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
|
||||
CommandConfig commandConfig = new CommandConfig().transactionRequired();
|
||||
commandExecutor.execute(commandConfig, new ResetExpiredJobsCmd(Collections.singletonList(jobId), jobServiceConfiguration.getJobEntityManager(), jobServiceConfiguration));
|
||||
return null;
|
||||
}
|
||||
|
||||
private String startAsync(ProcessEngineConfigurationImpl processEngineConfiguration, HistoricProcessInstance instance) {
|
||||
JobService jobService = processEngineConfiguration.getJobServiceConfiguration().getJobService();
|
||||
|
||||
JobEntity job = jobService.createJob();
|
||||
// 这里的 executionId 可为 null
|
||||
job.setExecutionId(instance.getId());
|
||||
job.setProcessInstanceId(instance.getId());
|
||||
job.setProcessDefinitionId(instance.getProcessDefinitionId());
|
||||
job.setElementId(AsyncElasticSearchSyncJobHandler.TYPE);
|
||||
job.setElementName(instance.getName());
|
||||
job.setJobHandlerType(AsyncElasticSearchSyncJobHandler.TYPE);
|
||||
job.setTenantId(instance.getTenantId());
|
||||
|
||||
// 创建异步任务并调度
|
||||
jobService.createAsyncJob(job, true);
|
||||
jobService.scheduleAsyncJob(job);
|
||||
return job.getId();
|
||||
}
|
||||
}
|
||||
@ -1,58 +0,0 @@
|
||||
package cn.axzo.workflow.es.flowable.ext;
|
||||
|
||||
import cn.axzo.workflow.core.engine.cmd.AbstractCommand;
|
||||
import cn.axzo.workflow.es.model.ProcessTaskDocument;
|
||||
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.impl.interceptor.CommandContext;
|
||||
import org.flowable.engine.HistoryService;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
|
||||
import org.flowable.engine.impl.util.CommandContextUtil;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 自定义同步处理器
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-11-04 10:00
|
||||
*/
|
||||
@Slf4j
|
||||
public class CustomElasticSearchCmd extends AbstractCommand<Void> implements Serializable {
|
||||
private final String processInstanceId;
|
||||
private final AggregateProcessInstanceService aggregateProcessInstanceService;
|
||||
|
||||
public CustomElasticSearchCmd(String processInstanceId,
|
||||
AggregateProcessInstanceService aggregateProcessInstanceService) {
|
||||
this.processInstanceId = processInstanceId;
|
||||
this.aggregateProcessInstanceService = aggregateProcessInstanceService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String paramToJsonString() {
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put("processInstanceId", processInstanceId);
|
||||
return JSON.toJSONString(params);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void executeInternal(CommandContext commandContext) {
|
||||
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
|
||||
HistoryService historyService = processEngineConfiguration.getHistoryService();
|
||||
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
|
||||
.processInstanceId(processInstanceId)
|
||||
.singleResult();
|
||||
// 删除指定实例的父子文档
|
||||
log.info("delete document processInstanceId: {}", processInstanceId);
|
||||
aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId);
|
||||
log.info("reInsert document processInstanceId: {}", processInstanceId);
|
||||
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
|
||||
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user