Merge remote-tracking branch 'origin/master' into feature/form_test

# Conflicts:
#	workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/util/ShellUtil.java
This commit is contained in:
wangli 2024-10-31 09:59:49 +08:00
commit fc21effdbe
31 changed files with 420 additions and 67 deletions

View File

@ -27,7 +27,7 @@
<arthas.version>3.7.1</arthas.version>
<apache-maven.version>3.2.5</apache-maven.version>
<javaparse.version>3.26.0</javaparse.version>
<elasticsearch.version>7.14.0</elasticsearch.version>
<elasticsearch.version>7.10.2</elasticsearch.version>
<easy-es.version>2.0.0</easy-es.version>
</properties>
@ -150,6 +150,11 @@
<artifactId>easy-es-annotation</artifactId>
<version>${easy-es.version}</version>
</dependency>
<dependency>
<groupId>cn.axzo</groupId>
<artifactId>riven-api</artifactId>
<version>${axzo-dependencies.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -62,9 +62,12 @@ public class WorkflowEngineClientAutoConfiguration {
log.error("get version error: {}", e.getMessage(), e);
}
String serviceVersion = Objects.isNull(version) ? "1.2.0-SNAPSHOT" : version;
String serviceVersion = Objects.isNull(version) ? "1.2.0" : version;
log.info("client current version: {}", serviceVersion);
return serviceVersion;
return serviceVersion
.replaceAll("-SNAPSHOT", "")
.replaceAll("-RELEASE", "")
.trim();
}
private String getVersionFromPod(URL location) throws URISyntaxException {

View File

@ -102,6 +102,8 @@ public class ExtAxProcessAdminServiceImpl implements ExtAxProcessAdminService {
.eq(Objects.nonNull(deleteDTO.getWorkspaceId()), ExtAxProcessAdmin::getWorkspaceId, deleteDTO.getWorkspaceId())
.eq(deleteDTO.getAdminType() != null, ExtAxProcessAdmin::getAdminType, deleteDTO.getAdminType())
.eq(Objects.nonNull(deleteDTO.getOrganizationalUnitId()), ExtAxProcessAdmin::getOrganizationalUnitId, deleteDTO.getOrganizationalUnitId())
.in(!CollectionUtils.isEmpty(deleteDTO.getWorkspaceIds()), ExtAxProcessAdmin::getWorkspaceId, deleteDTO.getWorkspaceIds())
.in(!CollectionUtils.isEmpty(deleteDTO.getOrganizationalUnitIds()), ExtAxProcessAdmin::getOrganizationalUnitId, deleteDTO.getOrganizationalUnitIds())
.in(!CollectionUtils.isEmpty(deleteDTO.getPersonIds()), ExtAxProcessAdmin::getPersonId, deleteDTO.getPersonIds())
.eq(Objects.nonNull(deleteDTO.getDataSource()), ExtAxProcessAdmin::getDataSource, deleteDTO.getDataSource())
.eq(ExtAxProcessAdmin::getIsDelete, TableIsDeleteEnum.NORMAL.value);

View File

@ -36,12 +36,24 @@ public class ProcessAdminDeleteDTO {
@ApiModelProperty(value = "工作台id")
private Long workspaceId;
/**
* 工作台id列表
*/
@ApiModelProperty(value = "工作台id列表")
private List<Long> workspaceIds;
/**
* 单位id
*/
@ApiModelProperty(value = "单位id")
private Long organizationalUnitId;
/**
* 单位id列表
*/
@ApiModelProperty(value = "单位id列表")
private List<Long> organizationalUnitIds;
/**
* 数据来源, SYSTEM_ENTRY-系统录入, USER_ENTRY-用户手动录入
*/

View File

@ -20,6 +20,11 @@ public class ProcessTaskDocumentVO {
*/
private String taskId;
/**
* 所属实例 ID
*/
private String processInstanceId;
/**
* 任务定义 KEY,对应节点 ID
*/

View File

@ -72,6 +72,6 @@ public class SupportRefreshProperties {
/**
* 用于控制转交管理员的 API
*/
@Value("${workflow.useNewToAdminApi:true}")
@Value("${workflow.useNewToAdminApi:false}")
private Boolean useNewToAdminApi;
}

View File

@ -5,6 +5,7 @@ import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.flowable.common.engine.impl.identity.Authentication;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.RuntimeService;
@ -114,7 +115,7 @@ public class CustomTransferUserTaskCmd extends AbstractCommand<Void> implements
task.setScopeType("TRANSFER");
Authentication.setAuthenticatedUserId(originTaskAssignee.buildAssigneeId());
addComment(commandContext, task, COMMENT_TYPE_OPERATION_DESC, "转交给" + targetTaskAssignee.getAssignerName() + additionalOpeDesc);
addComment(commandContext, task, COMMENT_TYPE_OPERATION_DESC, "转交给" + targetTaskAssignee.getAssignerName() + (StringUtils.isNotBlank(additionalOpeDesc) ? additionalOpeDesc : ""));
addComment(commandContext, task, COMMENT_TYPE_ADVICE, advice);
Authentication.setAuthenticatedUserId(null);

View File

@ -100,7 +100,9 @@ public class AggregateProcessInstanceService {
// 实例纬度数据同步 ES
esProcessInstanceService.insert(ES_FIXED_ROUTING, processInstanceDocument);
String instanceVersion = String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121));
String instanceVersion = String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121))
.replaceAll("-SNAPSHOT","")
.replaceAll("-RELEASE", "");
DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion);
DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142);
List<ProcessTaskDocument> toEsProcessTaskDocuments = new ArrayList<>();

