Merge branch 'hotfix/timer_alter' into feature/REQ-4624

This commit is contained in:
wangli 2025-08-01 11:42:38 +08:00
commit 99b7609d14
5 changed files with 56 additions and 88 deletions

View File

@ -22,5 +22,6 @@ import java.io.Serializable;
public class TermNodeAddTimerJobDTO implements Serializable { public class TermNodeAddTimerJobDTO implements Serializable {
private String processInstanceId; private String processInstanceId;
private String activityId; private String activityId;
private String timeCycle; private Integer delayTime;
private String timeUnit;
} }

View File

@ -1,24 +1,19 @@
package cn.axzo.workflow.core.engine.cmd; package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.workflow.common.model.dto.TermNodeAddTimerJobDTO; import cn.axzo.workflow.common.model.dto.TermNodeAddTimerJobDTO;
import cn.axzo.workflow.core.converter.json.NotSupportConverter;
import cn.axzo.workflow.core.engine.job.AsyncTermNodeAlterJobHandler; import cn.axzo.workflow.core.engine.job.AsyncTermNodeAlterJobHandler;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.TimerEventDefinition;
import org.flowable.common.engine.impl.interceptor.CommandContext; import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.ManagementService; import org.flowable.engine.ManagementService;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.jobexecutor.TimerEventHandler;
import org.flowable.engine.impl.persistence.entity.ExecutionEntity;
import org.flowable.engine.impl.util.CommandContextUtil; import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.engine.impl.util.TimerUtil; import org.flowable.job.service.TimerJobService;
import org.flowable.engine.runtime.Execution;
import org.flowable.job.service.impl.persistence.entity.TimerJobEntity; import org.flowable.job.service.impl.persistence.entity.TimerJobEntity;
import org.springframework.util.CollectionUtils;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.Date;
/** /**
* 自定义添加定时任务的逻辑 * 自定义添加定时任务的逻辑
@ -41,37 +36,39 @@ public class CustomAddTimerJobCmd extends AbstractCommand<Void> implements Seria
@Override @Override
public Void executeInternal(CommandContext commandContext) { public Void executeInternal(CommandContext commandContext) {
log.info("CustomAddTimerJobCmd start. instanceId: {}, activityId: {}, timeCycle: {}", dto.getProcessInstanceId(), dto.getActivityId(), dto.getTimeCycle()); log.info("CustomAddTimerJobCmd start. instanceId: {}, activityId: {}, delayTime: {}, timeUnit: {}", dto.getProcessInstanceId(), dto.getActivityId(), dto.getDelayTime(), dto.getTimeUnit());
ProcessEngineConfigurationImpl processEngineConfiguration = ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext); CommandContextUtil.getProcessEngineConfiguration(commandContext);
ManagementService managementService = processEngineConfiguration.getManagementService(); ManagementService managementService = processEngineConfiguration.getManagementService();
String tableName = managementService.getTableName(Execution.class); Date alterTime;
List<Execution> list = processEngineConfiguration.getRuntimeService() switch (dto.getTimeUnit()) {
.createNativeExecutionQuery() case "M":
.sql("SELECT * FROM " + tableName + " WHERE PROC_INST_ID_ = #{instanceId} AND ACT_ID_ = #{activityId} AND IS_ACTIVE_ = 1 AND TASK_COUNT_ = 1") alterTime = DateUtil.offsetDay(new Date(), dto.getDelayTime());
.parameter("instanceId", dto.getProcessInstanceId()) break;
.parameter("activityId", dto.getActivityId()) case "H":
.list(); alterTime = DateUtil.offsetHour(new Date(), dto.getDelayTime());
if (CollectionUtils.isEmpty(list)) { break;
return null; default:
alterTime = DateUtil.offsetSecond(new Date(), dto.getDelayTime());
break;
} }
if (list.get(list.size() - 1) instanceof ExecutionEntity) { managementService.executeCommand(context -> {
ExecutionEntity executionEntity = (ExecutionEntity) list.get(list.size() - 1); TimerJobService timerJobService = CommandContextUtil.getTimerJobService();
TimerEventDefinition timerEventDefinition = new TimerEventDefinition(); TimerJobEntity timerJobEntity = timerJobService.createTimerJob();
timerEventDefinition.setTimeCycle(dto.getTimeCycle()); timerJobEntity.setJobType("timer");
TimerJobEntity timerJob = TimerUtil.createTimerEntityForTimerEventDefinition(timerEventDefinition, timerJobEntity.setJobHandlerType(AsyncTermNodeAlterJobHandler.TYPE); // 这里填写你自定义的 JobHandler 类型
new NotSupportConverter.NotSupportFlowElement(), timerJobEntity.setProcessInstanceId(dto.getProcessInstanceId());
false, executionEntity, AsyncTermNodeAlterJobHandler.TYPE, timerJobEntity.setExecutionId(null);
TimerEventHandler.createConfiguration(executionEntity.getCurrentActivityId(), null, timerJobEntity.setDuedate(alterTime); // 立即执行
timerEventDefinition.getCalendarName())); timerJobEntity.setRepeat(null); // 不重复
if (timerJob != null) { timerJobEntity.setRetries(1);
CommandContextUtil.getTimerJobService().scheduleTimerJob(timerJob); timerJobEntity.setJobHandlerConfiguration(dto.getActivityId()); // 可选传递参数
} timerJobService.scheduleTimerJob(timerJobEntity);
} else { return null;
log.warn("未找到 execution entity"); });
}
return null; return null;
} }
} }

View File

@ -5,14 +5,11 @@ import cn.axzo.workflow.common.model.dto.AlterDTO;
import cn.axzo.workflow.common.model.dto.TermNodePausingDTO; import cn.axzo.workflow.common.model.dto.TermNodePausingDTO;
import cn.axzo.workflow.core.common.utils.SpringContextUtils; import cn.axzo.workflow.core.common.utils.SpringContextUtils;
import cn.axzo.workflow.core.conf.SupportRefreshProperties; import cn.axzo.workflow.core.conf.SupportRefreshProperties;
import cn.axzo.workflow.core.engine.tx.listener.DeleteTimerJobTransactionListener;
import cn.axzo.workflow.core.listener.Alter; import cn.axzo.workflow.core.listener.Alter;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.cfg.TransactionState;
import org.flowable.common.engine.impl.context.Context;
import org.flowable.common.engine.impl.interceptor.CommandContext; import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.RuntimeService; import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService; import org.flowable.engine.TaskService;
@ -71,9 +68,8 @@ public class AsyncTermNodeAlterJobHandler extends AbstractJobHandler implements
tasks.forEach(e -> { tasks.forEach(e -> {
sb.append("id").append(e.getId()).append(", assignee: ").append(e.getAssignee()); sb.append("id").append(e.getId()).append(", assignee: ").append(e.getAssignee());
}); });
log.info("tasks size:{}", JSON.toJSONString(sb)); log.info("tasks size:{}, info: {}", tasks.size(), JSON.toJSONString(sb));
if (CollectionUtils.isEmpty(tasks) || tasks.size() > 1 || hasAssignee(tasks.get(0).getAssignee())) { if (CollectionUtils.isEmpty(tasks) || tasks.size() > 1 || hasAssignee(tasks.get(0).getAssignee())) {
deleteTimerJob(dto);
return; return;
} }
if (DateUtil.compare(DateUtil.date(), DateUtil.offsetMinute(tasks.get(0).getCreateTime(), refreshProperties.getPauseDelay())) <= 0) { if (DateUtil.compare(DateUtil.date(), DateUtil.offsetMinute(tasks.get(0).getCreateTime(), refreshProperties.getPauseDelay())) <= 0) {
@ -82,30 +78,27 @@ public class AsyncTermNodeAlterJobHandler extends AbstractJobHandler implements
// 不允许重复告警 // 不允许重复告警
if (!refreshProperties.getRepeatAlter() && dto.getRetries() > 0) { if (!refreshProperties.getRepeatAlter() && dto.getRetries() > 0) {
deleteTimerJob(dto); return;
}
// 超过告警次数
if (refreshProperties.getAlterRetries() != 0 && dto.getRetries() >= refreshProperties.getAlterRetries()) {
return; return;
} }
// 发送告警对象
Alter alter = SpringContextUtils.getBean(Alter.class);
AlterDTO alterDTO = new AlterDTO();
alterDTO.setProcessInstanceId(dto.getProcessInstanceId());
alterDTO.setActivityId(dto.getActivityId());
alterDTO.setTaskId(tasks.get(0).getId());
alterDTO.setStartTime(tasks.get(0).getCreateTime());
alterDTO.setPrettyStartTime(DateUtil.formatDateTime(tasks.get(0).getCreateTime()));
if (Boolean.TRUE.equals(refreshProperties.getAlterSendDingTalk())) {
alter.invoke(alterDTO);
if (refreshProperties.getAlterRetries() == 0 || dto.getRetries() < refreshProperties.getAlterRetries()) { // 记录告警次数
// 发送告警对象 incRetries(job, dto, runtimeService, activityId);
Alter alter = SpringContextUtils.getBean(Alter.class);
AlterDTO alterDTO = new AlterDTO();
alterDTO.setProcessInstanceId(dto.getProcessInstanceId());
alterDTO.setActivityId(dto.getActivityId());
alterDTO.setTaskId(tasks.get(0).getId());
alterDTO.setStartTime(tasks.get(0).getCreateTime());
alterDTO.setPrettyStartTime(DateUtil.formatDateTime(tasks.get(0).getCreateTime()));
if (Boolean.TRUE.equals(refreshProperties.getAlterSendDingTalk())) {
alter.invoke(alterDTO);
// 记录告警次数
incRetries(job, dto, runtimeService, activityId);
if (refreshProperties.getAlterRetries() != 0 && dto.getRetries() >= refreshProperties.getAlterRetries()) {
deleteTimerJob(dto);
}
}
} }
} }
@ -114,10 +107,10 @@ public class AsyncTermNodeAlterJobHandler extends AbstractJobHandler implements
runtimeService.setVariable(job.getProcessInstanceId(), BIZ_NODE_ALTER + activityId, dto); runtimeService.setVariable(job.getProcessInstanceId(), BIZ_NODE_ALTER + activityId, dto);
} }
private void deleteTimerJob(TermNodePausingDTO dto) { // private void deleteTimerJob(TermNodePausingDTO dto) {
Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED, // Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
new DeleteTimerJobTransactionListener(dto)); // new DeleteTimerJobTransactionListener(dto));
} // }
private Boolean hasAssignee(String assignee) { private Boolean hasAssignee(String assignee) {
if (!StringUtils.hasText(assignee)) { if (!StringUtils.hasText(assignee)) {

View File

@ -22,7 +22,7 @@ public class AddTimerJobTransactionListener implements TransactionListener {
@Override @Override
public void execute(CommandContext commandContext) { public void execute(CommandContext commandContext) {
log.info("add timer job listener. instanceId: {}, activityId: {}, timeCycle: {}", dto.getProcessInstanceId(), dto.getActivityId(), dto.getTimeCycle()); log.info("add timer job listener. instanceId: {}, activityId: {}, delayTime: {}, timeUnit: {}", dto.getProcessInstanceId(), dto.getActivityId(), dto.getDelayTime(), dto.getTimeUnit());
ProcessEngineConfigurationImpl processEngineConfiguration = ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext); CommandContextUtil.getProcessEngineConfiguration(commandContext);
processEngineConfiguration.getCommandExecutor().execute(new CustomAddTimerJobCmd(dto)); processEngineConfiguration.getCommandExecutor().execute(new CustomAddTimerJobCmd(dto));

View File

@ -7,7 +7,6 @@ import cn.axzo.workflow.core.common.context.ActivityOperationContext;
import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper; import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper;
import cn.axzo.workflow.core.conf.SupportRefreshProperties; import cn.axzo.workflow.core.conf.SupportRefreshProperties;
import cn.axzo.workflow.core.engine.tx.listener.AddTimerJobTransactionListener; import cn.axzo.workflow.core.engine.tx.listener.AddTimerJobTransactionListener;
import cn.axzo.workflow.core.engine.tx.listener.DeleteTimerJobTransactionListener;
import cn.axzo.workflow.core.listener.AbstractBpmnEventListener; import cn.axzo.workflow.core.listener.AbstractBpmnEventListener;
import cn.axzo.workflow.core.listener.BpmnActivityEventListener; import cn.axzo.workflow.core.listener.BpmnActivityEventListener;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
@ -91,7 +90,8 @@ public class InternalBpmnActivityEventListener_lo_Listener extends AbstractBpmnE
TermNodeAddTimerJobDTO addTimerJobDTO = new TermNodeAddTimerJobDTO(); TermNodeAddTimerJobDTO addTimerJobDTO = new TermNodeAddTimerJobDTO();
addTimerJobDTO.setProcessInstanceId(execution.getProcessInstanceId()); addTimerJobDTO.setProcessInstanceId(execution.getProcessInstanceId());
addTimerJobDTO.setActivityId(execution.getCurrentActivityId()); addTimerJobDTO.setActivityId(execution.getCurrentActivityId());
addTimerJobDTO.setTimeCycle("R100/PT" + refreshProperties.getAlterInterval() + timeUnit); addTimerJobDTO.setDelayTime(refreshProperties.getAlterInterval());
addTimerJobDTO.setTimeUnit(timeUnit);
Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED, Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
new AddTimerJobTransactionListener(addTimerJobDTO)); new AddTimerJobTransactionListener(addTimerJobDTO));
@ -104,27 +104,4 @@ public class InternalBpmnActivityEventListener_lo_Listener extends AbstractBpmnE
}); });
} }
/**
* 节点已取消
*
* @param execution
*/
@Override
public void onEnd(DelegateExecution execution) {
// ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
// ManagementService managementService = processEngineConfiguration.getManagementService();
// Job timerJob = managementService.createTimerJobQuery()
// .elementId(execution.getCurrentActivityId())
// .processInstanceId(execution.getProcessInstanceId())
// .singleResult();
// if (Objects.nonNull(timerJob)) {
// CommandContextUtil.getTimerJobService().deleteTimerJob((TimerJobEntity) timerJob);
// }
TermNodePausingDTO dto = new TermNodePausingDTO();
dto.setActivityId(execution.getCurrentActivityId());
dto.setProcessInstanceId(execution.getProcessInstanceId());
dto.setRetries(1);
Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
new DeleteTimerJobTransactionListener(dto));
}
} }