feat:feature-REQ/2129 重新调整发送消息接口

This commit is contained in:
lilong 2024-03-21 13:40:52 +08:00
parent 2d582c7c01
commit d7e4e9314b
19 changed files with 627 additions and 234 deletions

View File

@ -5,6 +5,7 @@ import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendCustomMessageParam;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.resp.MessageCustomResp;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.center.api.vo.resp.MessageTaskResp;
@ -32,34 +33,35 @@ public interface MessageApi {
* @return
*/
@PostMapping("/api/im/message/send")
ApiResult<MessageTaskResp> sendMessage(@RequestBody @Validated SendMessageParam sendMessageParam);
ApiResult<MessageTaskResp> sendMessageAsync(@RequestBody @Validated SendMessageParam sendMessageParam);
/**
* 发送自定义消息
*/
/**
* 发送自定义消息:
*
* 发送消息时只是存储在messageTask中通过xxlJob或者mq异步去处理
* 因为1为了提高接口响应性能2第三方接口有限流控制防止被限流后阻塞业务
* @param messageInfo 发送消息请求参数
* 同步发送消息不建议使用因为第三方接口有限流会影响接口性能
* @param sendMessageParam
* @return
*/
@PostMapping("/api/im/custom-message/send")
ApiResult<MessageTaskResp> sendCustomMessage(@RequestBody @Validated SendCustomMessageParam messageInfo);
@PostMapping("/api/im/message/send")
ApiResult<MessageTaskResp> sendMessage(@RequestBody @Validated SendMessageParam sendMessageParam);
/**
* 通过消息模板来发送消息
* 发送消息时只是存储在messageTask中通过xxlJob或者mq异步去处理
* 因为1为了提高接口响应性能2第三方接口有限流控制防止被限流后阻塞业务
* @param sendMessageParam
* @return
*/
@PostMapping("/api/im/template-message/send")
ApiResult<MessageTaskResp> sendTemplateMessageAsync(@RequestBody @Validated SendTemplateMessageParam sendMessageParam);
/**
* 发送消息,单条消息批量发送消息统一入口
* 1.该接口一次请求接收人支持最大2000人
* 2.网易云信一分钟支持120次调用,每次调用IM中心设置100个账户(能返回msgId最大支持100人)
* 3.IM中心接收人有工人端和管理端账户,故当接收人最大2000人时需要调用网易云信发送4000条消息
* 4.按照每批次发送100条消息,需要发送40次
* 5.因此该接口一分钟内最大支持3次接收人为2000人的请求
*
* 接口已经作废可以使用sendTemplateMessage来替换
* @param messageInfo 发送消息请求参数
* @return 发送消息请求响应
*/
@PostMapping("api/im/message/dispatch")
@Deprecated
ApiResult<List<MessageDispatchResp>> sendMessage(@RequestBody @Validated MessageInfo messageInfo);
/**

View File

@ -3,6 +3,8 @@ package cn.axzo.im.center.api.vo.req;
import cn.axzo.basics.common.page.PageRequest;
import lombok.Data;
import java.util.List;
/**
* 机器人信息
*
@ -34,5 +36,8 @@ public class RobotPageQuery extends PageRequest {
*/
private String imAccount;
/**
* todo 待优化替换成cn.axzo.im.center.api.feign.RobotInfoApi#page
*/
private String msgTemplateCode;
}

View File

@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotBlank;
@ -24,12 +25,11 @@ import java.util.List;
@AllArgsConstructor
public class SendMessageParam {
/**
* 发送用户Id,目前暂不支持非机器人发送消息
* 发送者IM账号
*/
@NotBlank(message = "发送用户id不能为空")
private String sendPersonId;
@NotBlank(message = "sendImAccount不能为空")
private String sendImAccount;
/**
* 消息接收用户信息
@ -50,16 +50,14 @@ public class SendMessageParam {
private String msgContent;
/**
* 消息模板ID
* 跳转配置信息
*/
@NotBlank(message = "消息模板ID不能为空")
private String msgTemplateId;
private List<JumpData> jumpDatas;
/**
* 消息模板内容
* 封面图
*/
@NotBlank(message = "消息模板内容不能为空")
private String msgTemplateContent;
private String cardBannerUrl;
/**
* 消息扩展信息
@ -71,6 +69,30 @@ public class SendMessageParam {
*/
private String bizId;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class JumpData {
private JumpPlatform platform;
private String url;
}
@Getter
@AllArgsConstructor
enum JumpPlatform {
WEB("PC"),
MINI_PROGRAM("安心筑小程"),
IOS("IOS"),
ANDROID("ANDROID"),
WEB_VIEW("H5"),
WECHAT_MINI_PROGRAM("微信小程序"),
;
private String desc;
}
@Data
@Builder
@ -97,6 +119,11 @@ public class SendMessageParam {
* @See cn.axzo.im.center.common.enums.AppTypeEnum
*/
private AppTypeEnum appType;
/**
* im账号可以personId和imAccount二选一
*/
private String imAccount;
}
}

View File