View File

@ -119,13 +119,18 @@ public class EsProcessInstanceServiceImpl implements EsProcessInstanceService {
.eq(StringUtils.hasText(dto.getPersonId()), FieldUtils.val(ProcessTaskDocument::getAssigneePersonId), dto.getPersonId())
)
.eq(StringUtils.hasText(dto.getBusinessStatus()), FieldUtils.val(ProcessInstanceDocument::getBusinessStatus), dto.getBusinessStatus())
.match(StringUtils.hasText(dto.getProcessInstanceName()), FieldUtils.val(ProcessInstanceDocument::getProcessInstanceName), dto.getProcessInstanceName())
.and(StringUtils.hasText(dto.getProcessInstanceName()),
w -> w.or(i -> i.eq(FieldUtils.val(ProcessInstanceDocument::getProcessInstanceName), dto.getProcessInstanceName(), 1.2F))
.or(j->j.match(FieldUtils.val(ProcessInstanceDocument::getProcessInstanceName), dto.getProcessInstanceName()))
)
.in(CollectionUtils.isEmpty(dto.getProcessInstanceIds()), FieldUtils.val(ProcessInstanceDocument::getId), dto.getProcessInstanceIds())
.ge(Objects.nonNull(dto.getBeginStartTime()), ProcessInstanceDocument::getInstanceStartTime, dto.getBeginStartTime())
.le(Objects.nonNull(dto.getOverStartTime()), ProcessInstanceDocument::getInstanceStartTime, dto.getOverStartTime())
.ge(Objects.nonNull(dto.getBeginEndTime()), ProcessInstanceDocument::getInstanceEndTime, dto.getBeginEndTime())
.le(Objects.nonNull(dto.getOverEndTime()), ProcessInstanceDocument::getInstanceEndTime, dto.getOverEndTime())
.sortByScore()
.orderByDesc(FieldUtils.val(ProcessInstanceDocument::getInstanceStartTime))
.trackScores()
.routing(ES_FIXED_ROUTING)
;
EsPageInfo<ProcessInstanceDocument> pageInfo;

View File

