feat(REQ-2752) - 1.调整已结束实例同步 ES 的代码设计. 2.增加记录初始同步时的操作时间. 3.增加新表用于记录操作时除开已完成的实例以外的数据(该功能可能会废弃).
This commit is contained in:
parent
6a38f65b3c
commit
ff6e991c34
@ -189,4 +189,5 @@ public interface BpmnConstants {
|
||||
* 回退操作次数上限
|
||||
*/
|
||||
Integer MAX_BACKED_OPERATE_COUNT = 20;
|
||||
String LATEST_SYNC_TO_ELASTICSEARCH_TIME = "latest.sync.to.elasticsearch.time";
|
||||
}
|
||||
|
||||
@ -0,0 +1,25 @@
|
||||
package cn.axzo.workflow.common.model.dto.es;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* 数据同步 ES 的结果统计
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-09-30 14:42
|
||||
*/
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class DataSyncSummaryDTO {
|
||||
|
||||
private Long processInstanceCount;
|
||||
|
||||
private Long processTaskCount;
|
||||
}
|
||||
@ -54,6 +54,7 @@ public class HistoricProcessInstanceSearchForEsDTO {
|
||||
|
||||
/**
|
||||
* 实例是否已结束
|
||||
*
|
||||
*/
|
||||
private Boolean finished;
|
||||
|
||||
@ -84,7 +85,7 @@ public class HistoricProcessInstanceSearchForEsDTO {
|
||||
/**
|
||||
* 用于覆盖同步逻辑中的PageSize,一般不需要传
|
||||
*/
|
||||
private Integer overPageSize = 10;
|
||||
private Integer overPageSize = 100;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,43 @@
|
||||
package cn.axzo.workflow.core.engine.cmd;
|
||||
|
||||
import org.flowable.common.engine.impl.interceptor.Command;
|
||||
import org.flowable.common.engine.impl.interceptor.CommandContext;
|
||||
import org.flowable.common.engine.impl.persistence.entity.PropertyEntity;
|
||||
import org.flowable.common.engine.impl.persistence.entity.PropertyEntityImpl;
|
||||
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
|
||||
import org.flowable.engine.impl.util.CommandContextUtil;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 操作 ACT_GE_PROPERTY 表的写命令
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-09-30 15:15
|
||||
*/
|
||||
public class CustomInsertPropertyCmd implements Command<Void> {
|
||||
private final String propertyName;
|
||||
private final String propertyValue;
|
||||
|
||||
public CustomInsertPropertyCmd(String propertyName, String propertyValue) {
|
||||
this.propertyName = propertyName;
|
||||
this.propertyValue = propertyValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void execute(CommandContext commandContext) {
|
||||
ProcessEngineConfigurationImpl processEngineConfiguration =
|
||||
CommandContextUtil.getProcessEngineConfiguration(commandContext);
|
||||
PropertyEntity entity = processEngineConfiguration.getPropertyEntityManager().findById(propertyName);
|
||||
if (Objects.nonNull(entity)) {
|
||||
entity.setValue(propertyValue);
|
||||
processEngineConfiguration.getPropertyEntityManager().update(entity);
|
||||
} else {
|
||||
entity = new PropertyEntityImpl();
|
||||
entity.setName(propertyName);
|
||||
entity.setValue(propertyValue);
|
||||
processEngineConfiguration.getPropertyEntityManager().insert(entity);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -21,7 +21,7 @@ import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventT
|
||||
import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.ENTITY_UPDATED;
|
||||
|
||||
/**
|
||||
* TODO
|
||||
* 引擎内的 Entity 事件监听处理
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-09-02 15:34
|
||||
@ -46,7 +46,6 @@ public class EngineEntityEventListener extends AbstractFlowableEventListener {
|
||||
public void onEvent(FlowableEvent event) {
|
||||
if (event instanceof FlowableEntityEvent && SUPPORTED.contains(event.getType())) {
|
||||
FlowableEntityEvent entityEvent = (FlowableEntityEvent) event;
|
||||
// log.warn("entity event type: {}, class: {}",entityEvent.getType(), entityEvent.getEntity().getClass());
|
||||
handles.forEach(handle -> {
|
||||
Object entity = entityEvent.getEntity();
|
||||
if (handle.support(entity)) {
|
||||
@ -66,48 +65,10 @@ public class EngineEntityEventListener extends AbstractFlowableEventListener {
|
||||
}
|
||||
}
|
||||
});
|
||||
// if (entityEvent.getEntity() instanceof TaskEntity) {
|
||||
// TaskEntity taskEntity = (TaskEntity) entityEvent.getEntity();
|
||||
// log.error("event taskId :{}, taskDefKey: {}", taskEntity.getId(), taskEntity.getTaskDefinitionKey());
|
||||
//
|
||||
// if (Objects.equals(event.getType(), ENTITY_CREATED)) {
|
||||
// onCreate(taskEntity);
|
||||
// } else if (Objects.equals(event.getType(), ENTITY_INITIALIZED)) {
|
||||
// onInitialized(taskEntity);
|
||||
// } else if (Objects.equals(event.getType(), ENTITY_UPDATED)) {
|
||||
// onUpdated(taskEntity);
|
||||
// } else if (Objects.equals(event.getType(), ENTITY_DELETED)) {
|
||||
// onDeleted(taskEntity);
|
||||
// } else if (Objects.equals(event.getType(), ENTITY_SUSPENDED)) {
|
||||
// onSuspended(taskEntity);
|
||||
// } else if (Objects.equals(event.getType(), ENTITY_ACTIVATED)) {
|
||||
// onActivated(taskEntity);
|
||||
// }
|
||||
// } else if(entityEvent.getEntity() instanceof CommentEntity) {
|
||||
// CommentEntity commentEntity = (CommentEntity) entityEvent.getEntity();
|
||||
// log.error("event taskId :{}", commentEntity.getId());
|
||||
//
|
||||
// if (Objects.equals(event.getType(), ENTITY_CREATED)) {
|
||||
// onCreate(commentEntity);
|
||||
// } else if (Objects.equals(event.getType(), ENTITY_INITIALIZED)) {
|
||||
// onInitialized(commentEntity);
|
||||
// } else if (Objects.equals(event.getType(), ENTITY_UPDATED)) {
|
||||
// onUpdated(commentEntity);
|
||||
// } else if (Objects.equals(event.getType(), ENTITY_DELETED)) {
|
||||
// onDeleted(commentEntity);
|
||||
// } else if (Objects.equals(event.getType(), ENTITY_SUSPENDED)) {
|
||||
// onSuspended(commentEntity);
|
||||
// } else if (Objects.equals(event.getType(), ENTITY_ACTIVATED)) {
|
||||
// onActivated(commentEntity);
|
||||
// }
|
||||
// }
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isFailOnException() {
|
||||
return true;
|
||||
|
||||
@ -22,7 +22,6 @@ import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERAT
|
||||
//@Component
|
||||
@AllArgsConstructor
|
||||
public class CommentEntityEventHandle implements EntityEventHandle<CommentEntity> {
|
||||
private final ExtAxProcessLogService processLogService;
|
||||
|
||||
@Override
|
||||
public boolean support(Object entity) {
|
||||
@ -37,45 +36,30 @@ public class CommentEntityEventHandle implements EntityEventHandle<CommentEntity
|
||||
@Override
|
||||
public void onCreate(CommentEntity entity) {
|
||||
log.info("comment event onCreate: {}", entity.getId());
|
||||
ExtAxProcessLog queryLog = new ExtAxProcessLog();
|
||||
queryLog.setProcessInstanceId(entity.getProcessInstanceId());
|
||||
queryLog.setTaskId(entity.getId());
|
||||
ExtAxProcessLog update = new ExtAxProcessLog();
|
||||
if (Objects.equals(COMMENT_TYPE_ADVICE, entity.getType())) {
|
||||
update.setAdvice(entity.getFullMessage());
|
||||
} else if (Objects.equals(COMMENT_TYPE_OPERATION_DESC, entity.getType())) {
|
||||
update.setOperationDesc(entity.getFullMessage());
|
||||
}
|
||||
processLogService.update(queryLog, update);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInitialized(CommentEntity entity) {
|
||||
log.info("comment event onInitialized: {}", entity.getId());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdated(CommentEntity entity) {
|
||||
log.info("comment event onUpdated: {}", entity.getId());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDeleted(CommentEntity entity) {
|
||||
log.info("comment event onDeleted: {}", entity.getId());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuspended(CommentEntity entity) {
|
||||
log.info("comment event onSuspended: {}", entity.getId());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onActivated(CommentEntity entity) {
|
||||
log.info("comment event onSuspended: {}", entity.getId());
|
||||
|
||||
log.info("comment event onActivated: {}", entity.getId());
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,4 +20,5 @@ public interface BpmnProcessInstanceForEsService {
|
||||
List<HistoricProcessInstance> queryHistoricProcessInstance(HistoricProcessInstanceSearchForEsDTO search, IPage page);
|
||||
|
||||
BpmnModel queryBpmnModel(String processDefinitionId);
|
||||
|
||||
}
|
||||
|
||||
@ -9,8 +9,6 @@ import org.flowable.engine.HistoryService;
|
||||
import org.flowable.engine.RepositoryService;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
import org.flowable.engine.history.HistoricProcessInstanceQuery;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
@ -46,12 +44,17 @@ public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceF
|
||||
if (Objects.isNull(search)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return buildHistoricProcessInstanceQuery(search)
|
||||
HistoricProcessInstanceQuery query = buildHistoricProcessInstanceQuery(search)
|
||||
.includeProcessVariables()
|
||||
.orderByProcessInstanceId()
|
||||
.asc()
|
||||
.listPage(((Long) (page.getPages() * page.getSize())).intValue(),
|
||||
((Long) ((page.getPages() + 1) * page.getSize())).intValue());
|
||||
.asc();
|
||||
if (Objects.nonNull(page)) {
|
||||
int firstResult = Math.toIntExact((page.getCurrent() - 1) * page.getSize());
|
||||
int maxResult = Math.toIntExact(page.getSize());
|
||||
log.debug("search pageable info, firstResult: {}, maxResult: {}", firstResult, maxResult);
|
||||
return query.listPage(firstResult, maxResult);
|
||||
}
|
||||
return query.list();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -79,8 +82,11 @@ public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceF
|
||||
if (StringUtils.hasText(search.getTenantId())) {
|
||||
historicProcessInstanceQuery.processInstanceTenantId(search.getTenantId());
|
||||
}
|
||||
// 只有传值后, 才接入该数据, 否则不控制
|
||||
if (Objects.equals(Boolean.TRUE, search.getFinished())) {
|
||||
historicProcessInstanceQuery.finished();
|
||||
} else if (Objects.equals(Boolean.FALSE, search.getFinished())) {
|
||||
historicProcessInstanceQuery.unfinished();
|
||||
}
|
||||
if (Objects.nonNull(search.getStartTime())) {
|
||||
// 引擎默认仅支撑两种
|
||||
|
||||
@ -27,7 +27,7 @@ import static cn.axzo.workflow.core.common.code.BpmnModelRespCode.MODEL_ID_NOT_E
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class ExtAxAxReModelServiceImpl implements ExtAxReModelService {
|
||||
public class ExtAxReModelServiceImpl implements ExtAxReModelService {
|
||||
@Resource
|
||||
private ExtAxReModelMapper extAxReModelMapper;
|
||||
|
||||
@ -0,0 +1,9 @@
|
||||
create table ext_ax_unfinished_process
|
||||
(
|
||||
id bigint auto_increment comment '主键'
|
||||
primary key,
|
||||
process_instance_id varchar(60) default '' not null comment '实例编号',
|
||||
create_at datetime default CURRENT_TIMESTAMP not null comment '创建时间',
|
||||
update_at datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间',
|
||||
is_delete int default 0 not null comment '是否删除'
|
||||
) comment '未完成实例的记录';
|
||||
@ -16,15 +16,14 @@
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>cn.axzo.workflow</groupId>
|
||||
<artifactId>workflow-engine-axzo-ext</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cn.axzo.workflow</groupId>
|
||||
<artifactId>workflow-engine-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>cn.axzo.framework.data</groupId>
|
||||
<artifactId>axzo-data-mybatis-plus</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.elasticsearch.client</groupId>
|
||||
@ -40,6 +39,5 @@
|
||||
<groupId>org.dromara.easy-es</groupId>
|
||||
<artifactId>easy-es-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</project>
|
||||
|
||||
@ -0,0 +1,26 @@
|
||||
package cn.axzo.workflow.core.repository.entity;
|
||||
|
||||
import cn.axzo.framework.data.mybatisplus.model.BaseEntity;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
|
||||
/**
|
||||
* 未完成的实例记录
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-09-30 15:48
|
||||
*/
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@TableName(value = "ext_ax_unfinished_process", autoResultMap = true)
|
||||
@Data
|
||||
@ToString(callSuper = true)
|
||||
public class ExtAxUnfinishedProcess extends BaseEntity<ExtAxUnfinishedProcess> {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
/**
|
||||
* 流程实例 ID
|
||||
*/
|
||||
private String processInstanceId;
|
||||
}
|
||||
@ -0,0 +1,8 @@
|
||||
package cn.axzo.workflow.core.repository.mapper;
|
||||
|
||||
import cn.axzo.workflow.core.repository.entity.ExtAxUnfinishedProcess;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
|
||||
@Mapper
|
||||
public interface ExtAxUnfinishedProcessMapper extends BaseMapperX<ExtAxUnfinishedProcess> {
|
||||
}
|
||||
@ -0,0 +1,13 @@
|
||||
package cn.axzo.workflow.core.service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Api Log 表操作服务
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024/4/3 10:40
|
||||
*/
|
||||
public interface ExtAxUnfinishedProcessService {
|
||||
void bulkInsertUnfinishedProcess(List<String> unfinishedIds);
|
||||
}
|
||||
@ -0,0 +1,36 @@
|
||||
package cn.axzo.workflow.core.service.impl;
|
||||
|
||||
import cn.axzo.workflow.core.repository.entity.ExtAxUnfinishedProcess;
|
||||
import cn.axzo.workflow.core.repository.mapper.ExtAxUnfinishedProcessMapper;
|
||||
import cn.axzo.workflow.core.service.ExtAxUnfinishedProcessService;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 用于记录 ES 数据同步的时间点的未处理实例 ID
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-09-30 16:19
|
||||
*/
|
||||
@Service
|
||||
public class ExtAxUnfinishedProcessServiceImpl implements ExtAxUnfinishedProcessService {
|
||||
@Resource
|
||||
private ExtAxUnfinishedProcessMapper unfinishedProcessMapper;
|
||||
|
||||
@Override
|
||||
public void bulkInsertUnfinishedProcess(List<String> unfinishedIds) {
|
||||
if(CollectionUtils.isEmpty(unfinishedIds)){
|
||||
return;
|
||||
}
|
||||
List<ExtAxUnfinishedProcess> entities = unfinishedIds.stream().map(i -> {
|
||||
ExtAxUnfinishedProcess unfinishedProcess = new ExtAxUnfinishedProcess();
|
||||
unfinishedProcess.setProcessInstanceId(i);
|
||||
return unfinishedProcess;
|
||||
}).collect(Collectors.toList());
|
||||
unfinishedProcessMapper.insertBatch(entities);
|
||||
}
|
||||
}
|
||||
@ -16,7 +16,8 @@ import java.util.Date;
|
||||
* @since 2024-09-25 20:32
|
||||
*/
|
||||
@Data
|
||||
@IndexName("process_instance")
|
||||
@IndexName("process_instance_test")
|
||||
//@Join(nodes = {@Node(parentClass = EsProcessInstance.class, childClasses = EsProcessTask.class)})
|
||||
public class EsProcessInstance {
|
||||
|
||||
/**
|
||||
@ -62,7 +63,7 @@ public class EsProcessInstance {
|
||||
private Long durationInMillis;
|
||||
|
||||
/**
|
||||
* 该流程中上次操作完成的时间
|
||||
* 该流程中上次操作完成的时间, 如果是已完成的实例, 就清空该值
|
||||
*/
|
||||
private Date lastOperationTime;
|
||||
|
||||
@ -84,9 +85,9 @@ public class EsProcessInstance {
|
||||
private String workflowEngineVersion;
|
||||
|
||||
/**
|
||||
* 代运营的 WorkspaceId
|
||||
* 是否代运营
|
||||
*/
|
||||
private String agentTenantId;
|
||||
private Boolean agent;
|
||||
|
||||
/**
|
||||
* 是否支持批量操作任务
|
||||
|
||||
@ -20,7 +20,7 @@ import java.util.List;
|
||||
* @since 2024-09-25 20:32
|
||||
*/
|
||||
@Data
|
||||
@IndexName("process_task")
|
||||
@IndexName("process_task_test")
|
||||
public class EsProcessTask {
|
||||
|
||||
/**
|
||||
|
||||
@ -0,0 +1,145 @@
|
||||
package cn.axzo.workflow.es.service.aggregation;
|
||||
|
||||
import cn.axzo.workflow.common.enums.WorkspaceType;
|
||||
import cn.axzo.workflow.common.model.dto.es.DataSyncSummaryDTO;
|
||||
import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO;
|
||||
import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper;
|
||||
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
|
||||
import cn.axzo.workflow.core.service.ExtAxUnfinishedProcessService;
|
||||
import cn.axzo.workflow.es.model.EsProcessInstance;
|
||||
import cn.axzo.workflow.es.model.EsProcessTask;
|
||||
import cn.axzo.workflow.es.service.EsProcessInstanceService;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.engine.HistoryService;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_121;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_AGENT;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_WORKSPACE_TYPE;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VERSION;
|
||||
|
||||
/**
|
||||
* 实例纬度的聚合操作
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-09-30 11:46
|
||||
*/
|
||||
@Component
|
||||
@AllArgsConstructor
|
||||
@Slf4j
|
||||
public class AggregateProcessInstanceService {
|
||||
private HistoryService historyService;
|
||||
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
|
||||
private final EsProcessInstanceService esProcessInstanceService;
|
||||
private final AggregateProcessTaskService aggregateProcessTaskService;
|
||||
private final ExtAxUnfinishedProcessService unfinishedProcessService;
|
||||
|
||||
/**
|
||||
* 同步指定查询结果的数据至 ES
|
||||
* <p>
|
||||
* 内部自动分页循环同步
|
||||
*
|
||||
* @param search
|
||||
* @return 应同步至 ES 的统计数据
|
||||
*/
|
||||
public DataSyncSummaryDTO syncProcessInstanceForSearch(HistoricProcessInstanceSearchForEsDTO search) {
|
||||
Long totalProcessInstanceCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceTotalCount(search);
|
||||
log.info("查询到待同步数据量: {} 条", totalProcessInstanceCount);
|
||||
AtomicLong totalProcessTaskCount = new AtomicLong(0);
|
||||
IntStream.iterate(0, i -> i + search.getOverPageSize()).limit((totalProcessInstanceCount + search.getOverPageSize() - 1) / search.getOverPageSize())
|
||||
.forEach(skipRows -> {
|
||||
log.info("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalProcessInstanceCount * 100),
|
||||
skipRows, skipRows + search.getOverPageSize());
|
||||
int pageNo = skipRows / search.getOverPageSize() + 1;
|
||||
List<HistoricProcessInstance> instances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, new Page<HistoricProcessInstance>(pageNo, search.getOverPageSize()));
|
||||
instances.forEach(hpi -> {
|
||||
if (Objects.nonNull(hpi.getEndTime())) {
|
||||
// 已结束的实例
|
||||
List<EsProcessTask> tasks = syncFinishProcessInstance(hpi);
|
||||
totalProcessTaskCount.getAndSet(totalProcessTaskCount.get() + tasks.size());
|
||||
} else {
|
||||
// 未结束的实例
|
||||
}
|
||||
});
|
||||
});
|
||||
return new DataSyncSummaryDTO(totalProcessInstanceCount, totalProcessTaskCount.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步结束审批的数据至 ES
|
||||
*/
|
||||
public List<EsProcessTask> syncFinishProcessInstance(HistoricProcessInstance hpi) {
|
||||
if (Objects.isNull(hpi)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
EsProcessInstance esProcessInstance = new EsProcessInstance();
|
||||
esProcessInstance.setId(hpi.getId());
|
||||
esProcessInstance.setName(hpi.getName());
|
||||
esProcessInstance.setBusinessKey(hpi.getBusinessKey());
|
||||
esProcessInstance.setProcessDefinitionId(hpi.getProcessDefinitionId());
|
||||
esProcessInstance.setStartTime(hpi.getStartTime());
|
||||
esProcessInstance.setEndTime(hpi.getEndTime());
|
||||
esProcessInstance.setDurationInMillis(hpi.getDurationInMillis());
|
||||
esProcessInstance.setTenantId(hpi.getTenantId());
|
||||
esProcessInstance.setBusinessStatus(hpi.getBusinessStatus());
|
||||
|
||||
Map<String, Object> variables = hpi.getProcessVariables();
|
||||
|
||||
esProcessInstance.setWorkflowEngineVersion(String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121)));
|
||||
esProcessInstance.setProcessCategoryType(WorkspaceType.getType((Integer) variables.getOrDefault(INTERNAL_PROCESS_WORKSPACE_TYPE, WorkspaceType.UN_KNOW.getCode())).getDesc());
|
||||
esProcessInstance.setAgent((Boolean) variables.getOrDefault(INTERNAL_PROCESS_AGENT, false));
|
||||
|
||||
BpmnModel bpmnModel = bpmnProcessInstanceForEsService.queryBpmnModel(hpi.getProcessDefinitionId());
|
||||
BpmnMetaParserHelper.getProcessApproveConf(bpmnModel.getMainProcess()).ifPresent(cfg -> {
|
||||
esProcessInstance.setSupportBatch(cfg.getSupportBatchOperation());
|
||||
esProcessInstance.setUserAgreeSignature(cfg.getUserAgreeSignature());
|
||||
});
|
||||
|
||||
// 实例纬度数据同步 ES
|
||||
esProcessInstanceService.insert(esProcessInstance);
|
||||
|
||||
String instanceVersion = String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121));
|
||||
DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion);
|
||||
DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142);
|
||||
List<EsProcessTask> toEsTasks = new ArrayList<>();
|
||||
if (version.compareTo(supportVersion) >= 0) {
|
||||
log.info("通过新日志表进行任务纬度的数据同步");
|
||||
// 用新日志表数据任务同步
|
||||
toEsTasks.addAll(aggregateProcessTaskService.syncProcessTaskForNew(hpi, instanceVersion));
|
||||
} else {
|
||||
log.info("通过引擎的任务表进行任务纬度的数据同步");
|
||||
// 用引擎的任务表处理任务同步
|
||||
toEsTasks.addAll(aggregateProcessTaskService.syncProcessTaskForOld(hpi, instanceVersion, bpmnModel));
|
||||
}
|
||||
return toEsTasks;
|
||||
}
|
||||
|
||||
// public List<HistoricProcessInstance> recordUnfinishedProcessInstance(HistoricProcessInstanceSearchForEsDTO unfinishedSearch) {
|
||||
// List<HistoricProcessInstance> unfinishedInstances = historyService.createHistoricProcessInstanceQuery()
|
||||
// .or()
|
||||
// .unfinished()
|
||||
// .finishedAfter(unfinishedSearch.getEndTime())
|
||||
// .endOr()
|
||||
// .list();
|
||||
// List<String> unfinishedIds = unfinishedInstances.stream().map(HistoricProcessInstance::getId).collect(Collectors.toList());
|
||||
// log.info("未完成的实例数: {}", unfinishedIds.size());
|
||||
//
|
||||
// unfinishedProcessService.bulkInsertUnfinishedProcess(unfinishedIds);
|
||||
// return null;
|
||||
// }
|
||||
}
|
||||
@ -0,0 +1,221 @@
|
||||
package cn.axzo.workflow.es.service.aggregation;
|
||||
|
||||
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
|
||||
import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper;
|
||||
import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
|
||||
import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog;
|
||||
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
|
||||
import cn.axzo.workflow.core.service.BpmnProcessTaskForEsService;
|
||||
import cn.axzo.workflow.core.service.converter.BpmnHistoricAttachmentConverter;
|
||||
import cn.axzo.workflow.es.model.EsProcessTask;
|
||||
import cn.axzo.workflow.es.service.EsProcessTaskService;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.bpmn.model.FlowElement;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
import org.flowable.engine.impl.persistence.entity.CommentEntityImpl;
|
||||
import org.flowable.engine.task.Attachment;
|
||||
import org.flowable.engine.task.Comment;
|
||||
import org.flowable.task.api.history.HistoricTaskInstance;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_ADVICE;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERATION_DESC;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_130;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.HIDDEN_ASSIGNEE_ID;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_TASK_RELATION_ASSIGNEE_INFO;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.OLD_TASK_ASSIGNEE_SKIP_FLAT;
|
||||
import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.MI_END;
|
||||
import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.REJECTION_AUTO_COMPLETED;
|
||||
|
||||
/**
|
||||
* 任务纬度的聚合操作
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-09-30 11:46
|
||||
*/
|
||||
@Component
|
||||
@AllArgsConstructor
|
||||
@Slf4j
|
||||
public class AggregateProcessTaskService {
|
||||
private final static DefaultArtifactVersion SUPPORT_VERSION = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142);
|
||||
private final EsProcessTaskService esProcessTaskService;
|
||||
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
|
||||
private final BpmnProcessTaskForEsService bpmnProcessTaskForEsService;
|
||||
private final BpmnHistoricAttachmentConverter attachmentConverter;
|
||||
|
||||
/**
|
||||
* 通过新的日志表进行任务纬度的数据同步至 ES
|
||||
*/
|
||||
public List<EsProcessTask> syncProcessTaskForNew(HistoricProcessInstance hpi, String instanceVersion) {
|
||||
DefaultArtifactVersion currentVersion = new DefaultArtifactVersion(instanceVersion);
|
||||
if (currentVersion.compareTo(SUPPORT_VERSION) < 0) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
List<ExtAxProcessLog> logs = bpmnProcessTaskForEsService.queryProcessLogByProcessInstanceId(hpi.getId());
|
||||
Map<String, List<Attachment>> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(hpi.getId())
|
||||
.stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList())));
|
||||
|
||||
List<EsProcessTask> toEsTasks = new ArrayList<>();
|
||||
logs.forEach(log -> {
|
||||
EsProcessTask esProcessTask = new EsProcessTask();
|
||||
esProcessTask.setId(log.getTaskId());
|
||||
esProcessTask.setProcessInstanceId(log.getProcessInstanceId());
|
||||
esProcessTask.setProcessDefinitionId(hpi.getProcessDefinitionId());
|
||||
esProcessTask.setTaskDefinitionKey(log.getActivityId());
|
||||
esProcessTask.setName(log.getActivityName());
|
||||
esProcessTask.setStatus(log.getStatus());
|
||||
esProcessTask.setAdvice(log.getAdvice());
|
||||
esProcessTask.setOperationDesc(log.getOperationDesc());
|
||||
esProcessTask.setStartTime(log.getStartTime());
|
||||
esProcessTask.setEndTime(log.getEndTime());
|
||||
if (Objects.nonNull(log.getEndTime())) {
|
||||
esProcessTask.setDuration(DateUtil.betweenMs(log.getEndTime(), log.getStartTime()));
|
||||
}
|
||||
esProcessTask.setLastUpdateTime(log.getEndTime());
|
||||
esProcessTask.setTenantId(log.getTenantId());
|
||||
esProcessTask.setAssigner(CollectionUtils.isEmpty(log.getAssigneeFull()) ? null : log.getAssigneeFull().get(0));
|
||||
esProcessTask.setAttachments(attachmentConverter.toVos(attachmentMap.getOrDefault(log.getTaskId(), Collections.emptyList())));
|
||||
esProcessTask.setApprovalMethod(log.getApprovalMethod());
|
||||
esProcessTask.setNodeType(log.getNodeType());
|
||||
esProcessTask.setNodeMode(log.getNodeMode());
|
||||
toEsTasks.add(esProcessTask);
|
||||
});
|
||||
|
||||
esProcessTaskService.insertBatch(toEsTasks);
|
||||
return toEsTasks;
|
||||
}
|
||||
|
||||
/**
|
||||
* 优先使用三个参数的同步方法.
|
||||
* <p>
|
||||
* 通过原本引擎表进行任务纬度的数据同步至 ES
|
||||
*/
|
||||
public List<EsProcessTask> syncProcessTaskForOld(HistoricProcessInstance hpi, String instanceVersion) {
|
||||
BpmnModel bpmnModel = bpmnProcessInstanceForEsService.queryBpmnModel(hpi.getProcessDefinitionId());
|
||||
return syncProcessTaskForOld(hpi, instanceVersion, bpmnModel);
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过原本引擎表进行任务纬度的数据同步至 ES
|
||||
*
|
||||
* @param hpi
|
||||
* @param instanceVersion
|
||||
* @param bpmnModel
|
||||
*/
|
||||
public List<EsProcessTask> syncProcessTaskForOld(HistoricProcessInstance hpi, String instanceVersion, BpmnModel bpmnModel) {
|
||||
|
||||
List<HistoricTaskInstance> tasks = bpmnProcessTaskForEsService.queryHistoricProcessTaskByProcessInstanceId(hpi.getId());
|
||||
|
||||
Map<String, ExtAxHiTaskInst> extTaskMap = bpmnProcessTaskForEsService.queryExtAxHiTaskInstByProcessInstanceId(hpi.getId())
|
||||
.stream().collect(Collectors.toMap(ExtAxHiTaskInst::getTaskId, Function.identity(), (s, t) -> s));
|
||||
|
||||
Map<String, List<Comment>> commentMap = bpmnProcessTaskForEsService.queryCommentByProcessInstanceId(hpi.getId())
|
||||
.stream().collect(Collectors.groupingBy(Comment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList())));
|
||||
|
||||
Map<String, List<Attachment>> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(hpi.getId())
|
||||
.stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList())));
|
||||
|
||||
Map<String, Object> variables = hpi.getProcessVariables();
|
||||
|
||||
// 过滤出有效任务, 比如同节点的多实例下或签, 一个人操作了同节点的一个任务,会把同节点的其他任务处理了,此时被处理的任务认为非人工处理
|
||||
List<HistoricTaskInstance> filteredEffectiveTasks = filterEffectiveTasks(tasks, instanceVersion);
|
||||
|
||||
List<EsProcessTask> toEsTasks = new ArrayList<>();
|
||||
for (HistoricTaskInstance task : filteredEffectiveTasks) {
|
||||
ExtAxHiTaskInst extTask = extTaskMap.getOrDefault(task.getId(), new ExtAxHiTaskInst());
|
||||
List<Comment> comments = commentMap.getOrDefault(task.getId(), Collections.emptyList());
|
||||
List<Attachment> attachments = attachmentMap.getOrDefault(task.getId(), Collections.emptyList());
|
||||
|
||||
EsProcessTask esProcessTask = new EsProcessTask();
|
||||
esProcessTask.setId(task.getId());
|
||||
esProcessTask.setProcessInstanceId(task.getProcessInstanceId());
|
||||
esProcessTask.setProcessDefinitionId(task.getProcessDefinitionId());
|
||||
esProcessTask.setTaskDefinitionKey(task.getTaskDefinitionKey());
|
||||
esProcessTask.setName(task.getName());
|
||||
esProcessTask.setStatus(extTask.getStatus());
|
||||
// 处理 advice
|
||||
esProcessTask.setAdvice(comments.stream().filter(i -> Objects.equals(i.getType(), COMMENT_TYPE_ADVICE))
|
||||
.findFirst().orElse(new CommentEntityImpl()).getFullMessage());
|
||||
// 处理 operationDesc
|
||||
esProcessTask.setOperationDesc(comments.stream().filter(i -> Objects.equals(COMMENT_TYPE_OPERATION_DESC, i.getType()))
|
||||
.max(Comparator.comparing(Comment::getTime))
|
||||
.orElse(new CommentEntityImpl()).getFullMessage());
|
||||
esProcessTask.setStartTime(task.getCreateTime());
|
||||
esProcessTask.setEndTime(task.getEndTime());
|
||||
esProcessTask.setDuration(task.getDurationInMillis());
|
||||
// esProcessTask.setLastUpdateTime(task);
|
||||
esProcessTask.setTenantId(task.getTenantId());
|
||||
esProcessTask.setAssigner(BpmnTaskDelegateAssigner.toObjectCompatible(variables.getOrDefault(INTERNAL_TASK_RELATION_ASSIGNEE_INFO + task.getId(), null)));
|
||||
esProcessTask.setAttachments(attachmentConverter.toVos(attachments));
|
||||
if (Objects.nonNull(bpmnModel)) {
|
||||
FlowElement flowElement = bpmnModel.getFlowElement(task.getTaskDefinitionKey());
|
||||
BpmnMetaParserHelper.getApprovalMethod(flowElement)
|
||||
.ifPresent(e -> esProcessTask.setApprovalMethod(e.getType()));
|
||||
BpmnMetaParserHelper.getNodeType(flowElement)
|
||||
.ifPresent(e -> esProcessTask.setNodeType(e.getType()));
|
||||
// esProcessTask.setNodeMode();
|
||||
}
|
||||
toEsTasks.add(esProcessTask);
|
||||
}
|
||||
esProcessTaskService.insertBatch(toEsTasks);
|
||||
return toEsTasks;
|
||||
}
|
||||
|
||||
private List<HistoricTaskInstance> filterEffectiveTasks(List<HistoricTaskInstance> tasks, String instanceVersion) {
|
||||
List<HistoricTaskInstance> effectiveTasks = new ArrayList<>();
|
||||
if (CollectionUtils.isEmpty(tasks)) {
|
||||
return effectiveTasks;
|
||||
}
|
||||
Stream<HistoricTaskInstance> taskInstanceStream = tasks.stream()
|
||||
.filter(i -> !Objects.equals(i.getAssignee(), HIDDEN_ASSIGNEE_ID))
|
||||
.filter(i -> !Objects.equals(i.getDeleteReason(), HIDDEN_ASSIGNEE_ID))
|
||||
.filter(i -> !Objects.equals(i.getDeleteReason(), MI_END.getStatus()));
|
||||
if (Objects.isNull(instanceVersion)) {
|
||||
compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add);
|
||||
} else {
|
||||
if (StringUtils.hasText(instanceVersion)) {
|
||||
DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion);
|
||||
DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_130);
|
||||
if (version.compareTo(supportVersion) < 0) {
|
||||
compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add);
|
||||
} else {
|
||||
taskInstanceStream.forEach(effectiveTasks::add);
|
||||
}
|
||||
} else {
|
||||
taskInstanceStream.forEach(effectiveTasks::add);
|
||||
}
|
||||
}
|
||||
return effectiveTasks;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 兼容 1.2.1 版本的审批日志
|
||||
*
|
||||
* @param stream
|
||||
* @return
|
||||
*/
|
||||
private Stream<HistoricTaskInstance> compatibleVersion(Stream<HistoricTaskInstance> stream) {
|
||||
return stream.filter(i -> (!Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason()))
|
||||
|| (Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason()))
|
||||
).filter(i -> !(!Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(MI_END.getStatus(), i.getDeleteReason())));
|
||||
}
|
||||
}
|
||||
@ -6,6 +6,7 @@ import cn.axzo.workflow.es.service.EsProcessInstanceService;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
@ -15,7 +16,7 @@ import java.util.Collection;
|
||||
* @author wangli
|
||||
* @since 2024-09-27 11:09
|
||||
*/
|
||||
@Component
|
||||
@Service
|
||||
@AllArgsConstructor
|
||||
@Slf4j
|
||||
public class EsProcessInstanceServiceImpl implements EsProcessInstanceService {
|
||||
|
||||
@ -6,16 +6,17 @@ import cn.axzo.workflow.es.service.EsProcessTaskService;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* 操作流程任务的 ES Service
|
||||
* 操作流程任务的 ES Service 实现
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-09-27 11:09
|
||||
*/
|
||||
@Component
|
||||
@Service
|
||||
@AllArgsConstructor
|
||||
@Slf4j
|
||||
public class EsProcessTaskServiceImpl implements EsProcessTaskService {
|
||||
|
||||
@ -0,0 +1,207 @@
|
||||
package cn.axzo.workflow.listener;
|
||||
|
||||
import cn.axzo.workflow.common.enums.BpmnFlowNodeMode;
|
||||
import cn.axzo.workflow.core.engine.listener.entity.EntityEventHandle;
|
||||
import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.bpmn.model.FlowElement;
|
||||
import org.flowable.bpmn.model.UserTask;
|
||||
import org.flowable.engine.impl.bpmn.behavior.MultiInstanceActivityBehavior;
|
||||
import org.flowable.task.service.impl.persistence.entity.TaskEntity;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.AND_SIGN_EXPRESSION;
|
||||
import static cn.axzo.workflow.common.enums.BpmnFlowNodeMode.AND;
|
||||
import static cn.axzo.workflow.common.enums.BpmnFlowNodeMode.GENERAL;
|
||||
import static cn.axzo.workflow.common.enums.BpmnFlowNodeMode.OR;
|
||||
|
||||
/**
|
||||
* 用于处理非结束的实例的数据同步至 ES
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-09-06 00:02
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@AllArgsConstructor
|
||||
public class SyncEsTaskEntityEventHandle implements EntityEventHandle<TaskEntity> {
|
||||
|
||||
@Override
|
||||
public boolean support(Object entity) {
|
||||
return entity instanceof TaskEntity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskEntity convert(Object entity) {
|
||||
return (TaskEntity) entity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onActivated(TaskEntity taskEntity) {
|
||||
log.debug("onActivated");
|
||||
}
|
||||
|
||||
public void onSuspended(TaskEntity taskEntity) {
|
||||
log.debug("onSuspended");
|
||||
}
|
||||
|
||||
public void onDeleted(TaskEntity taskEntity) {
|
||||
log.debug("onDeleted");
|
||||
// ExtAxProcessLog queryLog = new ExtAxProcessLog();
|
||||
// queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId());
|
||||
// queryLog.setTaskId(taskEntity.getId());
|
||||
// queryLog.setOperationDesc(PENDING.getDesc());
|
||||
// ExtAxProcessLog update = new ExtAxProcessLog();
|
||||
//
|
||||
// ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
|
||||
// RuntimeService runtimeService = processEngineConfiguration.getRuntimeService();
|
||||
// BpmnTaskDelegateAssigner assignee = BpmnTaskDelegateAssigner.toObjectCompatible(taskEntity.getVariable(INTERNAL_TASK_RELATION_ASSIGNEE_INFO + taskEntity.getId()));
|
||||
// if (Objects.nonNull(assignee) && !Objects.equals(NO_ASSIGNEE, assignee.buildAssigneeId())) {
|
||||
// update.setAssigneeFull(Lists.newArrayList(assignee));
|
||||
// update.setAssigneeId(Long.valueOf(assignee.getPersonId()));
|
||||
// update.setAssigneeTenantId(assignee.getTenantId());
|
||||
// update.setAssigneeName(assignee.getAssignerName());
|
||||
// update.setAssigneeOuId(assignee.getOuId());
|
||||
// }
|
||||
//
|
||||
// boolean needDelete = false;
|
||||
// if (Objects.equals(taskEntity.getTaskDefinitionKey(), NODE_STARTER.getType())) {
|
||||
// update.setStatus(APPROVED.getStatus());
|
||||
// } else {
|
||||
//
|
||||
// Object advice = taskEntity.getTransientVariableLocal(COMMENT_TYPE_ADVICE);
|
||||
// if (Objects.nonNull(advice) && StringUtils.hasText(advice.toString())) {
|
||||
// log.info("COMMENT_TYPE_ADVICE: {}", advice);
|
||||
// update.setAdvice(advice.toString());
|
||||
// }
|
||||
// Object operationDesc = taskEntity.getTransientVariableLocal(COMMENT_TYPE_OPERATION_DESC);
|
||||
// if (Objects.nonNull(operationDesc) && StringUtils.hasText(operationDesc.toString())) {
|
||||
// log.info("COMMENT_TYPE_OPERATION_DESC: {}", operationDesc);
|
||||
// update.setOperationDesc(Objects.nonNull(assignee) ? assignee.getAssignerName() + operationDesc : operationDesc.toString());
|
||||
// } else {
|
||||
// update.setOperationDesc(Objects.nonNull(assignee) ? assignee.getAssignerName() : "");
|
||||
// // 评论节点会给 operationDesc 赋 COMMENTED 的值,所以注释
|
||||
//// update.setOperationDesc(BpmnProcessInstanceResultEnum.valueOfStatus(completionType).getDesc());
|
||||
// }
|
||||
//
|
||||
//
|
||||
// String completionType = taskEntity.getVariable(TASK_COMPLETE_OPERATION_TYPE + taskEntity.getId(), String.class);
|
||||
// if (StringUtils.hasText(completionType) && !Objects.equals(DELETED.getStatus(), completionType)) {
|
||||
// log.info("TASK_COMPLETE_OPERATION_TYPE: {}", completionType);
|
||||
// update.setStatus(completionType);
|
||||
// } else {
|
||||
// // 多实例除操作人以外的任务,直接删除日志, 例如一个节点有两个人或签,A 人驳回了,那么 B 人不再需要操作,任务自动删除。而会签也同理
|
||||
// update.setStatus(DELETED.getStatus());// delete标志着是多实例删除
|
||||
// needDelete = true;
|
||||
// }
|
||||
// }
|
||||
// update.setEndTime(new Date());
|
||||
//
|
||||
// // 判断是否抄送节点,如果是的话,需要将抄送人集合放入对应字段
|
||||
// if (isCarbonCopyNode(queryLog)) {
|
||||
// // 抄送人集合
|
||||
// List<BpmnTaskDelegateAssigner> carbonCopies = runtimeService.getVariable(taskEntity.getProcessInstanceId(), INTERNAL_ACTIVITY_RELATION_ASSIGNEE_LIST_INFO_SNAPSHOT + taskEntity.getTaskDefinitionKey(), List.class);
|
||||
// update.setAssigneeFull(carbonCopies);
|
||||
// update.setOperationDesc("抄送" + carbonCopies.size() + "人");
|
||||
// }
|
||||
//
|
||||
// processLogService.update(queryLog, update);
|
||||
//
|
||||
// if (needDelete) {
|
||||
// // 再逻辑删除该记录
|
||||
// ExtAxProcessLog deleteLog = new ExtAxProcessLog();
|
||||
// deleteLog.setProcessInstanceId(taskEntity.getProcessInstanceId());
|
||||
// deleteLog.setTaskId(taskEntity.getId());
|
||||
// processLogService.delete(deleteLog);
|
||||
// }
|
||||
}
|
||||
|
||||
private boolean isCarbonCopyNode(ExtAxProcessLog queryLog) {
|
||||
// List<ExtAxProcessLog> logs = processLogService.genericQuery(queryLog);
|
||||
// if (CollectionUtils.isEmpty(logs) || logs.size() != 1) {
|
||||
// return false;
|
||||
// }
|
||||
// return Objects.equals(logs.get(0).getNodeType(), BpmnFlowNodeType.NODE_CARBON_COPY.getType());
|
||||
return false;
|
||||
}
|
||||
|
||||
public void onUpdated(TaskEntity taskEntity) {
|
||||
log.debug("onUpdated");
|
||||
// if (Objects.equals(HIDDEN_ASSIGNEE_ID, taskEntity.getAssignee())) {
|
||||
// ExtAxProcessLog queryLog = new ExtAxProcessLog();
|
||||
// queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId());
|
||||
// queryLog.setTaskId(taskEntity.getId());
|
||||
// processLogService.delete(queryLog);
|
||||
// } else {
|
||||
// ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
|
||||
// RuntimeService runtimeService = processEngineConfiguration.getRuntimeService();
|
||||
// List<BpmnTaskDelegateAssigner> assigneeList = runtimeService.getVariable(taskEntity.getProcessInstanceId(),
|
||||
// INTERNAL_ACTIVITY_RELATION_ASSIGNEE_LIST_INFO_SNAPSHOT + taskEntity.getTaskDefinitionKey(), List.class);
|
||||
// ListUtils.emptyIfNull(assigneeList).stream().filter(e -> Objects.equals(e.buildAssigneeId(), taskEntity.getAssignee())).findAny()
|
||||
// .ifPresent(assignee -> {
|
||||
// log.debug("审批人: {}", JSON.toJSONString(assignee));
|
||||
// ExtAxProcessLog queryLog = new ExtAxProcessLog();
|
||||
// queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId());
|
||||
// queryLog.setTaskId(taskEntity.getId());
|
||||
// processLogService.updateAssignee(queryLog, assignee);
|
||||
// });
|
||||
// }
|
||||
}
|
||||
|
||||
public void onInitialized(TaskEntity taskEntity) {
|
||||
log.debug("onInitialized");
|
||||
// BpmnMetaParserHelper.getButtonConfig(ProcessDefinitionUtil.getProcess(taskEntity.getProcessDefinitionId()), taskEntity.getTaskDefinitionKey())
|
||||
// .ifPresent(buttons -> {
|
||||
// ExtAxProcessLog queryLog = new ExtAxProcessLog();
|
||||
// queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId());
|
||||
// queryLog.setTaskId(taskEntity.getId());
|
||||
//
|
||||
// ExtAxProcessLog updateLog = new ExtAxProcessLog();
|
||||
// updateLog.setButtonConf(buttons);
|
||||
// processLogService.update(queryLog, updateLog);
|
||||
// });
|
||||
}
|
||||
|
||||
public void onCreate(TaskEntity taskEntity) {
|
||||
log.debug("onCreate");
|
||||
// ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
|
||||
// // 记录发起人
|
||||
// boolean isNodeStarter = Objects.equals(taskEntity.getTaskDefinitionKey(), NODE_STARTER.getType());
|
||||
//
|
||||
// RepositoryService repositoryService = processEngineConfiguration.getRepositoryService();
|
||||
// BpmnModel bpmnModel = repositoryService.getBpmnModel(taskEntity.getProcessDefinitionId());
|
||||
// FlowElement flowElement = bpmnModel.getFlowElement(taskEntity.getTaskDefinitionKey());
|
||||
//
|
||||
// ExtAxProcessLog log = new ExtAxProcessLog();
|
||||
// log.setProcessInstanceId(taskEntity.getProcessInstanceId());
|
||||
// log.setTenantId(taskEntity.getTenantId());
|
||||
// log.setActivityId(taskEntity.getTaskDefinitionKey());
|
||||
// log.setActivityName(taskEntity.getName());
|
||||
// log.setApprovalMethod((isNodeStarter ? nobody : getApprovalMethod(flowElement).orElse(nobody)).getType());
|
||||
// log.setNodeType((getNodeType(flowElement).orElse(BpmnFlowNodeType.NODE_EMPTY)).getType());
|
||||
// log.setNodeMode((isNodeStarter ? BpmnFlowNodeMode.GENERAL : getNodeMode(flowElement)).getType());
|
||||
// log.setTaskId(taskEntity.getId());
|
||||
// log.setOperationDesc(PENDING.getDesc());
|
||||
// log.setStartTime(taskEntity.getCreateTime());
|
||||
// log.setStatus(PROCESSING.getStatus());
|
||||
//
|
||||
// processLogService.insert(log);
|
||||
}
|
||||
|
||||
private BpmnFlowNodeMode getNodeMode(FlowElement flowElement) {
|
||||
BpmnFlowNodeMode node = GENERAL;
|
||||
if (flowElement instanceof UserTask) {
|
||||
UserTask userTask = (UserTask) flowElement;
|
||||
if (userTask.getBehavior() instanceof MultiInstanceActivityBehavior) {
|
||||
MultiInstanceActivityBehavior behavior =
|
||||
(MultiInstanceActivityBehavior) userTask.getBehavior();
|
||||
node = Objects.equals(AND_SIGN_EXPRESSION, behavior.getCompletionCondition()) ? AND : OR;
|
||||
}
|
||||
}
|
||||
return node;
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,65 +1,25 @@
|
||||
package cn.axzo.workflow.server.xxljob;
|
||||
|
||||
import cn.axzo.workflow.common.enums.WorkspaceType;
|
||||
import cn.axzo.workflow.common.model.dto.es.DataSyncSummaryDTO;
|
||||
import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO;
|
||||
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
|
||||
import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper;
|
||||
import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
|
||||
import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog;
|
||||
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
|
||||
import cn.axzo.workflow.core.service.BpmnProcessTaskForEsService;
|
||||
import cn.axzo.workflow.core.service.converter.BpmnHistoricAttachmentConverter;
|
||||
import cn.axzo.workflow.es.model.EsProcessInstance;
|
||||
import cn.axzo.workflow.es.model.EsProcessTask;
|
||||
import cn.axzo.workflow.es.service.EsProcessInstanceService;
|
||||
import cn.axzo.workflow.es.service.EsProcessTaskService;
|
||||
import cn.axzo.workflow.core.engine.cmd.CustomInsertPropertyCmd;
|
||||
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.IJobHandler;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import com.xxl.job.core.log.XxlJobLogger;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.bpmn.model.FlowElement;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
import org.flowable.engine.impl.persistence.entity.CommentEntityImpl;
|
||||
import org.flowable.engine.task.Attachment;
|
||||
import org.flowable.engine.task.Comment;
|
||||
import org.flowable.task.api.history.HistoricTaskInstance;
|
||||
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
|
||||
import org.flowable.spring.SpringProcessEngineConfiguration;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_ADVICE;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERATION_DESC;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_121;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_130;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.HIDDEN_ASSIGNEE_ID;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_AGENT;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_WORKSPACE_TYPE;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_TASK_RELATION_ASSIGNEE_INFO;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.OLD_TASK_ASSIGNEE_SKIP_FLAT;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VERSION;
|
||||
import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.MI_END;
|
||||
import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.REJECTION_AUTO_COMPLETED;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.LATEST_SYNC_TO_ELASTICSEARCH_TIME;
|
||||
|
||||
/**
|
||||
* 流程实例数据同步
|
||||
@ -71,19 +31,19 @@ import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.REJEC
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class ProcessInstanceSyncJobHandler extends IJobHandler {
|
||||
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
|
||||
private final BpmnProcessTaskForEsService bpmnProcessTaskForEsService;
|
||||
private final BpmnHistoricAttachmentConverter attachmentConverter;
|
||||
private final EsProcessInstanceService esProcessInstanceService;
|
||||
private final EsProcessTaskService esProcessTaskService;
|
||||
private final AggregateProcessInstanceService aggregateProcessInstanceService;
|
||||
private final SpringProcessEngineConfiguration springProcessEngineConfiguration;
|
||||
|
||||
@XxlJob("processInstanceToEs")
|
||||
public ReturnT<String> execute(String s) {
|
||||
log.debug("start exec process instance data sync... ");
|
||||
XxlJobLogger.log("start exec process instance data sync... ");
|
||||
log.debug("start exec finished process instance data sync... ");
|
||||
XxlJobLogger.log("start exec finished process instance data sync... ");
|
||||
|
||||
HistoricProcessInstanceSearchForEsDTO search = new HistoricProcessInstanceSearchForEsDTO();
|
||||
search.setFinished(true);
|
||||
search.setEndTime(new Date());
|
||||
Date endTime = new Date();
|
||||
search.setEndTime(endTime);
|
||||
|
||||
try {
|
||||
if (StringUtils.hasText(s)) {
|
||||
search = JSON.parseObject(s, search.getClass());
|
||||
@ -96,211 +56,39 @@ public class ProcessInstanceSyncJobHandler extends IJobHandler {
|
||||
return ReturnT.FAIL;
|
||||
}
|
||||
|
||||
Long totalCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceTotalCount(search);
|
||||
log.info("查询到待同步数据量: {} 条", totalCount);
|
||||
XxlJobLogger.log("查询到待同步数据量: {} 条", totalCount);
|
||||
Integer overPageSize = search.getOverPageSize();
|
||||
HistoricProcessInstanceSearchForEsDTO finalSearch = search;
|
||||
IntStream.iterate(0, i -> i + overPageSize).limit((totalCount + overPageSize - 1) / overPageSize)
|
||||
.forEach(skipRows -> {
|
||||
log.info("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalCount * 100),
|
||||
skipRows, skipRows + overPageSize);
|
||||
int pageNo = skipRows / overPageSize;
|
||||
log.info("pageNo: {}", pageNo);
|
||||
executeSync(finalSearch, new Page<>(pageNo, overPageSize));
|
||||
});
|
||||
// 同步前的一些额外操作
|
||||
beforeSync(endTime);
|
||||
|
||||
// 开始同步完成的实例到 ES
|
||||
DataSyncSummaryDTO summary = doSync(search);
|
||||
|
||||
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
|
||||
XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
|
||||
return ReturnT.SUCCESS;
|
||||
}
|
||||
|
||||
private void executeSync(HistoricProcessInstanceSearchForEsDTO search, IPage page) {
|
||||
List<HistoricProcessInstance> historicProcessInstances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, page);
|
||||
historicProcessInstances.forEach(processInstance -> {
|
||||
EsProcessInstance esProcessInstance = new EsProcessInstance();
|
||||
esProcessInstance.setId(processInstance.getId());
|
||||
esProcessInstance.setName(processInstance.getName());
|
||||
esProcessInstance.setBusinessKey(processInstance.getBusinessKey());
|
||||
esProcessInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
|
||||
esProcessInstance.setStartTime(processInstance.getStartTime());
|
||||
esProcessInstance.setEndTime(processInstance.getEndTime());
|
||||
esProcessInstance.setDurationInMillis(processInstance.getDurationInMillis());
|
||||
esProcessInstance.setTenantId(processInstance.getTenantId());
|
||||
esProcessInstance.setBusinessStatus(processInstance.getBusinessStatus());
|
||||
/**
|
||||
* 1. 记录本次操作的时间点
|
||||
*/
|
||||
private void beforeSync(Date endTime) {
|
||||
//1. 记录本次操作的时间点
|
||||
CommandExecutor commandExecutor = springProcessEngineConfiguration.getCommandExecutor();
|
||||
commandExecutor.execute(new CustomInsertPropertyCmd(LATEST_SYNC_TO_ELASTICSEARCH_TIME, DateUtil.formatDateTime(endTime)));
|
||||
|
||||
// esProcessInstance.setLastOperationTime();
|
||||
Map<String, Object> variables = processInstance.getProcessVariables();
|
||||
|
||||
esProcessInstance.setWorkflowEngineVersion(String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121)));
|
||||
esProcessInstance.setProcessCategoryType(WorkspaceType.getType((Integer) variables.getOrDefault(INTERNAL_PROCESS_WORKSPACE_TYPE, WorkspaceType.UN_KNOW.getCode())).getDesc());
|
||||
esProcessInstance.setAgentTenantId(String.valueOf(variables.getOrDefault(INTERNAL_PROCESS_AGENT, "")));
|
||||
|
||||
BpmnModel bpmnModel = bpmnProcessInstanceForEsService.queryBpmnModel(processInstance.getProcessDefinitionId());
|
||||
BpmnMetaParserHelper.getProcessApproveConf(bpmnModel.getMainProcess()).ifPresent(cfg -> {
|
||||
esProcessInstance.setSupportBatch(cfg.getSupportBatchOperation());
|
||||
esProcessInstance.setUserAgreeSignature(cfg.getUserAgreeSignature());
|
||||
});
|
||||
|
||||
Integer inserted = esProcessInstanceService.insert(esProcessInstance);
|
||||
if (inserted > 0) {
|
||||
log.debug("write processInstance to es success! id: {}", processInstance.getId());
|
||||
XxlJobLogger.log("write processInstance to es success! id: {}", processInstance.getId());
|
||||
} else {
|
||||
log.warn("write processInstance to es caught exception! id: {}", processInstance.getId());
|
||||
XxlJobLogger.log("write processInstance to es caught exception! id: {}", processInstance.getId());
|
||||
// // 2. 记录次时间点未完成的实例 ID 数据
|
||||
// HistoricProcessInstanceSearchForEsDTO unfinishedSearch = new HistoricProcessInstanceSearchForEsDTO();
|
||||
// // 特殊的查询逻辑,底层实现只依赖该 endTime,其他参数设置无效
|
||||
// unfinishedSearch.setEndTime(endTime);
|
||||
// aggregateProcessInstanceService.recordUnfinishedProcessInstance(unfinishedSearch);
|
||||
}
|
||||
|
||||
String instanceVersion = String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121));
|
||||
DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion);
|
||||
DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142);
|
||||
List<EsProcessTask> toEsTasks = new ArrayList<>();
|
||||
if (version.compareTo(supportVersion) >= 0) {
|
||||
log.info("通过新日志表进行任务纬度的数据同步");
|
||||
// 用新日志表数据任务同步
|
||||
toEsTasks.addAll(executeTaskSyncForNew(processInstance));
|
||||
} else {
|
||||
log.info("通过引擎的任务表进行任务纬度的数据同步");
|
||||
// 用引擎的任务表处理任务同步
|
||||
toEsTasks.addAll(executeTaskSyncForOld(processInstance, instanceVersion, bpmnModel));
|
||||
}
|
||||
Integer insertedTask = esProcessTaskService.insertBatch(toEsTasks);
|
||||
if (insertedTask > 0) {
|
||||
log.debug("write processTask to es success! source size: {}, inserted size: {}", toEsTasks.size(), insertedTask);
|
||||
XxlJobLogger.log("write processTask to es success! source size: {}, inserted size: {}", toEsTasks.size(), insertedTask);
|
||||
} else {
|
||||
log.warn("write processTask to es caught exception! source size: {}, inserted size: {}", toEsTasks.size(), insertedTask);
|
||||
XxlJobLogger.log("write processTask to es caught exception! source size: {}, inserted size: {}", toEsTasks.size(), insertedTask);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private List<EsProcessTask> executeTaskSyncForNew(HistoricProcessInstance processInstance) {
|
||||
List<ExtAxProcessLog> logs = bpmnProcessTaskForEsService.queryProcessLogByProcessInstanceId(processInstance.getId());
|
||||
Map<String, List<Attachment>> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(processInstance.getId())
|
||||
.stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList())));
|
||||
|
||||
List<EsProcessTask> toEsTasks = new ArrayList<>();
|
||||
logs.forEach(log -> {
|
||||
EsProcessTask esProcessTask = new EsProcessTask();
|
||||
esProcessTask.setId(log.getTaskId());
|
||||
esProcessTask.setProcessInstanceId(log.getProcessInstanceId());
|
||||
esProcessTask.setProcessDefinitionId(processInstance.getProcessDefinitionId());
|
||||
esProcessTask.setTaskDefinitionKey(log.getActivityId());
|
||||
esProcessTask.setName(log.getActivityName());
|
||||
esProcessTask.setStatus(log.getStatus());
|
||||
esProcessTask.setAdvice(log.getAdvice());
|
||||
esProcessTask.setOperationDesc(log.getOperationDesc());
|
||||
esProcessTask.setStartTime(log.getStartTime());
|
||||
esProcessTask.setEndTime(log.getEndTime());
|
||||
if (Objects.nonNull(log.getEndTime())) {
|
||||
esProcessTask.setDuration(DateUtil.betweenMs(log.getEndTime(), log.getStartTime()));
|
||||
}
|
||||
esProcessTask.setLastUpdateTime(log.getEndTime());
|
||||
esProcessTask.setTenantId(log.getTenantId());
|
||||
esProcessTask.setAssigner(log.getAssigneeFull().get(0));
|
||||
esProcessTask.setAttachments(attachmentConverter.toVos(attachmentMap.getOrDefault(log.getTaskId(), Collections.emptyList())));
|
||||
esProcessTask.setApprovalMethod(log.getApprovalMethod());
|
||||
esProcessTask.setNodeType(log.getNodeType());
|
||||
esProcessTask.setNodeMode(log.getNodeMode());
|
||||
toEsTasks.add(esProcessTask);
|
||||
});
|
||||
return toEsTasks;
|
||||
}
|
||||
|
||||
|
||||
private List<EsProcessTask> executeTaskSyncForOld(HistoricProcessInstance processInstance, String instanceVersion, BpmnModel bpmnModel) {
|
||||
List<HistoricTaskInstance> tasks = bpmnProcessTaskForEsService.queryHistoricProcessTaskByProcessInstanceId(processInstance.getId());
|
||||
|
||||
Map<String, ExtAxHiTaskInst> extTaskMap = bpmnProcessTaskForEsService.queryExtAxHiTaskInstByProcessInstanceId(processInstance.getId())
|
||||
.stream().collect(Collectors.toMap(ExtAxHiTaskInst::getTaskId, Function.identity(), (s, t) -> s));
|
||||
|
||||
Map<String, List<Comment>> commentMap = bpmnProcessTaskForEsService.queryCommentByProcessInstanceId(processInstance.getId())
|
||||
.stream().collect(Collectors.groupingBy(Comment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList())));
|
||||
|
||||
Map<String, List<Attachment>> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(processInstance.getId())
|
||||
.stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList())));
|
||||
|
||||
Map<String, Object> variables = processInstance.getProcessVariables();
|
||||
|
||||
// 过滤出有效任务, 比如同节点的多实例下或签, 一个人操作了同节点的一个任务,会把同节点的其他任务处理了,此时被处理的任务认为非人工处理
|
||||
filterEffectiveTasks(tasks, instanceVersion);
|
||||
|
||||
List<EsProcessTask> toEsTasks = new ArrayList<>();
|
||||
tasks.forEach(task -> {
|
||||
log.debug("task: {}", task.getId());
|
||||
ExtAxHiTaskInst extTask = extTaskMap.getOrDefault(task.getId(), new ExtAxHiTaskInst());
|
||||
List<Comment> comments = commentMap.getOrDefault(task.getId(), Collections.emptyList());
|
||||
List<Attachment> attachments = attachmentMap.getOrDefault(task.getId(), Collections.emptyList());
|
||||
|
||||
EsProcessTask esProcessTask = new EsProcessTask();
|
||||
esProcessTask.setId(task.getId());
|
||||
esProcessTask.setProcessInstanceId(task.getProcessInstanceId());
|
||||
esProcessTask.setProcessDefinitionId(task.getProcessDefinitionId());
|
||||
esProcessTask.setTaskDefinitionKey(task.getTaskDefinitionKey());
|
||||
esProcessTask.setName(task.getName());
|
||||
esProcessTask.setStatus(extTask.getStatus());
|
||||
// 处理 advice
|
||||
esProcessTask.setAdvice(comments.stream().filter(i -> Objects.equals(i.getType(), COMMENT_TYPE_ADVICE))
|
||||
.findFirst().orElse(new CommentEntityImpl()).getFullMessage());
|
||||
// 处理 operationDesc
|
||||
esProcessTask.setOperationDesc(comments.stream().filter(i -> Objects.equals(COMMENT_TYPE_OPERATION_DESC, i.getType()))
|
||||
.max(Comparator.comparing(Comment::getTime))
|
||||
.orElse(new CommentEntityImpl()).getFullMessage());
|
||||
esProcessTask.setStartTime(task.getCreateTime());
|
||||
esProcessTask.setEndTime(task.getEndTime());
|
||||
esProcessTask.setDuration(task.getDurationInMillis());
|
||||
// esProcessTask.setLastUpdateTime(task);
|
||||
esProcessTask.setTenantId(task.getTenantId());
|
||||
esProcessTask.setAssigner(BpmnTaskDelegateAssigner.toObjectCompatible(variables.getOrDefault(INTERNAL_TASK_RELATION_ASSIGNEE_INFO + task.getId(), null)));
|
||||
esProcessTask.setAttachments(attachmentConverter.toVos(attachments));
|
||||
if (Objects.nonNull(bpmnModel)) {
|
||||
FlowElement flowElement = bpmnModel.getFlowElement(task.getTaskDefinitionKey());
|
||||
BpmnMetaParserHelper.getApprovalMethod(flowElement)
|
||||
.ifPresent(e -> esProcessTask.setApprovalMethod(e.getType()));
|
||||
BpmnMetaParserHelper.getNodeType(flowElement)
|
||||
.ifPresent(e -> esProcessTask.setNodeType(e.getType()));
|
||||
// esProcessTask.setNodeMode();
|
||||
}
|
||||
toEsTasks.add(esProcessTask);
|
||||
});
|
||||
return toEsTasks;
|
||||
}
|
||||
|
||||
private List<HistoricTaskInstance> filterEffectiveTasks(List<HistoricTaskInstance> tasks, String instanceVersion) {
|
||||
List<HistoricTaskInstance> effectiveTasks = new ArrayList<>();
|
||||
if (CollectionUtils.isEmpty(tasks)) {
|
||||
return effectiveTasks;
|
||||
}
|
||||
Stream<HistoricTaskInstance> taskInstanceStream = tasks.stream()
|
||||
.filter(i -> !Objects.equals(i.getAssignee(), HIDDEN_ASSIGNEE_ID))
|
||||
.filter(i -> !Objects.equals(i.getDeleteReason(), HIDDEN_ASSIGNEE_ID))
|
||||
.filter(i -> !Objects.equals(i.getDeleteReason(), MI_END.getStatus()));
|
||||
if (Objects.isNull(instanceVersion)) {
|
||||
compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add);
|
||||
} else {
|
||||
if (StringUtils.hasText(instanceVersion)) {
|
||||
DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion);
|
||||
DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_130);
|
||||
if (version.compareTo(supportVersion) < 0) {
|
||||
compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add);
|
||||
} else {
|
||||
taskInstanceStream.forEach(effectiveTasks::add);
|
||||
}
|
||||
} else {
|
||||
taskInstanceStream.forEach(effectiveTasks::add);
|
||||
}
|
||||
}
|
||||
return effectiveTasks;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 兼容 1.2.1 版本的审批日志
|
||||
* 开始处理的入口
|
||||
*
|
||||
* @param stream
|
||||
* @return
|
||||
* @param search
|
||||
*/
|
||||
private Stream<HistoricTaskInstance> compatibleVersion(Stream<HistoricTaskInstance> stream) {
|
||||
return stream.filter(i -> (!Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason()))
|
||||
|| (Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason()))
|
||||
).filter(i -> !(!Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(MI_END.getStatus(), i.getDeleteReason())));
|
||||
private DataSyncSummaryDTO doSync(HistoricProcessInstanceSearchForEsDTO search) {
|
||||
return aggregateProcessInstanceService.syncProcessInstanceForSearch(search);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user