Merge branch 'feature/REQ-2752-wl' into feature/REQ-2752

# Conflicts:
#	workflow-engine-axzo-ext/src/main/java/cn/axzo/workflow/admin/service/impl/ExtAxProcessAdminServiceImpl.java
This commit is contained in:
wangli 2024-10-09 09:46:45 +08:00
commit ba3e681776
40 changed files with 1386 additions and 298 deletions

View File

@ -140,6 +140,11 @@
<artifactId>easy-es-boot-starter</artifactId>
<version>${easy-es.version}</version>
</dependency>
<dependency>
<groupId>org.dromara.easy-es</groupId>
<artifactId>easy-es-annotation</artifactId>
<version>${easy-es.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -0,0 +1,16 @@
package cn.axzo.workflow.client.feign.es;
import cn.axzo.workflow.client.annotation.WorkflowEngineFeignClient;
import cn.axzo.workflow.common.annotation.Manageable;
/**
* 操作 ES 的流程实例 API
*
* @author wangli
* @since 2024-10-07 21:12
*/
@WorkflowEngineFeignClient
@Manageable
public interface EsProcessInstanceApi {
}

View File

@ -29,6 +29,10 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.dromara.easy-es</groupId>
<artifactId>easy-es-annotation</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -58,6 +58,8 @@ public interface BpmnConstants {
String FLOW_SERVER_VERSION = "serverVersion";
String FLOW_SERVER_VERSION_121 = "1.2.1";
String FLOW_SERVER_VERSION_130 = "1.3.0";
// 1.4.2 开始启用新版本日志
String FLOW_SERVER_VERSION_142 = "1.4.2";
String CONFIG_NOTICE = "noticeConfig";
String CONFIG_APPROVE = "approveConfig";
String TEMPLATE_NOTICE_MESSAGE_CONFIG = "noticeMessageConfig";
@ -187,4 +189,9 @@ public interface BpmnConstants {
* 回退操作次数上限
*/
Integer MAX_BACKED_OPERATE_COUNT = 20;
String LATEST_SYNC_TO_ELASTICSEARCH_TIME = "latest.sync.to.elasticsearch.time";
/**
* 固定父子文档在相同分片
*/
String ES_FIXED_ROUTING = "routing";
}

View File

@ -0,0 +1,16 @@
package cn.axzo.workflow.common.enums;
/**
* 时间查询方向
* <p>
* 注意: 该枚举用在查询 flowable 引擎数据时, 都是包含自身时间点的.
* 例如, 使用 Before ,也就是说在某个时间点之前,是包含"某个时间"自身的.
*
* @author wangli
* @since 2024-09-29 09:56
*/
public enum TimeQueryDirection {
BEFORE,
AFTER,
;
}

View File

@ -0,0 +1,25 @@
package cn.axzo.workflow.common.model.dto.es;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
/**
* 数据同步 ES 的结果统计
*
* @author wangli
* @since 2024-09-30 14:42
*/
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class DataSyncSummaryDTO {
private Long processInstanceCount;
private Long processTaskCount;
}

View File

@ -1,11 +1,14 @@
package cn.axzo.workflow.common.model.dto.es;
import cn.axzo.workflow.common.enums.TimeQueryDirection;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.util.Date;
/**
* 历史流程实例的搜索对象
*
@ -49,5 +52,40 @@ public class HistoricProcessInstanceSearchForEsDTO {
*/
private String tenantId;
/**
* 实例是否已结束
*
*/
private Boolean finished;
/**
* 开始时间
*/
private Date startTime;
/**
* 控制查询开始时间的方向,包含自身的时间点
* <p>
* 默认是查询开始时间之后
*/
private TimeQueryDirection startTimeDirection = TimeQueryDirection.AFTER;
/**
* 结束时间
*/
private Date endTime;
/**
* 控制查询结束时间的方向,包含自身的时间点
* <p>
* 默认是查询结束时间点之前
*/
private TimeQueryDirection endTimeDirection = TimeQueryDirection.BEFORE;
/**
* 用于覆盖同步逻辑中的PageSize,一般不需要传
*/
private Integer overPageSize = 100;
}

View File

@ -7,6 +7,8 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.dromara.easyes.annotation.IndexField;
import org.dromara.easyes.annotation.rely.FieldType;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
@ -29,6 +31,7 @@ public class AttachmentDTO implements Serializable {
/**
* 附件 ID
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String id;
/**
@ -36,6 +39,7 @@ public class AttachmentDTO implements Serializable {
*/
@ApiModelProperty(value = "附件类型")
@NotNull(message = "附件类型不能为空")
@IndexField(fieldType = FieldType.KEYWORD)
private AttachmentTypeEnum type;
/**
@ -43,12 +47,14 @@ public class AttachmentDTO implements Serializable {
*/
@ApiModelProperty(value = "文件名称不能为空")
@NotBlank(message = "文件名称不能为空")
@IndexField(fieldType = FieldType.KEYWORD)
private String name;
/**
* 文件描述
*/
@ApiModelProperty(value = "文件描述")
@IndexField(exist = false)
private String description;
/**
@ -56,6 +62,7 @@ public class AttachmentDTO implements Serializable {
*/
@ApiModelProperty(value = "附件地址")
@NotBlank(message = "附件地址不能为空")
@IndexField(fieldType = FieldType.KEYWORD)
private String url;
}

View File

@ -8,6 +8,8 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.dromara.easyes.annotation.IndexField;
import org.dromara.easyes.annotation.rely.FieldType;
import org.springframework.util.StringUtils;
import org.springframework.validation.annotation.Validated;
@ -50,6 +52,7 @@ public class BpmnTaskDelegateAssigner implements Serializable {
* 安心筑:身份 ID
*/
@Deprecated
@IndexField(fieldType = FieldType.KEYWORD)
private String assignee;
/**
@ -59,6 +62,7 @@ public class BpmnTaskDelegateAssigner implements Serializable {
* 安心筑:身份 Type 应该必传
*/
@Deprecated
@IndexField(fieldType = FieldType.KEYWORD)
private String assigneeType;
/**
@ -67,6 +71,7 @@ public class BpmnTaskDelegateAssigner implements Serializable {
* 枢智:用户姓名
* 安心筑:用户姓名
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String assignerName;
/**
@ -74,6 +79,7 @@ public class BpmnTaskDelegateAssigner implements Serializable {
* <p>
* 仅安心筑使用, 应该必传
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String personId;
/**
@ -82,18 +88,21 @@ public class BpmnTaskDelegateAssigner implements Serializable {
* 枢智: 企业 ID
* 安心筑: 工作台 ID
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String tenantId;
/**
* 人所在的单位 ID
* 仅安心筑使用, 工人可以没有, 其他身份一定需要
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String ouId;
/**
* 该属性业务接入时无需关心. 尽量保证其他属性都设值.
* 头像
*/
@IndexField(exist = false)
private String avatar;
public final String buildAssigneeId_1_2_1() {

View File

@ -0,0 +1,47 @@
package cn.axzo.workflow.common.model.request.es;
import cn.axzo.workflow.common.model.request.BpmPageParam;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import javax.validation.constraints.NotBlank;
import java.util.List;
/**
* 搜索 Es 中实例纬度的数据
*
* @author wangli
* @since 2024-10-08 14:03
*/
@EqualsAndHashCode(callSuper = true)
@ApiModel("搜索 Es 中实例纬度的数据")
@Data
public class InstanceSearchReqDTO extends BpmPageParam {
@ApiModelProperty(value = "审批人所属租户 ID")
@NotBlank(message = "租户 ID 不能为空")
private String tenantId;
@ApiModelProperty(value = "审批人所属单位 ID")
private String ouId;
@ApiModelProperty(value = "审批人自然人 ID")
private String personId;
@ApiModelProperty(value = "审批人姓名")
private String assigneeName;
@ApiModelProperty(value = "流程实例状态", notes = "参考 BpmnProcessInstanceResultEnum 枚举")
private String businessStatus;
@ApiModelProperty(value = "流程实例名称")
private String processInstanceName;
/**
* 流程实例 ID, 与其它所有的属性互斥
*/
@ApiModelProperty(value = "流程实例 ID 集合")
private List<String> processInstanceIds;
}

View File

@ -0,0 +1,55 @@
package cn.axzo.workflow.common.model.request.es;
import cn.axzo.workflow.common.model.request.BpmPageParam;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.List;
/**
* 搜索 Es 中任务纬度的数据
*
* @author wangli
* @since 2024-10-08 09:49
*/
@EqualsAndHashCode(callSuper = true)
@ApiModel("搜索 Es 中任务纬度的数据")
@Data
public class TaskSearchReqDTO extends BpmPageParam {
private static final long serialVersionUID = -1L;
/**
* 搜索含有指定该租户的任务
*/
@ApiModelProperty(value = "租户 ID 集合")
private List<String> tenantIds;
/**
* 搜索含有指定单位的任务
*/
@ApiModelProperty(value = "单位 ID 集合")
private List<String> ouIds;
/**
* 搜索含有指定自然人的任务
*/
@ApiModelProperty(value = "自然人 ID")
private String personId;
/**
* 搜索含有指定自然人名称的任务
*/
@ApiModelProperty(value = "自然人姓名集合")
private String name;
/**
* 与上面三个属性互斥
* <p>
* 成对是数据项
*/
@ApiModelProperty(value = "审批人信息集合")
private List<BpmnTaskDelegateAssigner> assigners;
}

View File

@ -0,0 +1,34 @@
package cn.axzo.workflow.core.engine.cmd;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.common.engine.impl.persistence.entity.PropertyEntity;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import java.util.Objects;
/**
* 操作 ACT_GE_PROPERTY 表的删除指定属性的命令
*
* @author wangli
* @since 2024-09-30 15:15
*/
public class CustomClearPropertyCmd implements Command<Void> {
private final String propertyName;
public CustomClearPropertyCmd(String propertyName) {
this.propertyName = propertyName;
}
@Override
public Void execute(CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
PropertyEntity entity = processEngineConfiguration.getPropertyEntityManager().findById(propertyName);
if (Objects.nonNull(entity)) {
processEngineConfiguration.getPropertyEntityManager().delete(entity.getId());
}
return null;
}
}

View File

@ -0,0 +1,43 @@
package cn.axzo.workflow.core.engine.cmd;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.common.engine.impl.persistence.entity.PropertyEntity;
import org.flowable.common.engine.impl.persistence.entity.PropertyEntityImpl;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import java.util.Objects;
/**
* 操作 ACT_GE_PROPERTY 表的写命令
*
* @author wangli
* @since 2024-09-30 15:15
*/
public class CustomInsertPropertyCmd implements Command<Void> {
private final String propertyName;
private final String propertyValue;
public CustomInsertPropertyCmd(String propertyName, String propertyValue) {
this.propertyName = propertyName;
this.propertyValue = propertyValue;
}
@Override
public Void execute(CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
PropertyEntity entity = processEngineConfiguration.getPropertyEntityManager().findById(propertyName);
if (Objects.nonNull(entity)) {
entity.setValue(propertyValue);
processEngineConfiguration.getPropertyEntityManager().update(entity);
} else {
entity = new PropertyEntityImpl();
entity.setName(propertyName);
entity.setValue(propertyValue);
processEngineConfiguration.getPropertyEntityManager().insert(entity);
}
return null;
}
}

View File

@ -21,7 +21,7 @@ import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventT
import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.ENTITY_UPDATED;
/**
* TODO
* 引擎内的 Entity 事件监听处理
*
* @author wangli
* @since 2024-09-02 15:34
@ -33,20 +33,19 @@ public class EngineEntityEventListener extends AbstractFlowableEventListener {
private final List<EntityEventHandle> handles;
public static final Set<FlowableEngineEventType> SUPPORTED =
ImmutableSet.<FlowableEngineEventType>builder()
.add(ENTITY_CREATED)
.add(ENTITY_INITIALIZED)
.add(ENTITY_UPDATED)
.add(ENTITY_DELETED)
.add(ENTITY_SUSPENDED)
.add(ENTITY_ACTIVATED)
.build();
ImmutableSet.<FlowableEngineEventType>builder()
.add(ENTITY_CREATED)
.add(ENTITY_INITIALIZED)
.add(ENTITY_UPDATED)
.add(ENTITY_DELETED)
.add(ENTITY_SUSPENDED)
.add(ENTITY_ACTIVATED)
.build();
@Override
public void onEvent(FlowableEvent event) {
if (event instanceof FlowableEntityEvent && SUPPORTED.contains(event.getType())) {
FlowableEntityEvent entityEvent = (FlowableEntityEvent) event;
// log.warn("entity event type: {}, class: {}",entityEvent.getType(), entityEvent.getEntity().getClass());
handles.forEach(handle -> {
Object entity = entityEvent.getEntity();
if (handle.support(entity)) {
@ -66,48 +65,10 @@ public class EngineEntityEventListener extends AbstractFlowableEventListener {
}
}
});
// if (entityEvent.getEntity() instanceof TaskEntity) {
// TaskEntity taskEntity = (TaskEntity) entityEvent.getEntity();
// log.error("event taskId :{}, taskDefKey: {}", taskEntity.getId(), taskEntity.getTaskDefinitionKey());
//
// if (Objects.equals(event.getType(), ENTITY_CREATED)) {
// onCreate(taskEntity);
// } else if (Objects.equals(event.getType(), ENTITY_INITIALIZED)) {
// onInitialized(taskEntity);
// } else if (Objects.equals(event.getType(), ENTITY_UPDATED)) {
// onUpdated(taskEntity);
// } else if (Objects.equals(event.getType(), ENTITY_DELETED)) {
// onDeleted(taskEntity);
// } else if (Objects.equals(event.getType(), ENTITY_SUSPENDED)) {
// onSuspended(taskEntity);
// } else if (Objects.equals(event.getType(), ENTITY_ACTIVATED)) {
// onActivated(taskEntity);
// }
// } else if(entityEvent.getEntity() instanceof CommentEntity) {
// CommentEntity commentEntity = (CommentEntity) entityEvent.getEntity();
// log.error("event taskId :{}", commentEntity.getId());
//
// if (Objects.equals(event.getType(), ENTITY_CREATED)) {
// onCreate(commentEntity);
// } else if (Objects.equals(event.getType(), ENTITY_INITIALIZED)) {
// onInitialized(commentEntity);
// } else if (Objects.equals(event.getType(), ENTITY_UPDATED)) {
// onUpdated(commentEntity);
// } else if (Objects.equals(event.getType(), ENTITY_DELETED)) {
// onDeleted(commentEntity);
// } else if (Objects.equals(event.getType(), ENTITY_SUSPENDED)) {
// onSuspended(commentEntity);
// } else if (Objects.equals(event.getType(), ENTITY_ACTIVATED)) {
// onActivated(commentEntity);
// }
// }
}
}
@Override
public boolean isFailOnException() {
return true;

View File

@ -1,7 +1,7 @@
package cn.axzo.workflow.core.engine.listener.entity;
/**
* TODO
* Entity life cycle
*
* @author wangli
* @since 2024-09-06 00:03
@ -12,16 +12,22 @@ public interface EntityEventHandle<T> {
T convert(Object entity);
void onCreate(T entity);
default void onCreate(T entity) {
}
void onInitialized(T entity);
default void onInitialized(T entity) {
}
void onUpdated(T entity);
default void onUpdated(T entity) {
}
void onDeleted(T entity);
default void onDeleted(T entity) {
}
void onSuspended(T entity);
default void onSuspended(T entity) {
}
void onActivated(T entity);
default void onActivated(T entity) {
}
}

View File

@ -22,7 +22,6 @@ import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERAT
//@Component
@AllArgsConstructor
public class CommentEntityEventHandle implements EntityEventHandle<CommentEntity> {
private final ExtAxProcessLogService processLogService;
@Override
public boolean support(Object entity) {
@ -37,45 +36,30 @@ public class CommentEntityEventHandle implements EntityEventHandle<CommentEntity
@Override
public void onCreate(CommentEntity entity) {
log.info("comment event onCreate: {}", entity.getId());
ExtAxProcessLog queryLog = new ExtAxProcessLog();
queryLog.setProcessInstanceId(entity.getProcessInstanceId());
queryLog.setTaskId(entity.getId());
ExtAxProcessLog update = new ExtAxProcessLog();
if (Objects.equals(COMMENT_TYPE_ADVICE, entity.getType())) {
update.setAdvice(entity.getFullMessage());
} else if (Objects.equals(COMMENT_TYPE_OPERATION_DESC, entity.getType())) {
update.setOperationDesc(entity.getFullMessage());
}
processLogService.update(queryLog, update);
}
@Override
public void onInitialized(CommentEntity entity) {
log.info("comment event onInitialized: {}", entity.getId());
}
@Override
public void onUpdated(CommentEntity entity) {
log.info("comment event onUpdated: {}", entity.getId());
}
@Override
public void onDeleted(CommentEntity entity) {
log.info("comment event onDeleted: {}", entity.getId());
}
@Override
public void onSuspended(CommentEntity entity) {
log.info("comment event onSuspended: {}", entity.getId());
}
@Override
public void onActivated(CommentEntity entity) {
log.info("comment event onSuspended: {}", entity.getId());
log.info("comment event onActivated: {}", entity.getId());
}
}

View File

@ -1,6 +1,5 @@
package cn.axzo.workflow.core.engine.listener.entity.type;
import cn.axzo.framework.jackson.utility.JSON;
import cn.axzo.workflow.common.enums.BpmnFlowNodeMode;
import cn.axzo.workflow.common.enums.BpmnFlowNodeType;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
@ -15,7 +14,6 @@ import org.apache.commons.collections4.ListUtils;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.FlowElement;
import org.flowable.bpmn.model.UserTask;
import org.flowable.engine.RepositoryService;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.impl.bpmn.behavior.MultiInstanceActivityBehavior;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
@ -75,16 +73,67 @@ public class TaskEntityEventHandle implements EntityEventHandle<TaskEntity> {
}
@Override
public void onActivated(TaskEntity taskEntity) {
log.debug("onActivated");
public void onCreate(TaskEntity taskEntity) {
// 记录发起人
boolean isNodeStarter = Objects.equals(taskEntity.getTaskDefinitionKey(), NODE_STARTER.getType());
BpmnModel bpmnModel = ProcessDefinitionUtil.getBpmnModel(taskEntity.getProcessDefinitionId());
FlowElement flowElement = bpmnModel.getFlowElement(taskEntity.getTaskDefinitionKey());
ExtAxProcessLog log = new ExtAxProcessLog();
log.setProcessInstanceId(taskEntity.getProcessInstanceId());
log.setTenantId(taskEntity.getTenantId());
log.setActivityId(taskEntity.getTaskDefinitionKey());
log.setActivityName(taskEntity.getName());
log.setApprovalMethod((isNodeStarter ? nobody : getApprovalMethod(flowElement).orElse(nobody)).getType());
log.setNodeType((getNodeType(flowElement).orElse(BpmnFlowNodeType.NODE_EMPTY)).getType());
log.setNodeMode((isNodeStarter ? BpmnFlowNodeMode.GENERAL : getNodeMode(flowElement)).getType());
log.setTaskId(taskEntity.getId());
log.setOperationDesc(PENDING.getDesc());
log.setStartTime(taskEntity.getCreateTime());
log.setStatus(PROCESSING.getStatus());
processLogService.insert(log);
}
public void onSuspended(TaskEntity taskEntity) {
log.debug("onSuspended");
@Override
public void onInitialized(TaskEntity taskEntity) {
BpmnMetaParserHelper.getButtonConfig(ProcessDefinitionUtil.getProcess(taskEntity.getProcessDefinitionId()), taskEntity.getTaskDefinitionKey())
.ifPresent(buttons -> {
ExtAxProcessLog queryLog = new ExtAxProcessLog();
queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId());
queryLog.setTaskId(taskEntity.getId());
ExtAxProcessLog updateLog = new ExtAxProcessLog();
updateLog.setButtonConf(buttons);
processLogService.update(queryLog, updateLog);
});
}
@Override
public void onUpdated(TaskEntity taskEntity) {
if (Objects.equals(HIDDEN_ASSIGNEE_ID, taskEntity.getAssignee())) {
ExtAxProcessLog queryLog = new ExtAxProcessLog();
queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId());
queryLog.setTaskId(taskEntity.getId());
processLogService.delete(queryLog);
} else {
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
RuntimeService runtimeService = processEngineConfiguration.getRuntimeService();
List<BpmnTaskDelegateAssigner> assigneeList = runtimeService.getVariable(taskEntity.getProcessInstanceId(),
INTERNAL_ACTIVITY_RELATION_ASSIGNEE_LIST_INFO_SNAPSHOT + taskEntity.getTaskDefinitionKey(), List.class);
ListUtils.emptyIfNull(assigneeList).stream().filter(e -> Objects.equals(e.buildAssigneeId(), taskEntity.getAssignee())).findAny()
.ifPresent(assignee -> {
ExtAxProcessLog queryLog = new ExtAxProcessLog();
queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId());
queryLog.setTaskId(taskEntity.getId());
processLogService.updateAssignee(queryLog, assignee);
});
}
}
@Override
public void onDeleted(TaskEntity taskEntity) {
log.debug("onDeleted");
ExtAxProcessLog queryLog = new ExtAxProcessLog();
queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId());
queryLog.setTaskId(taskEntity.getId());
@ -109,23 +158,18 @@ public class TaskEntityEventHandle implements EntityEventHandle<TaskEntity> {
Object advice = taskEntity.getTransientVariableLocal(COMMENT_TYPE_ADVICE);
if (Objects.nonNull(advice) && StringUtils.hasText(advice.toString())) {
log.info("COMMENT_TYPE_ADVICE: {}", advice);
update.setAdvice(advice.toString());
}
Object operationDesc = taskEntity.getTransientVariableLocal(COMMENT_TYPE_OPERATION_DESC);
if (Objects.nonNull(operationDesc) && StringUtils.hasText(operationDesc.toString())) {
log.info("COMMENT_TYPE_OPERATION_DESC: {}", operationDesc);
update.setOperationDesc(Objects.nonNull(assignee) ? assignee.getAssignerName() + operationDesc : operationDesc.toString());
} else {
update.setOperationDesc(Objects.nonNull(assignee) ? assignee.getAssignerName() : "");
// 评论节点会给 operationDesc COMMENTED 的值所以注释
// update.setOperationDesc(BpmnProcessInstanceResultEnum.valueOfStatus(completionType).getDesc());
}
String completionType = taskEntity.getVariable(TASK_COMPLETE_OPERATION_TYPE + taskEntity.getId(), String.class);
if (StringUtils.hasText(completionType) && !Objects.equals(DELETED.getStatus(), completionType)) {
log.info("TASK_COMPLETE_OPERATION_TYPE: {}", completionType);
update.setStatus(completionType);
} else {
// 多实例除操作人以外的任务直接删除日志 例如一个节点有两个人或签A 人驳回了那么 B 人不再需要操作任务自动删除而会签也同理
@ -162,69 +206,6 @@ public class TaskEntityEventHandle implements EntityEventHandle<TaskEntity> {
return Objects.equals(logs.get(0).getNodeType(), BpmnFlowNodeType.NODE_CARBON_COPY.getType());
}
public void onUpdated(TaskEntity taskEntity) {
log.debug("onUpdated");
if (Objects.equals(HIDDEN_ASSIGNEE_ID, taskEntity.getAssignee())) {
ExtAxProcessLog queryLog = new ExtAxProcessLog();
queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId());
queryLog.setTaskId(taskEntity.getId());
processLogService.delete(queryLog);
} else {
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
RuntimeService runtimeService = processEngineConfiguration.getRuntimeService();
List<BpmnTaskDelegateAssigner> assigneeList = runtimeService.getVariable(taskEntity.getProcessInstanceId(),
INTERNAL_ACTIVITY_RELATION_ASSIGNEE_LIST_INFO_SNAPSHOT + taskEntity.getTaskDefinitionKey(), List.class);
ListUtils.emptyIfNull(assigneeList).stream().filter(e -> Objects.equals(e.buildAssigneeId(), taskEntity.getAssignee())).findAny()
.ifPresent(assignee -> {
log.debug("审批人: {}", JSON.toJSONString(assignee));
ExtAxProcessLog queryLog = new ExtAxProcessLog();
queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId());
queryLog.setTaskId(taskEntity.getId());
processLogService.updateAssignee(queryLog, assignee);
});
}
}
public void onInitialized(TaskEntity taskEntity) {
log.debug("onInitialized");
BpmnMetaParserHelper.getButtonConfig(ProcessDefinitionUtil.getProcess(taskEntity.getProcessDefinitionId()), taskEntity.getTaskDefinitionKey())
.ifPresent(buttons -> {
ExtAxProcessLog queryLog = new ExtAxProcessLog();
queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId());
queryLog.setTaskId(taskEntity.getId());
ExtAxProcessLog updateLog = new ExtAxProcessLog();
updateLog.setButtonConf(buttons);
processLogService.update(queryLog, updateLog);
});
}
public void onCreate(TaskEntity taskEntity) {
log.debug("onCreate");
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
// 记录发起人
boolean isNodeStarter = Objects.equals(taskEntity.getTaskDefinitionKey(), NODE_STARTER.getType());
RepositoryService repositoryService = processEngineConfiguration.getRepositoryService();
BpmnModel bpmnModel = repositoryService.getBpmnModel(taskEntity.getProcessDefinitionId());
FlowElement flowElement = bpmnModel.getFlowElement(taskEntity.getTaskDefinitionKey());
ExtAxProcessLog log = new ExtAxProcessLog();
log.setProcessInstanceId(taskEntity.getProcessInstanceId());
log.setTenantId(taskEntity.getTenantId());
log.setActivityId(taskEntity.getTaskDefinitionKey());
log.setActivityName(taskEntity.getName());
log.setApprovalMethod((isNodeStarter ? nobody : getApprovalMethod(flowElement).orElse(nobody)).getType());
log.setNodeType((getNodeType(flowElement).orElse(BpmnFlowNodeType.NODE_EMPTY)).getType());
log.setNodeMode((isNodeStarter ? BpmnFlowNodeMode.GENERAL : getNodeMode(flowElement)).getType());
log.setTaskId(taskEntity.getId());
log.setOperationDesc(PENDING.getDesc());
log.setStartTime(taskEntity.getCreateTime());
log.setStatus(PROCESSING.getStatus());
processLogService.insert(log);
}
private BpmnFlowNodeMode getNodeMode(FlowElement flowElement) {
BpmnFlowNodeMode node = GENERAL;
if (flowElement instanceof UserTask) {

View File

@ -2,6 +2,7 @@ package cn.axzo.workflow.core.service;
import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.engine.history.HistoricProcessInstance;
import java.util.List;
@ -14,5 +15,10 @@ import java.util.List;
*/
public interface BpmnProcessInstanceForEsService {
Long queryHistoricProcessInstanceTotalCount(HistoricProcessInstanceSearchForEsDTO search);
List<HistoricProcessInstance> queryHistoricProcessInstance(HistoricProcessInstanceSearchForEsDTO search, IPage page);
BpmnModel queryBpmnModel(String processDefinitionId);
}

View File

@ -0,0 +1,28 @@
package cn.axzo.workflow.core.service;
import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog;
import org.flowable.engine.task.Attachment;
import org.flowable.engine.task.Comment;
import org.flowable.task.api.history.HistoricTaskInstance;
import java.util.List;
/**
* 专用与对接 ES 的流程任务相关操作
*
* @author wangli
* @since 2024-09-29 10:55
*/
public interface BpmnProcessTaskForEsService {
List<HistoricTaskInstance> queryHistoricProcessTaskByProcessInstanceId(String processInstanceId);
List<ExtAxHiTaskInst> queryExtAxHiTaskInstByProcessInstanceId(String processInstanceId);
List<Comment> queryCommentByProcessInstanceId(String processInstanceId);
List<Attachment> queryAttachmentByProcessInstanceId(String processInstanceId);
List<ExtAxProcessLog> queryProcessLogByProcessInstanceId(String processInstanceId);
}

View File

@ -4,14 +4,19 @@ import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDT
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.engine.HistoryService;
import org.flowable.engine.RepositoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.history.HistoricProcessInstanceQuery;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
* 专用与对接 ES 的流程实例相关操作
@ -23,10 +28,44 @@ import java.util.List;
@Slf4j
public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceForEsService {
@Resource
@Lazy
private HistoryService historyService;
@Resource
@Lazy
private RepositoryService repositoryService;
@Override
public Long queryHistoricProcessInstanceTotalCount(HistoricProcessInstanceSearchForEsDTO search) {
if (Objects.isNull(search)) {
return 0L;
}
return buildHistoricProcessInstanceQuery(search).count();
}
@Override
public List<HistoricProcessInstance> queryHistoricProcessInstance(HistoricProcessInstanceSearchForEsDTO search, IPage page) {
if (Objects.isNull(search)) {
return Collections.emptyList();
}
HistoricProcessInstanceQuery query = buildHistoricProcessInstanceQuery(search)
.includeProcessVariables()
.orderByProcessInstanceId()
.asc();
if (Objects.nonNull(page)) {
int firstResult = Math.toIntExact((page.getCurrent() - 1) * page.getSize());
int maxResult = Math.toIntExact(page.getSize());
log.debug("search pageable info, firstResult: {}, maxResult: {}", firstResult, maxResult);
return query.listPage(firstResult, maxResult);
}
return query.list();
}
@Override
public BpmnModel queryBpmnModel(String processDefinitionId) {
return repositoryService.getBpmnModel(processDefinitionId);
}
private HistoricProcessInstanceQuery buildHistoricProcessInstanceQuery(HistoricProcessInstanceSearchForEsDTO search) {
HistoricProcessInstanceQuery historicProcessInstanceQuery = historyService.createHistoricProcessInstanceQuery();
if (StringUtils.hasText(search.getProcessInstanceId())) {
historicProcessInstanceQuery.processInstanceId(search.getProcessInstanceId());
@ -46,10 +85,34 @@ public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceF
if (StringUtils.hasText(search.getTenantId())) {
historicProcessInstanceQuery.processInstanceTenantId(search.getTenantId());
}
return historicProcessInstanceQuery
.orderByProcessInstanceId()
.asc()
.listPage(((Long) (page.getPages() * page.getSize())).intValue(),
((Long) ((page.getPages() + 1) * page.getSize())).intValue());
// 只有传值后, 才接入该数据, 否则不控制
if (Objects.equals(Boolean.TRUE, search.getFinished())) {
historicProcessInstanceQuery.finished();
} else if (Objects.equals(Boolean.FALSE, search.getFinished())) {
historicProcessInstanceQuery.unfinished();
}
if (Objects.nonNull(search.getStartTime())) {
// 引擎默认仅支撑两种
switch (search.getStartTimeDirection()) {
case BEFORE:
historicProcessInstanceQuery.startedBefore(search.getStartTime());
break;
default:
historicProcessInstanceQuery.startedAfter(search.getStartTime());
break;
}
}
if (Objects.nonNull(search.getEndTime())) {
// 引擎默认仅支撑两种
switch (search.getEndTimeDirection()) {
case AFTER:
historicProcessInstanceQuery.finishedAfter(search.getEndTime());
break;
default:
historicProcessInstanceQuery.finishedBefore(search.getEndTime());
break;
}
}
return historicProcessInstanceQuery;
}
}

View File

@ -1061,6 +1061,7 @@ public class BpmnProcessInstanceServiceImpl implements BpmnProcessInstanceServic
.orElse(variables.getOrDefault(OLD_INTERNAL_INITIATOR, null))))
.tenantId(historicProcessInstance.getTenantId())
.agented((Boolean) Optional.ofNullable(variables.get(INTERNAL_PROCESS_AGENT)).orElse(false))
// 任务
.taskDetails(genericTaskLogVos(historicProcessInstance.getId(), logs, forecasting, dto))
.defaultButtonConf(getButtonConfig(bpmnModel.getMainProcess()).orElse(new BpmnButtonConf()))
.supportBatchOperation(getProcessApproveConf(bpmnModel.getMainProcess()).orElse(new BpmnApproveConf()).getSupportBatchOperation())

View File

@ -0,0 +1,88 @@
package cn.axzo.workflow.core.service.impl;
import cn.axzo.workflow.common.model.request.bpmn.task.ExtHiTaskSearchDTO;
import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog;
import cn.axzo.workflow.core.service.BpmnProcessTaskForEsService;
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
import cn.axzo.workflow.core.service.ExtAxProcessLogService;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.HistoryService;
import org.flowable.engine.TaskService;
import org.flowable.engine.task.Attachment;
import org.flowable.engine.task.Comment;
import org.flowable.task.api.history.HistoricTaskInstance;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.List;
/**
* 专用与对接 ES 的流程任务相关操作
*
* @author wangli
* @since 2024-09-29 10:55
*/
@Service
@Slf4j
public class BpmnProcessTaskForEsServiceImpl implements BpmnProcessTaskForEsService {
@Resource
@Lazy
private HistoryService historyService;
@Resource
@Lazy
private TaskService taskService;
@Resource
private ExtAxHiTaskInstService extAxHiTaskInstService;
@Resource
private ExtAxProcessLogService extAxProcessLogService;
@Override
public List<HistoricTaskInstance> queryHistoricProcessTaskByProcessInstanceId(String processInstanceId) {
// List<BpmnHistoricTaskInstanceVO> historicTaskListByProcessInstanceId = bpmnProcessTaskService.getHistoricTaskListByProcessInstanceId(processInstanceId, null);
if (!StringUtils.hasText(processInstanceId)) {
return Collections.emptyList();
}
return historyService.createHistoricTaskInstanceQuery()
.processInstanceId(processInstanceId)
.list();
}
public List<ExtAxHiTaskInst> queryExtAxHiTaskInstByProcessInstanceId(String processInstanceId) {
if (!StringUtils.hasText(processInstanceId)) {
return Collections.emptyList();
}
ExtHiTaskSearchDTO searchDTO = new ExtHiTaskSearchDTO();
searchDTO.setProcessInstanceId(processInstanceId);
return extAxHiTaskInstService.queryList(searchDTO);
}
@Override
public List<Comment> queryCommentByProcessInstanceId(String processInstanceId) {
if (!StringUtils.hasText(processInstanceId)) {
return Collections.emptyList();
}
return taskService.getProcessInstanceComments(processInstanceId);
}
@Override
public List<Attachment> queryAttachmentByProcessInstanceId(String processInstanceId) {
if (!StringUtils.hasText(processInstanceId)) {
return Collections.emptyList();
}
return taskService.getProcessInstanceAttachments(processInstanceId);
}
@Override
public List<ExtAxProcessLog> queryProcessLogByProcessInstanceId(String processInstanceId) {
if(!StringUtils.hasText(processInstanceId)){
return Collections.emptyList();
}
ExtAxProcessLog queryLog = new ExtAxProcessLog();
queryLog.setProcessInstanceId(processInstanceId);
return extAxProcessLogService.genericQuery(queryLog);
}
}

View File

@ -27,7 +27,7 @@ import static cn.axzo.workflow.core.common.code.BpmnModelRespCode.MODEL_ID_NOT_E
@Service
@RequiredArgsConstructor
@Slf4j
public class ExtAxAxReModelServiceImpl implements ExtAxReModelService {
public class ExtAxReModelServiceImpl implements ExtAxReModelService {
@Resource
private ExtAxReModelMapper extAxReModelMapper;

View File

@ -16,15 +16,14 @@
</properties>
<dependencies>
<dependency>
<groupId>cn.axzo.workflow</groupId>
<artifactId>workflow-engine-axzo-ext</artifactId>
</dependency>
<dependency>
<groupId>cn.axzo.workflow</groupId>
<artifactId>workflow-engine-core</artifactId>
</dependency>
<dependency>
<groupId>cn.axzo.framework.data</groupId>
<artifactId>axzo-data-mybatis-plus</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
@ -40,6 +39,5 @@
<groupId>org.dromara.easy-es</groupId>
<artifactId>easy-es-boot-starter</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,6 +1,6 @@
package cn.axzo.workflow.es.mapper;
import cn.axzo.workflow.es.model.EsProcessInstance;
import cn.axzo.workflow.es.model.ProcessInstanceDocument;
import org.dromara.easyes.core.kernel.BaseEsMapper;
/**
@ -9,5 +9,5 @@ import org.dromara.easyes.core.kernel.BaseEsMapper;
* @author wangli
* @since 2024-09-23 10:52
*/
public interface EsProcessInstanceMapper extends BaseEsMapper<EsProcessInstance> {
public interface EsProcessInstanceMapper extends BaseEsMapper<ProcessInstanceDocument> {
}

View File

@ -1,6 +1,6 @@
package cn.axzo.workflow.es.mapper;
import cn.axzo.workflow.es.model.EsProcessTask;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import org.dromara.easyes.core.kernel.BaseEsMapper;
/**
@ -9,5 +9,5 @@ import org.dromara.easyes.core.kernel.BaseEsMapper;
* @author wangli
* @since 2024-09-23 10:52
*/
public interface EsProcessTaskMapper extends BaseEsMapper<EsProcessTask> {
public interface EsProcessTaskMapper extends BaseEsMapper<ProcessTaskDocument> {
}

View File

@ -4,8 +4,11 @@ import lombok.Data;
import org.dromara.easyes.annotation.IndexField;
import org.dromara.easyes.annotation.IndexId;
import org.dromara.easyes.annotation.IndexName;
import org.dromara.easyes.annotation.Join;
import org.dromara.easyes.annotation.Node;
import org.dromara.easyes.annotation.rely.FieldType;
import org.dromara.easyes.annotation.rely.IdType;
import org.dromara.easyes.annotation.rely.RefreshPolicy;
import java.util.Date;
@ -16,8 +19,9 @@ import java.util.Date;
* @since 2024-09-25 20:32
*/
@Data
@IndexName("process_instance")
public class EsProcessInstance {
@IndexName(value = "process_instance_document", keepGlobalPrefix = true, refreshPolicy = RefreshPolicy.IMMEDIATE)
@Join(nodes = {@Node(parentClass = ProcessInstanceDocument.class ,parentAlias = "process_instance_document", childClasses = {ProcessTaskDocument.class}, childAliases = {"process_task_document"})})
public class ProcessInstanceDocument {
/**
* 流程实例 ID
@ -28,7 +32,7 @@ public class EsProcessInstance {
/**
* 流程实例名称
*/
@IndexField(fieldType = FieldType.KEYWORD_TEXT, fieldData = true)
@IndexField(fieldType = FieldType.KEYWORD)
private String name;
/**
@ -44,6 +48,7 @@ public class EsProcessInstance {
/**
* 流程对应业务分类的类型,项目/单位/监管/OMS
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String processCategoryType;
/**
@ -59,32 +64,33 @@ public class EsProcessInstance {
/**
* 流程实例的持续时间 ms
*/
private Long duration;
private Long durationInMillis;
/**
* 该流程中上次操作完成的时间
* 该流程中上次操作完成的时间, 如果是已完成的实例, 就清空该值
*/
private Date lastOperationTime;
/**
* 租户 ID
*/
@IndexField(fieldData = true)
@IndexField(fieldType = FieldType.KEYWORD)
private String tenantId;
/**
* 流程实例业务状态
*/
@IndexField(fieldType = FieldType.KEYWORD_TEXT)
@IndexField(fieldType = FieldType.KEYWORD)
private String businessStatus;
/**
* 实例对应的流程引擎服务端迭代版本
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String workflowEngineVersion;
/**
* 是否使用代运营
* 是否代运营
*/
private Boolean agent;

View File

@ -1,17 +1,12 @@
package cn.axzo.workflow.es.model;
import cn.axzo.workflow.common.model.request.bpmn.task.AttachmentDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import lombok.Data;
import org.dromara.easyes.annotation.IndexField;
import org.dromara.easyes.annotation.IndexId;
import org.dromara.easyes.annotation.IndexName;
import org.dromara.easyes.annotation.Settings;
import org.dromara.easyes.annotation.rely.FieldType;
import org.dromara.easyes.annotation.rely.IdType;
import java.util.Date;
import java.util.List;
/**
* 流程任务文档模型
@ -20,9 +15,8 @@ import java.util.List;
* @since 2024-09-25 20:32
*/
@Data
@IndexName("process_task")
@Settings
public class EsProcessTask {
//@IndexName("process_task_document")
public class ProcessTaskDocument {
/**
* 任务 ID
@ -33,29 +27,31 @@ public class EsProcessTask {
/**
* 归属的流程实例 ID
*/
@IndexField(fieldType = FieldType.KEYWORD_TEXT, fieldData = true)
@IndexField(fieldType = FieldType.KEYWORD)
private String processInstanceId;
/**
* 归属的流程实例定义 ID
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String processDefinitionId;
/**
* 任务定义 KEY,对应节点 ID
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String taskDefinitionKey;
/**
* 任务名称
*/
@IndexField(fieldType = FieldType.KEYWORD_TEXT)
@IndexField(fieldType = FieldType.KEYWORD)
private String name;
/**
* 任务状态:审批中/通过/驳回/转交/加签...
*/
@IndexField(fieldType = FieldType.KEYWORD_TEXT, fieldData = true)
@IndexField(fieldType = FieldType.KEYWORD)
private String status;
/**
@ -83,40 +79,58 @@ public class EsProcessTask {
*/
private Long duration;
/**
* 上次操作时间
*/
private Date lastUpdateTime;
/**
* 归属租户, processInstance#tenantId 一致
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String tenantId;
/**
* 审批人相关信息
* 审批人姓名
*/
@IndexField(fieldType = FieldType.KEYWORD_TEXT, fieldData = true)
private BpmnTaskDelegateAssigner assigner;
@IndexField(fieldType = FieldType.KEYWORD)
private String assigneeName;
/**
* 审批人自然人 ID
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String assigneePersonId;
/**
* 审批人所属单位 ID
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String assigneeOuId;
/**
* 审批人所属租户 ID
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String assigneeTenantId;
/**
* 任务关联的附件(含文件,图片,签名)
*/
private List<AttachmentDTO> attachments;
// @IndexField(fieldType = FieldType.NESTED, nestedClass = AttachmentDTO.class)
// private List<AttachmentDTO> attachments;
/**
* 审批方式:配置审批人/业务指定/业务触发(不含人)
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String approvalMethod;
/**
* 节点类型:审批节点/业务节点/评论节点/抄送节点
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String nodeType;
/**
* 节点模式:会签/或签
*/
@IndexField(fieldType = FieldType.KEYWORD)
private String nodeMode;
}

View File

@ -1,6 +1,8 @@
package cn.axzo.workflow.es.service;
import cn.axzo.workflow.es.model.EsProcessInstance;
import cn.axzo.workflow.common.model.request.es.InstanceSearchReqDTO;
import cn.axzo.workflow.common.model.response.BpmPageResult;
import cn.axzo.workflow.es.model.ProcessInstanceDocument;
import java.util.Collection;
@ -12,19 +14,44 @@ import java.util.Collection;
*/
public interface EsProcessInstanceService {
/**
* 删除流程实例索引
*
* @return
*/
Boolean deleteIndex();
/**
* 新增流程实例文档
*
* @param esProcessInstance
* @param processInstanceDocument
* @return 成功的条数
*/
Integer insert(EsProcessInstance esProcessInstance);
Integer insert(String routing, ProcessInstanceDocument processInstanceDocument);
/**
* 批量新增流程实例文档
*
* @param esProcessInstances
* @param processInstanceDocuments
* @return
*/
Integer insertBatch(Collection<EsProcessInstance> esProcessInstances);
Integer insertBatch(String routing, Collection<ProcessInstanceDocument> processInstanceDocuments);
/**
* 更新流程实例文档
*
* @param routing
* @param processInstanceDocument
*/
Integer update(String routing, ProcessInstanceDocument processInstanceDocument);
/**
* 删除指定文档
*
* @param processInstanceId
* @return
*/
Integer delete(String processInstanceId);
BpmPageResult<ProcessInstanceDocument> search(InstanceSearchReqDTO dto);
}

View File

@ -1,5 +1,11 @@
package cn.axzo.workflow.es.service;
import cn.axzo.workflow.common.model.request.es.TaskSearchReqDTO;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import org.dromara.easyes.core.biz.EsPageInfo;
import java.util.Collection;
/**
* 操作流程任务的 ES Service
*
@ -7,4 +13,17 @@ package cn.axzo.workflow.es.service;
* @since 2024-09-27 11:07
*/
public interface EsProcessTaskService {
/**
* 删除流程实例索引
*
* @return
*/
Boolean deleteIndex();
Integer insert(String routing, String parentId, ProcessTaskDocument processTaskDocument);
Integer insertBatch(String routing, String parentId, Collection<ProcessTaskDocument> processTaskDocuments);
EsPageInfo<ProcessTaskDocument> search(TaskSearchReqDTO dto);
}

View File

@ -0,0 +1,149 @@
package cn.axzo.workflow.es.service.aggregation;
import cn.axzo.workflow.common.enums.WorkspaceType;
import cn.axzo.workflow.common.model.dto.es.DataSyncSummaryDTO;
import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO;
import cn.axzo.workflow.common.model.request.es.InstanceSearchReqDTO;
import cn.axzo.workflow.common.model.response.BpmPageResult;
import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper;
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
import cn.axzo.workflow.es.model.ProcessInstanceDocument;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import cn.axzo.workflow.es.service.EsProcessInstanceService;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.engine.history.HistoricProcessInstance;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import static cn.axzo.workflow.common.constant.BpmnConstants.ES_FIXED_ROUTING;
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_121;
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142;
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_AGENT;
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_WORKSPACE_TYPE;
import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VERSION;
import static cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum.PROCESSING;
/**
* 实例纬度的聚合操作
*
* @author wangli
* @since 2024-09-30 11:46
*/
@Component
@AllArgsConstructor
@Slf4j
public class AggregateProcessInstanceService {
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
private final EsProcessInstanceService esProcessInstanceService;
private final AggregateProcessTaskService aggregateProcessTaskService;
/**
* 同步指定查询结果的数据至 ES
* <p>
* 内部自动分页循环同步
*
* @param search
* @return 应同步至 ES 的统计数据
*/
public DataSyncSummaryDTO syncProcessInstanceForSearch(HistoricProcessInstanceSearchForEsDTO search) {
Long totalProcessInstanceCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceTotalCount(search);
log.info("查询到待同步数据量: {} 条", totalProcessInstanceCount);
AtomicLong totalProcessTaskCount = new AtomicLong(0);
IntStream.iterate(0, i -> i + search.getOverPageSize()).limit((totalProcessInstanceCount + search.getOverPageSize() - 1) / search.getOverPageSize())
.forEach(skipRows -> {
log.info("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalProcessInstanceCount * 100),
skipRows, skipRows + search.getOverPageSize());
int pageNo = skipRows / search.getOverPageSize() + 1;
List<HistoricProcessInstance> instances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, new Page<HistoricProcessInstance>(pageNo, search.getOverPageSize()));
instances.forEach(hpi -> {
if (Objects.nonNull(hpi.getEndTime())) {
// 已结束的实例
List<ProcessTaskDocument> processTaskDocuments = syncProcessInstance(hpi);
totalProcessTaskCount.getAndSet(totalProcessTaskCount.get() + processTaskDocuments.size());
} else {
// 未结束的实例
}
});
});
return new DataSyncSummaryDTO(totalProcessInstanceCount, totalProcessTaskCount.get());
}
/**
* 同步结束审批的数据至 ES
*/
public List<ProcessTaskDocument> syncProcessInstance(HistoricProcessInstance hpi) {
if (Objects.isNull(hpi)) {
return Collections.emptyList();
}
ProcessInstanceDocument processInstanceDocument = new ProcessInstanceDocument();
processInstanceDocument.setId(hpi.getId());
processInstanceDocument.setName(hpi.getName());
processInstanceDocument.setBusinessKey(hpi.getBusinessKey());
processInstanceDocument.setProcessDefinitionId(hpi.getProcessDefinitionId());
processInstanceDocument.setStartTime(hpi.getStartTime());
processInstanceDocument.setEndTime(hpi.getEndTime());
processInstanceDocument.setDurationInMillis(hpi.getDurationInMillis());
processInstanceDocument.setTenantId(hpi.getTenantId());
processInstanceDocument.setBusinessStatus(hpi.getBusinessStatus());
Map<String, Object> variables = hpi.getProcessVariables();
processInstanceDocument.setWorkflowEngineVersion(String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121)));
processInstanceDocument.setProcessCategoryType(WorkspaceType.getType((Integer) variables.getOrDefault(INTERNAL_PROCESS_WORKSPACE_TYPE, WorkspaceType.UN_KNOW.getCode())).getDesc());
processInstanceDocument.setAgent((Boolean) variables.getOrDefault(INTERNAL_PROCESS_AGENT, false));
BpmnModel bpmnModel = bpmnProcessInstanceForEsService.queryBpmnModel(hpi.getProcessDefinitionId());
BpmnMetaParserHelper.getProcessApproveConf(bpmnModel.getMainProcess()).ifPresent(cfg -> {
processInstanceDocument.setSupportBatch(cfg.getSupportBatchOperation());
processInstanceDocument.setUserAgreeSignature(cfg.getUserAgreeSignature());
});
// 实例纬度数据同步 ES
esProcessInstanceService.insert(ES_FIXED_ROUTING, processInstanceDocument);
String instanceVersion = String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121));
DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion);
DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142);
List<ProcessTaskDocument> toEsProcessTaskDocuments = new ArrayList<>();
if (version.compareTo(supportVersion) >= 0) {
log.info("通过新日志表进行任务纬度的数据同步");
// 用新日志表数据任务同步
toEsProcessTaskDocuments.addAll(aggregateProcessTaskService.syncProcessTaskForNew(hpi, instanceVersion));
} else {
log.info("通过引擎的任务表进行任务纬度的数据同步");
// 用引擎的任务表处理任务同步
toEsProcessTaskDocuments.addAll(aggregateProcessTaskService.syncProcessTaskForOld(hpi, instanceVersion, bpmnModel));
}
//更新实例上的最后操作时间
if (Objects.equals(PROCESSING.getStatus(), hpi.getBusinessStatus())) {
toEsProcessTaskDocuments.stream()
.filter(i -> Objects.equals(i.getStatus(), PROCESSING.getStatus()))
.max(Comparator.comparing(ProcessTaskDocument::getEndTime))
.ifPresent(processTaskDocument -> {
processInstanceDocument.setLastOperationTime(processTaskDocument.getEndTime());
esProcessInstanceService.update(ES_FIXED_ROUTING, processInstanceDocument);
});
} else {
processInstanceDocument.setLastOperationTime(null);
}
return toEsProcessTaskDocuments;
}
public BpmPageResult<ProcessInstanceDocument> search(InstanceSearchReqDTO dto) {
return esProcessInstanceService.search(dto);
}
}

View File

@ -0,0 +1,231 @@
package cn.axzo.workflow.es.service.aggregation;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper;
import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog;
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
import cn.axzo.workflow.core.service.BpmnProcessTaskForEsService;
import cn.axzo.workflow.core.service.converter.BpmnHistoricAttachmentConverter;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import cn.axzo.workflow.es.service.EsProcessTaskService;
import cn.hutool.core.date.DateUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.FlowElement;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.persistence.entity.CommentEntityImpl;
import org.flowable.engine.task.Comment;
import org.flowable.task.api.history.HistoricTaskInstance;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_ADVICE;
import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERATION_DESC;
import static cn.axzo.workflow.common.constant.BpmnConstants.ES_FIXED_ROUTING;
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_130;
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142;
import static cn.axzo.workflow.common.constant.BpmnConstants.HIDDEN_ASSIGNEE_ID;
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_TASK_RELATION_ASSIGNEE_INFO;
import static cn.axzo.workflow.common.constant.BpmnConstants.OLD_TASK_ASSIGNEE_SKIP_FLAT;
import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.MI_END;
import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.REJECTION_AUTO_COMPLETED;
/**
* 任务纬度的聚合操作
*
* @author wangli
* @since 2024-09-30 11:46
*/
@Component
@AllArgsConstructor
@Slf4j
public class AggregateProcessTaskService {
private final static DefaultArtifactVersion SUPPORT_VERSION = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142);
private final EsProcessTaskService esProcessTaskService;
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
private final BpmnProcessTaskForEsService bpmnProcessTaskForEsService;
private final BpmnHistoricAttachmentConverter attachmentConverter;
/**
* 通过新的日志表进行任务纬度的数据同步至 ES
*/
public List<ProcessTaskDocument> syncProcessTaskForNew(HistoricProcessInstance hpi, String instanceVersion) {
DefaultArtifactVersion currentVersion = new DefaultArtifactVersion(instanceVersion);
if (currentVersion.compareTo(SUPPORT_VERSION) < 0) {
return Collections.emptyList();
}
List<ExtAxProcessLog> logs = bpmnProcessTaskForEsService.queryProcessLogByProcessInstanceId(hpi.getId());
// Map<String, List<Attachment>> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(hpi.getId())
// .stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList())));
List<ProcessTaskDocument> toEsProcessTaskDocuments = new ArrayList<>();
logs.forEach(log -> {
ProcessTaskDocument processTaskDocument = new ProcessTaskDocument();
processTaskDocument.setId(log.getTaskId());
processTaskDocument.setProcessInstanceId(log.getProcessInstanceId());
processTaskDocument.setProcessDefinitionId(hpi.getProcessDefinitionId());
processTaskDocument.setTaskDefinitionKey(log.getActivityId());
processTaskDocument.setName(log.getActivityName());
processTaskDocument.setStatus(log.getStatus());
processTaskDocument.setAdvice(log.getAdvice());
processTaskDocument.setOperationDesc(log.getOperationDesc());
processTaskDocument.setStartTime(log.getStartTime());
processTaskDocument.setEndTime(log.getEndTime());
if (Objects.nonNull(log.getEndTime())) {
processTaskDocument.setDuration(DateUtil.betweenMs(log.getEndTime(), log.getStartTime()));
}
processTaskDocument.setTenantId(log.getTenantId());
BpmnTaskDelegateAssigner assigner = CollectionUtils.isEmpty(log.getAssigneeFull()) ? null : log.getAssigneeFull().get(0);
if(Objects.nonNull(assigner)) {
processTaskDocument.setAssigneeName(assigner.getAssignerName());
processTaskDocument.setAssigneeOuId(assigner.getOuId());
processTaskDocument.setAssigneeTenantId(assigner.getTenantId());
}
// esProcessTask.setAttachments(attachmentConverter.toVos(attachmentMap.getOrDefault(log.getTaskId(), Collections.emptyList())));
processTaskDocument.setApprovalMethod(log.getApprovalMethod());
processTaskDocument.setNodeType(log.getNodeType());
processTaskDocument.setNodeMode(log.getNodeMode());
toEsProcessTaskDocuments.add(processTaskDocument);
});
esProcessTaskService.insertBatch(ES_FIXED_ROUTING, hpi.getId(), toEsProcessTaskDocuments);
return toEsProcessTaskDocuments;
}
/**
* 优先使用三个参数的同步方法.
* <p>
* 通过原本引擎表进行任务纬度的数据同步至 ES
*/
public List<ProcessTaskDocument> syncProcessTaskForOld(HistoricProcessInstance hpi, String instanceVersion) {
BpmnModel bpmnModel = bpmnProcessInstanceForEsService.queryBpmnModel(hpi.getProcessDefinitionId());
return syncProcessTaskForOld(hpi, instanceVersion, bpmnModel);
}
/**
* 通过原本引擎表进行任务纬度的数据同步至 ES
*
* @param hpi
* @param instanceVersion
* @param bpmnModel
*/
public List<ProcessTaskDocument> syncProcessTaskForOld(HistoricProcessInstance hpi, String instanceVersion, BpmnModel bpmnModel) {
List<HistoricTaskInstance> tasks = bpmnProcessTaskForEsService.queryHistoricProcessTaskByProcessInstanceId(hpi.getId());
Map<String, ExtAxHiTaskInst> extTaskMap = bpmnProcessTaskForEsService.queryExtAxHiTaskInstByProcessInstanceId(hpi.getId())
.stream().collect(Collectors.toMap(ExtAxHiTaskInst::getTaskId, Function.identity(), (s, t) -> s));
Map<String, List<Comment>> commentMap = bpmnProcessTaskForEsService.queryCommentByProcessInstanceId(hpi.getId())
.stream().collect(Collectors.groupingBy(Comment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList())));
// 父子文档中子文档不支持 Nested 类型字段
// Map<String, List<Attachment>> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(hpi.getId())
// .stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList())));
Map<String, Object> variables = hpi.getProcessVariables();
// 过滤出有效任务, 比如同节点的多实例下或签, 一个人操作了同节点的一个任务,会把同节点的其他任务处理了,此时被处理的任务认为非人工处理
List<HistoricTaskInstance> filteredEffectiveTasks = filterEffectiveTasks(tasks, instanceVersion);
List<ProcessTaskDocument> toEsProcessTaskDocuments = new ArrayList<>();
for (HistoricTaskInstance task : filteredEffectiveTasks) {
ExtAxHiTaskInst extTask = extTaskMap.getOrDefault(task.getId(), new ExtAxHiTaskInst());
List<Comment> comments = commentMap.getOrDefault(task.getId(), Collections.emptyList());
// List<Attachment> attachments = attachmentMap.getOrDefault(task.getId(), Collections.emptyList());
ProcessTaskDocument processTaskDocument = new ProcessTaskDocument();
processTaskDocument.setId(task.getId());
processTaskDocument.setProcessInstanceId(task.getProcessInstanceId());
processTaskDocument.setProcessDefinitionId(task.getProcessDefinitionId());
processTaskDocument.setTaskDefinitionKey(task.getTaskDefinitionKey());
processTaskDocument.setName(task.getName());
processTaskDocument.setStatus(extTask.getStatus());
// 处理 advice
processTaskDocument.setAdvice(comments.stream().filter(i -> Objects.equals(i.getType(), COMMENT_TYPE_ADVICE))
.findFirst().orElse(new CommentEntityImpl()).getFullMessage());
// 处理 operationDesc
processTaskDocument.setOperationDesc(comments.stream().filter(i -> Objects.equals(COMMENT_TYPE_OPERATION_DESC, i.getType()))
.max(Comparator.comparing(Comment::getTime))
.orElse(new CommentEntityImpl()).getFullMessage());
processTaskDocument.setStartTime(task.getCreateTime());
processTaskDocument.setEndTime(task.getEndTime());
processTaskDocument.setDuration(task.getDurationInMillis());
// esProcessTask.setLastUpdateTime(task);
processTaskDocument.setTenantId(task.getTenantId());
BpmnTaskDelegateAssigner assigner = BpmnTaskDelegateAssigner.toObjectCompatible(variables.getOrDefault(INTERNAL_TASK_RELATION_ASSIGNEE_INFO + task.getId(), null));
if(Objects.nonNull(assigner)) {
processTaskDocument.setAssigneeName(assigner.getAssignerName());
processTaskDocument.setAssigneeOuId(assigner.getOuId());
processTaskDocument.setAssigneeTenantId(assigner.getTenantId());
}
// esProcessTask.setAttachments(attachmentConverter.toVos(attachments));
if (Objects.nonNull(bpmnModel)) {
FlowElement flowElement = bpmnModel.getFlowElement(task.getTaskDefinitionKey());
BpmnMetaParserHelper.getApprovalMethod(flowElement)
.ifPresent(e -> processTaskDocument.setApprovalMethod(e.getType()));
BpmnMetaParserHelper.getNodeType(flowElement)
.ifPresent(e -> processTaskDocument.setNodeType(e.getType()));
// esProcessTask.setNodeMode();
}
toEsProcessTaskDocuments.add(processTaskDocument);
}
esProcessTaskService.insertBatch(ES_FIXED_ROUTING, hpi.getId(), toEsProcessTaskDocuments);
return toEsProcessTaskDocuments;
}
private List<HistoricTaskInstance> filterEffectiveTasks(List<HistoricTaskInstance> tasks, String instanceVersion) {
List<HistoricTaskInstance> effectiveTasks = new ArrayList<>();
if (CollectionUtils.isEmpty(tasks)) {
return effectiveTasks;
}
Stream<HistoricTaskInstance> taskInstanceStream = tasks.stream()
.filter(i -> !Objects.equals(i.getAssignee(), HIDDEN_ASSIGNEE_ID))
.filter(i -> !Objects.equals(i.getDeleteReason(), HIDDEN_ASSIGNEE_ID))
.filter(i -> !Objects.equals(i.getDeleteReason(), MI_END.getStatus()));
if (Objects.isNull(instanceVersion)) {
compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add);
} else {
if (StringUtils.hasText(instanceVersion)) {
DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion);
DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_130);
if (version.compareTo(supportVersion) < 0) {
compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add);
} else {
taskInstanceStream.forEach(effectiveTasks::add);
}
} else {
taskInstanceStream.forEach(effectiveTasks::add);
}
}
return effectiveTasks;
}
/**
* 兼容 1.2.1 版本的审批日志
*
* @param stream
* @return
*/
private Stream<HistoricTaskInstance> compatibleVersion(Stream<HistoricTaskInstance> stream) {
return stream.filter(i -> (!Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason()))
|| (Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason()))
).filter(i -> !(!Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(MI_END.getStatus(), i.getDeleteReason())));
}
}