@ -0,0 +1,87 @@
package cn.axzo.im.center.api.vo.req;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import java.util.List;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SendTemplateMessageParam {
/**
* 消息接收用户信息
*/
@NotEmpty(message = "消息接收用户信息不能为空")
private List<SendMessageParam.ReceivePerson> receivePersons;
/**
* 消息标题
*/
@NotBlank(message = "消息标题不能为空")
private String msgHeader;
/**
* 消息内容
*/
@NotBlank(message = "消息内容不能为空")
private String msgContent;
/**
* 消息模板ID
*/
@NotBlank(message = "消息模板ID不能为空")
private String msgTemplateId;
/**
* 消息模板内容
*/
@NotBlank(message = "消息模板内容不能为空")
private String msgTemplateContent;
/**
* 消息扩展信息
*/
private JSONObject ext;
/**
* 业务的唯一ID用于查询发送消息的记录和结果不验证唯一
*/
private String bizId;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
static class ReceivePerson {
/**
* 接收消息的personId
*/
private String personId;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long ouId;
/**
* 发送消息到App端
* 工人端企业端服务器
* CMCMPSYSTEM
*
* @See cn.axzo.im.center.common.enums.AppTypeEnum
*/
private AppTypeEnum appType;
}
}

View File

@ -0,0 +1,51 @@
package cn.axzo.im.config;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;
import org.apache.ibatis.type.MappedJdbcTypes;
import org.apache.ibatis.type.MappedTypes;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
@MappedTypes({List.class})
@MappedJdbcTypes(JdbcType.VARCHAR)
public abstract class BaseListTypeHandler<T> extends BaseTypeHandler<List<T>> {
private Class<T> type = getGenericType();
@Override
public void setNonNullParameter(PreparedStatement preparedStatement, int i,
List<T> list, JdbcType jdbcType) throws SQLException {
preparedStatement.setString(i, JSONArray.toJSONString(list, SerializerFeature.WriteMapNullValue,
SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullStringAsEmpty));
}
@Override
public List<T> getNullableResult(ResultSet resultSet, String s) throws SQLException {
return JSONArray.parseArray(resultSet.getString(s), type);
}
@Override
public List<T> getNullableResult(ResultSet resultSet, int i) throws SQLException {
return JSONArray.parseArray(resultSet.getString(i), type);
}
@Override
public List<T> getNullableResult(CallableStatement callableStatement, int i) throws SQLException {
return JSONArray.parseArray(callableStatement.getString(i), type);
}
private Class<T> getGenericType() {
Type t = getClass().getGenericSuperclass();
Type[] params = ((ParameterizedType) t).getActualTypeArguments();
return (Class<T>) params[0];
}
}

View File

@ -0,0 +1,16 @@
package cn.axzo.im.config;
import cn.axzo.pokonyan.exception.ResultCode;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum BizResultCode implements ResultCode {
SEND_IM_ACCOUNT_NOT_FOUND("100", "发送者IM账号错误");
private String errorCode;
private String errorMessage;
}

View File

