fix - 调整 es 同步

This commit is contained in:
wangli 2024-11-06 11:01:32 +08:00
parent 2542cb9f87
commit 0033427234
3 changed files with 7 additions and 11 deletions

View File

@ -36,7 +36,7 @@ public abstract class AbstractExecuteWithLockJobHandler extends AbstractJobHandl
executeInternal(job, configuration, variableScope, commandContext);
} else {
log.warn("get lock failed,processInstanceId:{},jobId:{}", processInstanceId, jobId);
throw new WorkflowEngineException(ASYNC_JOB_EXECUTION_ERROR);
throw new WorkflowEngineException(ASYNC_JOB_EXECUTION_ERROR, processInstanceId);
}
} finally {
releaseLock(processInstanceId, jobId);

View File

@ -37,7 +37,6 @@ public class AsyncElasticSearchSyncJobHandler extends AbstractExecuteWithLockJob
log.info("AsyncElasticSearchSyncJobHandler exec start...");
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
String processInstanceId = job.getCustomValues();
processEngineConfiguration.getCommandExecutor().execute(new CustomElasticSearchCmd(processInstanceId, aggregateProcessInstanceService));
processEngineConfiguration.getCommandExecutor().execute(new CustomElasticSearchCmd(job.getProcessInstanceId(), aggregateProcessInstanceService));
}
}

View File

@ -51,10 +51,10 @@ public class CustomElasticSearchAsyncCmd extends AbstractCommand<Void> implement
String jobId = startAsync(processEngineConfiguration, instance);
// // 重置任务因为上面的 cmd 和这个 cmd lock 对象不一致
// JobServiceConfiguration jobServiceConfiguration = processEngineConfiguration.getJobServiceConfiguration();
// CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
// CommandConfig commandConfig = new CommandConfig().transactionRequired();
// commandExecutor.execute(commandConfig, new ResetExpiredJobsCmd(Collections.singletonList(jobId), jobServiceConfiguration.getJobEntityManager(), jobServiceConfiguration));
JobServiceConfiguration jobServiceConfiguration = processEngineConfiguration.getJobServiceConfiguration();
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
CommandConfig commandConfig = new CommandConfig().transactionRequired();
commandExecutor.execute(commandConfig, new ResetExpiredJobsCmd(Collections.singletonList(jobId), jobServiceConfiguration.getJobEntityManager(), jobServiceConfiguration));
return null;
}
@ -71,11 +71,8 @@ public class CustomElasticSearchAsyncCmd extends AbstractCommand<Void> implement
job.setJobHandlerType(AsyncElasticSearchSyncJobHandler.TYPE);
job.setTenantId(instance.getTenantId());
// 携带自定义的数据
job.setCustomValues(processInstanceId);
// 创建异步任务并调度
jobService.createAsyncJob(job, false);
jobService.createAsyncJob(job, true);
jobService.scheduleAsyncJob(job);
return job.getId();
}