@ -138,6 +138,10 @@
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>
<dependency>
<groupId>cn.axzo</groupId>
<artifactId>riven-api</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -135,7 +135,9 @@ public class ErrorReportAspect implements Ordered {
*/
@AfterThrowing(pointcut = "@within(errorReporter) && @annotation(operation)", throwing = "e", argNames = "joinPoint,errorReporter,operation,e")
public void doAfterThrowing(JoinPoint joinPoint, ErrorReporter errorReporter, Operation operation, Exception e) {
log.info("ErrorReportAspect 记录异常信息: {}", e.getMessage());
if(log.isDebugEnabled()) {
log.debug("ErrorReportAspect 记录异常信息: {}", e.getMessage());
}
EnvConfig[] envConfigs = errorReporter.envConfig();
for (EnvConfig envConfig : envConfigs) {
if (Arrays.asList(envConfig.profiles()).contains(profile)) {

View File

@ -13,7 +13,7 @@ import java.util.Date;
import java.util.Objects;
/**
* TODO
* 一个一次性 JVM 外部进程执行并返回结果的 Shell 工具
*
* @author wangli
* @since 2024-10-22 15:05
@ -55,7 +55,7 @@ public class ShellUtil {
* @return
*/
public static String grepLog(String profile, String keyword, Date date, String grepKeyword) {
return executeCmd(getLogPath(profile) + " && zgrep '" + keyword + "' workflowEngine.log" +
return executeCmd("cd " + getLogPath(profile) + " && zgrep '" + keyword + "' workflowEngine.log" +
(Objects.nonNull(date) ? "." + DateUtil.formatDate(date) + ".*" : "") +
(StringUtils.hasText(grepKeyword) ? " | grep '" + grepKeyword + "'": "")
);

View File

@ -0,0 +1,84 @@
package cn.axzo.workflow.server.controller.listener.process;
import cn.axzo.workflow.core.common.context.ProcessOperationContext;
import cn.axzo.workflow.core.listener.AbstractBpmnEventListener;
import cn.axzo.workflow.core.listener.BpmnProcessEventListener;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.axzo.workflow.server.controller.listener.tx.OnTxCommittedSyncToEsListener;
import cn.hutool.core.lang.UUID;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
import org.flowable.common.engine.impl.cfg.TransactionState;
import org.flowable.common.engine.impl.context.Context;
import org.flowable.engine.delegate.event.FlowableCancelledEvent;
import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
/**
* 流程实例结束后同步最新数据至 ES
*
* @author wangli
* @since 2024-10-18 11:09
*/
@Slf4j
@Component
@Scope("prototype")
@AllArgsConstructor
public class SyncToEsProcessEventListener extends AbstractBpmnEventListener<ProcessOperationContext> implements BpmnProcessEventListener, Ordered {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
@Override
public int getOrder() {
return Integer.MIN_VALUE + 2;
}
/**
* 流程实例被撤回后回调
*
* @param event
*/
@Override
public void onCancelled(FlowableCancelledEvent event) {
syncToEs(event.getProcessInstanceId());
}
/**
* 流程实例被驳回后回调
*
* @param event
*/
@Override
public void onRejected(FlowableCancelledEvent event) {
syncToEs(event.getProcessInstanceId());
}
/**
* 流程实例被中止后回调
*
* @param event
*/
@Override
public void onAborted(FlowableCancelledEvent event) {
syncToEs(event.getProcessInstanceId());
}
/**
* 流程实例运行完成后回调
* <p>
* 注意: 完成只是说明流程实例已停止运行
*
* @param event
*/
@Override
public void onCompleted(FlowableEngineEntityEvent event) {
syncToEs(event.getProcessInstanceId());
}
private void syncToEs(String processInstanceId) {
String uuid = UUID.fastUUID().toString();
Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
new OnTxCommittedSyncToEsListener(aggregateProcessInstanceService, processInstanceId, null, uuid));
}
}

View File

@ -3,26 +3,18 @@ package cn.axzo.workflow.server.controller.listener.task;
import cn.axzo.workflow.core.common.context.TaskOperationContext;
import cn.axzo.workflow.core.listener.AbstractBpmnEventListener;
import cn.axzo.workflow.core.listener.BpmnTaskEventListener;
import cn.axzo.workflow.es.service.EsProcessInstanceService;
import cn.axzo.workflow.es.service.EsProcessTaskService;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.axzo.workflow.es.service.impl.EsProcessTaskServiceImpl;
import cn.axzo.workflow.server.controller.listener.tx.OnTxCommittedSyncToEsListener;
import cn.hutool.core.lang.UUID;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.HistoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.task.api.history.HistoricTaskInstance;
import org.flowable.common.engine.impl.cfg.TransactionState;
import org.flowable.common.engine.impl.context.Context;
import org.flowable.task.service.delegate.DelegateTask;
import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* 同步任务相关数据至 ES
*
@ -48,18 +40,8 @@ public class SyncToEsTaskEvent_104_Listener extends AbstractBpmnEventListener<Ta
*/
@Override
public void onAssigned(DelegateTask delegateTask) {
log.info("SyncEsTaskEntityEventHandle onInitialized processInstanceId:{}, taskId: {}", delegateTask.getProcessInstanceId(), delegateTask.getId());
// 删除指定实例的父子文档
aggregateProcessInstanceService.deleteDocumentParentAndChild(delegateTask.getProcessInstanceId());
log.info("delete document processInstanceId: {}", delegateTask.getProcessInstanceId());
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(delegateTask.getProcessInstanceId())
.singleResult();
aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
log.info("reInsert document processInstanceId: {}", delegateTask.getProcessInstanceId());
String uuid = UUID.fastUUID().toString();
Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
new OnTxCommittedSyncToEsListener(aggregateProcessInstanceService, delegateTask.getProcessInstanceId(), delegateTask.getId(), uuid));
}
}

View File

@ -0,0 +1,47 @@
package cn.axzo.workflow.server.controller.listener.tx;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.cfg.TransactionListener;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.HistoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import java.util.List;
/**
* 引擎内的动作事务提交后执行同步 ES
*
* @author wangli
* @since 2024-10-18 16:34
*/
@Slf4j
@AllArgsConstructor
public class OnTxCommittedSyncToEsListener implements TransactionListener {
private final AggregateProcessInstanceService aggregateProcessInstanceService;
private final String processInstanceId;
private final String taskId;
private final String uuid;
@Override
public void execute(CommandContext commandContext) {
log.info("SyncEsTaskEntityEventHandle onInitialized uuid:{}, processInstanceId:{}, taskId: {}", uuid, processInstanceId, taskId);
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
// 删除指定实例的父子文档
log.info("delete document processInstanceId: {}", processInstanceId);
aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId);
log.info("reInsert document processInstanceId: {}", processInstanceId);
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size());
}
}

View File

@ -1,22 +1,14 @@
package cn.axzo.workflow.server.controller.web;
import cn.axzo.karma.client.feign.FlowSupportApi;
import cn.axzo.karma.client.model.request.PersonProfileQueryReq;
import cn.axzo.karma.client.model.response.PersonProfileResp;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import cn.axzo.workflow.core.conf.SupportRefreshProperties;
import cn.axzo.workflow.server.common.util.RpcExternalUtil;
import cn.axzo.workflow.server.controller.delegate.AbstractBpmnTaskAssigneeSelector;
import com.google.common.collect.Lists;
import org.springframework.context.ApplicationContext;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import static cn.axzo.workflow.server.controller.delegate.AbstractBpmnTaskAssigneeSelector.populateNameAndAvatar;

View File

@ -1,8 +1,5 @@
package cn.axzo.workflow.server.controller.web.bpmn;
import cn.axzo.karma.client.feign.FlowSupportApi;
import cn.axzo.karma.client.model.request.PersonProfileQueryReq;
import cn.axzo.karma.client.model.response.PersonProfileResp;
import cn.axzo.workflow.client.feign.bpmn.ProcessTaskApi;
import cn.axzo.workflow.common.enums.AttachmentTypeEnum;
import cn.axzo.workflow.common.model.request.bpmn.task.AttachmentDTO;
@ -28,7 +25,6 @@ import cn.axzo.workflow.common.model.response.bpmn.task.BpmnTaskTodoPageItemVO;
import cn.axzo.workflow.core.service.BpmnProcessTaskService;
import cn.axzo.workflow.server.common.annotation.ErrorReporter;
import cn.axzo.workflow.server.common.annotation.RepeatSubmit;
import cn.axzo.workflow.server.common.util.RpcExternalUtil;
import cn.axzo.workflow.server.controller.web.BasicPopulateAvatarController;
import cn.azxo.framework.common.model.CommonResponse;
import com.alibaba.fastjson.JSON;
@ -52,7 +48,6 @@ import javax.validation.constraints.NotEmpty;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static cn.azxo.framework.common.model.CommonResponse.success;

View File

@ -0,0 +1,80 @@
package cn.axzo.workflow.server.outside.mq;
import cn.axzo.framework.rocketmq.BaseListener;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
import cn.axzo.workflow.server.outside.mq.producer.DingtalkSendProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
/**
* 钉钉消息 Rocket 配置
*
* @author wangli
* @since 2024-10-25 13:39
*/
@Slf4j
@Configuration(proxyBeanMethods = false)
public class DingtalkRocketConfiguration {
@Value("${spring.profiles.active:dev}")
private String activeProfile;
private static final String DEFAULT_MODULE = "workflowEngine";
private static final String DEFAULT_EVENT = "topic_third_party_sync_event_";
private static final String MODULE_NAME_SUFFIX = "_dingtalk_message";
@Bean
public DingtalkSendProducer dingtalkSendProducer(RocketMQTemplate rocketMQTemplate) {
return new DingtalkSendProducer(rocketMQTemplate,
DEFAULT_MODULE,
DEFAULT_MODULE + MODULE_NAME_SUFFIX,
EventProducer.Context.<RocketMQEventProducer.RocketMQMessageMeta>builder()
.meta(RocketMQEventProducer.RocketMQMessageMeta.builder()
.topic(DEFAULT_EVENT + activeProfile)
.build())
.headers(new HashMap<>())
.syncSending(Boolean.TRUE)
.exceptionHandler(context -> {
log.error("MQ, send event error: {}, event: {}",
context.getThrowable().getCause().getMessage(),
context.getEvent().toPrettyJsonString(),
context.getThrowable());
})
.build(),
null
);
}
@Component
@ConditionalOnProperty(name = "rocketmq.name-server")
@RocketMQMessageListener(topic = "topic_third_party_sync_event_${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_riven_consumer",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorType = SelectorType.TAG,
selectorExpression = "riven-dingtalk-receive",
maxReconsumeTimes = 0,
nameServer = "${rocketmq.name-server}"
)
public static class ReplyMessageRocketConsumer extends BaseListener implements RocketMQListener<MessageExt> {
@Resource
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
super.onEvent(message, eventConsumer);
}
}
}

