Merge remote-tracking branch 'origin/hotfix/node-alter' into release-20251022

This commit is contained in:
wangli 2025-10-22 11:09:26 +08:00
commit f1eb549ade
14 changed files with 325 additions and 28 deletions

View File

@ -32,6 +32,7 @@
<easy-es.version>2.0.0</easy-es.version>
<xxl-job.version>2.5.0</xxl-job.version>
<workflow-engine-common.version>0.0.1-SNAPSHOT</workflow-engine-common.version>
<adapter.version>2.0.1-SNAPSHOT</adapter.version>
</properties>
<dependencyManagement>
@ -179,6 +180,11 @@
<artifactId>doc-api</artifactId>
<version>${axzo-dependencies.version}</version>
</dependency>
<dependency>
<groupId>cn.axzo.infra</groupId>
<artifactId>adapter</artifactId>
<version>${adapter.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -21,7 +21,9 @@ import java.io.Serializable;
@Builder
public class TermNodeAddTimerJobDTO implements Serializable {
private String processInstanceId;
private String processDefinitionId;
private String activityId;
private String activityName;
private Integer delayTime;
private String timeUnit;
}

View File

@ -20,7 +20,6 @@ import cn.axzo.workflow.core.engine.job.AsyncExtTaskInstJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncRejectTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncRemindTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncResetApproversUserTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncTermNodeAlterJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncTransferUserTaskJobHandler;
import cn.axzo.workflow.core.engine.job.NextActivityConfigCheckJobHandler;
import cn.axzo.workflow.core.engine.job.exception.handle.CustomAsyncJobLogClearTraceExceptionHandler;
@ -114,7 +113,7 @@ public class FlowableConfiguration {
configuration.addCustomJobHandler(new AsyncExtTaskInstJobHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncRejectTaskJobHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncTransferUserTaskJobHandler());
configuration.addCustomJobHandler(new AsyncTermNodeAlterJobHandler(refreshProperties));
// configuration.addCustomJobHandler(new AsyncTermNodeAlterJobHandler(refreshProperties));
configuration.addCustomJobHandler(new AsyncCountersignUserTaskJobHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncActivityLeaveJobHandler(bpmnProcessActivityService));
configuration.addCustomJobHandler(new AsyncActivityCallbackJobHandler());

View File

@ -41,6 +41,16 @@ public class SupportRefreshProperties {
@Value(value = "${workflow.alter.enable:false}")
private Boolean alterEnable;
/**
* 是否全量业务 ID 都开启告警功能
*/
@Value(value = "${workflow.alter.allDefinitionKey:false}")
private Boolean alterAllDefinitionKey;
/**
* 需要开启告警功能的业务 ID 列表
*/
@Value(value = "${workflow.alter.definitionKeys:}")
private List<String> alterDefinitionKeys;
/**
* 节点卡住多久才告警单位分钟
*/

View File

@ -1,16 +1,13 @@
package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.workflow.common.model.dto.TermNodeAddTimerJobDTO;
import cn.axzo.workflow.core.engine.job.AsyncTermNodeAlterJobHandler;
import cn.axzo.workflow.core.common.utils.SpringContextUtils;
import cn.axzo.workflow.core.repository.entity.ExtAxNodeAlterJob;
import cn.axzo.workflow.core.service.ExtAxNodeAlterJobService;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.ManagementService;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.TimerJobService;
import org.flowable.job.service.impl.persistence.entity.TimerJobEntity;
import java.io.Serializable;
import java.util.Date;
@ -37,14 +34,11 @@ public class CustomAddTimerJobCmd extends AbstractCommand<Void> implements Seria
@Override
public Void executeInternal(CommandContext commandContext) {
log.info("CustomAddTimerJobCmd start. instanceId: {}, activityId: {}, delayTime: {}, timeUnit: {}", dto.getProcessInstanceId(), dto.getActivityId(), dto.getDelayTime(), dto.getTimeUnit());
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
ManagementService managementService = processEngineConfiguration.getManagementService();
Date alterTime;
switch (dto.getTimeUnit()) {
case "M":
alterTime = DateUtil.offsetDay(new Date(), dto.getDelayTime());
alterTime = DateUtil.offsetMinute(new Date(), dto.getDelayTime());
break;
case "H":
alterTime = DateUtil.offsetHour(new Date(), dto.getDelayTime());
@ -54,20 +48,15 @@ public class CustomAddTimerJobCmd extends AbstractCommand<Void> implements Seria
break;
}
managementService.executeCommand(context -> {
TimerJobService timerJobService = CommandContextUtil.getTimerJobService();
TimerJobEntity timerJobEntity = timerJobService.createTimerJob();
timerJobEntity.setJobType("timer");
timerJobEntity.setJobHandlerType(AsyncTermNodeAlterJobHandler.TYPE); // 这里填写你自定义的 JobHandler 类型
timerJobEntity.setProcessInstanceId(dto.getProcessInstanceId());
timerJobEntity.setExecutionId(null);
timerJobEntity.setDuedate(alterTime); // 立即执行
timerJobEntity.setRepeat(null); // 不重复
timerJobEntity.setRetries(1);
timerJobEntity.setJobHandlerConfiguration(dto.getActivityId()); // 可选传递参数
timerJobService.scheduleTimerJob(timerJobEntity);
return null;
});
ExtAxNodeAlterJobService service = SpringContextUtils.getBean(ExtAxNodeAlterJobService.class);
ExtAxNodeAlterJob job = new ExtAxNodeAlterJob();
job.setProcessInstanceId(dto.getProcessInstanceId());
job.setProcessDefinitionId(dto.getProcessDefinitionId());
job.setActivityId(dto.getActivityId());
job.setActivityName(dto.getActivityName());
job.setAlterTime(alterTime);
job.setException("");
service.save(job);
return null;
}

View File

@ -89,12 +89,13 @@ public class CustomBizSpecifyAssigneeToTaskCmd extends AbstractCommand<Boolean>
* @param assigners
*/
public static void validate(RuntimeService runtimeService, String executionId, Task task, List<BpmnTaskDelegateAssigner> assigners) {
validTask(task, executionId);
BpmnModel bpmnModel = ProcessDefinitionUtil.getBpmnModel(task.getProcessDefinitionId());
boolean present = assigners.stream().anyMatch(assigner -> !StringUtils.hasText(assigner.getNodeId()));
if (present && getCategoryVersion(bpmnModel.getMainProcess()).orElse(0) > 0) {
throw new WorkflowEngineException(ASSIGNEE_NODE_ID_NOT_EXISTS, "审批人");
}
validTask(task, executionId);
//校验审批人数量是否超过限制
validTaskAssignerCount(runtimeService, (TaskEntity) task, assigners);
}

View File

@ -11,6 +11,7 @@ import cn.axzo.workflow.core.listener.AbstractBpmnEventListener;
import cn.axzo.workflow.core.listener.BpmnActivityEventListener;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.ListUtils;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.FlowElement;
import org.flowable.common.engine.impl.cfg.TransactionState;
@ -60,11 +61,16 @@ public class InternalBpmnActivityEventListener_lo_Listener extends AbstractBpmnE
if (!Boolean.TRUE.equals(refreshProperties.getAlterEnable())) {
return;
}
BpmnModel bpmnModel = ProcessDefinitionUtil.getBpmnModel(execution.getProcessDefinitionId());
if (!refreshProperties.getAlterAllDefinitionKey()) {
if (!ListUtils.emptyIfNull(refreshProperties.getAlterDefinitionKeys()).contains(bpmnModel.getMainProcess().getId())) {
return;
}
}
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
RuntimeService runtimeService = processEngineConfiguration.getRuntimeService();
TermNodePausingDTO dto = new TermNodePausingDTO(execution.getProcessInstanceId(), execution.getCurrentActivityId(), 0);
runtimeService.setVariable(execution.getProcessInstanceId(), BIZ_NODE_ALTER + execution.getCurrentActivityId(), dto);
BpmnModel bpmnModel = ProcessDefinitionUtil.getBpmnModel(execution.getProcessDefinitionId());
FlowElement flowElement = bpmnModel.getFlowElement(execution.getCurrentActivityId());
BpmnMetaParserHelper.getNodeType(flowElement).ifPresent(e -> {
if (Objects.equals(BpmnFlowNodeType.NODE_BUSINESS, e)) {
@ -89,7 +95,9 @@ public class InternalBpmnActivityEventListener_lo_Listener extends AbstractBpmnE
TermNodeAddTimerJobDTO addTimerJobDTO = new TermNodeAddTimerJobDTO();
addTimerJobDTO.setProcessInstanceId(execution.getProcessInstanceId());
addTimerJobDTO.setProcessDefinitionId(execution.getProcessDefinitionId());
addTimerJobDTO.setActivityId(execution.getCurrentActivityId());
addTimerJobDTO.setActivityName(flowElement.getName());
addTimerJobDTO.setDelayTime(refreshProperties.getAlterInterval());
addTimerJobDTO.setTimeUnit(timeUnit);

View File

@ -0,0 +1,48 @@
package cn.axzo.workflow.core.repository.entity;
import cn.axzo.framework.data.mybatisplus.model.BaseEntity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import java.util.Date;
/**
* 业务节点告警任务表
*
* @author wangli
* @since 2025-09-05 13:43
*/
@EqualsAndHashCode(callSuper = true)
@TableName(value = "ext_ax_node_alter_job", autoResultMap = true)
@Data
@ToString(callSuper = true)
public class ExtAxNodeAlterJob extends BaseEntity<ExtAxNodeAlterJob> {
private static final long serialVersionUID = 461756492937079852L;
/**
* 流程实例 ID
*/
private String processInstanceId;
/**
* 流程定义 ID
*/
private String processDefinitionId;
/**
* 活动节点 ID
*/
private String activityId;
/**
* 活动节点名称
*/
private String activityName;
/**
* 告警事件
*/
private Date alterTime;
/**
* 异常信息
*/
private String exception;
}

View File

@ -0,0 +1,22 @@
package cn.axzo.workflow.core.repository.mapper;
import cn.axzo.workflow.core.repository.entity.ExtAxNodeAlterJob;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
@Mapper
public interface ExtAxNodeAlterJobMapper extends BaseMapperX<ExtAxNodeAlterJob> {
@Delete({
"<script>",
"DELETE FROM ext_ax_node_alter_job WHERE id IN ",
"<foreach item='id' collection='ids' open='(' separator=',' close=')'>",
"#{id}",
"</foreach>",
"</script>"
})
int physicsDeleteBatchIds(@Param("ids") List<Long> ids);
}

View File

@ -0,0 +1,17 @@
package cn.axzo.workflow.core.service;
import cn.axzo.workflow.core.repository.entity.ExtAxNodeAlterJob;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
/**
* 业务节点告警表操作服务
*
* @author wangli
* @since 2024/4/3 10:40
*/
public interface ExtAxNodeAlterJobService extends IService<ExtAxNodeAlterJob> {
boolean deleteByIds(List<Long> ids);
}

View File

@ -0,0 +1,31 @@
package cn.axzo.workflow.core.service.impl;
import cn.axzo.workflow.core.repository.entity.ExtAxNodeAlterJob;
import cn.axzo.workflow.core.repository.mapper.ExtAxNodeAlterJobMapper;
import cn.axzo.workflow.core.service.ExtAxNodeAlterJobService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
/**
* 业务节点告警表操作服务实现类
*
* @author wangli
* @since 2025-09-05 13:50
*/
@Service
@Slf4j
public class ExtAxNodeAlterJobServiceImpl extends ServiceImpl<ExtAxNodeAlterJobMapper, ExtAxNodeAlterJob> implements ExtAxNodeAlterJobService {
@Resource
private ExtAxNodeAlterJobMapper extAxNodeAlterJobMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public boolean deleteByIds(List<Long> ids) {
return extAxNodeAlterJobMapper.physicsDeleteBatchIds(ids) > 0;
}
}

View File

@ -154,6 +154,10 @@
<groupId>cn.axzo.nanopart</groupId>
<artifactId>doc-api</artifactId>
</dependency>
<dependency>
<groupId>cn.axzo.infra</groupId>
<artifactId>adapter</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,28 @@
package cn.axzo.workflow.server.controller.web;
import com.alibaba.fastjson.JSONObject;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
* @author yanglin
*/
@RestController
public class JobHttpHandler {
@PostMapping("/jobs/{jobName}")
ReturnT<String> exec(@PathVariable String jobName,
@RequestBody(required = false) JSONObject paramObj) throws Exception {
IJobHandler jobHandler = XxlJobExecutor.loadJobHandler(jobName);
if (jobHandler == null)
return new ReturnT<>(ReturnT.FAIL_CODE, String.format("找不到job: %s", jobName));
jobHandler.execute();
return ReturnT.SUCCESS;
}
}

View File

@ -0,0 +1,132 @@
package cn.axzo.workflow.server.xxljob;
import cn.axzo.basics.common.util.NumberUtil;
import cn.axzo.framework.jackson.utility.JSON;
import cn.axzo.infra.xxl220to250.IJobHandler;
import cn.axzo.workflow.common.model.dto.AlterDTO;
import cn.axzo.workflow.common.model.response.category.CategoryItemVO;
import cn.axzo.workflow.core.common.utils.SpringContextUtils;
import cn.axzo.workflow.core.conf.SupportRefreshProperties;
import cn.axzo.workflow.core.listener.Alter;
import cn.axzo.workflow.core.repository.entity.ExtAxNodeAlterJob;
import cn.axzo.workflow.core.service.CategoryService;
import cn.axzo.workflow.core.service.ExtAxNodeAlterJobService;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.ListUtils;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.flowable.task.api.Task;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import static cn.axzo.workflow.common.constant.BpmnConstants.BPM_MODEL_CATEGORY;
/**
* 调度业务节点告警定时任务
*
* @author wangli
* @since 2025-09-05 11:19
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class NodeAlterJobHandler extends IJobHandler {
private final ExtAxNodeAlterJobService extAxNodeAlterJobService;
private final CategoryService categoryService;
private final SpringProcessEngineConfiguration processEngineConfiguration;
private final SupportRefreshProperties refreshProperties;
@XxlJob("nodeAlterJobHandler")
@Override
public ReturnT<String> execute(String param) throws Exception {
List<ExtAxNodeAlterJob> jobs = extAxNodeAlterJobService.list(new LambdaQueryWrapper<ExtAxNodeAlterJob>()
.le(ExtAxNodeAlterJob::getAlterTime, new Date()));
if (CollectionUtils.isEmpty(jobs)) {
XxlJobHelper.log("没有需要处理的业务节点告警任务");
return ReturnT.SUCCESS;
}
List<ExtAxNodeAlterJob> executedJobs = new ArrayList<>();
RuntimeService runtimeService = processEngineConfiguration.getRuntimeService();
TaskService taskService = processEngineConfiguration.getTaskService();
jobs.forEach(e -> {
XxlJobHelper.log("start execution of job: {}", JSON.toJSONString(e));
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery().processInstanceId(e.getProcessInstanceId()).singleResult();
if (Objects.isNull(processInstance)) {
executedJobs.add(e);
return;
}
if (!refreshProperties.getAlterAllDefinitionKey()) {
if (!ListUtils.emptyIfNull(refreshProperties.getAlterDefinitionKeys()).contains(processInstance.getProcessDefinitionKey())) {
XxlJobHelper.log("skip alter by config");
return;
}
}
List<Task> tasks = taskService.createTaskQuery()
.processInstanceId(e.getProcessInstanceId())
.taskDefinitionKey(e.getActivityId())
.active().list();
StringBuilder sb = new StringBuilder();
tasks.forEach(t -> {
sb.append("id").append(e.getId()).append(", assignee: ").append(t.getAssignee());
});
XxlJobHelper.log("tasks size:{}, info: {}", tasks.size(), com.alibaba.fastjson.JSON.toJSONString(sb));
if (CollectionUtils.isEmpty(tasks) || tasks.size() > 1 || hasAssignee(tasks.get(0).getAssignee())) {
executedJobs.add(e);
return;
}
Optional<CategoryItemVO> category = categoryService.get(BPM_MODEL_CATEGORY, processInstance.getProcessDefinitionKey());
sendAlter(processInstance, category, e, tasks.get(0));
executedJobs.add(e);
XxlJobHelper.log("end execution of job: {}", JSON.toJSONString(e));
});
extAxNodeAlterJobService.deleteByIds(executedJobs.stream().map(ExtAxNodeAlterJob::getId).collect(Collectors.toList()));
return ReturnT.SUCCESS;
}
private void sendAlter(ProcessInstance processInstance, Optional<CategoryItemVO> category, ExtAxNodeAlterJob job, Task task) {
// 发送告警对象
Alter alter = SpringContextUtils.getBean(Alter.class);
AlterDTO alterDTO = new AlterDTO();
alterDTO.setProcessDefinitionKey(processInstance.getProcessDefinitionKey());
alterDTO.setProcessDefinitionName(category.orElse(new CategoryItemVO()).getLabel());
alterDTO.setProcessInstanceId(job.getProcessInstanceId());
alterDTO.setActivityId(job.getActivityId());
alterDTO.setTaskId(task.getId());
alterDTO.setStartTime(task.getCreateTime());
alterDTO.setPrettyStartTime(DateUtil.formatDateTime(task.getCreateTime()));
if (Boolean.TRUE.equals(refreshProperties.getAlterSendDingTalk())) {
alter.invoke(alterDTO);
}
}
private Boolean hasAssignee(String assignee) {
if (!StringUtils.hasText(assignee)) {
return false;
}
String[] split = assignee.split("\\|");
return split.length == 2 && NumberUtil.isPositiveNumber(Long.valueOf(split[1]));
}
}