feat:feature-REQ/2129 消息发送方式

This commit is contained in:
lilong 2024-03-27 16:48:51 +08:00
parent f1d348e8ff
commit 6e1422cdd0
6 changed files with 76 additions and 32 deletions

View File

@ -28,8 +28,8 @@ public class AccountAbsentQuery {
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
* 所以需要根据ouId获取账号
*/
private Long organizationalUnitId;
private Long ouId;
}

View File

@ -0,0 +1,40 @@
package cn.axzo.im.config;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventProducer;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author yanglin
*/
@Slf4j
@Component
@RefreshScope
public class MqProducer {
@Autowired
private EventProducer<?> eventProducer;
@Value("${sendMq}")
private Boolean sendMq;
public void send(Event event){
log.info(JSON.toJSONString(event));
if(sendMq != null && !sendMq){
return;
}
//生产消息
eventProducer.send(event);
}
public void sendBatch(List<Event> events){
events.forEach(this::send);
}
}

View File

@ -1,4 +1,4 @@
package com.fiture.rbd.erp.connector.order.event.inner;
package cn.axzo.im.event.inner;
import cn.axzo.framework.rocketmq.Event;
import lombok.Getter;

View File

@ -346,7 +346,7 @@ public class AccountService {
.accountId(accountAbsentQuery.getPersonId())
.build();
if (appTypeEnum == AppTypeEnum.CMP) {
listAccountRegisterParam.setOrganizationalUnitId(accountAbsentQuery.getOrganizationalUnitId());
listAccountRegisterParam.setOrganizationalUnitId(accountAbsentQuery.getOuId());
}
List<AccountRegisterService.AccountRegisterDTO> accountRegisters = accountRegisterService.list(listAccountRegisterParam);
@ -373,7 +373,7 @@ public class AccountService {
userAccountReq.setNickName(DEFAULT_NICK_NAME + accountAbsentQuery.getPersonId());
// 管理版需要根据ou注册IM账号做数据隔离
if (appTypeEnum == AppTypeEnum.CMP) {
userAccountReq.setOrganizationalUnitId(accountAbsentQuery.getOrganizationalUnitId());
userAccountReq.setOrganizationalUnitId(accountAbsentQuery.getOuId());
}
UserAccountResp accountResp = generateAccount(userAccountReq, iNotifyService);
if (StringUtils.isEmpty(accountResp.getToken())) {

View File

@ -3,10 +3,10 @@ package cn.axzo.im.service.impl;
import cn.axzo.basics.profiles.api.UserProfileServiceApi;
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchRequest;
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchResponse;
import cn.axzo.im.config.MqProducer;
import cn.axzo.im.dao.mapper.MessageHistoryMapper;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.event.payload.MessageHistoryCreatedPayload;
@ -45,8 +45,8 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static cn.axzo.im.config.BizResultCode.ACQUIRE_RATE_LIMITER_FAIL;
import static com.fiture.rbd.erp.connector.order.event.inner.EventTypeEnum.MESSAGE_HISTORY_CREATED;
import static com.fiture.rbd.erp.connector.order.event.inner.EventTypeEnum.MESSAGE_HISTORY_UPDATED;
import static cn.axzo.im.event.inner.EventTypeEnum.MESSAGE_HISTORY_CREATED;
import static cn.axzo.im.event.inner.EventTypeEnum.MESSAGE_HISTORY_UPDATED;
@Slf4j
@Service
@ -62,7 +62,7 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
@Autowired
private IMChannelProvider imChannelProvider;
@Autowired
private EventProducer<?> eventProducer;
private MqProducer mqProducer;
@Value("${send.message.limiter.permits:1}")
private int permits;
@ -121,17 +121,19 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
this.saveBatch(messageHistories);
this.listByIds(Lists.transform(messageHistories, MessageHistory::getId))
.forEach(messageHistory -> {
eventProducer.send(Event.builder()
.targetId(String.valueOf(messageHistory.getId()))
.targetType(TARGET_TYPE)
.eventCode(MESSAGE_HISTORY_CREATED.getEventCode())
.data(MessageHistoryCreatedPayload.builder()
.messageHistory(messageHistory)
.build())
.build());
});
List<Event> events = this.listByIds(Lists.transform(messageHistories, MessageHistory::getId))
.stream()
.map(messageHistory ->
Event.builder()
.targetId(String.valueOf(messageHistory.getId()))
.targetType(TARGET_TYPE)
.eventCode(MESSAGE_HISTORY_CREATED.getEventCode())
.data(MessageHistoryCreatedPayload.builder()
.messageHistory(messageHistory)
.build())
.build())
.collect(Collectors.toList());
mqProducer.sendBatch(events);
}
@Override
@ -144,17 +146,19 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
this.updateBatchById(messageHistories);
this.listByIds(Lists.transform(messageHistories, MessageHistory::getId)).forEach(messageHistory -> {
eventProducer.send(Event.builder()
.targetId(String.valueOf(messageHistory.getId()))
.targetType(TARGET_TYPE)
.eventCode(MESSAGE_HISTORY_UPDATED.getEventCode())
.data(MessageHistoryUpdatedPayload.builder()
.newMessageHistory(messageHistory)
.oldMessageHistory(oldMessageHistories.get(messageHistory.getId()))
.build())
.build());
});
List<Event> events = this.listByIds(Lists.transform(messageHistories, MessageHistory::getId))
.stream()
.map(messageHistory -> Event.builder()
.targetId(String.valueOf(messageHistory.getId()))
.targetType(TARGET_TYPE)
.eventCode(MESSAGE_HISTORY_UPDATED.getEventCode())
.data(MessageHistoryUpdatedPayload.builder()
.newMessageHistory(messageHistory)
.oldMessageHistory(oldMessageHistories.get(messageHistory.getId()))
.build())
.build())
.collect(Collectors.toList());
mqProducer.sendBatch(events);
}
@Override

View File

@ -104,7 +104,7 @@ ALTER TABLE im_account_register ADD COLUMN `ou_id` bigint not null default 0 com
CREATE TABLE IF NOT EXISTS im_message_task
(
id bigint auto_increment comment '主键',
biz_id varchar(50) not null default '' comment '业务请求时可以带的排查问题的id',
biz_id varchar(200) not null default '' comment '业务请求时可以带的排查问题的id',
send_im_account varchar(100) not null comment '发送者的三方平台账号id',
send_person_id varchar(100) not null default '' comment 'IM消息发送personId自定义消息没有personId',
receive_persons json not null comment 'IM消息接收人person列表',