View File

@ -0,0 +1,72 @@
package cn.axzo.workflow.server.outside.mq.consumer;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.framework.rocketmq.EventHandler;
//import cn.axzo.riven.client.common.enums.DingtalkEventEnum;
//import cn.axzo.riven.client.model.DingtalkReceiveMqModel;
//import cn.axzo.riven.client.model.DingtalkSendMqModel;
//import cn.axzo.riven.client.model.SampleText;
import cn.axzo.workflow.server.outside.mq.producer.DingtalkSendProducer;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
/**
* 监听钉钉群消息的事件
*
* @author wangli
* @since 2024-10-25 11:16
*/
@Slf4j
//@Component
public class DingtalkReceiveListener implements EventHandler, InitializingBean {
@Value("${spring.application.name}")
private String applicationName;
@Resource
private EventConsumer eventConsumer;
@Resource
private DingtalkSendProducer dingtalkSendProducer;
@Override
public void onEvent(Event event, EventConsumer.Context context) {
}
@Override
public void afterPropertiesSet() throws Exception {
}
// @Override
// public void onEvent(Event event, EventConsumer.Context context) {
// log.info("receive dingding message: {}", event.getTargetId());
//
// if (!Objects.equals(applicationName, event.getTargetType())) {
// return;
// }
// DingtalkReceiveMqModel data = event.normalizedData(DingtalkReceiveMqModel.class);
// if (log.isDebugEnabled()) {
// log.debug("message data: {}", JSON.toJSONString(data));
// }
//
// DingtalkSendMqModel<SampleText> sendModel = new DingtalkSendMqModel<>();
// sendModel.setTraceId(data.getTraceId());
// sendModel.setConversationId(data.getConversationId());
// sendModel.setMsgId(data.getMsgId());
// sendModel.setRobotCode(data.getRobotCode());
// sendModel.setMessage(SampleText.from("由 WorkflowEngine 处理的消息: " + data.getContent()));
//
// dingtalkSendProducer.send(sendModel);
// }
// @Override
// public void afterPropertiesSet() throws Exception {
// eventConsumer.registerHandler(DingtalkEventEnum.receive.getEventCode(), this);
// }
}