View File

@ -1,13 +1,26 @@
package cn.axzo.workflow.es.service.impl;
import cn.axzo.workflow.common.model.request.es.InstanceSearchReqDTO;
import cn.axzo.workflow.common.model.response.BpmPageResult;
import cn.axzo.workflow.es.mapper.EsProcessInstanceMapper;
import cn.axzo.workflow.es.model.EsProcessInstance;
import cn.axzo.workflow.es.model.ProcessInstanceDocument;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import cn.axzo.workflow.es.service.EsProcessInstanceService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.dromara.easyes.annotation.IndexName;
import org.dromara.easyes.core.biz.EsPageInfo;
import org.dromara.easyes.core.conditions.select.LambdaEsQueryWrapper;
import org.dromara.easyes.core.toolkit.FieldUtils;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.Collection;
import java.util.List;
import static cn.axzo.workflow.common.constant.BpmnConstants.ES_FIXED_ROUTING;
/**
* 操作流程实例的 ES Service 实现
@ -15,26 +28,77 @@ import java.util.Collection;
* @author wangli
* @since 2024-09-27 11:09
*/
@Component
@Service
@AllArgsConstructor
@Slf4j
public class EsProcessInstanceServiceImpl implements EsProcessInstanceService {
private final EsProcessInstanceMapper esProcessInstanceMapper;
/**
* 删除流程实例索引
*
* @return
*/
@Override
public Boolean deleteIndex() {
IndexName annotation = AnnotationUtils.findAnnotation(ProcessInstanceDocument.class, IndexName.class);
return esProcessInstanceMapper.deleteIndex(annotation.value());
}
/**
* 新增流程实例文档文档
*
* @param esProcessInstance
* @param processInstanceDocument
* @return 成功的条数
*/
@Override
public Integer insert(EsProcessInstance esProcessInstance) {
return esProcessInstanceMapper.insert(esProcessInstance);
public Integer insert(String routing, ProcessInstanceDocument processInstanceDocument) {
return esProcessInstanceMapper.insert(routing, processInstanceDocument);
}
@Override
public Integer insertBatch(Collection<EsProcessInstance> esProcessInstances) {
return esProcessInstanceMapper.insertBatch(esProcessInstances);
public Integer insertBatch(String routing, Collection<ProcessInstanceDocument> processInstanceDocuments) {
return esProcessInstanceMapper.insertBatch(routing, processInstanceDocuments);
}
/**
* 更新流程实例文档
*
* @param routing
* @param processInstanceDocument
*/
@Override
public Integer update(String routing, ProcessInstanceDocument processInstanceDocument) {
return esProcessInstanceMapper.updateById(ES_FIXED_ROUTING, processInstanceDocument);
}
@Override
public Integer delete(String processInstanceId) {
return esProcessInstanceMapper.deleteById(ES_FIXED_ROUTING, processInstanceId);
}
@Override
public BpmPageResult<ProcessInstanceDocument> search(InstanceSearchReqDTO dto) {
if (!CollectionUtils.isEmpty(dto.getProcessInstanceIds())) {
List<ProcessInstanceDocument> processInstanceDocuments = esProcessInstanceMapper
.selectList(new LambdaEsQueryWrapper<ProcessInstanceDocument>()
.in(FieldUtils.val(ProcessInstanceDocument::getId), dto.getProcessInstanceIds()));
return new BpmPageResult<>(processInstanceDocuments, (long) processInstanceDocuments.size());
}
LambdaEsQueryWrapper<ProcessInstanceDocument> wrapper = new LambdaEsQueryWrapper<>();
wrapper.hasChild("process_task_document",
w -> w.like(StringUtils.hasText(dto.getAssigneeName()), FieldUtils.val(ProcessTaskDocument::getAssigneeName), dto.getAssigneeName())
.eq(StringUtils.hasText(dto.getTenantId()), FieldUtils.val(ProcessTaskDocument::getAssigneeTenantId), dto.getTenantId())
.eq(StringUtils.hasText(dto.getOuId()), FieldUtils.val(ProcessTaskDocument::getAssigneeOuId), dto.getOuId())
.eq(StringUtils.hasText(dto.getPersonId()), FieldUtils.val(ProcessTaskDocument::getAssigneePersonId), dto.getPersonId())
)
.eq(StringUtils.hasText(dto.getBusinessStatus()), FieldUtils.val(ProcessInstanceDocument::getBusinessStatus), dto.getBusinessStatus())
.like(StringUtils.hasText(dto.getProcessInstanceName()), FieldUtils.val(ProcessInstanceDocument::getName), dto.getProcessInstanceName())
.orderByDesc(FieldUtils.val(ProcessInstanceDocument::getStartTime))
;
EsPageInfo<ProcessInstanceDocument> pageInfo = esProcessInstanceMapper.pageQuery(wrapper, dto.getPageNo(), dto.getPageSize());
return new BpmPageResult<>(pageInfo.getList(), pageInfo.getTotal());
}
}

View File

@ -1,14 +1,62 @@
package cn.axzo.workflow.es.service.impl;
import cn.axzo.workflow.common.model.request.es.TaskSearchReqDTO;
import cn.axzo.workflow.es.mapper.EsProcessTaskMapper;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import cn.axzo.workflow.es.service.EsProcessTaskService;
import org.springframework.stereotype.Component;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.easyes.annotation.IndexName;
import org.dromara.easyes.core.biz.EsPageInfo;
import org.dromara.easyes.core.conditions.select.LambdaEsQueryWrapper;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Service;
import java.util.Collection;
/**
* 操作流程任务的 ES Service
* 操作流程任务的 ES Service 实现
*
* @author wangli
* @since 2024-09-27 11:09
*/
@Component
@Service
@AllArgsConstructor
@Slf4j
public class EsProcessTaskServiceImpl implements EsProcessTaskService {
private final EsProcessTaskMapper esProcessTaskMapper;
/**
* 删除流程实例索引
*
* @return
*/
@Override
public Boolean deleteIndex() {
IndexName annotation = AnnotationUtils.findAnnotation(ProcessTaskDocument.class, IndexName.class);
return esProcessTaskMapper.deleteIndex(annotation.value());
}
/**
* 新增流程实例文档文档
*
* @param processTaskDocument
* @return 成功的条数
*/
public Integer insert(String routing, String parentId, ProcessTaskDocument processTaskDocument) {
return esProcessTaskMapper.insert(routing, parentId, processTaskDocument);
}
@Override
public Integer insertBatch(String routing, String parentId, Collection<ProcessTaskDocument> processTaskDocuments) {
return esProcessTaskMapper.insertBatch(routing, parentId, processTaskDocuments);
}
@Override
public EsPageInfo<ProcessTaskDocument> search(TaskSearchReqDTO dto) {
LambdaEsQueryWrapper<ProcessTaskDocument> wrapper = new LambdaEsQueryWrapper<>();
return esProcessTaskMapper.pageQuery(wrapper, dto.getPageNo(), dto.getPageSize());
}
}

View File

@ -0,0 +1,49 @@
package cn.axzo.workflow.listener;
import cn.axzo.workflow.core.engine.listener.entity.EntityEventHandle;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.HistoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.task.service.impl.persistence.entity.TaskEntity;
import org.springframework.stereotype.Component;
/**
* 用于处理非结束的实例的数据同步至 ES
*
* @author wangli
* @since 2024-09-06 00:02
*/
@Slf4j
@Component
@AllArgsConstructor
public class SyncEsTaskEntityEventHandle implements EntityEventHandle<TaskEntity> {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
@Override
public boolean support(Object entity) {
return entity instanceof TaskEntity;
}
@Override
public TaskEntity convert(Object entity) {
return (TaskEntity) entity;
}
@Override
public void onDeleted(TaskEntity taskEntity) {
log.info("SyncEsTaskEntityEventHandle onDeleted processInstanceId:{}, taskId: {}", taskEntity.getProcessInstanceId(), taskEntity.getId());
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(taskEntity.getProcessInstanceId())
.includeProcessVariables()
.singleResult();
aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance);
}
}

View File

@ -34,7 +34,7 @@ public class XxlJobConfiguration {
@Value("")
private String accessToken;
@Value("")
@Value("${xxl.job.executor.logpath:}")
private String logPath;
@Value("-1")

View File

@ -1,36 +0,0 @@
package cn.axzo.workflow.server.controller.es;
import cn.axzo.workflow.es.mapper.EsProcessInstanceMapper;
import cn.axzo.workflow.es.mapper.EsProcessTaskMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* ES 相关操作
*
* @author wangli
* @since 2024-09-26 14:47
*/
@Slf4j
@RequestMapping("/web/v1/api/es")
@RestController
@Validated
public class EsIndexController {
@Resource
public EsProcessInstanceMapper esProcessInstanceMapper;
@Resource
private EsProcessTaskMapper esProcessTaskMapper;
@GetMapping("/index/create")
public void init() {
esProcessInstanceMapper.createIndex();
esProcessTaskMapper.createIndex();
}
}

View File

@ -0,0 +1,55 @@
package cn.axzo.workflow.server.controller.web.es;
import cn.axzo.workflow.client.feign.es.EsProcessInstanceApi;
import cn.axzo.workflow.common.model.request.es.InstanceSearchReqDTO;
import cn.axzo.workflow.common.model.response.BpmPageResult;
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
import cn.axzo.workflow.es.model.ProcessInstanceDocument;
import cn.axzo.workflow.es.service.EsProcessInstanceService;
import cn.axzo.workflow.es.service.EsProcessTaskService;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.azxo.framework.common.model.CommonResponse;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* ES 相关操作
*
* @author wangli
* @since 2024-09-26 14:47
*/
@Slf4j
@RequestMapping("/web/v1/api/es")
@RestController
@Validated
public class ElasticSearchController implements EsProcessInstanceApi {
@Resource
private AggregateProcessInstanceService aggregateProcessInstanceService;
@Resource
private BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
@Resource
private EsProcessInstanceService esProcessInstanceService;
@Resource
private EsProcessTaskService esProcessTaskService;
/**
* 审批数据搜索
*
* @param dto
* @return
*/
@PostMapping("/instance/search")
public CommonResponse<BpmPageResult<ProcessInstanceDocument>> processInstanceSearch(@Validated @RequestBody InstanceSearchReqDTO dto) {
log.info("审批数据搜索 processInstanceSearch===>>>参数:{}", JSONUtil.toJsonStr(dto));
return CommonResponse.success(aggregateProcessInstanceService.search(dto));
}
}

View File

@ -0,0 +1,94 @@
package cn.axzo.workflow.server.xxljob;
import cn.axzo.workflow.common.model.dto.es.DataSyncSummaryDTO;
import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO;
import cn.axzo.workflow.core.engine.cmd.CustomInsertPropertyCmd;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.axzo.workflow.listener.SyncEsTaskEntityEventHandle;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Date;
import static cn.axzo.workflow.common.constant.BpmnConstants.LATEST_SYNC_TO_ELASTICSEARCH_TIME;
/**
* 流程实例数据同步至 ES
* <p>
* 在途的或新发起的流程实例均通过 {@link SyncEsTaskEntityEventHandle} 进行实时同步
*
* @author wangli
* @since 2024-09-27 11:03
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class ProcessInstanceSyncEsJobHandler extends IJobHandler {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
private final SpringProcessEngineConfiguration springProcessEngineConfiguration;
@XxlJob("processInstanceSyncToEs")
public ReturnT<String> execute(String s) {
log.debug("start exec finished process instance data sync... ");
XxlJobLogger.log("start exec finished process instance data sync... ");
HistoricProcessInstanceSearchForEsDTO search = new HistoricProcessInstanceSearchForEsDTO();
search.setFinished(true);
Date endTime = new Date();
search.setEndTime(endTime);
try {
if (StringUtils.hasText(s)) {
search = JSON.parseObject(s, HistoricProcessInstanceSearchForEsDTO.class);
log.info("根据入参转换后的查询入参:{}", JSON.toJSONString(search));
XxlJobLogger.log("根据入参转换后的查询入参:{}", JSON.toJSONString(search));
} else {
log.info("入参为空, 将以默认条件执行");
XxlJobLogger.log("入参为空, 将以默认条件执行");
}
} catch (Exception e) {
log.warn("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型");
XxlJobLogger.log("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型");
return ReturnT.FAIL;
}
// 同步前的一些额外操作
beforeSync(endTime);
// 开始同步完成的实例到 ES
DataSyncSummaryDTO summary = doSync(search);
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
return ReturnT.SUCCESS;
}
/**
* 1. 记录本次操作的时间点
*/
private void beforeSync(Date endTime) {
//1. 记录本次操作的时间点
CommandExecutor commandExecutor = springProcessEngineConfiguration.getCommandExecutor();
commandExecutor.execute(new CustomInsertPropertyCmd(LATEST_SYNC_TO_ELASTICSEARCH_TIME, DateUtil.formatDateTime(endTime)));
}
/**
* 开始处理的入口
*
* @param search
*/
private DataSyncSummaryDTO doSync(HistoricProcessInstanceSearchForEsDTO search) {
return aggregateProcessInstanceService.syncProcessInstanceForSearch(search);
}
}

View File

@ -1,54 +0,0 @@
package cn.axzo.workflow.server.xxljob;
import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO;
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
import cn.axzo.workflow.es.model.EsProcessInstance;
import cn.axzo.workflow.es.service.EsProcessInstanceService;
import cn.axzo.workflow.es.service.EsProcessTaskService;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.history.HistoricProcessInstance;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 流程实例数据同步
*
* @author wangli
* @since 2024-09-27 11:03
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class ProcessInstanceSyncJobHandler extends IJobHandler {
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
private final EsProcessInstanceService esProcessInstanceService;
private final EsProcessTaskService esProcessTaskService;
@XxlJob("processInstanceToEs")
public ReturnT<String> execute(String s) {
XxlJobLogger.log("start exec process instance data sync... ");
HistoricProcessInstanceSearchForEsDTO search = new HistoricProcessInstanceSearchForEsDTO();
IPage page = new Page(0, 10);
List<HistoricProcessInstance> historicProcessInstances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, page);
historicProcessInstances.forEach(processInstance -> {
EsProcessInstance esProcessInstance = new EsProcessInstance();
Integer inserted = esProcessInstanceService.insert(esProcessInstance);
if (inserted > 0) {
XxlJobLogger.log("write processInstance to es success! id: {}", processInstance.getId());
} else {
XxlJobLogger.log("write to es caught exception! id: {}", processInstance.getId());
}
});
return ReturnT.SUCCESS;
}
}