fix - 增加一张新表来支撑业务节点告警问题

This commit is contained in:
wangli 2025-09-05 17:10:23 +08:00
parent de012b69fb
commit f4168959e7
10 changed files with 261 additions and 29 deletions

View File

@ -21,7 +21,9 @@ import java.io.Serializable;
@Builder
public class TermNodeAddTimerJobDTO implements Serializable {
private String processInstanceId;
private String processDefinitionId;
private String activityId;
private String activityName;
private Integer delayTime;
private String timeUnit;
}

View File

@ -20,7 +20,6 @@ import cn.axzo.workflow.core.engine.job.AsyncExtTaskInstJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncRejectTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncRemindTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncResetApproversUserTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncTermNodeAlterJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncTransferUserTaskJobHandler;
import cn.axzo.workflow.core.engine.job.NextActivityConfigCheckJobHandler;
import cn.axzo.workflow.core.engine.job.exception.handle.CustomAsyncJobLogClearTraceExceptionHandler;
@ -114,7 +113,7 @@ public class FlowableConfiguration {
configuration.addCustomJobHandler(new AsyncExtTaskInstJobHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncRejectTaskJobHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncTransferUserTaskJobHandler());
configuration.addCustomJobHandler(new AsyncTermNodeAlterJobHandler(refreshProperties));
// configuration.addCustomJobHandler(new AsyncTermNodeAlterJobHandler(refreshProperties));
configuration.addCustomJobHandler(new AsyncCountersignUserTaskJobHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncActivityLeaveJobHandler(bpmnProcessActivityService));
configuration.addCustomJobHandler(new AsyncActivityCallbackJobHandler());

View File

@ -1,16 +1,13 @@
package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.workflow.common.model.dto.TermNodeAddTimerJobDTO;
import cn.axzo.workflow.core.engine.job.AsyncTermNodeAlterJobHandler;
import cn.axzo.workflow.core.common.utils.SpringContextUtils;
import cn.axzo.workflow.core.repository.entity.ExtAxNodeAlterJob;
import cn.axzo.workflow.core.service.ExtAxNodeAlterJobService;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.ManagementService;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.TimerJobService;
import org.flowable.job.service.impl.persistence.entity.TimerJobEntity;
import java.io.Serializable;
import java.util.Date;
@ -37,10 +34,7 @@ public class CustomAddTimerJobCmd extends AbstractCommand<Void> implements Seria
@Override
public Void executeInternal(CommandContext commandContext) {
log.info("CustomAddTimerJobCmd start. instanceId: {}, activityId: {}, delayTime: {}, timeUnit: {}", dto.getProcessInstanceId(), dto.getActivityId(), dto.getDelayTime(), dto.getTimeUnit());
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
ManagementService managementService = processEngineConfiguration.getManagementService();
Date alterTime;
switch (dto.getTimeUnit()) {
case "M":
@ -54,20 +48,14 @@ public class CustomAddTimerJobCmd extends AbstractCommand<Void> implements Seria
break;
}
managementService.executeCommand(context -> {
TimerJobService timerJobService = CommandContextUtil.getTimerJobService();
TimerJobEntity timerJobEntity = timerJobService.createTimerJob();
timerJobEntity.setJobType("timer");
timerJobEntity.setJobHandlerType(AsyncTermNodeAlterJobHandler.TYPE); // 这里填写你自定义的 JobHandler 类型
timerJobEntity.setProcessInstanceId(dto.getProcessInstanceId());
timerJobEntity.setExecutionId(null);
timerJobEntity.setDuedate(alterTime); // 立即执行
timerJobEntity.setRepeat(null); // 不重复
timerJobEntity.setRetries(1);
timerJobEntity.setJobHandlerConfiguration(dto.getActivityId()); // 可选传递参数
timerJobService.scheduleTimerJob(timerJobEntity);
return null;
});
ExtAxNodeAlterJobService service = SpringContextUtils.getBean(ExtAxNodeAlterJobService.class);
ExtAxNodeAlterJob job = new ExtAxNodeAlterJob();
job.setProcessInstanceId(dto.getProcessInstanceId());
job.setProcessDefinitionId(dto.getProcessDefinitionId());
job.setActivityId(dto.getActivityId());
job.setActivityName(dto.getActivityName());
job.setAlterTime(alterTime);
service.save(job);
return null;
}

View File

@ -89,7 +89,9 @@ public class InternalBpmnActivityEventListener_lo_Listener extends AbstractBpmnE
TermNodeAddTimerJobDTO addTimerJobDTO = new TermNodeAddTimerJobDTO();
addTimerJobDTO.setProcessInstanceId(execution.getProcessInstanceId());
addTimerJobDTO.setProcessDefinitionId(execution.getProcessDefinitionId());
addTimerJobDTO.setActivityId(execution.getCurrentActivityId());
addTimerJobDTO.setActivityName(flowElement.getName());
addTimerJobDTO.setDelayTime(refreshProperties.getAlterInterval());
addTimerJobDTO.setTimeUnit(timeUnit);

View File

@ -0,0 +1,48 @@
package cn.axzo.workflow.core.repository.entity;
import cn.axzo.framework.data.mybatisplus.model.BaseEntity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import java.util.Date;
/**
* 业务节点告警任务表
*
* @author wangli
* @since 2025-09-05 13:43
*/
@EqualsAndHashCode(callSuper = true)
@TableName(value = "ext_ax_node_alter_job", autoResultMap = true)
@Data
@ToString(callSuper = true)
public class ExtAxNodeAlterJob extends BaseEntity<ExtAxNodeAlterJob> {
private static final long serialVersionUID = 461756492937079852L;
/**
* 流程实例 ID
*/
private String processInstanceId;
/**
* 流程定义 ID
*/
private String processDefinitionId;
/**
* 活动节点 ID
*/
private String activityId;
/**
* 活动节点名称
*/
private String activityName;
/**
* 告警事件
*/
private Date alterTime;
/**
* 异常信息
*/
private String exception;
}

View File

@ -0,0 +1,22 @@
package cn.axzo.workflow.core.repository.mapper;
import cn.axzo.workflow.core.repository.entity.ExtAxNodeAlterJob;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
@Mapper
public interface ExtAxNodeAlterJobMapper extends BaseMapperX<ExtAxNodeAlterJob> {
@Delete({
"<script>",
"DELETE FROM ext_ax_node_alter_job WHERE id IN ",
"<foreach item='id' collection='ids' open='(' separator=',' close=')'>",
"#{id}",
"</foreach>",
"</script>"
})
int physicsDeleteBatchIds(@Param("ids") List<Long> ids);
}

View File

@ -0,0 +1,17 @@
package cn.axzo.workflow.core.service;
import cn.axzo.workflow.core.repository.entity.ExtAxNodeAlterJob;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
/**
* 业务节点告警表操作服务
*
* @author wangli
* @since 2024/4/3 10:40
*/
public interface ExtAxNodeAlterJobService extends IService<ExtAxNodeAlterJob> {
boolean deleteByIds(List<Long> ids);
}

View File

@ -0,0 +1,31 @@
package cn.axzo.workflow.core.service.impl;
import cn.axzo.workflow.core.repository.entity.ExtAxNodeAlterJob;
import cn.axzo.workflow.core.repository.mapper.ExtAxNodeAlterJobMapper;
import cn.axzo.workflow.core.service.ExtAxNodeAlterJobService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
/**
* 业务节点告警表操作服务实现类
*
* @author wangli
* @since 2025-09-05 13:50
*/
@Service
@Slf4j
public class ExtAxNodeAlterJobServiceImpl extends ServiceImpl<ExtAxNodeAlterJobMapper, ExtAxNodeAlterJob> implements ExtAxNodeAlterJobService {
@Resource
private ExtAxNodeAlterJobMapper extAxNodeAlterJobMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public boolean deleteByIds(List<Long> ids) {
return extAxNodeAlterJobMapper.physicsDeleteBatchIds(ids) > 0;
}
}

View File

@ -0,0 +1,28 @@
package cn.axzo.workflow.server.controller.web;
import com.alibaba.fastjson.JSONObject;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
* @author yanglin
*/
@RestController
public class JobHttpHandler {
@PostMapping("/jobs/{jobName}")
ReturnT<String> exec(@PathVariable String jobName,
@RequestBody(required = false) JSONObject paramObj) throws Exception {
IJobHandler jobHandler = XxlJobExecutor.loadJobHandler(jobName);
if (jobHandler == null)
return new ReturnT<>(ReturnT.FAIL_CODE, String.format("找不到job: %s", jobName));
jobHandler.execute();
return ReturnT.SUCCESS;
}
}

View File

@ -1,15 +1,43 @@
package cn.axzo.workflow.server.xxljob;
import cn.axzo.basics.common.util.NumberUtil;
import cn.axzo.framework.jackson.utility.JSON;
import cn.axzo.infra.xxl220to250.IJobHandler;
import cn.axzo.workflow.common.model.dto.AlterDTO;
import cn.axzo.workflow.common.model.response.category.CategoryItemVO;
import cn.axzo.workflow.core.common.utils.SpringContextUtils;
import cn.axzo.workflow.core.conf.SupportRefreshProperties;
import cn.axzo.workflow.core.listener.Alter;
import cn.axzo.workflow.core.repository.entity.ExtAxNodeAlterJob;
import cn.axzo.workflow.core.service.CategoryService;
import cn.axzo.workflow.core.service.ExtAxNodeAlterJobService;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.flowable.task.api.Task;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import static cn.axzo.workflow.common.constant.BpmnConstants.BPM_MODEL_CATEGORY;
/**
* TODO
* 调度业务节点告警定时任务
*
* @author wangli
* @since 2025-09-05 11:19
@ -18,13 +46,80 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor
@Slf4j
public class NodeAlterJobHandler extends IJobHandler {
private final ExtAxNodeAlterJobService extAxNodeAlterJobService;
private final CategoryService categoryService;
private final SpringProcessEngineConfiguration processEngineConfiguration;
private final SupportRefreshProperties refreshProperties;
@XxlJob("nodeAlterJobHandler")
@Override
public ReturnT<String> execute(String param) throws Exception {
// log.info();
XxlJobHelper.log("request param: {}", param);
List<ExtAxNodeAlterJob> jobs = extAxNodeAlterJobService.list(new LambdaQueryWrapper<ExtAxNodeAlterJob>()
.le(ExtAxNodeAlterJob::getAlterTime, new Date()));
if (CollectionUtils.isEmpty(jobs)) {
XxlJobHelper.log("没有需要处理的业务节点告警任务");
return ReturnT.SUCCESS;
}
List<ExtAxNodeAlterJob> executedJobs = new ArrayList<>();
RuntimeService runtimeService = processEngineConfiguration.getRuntimeService();
TaskService taskService = processEngineConfiguration.getTaskService();
jobs.forEach(e -> {
XxlJobHelper.log("start execution of job: {}", JSON.toJSONString(e));
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery().processInstanceId(e.getProcessInstanceId()).singleResult();
if (Objects.isNull(processInstance)) {
executedJobs.add(e);
return;
}
List<Task> tasks = taskService.createTaskQuery()
.processInstanceId(e.getProcessInstanceId())
.taskDefinitionKey(e.getActivityId())
.active().list();
return null;
StringBuilder sb = new StringBuilder();
tasks.forEach(t -> {
sb.append("id").append(e.getId()).append(", assignee: ").append(t.getAssignee());
});
XxlJobHelper.log("tasks size:{}, info: {}", tasks.size(), com.alibaba.fastjson.JSON.toJSONString(sb));
if (CollectionUtils.isEmpty(tasks) || tasks.size() > 1 || hasAssignee(tasks.get(0).getAssignee())) {
executedJobs.add(e);
return;
}
Optional<CategoryItemVO> category = categoryService.get(BPM_MODEL_CATEGORY, processInstance.getProcessDefinitionKey());
// sendAlter(processInstance, category, e, tasks.get(0));
executedJobs.add(e);
XxlJobHelper.log("end execution of job: {}", JSON.toJSONString(e));
});
extAxNodeAlterJobService.deleteByIds(executedJobs.stream().map(ExtAxNodeAlterJob::getId).collect(Collectors.toList()));
return ReturnT.SUCCESS;
}
private void sendAlter(ProcessInstance processInstance, Optional<CategoryItemVO> category, ExtAxNodeAlterJob job, Task task) {
// 发送告警对象
Alter alter = SpringContextUtils.getBean(Alter.class);
AlterDTO alterDTO = new AlterDTO();
alterDTO.setProcessDefinitionKey(processInstance.getProcessDefinitionKey());
alterDTO.setProcessDefinitionName(category.orElse(new CategoryItemVO()).getLabel());
alterDTO.setProcessInstanceId(job.getProcessInstanceId());
alterDTO.setActivityId(job.getActivityId());
alterDTO.setTaskId(task.getId());
alterDTO.setStartTime(task.getCreateTime());
alterDTO.setPrettyStartTime(DateUtil.formatDateTime(task.getCreateTime()));
if (Boolean.TRUE.equals(refreshProperties.getAlterSendDingTalk())) {
alter.invoke(alterDTO);
}
}
private Boolean hasAssignee(String assignee) {
if (!StringUtils.hasText(assignee)) {
return false;
}
String[] split = assignee.split("\\|");
return split.length == 2 && NumberUtil.isPositiveNumber(Long.valueOf(split[1]));
}
}