feat:feature-REQ/2129 改造发消息的接口

This commit is contained in:
lilong 2024-03-15 11:49:57 +08:00
parent 6727594463
commit 757a784668
20 changed files with 998 additions and 456 deletions

View File

@ -1,11 +1,9 @@
package cn.axzo.im.center.api.feign;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.resp.MessageCustomResp;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import java.util.List;
import cn.axzo.im.center.api.vo.req.SendCustomMessageParam;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.api.vo.resp.MessageTaskResp;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
@ -22,23 +20,26 @@ import org.springframework.web.bind.annotation.RequestBody;
@FeignClient(name = "im-center", url = "${axzo.service.im-center:http://im-center:8080}")
public interface MessageApi {
/**
* 发送消息,单条消息批量发送消息统一入口
* 1.该接口一次请求接收人支持最大2000人
* 2.网易云信一分钟支持120次调用,每次调用IM中心设置100个账户(能返回msgId最大支持100人)
* 3.IM中心接收人有工人端和管理端账户,故当接收人最大2000人时需要调用网易云信发送4000条消息
* 4.按照每批次发送100条消息,需要发送40次
* 5.因此该接口一分钟内最大支持3次接收人为2000人的请求
*
* @param messageInfo 发送消息请求参数
* @return 发送消息请求响应
* 发送消息时只是存储在messageTask中通过xxlJob或者mq异步去处理
* 因为1为了提高接口响应性能2第三方接口有限流控制防止被限流后阻塞业务
* @param sendMessageParam 发送消息请求参数
* @return
*/
@PostMapping("api/im/message/dispatch")
ApiResult<List<MessageDispatchResp>> sendMessage(@RequestBody @Validated MessageInfo messageInfo);
@PostMapping("/api/im/message/send")
ApiResult<MessageTaskResp> sendMessage(@RequestBody @Validated SendMessageParam sendMessageParam);
/**
* 发送自定义消息
*/
@PostMapping("api/im/custom-message/send")
ApiResult<List<MessageCustomResp>> sendCustomMessage(@RequestBody @Validated CustomMessageInfo messageInfo);
/**
* 发送自定义消息:
*
* 发送消息时只是存储在messageTask中通过xxlJob或者mq异步去处理
* 因为1为了提高接口响应性能2第三方接口有限流控制防止被限流后阻塞业务
* @param messageInfo 发送消息请求参数
* @return
*/
@PostMapping("/api/im/custom-message/send")
ApiResult<MessageTaskResp> sendCustomMessage(@RequestBody @Validated SendCustomMessageParam messageInfo);
}

View File

@ -1,50 +0,0 @@
package cn.axzo.im.center.api.vo.req;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.BizTypeEnum;
import java.util.List;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author syl
* @date 2023/12/21
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CustomMessageInfo {
/**
* 发送消息到App端
* 工人端企业端服务器
* CMCMPSYSTEM
*
* @See cn.axzo.im.center.common.enums.AppTypeEnum
*/
@NotEmpty(message = "消息接收端类型appTypeList不能为空")
private List<AppTypeEnum> appTypeList;
/**
* 接收用户自然人Id
*/
@NotBlank(message = "接收用户自然人Id不能为空")
private String toPersonId;
/**
* 业务类型
*/
@NotNull(message = "业务类型不能为空")
private BizTypeEnum bizType;
/**
* 推送内容 - 业务数据json格式
*/
private String payload;
}

View File

@ -1,78 +0,0 @@
package cn.axzo.im.center.api.vo.req;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import com.google.common.collect.Maps;
import lombok.Data;
import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* IM消息信息
*
* @author zuoqinbo
* @version V1.0
* @date 2023/10/9 16:01
*/
@Data
public class MessageInfo {
/**
* 发送消息到App端
* 工人端企业端服务器
* CMCMPSYSTEM
*
* @See cn.axzo.im.center.common.enums.AppTypeEnum
*/
@NotNull(message = "消息接收端类型appTypeList不能为空")
private List<AppTypeEnum> appTypeList;
/**
* 发送用户Id,目前暂不支持非机器人发送消息
*/
private String personId;
/**
* 消息接收用户Id列表
*/
@NotNull(message = "接收消息用户personIdList不能为空")
private Set<String> toPersonIdList;
/**
* 消息标题
*/
@NotNull(message = "消息标题不能为空")
private String msgHeader;
/**
* 消息内容
*/
@NotNull(message = "消息内容不能为空")
private String msgContent;
/**
* 消息模板ID
*/
@NotNull(message = "消息模板ID不能为空")
private String msgTemplateId;
/**
* 消息模板内容
*/
@NotNull(message = "消息模板内容不能为空")
private String msgTemplateContent;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long organizationalUnitId;
/**
* 消息扩展信息
*/
private Map<String, String> extendsInfo = Maps.newHashMap();
}

View File

@ -0,0 +1,73 @@
package cn.axzo.im.center.api.vo.req;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.BizTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.List;
/**
* @author syl
* @date 2023/12/21
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SendCustomMessageParam {
/**
* 消息接收用户信息
*/
@NotEmpty(message = "消息接收用户信息不能为空")
private List<SendMessageParam.ReceivePerson> receivePersons;
/**
* 业务类型
*/
@NotNull(message = "业务类型不能为空")
private BizTypeEnum bizType;
/**
* 推送内容 - 业务数据json格式
*/
private String payload;
/**
* 业务的唯一ID用于查询发送消息的记录和结果
*/
private String bizId;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
static class ReceivePerson {
/**
* 接收消息的personId
*/
private String personId;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long ouId;
/**
* 发送消息到App端
* 工人端企业端服务器
* CMCMPSYSTEM
*
* @See cn.axzo.im.center.common.enums.AppTypeEnum
*/
private AppTypeEnum appType;
}
}

View File

@ -0,0 +1,102 @@
package cn.axzo.im.center.api.vo.req;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import java.util.List;
/**
* IM消息信息
*
* @author zuoqinbo
* @version V1.0
* @date 2023/10/9 16:01
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SendMessageParam {
/**
* 发送用户Id,目前暂不支持非机器人发送消息
*/
@NotBlank(message = "发送用户id不能为空")
private String sendPersonId;
/**
* 消息接收用户信息
*/
@NotEmpty(message = "消息接收用户信息不能为空")
private List<ReceivePerson> receivePersons;
/**
* 消息标题
*/
@NotBlank(message = "消息标题不能为空")
private String msgHeader;
/**
* 消息内容
*/
@NotBlank(message = "消息内容不能为空")
private String msgContent;
/**
* 消息模板ID
*/
@NotBlank(message = "消息模板ID不能为空")
private String msgTemplateId;
/**
* 消息模板内容
*/
@NotBlank(message = "消息模板内容不能为空")
private String msgTemplateContent;
/**
* 消息扩展信息
*/
private JSONObject ext;
/**
* 业务的唯一ID用于查询发送消息的记录和结果不验证唯一
*/
private String bizId;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
static class ReceivePerson {
/**
* 接收消息的personId
*/
private String personId;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long ouId;
/**
* 发送消息到App端
* 工人端企业端服务器
* CMCMPSYSTEM
*
* @See cn.axzo.im.center.common.enums.AppTypeEnum
*/
private AppTypeEnum appType;
}
}

View File

@ -0,0 +1,58 @@
package cn.axzo.im.center.api.vo.resp;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.TableField;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageTaskResp {
private Long id;
/**
* 业务请求时可以带的排查问题的id
*/
private String bizId;
/**
* IM消息发送personId
*/
private String sendPersonId;
/**
* IM消息接收人person信息
*/
private JSONArray receivePersons;
private String status;
private String title;
@TableField(value = "content")
private String content;
private JSONObject bizData;
private JSONObject ext;
private Date planStartTime;
private Date startedTime;
private Date finishedTime;
private Integer isDelete;
private Date createAt;
private Date updateAt;
}

View File