View File

@ -0,0 +1,39 @@
package cn.axzo.workflow.server.outside.mq.producer;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
//import cn.axzo.riven.client.common.enums.DingtalkEventEnum;
//import cn.axzo.riven.client.model.DingtalkSendMqModel;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.function.BiConsumer;
/**
* 回复钉钉消息给 Riven 的事件生产者
*
* @author wangli
* @since 2024-10-25 11:33
*/
@Slf4j
@Component
public class DingtalkSendProducer extends RocketMQEventProducer {
@Value("${spring.application.name}")
private String applicationName;
public DingtalkSendProducer(RocketMQTemplate rocketMQTemplate, String defaultModule, String appName, Context<RocketMQMessageMeta> defaultContext, BiConsumer<Event, Context<RocketMQMessageMeta>> sendCallback) {
super(rocketMQTemplate, defaultModule, appName, defaultContext, sendCallback);
}
// public void send(DingtalkSendMqModel model) {
// send(Event.builder()
// .shardingKey(applicationName)
// .eventCode(DingtalkEventEnum.send.getEventCode())
// .targetId(model.getTraceId())
// .targetType(DingtalkEventEnum.send.getTag())
// .data(model)
// .build());
// }
}

View File

@ -36,9 +36,8 @@
<version>4.9.1</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<groupId>cn.axzo.workflow</groupId>
<artifactId>workflow-engine-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>jakarta.servlet</groupId>

