Merge branch 'feature/REQ-2752' into feature/merge_2924_2752
This commit is contained in:
commit
2218710e10
@ -4,6 +4,7 @@ import cn.axzo.workflow.client.annotation.WorkflowEngineFeignClient;
|
||||
import cn.axzo.workflow.common.annotation.InvokeMode;
|
||||
import cn.axzo.workflow.common.annotation.Manageable;
|
||||
import cn.axzo.workflow.common.model.request.admin.ProcessAdminCreateDTO;
|
||||
import cn.axzo.workflow.common.model.request.admin.ProcessAdminDeleteDTO;
|
||||
import cn.axzo.workflow.common.model.request.admin.ProcessAdminQueryDTO;
|
||||
import cn.axzo.workflow.common.model.response.admin.ProcessAdminVo;
|
||||
import cn.azxo.framework.common.model.CommonResponse;
|
||||
@ -63,8 +64,16 @@ public interface ProcessAdminApi {
|
||||
*/
|
||||
@DeleteMapping("/api/process/admin/delete")
|
||||
@InvokeMode(SYNC)
|
||||
CommonResponse<Void> deleteCommonProcessAdmin(@RequestParam Long id);
|
||||
CommonResponse<Integer> deleteCommonProcessAdmin(@RequestParam Long id);
|
||||
|
||||
/**
|
||||
* 根据条件删除管理员
|
||||
* @param dto 删除条件
|
||||
* @return
|
||||
*/
|
||||
@DeleteMapping("/api/process/admin/delete/criteria")
|
||||
@InvokeMode(SYNC)
|
||||
CommonResponse<Integer> deleteProcessAdminCriteria(@RequestBody ProcessAdminDeleteDTO dto);
|
||||
|
||||
/**
|
||||
* 删除管理员
|
||||
@ -73,5 +82,5 @@ public interface ProcessAdminApi {
|
||||
*/
|
||||
@DeleteMapping("/api/process/admin/batch/delete")
|
||||
@InvokeMode(SYNC)
|
||||
CommonResponse<Void> batchDeleteProcessAdmin(@RequestBody List<Long> ids);
|
||||
CommonResponse<Integer> batchDeleteProcessAdmin(@RequestBody List<Long> ids);
|
||||
}
|
||||
|
||||
@ -39,17 +39,17 @@ public class ExtAxProcessAdmin extends BaseEntity<ExtAxProcessAdmin> {
|
||||
private Integer workspaceType;
|
||||
|
||||
/**
|
||||
* 管理员类型, super_admin-超级管理员, common_admin-普通管理员
|
||||
* 管理员类型, SUPER_ADMIN-超级管理员, COMMON_ADMIN-普通管理员
|
||||
*/
|
||||
private AdminTypeEnum adminType;
|
||||
|
||||
/**
|
||||
* 角色类型, organization_admin-单位超管, organization_worksapce_admin-项目内单位负责人, workspace_admin-项目超管,other-其他用户
|
||||
* 角色类型, ORGANIZATION_ADMIN-单位超管, ORG_WORKSPACE_ADMIN-项目内单位负责人, WORKSPACE_ADMIN-项目超管,OTHER-其他用户
|
||||
*/
|
||||
private AdminRoleType roleType;
|
||||
|
||||
/**
|
||||
* 数据来源, system-系统录入, user-用户手动录入
|
||||
* 数据来源, SYSTEM_ENTRY-系统录入, USER_ENTRY-用户手动录入
|
||||
*/
|
||||
private AdminDataSource dataSource;
|
||||
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package cn.axzo.workflow.admin.service;
|
||||
|
||||
import cn.axzo.workflow.admin.repository.entity.ExtAxProcessAdmin;
|
||||
import cn.axzo.workflow.common.model.request.admin.ProcessAdminDeleteDTO;
|
||||
import cn.axzo.workflow.common.model.request.admin.ProcessAdminQueryDTO;
|
||||
|
||||
import java.util.List;
|
||||
@ -36,13 +37,13 @@ public interface ExtAxProcessAdminService {
|
||||
|
||||
/**
|
||||
* 根据条件删除管理员配置
|
||||
* @param processAdmin 删除条件
|
||||
* @param deleteDTO 删除条件
|
||||
*/
|
||||
void delete(ExtAxProcessAdmin processAdmin);
|
||||
Integer delete(ProcessAdminDeleteDTO deleteDTO);
|
||||
|
||||
/**
|
||||
* 批量删除管理员
|
||||
* @param ids 配置表id列表
|
||||
*/
|
||||
void deleteCommonAdminsByIds(List<Long> ids);
|
||||
Integer deleteCommonAdminsByIds(List<Long> ids);
|
||||
}
|
||||
|
||||
@ -7,6 +7,7 @@ import cn.axzo.workflow.admin.repository.mapper.ExtAxProcessAdminMapper;
|
||||
import cn.axzo.workflow.admin.service.ExtAxProcessAdminService;
|
||||
import cn.axzo.workflow.common.enums.AdminDataSource;
|
||||
import cn.axzo.workflow.common.enums.AdminTypeEnum;
|
||||
import cn.axzo.workflow.common.model.request.admin.ProcessAdminDeleteDTO;
|
||||
import cn.axzo.workflow.common.model.request.admin.ProcessAdminQueryDTO;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import org.springframework.stereotype.Service;
|
||||
@ -51,6 +52,39 @@ public class ExtAxProcessAdminServiceImpl implements ExtAxProcessAdminService {
|
||||
return extAxProcessAdminMapper.selectCount(getQueryWrapper(queryDTO));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer delete(ProcessAdminDeleteDTO deleteDTO) {
|
||||
if (deleteDTO == null) {
|
||||
return 0;
|
||||
}
|
||||
return extAxProcessAdminMapper.delete(getDeleteWrapper(deleteDTO));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer deleteCommonAdminsByIds(List<Long> ids) {
|
||||
if (CollectionUtils.isEmpty(ids)) {
|
||||
return 0;
|
||||
}
|
||||
List<ExtAxProcessAdmin> extAxProcessAdmins = extAxProcessAdminMapper.selectBatchIds(ids);
|
||||
if (CollectionUtils.isEmpty(extAxProcessAdmins)) {
|
||||
return 0;
|
||||
}
|
||||
validateDeleteCommonAdmins(extAxProcessAdmins);
|
||||
return extAxProcessAdminMapper.deleteBatchIds(ids);
|
||||
}
|
||||
|
||||
private void validateDeleteCommonAdmins(List<ExtAxProcessAdmin> extAxProcessAdmins) {
|
||||
if (CollectionUtils.isEmpty(extAxProcessAdmins)) {
|
||||
return;
|
||||
}
|
||||
if (extAxProcessAdmins.stream().anyMatch(ad -> ad.getAdminType() == AdminTypeEnum.SUPER_ADMIN)) {
|
||||
throw new ServiceException("超级管理员不允许删除");
|
||||
}
|
||||
if (extAxProcessAdmins.stream().anyMatch(ad -> ad.getDataSource() == AdminDataSource.SYSTEM_ENTRY)) {
|
||||
throw new ServiceException("系统数据不允许删除");
|
||||
}
|
||||
}
|
||||
|
||||
private LambdaQueryWrapper<ExtAxProcessAdmin> getQueryWrapper(ProcessAdminQueryDTO queryDTO) {
|
||||
return new LambdaQueryWrapper<ExtAxProcessAdmin>()
|
||||
.in(!CollectionUtils.isEmpty(queryDTO.getProcessAdminIds()), ExtAxProcessAdmin::getId, queryDTO.getProcessAdminIds())
|
||||
@ -62,43 +96,15 @@ public class ExtAxProcessAdminServiceImpl implements ExtAxProcessAdminService {
|
||||
.eq(ExtAxProcessAdmin::getIsDelete, TableIsDeleteEnum.NORMAL.value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(ExtAxProcessAdmin processAdmin) {
|
||||
if (processAdmin == null) {
|
||||
return;
|
||||
}
|
||||
extAxProcessAdminMapper.delete(buildQueryWrapper(processAdmin));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteCommonAdminsByIds(List<Long> ids) {
|
||||
if (CollectionUtils.isEmpty(ids)) {
|
||||
return;
|
||||
}
|
||||
List<ExtAxProcessAdmin> extAxProcessAdmins = extAxProcessAdminMapper.selectBatchIds(ids);
|
||||
if (CollectionUtils.isEmpty(extAxProcessAdmins)) {
|
||||
return;
|
||||
}
|
||||
validateDeleteCommonAdmins(extAxProcessAdmins);
|
||||
extAxProcessAdminMapper.deleteBatchIds(ids);
|
||||
}
|
||||
|
||||
private void validateDeleteCommonAdmins(List<ExtAxProcessAdmin> extAxProcessAdmins) {
|
||||
if (CollectionUtils.isEmpty(extAxProcessAdmins)) {
|
||||
return;
|
||||
}
|
||||
if (extAxProcessAdmins.stream().anyMatch(ad -> ad.getAdminType() == AdminTypeEnum.SUPER_ADMIN || ad.getDataSource() == AdminDataSource.SYSTEM_ENTRY)) {
|
||||
throw new ServiceException("超级管理员不允许删除");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
LambdaQueryWrapper<ExtAxProcessAdmin> buildQueryWrapper(ExtAxProcessAdmin processAdmin) {
|
||||
private LambdaQueryWrapper<ExtAxProcessAdmin> getDeleteWrapper(ProcessAdminDeleteDTO deleteDTO) {
|
||||
return new LambdaQueryWrapper<ExtAxProcessAdmin>()
|
||||
.eq(Objects.nonNull(processAdmin.getId()), ExtAxProcessAdmin::getId, processAdmin.getId())
|
||||
.eq(Objects.nonNull(processAdmin.getWorkspaceId()), ExtAxProcessAdmin::getWorkspaceId, processAdmin.getWorkspaceId())
|
||||
.eq(Objects.nonNull(processAdmin.getOrganizationalUnitId()), ExtAxProcessAdmin::getOrganizationalUnitId, processAdmin.getOrganizationalUnitId())
|
||||
.eq(Objects.nonNull(processAdmin.getPersonId()), ExtAxProcessAdmin::getPersonId, processAdmin.getPersonId())
|
||||
.eq(Objects.nonNull(processAdmin.getDataSource()), ExtAxProcessAdmin::getDataSource, processAdmin.getDataSource());
|
||||
.in(!CollectionUtils.isEmpty(deleteDTO.getProcessAdminIds()), ExtAxProcessAdmin::getId, deleteDTO.getProcessAdminIds())
|
||||
.eq(Objects.nonNull(deleteDTO.getWorkspaceId()), ExtAxProcessAdmin::getWorkspaceId, deleteDTO.getWorkspaceId())
|
||||
.eq(deleteDTO.getAdminType() != null, ExtAxProcessAdmin::getAdminType, deleteDTO.getAdminType())
|
||||
.eq(Objects.nonNull(deleteDTO.getOrganizationalUnitId()), ExtAxProcessAdmin::getOrganizationalUnitId, deleteDTO.getOrganizationalUnitId())
|
||||
.in(!CollectionUtils.isEmpty(deleteDTO.getPersonIds()), ExtAxProcessAdmin::getPersonId, deleteDTO.getPersonIds())
|
||||
.eq(Objects.nonNull(deleteDTO.getDataSource()), ExtAxProcessAdmin::getDataSource, deleteDTO.getDataSource())
|
||||
.eq(ExtAxProcessAdmin::getIsDelete, TableIsDeleteEnum.NORMAL.value);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -193,7 +193,7 @@ public interface BpmnConstants {
|
||||
/**
|
||||
* 固定父子文档在相同分片
|
||||
*/
|
||||
String ES_FIXED_ROUTING = "routing";
|
||||
String ES_FIXED_ROUTING = "workflow_engine_join_routing";
|
||||
|
||||
/**
|
||||
* ouId+workspaceId 下限制人员数量为20
|
||||
|
||||
@ -82,10 +82,15 @@ public class HistoricProcessInstanceSearchForEsDTO {
|
||||
*/
|
||||
private TimeQueryDirection endTimeDirection = TimeQueryDirection.BEFORE;
|
||||
|
||||
/**
|
||||
* 是否包含流程变量
|
||||
*/
|
||||
private Boolean hasVariables = false;
|
||||
|
||||
/**
|
||||
* 用于覆盖同步逻辑中的PageSize,一般不需要传
|
||||
*/
|
||||
private Integer overPageSize = 100;
|
||||
private Integer overPageSize = 50;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -51,19 +51,19 @@ public class ProcessAdminCreateDTO {
|
||||
private Integer workspaceType;
|
||||
|
||||
/**
|
||||
* 管理员类型, super_admin-超级管理员, common_admin-普通管理员
|
||||
* 管理员类型, SUPER_ADMIN-超级管理员, COMMON_ADMIN-普通管理员
|
||||
*/
|
||||
@ApiModelProperty(value = "管理员类型, super_admin-超级管理员, common_admin-普通管理员")
|
||||
private AdminTypeEnum adminType;
|
||||
|
||||
/**
|
||||
* 角色类型, organization_admin-单位超管, organization_workspace_admin-项目内单位负责人, workspace_admin-项目超管,other-其他用户
|
||||
* 角色类型, ORGANIZATION_ADMIN-单位超管, ORG_WORKSPACE_ADMIN-项目内单位负责人, WORKSPACE_ADMIN-项目超管,OTHER-其他用户
|
||||
*/
|
||||
@ApiModelProperty(value = "角色类型, organization_admin-单位超管, organization_workspace_admin-项目内单位负责人, workspace_admin-项目超管,other-其他用户")
|
||||
private AdminRoleType roleType;
|
||||
|
||||
/**
|
||||
* 数据来源, system-系统录入, user-用户手动录入
|
||||
* 数据来源, SYSTEM_ENTRY-系统录入, USER_ENTRY-用户手动录入
|
||||
*/
|
||||
@ApiModelProperty(value = "数据来源, system-系统录入, user-用户手动录入")
|
||||
private AdminDataSource dataSource;
|
||||
|
||||
@ -0,0 +1,56 @@
|
||||
package cn.axzo.workflow.common.model.request.admin;
|
||||
|
||||
import cn.axzo.workflow.common.enums.AdminDataSource;
|
||||
import cn.axzo.workflow.common.enums.AdminTypeEnum;
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ApiModel("删除管理员模型")
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class ProcessAdminDeleteDTO {
|
||||
|
||||
/**
|
||||
* 流程管理员配置id列表
|
||||
*/
|
||||
@ApiModelProperty(value = "流程管理员配置id列表")
|
||||
private List<Long> processAdminIds;
|
||||
|
||||
/**
|
||||
* 自然人id
|
||||
*/
|
||||
@ApiModelProperty(value = "自然人id列表")
|
||||
private List<Long> personIds;
|
||||
|
||||
/**
|
||||
* 工作台ID
|
||||
*/
|
||||
@ApiModelProperty(value = "工作台id")
|
||||
private Long workspaceId;
|
||||
|
||||
/**
|
||||
* 单位id
|
||||
*/
|
||||
@ApiModelProperty(value = "单位id")
|
||||
private Long organizationalUnitId;
|
||||
|
||||
/**
|
||||
* 数据来源, SYSTEM_ENTRY-系统录入, USER_ENTRY-用户手动录入
|
||||
*/
|
||||
@ApiModelProperty(value = "数据来源")
|
||||
private AdminDataSource dataSource;
|
||||
|
||||
/**
|
||||
* 管理员类型
|
||||
*/
|
||||
@ApiModelProperty(value = "管理员类型")
|
||||
private AdminTypeEnum adminType;
|
||||
}
|
||||
@ -70,6 +70,11 @@ public class ProcessInstanceDocumentVO {
|
||||
*/
|
||||
private String businessStatus;
|
||||
|
||||
/**
|
||||
* 发起人姓名
|
||||
*/
|
||||
private String initiatorName;
|
||||
|
||||
/**
|
||||
* 实例对应的流程引擎服务端迭代版本
|
||||
*/
|
||||
|
||||
@ -0,0 +1,68 @@
|
||||
package cn.axzo.workflow.core.engine.cmd;
|
||||
|
||||
import org.flowable.common.engine.impl.interceptor.Command;
|
||||
import org.flowable.common.engine.impl.interceptor.CommandContext;
|
||||
import org.flowable.engine.HistoryService;
|
||||
import org.flowable.engine.ManagementService;
|
||||
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
|
||||
import org.flowable.engine.impl.util.CommandContextUtil;
|
||||
import org.flowable.variable.api.history.HistoricVariableInstance;
|
||||
import org.flowable.variable.api.history.NativeHistoricVariableInstanceQuery;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 自定义指定实例下的指定名称的变量数据
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-10-14 13:43
|
||||
*/
|
||||
public class CustomGetHistoricVariablesCmd implements Command<List<HistoricVariableInstance>> {
|
||||
|
||||
private final String processInstanceId;
|
||||
private final List<String> variableNames;
|
||||
|
||||
public CustomGetHistoricVariablesCmd(String processInstanceId, List<String> variableNames) {
|
||||
this.processInstanceId = processInstanceId;
|
||||
this.variableNames = variableNames;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HistoricVariableInstance> execute(CommandContext commandContext) {
|
||||
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
|
||||
HistoryService historyService = processEngineConfiguration.getHistoryService();
|
||||
|
||||
NativeHistoricVariableInstanceQuery query = historyService.createNativeHistoricVariableInstanceQuery()
|
||||
.sql(buildQuerySql(commandContext))
|
||||
.parameter("processInstanceId", processInstanceId);
|
||||
if (!CollectionUtils.isEmpty(variableNames)) {
|
||||
for (int i = 0; i < variableNames.size(); i++) {
|
||||
query.parameter("variableName" + i, variableNames.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
return query.list();
|
||||
}
|
||||
|
||||
private String buildQuerySql(CommandContext commandContext) {
|
||||
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
|
||||
ManagementService managementService = processEngineConfiguration.getManagementService();
|
||||
StringBuilder baseQuerySql = new StringBuilder("SELECT * ")
|
||||
.append(" FROM ")
|
||||
.append(managementService.getTableName(HistoricVariableInstance.class))
|
||||
.append(" WHERE 1 = 1")
|
||||
.append(" AND PROC_INST_ID_ = #{processInstanceId}");
|
||||
|
||||
if (!CollectionUtils.isEmpty(variableNames)) {
|
||||
baseQuerySql.append(" AND (");
|
||||
for (int i = 0; i < variableNames.size(); i++) {
|
||||
if (i != 0) {
|
||||
baseQuerySql.append(" OR ");
|
||||
}
|
||||
baseQuerySql.append(" NAME_ = #{variableName").append(i).append("}");
|
||||
}
|
||||
}
|
||||
return baseQuerySql.append(")").toString();
|
||||
}
|
||||
}
|
||||
@ -7,6 +7,7 @@ import org.flowable.engine.history.HistoricProcessInstance;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 专用与对接 ES 的流程实例相关操作
|
||||
@ -22,6 +23,8 @@ public interface BpmnProcessInstanceForEsService {
|
||||
|
||||
BpmnModel queryBpmnModel(String processDefinitionId);
|
||||
|
||||
Map<String, Object> queryInstanceVariables(String processInstanceId, List<String> variableNames);
|
||||
|
||||
Long queryHistoricProcessInstanceByUnfinishedAndAlterEndTimeTotalCount(Date endTime);
|
||||
|
||||
/**
|
||||
|
||||
@ -1,14 +1,19 @@
|
||||
package cn.axzo.workflow.core.service.impl;
|
||||
|
||||
import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO;
|
||||
import cn.axzo.workflow.core.engine.cmd.CustomGetHistoricVariablesCmd;
|
||||
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
|
||||
import org.flowable.engine.HistoryService;
|
||||
import org.flowable.engine.RepositoryService;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
import org.flowable.engine.history.HistoricProcessInstanceQuery;
|
||||
import org.flowable.engine.impl.HistoryServiceImpl;
|
||||
import org.flowable.spring.SpringProcessEngineConfiguration;
|
||||
import org.flowable.variable.api.history.HistoricVariableInstance;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
@ -17,7 +22,9 @@ import javax.annotation.Resource;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 专用与对接 ES 的流程实例相关操作
|
||||
@ -34,6 +41,9 @@ public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceF
|
||||
@Resource
|
||||
@Lazy
|
||||
private RepositoryService repositoryService;
|
||||
@Resource
|
||||
@Lazy
|
||||
private SpringProcessEngineConfiguration processEngineConfiguration;
|
||||
|
||||
@Override
|
||||
public Long queryHistoricProcessInstanceTotalCount(HistoricProcessInstanceSearchForEsDTO search) {
|
||||
@ -49,9 +59,7 @@ public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceF
|
||||
return Collections.emptyList();
|
||||
}
|
||||
HistoricProcessInstanceQuery query = buildHistoricProcessInstanceQuery(search)
|
||||
.includeProcessVariables()
|
||||
.orderByProcessInstanceId()
|
||||
.asc();
|
||||
.orderByProcessInstanceId().asc();
|
||||
if (Objects.nonNull(page)) {
|
||||
int firstResult = Math.toIntExact((page.getCurrent() - 1) * page.getSize());
|
||||
int maxResult = Math.toIntExact(page.getSize());
|
||||
@ -65,6 +73,13 @@ public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceF
|
||||
return repositoryService.getBpmnModel(processDefinitionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> queryInstanceVariables(String processInstanceId, List<String> variableNames) {
|
||||
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
|
||||
List<HistoricVariableInstance> variables = commandExecutor.execute(new CustomGetHistoricVariablesCmd(processInstanceId, variableNames));
|
||||
return variables.stream().collect(Collectors.toMap(HistoricVariableInstance::getVariableName, HistoricVariableInstance::getValue, (s, t) -> s));
|
||||
}
|
||||
|
||||
private HistoricProcessInstanceQuery buildHistoricProcessInstanceQuery(HistoricProcessInstanceSearchForEsDTO search) {
|
||||
HistoricProcessInstanceQuery historicProcessInstanceQuery = historyService.createHistoricProcessInstanceQuery();
|
||||
if (StringUtils.hasText(search.getProcessInstanceId())) {
|
||||
@ -113,6 +128,9 @@ public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceF
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (search.getHasVariables()) {
|
||||
historicProcessInstanceQuery.includeProcessVariables();
|
||||
}
|
||||
return historicProcessInstanceQuery;
|
||||
}
|
||||
|
||||
|
||||
@ -91,6 +91,12 @@ public class ProcessInstanceDocument {
|
||||
@IndexField(fieldType = FieldType.KEYWORD)
|
||||
private String businessStatus;
|
||||
|
||||
/**
|
||||
* 发起人姓名
|
||||
*/
|
||||
@IndexField(fieldType = FieldType.KEYWORD)
|
||||
private String initiatorName;
|
||||
|
||||
/**
|
||||
* 实例对应的流程引擎服务端迭代版本
|
||||
*/
|
||||
|
||||
@ -2,6 +2,7 @@ package cn.axzo.workflow.es.service.aggregation;
|
||||
|
||||
import cn.axzo.basics.common.BeanMapper;
|
||||
import cn.axzo.workflow.common.enums.WorkspaceType;
|
||||
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
|
||||
import cn.axzo.workflow.common.model.request.es.InstanceSearchReqDTO;
|
||||
import cn.axzo.workflow.common.model.response.BpmPageResult;
|
||||
import cn.axzo.workflow.common.model.response.es.ProcessInstanceDocumentVO;
|
||||
@ -10,12 +11,14 @@ import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
|
||||
import cn.axzo.workflow.es.model.ProcessInstanceDocument;
|
||||
import cn.axzo.workflow.es.model.ProcessTaskDocument;
|
||||
import cn.axzo.workflow.es.service.EsProcessInstanceService;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -28,8 +31,10 @@ import java.util.Objects;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.ES_FIXED_ROUTING;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_121;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_INITIATOR;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_AGENT;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_WORKSPACE_TYPE;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.OLD_INTERNAL_INITIATOR;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VERSION;
|
||||
import static cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum.PROCESSING;
|
||||
|
||||
@ -66,7 +71,15 @@ public class AggregateProcessInstanceService {
|
||||
processInstanceDocument.setBusinessStatus(hpi.getBusinessStatus());
|
||||
|
||||
Map<String, Object> variables = hpi.getProcessVariables();
|
||||
if (CollectionUtils.isEmpty(variables)) {
|
||||
variables.putAll(bpmnProcessInstanceForEsService.queryInstanceVariables(hpi.getId(),
|
||||
Lists.newArrayList(WORKFLOW_ENGINE_VERSION, INTERNAL_PROCESS_WORKSPACE_TYPE, INTERNAL_PROCESS_AGENT, INTERNAL_INITIATOR, OLD_INTERNAL_INITIATOR)));
|
||||
}
|
||||
|
||||
BpmnTaskDelegateAssigner initiator = BpmnTaskDelegateAssigner.toObjectCompatible(variables.getOrDefault(INTERNAL_INITIATOR, variables.getOrDefault(OLD_INTERNAL_INITIATOR, null)));
|
||||
if (Objects.nonNull(initiator)) {
|
||||
processInstanceDocument.setInitiatorName(initiator.getAssignerName());
|
||||
}
|
||||
processInstanceDocument.setWorkflowEngineVersion(String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121)));
|
||||
processInstanceDocument.setProcessCategoryType(WorkspaceType.getType((Integer) variables.getOrDefault(INTERNAL_PROCESS_WORKSPACE_TYPE, WorkspaceType.UN_KNOW.getCode())).getDesc());
|
||||
processInstanceDocument.setAgent((Boolean) variables.getOrDefault(INTERNAL_PROCESS_AGENT, false));
|
||||
|
||||
@ -114,16 +114,14 @@ public class EsProcessInstanceServiceImpl implements EsProcessInstanceService {
|
||||
LambdaEsQueryWrapper<ProcessInstanceDocument> wrapper = new LambdaEsQueryWrapper<>();
|
||||
wrapper.hasChild("process_task_document",
|
||||
w -> w.matchPhrase(StringUtils.hasText(dto.getAssigneeName()), FieldUtils.val(ProcessTaskDocument::getAssigneeName), dto.getAssigneeName())
|
||||
// w -> w.like(StringUtils.hasText(dto.getAssigneeName()), FieldUtils.val(ProcessTaskDocument::getAssigneeName), dto.getAssigneeName())
|
||||
.eq(StringUtils.hasText(dto.getTenantId()), FieldUtils.val(ProcessTaskDocument::getAssigneeTenantId), dto.getTenantId())
|
||||
.eq(StringUtils.hasText(dto.getOuId()), FieldUtils.val(ProcessTaskDocument::getAssigneeOuId), dto.getOuId())
|
||||
.eq(StringUtils.hasText(dto.getPersonId()), FieldUtils.val(ProcessTaskDocument::getAssigneePersonId), dto.getPersonId())
|
||||
)
|
||||
.eq(StringUtils.hasText(dto.getBusinessStatus()), FieldUtils.val(ProcessInstanceDocument::getBusinessStatus), dto.getBusinessStatus())
|
||||
// .matchPhrase(StringUtils.hasText(dto.getProcessInstanceName()), FieldUtils.val(ProcessInstanceDocument::getProcessInstanceName), dto.getProcessInstanceName())
|
||||
.like(StringUtils.hasText(dto.getProcessInstanceName()), FieldUtils.val(ProcessInstanceDocument::getProcessInstanceName), dto.getProcessInstanceName())
|
||||
.matchPhrase(StringUtils.hasText(dto.getProcessInstanceName()), FieldUtils.val(ProcessInstanceDocument::getProcessInstanceName), dto.getProcessInstanceName())
|
||||
.in(CollectionUtils.isEmpty(dto.getProcessInstanceIds()), FieldUtils.val(ProcessInstanceDocument::getInstanceId), dto.getProcessInstanceIds())
|
||||
// .routing(ES_FIXED_ROUTING)
|
||||
.routing(ES_FIXED_ROUTING)
|
||||
.orderByDesc(FieldUtils.val(ProcessInstanceDocument::getInstanceStartTime))
|
||||
;
|
||||
EsPageInfo<ProcessInstanceDocument> pageInfo;
|
||||
|
||||
@ -15,6 +15,8 @@ import org.springframework.util.StringUtils;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.ES_FIXED_ROUTING;
|
||||
|
||||
/**
|
||||
* 操作流程任务的 ES Service 实现
|
||||
*
|
||||
@ -60,9 +62,9 @@ public class EsProcessTaskServiceImpl implements EsProcessTaskService {
|
||||
wrapper.eq(StringUtils.hasText(dto.getAssigneeOuId()), ProcessTaskDocument::getAssigneeOuId, dto.getAssigneeOuId())
|
||||
.eq(StringUtils.hasText(dto.getAssigneeTenantId()), ProcessTaskDocument::getAssigneeTenantId, dto.getAssigneeTenantId())
|
||||
.eq(StringUtils.hasText(dto.getAssigneePersonId()), ProcessTaskDocument::getAssigneePersonId, dto.getAssigneePersonId())
|
||||
.eq(StringUtils.hasText(dto.getAssigneeName()), ProcessTaskDocument::getAssigneeName, dto.getAssigneeName())
|
||||
.matchPhrase(StringUtils.hasText(dto.getAssigneeName()), ProcessTaskDocument::getAssigneeName, dto.getAssigneeName())
|
||||
.parentId(StringUtils.hasText(dto.getProcessInstanceId()), dto.getProcessInstanceId(), "process_instance_document")
|
||||
// .hasParent(StringUtils.hasText(dto.getProcessInstanceId()), "workflow_engine_test_process_instance_document", w -> w.eq(FieldUtils.val(ProcessInstanceDocument::getId), dto.getProcessInstanceId()))
|
||||
.routing(ES_FIXED_ROUTING)
|
||||
;
|
||||
return esProcessTaskMapper.selectList(wrapper);
|
||||
}
|
||||
|
||||
@ -41,7 +41,6 @@ public class SyncEsTaskEntityEventHandle implements EntityEventHandle<TaskEntity
|
||||
HistoryService historyService = processEngineConfiguration.getHistoryService();
|
||||
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
|
||||
.processInstanceId(taskEntity.getProcessInstanceId())
|
||||
.includeProcessVariables()
|
||||
.singleResult();
|
||||
aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance);
|
||||
}
|
||||
|
||||
@ -67,7 +67,9 @@ public class ErrorReportAspect implements Ordered {
|
||||
*/
|
||||
@Around(value = "@within(org.springframework.web.bind.annotation.RestController) || @within(org.springframework.stereotype.Service)")
|
||||
public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
|
||||
log.info("{}.{}", joinPoint.getSignature().getDeclaringTypeName(), joinPoint.getSignature().getName());
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("{}.{}", joinPoint.getSignature().getDeclaringTypeName(), joinPoint.getSignature().getName());
|
||||
}
|
||||
Signature signature = joinPoint.getSignature();
|
||||
StopWatch watch = new StopWatch("running controller time");
|
||||
watch.start(signature.toShortString());
|
||||
@ -76,22 +78,26 @@ public class ErrorReportAspect implements Ordered {
|
||||
result = joinPoint.proceed();
|
||||
} finally {
|
||||
watch.stop();
|
||||
log.info("StopWatch '{}': running time = {} 's", watch.getLastTaskName(), watch.getTotalTimeSeconds());
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("StopWatch '{}': running time = {} 's", watch.getLastTaskName(), watch.getTotalTimeSeconds());
|
||||
}
|
||||
|
||||
if (!methodNames.contains(watch.getLastTaskName())) {
|
||||
log.info("StopWatch '{}': running time = {} 's", watch.getLastTaskName(), watch.getTotalTimeSeconds());
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("StopWatch '{}': running time = {} 's", watch.getLastTaskName(), watch.getTotalTimeSeconds());
|
||||
}
|
||||
}
|
||||
|
||||
if (!signature.toShortString().contains("ExtAxApiLogServiceImpl")) {
|
||||
String type = getType(joinPoint);
|
||||
ApiLogEvent event = new ApiLogEvent(MDC.get(CTX_LOG_ID_MDC),
|
||||
signature.toShortString(),
|
||||
Objects.isNull(getOriginRequest()) ? "" : getOriginRequest().getHeader(HEADER_SERVER_NAME),
|
||||
Objects.isNull(getOriginRequest()) ? "" : getOriginRequest().getHeader(HEADER_API_VERSION),
|
||||
Objects.equals(type, "Controller") ? joinPoint.getArgs() : null,
|
||||
Objects.equals(type, "Controller") ? result : null,
|
||||
watch.getTotalTimeSeconds(),
|
||||
type);
|
||||
signature.toShortString(),
|
||||
Objects.isNull(getOriginRequest()) ? "" : getOriginRequest().getHeader(HEADER_SERVER_NAME),
|
||||
Objects.isNull(getOriginRequest()) ? "" : getOriginRequest().getHeader(HEADER_API_VERSION),
|
||||
Objects.equals(type, "Controller") ? joinPoint.getArgs() : null,
|
||||
Objects.equals(type, "Controller") ? result : null,
|
||||
watch.getTotalTimeSeconds(),
|
||||
type);
|
||||
|
||||
applicationEventPublisher.publishEvent(event);
|
||||
}
|
||||
@ -140,13 +146,13 @@ public class ErrorReportAspect implements Ordered {
|
||||
|
||||
log.warn("request header server name: {}", getHeader());
|
||||
envConfig.type().executeAction(profile,
|
||||
operation.summary(),
|
||||
filterSendDingTalk,
|
||||
joinPoint.getArgs(),
|
||||
joinPoint.getSignature().toShortString(),
|
||||
getHeader(),
|
||||
e,
|
||||
workflowProperties.getFilterOperations().contains(operation.summary()));
|
||||
operation.summary(),
|
||||
filterSendDingTalk,
|
||||
joinPoint.getArgs(),
|
||||
joinPoint.getSignature().toShortString(),
|
||||
getHeader(),
|
||||
e,
|
||||
workflowProperties.getFilterOperations().contains(operation.summary()));
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,6 +5,7 @@ import cn.axzo.workflow.admin.repository.entity.ExtAxProcessAdmin;
|
||||
import cn.axzo.workflow.admin.service.ExtAxProcessAdminService;
|
||||
import cn.axzo.workflow.client.feign.manage.ProcessAdminApi;
|
||||
import cn.axzo.workflow.common.model.request.admin.ProcessAdminCreateDTO;
|
||||
import cn.axzo.workflow.common.model.request.admin.ProcessAdminDeleteDTO;
|
||||
import cn.axzo.workflow.common.model.request.admin.ProcessAdminQueryDTO;
|
||||
import cn.axzo.workflow.common.model.response.admin.ProcessAdminVo;
|
||||
import cn.axzo.workflow.server.common.annotation.ErrorReporter;
|
||||
@ -59,6 +60,7 @@ public class ProcessAdminController implements ProcessAdminApi {
|
||||
}
|
||||
|
||||
@Override
|
||||
@PostMapping("/query/count")
|
||||
public CommonResponse<Integer> queryProcessAdminsCount(ProcessAdminQueryDTO dto) {
|
||||
return CommonResponse.success(extAxProcessAdminService.queryCount(dto));
|
||||
}
|
||||
@ -99,18 +101,22 @@ public class ProcessAdminController implements ProcessAdminApi {
|
||||
|
||||
@DeleteMapping("/delete")
|
||||
@Override
|
||||
public CommonResponse<Void> deleteCommonProcessAdmin(@RequestParam Long id) {
|
||||
extAxProcessAdminService.deleteCommonAdminsByIds(Collections.singletonList(id));
|
||||
return CommonResponse.success();
|
||||
public CommonResponse<Integer> deleteCommonProcessAdmin(@RequestParam Long id) {
|
||||
return CommonResponse.success(extAxProcessAdminService.deleteCommonAdminsByIds(Collections.singletonList(id)));
|
||||
}
|
||||
|
||||
@DeleteMapping("/delete/criteria")
|
||||
@Override
|
||||
public CommonResponse<Integer> deleteProcessAdminCriteria(@RequestBody ProcessAdminDeleteDTO dto) {
|
||||
return CommonResponse.success(extAxProcessAdminService.delete(dto));
|
||||
}
|
||||
|
||||
@DeleteMapping("/batch/delete")
|
||||
@Override
|
||||
public CommonResponse<Void> batchDeleteProcessAdmin(@RequestBody List<Long> ids) {
|
||||
public CommonResponse<Integer> batchDeleteProcessAdmin(@RequestBody List<Long> ids) {
|
||||
if (CollectionUtils.isEmpty(ids)) {
|
||||
return CommonResponse.success();
|
||||
}
|
||||
extAxProcessAdminService.deleteCommonAdminsByIds(ids);
|
||||
return CommonResponse.success();
|
||||
return CommonResponse.success(extAxProcessAdminService.deleteCommonAdminsByIds(ids));
|
||||
}
|
||||
}
|
||||
|
||||
@ -14,6 +14,7 @@ import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.IJobHandler;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import com.xxl.job.core.log.XxlJobLogger;
|
||||
import com.xxl.job.core.util.ShardingUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
|
||||
@ -24,6 +25,7 @@ import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
@ -48,6 +50,13 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
|
||||
@XxlJob("completedProcessInstanceSyncToEs")
|
||||
public ReturnT<String> execute(String s) {
|
||||
// 分片参数
|
||||
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
|
||||
if (Objects.nonNull(shardingVO) && shardingVO.getTotal() > 1) {
|
||||
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
|
||||
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
|
||||
}
|
||||
|
||||
log.debug("start exec finished process instance data sync... ");
|
||||
XxlJobLogger.log("start exec finished process instance data sync... ");
|
||||
|
||||
@ -72,7 +81,7 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
}
|
||||
|
||||
// 开始同步完成的实例到 ES
|
||||
DataSyncSummaryDTO summary = doSync(search);
|
||||
DataSyncSummaryDTO summary = doSync(search, shardingVO.getTotal() > 1 ? shardingVO : null);
|
||||
|
||||
// 执行完同步后的一些额外操作
|
||||
afterSync(endTime);
|
||||
@ -97,9 +106,10 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
* 内部自动分页循环同步
|
||||
*
|
||||
* @param search
|
||||
* @param shardingVO 分片信息, 可能为 null
|
||||
* @return 应同步至 ES 的统计数据
|
||||
*/
|
||||
private DataSyncSummaryDTO doSync(HistoricProcessInstanceSearchForEsDTO search) {
|
||||
private DataSyncSummaryDTO doSync(HistoricProcessInstanceSearchForEsDTO search, ShardingUtil.ShardingVO shardingVO) {
|
||||
Long totalProcessInstanceCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceTotalCount(search);
|
||||
log.info("查询到待同步实例维度数据量: {} 条", totalProcessInstanceCount);
|
||||
XxlJobLogger.log("查询到待同步实例维度数据量: {} 条", totalProcessInstanceCount);
|
||||
@ -112,7 +122,18 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
skipRows, skipRows + search.getOverPageSize());
|
||||
int pageNo = skipRows / search.getOverPageSize() + 1;
|
||||
List<HistoricProcessInstance> instances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, new Page<HistoricProcessInstance>(pageNo, search.getOverPageSize()));
|
||||
instances.forEach(hpi -> {
|
||||
|
||||
instances.parallelStream().filter(hpi -> {
|
||||
if (Objects.nonNull(shardingVO)) {
|
||||
if (hpi.getId().contains("-")) {
|
||||
// 最开始的实例编号是 UUID,无法取模,所以统一交给第一个节点处理
|
||||
return shardingVO.getIndex() == 0;
|
||||
}
|
||||
return Long.parseLong(hpi.getId().substring(12)) % shardingVO.getTotal() == shardingVO.getIndex();
|
||||
}
|
||||
// 如果配置的非分片广播的方式,那么 shardingVO 将是 null
|
||||
return true;
|
||||
}).forEach(hpi -> {
|
||||
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(hpi);
|
||||
totalProcessTaskCount.getAndSet(totalProcessTaskCount.get() + processTaskDocuments.size());
|
||||
});
|
||||
|
||||
@ -10,6 +10,7 @@ import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.IJobHandler;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import com.xxl.job.core.log.XxlJobLogger;
|
||||
import com.xxl.job.core.util.ShardingUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
|
||||
@ -42,20 +43,27 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
private final SpringProcessEngineConfiguration springProcessEngineConfiguration;
|
||||
|
||||
@XxlJob("otherProcessInstanceSyncToEs")
|
||||
public ReturnT<String> execute(String s) {
|
||||
public ReturnT<String> execute(String param) {
|
||||
// 分片参数
|
||||
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
|
||||
if (Objects.nonNull(shardingVO) && shardingVO.getTotal() > 1) {
|
||||
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
|
||||
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
|
||||
}
|
||||
|
||||
log.debug("start exec other process instance data sync... ");
|
||||
XxlJobLogger.log("start exec other process instance data sync... ");
|
||||
|
||||
// 同步前的一些额外动作
|
||||
Date endTime = beforeSync();
|
||||
if(Objects.isNull(endTime)){
|
||||
if (Objects.isNull(endTime)) {
|
||||
log.info("请先执行“流程实例数据同步 ES 第一步”, Bean Name:completedProcessInstanceSyncToEs。 待其执行完成后再执行该任务!");
|
||||
XxlJobLogger.log("请先执行“流程实例数据同步 ES 第一步”, Bean Name:completedProcessInstanceSyncToEs。 待其执行完成后再执行该任务!");
|
||||
return ReturnT.FAIL;
|
||||
}
|
||||
|
||||
// 开始同步流程数据到 ES
|
||||
DataSyncSummaryDTO summary = doSync(endTime, s);
|
||||
DataSyncSummaryDTO summary = doSync(endTime, param, shardingVO.getTotal() > 1 ? shardingVO : null);
|
||||
|
||||
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
|
||||
XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount());
|
||||
@ -73,7 +81,7 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
* @param endTime
|
||||
* @param pageSizeStr
|
||||
*/
|
||||
private DataSyncSummaryDTO doSync(Date endTime, String pageSizeStr) {
|
||||
private DataSyncSummaryDTO doSync(Date endTime, String pageSizeStr, ShardingUtil.ShardingVO shardingVO) {
|
||||
Long totalCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceByUnfinishedAndAlterEndTimeTotalCount(endTime);
|
||||
log.info("查询到待同步实例维度数据量: {} 条", totalCount);
|
||||
XxlJobLogger.log("查询到待同步实例维度数据量: {} 条", totalCount);
|
||||
@ -94,7 +102,18 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
|
||||
skipRows, skipRows + pageSize.get());
|
||||
int pageNo = skipRows / pageSize.get() + 1;
|
||||
List<HistoricProcessInstance> instances = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceByUnfinishedAndAlterEndTime(endTime, new Page<HistoricProcessInstance>(pageNo, pageSize.get()));
|
||||
instances.forEach(hpi -> {
|
||||
|
||||
instances.parallelStream().filter(hpi -> {
|
||||
if (Objects.nonNull(shardingVO)) {
|
||||
if (hpi.getId().contains("-")) {
|
||||
// 最开始的实例编号是 UUID,无法取模,所以统一交给第一个节点处理
|
||||
return shardingVO.getIndex() == 0;
|
||||
}
|
||||
return Long.parseLong(hpi.getId().substring(12)) % shardingVO.getTotal() == shardingVO.getIndex();
|
||||
}
|
||||
// 如果配置的非分片广播的方式,那么 shardingVO 将是 null
|
||||
return true;
|
||||
}).forEach(hpi -> {
|
||||
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(hpi);
|
||||
totalProcessTaskCount.getAndSet(totalProcessTaskCount.get() + processTaskDocuments.size());
|
||||
});
|
||||
|
||||
@ -83,6 +83,7 @@ import cn.axzo.workflow.common.model.response.bpmn.model.BpmnModelDetailVO;
|
||||
import cn.axzo.workflow.common.model.response.bpmn.model.BpmnModelExtVO;
|
||||
import cn.axzo.workflow.common.model.request.bpmn.RestBpmnProcessVariable;
|
||||
import cn.axzo.workflow.common.model.request.admin.ProcessAdminCreateDTO;
|
||||
import cn.axzo.workflow.common.model.request.admin.ProcessAdminDeleteDTO;
|
||||
import cn.axzo.workflow.common.model.request.admin.ProcessAdminQueryDTO;
|
||||
import cn.axzo.workflow.common.model.response.admin.ProcessAdminVo;
|
||||
|
||||
@ -797,7 +798,16 @@ public interface WorkflowManageService {
|
||||
*/
|
||||
@DeleteMapping("/api/process/admin/delete")
|
||||
@InvokeMode(SYNC)
|
||||
Void deleteCommonProcessAdmin(@RequestParam Long id);
|
||||
Integer deleteCommonProcessAdmin(@RequestParam Long id);
|
||||
|
||||
/**
|
||||
* 根据条件删除管理员
|
||||
* @param dto 删除条件
|
||||
* @return
|
||||
*/
|
||||
@DeleteMapping("/api/process/admin/delete/criteria")
|
||||
@InvokeMode(SYNC)
|
||||
Integer deleteProcessAdminCriteria(@RequestParam ProcessAdminDeleteDTO dto);
|
||||
|
||||
/**
|
||||
* 删除管理员
|
||||
@ -806,7 +816,7 @@ public interface WorkflowManageService {
|
||||
*/
|
||||
@DeleteMapping("/api/process/admin/batch/delete")
|
||||
@InvokeMode(SYNC)
|
||||
Void batchDeleteProcessAdmin(@RequestBody List<Long> ids);
|
||||
Integer batchDeleteProcessAdmin(@RequestBody List<Long> ids);
|
||||
|
||||
/**
|
||||
* 强制使用‘同步’模式调用该方法,请在调用真实方法前调用该方法
|
||||
|
||||
Loading…
Reference in New Issue
Block a user