feat:feature-REQ/2129 消息发送方式
This commit is contained in:
parent
91d2d418a7
commit
8631568215
@ -28,8 +28,8 @@ public class AccountAbsentQuery {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* appType = AppTypeEnum.CMP时,因为网易云信无法对同一个账号做企业隔离,只能一个企业一个账号,
|
* appType = AppTypeEnum.CMP时,因为网易云信无法对同一个账号做企业隔离,只能一个企业一个账号,
|
||||||
* 所以需要根据organizationalUnitId获取账号
|
* 所以需要根据ouId获取账号
|
||||||
*/
|
*/
|
||||||
private Long organizationalUnitId;
|
private Long ouId;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,40 @@
|
|||||||
|
package cn.axzo.im.config;
|
||||||
|
|
||||||
|
import cn.axzo.framework.rocketmq.Event;
|
||||||
|
import cn.axzo.framework.rocketmq.EventProducer;
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.cloud.context.config.annotation.RefreshScope;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author yanglin
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
@RefreshScope
|
||||||
|
public class MqProducer {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private EventProducer<?> eventProducer;
|
||||||
|
|
||||||
|
@Value("${sendMq}")
|
||||||
|
private Boolean sendMq;
|
||||||
|
|
||||||
|
public void send(Event event){
|
||||||
|
log.info(JSON.toJSONString(event));
|
||||||
|
if(sendMq != null && !sendMq){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
//生产消息
|
||||||
|
eventProducer.send(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendBatch(List<Event> events){
|
||||||
|
events.forEach(this::send);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package com.fiture.rbd.erp.connector.order.event.inner;
|
package cn.axzo.im.event.inner;
|
||||||
|
|
||||||
import cn.axzo.framework.rocketmq.Event;
|
import cn.axzo.framework.rocketmq.Event;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
|||||||
@ -346,7 +346,7 @@ public class AccountService {
|
|||||||
.accountId(accountAbsentQuery.getPersonId())
|
.accountId(accountAbsentQuery.getPersonId())
|
||||||
.build();
|
.build();
|
||||||
if (appTypeEnum == AppTypeEnum.CMP) {
|
if (appTypeEnum == AppTypeEnum.CMP) {
|
||||||
listAccountRegisterParam.setOrganizationalUnitId(accountAbsentQuery.getOrganizationalUnitId());
|
listAccountRegisterParam.setOrganizationalUnitId(accountAbsentQuery.getOuId());
|
||||||
}
|
}
|
||||||
List<AccountRegisterService.AccountRegisterDTO> accountRegisters = accountRegisterService.list(listAccountRegisterParam);
|
List<AccountRegisterService.AccountRegisterDTO> accountRegisters = accountRegisterService.list(listAccountRegisterParam);
|
||||||
|
|
||||||
@ -373,7 +373,7 @@ public class AccountService {
|
|||||||
userAccountReq.setNickName(DEFAULT_NICK_NAME + accountAbsentQuery.getPersonId());
|
userAccountReq.setNickName(DEFAULT_NICK_NAME + accountAbsentQuery.getPersonId());
|
||||||
// 管理版需要根据ou注册IM账号,做数据隔离
|
// 管理版需要根据ou注册IM账号,做数据隔离
|
||||||
if (appTypeEnum == AppTypeEnum.CMP) {
|
if (appTypeEnum == AppTypeEnum.CMP) {
|
||||||
userAccountReq.setOrganizationalUnitId(accountAbsentQuery.getOrganizationalUnitId());
|
userAccountReq.setOrganizationalUnitId(accountAbsentQuery.getOuId());
|
||||||
}
|
}
|
||||||
UserAccountResp accountResp = generateAccount(userAccountReq, iNotifyService);
|
UserAccountResp accountResp = generateAccount(userAccountReq, iNotifyService);
|
||||||
if (StringUtils.isEmpty(accountResp.getToken())) {
|
if (StringUtils.isEmpty(accountResp.getToken())) {
|
||||||
|
|||||||
@ -3,10 +3,10 @@ package cn.axzo.im.service.impl;
|
|||||||
import cn.axzo.basics.profiles.api.UserProfileServiceApi;
|
import cn.axzo.basics.profiles.api.UserProfileServiceApi;
|
||||||
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
|
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
|
||||||
import cn.axzo.framework.rocketmq.Event;
|
import cn.axzo.framework.rocketmq.Event;
|
||||||
import cn.axzo.framework.rocketmq.EventProducer;
|
|
||||||
import cn.axzo.im.channel.IMChannelProvider;
|
import cn.axzo.im.channel.IMChannelProvider;
|
||||||
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchRequest;
|
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchRequest;
|
||||||
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchResponse;
|
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchResponse;
|
||||||
|
import cn.axzo.im.config.MqProducer;
|
||||||
import cn.axzo.im.dao.mapper.MessageHistoryMapper;
|
import cn.axzo.im.dao.mapper.MessageHistoryMapper;
|
||||||
import cn.axzo.im.entity.MessageHistory;
|
import cn.axzo.im.entity.MessageHistory;
|
||||||
import cn.axzo.im.event.payload.MessageHistoryCreatedPayload;
|
import cn.axzo.im.event.payload.MessageHistoryCreatedPayload;
|
||||||
@ -45,8 +45,8 @@ import java.util.function.Function;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static cn.axzo.im.config.BizResultCode.ACQUIRE_RATE_LIMITER_FAIL;
|
import static cn.axzo.im.config.BizResultCode.ACQUIRE_RATE_LIMITER_FAIL;
|
||||||
import static com.fiture.rbd.erp.connector.order.event.inner.EventTypeEnum.MESSAGE_HISTORY_CREATED;
|
import static cn.axzo.im.event.inner.EventTypeEnum.MESSAGE_HISTORY_CREATED;
|
||||||
import static com.fiture.rbd.erp.connector.order.event.inner.EventTypeEnum.MESSAGE_HISTORY_UPDATED;
|
import static cn.axzo.im.event.inner.EventTypeEnum.MESSAGE_HISTORY_UPDATED;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
@ -62,7 +62,7 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
|
|||||||
@Autowired
|
@Autowired
|
||||||
private IMChannelProvider imChannelProvider;
|
private IMChannelProvider imChannelProvider;
|
||||||
@Autowired
|
@Autowired
|
||||||
private EventProducer<?> eventProducer;
|
private MqProducer mqProducer;
|
||||||
|
|
||||||
@Value("${send.message.limiter.permits:1}")
|
@Value("${send.message.limiter.permits:1}")
|
||||||
private int permits;
|
private int permits;
|
||||||
@ -121,17 +121,19 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
|
|||||||
|
|
||||||
this.saveBatch(messageHistories);
|
this.saveBatch(messageHistories);
|
||||||
|
|
||||||
this.listByIds(Lists.transform(messageHistories, MessageHistory::getId))
|
List<Event> events = this.listByIds(Lists.transform(messageHistories, MessageHistory::getId))
|
||||||
.forEach(messageHistory -> {
|
.stream()
|
||||||
eventProducer.send(Event.builder()
|
.map(messageHistory ->
|
||||||
.targetId(String.valueOf(messageHistory.getId()))
|
Event.builder()
|
||||||
.targetType(TARGET_TYPE)
|
.targetId(String.valueOf(messageHistory.getId()))
|
||||||
.eventCode(MESSAGE_HISTORY_CREATED.getEventCode())
|
.targetType(TARGET_TYPE)
|
||||||
.data(MessageHistoryCreatedPayload.builder()
|
.eventCode(MESSAGE_HISTORY_CREATED.getEventCode())
|
||||||
.messageHistory(messageHistory)
|
.data(MessageHistoryCreatedPayload.builder()
|
||||||
.build())
|
.messageHistory(messageHistory)
|
||||||
.build());
|
.build())
|
||||||
});
|
.build())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
mqProducer.sendBatch(events);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -144,17 +146,19 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
|
|||||||
|
|
||||||
this.updateBatchById(messageHistories);
|
this.updateBatchById(messageHistories);
|
||||||
|
|
||||||
this.listByIds(Lists.transform(messageHistories, MessageHistory::getId)).forEach(messageHistory -> {
|
List<Event> events = this.listByIds(Lists.transform(messageHistories, MessageHistory::getId))
|
||||||
eventProducer.send(Event.builder()
|
.stream()
|
||||||
.targetId(String.valueOf(messageHistory.getId()))
|
.map(messageHistory -> Event.builder()
|
||||||
.targetType(TARGET_TYPE)
|
.targetId(String.valueOf(messageHistory.getId()))
|
||||||
.eventCode(MESSAGE_HISTORY_UPDATED.getEventCode())
|
.targetType(TARGET_TYPE)
|
||||||
.data(MessageHistoryUpdatedPayload.builder()
|
.eventCode(MESSAGE_HISTORY_UPDATED.getEventCode())
|
||||||
.newMessageHistory(messageHistory)
|
.data(MessageHistoryUpdatedPayload.builder()
|
||||||
.oldMessageHistory(oldMessageHistories.get(messageHistory.getId()))
|
.newMessageHistory(messageHistory)
|
||||||
.build())
|
.oldMessageHistory(oldMessageHistories.get(messageHistory.getId()))
|
||||||
.build());
|
.build())
|
||||||
});
|
.build())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
mqProducer.sendBatch(events);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -104,7 +104,7 @@ ALTER TABLE im_account_register ADD COLUMN `ou_id` bigint not null default 0 com
|
|||||||
CREATE TABLE IF NOT EXISTS im_message_task
|
CREATE TABLE IF NOT EXISTS im_message_task
|
||||||
(
|
(
|
||||||
id bigint auto_increment comment '主键',
|
id bigint auto_increment comment '主键',
|
||||||
biz_id varchar(50) not null default '' comment '业务请求时可以带的排查问题的id',
|
biz_id varchar(200) not null default '' comment '业务请求时可以带的排查问题的id',
|
||||||
send_im_account varchar(100) not null comment '发送者的三方平台账号id',
|
send_im_account varchar(100) not null comment '发送者的三方平台账号id',
|
||||||
send_person_id varchar(100) not null default '' comment 'IM消息发送personId,自定义消息没有personId',
|
send_person_id varchar(100) not null default '' comment 'IM消息发送personId,自定义消息没有personId',
|
||||||
receive_persons json not null comment 'IM消息接收人person列表',
|
receive_persons json not null comment 'IM消息接收人person列表',
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user