add - 在 API module 中添加通过消息队列处理工作流相关事件

This commit is contained in:
wangli 2023-07-18 18:29:33 +08:00
parent e05b779194
commit 1adfeab1f6
13 changed files with 568 additions and 10 deletions

View File

@ -98,6 +98,5 @@
<module>workflow-engine-core</module>
<module>workflow-engine-api</module>
<module>workflow-engine-server</module>
<module>axzo-workflow-spring-boot-starter</module>
</modules>
</project>

View File

@ -18,16 +18,25 @@
<groupId>cn.axzo.framework</groupId>
<artifactId>axzo-consumer-spring-cloud-starter</artifactId>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>cn.axzo.framework</groupId>
<artifactId>axzo-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<scope>provided</scope>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- for test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<!-- for test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

View File

@ -3,6 +3,7 @@ package cn.axzo.workflow.client.config;
import cn.axzo.workflow.client.MicroArchetypeClient;
import cn.axzo.workflow.client.feign.MicroArchetypeApi;
import cn.axzo.workflow.client.feign.MicroArchetypeFallbackFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -16,6 +17,7 @@ import org.springframework.context.annotation.Configuration;
@Configuration
public class MicroArchetypeClientAutoConfiguration {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Bean
public MicroArchetypeFallbackFactory microArchetypeFallbackFactory() {
return new MicroArchetypeFallbackFactory();

View File

@ -0,0 +1,56 @@
package cn.axzo.workflow.client.consumer;
import org.springframework.core.Ordered;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
/**
* @author: wangli
* @date: 2022/6/9 14:41
*/
public abstract class AbstractFlowableEngineEventMQListener implements
FlowableEngineEventMQListener {
protected final List<FlowableEngineEventProcessor> listeners = new ArrayList<>();
private Set<TaskListener> taskListeners;
private Set<ProcessInstanceListener> processInstanceListeners;
@Override
public Set<TaskListener> getTaskListeners() {
return taskListeners;
}
@Override
public void setTaskListeners(Set<TaskListener> taskListeners) {
this.taskListeners = taskListeners;
}
@Override
public Set<ProcessInstanceListener> getProcessInstanceListeners() {
return processInstanceListeners;
}
@Override
public void setProcessInstanceListeners(
Set<ProcessInstanceListener> processInstanceListeners) {
this.processInstanceListeners = processInstanceListeners;
}
@Override
public final List<FlowableEngineEventProcessor> getAllListeners() {
if (CollectionUtils.isEmpty(listeners)) {
synchronized (listeners) {
if (CollectionUtils.isEmpty(listeners)) {
listeners.addAll(getTaskListeners());
listeners.addAll(getProcessInstanceListeners());
}
}
}
listeners.sort(Comparator.comparing(Ordered::getOrder));
return listeners;
}
}

View File

@ -0,0 +1,129 @@
package cn.axzo.workflow.client.consumer;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.core.Ordered;
import java.io.IOException;
import java.util.Objects;
import static cn.axzo.workflow.client.config.MicroArchetypeClientAutoConfiguration.OBJECT_MAPPER;
/**
* @author: wangli
* @date: 2022/6/9 15:08
*/
public abstract class AbstractListener implements FlowableEngineEventProcessor {
private final Logger log = LoggerFactory.getLogger(AbstractListener.class);
public static final ThreadLocal<Channel> TL_CHANNEL = new ThreadLocal<>();
public static final ThreadLocal<Message> TL_MESSAGE = new ThreadLocal<>();
@Override
public final void process(Message message, Channel channel) {
TL_CHANNEL.set(channel);
invoke(message);
}
/**
* 判断是否能支持当前的事件
* <p>
* Note that: 如果该方法内部实现抛出异常, 此条消息不会自动 ack, 将会引起消息一直"退回 -> 消费"路径循环.
*
* @param flag
* @return 返回 true ,将执行 Listener 中覆写的事件方法, 否则该消息将自动 ACK,不可再消费该条消息
*/
@Override
public abstract boolean supportEvent(String flag);
/**
* 解析监听的到工作流引擎消息未指定的 Entity 对象
*
* @param msgBytes 消息原信息
* @param type 被转成的类型
* @param <T>
* @return
*/
protected final <T> T parseEntity(byte[] msgBytes, Class<T> type) {
String msg = new String(msgBytes);
if (log.isDebugEnabled()) {
log.debug("接收的消息内容为: {}", msg);
}
try {
String[] split = msg.split("");
return OBJECT_MAPPER.readValue(split[1], type);
} catch (Exception e) {
e.printStackTrace();
if (log.isDebugEnabled()) {
log.debug("接收的消息无法转换为正常的 Entity 模型, 将直接丢弃该消息! 当前信息为: {}", new String(msgBytes));
}
autoAck();
return null;
}
}
/**
* 判断配置是否开启手动 ACK, 如果开启还需要判断 Channel 对象是否为空, 否则 ACK 动作会直接抛出异常
*/
protected final void ack() throws IOException {
if (Objects.isNull(TL_CHANNEL.get())) {
throw new IllegalArgumentException("线程本地变量表已不存在 Channel, 无法进行 ACK 确认!");
}
TL_CHANNEL.get().basicAck(TL_MESSAGE.get().getMessageProperties().getDeliveryTag(), false);
clearThreadLocal();
}
protected final void nack(boolean requeue) throws IOException {
if (Objects.isNull(TL_CHANNEL.get())) {
throw new IllegalArgumentException("线程本地变量表已不存在 Channel, 无法进行 NACK 确认!");
}
TL_CHANNEL.get()
.basicNack(TL_MESSAGE.get().getMessageProperties().getDeliveryTag(), false, requeue);
clearThreadLocal();
}
/**
* 考虑是否将 Channel Message 本身直接暴露给使用者
*
* @return
*/
private final Channel getChannel() {
return TL_CHANNEL.get();
}
private final Message getMessage() {
return TL_MESSAGE.get();
}
/**
* 清理线程变量中的无用的 Message Channel
*/
@Override
public final void clearThreadLocal() {
TL_MESSAGE.remove();
TL_CHANNEL.remove();
}
protected final void autoAck() {
//TODO ACK 处理问题
// WorkflowUtils.getRabbitProperties().ifPresent(r -> {
// if (Objects.nonNull(r.getListener().getSimple().getAcknowledgeMode()) && r.getListener()
// .getSimple().getAcknowledgeMode().isManual()) {
// try {
// ack();
// } catch (IOException e) {
// e.printStackTrace();
// log.error("RabbitMQ 消息自动 ACK 发生异常");
// }
// }
// });
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
}

View File

@ -0,0 +1,80 @@
package cn.axzo.workflow.client.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
/**
* 流程实例自身状态变更事件
*
* @author: wangli
* @date: 2022/6/9 14:13
*/
public abstract class AbstractProcessInstanceListener extends AbstractListener implements
ProcessInstanceListener {
private Logger log = LoggerFactory.getLogger(AbstractProcessInstanceListener.class);
private static final String PROCESS_INSTANCE_STR = "pi";
@Override
public final boolean supportEvent(String flag) {
return PROCESS_INSTANCE_STR.equals(flag);
}
@Override
public final void invoke(Message message) {
TL_MESSAGE.set(message);
// ProcessInstanceEntity entity = parseEntity(message.getBody(),
// ProcessInstanceEntity.class);
// if (Objects.isNull(entity) || !support(entity)) {
// if (log.isDebugEnabled()) {
// log.debug("无需处理该消息,将自动确认消息");
// }
// autoAck();
// return;
// }
// switch (entity.getType()) {
// case PROCESS_CREATED:
// created(entity);
// break;
// case PROCESS_STARTED:
// started(entity);
// break;
// case PROCESS_CANCELLED:
// cancelled(entity);
// break;
// case PROCESS_COMPLETED:
// completed(entity);
// break;
// default:
// autoAck();
// throw new IllegalArgumentException("不支持的流程实例事件类型!");
// }
}
/**
* @param entity
* @return true 才会真实回调 {@link ProcessInstanceListener} 接口相关方法
*/
protected abstract Boolean support();
@Override
public void created() {
autoAck();
}
@Override
public void started() {
autoAck();
}
@Override
public void cancelled() {
autoAck();
}
@Override
public void completed() {
autoAck();
}
}

View File

@ -0,0 +1,74 @@
package cn.axzo.workflow.client.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
/**
* 流程实例内的审批任务节点自身的状态变更事件
*
* @author: wangli
* @date: 2022/6/7 15:35
*/
public abstract class AbstractTaskListener extends AbstractListener implements TaskListener {
private Logger log = LoggerFactory.getLogger(AbstractTaskListener.class);
private static final String TASK_STR = "ts";
@Override
public final boolean supportEvent(String flag) {
return TASK_STR.equals(flag);
}
@Override
public final void invoke(Message message) {
// TL_MESSAGE.set(message);
// TaskEntity entity = parseEntity(message.getBody(), TaskEntity.class);
// if (Objects.isNull(entity) || !support(entity)) {
// if (log.isDebugEnabled()) {
// log.debug("无需处理该消息,将自动确认消息");
// }
// autoAck();
// return;
// }
// switch (entity.getType()) {
// case TASK_CREATED:
// created(entity);
// break;
// case TASK_ASSIGNED:
// assigned(entity);
// break;
// case TASK_COMPLETED:
// completed(entity);
// break;
// default:
// autoAck();
// throw new IllegalArgumentException("不支持的活动事件类型!");
// }
}
/**
* 判断是否需要处理该流程实例的信息
* <p>
* Note That: 请杜绝该方法中抛出异常.
*
* @return true 才会真实回调 {@link TaskListener} 接口相关方法
*/
protected abstract Boolean support();
@Override
public void created() {
autoAck();
}
@Override
public void assigned() {
autoAck();
}
@Override
public void completed() {
autoAck();
}
}

View File

@ -0,0 +1,44 @@
package cn.axzo.workflow.client.consumer;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.util.StringUtils;
/**
* @author: wangli
* @date: 2022/6/9 14:20
*/
public class DefaultFlowableEngineEventRabbitMQListener extends
AbstractFlowableEngineEventMQListener {
private Logger log = LoggerFactory.getLogger(DefaultFlowableEngineEventRabbitMQListener.class);
/**
* 判断事件类型是 Process 还是 Activity,调用不同的 Listener
*/
@Override
public void onMessage(Message message, Channel channel) {
String body = new String(message.getBody());
log.info("[onMessage][线程编号:{} 消息内容:{}], Message: {}", Thread.currentThread().getId(),
body, message);
//这里是有简易的协议标识
String[] split = body.split("");
if (split.length == 2 && StringUtils.hasLength(split[1])) {
listeners.forEach(listener -> {
try {
if (listener.supportEvent(split[0])) {
listener.process(message, channel);
listener.clearThreadLocal();
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}

View File

@ -0,0 +1,42 @@
package cn.axzo.workflow.client.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import java.io.IOException;
import java.util.List;
import java.util.Set;
/**
* 接收全量 MQ 中关于 Flowable Engine Event 事件
*
* @author: wangli
* @date: 2022/6/9 14:04
*/
public interface FlowableEngineEventMQListener {
/**
* 接收全量工作流引擎的状态变更事件
*/
void onMessage(Message message, Channel channel) throws IOException;
/**
* 获取 {@link TaskListener} 对象
*
* @return
*/
Set<TaskListener> getTaskListeners();
/**
* 获取 {@link ProcessInstanceListener} 对象
*
* @return
*/
Set<ProcessInstanceListener> getProcessInstanceListeners();
void setTaskListeners(Set<TaskListener> taskListeners);
void setProcessInstanceListeners(Set<ProcessInstanceListener> processInstanceListeners);
List<FlowableEngineEventProcessor> getAllListeners();
}

View File

@ -0,0 +1,36 @@
package cn.axzo.workflow.client.consumer;
import java.util.Objects;
/**
* 工作流事件消息列队消费者的工厂类
*
* @author wangli
* @date 2022/6/9 14:17
*/
public class FlowableEngineEventMqListenerFactory {
private volatile FlowableEngineEventMQListener flowableEngineEventMqListener;
protected void createFlowableEngineEventMqListener() {
if (Objects.isNull(flowableEngineEventMqListener)) {
synchronized (this) {
if (Objects.isNull(flowableEngineEventMqListener)) {
flowableEngineEventMqListener = new DefaultFlowableEngineEventRabbitMQListener();
}
}
}
}
public FlowableEngineEventMQListener getFlowableEngineEventMqListener() {
if (Objects.isNull(flowableEngineEventMqListener)) {
createFlowableEngineEventMqListener();
}
return flowableEngineEventMqListener;
}
public void setFlowableEngineEventMqListener(
FlowableEngineEventMQListener flowableEngineEventMqListener) {
this.flowableEngineEventMqListener = flowableEngineEventMqListener;
}
}

View File

@ -0,0 +1,23 @@
package cn.axzo.workflow.client.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.core.Ordered;
/**
* Processor
*
* @author: wangli
* @date: 2022/6/9 14:01
*/
public interface FlowableEngineEventProcessor extends Ordered {
boolean supportEvent(String flag);
void process(Message message, Channel channel);
void invoke(Message message);
void clearThreadLocal();
}

View File

@ -0,0 +1,32 @@
package cn.axzo.workflow.client.consumer;
/**
* 实例状态变更通知
*
* @author: wangli
* @date: 2022/6/7 14:30
*/
public interface ProcessInstanceListener extends FlowableEngineEventProcessor {
/**
* 实例已创建
*/
void created();
/**
* 实例已启动
*/
void started();
/**
* 实例已取消
*/
void cancelled();
/**
* 实例已完成
*/
void completed();
}

View File

@ -0,0 +1,32 @@
package cn.axzo.workflow.client.consumer;
/**
* 用户任务节点状态变更时触发的回调
* <p>
* 实际上对应 Flowable Engine Activity 的相关事件, 但为了保证业务的正常理解,更名为 Task
*
* @author: wangli
* @date: 2022/6/7 10:43
*/
public interface TaskListener extends FlowableEngineEventProcessor {
/**
* 用户任务已创建,未指派审核人
*/
void created();
/**
* 用户任务已指派审核人
*/
void assigned();
/**
* 用户任务已处理完成
* <p>
* 仅审核通过一个用户任务时触发, 如果任务是拒绝了, 则直接走实例取消事件
*/
void completed();
}