Merge branch 'feature/REQ-4624' into dev

This commit is contained in:
wangli 2025-08-04 10:13:06 +08:00
commit 8eb48df1e5
5 changed files with 56 additions and 88 deletions

View File

@ -22,5 +22,6 @@ import java.io.Serializable;
public class TermNodeAddTimerJobDTO implements Serializable {
private String processInstanceId;
private String activityId;
private String timeCycle;
private Integer delayTime;
private String timeUnit;
}

View File

@ -1,24 +1,19 @@
package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.workflow.common.model.dto.TermNodeAddTimerJobDTO;
import cn.axzo.workflow.core.converter.json.NotSupportConverter;
import cn.axzo.workflow.core.engine.job.AsyncTermNodeAlterJobHandler;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.TimerEventDefinition;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.ManagementService;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.jobexecutor.TimerEventHandler;
import org.flowable.engine.impl.persistence.entity.ExecutionEntity;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.engine.impl.util.TimerUtil;
import org.flowable.engine.runtime.Execution;
import org.flowable.job.service.TimerJobService;
import org.flowable.job.service.impl.persistence.entity.TimerJobEntity;
import org.springframework.util.CollectionUtils;
import java.io.Serializable;
import java.util.List;
import java.util.Date;
/**
* 自定义添加定时任务的逻辑
@ -41,37 +36,39 @@ public class CustomAddTimerJobCmd extends AbstractCommand<Void> implements Seria
@Override
public Void executeInternal(CommandContext commandContext) {
log.info("CustomAddTimerJobCmd start. instanceId: {}, activityId: {}, timeCycle: {}", dto.getProcessInstanceId(), dto.getActivityId(), dto.getTimeCycle());
log.info("CustomAddTimerJobCmd start. instanceId: {}, activityId: {}, delayTime: {}, timeUnit: {}", dto.getProcessInstanceId(), dto.getActivityId(), dto.getDelayTime(), dto.getTimeUnit());
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
ManagementService managementService = processEngineConfiguration.getManagementService();
String tableName = managementService.getTableName(Execution.class);
List<Execution> list = processEngineConfiguration.getRuntimeService()
.createNativeExecutionQuery()
.sql("SELECT * FROM " + tableName + " WHERE PROC_INST_ID_ = #{instanceId} AND ACT_ID_ = #{activityId} AND IS_ACTIVE_ = 1 AND TASK_COUNT_ = 1")
.parameter("instanceId", dto.getProcessInstanceId())
.parameter("activityId", dto.getActivityId())
.list();
if (CollectionUtils.isEmpty(list)) {
return null;
Date alterTime;
switch (dto.getTimeUnit()) {
case "M":
alterTime = DateUtil.offsetDay(new Date(), dto.getDelayTime());
break;
case "H":
alterTime = DateUtil.offsetHour(new Date(), dto.getDelayTime());
break;
default:
alterTime = DateUtil.offsetSecond(new Date(), dto.getDelayTime());
break;
}
if (list.get(list.size() - 1) instanceof ExecutionEntity) {
ExecutionEntity executionEntity = (ExecutionEntity) list.get(list.size() - 1);
TimerEventDefinition timerEventDefinition = new TimerEventDefinition();
timerEventDefinition.setTimeCycle(dto.getTimeCycle());
TimerJobEntity timerJob = TimerUtil.createTimerEntityForTimerEventDefinition(timerEventDefinition,
new NotSupportConverter.NotSupportFlowElement(),
false, executionEntity, AsyncTermNodeAlterJobHandler.TYPE,
TimerEventHandler.createConfiguration(executionEntity.getCurrentActivityId(), null,
timerEventDefinition.getCalendarName()));
if (timerJob != null) {
CommandContextUtil.getTimerJobService().scheduleTimerJob(timerJob);
}
} else {
log.warn("未找到 execution entity");
}
managementService.executeCommand(context -> {
TimerJobService timerJobService = CommandContextUtil.getTimerJobService();
TimerJobEntity timerJobEntity = timerJobService.createTimerJob();
timerJobEntity.setJobType("timer");
timerJobEntity.setJobHandlerType(AsyncTermNodeAlterJobHandler.TYPE); // 这里填写你自定义的 JobHandler 类型
timerJobEntity.setProcessInstanceId(dto.getProcessInstanceId());
timerJobEntity.setExecutionId(null);
timerJobEntity.setDuedate(alterTime); // 立即执行
timerJobEntity.setRepeat(null); // 不重复
timerJobEntity.setRetries(1);
timerJobEntity.setJobHandlerConfiguration(dto.getActivityId()); // 可选传递参数
timerJobService.scheduleTimerJob(timerJobEntity);
return null;
});
return null;
}
}

View File

@ -5,14 +5,11 @@ import cn.axzo.workflow.common.model.dto.AlterDTO;
import cn.axzo.workflow.common.model.dto.TermNodePausingDTO;
import cn.axzo.workflow.core.common.utils.SpringContextUtils;
import cn.axzo.workflow.core.conf.SupportRefreshProperties;
import cn.axzo.workflow.core.engine.tx.listener.DeleteTimerJobTransactionListener;
import cn.axzo.workflow.core.listener.Alter;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.cfg.TransactionState;
import org.flowable.common.engine.impl.context.Context;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
@ -71,9 +68,8 @@ public class AsyncTermNodeAlterJobHandler extends AbstractJobHandler implements
tasks.forEach(e -> {
sb.append("id").append(e.getId()).append(", assignee: ").append(e.getAssignee());
});
log.info("tasks size:{}", JSON.toJSONString(sb));
log.info("tasks size:{}, info: {}", tasks.size(), JSON.toJSONString(sb));
if (CollectionUtils.isEmpty(tasks) || tasks.size() > 1 || hasAssignee(tasks.get(0).getAssignee())) {
deleteTimerJob(dto);
return;
}
if (DateUtil.compare(DateUtil.date(), DateUtil.offsetMinute(tasks.get(0).getCreateTime(), refreshProperties.getPauseDelay())) <= 0) {
@ -82,30 +78,27 @@ public class AsyncTermNodeAlterJobHandler extends AbstractJobHandler implements
// 不允许重复告警
if (!refreshProperties.getRepeatAlter() && dto.getRetries() > 0) {
deleteTimerJob(dto);
return;
}
// 超过告警次数
if (refreshProperties.getAlterRetries() != 0 && dto.getRetries() >= refreshProperties.getAlterRetries()) {
return;
}
// 发送告警对象
Alter alter = SpringContextUtils.getBean(Alter.class);
AlterDTO alterDTO = new AlterDTO();
alterDTO.setProcessInstanceId(dto.getProcessInstanceId());
alterDTO.setActivityId(dto.getActivityId());
alterDTO.setTaskId(tasks.get(0).getId());
alterDTO.setStartTime(tasks.get(0).getCreateTime());
alterDTO.setPrettyStartTime(DateUtil.formatDateTime(tasks.get(0).getCreateTime()));
if (Boolean.TRUE.equals(refreshProperties.getAlterSendDingTalk())) {
alter.invoke(alterDTO);
if (refreshProperties.getAlterRetries() == 0 || dto.getRetries() < refreshProperties.getAlterRetries()) {
// 发送告警对象
Alter alter = SpringContextUtils.getBean(Alter.class);
AlterDTO alterDTO = new AlterDTO();
alterDTO.setProcessInstanceId(dto.getProcessInstanceId());
alterDTO.setActivityId(dto.getActivityId());
alterDTO.setTaskId(tasks.get(0).getId());
alterDTO.setStartTime(tasks.get(0).getCreateTime());
alterDTO.setPrettyStartTime(DateUtil.formatDateTime(tasks.get(0).getCreateTime()));
if (Boolean.TRUE.equals(refreshProperties.getAlterSendDingTalk())) {
alter.invoke(alterDTO);
// 记录告警次数
incRetries(job, dto, runtimeService, activityId);
// 记录告警次数
incRetries(job, dto, runtimeService, activityId);
if (refreshProperties.getAlterRetries() != 0 && dto.getRetries() >= refreshProperties.getAlterRetries()) {
deleteTimerJob(dto);
}
}
}
}
@ -114,10 +107,10 @@ public class AsyncTermNodeAlterJobHandler extends AbstractJobHandler implements
runtimeService.setVariable(job.getProcessInstanceId(), BIZ_NODE_ALTER + activityId, dto);
}
private void deleteTimerJob(TermNodePausingDTO dto) {
Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
new DeleteTimerJobTransactionListener(dto));
}
// private void deleteTimerJob(TermNodePausingDTO dto) {
// Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
// new DeleteTimerJobTransactionListener(dto));
// }
private Boolean hasAssignee(String assignee) {
if (!StringUtils.hasText(assignee)) {

View File

@ -22,7 +22,7 @@ public class AddTimerJobTransactionListener implements TransactionListener {
@Override
public void execute(CommandContext commandContext) {
log.info("add timer job listener. instanceId: {}, activityId: {}, timeCycle: {}", dto.getProcessInstanceId(), dto.getActivityId(), dto.getTimeCycle());
log.info("add timer job listener. instanceId: {}, activityId: {}, delayTime: {}, timeUnit: {}", dto.getProcessInstanceId(), dto.getActivityId(), dto.getDelayTime(), dto.getTimeUnit());
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
processEngineConfiguration.getCommandExecutor().execute(new CustomAddTimerJobCmd(dto));

View File

@ -7,7 +7,6 @@ import cn.axzo.workflow.core.common.context.ActivityOperationContext;
import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper;
import cn.axzo.workflow.core.conf.SupportRefreshProperties;
import cn.axzo.workflow.core.engine.tx.listener.AddTimerJobTransactionListener;
import cn.axzo.workflow.core.engine.tx.listener.DeleteTimerJobTransactionListener;
import cn.axzo.workflow.core.listener.AbstractBpmnEventListener;
import cn.axzo.workflow.core.listener.BpmnActivityEventListener;
import lombok.AllArgsConstructor;
@ -91,7 +90,8 @@ public class InternalBpmnActivityEventListener_lo_Listener extends AbstractBpmnE
TermNodeAddTimerJobDTO addTimerJobDTO = new TermNodeAddTimerJobDTO();
addTimerJobDTO.setProcessInstanceId(execution.getProcessInstanceId());
addTimerJobDTO.setActivityId(execution.getCurrentActivityId());
addTimerJobDTO.setTimeCycle("R100/PT" + refreshProperties.getAlterInterval() + timeUnit);
addTimerJobDTO.setDelayTime(refreshProperties.getAlterInterval());
addTimerJobDTO.setTimeUnit(timeUnit);
Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
new AddTimerJobTransactionListener(addTimerJobDTO));
@ -104,27 +104,4 @@ public class InternalBpmnActivityEventListener_lo_Listener extends AbstractBpmnE
});
}
/**
* 节点已取消
*
* @param execution
*/
@Override
public void onEnd(DelegateExecution execution) {
// ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
// ManagementService managementService = processEngineConfiguration.getManagementService();
// Job timerJob = managementService.createTimerJobQuery()
// .elementId(execution.getCurrentActivityId())
// .processInstanceId(execution.getProcessInstanceId())
// .singleResult();
// if (Objects.nonNull(timerJob)) {
// CommandContextUtil.getTimerJobService().deleteTimerJob((TimerJobEntity) timerJob);
// }
TermNodePausingDTO dto = new TermNodePausingDTO();
dto.setActivityId(execution.getCurrentActivityId());
dto.setProcessInstanceId(execution.getProcessInstanceId());
dto.setRetries(1);
Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
new DeleteTimerJobTransactionListener(dto));
}
}