View File

@ -110,7 +110,9 @@ public class StarterBroadcastMQConfiguration {
// 处理 properties 配置进行事件过滤
for (InnerMessageQueueHandleBeforeFilter filter : filters) {
if (filter.doFilter(message)) {
log.info("【{}】message has been filtered, messageId: {}", this.getClass().getSimpleName(), message.getMsgId());
if (log.isDebugEnabled()) {
log.debug("【{}】message has been filtered, messageId: {}", this.getClass().getSimpleName(), message.getMsgId());
}
return;
}
}

View File

@ -107,7 +107,9 @@ public class StarterRPCInvokeMQConfiguration {
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getSendBeforeCallback() {
return (event, context) -> {
event.setEventId(IdHelper.get32UUID());
log.info("mq_send_Before: {}, uniqueId: {}", event.getShardingKey(), event.getEventId());
if (log.isDebugEnabled()) {
log.debug("mq_send_Before: {}, uniqueId: {}", event.getShardingKey(), event.getEventId());
}
};
}
@ -121,7 +123,9 @@ public class StarterRPCInvokeMQConfiguration {
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getSendAfterCallback() {
return (event, context) -> {
String messageId = context.getHeaders().get(MQ_MESSAGE_ID);
log.info("mq_send_after: {}, uniqueId: {}, messageId: {}", event.getShardingKey(), event.getEventId(), messageId);
if (log.isDebugEnabled()) {
log.debug("mq_send_after: {}, uniqueId: {}, messageId: {}", event.getShardingKey(), event.getEventId(), messageId);
}
};
}
@ -134,7 +138,9 @@ public class StarterRPCInvokeMQConfiguration {
*/
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getTransactionRollbackHandler() {
return (event, context) -> {
log.info("mq_transaction_rollback: {}, uniqueId: {}", event.getShardingKey(), event.getEventId());
if (log.isDebugEnabled()) {
log.debug("mq_transaction_rollback: {}, uniqueId: {}", event.getShardingKey(), event.getEventId());
}
};
}
@ -156,7 +162,9 @@ public class StarterRPCInvokeMQConfiguration {
if (eventWrapper.isHandled()) {
// 只收集被App真正消费的消息.
Event event = eventWrapper.getEvent();
log.info("WorkflowEngineStarter RPC MQ, handled event: {}", event.toPrettyJsonString());
if (log.isDebugEnabled()) {
log.debug("WorkflowEngineStarter RPC MQ, handled event: {}", event.toPrettyJsonString());
}
}
};
return new DefaultEventConsumer(applicationName + MODULE_NAME_SUFFIX, workflowEngineStarterEventHandlerRepository, callback);
@ -185,7 +193,9 @@ public class StarterRPCInvokeMQConfiguration {
@Override
public void onMessage(MessageExt message) {
if (filter.doFilter(message)) {
log.info("【{}】message has been filtered, messageId: {}", this.getClass().getSimpleName(), message.getMsgId());
if (log.isDebugEnabled()) {
log.debug("【{}】message has been filtered, messageId: {}", this.getClass().getSimpleName(), message.getMsgId());
}
return;
}
super.onEvent(message, workflowEngineStarterEventConsumer);

View File

@ -104,7 +104,9 @@ public class WorkflowEngineStarterAutoConfiguration {
private ExecuteInterceptor getFailInterceptor(WorkflowEngineStarterProperties starterProperties) {
BroadcastListenerProperties listenerRetry = starterProperties.getBroadcast();
FailHandleTypeEnum failHandleType = listenerRetry.getFailHandleType();
log.info("workflow engine starter fail handle type : {}", failHandleType);
if (log.isDebugEnabled()) {
log.debug("workflow engine starter fail handle type : {}", failHandleType);
}
switch (failHandleType) {
case FAIL_BACK:
// return new FailBackInterceptor();

View File

@ -807,7 +807,7 @@ public interface WorkflowManageService {
*/
@DeleteMapping("/api/process/admin/delete/criteria")
@InvokeMode(SYNC)
Integer deleteProcessAdminCriteria(@RequestParam ProcessAdminDeleteDTO dto);
Integer deleteProcessAdminCriteria(@RequestBody ProcessAdminDeleteDTO dto);
/**
* 删除管理员

View File

@ -244,8 +244,8 @@ public class ComplexInvokeClient implements Client {
// at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
// at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:753)
// 只设置为字符串会报错恢复为设置成CommonResponse
final ByteArrayInputStream inputStream = new ByteArrayInputStream(JSON.toJSONString(CommonResponse.success(HttpStatus.OK.value(), "Send MQ Success", null))
.getBytes(UTF_8));
final ByteArrayInputStream inputStream = new ByteArrayInputStream(JSON.toJSONString(CommonResponse.success(HttpStatus.OK.value(), null, null))
.getBytes(UTF_8));
@Override
public Integer length() {

View File

@ -72,7 +72,9 @@ final class WorkflowEngineStarterDecoder implements Decoder {
wrappedType = ParameterizedTypeImpl.make(CommonResponse.class, new Type[]{type}, null);
}
Object decode = delegate.decode(response, wrappedType);
log.info("workflow engine starter response :{}", JSON.toJSONString(decode));
if (log.isDebugEnabled()) {
log.debug("workflow engine starter response :{}", JSON.toJSONString(decode));
}
if (decode instanceof CommonResponse) {
CommonResponse<?> commonResponse = (CommonResponse<?>) decode;
if (response.status() == 202) {

View File

@ -61,8 +61,10 @@ class WorkflowEngineStarterInvocationHandler implements InvocationHandler {
stopWatch.stop();
List<String> ignoreMethods = Lists.newArrayList("sync", "async");
if (!ignoreMethods.contains(method.getName())) {
log.info("Workflow starter Method invoke record: {}.{}, args: {}, result: {} const: {} ms",
target.type().getSimpleName(), method.getName(), JSON.toJSONString(args), JSON.toJSONString(invoke), stopWatch.getTotalTimeMillis());
if (log.isDebugEnabled()) {
log.debug("Workflow starter Method invoke record: {}.{}, args: {}, result: {} const: {} ms",
target.type().getSimpleName(), method.getName(), JSON.toJSONString(args), JSON.toJSONString(invoke), stopWatch.getTotalTimeMillis());
}
}
}
return invoke;

View File

@ -35,7 +35,9 @@ public final class FailOverInterceptor extends AbstractListenerInterceptor {
int failedAttempts = 0;
do {
if (failedAttempts > 0) {
log.info("Waiting for {} ms before retrying the command. retryTimes: {}", waitTime, failedAttempts);
if (log.isDebugEnabled()) {
log.debug("Waiting for {} ms before retrying the command. retryTimes: {}", waitTime, failedAttempts);
}
waitBeforeRetry(waitTime);
waitTime *= waitIncreaseFactor;
}

View File

@ -21,8 +21,8 @@ public final class LogInterceptor extends AbstractListenerInterceptor {
if (log.isDebugEnabled()) {
log.debug("--- starting handle mq ---- ");
}
if (log.isInfoEnabled()) {
log.info("messageId: {}, eventCode: {}, messageBody: {}", context.getMsgId(), context.getEventCode().toString(), JSON.toJSONString(t));
if (log.isDebugEnabled()) {
log.debug("messageId: {}, eventCode: {}, messageBody: {}", context.getMsgId(), context.getEventCode().toString(), JSON.toJSONString(t));
}
try {
getNext().execute(executor, consumer, context, t);

View File

@ -59,7 +59,9 @@ public abstract class AbstractInnerWorkflowListener<H extends Ordered, F extends
if (!CollectionUtils.isEmpty(businessListeners)) {
for (F filter : businessFilters) {
if (filter.doFilter(event, context, convert)) {
log.info("【{}】filtered message, messageId: {}", filter.getClass().getSimpleName(), context.getMsgId());
if (log.isDebugEnabled()) {
log.debug("【{}】filtered message, messageId: {}", filter.getClass().getSimpleName(), context.getMsgId());
}
return;
}
}