fix - 调整 es 同步
This commit is contained in:
parent
aec13a3f1b
commit
77080cb083
@ -0,0 +1,45 @@
|
||||
package cn.axzo.workflow.common.enums;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
|
||||
/**
|
||||
* 流程实例相关的 MQ 事件枚举定义
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2023/9/25 11:47
|
||||
*/
|
||||
public enum ElasticSearchEventEnum {
|
||||
|
||||
ELASTIC_SEARCH_SYNC("elastic-search", "elastic-search-sync", "同步实例数据到ES"),
|
||||
;
|
||||
private final String module;
|
||||
private final String tag;
|
||||
private final String desc;
|
||||
private Event.EventCode eventCode;
|
||||
|
||||
ElasticSearchEventEnum(String module, String tag, String desc) {
|
||||
this.eventCode = Event.EventCode.builder()
|
||||
.module(module)
|
||||
.name(tag)
|
||||
.build();
|
||||
this.module = module;
|
||||
this.tag = tag;
|
||||
this.desc = desc;
|
||||
}
|
||||
|
||||
public String getModule() {
|
||||
return module;
|
||||
}
|
||||
|
||||
public String getTag() {
|
||||
return tag;
|
||||
}
|
||||
|
||||
public String getDesc() {
|
||||
return desc;
|
||||
}
|
||||
|
||||
public Event.EventCode getEventCode() {
|
||||
return eventCode;
|
||||
}
|
||||
}
|
||||
@ -1,28 +1,22 @@
|
||||
package cn.axzo.workflow.server.controller.listener.process;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.EventProducer;
|
||||
import cn.axzo.workflow.common.enums.ElasticSearchEventEnum;
|
||||
import cn.axzo.workflow.core.common.context.ProcessOperationContext;
|
||||
import cn.axzo.workflow.core.listener.AbstractBpmnEventListener;
|
||||
import cn.axzo.workflow.core.listener.BpmnProcessEventListener;
|
||||
import cn.axzo.workflow.es.model.ProcessTaskDocument;
|
||||
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
|
||||
import cn.axzo.workflow.server.controller.listener.tx.OnTxCommittedSyncToEsListener;
|
||||
import cn.hutool.core.lang.UUID;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
|
||||
import org.flowable.common.engine.impl.cfg.TransactionState;
|
||||
import org.flowable.common.engine.impl.context.Context;
|
||||
import org.flowable.engine.HistoryService;
|
||||
import org.flowable.engine.delegate.event.FlowableCancelledEvent;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
|
||||
import org.flowable.engine.impl.util.CommandContextUtil;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* 流程实例结束后,同步最新数据至 ES
|
||||
@ -33,9 +27,11 @@ import java.util.List;
|
||||
@Slf4j
|
||||
@Component
|
||||
@Scope("prototype")
|
||||
@AllArgsConstructor
|
||||
public class SyncToEsProcessEventListener extends AbstractBpmnEventListener<ProcessOperationContext> implements BpmnProcessEventListener, Ordered {
|
||||
private final AggregateProcessInstanceService aggregateProcessInstanceService;
|
||||
@Resource
|
||||
private EventProducer<?> eventProducer;
|
||||
@Value("${sendMq:true}")
|
||||
private Boolean sendMQ;
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
@ -86,23 +82,20 @@ public class SyncToEsProcessEventListener extends AbstractBpmnEventListener<Proc
|
||||
|
||||
@Async
|
||||
public void syncToEs(String processInstanceId) {
|
||||
// String uuid = UUID.fastUUID().toString();
|
||||
// Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
|
||||
// new OnTxCommittedSyncToEsListener(processInstanceId, null, uuid));
|
||||
sync(processInstanceId);
|
||||
sendMessageQueue(processInstanceId, ElasticSearchEventEnum.ELASTIC_SEARCH_SYNC);
|
||||
}
|
||||
|
||||
void sync(String processInstanceId) {
|
||||
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
|
||||
HistoryService historyService = processEngineConfiguration.getHistoryService();
|
||||
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
|
||||
.processInstanceId(processInstanceId)
|
||||
.singleResult();
|
||||
// 删除指定实例的父子文档
|
||||
log.info("delete document processInstanceId: {}", processInstanceId);
|
||||
aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId);
|
||||
log.info("reInsert document processInstanceId: {}", processInstanceId);
|
||||
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
|
||||
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size());
|
||||
private void sendMessageQueue(String processInstanceId, ElasticSearchEventEnum eventEnum) {
|
||||
if (!sendMQ) {
|
||||
return;
|
||||
}
|
||||
eventProducer.send(Event.builder()
|
||||
.shardingKey(processInstanceId)
|
||||
.eventCode(eventEnum.getEventCode())
|
||||
.targetId(processInstanceId)
|
||||
.targetType(processInstanceId)
|
||||
.data(processInstanceId)
|
||||
.build());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -1,27 +1,20 @@
|
||||
package cn.axzo.workflow.server.controller.listener.task;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.EventProducer;
|
||||
import cn.axzo.workflow.common.enums.ElasticSearchEventEnum;
|
||||
import cn.axzo.workflow.core.common.context.TaskOperationContext;
|
||||
import cn.axzo.workflow.core.listener.AbstractBpmnEventListener;
|
||||
import cn.axzo.workflow.core.listener.BpmnTaskEventListener;
|
||||
import cn.axzo.workflow.es.model.ProcessTaskDocument;
|
||||
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
|
||||
import cn.axzo.workflow.server.controller.listener.tx.OnTxCommittedSyncToEsListener;
|
||||
import cn.hutool.core.lang.UUID;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.impl.cfg.TransactionState;
|
||||
import org.flowable.common.engine.impl.context.Context;
|
||||
import org.flowable.engine.HistoryService;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
|
||||
import org.flowable.engine.impl.util.CommandContextUtil;
|
||||
import org.flowable.task.service.delegate.DelegateTask;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* 同步任务相关数据至 ES
|
||||
@ -32,10 +25,11 @@ import java.util.List;
|
||||
@Slf4j
|
||||
@Component
|
||||
@Scope("prototype")
|
||||
@AllArgsConstructor
|
||||
public class SyncToEsTaskEvent_104_Listener extends AbstractBpmnEventListener<TaskOperationContext> implements BpmnTaskEventListener, Ordered {
|
||||
private final AggregateProcessInstanceService aggregateProcessInstanceService;
|
||||
private final HistoryService historyService;
|
||||
@Resource
|
||||
private EventProducer<?> eventProducer;
|
||||
@Value("${sendMq:true}")
|
||||
private Boolean sendMQ;
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
@ -50,24 +44,20 @@ public class SyncToEsTaskEvent_104_Listener extends AbstractBpmnEventListener<Ta
|
||||
@Override
|
||||
@Async
|
||||
public void onAssigned(DelegateTask delegateTask) {
|
||||
// String uuid = UUID.fastUUID().toString();
|
||||
// Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
|
||||
// new OnTxCommittedSyncToEsListener(delegateTask.getProcessInstanceId(), delegateTask.getId(), uuid));
|
||||
sync(delegateTask);
|
||||
sendMessageQueue(delegateTask.getProcessInstanceId(), ElasticSearchEventEnum.ELASTIC_SEARCH_SYNC);
|
||||
}
|
||||
|
||||
void sync(DelegateTask delegateTask) {
|
||||
String processInstanceId = delegateTask.getProcessInstanceId();
|
||||
// ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
|
||||
// HistoryService historyService = processEngineConfiguration.getHistoryService();
|
||||
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
|
||||
.processInstanceId(processInstanceId)
|
||||
.singleResult();
|
||||
// 删除指定实例的父子文档
|
||||
log.info("delete document processInstanceId: {}", processInstanceId);
|
||||
aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId);
|
||||
log.info("reInsert document processInstanceId: {}", processInstanceId);
|
||||
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
|
||||
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size());
|
||||
private void sendMessageQueue(String processInstanceId, ElasticSearchEventEnum eventEnum) {
|
||||
if (!sendMQ) {
|
||||
return;
|
||||
}
|
||||
eventProducer.send(Event.builder()
|
||||
.shardingKey(processInstanceId)
|
||||
.eventCode(eventEnum.getEventCode())
|
||||
.targetId(processInstanceId)
|
||||
.targetType(processInstanceId)
|
||||
.data(processInstanceId)
|
||||
.build());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,43 @@
|
||||
package cn.axzo.workflow.server.mq.inside;
|
||||
|
||||
import cn.axzo.framework.rocketmq.BaseListener;
|
||||
import cn.axzo.framework.rocketmq.EventConsumer;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* 监听引擎自己的广播事件
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-11-06 15:05
|
||||
*/
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
public class SelfBoradcastRocketConfiguration {
|
||||
public static final String DEFAULT_EVENT = "topic_workflow_engine_";
|
||||
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "rocketmq.name-server")
|
||||
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
|
||||
consumerGroup = "GID_${spring.application.name}_workflow_engine_consumer",
|
||||
consumeMode = ConsumeMode.ORDERLY,
|
||||
maxReconsumeTimes = 3,
|
||||
nameServer = "${rocketmq.name-server}"
|
||||
)
|
||||
public static class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener<MessageExt> {
|
||||
@Resource
|
||||
private EventConsumer eventConsumer;
|
||||
|
||||
@Override
|
||||
public void onMessage(MessageExt message) {
|
||||
super.onEvent(message, eventConsumer);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,52 @@
|
||||
package cn.axzo.workflow.server.mq.inside.consumer;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.EventConsumer;
|
||||
import cn.axzo.framework.rocketmq.EventHandler;
|
||||
import cn.axzo.workflow.common.enums.ElasticSearchEventEnum;
|
||||
import cn.axzo.workflow.es.model.ProcessTaskDocument;
|
||||
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.engine.HistoryService;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
|
||||
import org.flowable.engine.impl.util.CommandContextUtil;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* ES 数据同步处理
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-11-06 15:11
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@AllArgsConstructor
|
||||
public class ElasticSearchSyncListener implements EventHandler, InitializingBean {
|
||||
private final EventConsumer eventConsumer;
|
||||
private final HistoryService historyService;
|
||||
private final AggregateProcessInstanceService aggregateProcessInstanceService;
|
||||
@Override
|
||||
public void onEvent(Event event, EventConsumer.Context context) {
|
||||
String processInstanceId = event.normalizedData(String.class);
|
||||
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
|
||||
.processInstanceId(processInstanceId)
|
||||
.singleResult();
|
||||
// 删除指定实例的父子文档
|
||||
log.info("delete document processInstanceId: {}", processInstanceId);
|
||||
aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId);
|
||||
log.info("reInsert document processInstanceId: {}", processInstanceId);
|
||||
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
|
||||
log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
eventConsumer.registerHandler(ElasticSearchEventEnum.ELASTIC_SEARCH_SYNC.getEventCode(), this);
|
||||
}
|
||||
}
|
||||
@ -1,4 +1,4 @@
|
||||
package cn.axzo.workflow.server.outside.mq.consumer;
|
||||
package cn.axzo.workflow.server.mq.outside.consumer;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.EventConsumer;
|
||||
@ -7,15 +7,12 @@ import cn.axzo.framework.rocketmq.EventHandler;
|
||||
//import cn.axzo.riven.client.model.DingtalkReceiveMqModel;
|
||||
//import cn.axzo.riven.client.model.DingtalkSendMqModel;
|
||||
//import cn.axzo.riven.client.model.SampleText;
|
||||
import cn.axzo.workflow.server.outside.mq.producer.DingtalkSendProducer;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import cn.axzo.workflow.server.mq.outside.producer.DingtalkSendProducer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 监听钉钉群消息的事件
|
||||
@ -1,4 +1,4 @@
|
||||
package cn.axzo.workflow.server.outside.mq.producer;
|
||||
package cn.axzo.workflow.server.mq.outside.producer;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
|
||||
Loading…
Reference in New Issue
Block a user