@ -47,5 +47,5 @@ public class UserAccountResp {
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long organizationalUnitId;
private Long ouId;
}

View File

@ -17,6 +17,18 @@ import org.springframework.core.env.Environment;
@EnableDiscoveryClient
public class Application {
public static void main(String[] args) {
System.setProperty("spring.profiles.active","dev");
System.setProperty("NACOS_HOST","https://dev-nacos.axzo.cn");
System.setProperty("NACOS_PORT","443");
System.setProperty("NACOS_NAMESPACE_ID","35eada10-9574-4db8-9fea-bc6a4960b6c7");
System.setProperty("CUSTOM_ENV","dev");
System.setProperty("spring.redis.port","31270");
System.setProperty("spring.redis.host","123.249.44.111");
System.setProperty("xxl.job.admin.addresses","http://dev-xxl-job.axzo.cn/xxl-job-admin");
ConfigurableApplicationContext run = SpringApplication.run(Application.class, args);
Environment env = run.getEnvironment();
log.info(

View File

@ -1,22 +1,36 @@
package cn.axzo.im.controller;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.feign.MessageApi;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.resp.MessageCustomResp;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.service.MessageService;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import java.util.List;
import javax.annotation.Resource;
import cn.axzo.im.center.api.vo.req.AccountQuery;
import cn.axzo.im.center.api.vo.req.SendCustomMessageParam;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.api.vo.resp.MessageTaskResp;
import cn.axzo.im.center.api.vo.resp.UserAccountResp;
import cn.axzo.im.center.common.enums.AccountTypeEnum;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.service.AccountService;
import cn.axzo.im.service.MessageTaskService;
import cn.axzo.im.service.RobotMsgTemplateService;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.List;
/**
* IM消息派发相关
*
@ -29,24 +43,110 @@ import org.springframework.web.bind.annotation.RestController;
@RequiredArgsConstructor
public class MessageController implements MessageApi {
@Resource
private MessageService messageService;
@Autowired
private MessageTaskService messageTaskService;
@Autowired
private IMChannelProvider imChannel;
@Autowired
private AccountService accountService;
@Autowired
private RobotMsgTemplateService robotMsgTemplateService;
/**
* 发送消息时只是存储在messageTask中通过xxlJob或者mq异步去处理
* 因为1为了提高接口响应性能2第三方接口有限流控制防止被限流后阻塞业务
* @param sendMessageParam 发送消息请求参数
* @return
*/
@Override
public ApiResult<List<MessageDispatchResp>> sendMessage(MessageInfo messageInfo) {
List<MessageDispatchResp> messageRespList = messageService.sendMessage(messageInfo);
return ApiResult.ok(messageRespList);
public ApiResult<MessageTaskResp> sendMessage(SendMessageParam sendMessageParam) {
check(sendMessageParam);
MessageTask messageTask = messageTaskService.create(toMessageTask(sendMessageParam));
return ApiResult.ok(toMessageTaskResp(messageTask));
}
private void check(SendMessageParam sendMessageParam) {
List<String> robotIdList = robotMsgTemplateService.queryRobotIdByTemplate(sendMessageParam.getMsgTemplateId());
if (CollectionUtils.isEmpty(robotIdList)) {
throw new ServiceException("消息模板ID[" + sendMessageParam.getMsgTemplateId() + "],还未维护机器人账户!");
}
if (CollectionUtils.size(robotIdList) > 1) {
throw new ServiceException("消息模板ID[" + sendMessageParam.getMsgTemplateId() + "],关联了多个机器人!");
}
AccountQuery accountQuery = new AccountQuery();
String robotId = robotIdList.get(0);
accountQuery.setAccountId(robotId);
accountQuery.setAppType(AppTypeEnum.SYSTEM.getCode());
List<UserAccountResp> robotImAccountList = accountService.queryAccountInfo(accountQuery);
if (CollectionUtils.isEmpty(robotImAccountList)) {
throw new ServiceException("消息模板ID[" + sendMessageParam.getMsgTemplateId() + "],机器人ID[" + robotId + "]," +
"未查询到机器人IM账户注册信息!");
}
if (CollectionUtils.isNotEmpty(robotImAccountList) && robotImAccountList.size() > 1) {
throw new ServiceException("消息模板ID[" + sendMessageParam.getMsgTemplateId() + "],机器人ID[" + robotId + "],存在多个机器人IM账户!");
}
String robotImAccount = robotImAccountList.get(0).getImAccount();
if (StringUtils.isBlank(robotImAccount)) {
throw new ServiceException("消息模板ID[" + sendMessageParam.getMsgTemplateId() + "],机器人ID[" + robotId + "],还未生成IM账户!");
}
}
@Override
public ApiResult<List<MessageCustomResp>> sendCustomMessage(CustomMessageInfo customMessage) {
List<MessageCustomResp> messageRespList = messageService.sendCustomMessage(customMessage);
return ApiResult.ok(messageRespList);
public ApiResult<MessageTaskResp> sendCustomMessage(SendCustomMessageParam customMessage) {
check();
MessageTask messageTask = messageTaskService.create(toMessageTask(customMessage));
return ApiResult.ok(toMessageTaskResp(messageTask));
}
@ExceptionHandler({ RequestNotPermitted.class })
@ResponseStatus(HttpStatus.TOO_MANY_REQUESTS)
public ApiResult<String> handleRequestNotPermitted() {
return ApiResult.err("服务器资源繁忙,请求被拒绝!");
private void check() {
String appKey = imChannel.getProviderAppKey();
AccountRegister customSendAccount = accountService.queryCustomAccount(
AccountTypeEnum.CUSTOM, AppTypeEnum.SYSTEM, appKey);
AssertUtil.notNull(customSendAccount, String.format("appKey=%s自定义用户没有注册im账号", appKey));
}
public MessageTaskResp toMessageTaskResp(MessageTask messageTask) {
MessageTaskResp messageTaskResp = MessageTaskResp.builder().build();
BeanUtils.copyProperties(messageTask, messageTaskResp);
return messageTaskResp;
}
private MessageTask toMessageTask(SendMessageParam sendMessageParam) {
MessageTask.BizData bizData = MessageTask.BizData.builder()
.msgTemplateContent(sendMessageParam.getMsgTemplateContent())
.msgTemplateId(sendMessageParam.getMsgTemplateId())
.build();
Date now = new Date();
return MessageTask.builder()
.bizId(sendMessageParam.getBizId())
.sendPersonId(sendMessageParam.getSendPersonId())
.receivePersons(JSONArray.parseArray(JSONObject.toJSONString(sendMessageParam.getReceivePersons())))
.status(MessageTask.Status.PENDING)
.title(sendMessageParam.getMsgHeader())
.content(sendMessageParam.getMsgContent())
.bizData(JSONObject.parseObject(JSONObject.toJSONString(bizData)))
.ext(sendMessageParam.getExt())
.planStartTime(now)
.createAt(now)
.build();
}
private MessageTask toMessageTask(SendCustomMessageParam customMessage) {
MessageTask.BizData bizData = MessageTask.BizData.builder()
.bizType(customMessage.getBizType())
.payload(customMessage.getPayload())
.build();
Date now = new Date();
return MessageTask.builder()
.bizId(customMessage.getBizId())
.receivePersons(JSONArray.parseArray(JSONObject.toJSONString(customMessage.getReceivePersons())))
.status(MessageTask.Status.PENDING)
.bizData(JSONObject.parseObject(JSONObject.toJSONString(bizData)))
.planStartTime(now)
.createAt(now)
.build();
}
}

View File

@ -5,6 +5,7 @@ import cn.axzo.im.channel.netease.dto.QueryEventRequest;
import cn.axzo.im.channel.netease.dto.QueryMessageRequest;
import cn.axzo.im.channel.netease.dto.RevokeMessageRequest;
import cn.axzo.im.job.RevokeAllMessagesJob;
import cn.axzo.im.job.UpdateImAccountOuIdJob;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@ -22,6 +23,7 @@ public class PrivateController {
private final NimClient nimClient;
private final RevokeAllMessagesJob revokeAllMessagesJob;
private final UpdateImAccountOuIdJob updateImAccountOuIdJob;
@PostMapping("/private/revoke")
public Object revoke(@Valid @RequestBody RevokeMessageRequest request) {
@ -43,4 +45,9 @@ public class PrivateController {
return revokeAllMessagesJob.execute(param);
}
@PostMapping("/private/im-account/ou-id/update")
public Object updateImAccountOuId(@RequestParam("param") String param) throws Exception {
return updateImAccountOuIdJob.execute(param);
}
}

View File

@ -0,0 +1,9 @@
package cn.axzo.im.dao.mapper;
import cn.axzo.im.entity.MessageTask;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.springframework.stereotype.Repository;
@Repository
public interface MessageTaskMapper extends BaseMapper<MessageTask> {
}

View File

@ -78,8 +78,8 @@ public class AccountRegister extends BaseEntity<AccountRegister> implements Ser
private String token;
/**
* 企业id
* organizational_unit表的id
*/
@TableField("organizational_unit_id")
private Long organizationalUnitId;
@TableField("ou_id")
private Long ouId;
}

View File

@ -0,0 +1,129 @@
package cn.axzo.im.entity;
import cn.axzo.im.center.common.enums.BizTypeEnum;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import lombok.experimental.SuperBuilder;
import org.springframework.cglib.beans.BeanMap;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Optional;
@Data
@SuperBuilder
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
@TableName(value = "`im_message_task`", autoResultMap = true)
public class MessageTask {
@TableId(type = IdType.AUTO)
private Long id;
/**
* 业务请求时可以带的排查问题的id
*/
@TableField(value = "biz_id")
private String bizId;
/**
* IM消息发送personId
*/
@TableField(value = "send_person_id")
private String sendPersonId;
/**
* IM消息接收人person信息
*/
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONArray receivePersons;
private Status status;
@TableField(value = "title")
private String title;
@TableField(value = "content")
private String content;
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONObject bizData;
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONObject ext;
@TableField
private Date planStartTime;
@TableField
private Date startedTime;
@TableField
private Date finishedTime;
@TableField
private Integer isDelete;
@TableField
private Date createAt;
@TableField
private Date updateAt;
public enum Status {
PENDING,
SUCCEED,
FAILED,
;
}
public BizData resolveBizData() {
JSONObject bizData = Optional.ofNullable(this.getBizData())
.orElseGet(JSONObject::new);
return JSONObject.toJavaObject(bizData, BizData.class);
}
public JSONObject mergeBizData(BizData param) {
JSONObject bizData = Optional.ofNullable(this.getBizData())
.orElseGet(JSONObject::new);
if (param == null) {
return bizData;
}
return bizData.fluentPutAll(BeanMap.create(param));
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class BizData {
private String msgTemplateId;
/**
* 消息模板内容
*/
private String msgTemplateContent;
/**
* 网易云信-自定义消息使用
*/
private BizTypeEnum bizType;
/**
* 网易云信-自定义消息使用
* 推送内容 - 业务数据json格式
*/
private String payload;
}
}

View File

@ -19,10 +19,13 @@ import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -67,7 +70,7 @@ public class UpdateImAccountOuIdJob extends IJobHandler {
Page<AccountRegister> page = accountRegisterService.page(req);
if (CollectionUtils.isNotEmpty(page.getRecords())) {
Map<Long, OrganizationalNodeUserBasicVO> nodeUsers = listNodeUsers(page.getRecords());
Map<String, OrganizationalNodeUserBasicVO> nodeUsers = listNodeUsers(page.getRecords());
updateAccountRegister(page.getRecords(), nodeUsers);
}
@ -80,22 +83,17 @@ public class UpdateImAccountOuIdJob extends IJobHandler {
return ReturnT.SUCCESS;
}
private void updateAccountRegister(List<AccountRegister> accountRegisters, Map<Long, OrganizationalNodeUserBasicVO> nodeUsers) {
private void updateAccountRegister(List<AccountRegister> accountRegisters, Map<String, OrganizationalNodeUserBasicVO> nodeUsers) {
List<AccountRegister> update = accountRegisters.stream()
.filter(accountRegister -> nodeUsers.get(accountRegister.getAccountId()) != null)
.map(accountRegister -> {
OrganizationalNodeUserBasicVO nodeUser = nodeUsers.get(Long.valueOf(accountRegister.getAccountId()));
if (nodeUser == null) {
log.info("updateImAccountOuIdJob: accountRegisterId :{} not found node user",
accountRegister.getId());
return null;
}
OrganizationalNodeUserBasicVO nodeUser = nodeUsers.get(accountRegister.getAccountId());
AccountRegister result = new AccountRegister();
result.setId(accountRegister.getId());
result.setOrganizationalUnitId(nodeUser.getOrganizationalUnitId());
result.setOuId(nodeUser.getOrganizationalUnitId());
result.setUpdateAt(new Date());
return result;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(accountRegisters)) {
log.info("updateImAccountOuIdJob: no data update");
@ -105,25 +103,25 @@ public class UpdateImAccountOuIdJob extends IJobHandler {
accountRegisterService.updateBatchById(update);
}
private Map<Long, OrganizationalNodeUserBasicVO> listNodeUsers(List<AccountRegister> accountRegisters) {
private Map<String, OrganizationalNodeUserBasicVO> listNodeUsers(List<AccountRegister> accountRegisters) {
if (CollectionUtils.isEmpty(accountRegisters)) {
return Collections.EMPTY_MAP;
}
Set<Long> accountIds = accountRegisters.stream()
.map(AccountRegister::getAccountId)
.filter(Objects::nonNull)
.filter(StringUtils::isNumeric)
.map(Long::valueOf)
.collect(Collectors.toSet());
if (CollectionUtils.isEmpty(accountIds)) {
return Collections.EMPTY_MAP;
}
OrganizationalNodeUserBasicQueryVO queryVO = new OrganizationalNodeUserBasicQueryVO();
queryVO.setPersonIds(accountIds);
List<OrganizationalNodeUserBasicVO> nodeUsers = organizationalNodeUserApi.queryUserBasic(queryVO).getData();
return nodeUsers.stream()
.collect(Collectors.toMap(OrganizationalNodeUserBasicVO::getPersonId, Function.identity(), (f, s) -> s));
.collect(Collectors.toMap(e -> e.getPersonId().toString(), Function.identity(), (f, s) -> s));
}
@Data

View File

@ -1,6 +1,5 @@
package cn.axzo.im.service;
import cn.axzo.basics.common.BeanMapper;
import cn.axzo.basics.common.constant.enums.TableIsDeleteEnum;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.im.center.api.vo.req.AccountAbsentQuery;
@ -13,9 +12,7 @@ import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.RobotStatusEnum;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.INotifyService;
import cn.axzo.im.channel.netease.dto.NimAccountInfo;
import cn.axzo.im.channel.netease.dto.RegisterRequest;
import cn.axzo.im.channel.netease.dto.RegisterResponse;
import cn.axzo.im.dao.mapper.AccountRegisterMapper;
import cn.axzo.im.dao.repository.AccountRegisterDao;
import cn.axzo.im.dao.repository.RobotInfoDao;
@ -122,7 +119,7 @@ public class AccountService {
String userIdWrapper = buildUserIdWrapper(userAccountReq.getUserId(), userAccountReq.getAppType(), userAccountReq.getOrganizationalUnitId());
//后续AppKey可能会更换普通用户通过userIdappTypeappKey维度来保证数据库唯一性
UserAccountResp userAccountResp = createAccountRegister(userAccountReq.getUserId(), userIdWrapper, appType,
AccountTypeEnum.USER.getCode(), userAccountReq.getHeadImageUrl(), userAccountReq.getNickName());
AccountTypeEnum.USER.getCode(), userAccountReq.getHeadImageUrl(), userAccountReq.getNickName(), userAccountReq.getOrganizationalUnitId());
if (iNotifyService != null && userAccountResp != null && StringUtils.isNotBlank(userAccountResp.getImAccount())) {
iNotifyService.notifyUserAccountChange(userAccountResp.getImAccount(), userAccountReq.getNickName(), null);
@ -170,7 +167,8 @@ public class AccountService {
}
UserAccountResp userAccountResp = createAccountRegister(userAccountReq.getUserId(), userIdWrapper, appType,
AccountTypeEnum.CUSTOM.getCode(), userAccountReq.getHeadImageUrl(), userAccountReq.getNickName());
AccountTypeEnum.CUSTOM.getCode(), userAccountReq.getHeadImageUrl(), userAccountReq.getNickName(),
userAccountReq.getOrganizationalUnitId());
if (iNotifyService != null && userAccountResp != null && StringUtils.isNotBlank(userAccountResp.getImAccount())) {
iNotifyService.notifyUserAccountChange(userAccountResp.getImAccount(), userAccountReq.getNickName(), null);
@ -199,7 +197,7 @@ public class AccountService {
throw new ServiceException("该机器人robotId:{} 还未创建信息!");
}
UserAccountResp userAccountResp = createAccountRegister(robotId, robotId, AppTypeEnum.SYSTEM.getCode(),
AccountTypeEnum.ROBOT.getCode(), robotAccountReq.getHeadImageUrl(), robotAccountReq.getNickName());
AccountTypeEnum.ROBOT.getCode(), robotAccountReq.getHeadImageUrl(), robotAccountReq.getNickName(), null);
if (userAccountResp != null && StringUtils.isNotBlank(userAccountResp.getImAccount())) {
//生成后更新机器人状态和IM账户
robotInfoService.updateRobotStatus(robotId, userAccountResp.getImAccount(), RobotStatusEnum.UN_ENABLE);
@ -211,7 +209,8 @@ public class AccountService {
}
public UserAccountResp createAccountRegister(String userId, String userIdWrapper, String appType,
String accountType, String headImageUrl, String nickName) {
String accountType, String headImageUrl, String nickName,
Long ouId) {
//1.检查账户是否已经创建
String appKey = imChannelProvider.getProviderAppKey();
UserAccountResp userAccountResp = new UserAccountResp();
@ -237,6 +236,7 @@ public class AccountService {
accountRegister.setChannelProvider(imChannelProvider.getProviderType());
accountRegister.setCreateAt(new Date());
accountRegister.setUpdateAt(new Date());
accountRegister.setOuId(ouId);
accountRegisterDao.saveOrUpdate(accountRegister);
} else {
//2.1注册出现异常
@ -244,6 +244,7 @@ public class AccountService {
userAccountResp.setDesc(accountResp.getDesc());
return userAccountResp;
}
userAccountResp.setOuId(ouId);
accountResp.setAppType(appType);
return accountResp;
}
@ -252,6 +253,7 @@ public class AccountService {
userAccountResp.setUserId(userId);
userAccountResp.setAppType(appType);
userAccountResp.setToken(accountRegister.getToken());
userAccountResp.setOuId(ouId);
return userAccountResp;
}
@ -263,15 +265,19 @@ public class AccountService {
register.setName(name);
UserAccountResp userAccountResp = new UserAccountResp();
//3.调用公共的网易云信IM接口创建账户 网易云信只是一种IM实现
RegisterResponse registerResponse = imChannelProvider.registerAccount(register);
if (registerResponse.getInfo() != null) {
NimAccountInfo userAccount = BeanMapper.map(registerResponse.getInfo(), NimAccountInfo.class);
userAccountResp.setImAccount(userAccount.getAccid());
userAccountResp.setUserId(userId);
userAccountResp.setToken(userAccount.getToken());
}
userAccountResp.setDesc(registerResponse.getDesc());
return userAccountResp;
// RegisterResponse registerResponse = imChannelProvider.registerAccount(register);
// if (registerResponse.getInfo() != null) {
// NimAccountInfo userAccount = BeanMapper.map(registerResponse.getInfo(), NimAccountInfo.class);
// userAccountResp.setImAccount(userAccount.getAccid());
// userAccountResp.setUserId(userId);
// userAccountResp.setToken(userAccount.getToken());
// }
// userAccountResp.setDesc(registerResponse.getDesc());
return UserAccountResp.builder()
.imAccount(userWapperId)
.userId(userId)
.token("testToken")
.build();
}
/**
@ -349,6 +355,7 @@ public class AccountService {
userAccountResp.setUserId(accountRegister.getAccountId());
userAccountResp.setAppType(accountRegister.getAppType());
userAccountResp.setToken(accountRegister.getToken());
userAccountResp.setOuId(accountRegister.getOuId());
return userAccountResp;
})
.collect(Collectors.toList()));
@ -361,7 +368,10 @@ public class AccountService {
userAccountReq.setAppType(appTypeEnum.getCode());
userAccountReq.setUserId(accountAbsentQuery.getPersonId());
userAccountReq.setNickName(DEFAULT_NICK_NAME + accountAbsentQuery.getPersonId());
// 管理版需要根据ou注册IM账号做数据隔离
if (appTypeEnum == AppTypeEnum.CMP) {
userAccountReq.setOrganizationalUnitId(accountAbsentQuery.getOrganizationalUnitId());
}
UserAccountResp accountResp = generateAccount(userAccountReq, iNotifyService);
if (StringUtils.isEmpty(accountResp.getToken())) {
continue;

View File

@ -4,8 +4,8 @@ import cn.axzo.basics.common.BeanMapper;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.im.center.api.vo.req.AccountQuery;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendCustomMessageParam;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.api.vo.resp.MessageCustomResp;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.center.api.vo.resp.UserAccountResp;
@ -35,6 +35,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cglib.beans.BeanMap;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
@ -105,33 +106,40 @@ public class MessageService {
@Autowired
private Environment environment;
@Transactional(rollbackFor = Exception.class)
public List<MessageDispatchResp> sendMessage(MessageInfo messageInfo) {
String msgTemplateId = messageInfo.getMsgTemplateId();
MessageDispatchRequest messageRequest = buildMessageDispatchRequest(messageInfo);
buildMsgFromAccount(messageRequest, msgTemplateId);
//设置IM消息发送者账号
if (messageInfo.getToPersonIdList().size() > msgReceiverLimit) {
throw new ServiceException("IM消息接收用户数量超过上限:[" + msgReceiverLimit + "]");
}
List<MessageDispatchResp> messageDispatchRespList;
log.info("sendMessage发送消息,msgReceiverLimit:{},msgReceiverThreshold:{},msgSendPersonOfOneBatch:{}"
, msgReceiverLimit, msgReceiverThreshold, msgSendPersonOfOneBatch);
// 异步发送消息1同步发送接口性能不好2第三方接口有接口限流处理
int personCount = messageInfo.getToPersonIdList().size();
//小于阈值就走单个IM消息发送接口
if (personCount <= msgReceiverThreshold) {
messageDispatchRespList = sendOneByOneMessage(messageInfo, messageRequest);
} else {
log.info("sendBatchMessage批量发送消息:" + JSONUtil.toJsonStr(messageInfo));
messageDispatchRespList = sendBatchMessage(messageInfo, messageRequest);
}
List<MessageDispatchResp> saveRespList = messageDispatchRespList.stream()
.filter(i -> StringUtils.isBlank(i.getSendFailCause()))
.collect(toList());
insertImMessage(saveRespList, messageRequest.getBody());
return messageDispatchRespList;
}
/**
* 使用xxlJob异步发送第三方接口消息
* 1第三方接口有限流
* 2提高接口的性能
* @param sendMessageParam
* @return
*/
// @Transactional(rollbackFor = Exception.class)
// public List<MessageDispatchResp> sendMessage(SendMessageParam sendMessageParam) {
// String msgTemplateId = sendMessageParam.getMsgTemplateId();
// MessageDispatchRequest messageRequest = buildMessageDispatchRequest(sendMessageParam);
// buildMsgFromAccount(messageRequest, msgTemplateId);
// //设置IM消息发送者账号
// if (sendMessageParam.getToPersonIdList().size() > msgReceiverLimit) {
// throw new ServiceException("IM消息接收用户数量超过上限:[" + msgReceiverLimit + "]");
// }
// List<MessageDispatchResp> messageDispatchRespList;
// log.info("sendMessage发送消息,msgReceiverLimit:{},msgReceiverThreshold:{},msgSendPersonOfOneBatch:{}"
// , msgReceiverLimit, msgReceiverThreshold, msgSendPersonOfOneBatch);
// // 异步发送消息1同步发送接口性能不好2第三方接口有接口限流处理
// int personCount = sendMessageParam.getToPersonIdList().size();
// //小于阈值就走单个IM消息发送接口
// if (personCount <= msgReceiverThreshold) {
// messageDispatchRespList = sendOneByOneMessage(sendMessageParam, messageRequest);
// } else {
// log.info("sendBatchMessage批量发送消息:" + JSONUtil.toJsonStr(sendMessageParam));
// messageDispatchRespList = sendBatchMessage(sendMessageParam, messageRequest);
// }
// List<MessageDispatchResp> saveRespList = messageDispatchRespList.stream()
// .filter(i -> StringUtils.isBlank(i.getSendFailCause()))
// .collect(toList());
// insertImMessage(saveRespList, messageRequest.getBody());
// return messageDispatchRespList;
// }
/**
* 设置IM消息发送者账户目前只支持机器人账户发送
@ -172,115 +180,115 @@ public class MessageService {
}
}
private MessageDispatchRequest buildMessageDispatchRequest(MessageInfo messageInfo) {
private MessageDispatchRequest buildMessageDispatchRequest(SendMessageParam sendMessageParam) {
MessageDispatchRequest messageRequest = new MessageDispatchRequest();
MessageBody messageBody = new MessageBody();
messageBody.setMsgType(NimMsgTypeEnum.TEMPLATE.getCode());
messageBody.setMsgContent(messageInfo.getMsgContent());
messageBody.setMsgHeader(messageInfo.getMsgHeader());
messageBody.setMsgBody(messageInfo.getMsgTemplateContent());
messageBody.setMsgContent(sendMessageParam.getMsgContent());
messageBody.setMsgHeader(sendMessageParam.getMsgHeader());
messageBody.setMsgBody(sendMessageParam.getMsgTemplateContent());
Map<String, String> defaultExtMap = Maps.newHashMap();
defaultExtMap.put("msgTemplateId", messageInfo.getMsgTemplateId());
if (messageInfo.getExtendsInfo() != null) {
defaultExtMap.putAll(messageInfo.getExtendsInfo());
defaultExtMap.put("msgTemplateId", sendMessageParam.getMsgTemplateId());
if (sendMessageParam.getExt() != null) {
defaultExtMap.putAll(BeanMap.create(sendMessageParam.getExt()));
}
messageBody.setMessageExtension(defaultExtMap);
String body = JSONUtil.toJsonStr(messageBody);
messageRequest.setBody(body);
return messageRequest;
}
private List<MessageDispatchResp> sendBatchMessage(MessageInfo messageInfo, MessageDispatchRequest messageRequest) {
//消息模板是针对多App端,则单个用户有多个IM账户,需要分开进行发送消息
List<AppTypeEnum> appTypeList = messageInfo.getAppTypeList();
String fromAccId = messageRequest.getFrom();
List<MessageDispatchResp> messageDispatchRespList = Lists.newArrayList();
//1.首先自动添加IM账户进行批量发送,返回其中IM账户未注册部分
//2.如下是默认IM账户规则
//TODO 批量这里的账户生成判断有问题生产环境有两种类型的数据123_cm和master123_cm
//TODO 先去数据库里面查询判断是否有IM账户
//消息接收者分页,然后进行批量发送
for (AppTypeEnum appTypeEnum : appTypeList) {
String appType = appTypeEnum.getCode();
if (appType == null || AppTypeEnum.isValidAppType(appType) == null) {
throw new ServiceException("当前服务器不支持该appType类型!");
}
Set<String> sourcePersonList = messageInfo.getToPersonIdList();
List<String> toPersonIMList = Lists.newArrayList();
HashMap<String, String> imAccount2PersonId = new HashMap<>();
for (String sourcePersonId : sourcePersonList) {
AccountQuery accountQuery = new AccountQuery();
accountQuery.setAppType(appType);
accountQuery.setAccountId(sourcePersonId);
List<UserAccountResp> userAccountRespList = accountService.queryAccountInfo(accountQuery);
if (CollectionUtils.isNotEmpty(userAccountRespList)) {
userAccountRespList.forEach(userAccountResp -> {
if (StringUtils.isNotEmpty(userAccountResp.getImAccount())
&& StringUtils.isNotEmpty(userAccountResp.getToken())) {
toPersonIMList.add(userAccountResp.getImAccount());
imAccount2PersonId.put(userAccountResp.getImAccount(), sourcePersonId);
}
});
} else {
log.warn("发送IM消息异常,不存在personId=[" + sourcePersonId + "]的IM账户");
MessageDispatchResp resp = new MessageDispatchResp();
resp.setAppType(appType);
resp.setTimetag(System.currentTimeMillis());
resp.setFromImAccount(messageRequest.getFrom());
resp.setToImAccount(messageRequest.getTo());
resp.setPersonId(sourcePersonId);
resp.setSendFailCause("personId=" + sourcePersonId + ", 未注册IM账户");
messageDispatchRespList.add(resp);
}
}
List<List<String>> personPage = Lists.partition(toPersonIMList, msgSendPersonOfOneBatch);
MessageBatchDispatchRequest batchDispatchRequest = new MessageBatchDispatchRequest();
batchDispatchRequest.setBody(messageRequest.getBody());
batchDispatchRequest.setFromAccid(fromAccId);
personPage.forEach(imAccountList -> {
batchDispatchRequest.setToAccids(imAccountList);
MessageBatchDispatchResponse batchResponse = imChannel.dispatchBatchMessage(batchDispatchRequest);
if (batchResponse != null) {
Map<String, Long> userMsgResponseMap = batchResponse.getMsgids();
if (userMsgResponseMap != null) {
//遍历批量返回中每一个账户对应的MessageId
userMsgResponseMap.forEach((imAccount, messageId) -> {
String personId = imAccount2PersonId.get(imAccount);
MessageDispatchResp messageDispatchResp =
buildMessageDispatchResp(String.valueOf(messageId), fromAccId,
imAccount, personId, appType, batchResponse.getTimetag(), null);
messageDispatchRespList.add(messageDispatchResp);
});
} else if (StringUtils.isNotBlank(batchResponse.getDesc())) {
for (String imAccount : imAccountList) {
String personId = imAccount2PersonId.get(imAccount);
MessageDispatchResp messageDispatchResp =
buildMessageDispatchResp(null, fromAccId,
imAccount, personId, appType, batchResponse.getTimetag(), batchResponse.getDesc());
messageDispatchRespList.add(messageDispatchResp);
}
} else {
log.error("dispatchBatchMessage请求返回出现异常:{}", JSONUtil.toJsonStr(batchResponse));
}
//返回未注册的IM账户
Set<String> unregisterAccountSets = batchResponse.getUnregister();
//字符串转义字符处理
if (CollectionUtils.isNotEmpty(unregisterAccountSets)) {
unregisterAccountSets = unregisterAccountSets.stream()
.map(account -> account.replace("\"", "")).collect(Collectors.toSet());
unregisterAccountSets.forEach(unregisterAccount -> {
MessageDispatchResp messageDispatchResp = buildMessageDispatchResp(null, fromAccId,
unregisterAccount, null, appType, batchResponse.getTimetag(), null);
});
}
} else {
log.error("dispatchBatchMessage请求返回出现异常");
}
});
}
return messageDispatchRespList;
}
//
// private List<MessageDispatchResp> sendBatchMessage(SendMessageParam sendMessageParam, MessageDispatchRequest messageRequest) {
// //消息模板是针对多App端,则单个用户有多个IM账户,需要分开进行发送消息
// List<AppTypeEnum> appTypeList = sendMessageParam.getAppTypeList();
// String fromAccId = messageRequest.getFrom();
// List<MessageDispatchResp> messageDispatchRespList = Lists.newArrayList();
// //1.首先自动添加IM账户进行批量发送,返回其中IM账户未注册部分
// //2.如下是默认IM账户规则
// //TODO 批量这里的账户生成判断有问题生产环境有两种类型的数据123_cm和master123_cm
// //TODO 先去数据库里面查询判断是否有IM账户
// //消息接收者分页,然后进行批量发送
// for (AppTypeEnum appTypeEnum : appTypeList) {
// String appType = appTypeEnum.getCode();
// if (appType == null || AppTypeEnum.isValidAppType(appType) == null) {
// throw new ServiceException("当前服务器不支持该appType类型!");
// }
// Set<String> sourcePersonList = sendMessageParam.getToPersonIdList();
// List<String> toPersonIMList = Lists.newArrayList();
// HashMap<String, String> imAccount2PersonId = new HashMap<>();
// for (String sourcePersonId : sourcePersonList) {
// AccountQuery accountQuery = new AccountQuery();
// accountQuery.setAppType(appType);
// accountQuery.setAccountId(sourcePersonId);
// List<UserAccountResp> userAccountRespList = accountService.queryAccountInfo(accountQuery);
// if (CollectionUtils.isNotEmpty(userAccountRespList)) {
// userAccountRespList.forEach(userAccountResp -> {
// if (StringUtils.isNotEmpty(userAccountResp.getImAccount())
// && StringUtils.isNotEmpty(userAccountResp.getToken())) {
// toPersonIMList.add(userAccountResp.getImAccount());
// imAccount2PersonId.put(userAccountResp.getImAccount(), sourcePersonId);
// }
// });
// } else {
// log.warn("发送IM消息异常,不存在personId=[" + sourcePersonId + "]的IM账户");
// MessageDispatchResp resp = new MessageDispatchResp();
// resp.setAppType(appType);
// resp.setTimetag(System.currentTimeMillis());
// resp.setFromImAccount(messageRequest.getFrom());
// resp.setToImAccount(messageRequest.getTo());
// resp.setPersonId(sourcePersonId);
// resp.setSendFailCause("personId=" + sourcePersonId + ", 未注册IM账户");
// messageDispatchRespList.add(resp);
// }
// }
// List<List<String>> personPage = Lists.partition(toPersonIMList, msgSendPersonOfOneBatch);
// MessageBatchDispatchRequest batchDispatchRequest = new MessageBatchDispatchRequest();
// batchDispatchRequest.setBody(messageRequest.getBody());
// batchDispatchRequest.setFromAccid(fromAccId);
// personPage.forEach(imAccountList -> {
// batchDispatchRequest.setToAccids(imAccountList);
// MessageBatchDispatchResponse batchResponse = imChannel.dispatchBatchMessage(batchDispatchRequest);
// if (batchResponse != null) {
// Map<String, Long> userMsgResponseMap = batchResponse.getMsgids();
// if (userMsgResponseMap != null) {
// //遍历批量返回中每一个账户对应的MessageId
// userMsgResponseMap.forEach((imAccount, messageId) -> {
// String personId = imAccount2PersonId.get(imAccount);
// MessageDispatchResp messageDispatchResp =
// buildMessageDispatchResp(String.valueOf(messageId), fromAccId,
// imAccount, personId, appType, batchResponse.getTimetag(), null);
// messageDispatchRespList.add(messageDispatchResp);
// });
// } else if (StringUtils.isNotBlank(batchResponse.getDesc())) {
// for (String imAccount : imAccountList) {
// String personId = imAccount2PersonId.get(imAccount);
// MessageDispatchResp messageDispatchResp =
// buildMessageDispatchResp(null, fromAccId,
// imAccount, personId, appType, batchResponse.getTimetag(), batchResponse.getDesc());
// messageDispatchRespList.add(messageDispatchResp);
// }
// } else {
// log.error("dispatchBatchMessage请求返回出现异常:{}", JSONUtil.toJsonStr(batchResponse));
// }
// //返回未注册的IM账户
// Set<String> unregisterAccountSets = batchResponse.getUnregister();
// //字符串转义字符处理
// if (CollectionUtils.isNotEmpty(unregisterAccountSets)) {
// unregisterAccountSets = unregisterAccountSets.stream()
// .map(account -> account.replace("\"", "")).collect(Collectors.toSet());
// unregisterAccountSets.forEach(unregisterAccount -> {
// MessageDispatchResp messageDispatchResp = buildMessageDispatchResp(null, fromAccId,
// unregisterAccount, null, appType, batchResponse.getTimetag(), null);
// });
// }
// } else {
// log.error("dispatchBatchMessage请求返回出现异常");
// }
//
// });
// }
// return messageDispatchRespList;
// }
private MessageDispatchResp buildMessageDispatchResp(String messageId, String fromImAccount, String toImAccount, String personId,
String appType, Long timeTag, String desc) {
@ -304,63 +312,63 @@ public class MessageService {
}
return messageDispatchResp;
}
private List<MessageDispatchResp> sendOneByOneMessage(MessageInfo messageInfo, MessageDispatchRequest messageRequest) {
//如果消息模板是针对多App端则分开进行发送消息
List<AppTypeEnum> appTypeList = messageInfo.getAppTypeList();
List<MessageDispatchResp> messageDispatchRespList = Lists.newArrayList();
appTypeList.forEach(appType -> {
if (appType == null || AppTypeEnum.isValidAppType(appType.getCode()) == null) {
throw new ServiceException("当前服务器不支持该appType类型!");
}
List<String> toPersonList = Lists.newArrayList(messageInfo.getToPersonIdList());
//进行接收用户IM账户校验 目前支持单个用户进行IM消息发送,多个IM用户进行消息接收
for (String personId : toPersonList) {
List<AccountRegister> accountRegisterList = accountRegisterDao.lambdaQuery().eq(AccountRegister::getIsDelete, 0)
.eq(AccountRegister::getAccountId, personId)
.eq(AccountRegister::getAppKey, imChannel.getProviderAppKey())
.isNotNull(AccountRegister::getToken)
.eq(AccountRegister::getAppType, appType.getCode()).list();
if (CollectionUtils.isEmpty(accountRegisterList)) {
//返回未注册的IM账户信息
MessageDispatchResp messageDispatchResp = buildMessageDispatchResp(null,
messageRequest.getFrom(), null,
personId, appType.getCode(), 0L, "unregistered");
log.warn("用户personId=[" + personId + "],appType[" + appType.getCode() + "],未注册IM账户,不进行消息发送!");
MessageDispatchResp resp = new MessageDispatchResp();
resp.setAppType(appType.getCode());
resp.setTimetag(System.currentTimeMillis());
resp.setFromImAccount(messageRequest.getFrom());
resp.setToImAccount(messageRequest.getTo());
resp.setPersonId(personId);
resp.setSendFailCause("personId=" + personId + ",未注册IM账户");
messageDispatchRespList.add(resp);
continue;
}
accountRegisterList.forEach(accountRegister -> {
if (StringUtils.isNotEmpty(accountRegister.getImAccount()) &&
StringUtils.isNotEmpty(accountRegister.getToken())) {
messageRequest.setTo(accountRegister.getImAccount());
messageRequest.setType(ChannelMsgTypeEnum.CUSTOM.getCode());
MessageDispatchResponse response = imChannel.dispatchMessage(messageRequest);
MessageDispatchResp messageDispatchResp = BeanMapper.map(response.getData(), MessageDispatchResp.class);
if (messageDispatchResp == null) {
messageDispatchResp = new MessageDispatchResp();
}
if (StringUtils.isNotBlank(response.getDesc())) {
messageDispatchResp.setDesc(response.getDesc());
}
messageDispatchResp.setAppType(appType.getCode());
messageDispatchResp.setFromImAccount(messageRequest.getFrom());
messageDispatchResp.setToImAccount(accountRegister.getImAccount());
messageDispatchResp.setPersonId(personId);
messageDispatchRespList.add(messageDispatchResp);
}
});
}
});
return messageDispatchRespList;
}
//
// private List<MessageDispatchResp> sendOneByOneMessage(SendMessageParam sendMessageParam, MessageDispatchRequest messageRequest) {
// //如果消息模板是针对多App端则分开进行发送消息
// List<AppTypeEnum> appTypeList = sendMessageParam.getAppTypeList();
// List<MessageDispatchResp> messageDispatchRespList = Lists.newArrayList();
// appTypeList.forEach(appType -> {
// if (appType == null || AppTypeEnum.isValidAppType(appType.getCode()) == null) {
// throw new ServiceException("当前服务器不支持该appType类型!");
// }
// List<String> toPersonList = Lists.newArrayList(sendMessageParam.getToPersonIdList());
// //进行接收用户IM账户校验 目前支持单个用户进行IM消息发送,多个IM用户进行消息接收
// for (String personId : toPersonList) {
// List<AccountRegister> accountRegisterList = accountRegisterDao.lambdaQuery().eq(AccountRegister::getIsDelete, 0)
// .eq(AccountRegister::getAccountId, personId)
// .eq(AccountRegister::getAppKey, imChannel.getProviderAppKey())
// .isNotNull(AccountRegister::getToken)
// .eq(AccountRegister::getAppType, appType.getCode()).list();
// if (CollectionUtils.isEmpty(accountRegisterList)) {
// //返回未注册的IM账户信息
// MessageDispatchResp messageDispatchResp = buildMessageDispatchResp(null,
// messageRequest.getFrom(), null,
// personId, appType.getCode(), 0L, "unregistered");
// log.warn("用户personId=[" + personId + "],appType[" + appType.getCode() + "],未注册IM账户,不进行消息发送!");
// MessageDispatchResp resp = new MessageDispatchResp();
// resp.setAppType(appType.getCode());
// resp.setTimetag(System.currentTimeMillis());
// resp.setFromImAccount(messageRequest.getFrom());
// resp.setToImAccount(messageRequest.getTo());
// resp.setPersonId(personId);
// resp.setSendFailCause("personId=" + personId + ",未注册IM账户");
// messageDispatchRespList.add(resp);
// continue;
// }
// accountRegisterList.forEach(accountRegister -> {
// if (StringUtils.isNotEmpty(accountRegister.getImAccount()) &&
// StringUtils.isNotEmpty(accountRegister.getToken())) {
// messageRequest.setTo(accountRegister.getImAccount());
// messageRequest.setType(ChannelMsgTypeEnum.CUSTOM.getCode());
// MessageDispatchResponse response = imChannel.dispatchMessage(messageRequest);
// MessageDispatchResp messageDispatchResp = BeanMapper.map(response.getData(), MessageDispatchResp.class);
// if (messageDispatchResp == null) {
// messageDispatchResp = new MessageDispatchResp();
// }
// if (StringUtils.isNotBlank(response.getDesc())) {
// messageDispatchResp.setDesc(response.getDesc());
// }
// messageDispatchResp.setAppType(appType.getCode());
// messageDispatchResp.setFromImAccount(messageRequest.getFrom());
// messageDispatchResp.setToImAccount(accountRegister.getImAccount());
// messageDispatchResp.setPersonId(personId);
// messageDispatchRespList.add(messageDispatchResp);
// }
// });
// }
// });
// return messageDispatchRespList;
// }
private void insertImMessage(List<MessageDispatchResp> messageRespList, String messageBody) {
if (CollectionUtils.isEmpty(messageRespList)) {
@ -395,63 +403,63 @@ public class MessageService {
}
@Transactional(rollbackFor = Exception.class)
public List<MessageCustomResp> sendCustomMessage(CustomMessageInfo customMessage) {
MessageCustomDispatchRequest messageRequest = new MessageCustomDispatchRequest();
String appKey = imChannel.getProviderAppKey();
AccountRegister customSendAccount = accountService.queryCustomAccount(
AccountTypeEnum.CUSTOM, AppTypeEnum.SYSTEM, appKey);
AssertUtil.notNull(customSendAccount, String.format("appKey=%s自定义用户没有注册im账号", appKey));
messageRequest.setFrom(customSendAccount.getImAccount());
//如果消息模板是针对多App端则分开进行发送消息
List<AppTypeEnum> appTypeList = customMessage.getAppTypeList();
List<MessageCustomResp> messageCustomRespList = Lists.newArrayList();
appTypeList.forEach(appType -> {
if (appType == null || AppTypeEnum.isValidAppType(appType.getCode()) == null) {
throw new ServiceException("当前服务器不支持该appType类型!");
}
String personId = customMessage.getToPersonId();
//进行接收用户IM账户校验
List<AccountRegister> accountRegisterList = accountRegisterDao.lambdaQuery()
.eq(AccountRegister::getIsDelete, 0)
.eq(AccountRegister::getAccountId, personId)
.eq(AccountRegister::getAppKey, imChannel.getProviderAppKey())
.isNotNull(AccountRegister::getToken)
.eq(AccountRegister::getAppType, appType.getCode()).list();
if (CollectionUtils.isEmpty(accountRegisterList)) {
//返回未注册的IM账户信息
MessageCustomResp messageDispatchResp = MessageCustomResp.builder()
.appType(appType.getCode())
.personId(personId)
.toImAccount(null)
.isSuccess(false)
.build();
log.warn("user personId=[" + personId + "],appType[" + appType.getCode() + "],"
+ " Do not send messages if you have not registered an IM account!");
return;
}
accountRegisterList.stream()
.filter(a -> StringUtils.isNoneBlank(a.getImAccount(), a.getToken()))
.forEach(a -> {
messageRequest.setTo(a.getImAccount());
String body = JSONUtil.toJsonStr(wrapperCustomMessage(a.getImAccount(),
customMessage));
messageRequest.setAttach(body);
messageRequest.setPayload(body);
MessageCustomDispatchResponse response = imChannel.dispatchCustomMessage(messageRequest);
MessageCustomResp customResp = MessageCustomResp.builder()
.appType(appType.getCode())
.personId(personId)
.toImAccount(a.getImAccount())
.isSuccess(response.isSuccess())
.build();
messageCustomRespList.add(customResp);
});
});
insertImCustomMessage(messageCustomRespList, messageRequest, customSendAccount.getImAccount());
return messageCustomRespList;
}
// @Transactional(rollbackFor = Exception.class)
// public List<MessageCustomResp> sendCustomMessage(SendCustomMessageParam customMessage) {
// MessageCustomDispatchRequest messageRequest = new MessageCustomDispatchRequest();
// String appKey = imChannel.getProviderAppKey();
// AccountRegister customSendAccount = accountService.queryCustomAccount(
// AccountTypeEnum.CUSTOM, AppTypeEnum.SYSTEM, appKey);
// AssertUtil.notNull(customSendAccount, String.format("appKey=%s自定义用户没有注册im账号", appKey));
// messageRequest.setFrom(customSendAccount.getImAccount());
//
// //如果消息模板是针对多App端则分开进行发送消息
// List<AppTypeEnum> appTypeList = customMessage.getAppTypeList();
// List<MessageCustomResp> messageCustomRespList = Lists.newArrayList();
// appTypeList.forEach(appType -> {
// if (appType == null || AppTypeEnum.isValidAppType(appType.getCode()) == null) {
// throw new ServiceException("当前服务器不支持该appType类型!");
// }
// String personId = customMessage.getToPersonId();
// //进行接收用户IM账户校验
// List<AccountRegister> accountRegisterList = accountRegisterDao.lambdaQuery()
// .eq(AccountRegister::getIsDelete, 0)
// .eq(AccountRegister::getAccountId, personId)
// .eq(AccountRegister::getAppKey, imChannel.getProviderAppKey())
// .isNotNull(AccountRegister::getToken)
// .eq(AccountRegister::getAppType, appType.getCode()).list();
// if (CollectionUtils.isEmpty(accountRegisterList)) {
// //返回未注册的IM账户信息
// MessageCustomResp messageDispatchResp = MessageCustomResp.builder()
// .appType(appType.getCode())
// .personId(personId)
// .toImAccount(null)
// .isSuccess(false)
// .build();
// log.warn("user personId=[" + personId + "],appType[" + appType.getCode() + "],"
// + " Do not send messages if you have not registered an IM account!");
// return;
// }
// accountRegisterList.stream()
// .filter(a -> StringUtils.isNoneBlank(a.getImAccount(), a.getToken()))
// .forEach(a -> {
// messageRequest.setTo(a.getImAccount());
// String body = JSONUtil.toJsonStr(wrapperCustomMessage(a.getImAccount(),
// customMessage));
// messageRequest.setAttach(body);
// messageRequest.setPayload(body);
// MessageCustomDispatchResponse response = imChannel.dispatchCustomMessage(messageRequest);
// MessageCustomResp customResp = MessageCustomResp.builder()
// .appType(appType.getCode())
// .personId(personId)
// .toImAccount(a.getImAccount())
// .isSuccess(response.isSuccess())
// .build();
// messageCustomRespList.add(customResp);
// });
// });
// insertImCustomMessage(messageCustomRespList, messageRequest, customSendAccount.getImAccount());
// return messageCustomRespList;
// }
private void insertImCustomMessage(List<MessageCustomResp> messageRespList,
MessageCustomDispatchRequest messageRequest, String fromImAccount) {
@ -487,13 +495,13 @@ public class MessageService {
});
}
public static MessageCustomBody wrapperCustomMessage(String toImAccount,
CustomMessageInfo customMessage) {
return MessageCustomBody.builder()
.toImAccount(toImAccount)
.personId(customMessage.getToPersonId())
.bizType(customMessage.getBizType())
.payload(Optional.ofNullable(customMessage.getPayload()).orElse("{}"))
.build();
}
// public static MessageCustomBody wrapperCustomMessage(String toImAccount,
// SendCustomMessageParam customMessage) {
// return MessageCustomBody.builder()
// .toImAccount(toImAccount)
// .personId(customMessage.getToPersonId())
// .bizType(customMessage.getBizType())
// .payload(Optional.ofNullable(customMessage.getPayload()).orElse("{}"))
// .build();
// }
}

View File

@ -0,0 +1,51 @@
package cn.axzo.im.service;
import cn.axzo.bfs.support.data.page.IPageParam;
import cn.axzo.bfs.support.data.wrapper.CriteriaField;
import cn.axzo.bfs.support.data.wrapper.Operator;
import cn.axzo.im.entity.MessageTask;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
public interface MessageTaskService extends IService<MessageTask> {
MessageTask create(MessageTask param);
Page<MessageTask> page(PageMessageTaskParam param);
@SuperBuilder
@Data
@NoArgsConstructor
@AllArgsConstructor
class ListMessageTaskParam {
@CriteriaField(field = "id", operator = Operator.IN)
private List<Long> ids;
// private
}
@SuperBuilder
@Data
@NoArgsConstructor
@AllArgsConstructor
class PageMessageTaskParam extends ListMessageTaskParam implements IPageParam {
@CriteriaField(ignore = true)
Integer pageNumber;
@CriteriaField(ignore = true)
Integer pageSize;
/**
* 排序使用示例createTime__DESC
*/
@CriteriaField(ignore = true)
List<String> sort;
}
}

View File

@ -0,0 +1,35 @@
package cn.axzo.im.service.impl;
import cn.axzo.im.dao.mapper.MessageTaskMapper;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.service.MessageTaskService;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
@Slf4j
@Service
public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, MessageTask>
implements MessageTaskService {
@Override
@Transactional
public MessageTask create(MessageTask param) {
// 未设置计划执行时间则计划时间为当前时间xxlJob可立即执行
if (param.getPlanStartTime() == null) {
param.setPlanStartTime(new Date());
}
this.save(param);
return this.getById(param.getId());
}
@Override
public Page<MessageTask> page(PageMessageTaskParam param) {
return null;
}
}

View File

@ -0,0 +1,51 @@
package cn.axzo.im.service;
import cn.axzo.im.Application;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.controller.AccountController;
import cn.axzo.im.entity.AccountRegister;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.TestPropertySources;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.web.servlet.MockMvc;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
@TestPropertySource(properties = {
"NACOS_HOST=https://dev-nacos.axzo.cn",
"xxl.job.admin.addresses=http://dev-xxl-job.axzo.cn/xxl-job-admin",
"xxl.job.executor.appName=im-center",
"xxl.job.executor.port=8990"
})
@SpringBootTest(classes = Application.class)
@AutoConfigureMockMvc
class AccountServiceTest {
@Autowired
private AccountController accountController;
@Autowired
protected MockMvc mockMvc;
@Autowired
private AccountRegisterService accountRegisterService;
@Test
void registerAccountIfAbsent() {
AccountRegisterService.ListAccountRegisterParam listAccountRegisterParam = AccountRegisterService.ListAccountRegisterParam.builder()
.build();
List<AccountRegister> accountRegisters = accountRegisterService.list(listAccountRegisterParam);
System.out.println(accountRegisters);
}
}

View File

@ -99,4 +99,30 @@ create index idx_im_from_account
on im_message_history (from_account);
ALTER TABLE im_account_register ADD COLUMN organizational_unit_id bigint null comment '企业id';
ALTER TABLE im_account_register ADD COLUMN `ou_id` bigint null comment 'organizational_unit表的id';
CREATE TABLE IF NOT EXISTS im_message_task
(
id bigint auto_increment comment '主键',
biz_id varchar(50) null comment '业务请求时可以带的排查问题的id',
send_person_id varchar(50) null comment 'IM消息发送personId自定义消息没有personId发送时会使用系统配置的机器人账号',
receive_persons json not null comment 'IM消息接收人person列表',
status varchar(32) not null default 'PENDING' comment '消息状态PENDING、SUCCEED、FAILED',
title VARCHAR(128) NOT NULL DEFAULT '' COMMENT '标题',
content VARCHAR(512) NOT NULL DEFAULT '' COMMENT '内容',
biz_data json not null comment '消息业务数据,JSON格式不同的第三方格式不同',
ext VARCHAR(1024) NOT NULL DEFAULT '{}' COMMENT '其它额外信息',
plan_start_time DATETIME(3) NOT NULL COMMENT '任务计划开始时间,时间大于改时间会对未完成的任务进行执行操作',
started_time DATETIME(3) null comment '实际开始时间',
finished_time DATETIME(3) null comment '实际完成时间',
is_delete tinyint default 0 not null comment '未删除0,删除1',
create_at datetime default CURRENT_TIMESTAMP not null comment '创建时间',
update_at datetime default CURRENT_TIMESTAMP not null comment '更新时间',
PRIMARY KEY (`id`),
key idx_message_task_biz_id (`biz_id`),
key idx_message_task_plan_start_time (`plan_start_time`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8 comment '消息推送任务';
ALTER TABLE im_message_history ADD COLUMN `result` varchar(1024) NULL COMMENT 'result';
ALTER TABLE im_message_history ADD COLUMN `im_message_task_id` bigint NULL COMMENT '消息推送任务的id';