@ -1,27 +1,26 @@
package cn.axzo.im.controller;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.feign.MessageApi;
import cn.axzo.im.center.api.vo.req.AccountQuery;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendCustomMessageParam;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.resp.MessageCustomResp;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.center.api.vo.resp.MessageTaskResp;
import cn.axzo.im.center.api.vo.resp.UserAccountResp;
import cn.axzo.im.center.common.enums.AccountTypeEnum;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.service.AccountRegisterService;
import cn.axzo.im.service.AccountService;
import cn.axzo.im.service.MessageService;
import cn.axzo.im.service.MessageTaskService;
import cn.axzo.im.service.RobotMsgTemplateService;
import cn.axzo.pokonyan.exception.Aassert;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
@ -39,6 +38,8 @@ import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.List;
import static cn.axzo.im.config.BizResultCode.SEND_IM_ACCOUNT_NOT_FOUND;
/**
* IM消息派发相关
*
@ -55,13 +56,13 @@ public class MessageController implements MessageApi {
@Autowired
private MessageTaskService messageTaskService;
@Autowired
private IMChannelProvider imChannel;
@Autowired
private AccountService accountService;
@Autowired
private RobotMsgTemplateService robotMsgTemplateService;
@Autowired
private MessageService messageService;
@Autowired
private AccountRegisterService accountRegisterService;
@Override
@ -72,8 +73,8 @@ public class MessageController implements MessageApi {
@Override
public ApiResult<List<MessageCustomResp>> sendCustomMessage(CustomMessageInfo customMessage) {
// List<MessageCustomResp> messageRespList = messageService.sendCustomMessage(customMessage);
return ApiResult.ok(null);
List<MessageCustomResp> messageRespList = messageService.sendCustomMessage(customMessage);
return ApiResult.ok(messageRespList);
}
@ExceptionHandler({ RequestNotPermitted.class })
@ -90,13 +91,36 @@ public class MessageController implements MessageApi {
* @return
*/
@Override
public ApiResult<MessageTaskResp> sendMessage(SendMessageParam sendMessageParam) {
public ApiResult<MessageTaskResp> sendMessageAsync(SendMessageParam sendMessageParam) {
check(sendMessageParam);
MessageTask messageTask = messageTaskService.create(toMessageTask(sendMessageParam));
return ApiResult.ok(toMessageTaskResp(messageTask));
}
@Override
public ApiResult<MessageTaskResp> sendMessage(SendMessageParam sendMessageParam) {
check(sendMessageParam);
MessageTask messageTask = messageTaskService.create(toMessageTask(sendMessageParam));
return ApiResult.ok(toMessageTaskResp(messageTask));
}
@Override
public ApiResult<MessageTaskResp> sendTemplateMessageAsync(SendTemplateMessageParam sendMessageParam) {
String sendImAccount = check(sendMessageParam);
MessageTask messageTask = messageTaskService.create(toMessageTask(sendMessageParam, sendImAccount));
return ApiResult.ok(toMessageTaskResp(messageTask));
}
private void check(SendMessageParam sendMessageParam) {
List<AccountRegister> accountRegisters = accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder()
.imAccount(sendMessageParam.getSendImAccount())
.build());
Aassert.checkNotEmpty(accountRegisters, SEND_IM_ACCOUNT_NOT_FOUND);
}
private String check(SendTemplateMessageParam sendMessageParam) {
List<String> robotIdList = robotMsgTemplateService.queryRobotIdByTemplate(sendMessageParam.getMsgTemplateId());
if (CollectionUtils.isEmpty(robotIdList)) {
throw new ServiceException("消息模板ID[" + sendMessageParam.getMsgTemplateId() + "],还未维护机器人账户!");
@ -120,20 +144,8 @@ public class MessageController implements MessageApi {
if (StringUtils.isBlank(robotImAccount)) {
throw new ServiceException("消息模板ID[" + sendMessageParam.getMsgTemplateId() + "],机器人ID[" + robotId + "],还未生成IM账户!");
}
}
@Override
public ApiResult<MessageTaskResp> sendCustomMessage(SendCustomMessageParam customMessage) {
check();
MessageTask messageTask = messageTaskService.create(toMessageTask(customMessage));
return ApiResult.ok(toMessageTaskResp(messageTask));
}
private void check() {
String appKey = imChannel.getProviderAppKey();
AccountRegister customSendAccount = accountService.queryCustomAccount(
AccountTypeEnum.CUSTOM, AppTypeEnum.SYSTEM, appKey);
AssertUtil.notNull(customSendAccount, String.format("appKey=%s自定义用户没有注册im账号", appKey));
return robotImAccount;
}
public MessageTaskResp toMessageTaskResp(MessageTask messageTask) {
@ -142,8 +154,30 @@ public class MessageController implements MessageApi {
return messageTaskResp;
}
private MessageTask toMessageTask(SendMessageParam sendMessageParam) {
MessageTask.BizData bizData = MessageTask.BizData.builder()
.jumpDatas(sendMessageParam.getJumpDatas())
.build();
Date now = new Date();
return MessageTask.builder()
.bizId(sendMessageParam.getBizId())
.sendImAccount(sendMessageParam.getSendImAccount())
.receivePersons(JSONArray.parseArray(JSONObject.toJSONString(sendMessageParam.getReceivePersons()), MessageTask.ReceivePerson.class))
.status(MessageTask.Status.PENDING)
.title(sendMessageParam.getMsgHeader())
.content(sendMessageParam.getMsgContent())
.bizData(bizData)
.ext(sendMessageParam.getExt())
.planStartTime(now)
.createAt(now)
.build();
}
private MessageTask toMessageTask(SendTemplateMessageParam sendMessageParam,
String sendImAccount) {
MessageTask.BizData bizData = MessageTask.BizData.builder()
.msgTemplateContent(sendMessageParam.getMsgTemplateContent())
.msgTemplateId(sendMessageParam.getMsgTemplateId())
@ -151,32 +185,15 @@ public class MessageController implements MessageApi {
Date now = new Date();
return MessageTask.builder()
.bizId(sendMessageParam.getBizId())
.sendPersonId(sendMessageParam.getSendPersonId())
.receivePersons(JSONArray.parseArray(JSONObject.toJSONString(sendMessageParam.getReceivePersons())))
.sendImAccount(sendImAccount)
.receivePersons(JSONArray.parseArray(JSONObject.toJSONString(sendMessageParam.getReceivePersons()), MessageTask.ReceivePerson.class))
.status(MessageTask.Status.PENDING)
.title(sendMessageParam.getMsgHeader())
.content(sendMessageParam.getMsgContent())
.bizData(JSONObject.parseObject(JSONObject.toJSONString(bizData)))
.bizData(bizData)
.ext(sendMessageParam.getExt())
.planStartTime(now)
.createAt(now)
.build();
}
private MessageTask toMessageTask(SendCustomMessageParam customMessage) {
MessageTask.BizData bizData = MessageTask.BizData.builder()
.bizType(customMessage.getBizType())
.payload(customMessage.getPayload())
.build();
Date now = new Date();
return MessageTask.builder()
.bizId(customMessage.getBizId())
.receivePersons(JSONArray.parseArray(JSONObject.toJSONString(customMessage.getReceivePersons())))
.status(MessageTask.Status.PENDING)
.bizData(JSONObject.parseObject(JSONObject.toJSONString(bizData)))
.planStartTime(now)
.createAt(now)
.build();
}
}

View File

@ -28,7 +28,8 @@ public class RobotInfoDao extends ServiceImpl<RobotInfoMapper, RobotInfo> {
* @param robotInfoQuery 机器人标签分页查询条件
* @return 机器人分页查询结果
*/
public IPage<RobotInfo> queryRobotInfoOfPage(RobotPageQuery robotInfoQuery) {
public IPage<RobotInfo> queryRobotInfoOfPage(RobotPageQuery robotInfoQuery,
List<String> robotIds) {
return lambdaQuery().eq(RobotInfo::getIsDelete, 0)
.like(StringUtils.isNoneBlank(robotInfoQuery.getNickName()),
RobotInfo::getNickName,
@ -37,6 +38,7 @@ public class RobotInfoDao extends ServiceImpl<RobotInfoMapper, RobotInfo> {
RobotInfo::getStatus, robotInfoQuery.getStatus())
.eq(StringUtils.isNoneBlank(robotInfoQuery.getImAccount()),
RobotInfo::getImAccount, robotInfoQuery.getImAccount())
.in(!CollectionUtils.isEmpty(robotIds), RobotInfo::getRobotId, robotIds)
.orderByDesc(RobotInfo::getUpdateAt)
.page(robotInfoQuery.toPage());
}

View File

@ -1,7 +1,9 @@
package cn.axzo.im.entity;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.BizTypeEnum;
import com.alibaba.fastjson.JSONArray;
import cn.axzo.im.config.BaseListTypeHandler;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
@ -16,8 +18,8 @@ import lombok.experimental.Accessors;
import lombok.experimental.SuperBuilder;
import org.springframework.cglib.beans.BeanMap;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.List;
import java.util.Optional;
@Data
@ -38,16 +40,16 @@ public class MessageTask {
private String bizId;
/**
* IM消息发送personId
* IM消息发送者IM的账号
*/
@TableField(value = "send_person_id")
private String sendPersonId;
@TableField(value = "send_im_account")
private String sendImAccount;
/**
* IM消息接收人person信息
*/
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONArray receivePersons;
@TableField(typeHandler = ListReceivePersonTypeHandler.class)
private List<ReceivePerson> receivePersons;
private Status status;
@ -57,8 +59,11 @@ public class MessageTask {
@TableField(value = "content")
private String content;
@TableField(value = "cardBannerUrl")
private String cardBannerUrl;
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONObject bizData;
private BizData bizData;
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONObject ext;
@ -81,6 +86,7 @@ public class MessageTask {
@TableField
private Date updateAt;
public enum Status {
PENDING,
SUCCEED,
@ -88,15 +94,9 @@ public class MessageTask {
;
}
public BizData resolveBizData() {
JSONObject bizData = Optional.ofNullable(this.getBizData())
.orElseGet(JSONObject::new);
return JSONObject.toJavaObject(bizData, BizData.class);
}
public JSONObject mergeBizData(BizData param) {
JSONObject bizData = Optional.ofNullable(this.getBizData())
.orElseGet(JSONObject::new);
JSONObject bizData = JSONObject.parseObject(JSONObject.toJSONString(Optional.ofNullable(this.getBizData())
.orElseGet(BizData::new)));
if (param == null) {
return bizData;
}
@ -125,5 +125,44 @@ public class MessageTask {
* 推送内容 - 业务数据json格式
*/
private String payload;
/**
* 跳转信息
*/
private List<SendMessageParam.JumpData> jumpDatas;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class ReceivePerson {
/**
* 接收消息的personId
*/
private String personId;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long ouId;
/**
* 发送消息到App端
* 工人端企业端服务器
* CMCMPSYSTEM
*
* @See cn.axzo.im.center.common.enums.AppTypeEnum
*/
private AppTypeEnum appType;
/**
* im账号可以personId和imAccount二选一
*/
private String imAccount;
}
public static class ListReceivePersonTypeHandler extends BaseListTypeHandler<ReceivePerson> {}
}

View File

@ -34,16 +34,8 @@ public class SendMessageJob extends IJobHandler {
@Autowired
private MessageTaskService messageTaskService;
@Autowired
private RateLimiterClient rateLimiterClient;
@Value("${send.message.limiter.permits:1}")
private int permits;
@Value("${send.message.limiter.seconds:60}")
private long seconds;
private static final Integer DEFAULT_PAGE_SIZE = 500;
private static final String LIMITER_KEY = "im-center:sendMessage";
@Override
@XxlJob("sendMessageJob")
@ -78,21 +70,9 @@ public class SendMessageJob extends IJobHandler {
}
private void sendMessage(List<MessageTask> messageTasks) {
RateLimiter rateLimiter = rateLimiterClient.build(RateLimiterClient.RateLimiterReq.builder()
.windowType(RateLimiter.WindowType.SLIDING)
.limiterKey(LIMITER_KEY)
.rule(RateLimiter.LimitRule.builder()
.permits(permits)
.seconds(seconds)
.build())
.build());
messageTasks.forEach(messageTask -> {
if (rateLimiter.tryAcquire()) {
log.info("获得令牌");
return;
}
log.info("未获得令牌");
messageTasks.forEach(messageTask -> {
});
}

View File

@ -12,6 +12,7 @@ import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
import java.util.Set;
public interface AccountRegisterService extends IService<AccountRegister> {
@ -38,6 +39,9 @@ public interface AccountRegisterService extends IService<AccountRegister> {
@CriteriaField(field = "accountId", operator = Operator.EQ)
private String accountId;
@CriteriaField(field = "accountId", operator = Operator.IN)
private Set<String> accountIds;
/**
* 注册用户ID唯一
*/

View File

@ -1,5 +1,6 @@
package cn.axzo.im.service;
import cn.axzo.basics.common.BeanMapper;
import cn.axzo.basics.common.constant.enums.TableIsDeleteEnum;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.im.center.api.vo.req.AccountAbsentQuery;
@ -12,7 +13,9 @@ import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.RobotStatusEnum;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.INotifyService;
import cn.axzo.im.channel.netease.dto.NimAccountInfo;
import cn.axzo.im.channel.netease.dto.RegisterRequest;
import cn.axzo.im.channel.netease.dto.RegisterResponse;
import cn.axzo.im.dao.mapper.AccountRegisterMapper;
import cn.axzo.im.dao.repository.AccountRegisterDao;
import cn.axzo.im.dao.repository.RobotInfoDao;
@ -265,14 +268,14 @@ public class AccountService {
register.setName(name);
UserAccountResp userAccountResp = new UserAccountResp();
//3.调用公共的网易云信IM接口创建账户 网易云信只是一种IM实现
// RegisterResponse registerResponse = imChannelProvider.registerAccount(register);
// if (registerResponse.getInfo() != null) {
// NimAccountInfo userAccount = BeanMapper.map(registerResponse.getInfo(), NimAccountInfo.class);
// userAccountResp.setImAccount(userAccount.getAccid());
// userAccountResp.setUserId(userId);
// userAccountResp.setToken(userAccount.getToken());
// }
// userAccountResp.setDesc(registerResponse.getDesc());
RegisterResponse registerResponse = imChannelProvider.registerAccount(register);
if (registerResponse.getInfo() != null) {
NimAccountInfo userAccount = BeanMapper.map(registerResponse.getInfo(), NimAccountInfo.class);
userAccountResp.setImAccount(userAccount.getAccid());
userAccountResp.setUserId(userId);
userAccountResp.setToken(userAccount.getToken());
}
userAccountResp.setDesc(registerResponse.getDesc());
return UserAccountResp.builder()
.imAccount(userWapperId)
.userId(userId)

View File

@ -0,0 +1,7 @@
package cn.axzo.im.service;
import cn.axzo.im.entity.MessageHistory;
import com.baomidou.mybatisplus.extension.service.IService;
public interface MessageHistoryService extends IService<MessageHistory> {
}

View File

@ -4,6 +4,7 @@ import cn.axzo.basics.common.BeanMapper;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.im.center.api.vo.req.AccountQuery;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.SendCustomMessageParam;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.api.vo.resp.MessageCustomResp;
@ -180,24 +181,24 @@ public class MessageService {
}
}
private MessageDispatchRequest buildMessageDispatchRequest(SendMessageParam sendMessageParam) {
MessageDispatchRequest messageRequest = new MessageDispatchRequest();
MessageBody messageBody = new MessageBody();
messageBody.setMsgType(NimMsgTypeEnum.TEMPLATE.getCode());
messageBody.setMsgContent(sendMessageParam.getMsgContent());
messageBody.setMsgHeader(sendMessageParam.getMsgHeader());
messageBody.setMsgBody(sendMessageParam.getMsgTemplateContent());
Map<String, String> defaultExtMap = Maps.newHashMap();
defaultExtMap.put("msgTemplateId", sendMessageParam.getMsgTemplateId());
if (sendMessageParam.getExt() != null) {
defaultExtMap.putAll(BeanMap.create(sendMessageParam.getExt()));
}
messageBody.setMessageExtension(defaultExtMap);
String body = JSONUtil.toJsonStr(messageBody);
messageRequest.setBody(body);
return messageRequest;
}
//
// private MessageDispatchRequest buildMessageDispatchRequest(SendMessageParam sendMessageParam) {
// MessageDispatchRequest messageRequest = new MessageDispatchRequest();
// MessageBody messageBody = new MessageBody();
// messageBody.setMsgType(NimMsgTypeEnum.TEMPLATE.getCode());
// messageBody.setMsgContent(sendMessageParam.getMsgContent());
// messageBody.setMsgHeader(sendMessageParam.getMsgHeader());
// messageBody.setMsgBody(sendMessageParam.getMsgTemplateContent());
// Map<String, String> defaultExtMap = Maps.newHashMap();
// defaultExtMap.put("msgTemplateId", sendMessageParam.getMsgTemplateId());
// if (sendMessageParam.getExt() != null) {
// defaultExtMap.putAll(BeanMap.create(sendMessageParam.getExt()));
// }
// messageBody.setMessageExtension(defaultExtMap);
// String body = JSONUtil.toJsonStr(messageBody);
// messageRequest.setBody(body);
// return messageRequest;
// }
// private List<MessageDispatchResp> sendBatchMessage(SendMessageParam sendMessageParam, MessageDispatchRequest messageRequest) {
// //消息模板是针对多App端,则单个用户有多个IM账户,需要分开进行发送消息
// List<AppTypeEnum> appTypeList = sendMessageParam.getAppTypeList();
@ -370,97 +371,97 @@ public class MessageService {
// return messageDispatchRespList;
// }
private void insertImMessage(List<MessageDispatchResp> messageRespList, String messageBody) {
if (CollectionUtils.isEmpty(messageRespList)) {
return;
}
String requestId = UUID.randomUUID().toString().replace("-", "");
log.info("IM消息发送成功保持到数据库:{},请求Body信息:{}", JSONUtil.toJsonStr(messageRespList), messageBody);
CompletableFuture.runAsync(() -> messageRespList.forEach(messageDispatchResp -> {
MessageHistory messageHistory = new MessageHistory();
messageHistory.setBizId(requestId);
messageHistory.setFromAccount(messageDispatchResp.getFromImAccount());
if (StringUtils.isNotBlank(messageDispatchResp.getToImAccount())) {
messageHistory.setToAccount(messageDispatchResp.getToImAccount());
} else {
messageHistory.setToAccount("unregistered");
}
messageHistory.setChannel(imChannel.getProviderType());
messageHistory.setAppType(messageDispatchResp.getAppType());
messageHistory.setMessageBody(messageBody);
messageHistory.setCreateAt(new Date());
messageHistory.setUpdateAt(new Date());
if (StringUtils.isNotEmpty(messageDispatchResp.getMsgid())) {
messageHistory.setMessageId(messageDispatchResp.getMsgid());
}
try {
messageHistoryDao.saveOrUpdate(messageHistory);
Thread.sleep(50);
} catch (Exception e) {
log.error("持久化IM消息到数据库异常:{},", JSONUtil.toJsonStr(messageHistory), e);
}
}));
}
// @Transactional(rollbackFor = Exception.class)
// public List<MessageCustomResp> sendCustomMessage(SendCustomMessageParam customMessage) {
// MessageCustomDispatchRequest messageRequest = new MessageCustomDispatchRequest();
// String appKey = imChannel.getProviderAppKey();
// AccountRegister customSendAccount = accountService.queryCustomAccount(
// AccountTypeEnum.CUSTOM, AppTypeEnum.SYSTEM, appKey);
// AssertUtil.notNull(customSendAccount, String.format("appKey=%s自定义用户没有注册im账号", appKey));
// messageRequest.setFrom(customSendAccount.getImAccount());
//
// //如果消息模板是针对多App端则分开进行发送消息
// List<AppTypeEnum> appTypeList = customMessage.getAppTypeList();
// List<MessageCustomResp> messageCustomRespList = Lists.newArrayList();
// appTypeList.forEach(appType -> {
// if (appType == null || AppTypeEnum.isValidAppType(appType.getCode()) == null) {
// throw new ServiceException("当前服务器不支持该appType类型!");
// private void insertImMessage(List<MessageDispatchResp> messageRespList, String messageBody) {
// if (CollectionUtils.isEmpty(messageRespList)) {
// return;
// }
// String requestId = UUID.randomUUID().toString().replace("-", "");
// log.info("IM消息发送成功保持到数据库:{},请求Body信息:{}", JSONUtil.toJsonStr(messageRespList), messageBody);
// CompletableFuture.runAsync(() -> messageRespList.forEach(messageDispatchResp -> {
// MessageHistory messageHistory = new MessageHistory();
// messageHistory.setBizId(requestId);
// messageHistory.setFromAccount(messageDispatchResp.getFromImAccount());
// if (StringUtils.isNotBlank(messageDispatchResp.getToImAccount())) {
// messageHistory.setToAccount(messageDispatchResp.getToImAccount());
// } else {
// messageHistory.setToAccount("unregistered");
// }
// String personId = customMessage.getToPersonId();
// //进行接收用户IM账户校验
// List<AccountRegister> accountRegisterList = accountRegisterDao.lambdaQuery()
// .eq(AccountRegister::getIsDelete, 0)
// .eq(AccountRegister::getAccountId, personId)
// .eq(AccountRegister::getAppKey, imChannel.getProviderAppKey())
// .isNotNull(AccountRegister::getToken)
// .eq(AccountRegister::getAppType, appType.getCode()).list();
// if (CollectionUtils.isEmpty(accountRegisterList)) {
// //返回未注册的IM账户信息
// MessageCustomResp messageDispatchResp = MessageCustomResp.builder()
// .appType(appType.getCode())
// .personId(personId)
// .toImAccount(null)
// .isSuccess(false)
// .build();
// log.warn("user personId=[" + personId + "],appType[" + appType.getCode() + "],"
// + " Do not send messages if you have not registered an IM account!");
// return;
// messageHistory.setChannel(imChannel.getProviderType());
// messageHistory.setAppType(messageDispatchResp.getAppType());
// messageHistory.setMessageBody(messageBody);
// messageHistory.setCreateAt(new Date());
// messageHistory.setUpdateAt(new Date());
// if (StringUtils.isNotEmpty(messageDispatchResp.getMsgid())) {
// messageHistory.setMessageId(messageDispatchResp.getMsgid());
// }
// accountRegisterList.stream()
// .filter(a -> StringUtils.isNoneBlank(a.getImAccount(), a.getToken()))
// .forEach(a -> {
// messageRequest.setTo(a.getImAccount());
// String body = JSONUtil.toJsonStr(wrapperCustomMessage(a.getImAccount(),
// customMessage));
// messageRequest.setAttach(body);
// messageRequest.setPayload(body);
// MessageCustomDispatchResponse response = imChannel.dispatchCustomMessage(messageRequest);
// MessageCustomResp customResp = MessageCustomResp.builder()
// .appType(appType.getCode())
// .personId(personId)
// .toImAccount(a.getImAccount())
// .isSuccess(response.isSuccess())
// .build();
// messageCustomRespList.add(customResp);
// });
// });
// insertImCustomMessage(messageCustomRespList, messageRequest, customSendAccount.getImAccount());
// return messageCustomRespList;
// try {
// messageHistoryDao.saveOrUpdate(messageHistory);
// Thread.sleep(50);
// } catch (Exception e) {
// log.error("持久化IM消息到数据库异常:{},", JSONUtil.toJsonStr(messageHistory), e);
// }
// }));
// }
@Transactional(rollbackFor = Exception.class)
public List<MessageCustomResp> sendCustomMessage(CustomMessageInfo customMessage) {
MessageCustomDispatchRequest messageRequest = new MessageCustomDispatchRequest();
String appKey = imChannel.getProviderAppKey();
AccountRegister customSendAccount = accountService.queryCustomAccount(
AccountTypeEnum.CUSTOM, AppTypeEnum.SYSTEM, appKey);
AssertUtil.notNull(customSendAccount, String.format("appKey=%s自定义用户没有注册im账号", appKey));
messageRequest.setFrom(customSendAccount.getImAccount());
//如果消息模板是针对多App端则分开进行发送消息
List<AppTypeEnum> appTypeList = customMessage.getAppTypeList();
List<MessageCustomResp> messageCustomRespList = Lists.newArrayList();
appTypeList.forEach(appType -> {
if (appType == null || AppTypeEnum.isValidAppType(appType.getCode()) == null) {
throw new ServiceException("当前服务器不支持该appType类型!");
}
String personId = customMessage.getToPersonId();
//进行接收用户IM账户校验
List<AccountRegister> accountRegisterList = accountRegisterDao.lambdaQuery()
.eq(AccountRegister::getIsDelete, 0)
.eq(AccountRegister::getAccountId, personId)
.eq(AccountRegister::getAppKey, imChannel.getProviderAppKey())
.isNotNull(AccountRegister::getToken)
.eq(AccountRegister::getAppType, appType.getCode()).list();
if (CollectionUtils.isEmpty(accountRegisterList)) {
//返回未注册的IM账户信息
MessageCustomResp messageDispatchResp = MessageCustomResp.builder()
.appType(appType.getCode())
.personId(personId)
.toImAccount(null)
.isSuccess(false)
.build();
log.warn("user personId=[" + personId + "],appType[" + appType.getCode() + "],"
+ " Do not send messages if you have not registered an IM account!");
return;
}
accountRegisterList.stream()
.filter(a -> StringUtils.isNoneBlank(a.getImAccount(), a.getToken()))
.forEach(a -> {
messageRequest.setTo(a.getImAccount());
String body = JSONUtil.toJsonStr(wrapperCustomMessage(a.getImAccount(),
customMessage));
messageRequest.setAttach(body);
messageRequest.setPayload(body);
MessageCustomDispatchResponse response = imChannel.dispatchCustomMessage(messageRequest);
MessageCustomResp customResp = MessageCustomResp.builder()
.appType(appType.getCode())
.personId(personId)
.toImAccount(a.getImAccount())
.isSuccess(response.isSuccess())
.build();
messageCustomRespList.add(customResp);
});
});
insertImCustomMessage(messageCustomRespList, messageRequest, customSendAccount.getImAccount());
return messageCustomRespList;
}
private void insertImCustomMessage(List<MessageCustomResp> messageRespList,
MessageCustomDispatchRequest messageRequest, String fromImAccount) {
if (CollectionUtils.isEmpty(messageRespList)) {
@ -495,13 +496,13 @@ public class MessageService {
});
}
// public static MessageCustomBody wrapperCustomMessage(String toImAccount,
// SendCustomMessageParam customMessage) {
// return MessageCustomBody.builder()
// .toImAccount(toImAccount)
// .personId(customMessage.getToPersonId())
// .bizType(customMessage.getBizType())
// .payload(Optional.ofNullable(customMessage.getPayload()).orElse("{}"))
// .build();
// }
public static MessageCustomBody wrapperCustomMessage(String toImAccount,
CustomMessageInfo customMessage) {
return MessageCustomBody.builder()
.toImAccount(toImAccount)
.personId(customMessage.getToPersonId())
.bizType(customMessage.getBizType())
.payload(Optional.ofNullable(customMessage.getPayload()).orElse("{}"))
.build();
}
}

View File

@ -1,5 +1,6 @@
package cn.axzo.im.service;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.pokonyan.dao.page.IPageParam;
import cn.axzo.pokonyan.dao.wrapper.CriteriaField;
@ -20,6 +21,8 @@ public interface MessageTaskService extends IService<MessageTask> {
Page<MessageTask> page(PageMessageTaskParam param);
List<MessageHistory> sendMessage(MessageTask messageTask);
@SuperBuilder
@Data
@NoArgsConstructor

View File

@ -25,6 +25,7 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.validation.Valid;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.UUID;
@ -153,7 +154,10 @@ public class RobotInfoService {
}
public PageResp<RobotInfoResp> queryRobotInfoList(RobotPageQuery robotInfoQuery) {
IPage<RobotInfo> robotInfoPage = robotInfoDao.queryRobotInfoOfPage(robotInfoQuery);
List<String> robotIds = resolveRobotIds(robotInfoQuery);
IPage<RobotInfo> robotInfoPage = robotInfoDao.queryRobotInfoOfPage(robotInfoQuery, robotIds);
List<RobotInfoResp> robotInfoRespList = BeanMapper.copyList(robotInfoPage.getRecords(), RobotInfoResp.class);
PageResp<RobotInfoResp> pageOfRobotInfoResp = PageResp.list(robotInfoPage.getCurrent(), robotInfoPage.getSize(),
robotInfoPage.getTotal(), robotInfoRespList);
@ -178,6 +182,14 @@ public class RobotInfoService {
return pageOfRobotInfoResp;
}
private List<String> resolveRobotIds(RobotPageQuery robotInfoQuery) {
if (StringUtils.isBlank(robotInfoQuery.getMsgTemplateCode())) {
return Collections.emptyList();
}
// todo 这里查询也需要优化现在是把所有数据查询出来过滤的数据量过大的情况下会有性能和内存影响
return templateService.queryRobotIdByTemplate(robotInfoQuery.getMsgTemplateCode());
}
public List<RobotInfoResp> queryRunningRobotList() {
List<RobotInfo> runningRobots = robotInfoDao.queryRunningRobotList();
if (CollectionUtils.isEmpty(runningRobots)) {

View File

@ -0,0 +1,4 @@
package cn.axzo.im.service.impl;
public class MessageHistoryServiceImpl {
}

View File

@ -1,24 +1,68 @@
package cn.axzo.im.service.impl;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.NimMsgTypeEnum;
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchRequest;
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchResponse;
import cn.axzo.im.channel.netease.dto.MessageBody;
import cn.axzo.im.dao.mapper.MessageTaskMapper;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.service.AccountRegisterService;
import cn.axzo.im.service.MessageTaskService;
import cn.axzo.pokonyan.client.RateLimiter;
import cn.axzo.pokonyan.client.RateLimiterClient;
import cn.axzo.pokonyan.dao.converter.PageConverter;
import cn.axzo.pokonyan.dao.mysql.QueryWrapperHelper;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@Slf4j
@Service
public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, MessageTask>
implements MessageTaskService {
implements MessageTaskService, InitializingBean {
@Autowired
private RateLimiterClient rateLimiterClient;
@Autowired
private IMChannelProvider imChannelProvider;
@Autowired
private AccountRegisterService accountRegisterService;
@Value("${send.message.limiter.permits:1}")
private int permits;
@Value("${send.message.limiter.seconds:60}")
private long seconds;
/**
* 网易云信IM批量发送-每批次发送给多少用户
*/
@Value("${im-center.message.batch.receiver.once}")
public int msgSendPersonOfOneBatch = 100;
private RateLimiter rateLimiter;
private static final String LIMITER_KEY = "im-center:sendMessage";
@Override
@Transactional
public MessageTask create(MessageTask param) {
@ -38,4 +82,93 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
return this.page(PageConverter.convertToMybatis(param, MessageTask.class), wrapper);
}
@Override
public List<MessageHistory> sendMessage(MessageTask messageTask) {
if (rateLimiter.tryAcquire()) {
log.info("获得令牌");
// listAccountRegisters(messageTask);
MessageBatchDispatchRequest batchDispatchRequest = new MessageBatchDispatchRequest();
batchDispatchRequest.setFromAccid(messageTask.getSendImAccount());
batchDispatchRequest.setBody(resolveBody(messageTask));
// batchDispatchRequest.setToAccids(imAccountList);
MessageBatchDispatchResponse batchResponse = imChannelProvider.dispatchBatchMessage(batchDispatchRequest);
return null;
}
log.info("未获得令牌");
return null;
}
// private Map<String, AccountRegister> listAccountRegisters(MessageTask messageTask) {
// Set<String> personIds = messageTask.getReceivePersons().stream()
// .filter(receivePerson -> StringUtils.isNotBlank(receivePerson.getPersonId())
// && StringUtils.isBlank(receivePerson.getImAccount()))
// .map(MessageTask.ReceivePerson::getPersonId)
// .collect(Collectors.toSet());
// if (CollectionUtils.isEmpty(personIds)) {
// return;
// }
//
// accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder()
// .accountIds(personIds)
// .build());
// }
// private void sendBatchMessage() {
// MessageBatchDispatchRequest batchDispatchRequest = new MessageBatchDispatchRequest();
// batchDispatchRequest.setBody(messageRequest.getBody());
// batchDispatchRequest.setFromAccid(fromAccId);
// personPage.forEach(imAccountList -> {
// batchDispatchRequest.setToAccids(imAccountList);
// MessageBatchDispatchResponse batchResponse = imChannelProvider.dispatchBatchMessage(batchDispatchRequest);
// if (batchResponse != null) {
// Map<String, Long> userMsgResponseMap = batchResponse.getMsgids();
// }
private String resolveBody(MessageTask messageTask) {
MessageBody messageBody = new MessageBody();
messageBody.setMsgType(NimMsgTypeEnum.TEMPLATE.getCode());
messageBody.setMsgContent(messageTask.getContent());
messageBody.setMsgHeader(messageTask.getTitle());
Map<String, String> defaultExtMap = Maps.newHashMap();
MessageTask.BizData bizData = messageTask.resolveBizData();
if (StringUtils.isNotBlank(bizData.getMsgTemplateContent())) {
messageBody.setMsgBody(bizData.getMsgTemplateContent());
defaultExtMap.put("msgTemplateId", bizData.getMsgTemplateId());
} else {
JSONObject msgBody = new JSONObject()
.fluentPut("cardTitle", messageTask.getTitle())
.fluentPut("cardContent", messageTask.getContent())
.fluentPut("cardBannerUrl", messageTask.getCardBannerUrl())
.fluentPut("cardDetailButton", new JSONObject()
.fluentPut("title", "查看详情")
.fluentPut("action", "JUMP")
.fluentPut("actionPaths", bizData.getJumpDatas()));
messageBody.setMsgBody(msgBody.toJSONString());
}
messageBody.setMessageExtension(defaultExtMap);
return JSONUtil.toJsonStr(messageBody);
}
@Override
public void afterPropertiesSet() throws Exception {
rateLimiter = rateLimiterClient.build(RateLimiterClient.RateLimiterReq.builder()
.windowType(RateLimiter.WindowType.SLIDING)
.limiterKey(LIMITER_KEY)
.rule(RateLimiter.LimitRule.builder()
.permits(permits)
.seconds(seconds)
.build())
.build());
}
}

View File

@ -99,7 +99,7 @@ create index idx_im_from_account
on im_message_history (from_account);
ALTER TABLE im_account_register ADD COLUMN `ou_id` bigint null comment 'organizational_unit表的id';
ALTER TABLE im_account_register ADD COLUMN `ou_id` bigint not null default 0 comment 'organizational_unit表的id';
CREATE TABLE IF NOT EXISTS im_message_task
(