feat:feature-REQ/2129 测试发送消息

This commit is contained in:
lilong 2024-03-25 20:11:26 +08:00
parent 8d09787fc4
commit 19745cde64
12 changed files with 332 additions and 207 deletions

View File

@ -112,12 +112,15 @@ public class SendMessageParam {
@Getter
@AllArgsConstructor
public enum JumpPlatform {
PC,
CM_IOS,
CM_ANDROID,
CMP_IOS,
CMP_ANDROID
PC(null, "WEB"),
CM_IOS(AppTypeEnum.CM, "IOS"),
CM_ANDROID(AppTypeEnum.CM, "ANDROID"),
CMP_IOS(AppTypeEnum.CMP, "IOS"),
CMP_ANDROID(AppTypeEnum.CMP, "ANDROID")
;
private AppTypeEnum appType;
private String oldPlatform;
}
@Data

View File

@ -18,16 +18,6 @@ import org.springframework.core.env.Environment;
public class Application {
public static void main(String[] args) {
System.setProperty("spring.profiles.active","dev");
System.setProperty("NACOS_HOST","https://dev-nacos.axzo.cn");
System.setProperty("NACOS_PORT","443");
System.setProperty("NACOS_NAMESPACE_ID","35eada10-9574-4db8-9fea-bc6a4960b6c7");
System.setProperty("CUSTOM_ENV","dev");
System.setProperty("spring.redis.port","31270");
System.setProperty("spring.redis.host","123.249.44.111");
System.setProperty("xxl.job.admin.addresses","http://dev-xxl-job.axzo.cn/xxl-job-admin");
ConfigurableApplicationContext run = SpringApplication.run(Application.class, args);
Environment env = run.getEnvironment();
log.info(

View File

@ -14,10 +14,11 @@ import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.center.api.vo.resp.MessageTaskResp;
import cn.axzo.im.center.api.vo.resp.UserAccountResp;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.service.AccountRegisterService;
import cn.axzo.im.service.AccountService;
import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.im.service.MessageService;
import cn.axzo.im.service.MessageTaskService;
import cn.axzo.im.service.RobotMsgTemplateService;
@ -68,6 +69,8 @@ public class MessageController implements MessageApi {
private MessageService messageService;
@Autowired
private AccountRegisterService accountRegisterService;
@Autowired
private MessageHistoryService messageHistoryService;
@Override
@ -107,6 +110,14 @@ public class MessageController implements MessageApi {
check(sendMessageParam);
MessageTask messageTask = messageTaskService.create(toMessageTask(sendMessageParam));
messageTaskService.createMessageHistory(messageTask);
List<MessageHistoryService.MessageHistoryDTO> messageHistories = messageHistoryService.list(MessageHistoryService.ListMessageHistoryParam.builder()
.imMessageTaskId(messageTask.getId())
.status(MessageHistory.Status.PENDING.name())
.build());
if (!CollectionUtils.isEmpty(messageHistories)) {
messageHistoryService.sendMessage(messageHistories);
}
return ApiResult.ok(toMessageTaskResp(messageTask));
}
@ -199,6 +210,7 @@ public class MessageController implements MessageApi {
.ext(sendMessageParam.getExt())
.planStartTime(now)
.createAt(now)
.cardBannerUrl(sendMessageParam.getCardBannerUrl())
.build();
}

View File

@ -1,9 +1,11 @@
package cn.axzo.im.controller;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.channel.netease.client.NimClient;
import cn.axzo.im.channel.netease.dto.QueryEventRequest;
import cn.axzo.im.channel.netease.dto.QueryMessageRequest;
import cn.axzo.im.channel.netease.dto.RevokeMessageRequest;
import cn.axzo.im.job.CreateMessageHistoryJob;
import cn.axzo.im.job.RevokeAllMessagesJob;
import cn.axzo.im.job.SendMessageJob;
import cn.axzo.im.job.UpdateImAccountOuIdJob;
@ -29,6 +31,8 @@ public class PrivateController {
private final UpdateImAccountOuIdJob updateImAccountOuIdJob;
private final AccountRegisterService accountRegisterService;
private final SendMessageJob sendMessageJob;
private final CreateMessageHistoryJob createMessageHistoryJob;
private final MessageController messageController;
@PostMapping("/private/revoke")
public Object revoke(@Valid @RequestBody RevokeMessageRequest request) {
@ -60,8 +64,18 @@ public class PrivateController {
return accountRegisterService.page(param);
}
@PostMapping("/private/message/send")
public Object sendMessageJob(@RequestBody SendMessageJob.SendMessageParam param) throws Exception {
@PostMapping("/private/message/history/job/do")
public Object doMessageHistory(@RequestBody CreateMessageHistoryJob.CreateMessageHistoryParam param) throws Exception {
return createMessageHistoryJob.execute(JSONObject.toJSONString(param));
}
@PostMapping("/private/message/job/do")
public Object doMessageJob(@RequestBody SendMessageJob.SendMessageParam param) throws Exception {
return sendMessageJob.execute(JSONObject.toJSONString(param));
}
@PostMapping("/private/message/send")
public Object sendMessage(@RequestBody SendMessageParam param) throws Exception {
return messageController.sendMessage(param);
}
}

View File

@ -1,6 +1,7 @@
package cn.axzo.im.exception;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.pokonyan.exception.BusinessException;
import cn.azxo.framework.common.model.CommonResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
@ -36,6 +37,12 @@ public class ExceptionAdviceHandler {
return CommonResponse.fail(e.getMessage());
}
@ExceptionHandler(BusinessException.class)
public CommonResponse<Void> businessExceptionHandler(BusinessException e) {
log.warn("业务异常", e);
return CommonResponse.fail(e.getMessage());
}
@ExceptionHandler(BindException.class)
public CommonResponse<Void> bindExceptionHandler(BindException e) {
log.warn("业务异常", e);

View File

@ -35,18 +35,18 @@ public class CreateMessageHistoryJob extends IJobHandler {
private static final Integer DEFAULT_PAGE_SIZE = 500;
@Override
@XxlJob("sendMessageJob")
@XxlJob("createMessageHistoryJob")
public ReturnT<String> execute(String s) throws Exception {
log.info("start sendMessageJob,s:{}", s);
SendMessageParam sendMessageParam = Optional.ofNullable(s)
.map(e -> JSONObject.parseObject(e, SendMessageParam.class))
.orElseGet(() -> SendMessageParam.builder().build());
log.info("start createMessageHistoryJob,s:{}", s);
CreateMessageHistoryParam createMessageHistoryParam = Optional.ofNullable(s)
.map(e -> JSONObject.parseObject(e, CreateMessageHistoryParam.class))
.orElseGet(() -> CreateMessageHistoryParam.builder().build());
Integer pageNumber = 1;
Date now = new Date();
while (true) {
MessageTaskService.PageMessageTaskParam req = MessageTaskService.PageMessageTaskParam.builder()
.ids(sendMessageParam.getIds())
.ids(createMessageHistoryParam.getIds())
.planStartTimeLE(now)
.status(MessageTask.Status.PENDING)
.pageNumber(pageNumber++)
@ -56,7 +56,12 @@ public class CreateMessageHistoryJob extends IJobHandler {
Page<MessageTask> page = messageTaskService.page(req);
if (CollectionUtils.isNotEmpty(page.getRecords())) {
page.getRecords().forEach(messageTask -> {
messageTaskService.createMessageHistory(messageTask);
try {
messageTaskService.createMessageHistory(messageTask);
} catch (Exception exception) {
log.warn("messageTask 执行失败,{}, ex", messageTask.getId(), exception);
}
});
}
@ -64,7 +69,7 @@ public class CreateMessageHistoryJob extends IJobHandler {
break;
}
}
log.info("end sendMessageJob");
log.info("end createMessageHistoryJob");
return ReturnT.SUCCESS;
}
@ -72,7 +77,7 @@ public class CreateMessageHistoryJob extends IJobHandler {
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class SendMessageParam {
public static class CreateMessageHistoryParam {
private List<Long> ids;
}
}

View File

@ -1,21 +1,27 @@
package cn.axzo.im.job;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.im.service.MessageTaskService;
import com.google.common.collect.Maps;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
@Component
@ -28,8 +34,36 @@ public class SendMessageJob extends IJobHandler {
private static final Integer DEFAULT_PAGE_SIZE = 500;
@Override
@XxlJob("sendMessageJob")
public ReturnT<String> execute(String s) throws Exception {
return null;
log.info("start sendMessageJob,s:{}", s);
SendMessageParam sendMessageParam = Optional.ofNullable(s)
.map(e -> JSONObject.parseObject(e, SendMessageParam.class))
.orElseGet(() -> SendMessageParam.builder().build());
Integer pageNumber = 1;
Date now = new Date();
while (true) {
MessageHistoryService.PageMessageHistoryParam req = MessageHistoryService.PageMessageHistoryParam.builder()
.ids(sendMessageParam.getIds())
.status(MessageHistory.Status.PENDING.name())
.pageNumber(pageNumber++)
.pageSize(DEFAULT_PAGE_SIZE)
.build();
Page<MessageHistoryService.MessageHistoryDTO> page = messageHistoryService.page(req);
if (CollectionUtils.isNotEmpty(page.getRecords())) {
Map<Long, List<MessageHistoryService.MessageHistoryDTO>> messageHistories = page.getRecords().stream()
.collect(Collectors.groupingBy(MessageHistoryService.MessageHistoryDTO::getImMessageTaskId));
messageHistories.values().forEach(messageHistoryService::sendMessage);
}
if (!page.hasNext()) {
break;
}
}
log.info("end sendMessageJob");
return ReturnT.SUCCESS;
}

View File

@ -36,6 +36,9 @@ public interface AccountRegisterService extends IService<AccountRegister> {
@CriteriaField(field = "appType", operator = Operator.EQ)
private String appType;
@CriteriaField(field = "appType", operator = Operator.IN)
private Set<String> appTypes;
/**
* 注册用户ID唯一
* 普通用户personId机器人robotId
@ -52,6 +55,9 @@ public interface AccountRegisterService extends IService<AccountRegister> {
@CriteriaField(field = "imAccount", operator = Operator.EQ)
private String imAccount;
@CriteriaField(field = "imAccount", operator = Operator.IN)
private Set<String> imAccounts;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号

View File

@ -24,6 +24,8 @@ public interface MessageHistoryService extends IService<MessageHistory> {
List<MessageHistoryDTO> list(ListMessageHistoryParam param);
void sendMessage(List<MessageHistoryService.MessageHistoryDTO> messageHistories);
@SuperBuilder
@Data
@NoArgsConstructor

View File

@ -1,5 +1,6 @@
package cn.axzo.im.service;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.pokonyan.dao.page.IPageParam;
import cn.axzo.pokonyan.dao.wrapper.CriteriaField;

View File

@ -2,22 +2,31 @@ package cn.axzo.im.service.impl;
import cn.axzo.basics.profiles.api.UserProfileServiceApi;
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchRequest;
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchResponse;
import cn.axzo.im.dao.mapper.MessageHistoryMapper;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.maokai.api.client.OrganizationalUnitApi;
import cn.axzo.maokai.api.vo.request.OrganizationalUnitQuery;
import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
import cn.axzo.pokonyan.client.RateLimiter;
import cn.axzo.pokonyan.client.RateLimiterClient;
import cn.axzo.pokonyan.dao.converter.PageConverter;
import cn.axzo.pokonyan.dao.mysql.QueryWrapperHelper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@ -25,18 +34,47 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static cn.axzo.im.config.BizResultCode.ACQUIRE_RATE_LIMITER_FAIL;
@Slf4j
@Service
public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper, MessageHistory>
implements MessageHistoryService {
implements MessageHistoryService, InitializingBean {
@Autowired
private OrganizationalUnitApi organizationalUnitApi;
@Autowired
private UserProfileServiceApi userProfileServiceApi;
@Autowired
private RateLimiterClient rateLimiterClient;
@Autowired
private IMChannelProvider imChannelProvider;
@Value("${send.message.limiter.permits:1}")
private int permits;
@Value("${send.message.limiter.seconds:60}")
private long seconds;
/**
* 网易云信IM批量发送-每批次发送给多少用户
*/
@Value("${im-center.message.batch.receiver.once:10}")
public int msgSendPersonOfOneBatch;
private RateLimiter rateLimiter;
private static final String LIMITER_KEY = "im-center:sendMessage";
/**
* 默认超时时间毫秒
*/
private static final long DEFAULT_TIME_OUT_MILLIS = 60 * 1000;
@Override
public Page<MessageHistoryDTO> page(PageMessageHistoryParam param) {
@ -66,6 +104,64 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
});
}
@Override
public void sendMessage(List<MessageHistoryService.MessageHistoryDTO> messageHistories) {
if (CollectionUtils.isEmpty(messageHistories)) {
log.info("发送的消息记录为空");
return;
}
// 三方接口调用频率120次/超限将限制1分钟使用所以抛出异常等待下一次xxlJob做重试补偿
if (!rateLimiter.tryAcquire(DEFAULT_TIME_OUT_MILLIS)) {
log.info("未获得令牌");
throw ACQUIRE_RATE_LIMITER_FAIL.toException();
}
log.info("获得令牌");
MessageHistoryDTO messageHistoryDTO = messageHistories.stream().findFirst().get();
MessageBatchDispatchRequest batchDispatchRequest = new MessageBatchDispatchRequest();
batchDispatchRequest.setBody(messageHistoryDTO.getMessageBody());
batchDispatchRequest.setFromAccid(messageHistoryDTO.getFromAccount());
batchDispatchRequest.setToAccids(Lists.transform(messageHistories, MessageHistoryService.MessageHistoryDTO::getToAccount));
MessageBatchDispatchResponse response = imChannelProvider.dispatchBatchMessage(batchDispatchRequest);
if (response.isSuccess()) {
// 发送成功的IMAccountId -> msgId
Map<String, Long> msgids = response.getMsgids();
// unregister的账号
Set<String> unregister = Optional.ofNullable(response.getUnregister())
.orElseGet(Sets::newHashSet);
List<MessageHistory> updateMessageHistories = messageHistories.stream()
.map(e -> {
MessageHistory messageHistory = MessageHistory.builder()
.id(e.getId())
.build();
if (unregister.contains(e.getToAccount())) {
messageHistory.setStatus(MessageHistory.Status.FAILED);
messageHistory.setResult("IM账号未在网易云信注册");
} else {
messageHistory.setStatus(MessageHistory.Status.SUCCEED);
messageHistory.setMessageId(msgids.get(e.getToAccount()).toString());
}
return messageHistory;
})
.collect(Collectors.toList());
this.updateBatchById(updateMessageHistories);
return;
}
List<MessageHistory> failedMessageHistories = messageHistories.stream()
.map(e -> MessageHistory.builder()
.id(e.getId())
.result(response.getDesc())
.status(MessageHistory.Status.FAILED)
.build())
.collect(Collectors.toList());
this.updateBatchById(failedMessageHistories);
}
private Map<String, PersonProfileDto> listReceiveUserPersonProfile(PageMessageHistoryParam param,
List<MessageHistory> messageHistories) {
if (CollectionUtils.isEmpty(messageHistories) || BooleanUtils.isNotTrue(param.isNeedReceiveUserInfo())) {
@ -108,4 +204,17 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
.stream()
.collect(Collectors.toMap(OrganizationalUnitVO::getId, Function.identity(), (f, s) -> f));
}
@Override
public void afterPropertiesSet() throws Exception {
rateLimiter = rateLimiterClient.build(RateLimiterClient.RateLimiterReq.builder()
.windowType(RateLimiter.WindowType.SLIDING)
.limiterKey(LIMITER_KEY)
.rule(RateLimiter.LimitRule.builder()
.permits(permits)
.seconds(seconds)
.build())
.build());
}
}

View File

@ -1,9 +1,10 @@
package cn.axzo.im.service.impl;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.common.enums.AccountTypeEnum;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.NimMsgTypeEnum;
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchRequest;
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchResponse;
import cn.axzo.im.channel.netease.dto.MessageBody;
import cn.axzo.im.dao.mapper.MessageTaskMapper;
import cn.axzo.im.entity.AccountRegister;
@ -12,8 +13,6 @@ import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.service.AccountRegisterService;
import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.im.service.MessageTaskService;
import cn.axzo.pokonyan.client.RateLimiter;
import cn.axzo.pokonyan.client.RateLimiterClient;
import cn.axzo.pokonyan.dao.converter.PageConverter;
import cn.axzo.pokonyan.dao.mysql.QueryWrapperHelper;
import cn.axzo.pokonyan.exception.Aassert;
@ -25,12 +24,9 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
@ -42,18 +38,16 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static cn.axzo.im.config.BizResultCode.ACQUIRE_RATE_LIMITER_FAIL;
import static cn.axzo.im.config.BizResultCode.MESSAGE_TASK_NOT_FOUND;
@Slf4j
@Service
public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, MessageTask>
implements MessageTaskService, InitializingBean {
implements MessageTaskService {
@Autowired
private RateLimiterClient rateLimiterClient;
@Autowired
private IMChannelProvider imChannelProvider;
@Autowired
@ -61,26 +55,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
@Autowired
private MessageHistoryService messageHistoryService;
@Value("${send.message.limiter.permits:1}")
private int permits;
@Value("${send.message.limiter.seconds:60}")
private long seconds;
/**
* 网易云信IM批量发送-每批次发送给多少用户
*/
@Value("${im-center.message.batch.receiver.once:10}")
public int msgSendPersonOfOneBatch;
private RateLimiter rateLimiter;
private static final String LIMITER_KEY = "im-center:sendMessage";
/**
* 默认超时时间毫秒
*/
private static final long DEFAULT_TIME_OUT_MILLIS = 60 * 1000;
private static final Integer DEFAULT_PAGE_SIZE = 500;
@Override
@Transactional
@ -113,7 +88,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
MessageTask.BizData bizData = messageTask.getBizData();
if (bizData.isAllPerson()) {
log.info("发送全员消息");
doSendAll(messageTask);
doSendAll(messageTask, bizData);
} else {
log.info("发送非全员消息");
doSendNotAll(messageTask);
@ -143,20 +118,36 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
this.updateById(updateMessageTask);
}
private void doSendAll(MessageTask messageTask) {
private void doSendAll(MessageTask messageTask, MessageTask.BizData bizData) {
Integer pageNumber = 1;
while (true) {
Page<AccountRegisterService.AccountRegisterDTO> page = accountRegisterService.page(AccountRegisterService.PageAccountRegisterParam.builder()
.accountType(AccountTypeEnum.USER.getCode())
.appTypes(bizData.getAppTypes().stream().map(AppTypeEnum::getCode).collect(Collectors.toSet()))
.pageNumber(pageNumber++)
.pageSize(DEFAULT_PAGE_SIZE)
.build());
if (!CollectionUtils.isEmpty(page.getRecords())) {
List<MessageTask.ReceivePerson> receivePersons = page.getRecords().stream()
.map(e -> MessageTask.ReceivePerson.builder().imAccount(e.getImAccount()).build())
.collect(Collectors.toList());
saveMessageHistory(receivePersons, messageTask);
}
if (!page.hasNext()) {
break;
}
}
}
private void doSendNotAll(MessageTask messageTask) {
List<List<MessageTask.ReceivePerson>> receivePersons = Lists.partition(messageTask.getReceivePersons(), msgSendPersonOfOneBatch);
String messageBody = resolveBody(messageTask);
receivePersons.forEach(e -> saveMessageHistory(e, messageTask, messageBody));
// 防止sql过长
List<List<MessageTask.ReceivePerson>> receivePersons = Lists.partition(messageTask.getReceivePersons(), DEFAULT_PAGE_SIZE);
receivePersons.forEach(e -> saveMessageHistory(e, messageTask));
}
private void saveMessageHistory(List<MessageTask.ReceivePerson> receivePersons,
MessageTask messageTask,
String messageBody) {
MessageTask messageTask) {
// 排除已经发送成功的记录防止重复发送
Set<String> existPersons = listExistPerson(receivePersons, messageTask);
Set<String> existImAccounts = listExistImAccount(receivePersons, messageTask);
@ -176,32 +167,51 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
Map<String, String> accountRegisters = listAccountRegisters(absentReceivePersons);
Map<String, AccountRegisterService.AccountRegisterDTO> imAccounts = listImAccount(absentReceivePersons);
List<MessageHistory> messageHistories = absentReceivePersons.stream()
.map(receivePerson -> {
MessageHistory messageHistory = new MessageHistory();
messageHistory.setImMessageTaskId(messageTask.getId());
messageHistory.setFromAccount(messageTask.getSendImAccount());
messageHistory.setChannel(imChannelProvider.getProviderType());
messageHistory.setAppType(receivePerson.getAppType().name());
messageHistory.setMessageBody(messageBody);
messageHistory.setCreateAt(new Date());
messageHistory.setUpdateAt(new Date());
messageHistory.setReceiveOuId(receivePerson.getOuId());
messageHistory.setReceivePersonId(receivePerson.getPersonId());
if (StringUtils.isNotBlank(receivePerson.getImAccount())) {
messageHistory.setToAccount(receivePerson.getImAccount());
} else {
String key = receivePerson.buildKey();
messageHistory.setToAccount(accountRegisters.get(key));
messageHistory.setResult("未找到IM账号");
messageHistory.setStatus(MessageHistory.Status.FAILED);
}
return messageHistory;
})
.map(receivePerson -> resolveMessageHistory(messageTask, receivePerson, imAccounts, accountRegisters))
.collect(Collectors.toList());
messageHistoryService.saveBatch(messageHistories);
}
private MessageHistory resolveMessageHistory(MessageTask messageTask, MessageTask.ReceivePerson receivePerson, Map<String, AccountRegisterService.AccountRegisterDTO> imAccounts, Map<String, String> accountRegisters) {
MessageHistory messageHistory = new MessageHistory();
messageHistory.setBizId(Optional.ofNullable(messageTask.getBizId()).orElseGet(() -> messageTask.getId().toString()));
messageHistory.setImMessageTaskId(messageTask.getId());
messageHistory.setFromAccount(messageTask.getSendImAccount());
messageHistory.setChannel(imChannelProvider.getProviderType());
messageHistory.setCreateAt(new Date());
if (StringUtils.isNotBlank(receivePerson.getImAccount())) {
AccountRegisterService.AccountRegisterDTO imAccount = imAccounts.get(receivePerson.getImAccount());
if (imAccount == null) {
messageHistory.setToAccount(receivePerson.getImAccount());
messageHistory.setResult("IM账号未在IM-CENTER注册");
messageHistory.setStatus(MessageHistory.Status.FAILED);
// 因为appType不能为空所以随便填写一个
messageHistory.setAppType(AppTypeEnum.CM.getCode());
} else {
messageHistory.setReceiveOuId(imAccount.getOuId());
messageHistory.setReceivePersonId(imAccount.getAccountId());
messageHistory.setAppType(imAccount.getAppType());
messageHistory.setToAccount(receivePerson.getImAccount());
}
} else {
String key = receivePerson.buildKey();
String imAccount = accountRegisters.get(key);
messageHistory.setReceiveOuId(receivePerson.getOuId());
messageHistory.setReceivePersonId(receivePerson.getPersonId());
messageHistory.setAppType(receivePerson.getAppType().getCode());
messageHistory.setToAccount(imAccount);
if (StringUtils.isBlank(imAccount)) {
messageHistory.setResult("未找到IM账号");
messageHistory.setStatus(MessageHistory.Status.FAILED);
}
}
messageHistory.setMessageBody(resolveBody(messageTask, messageHistory.getAppType()));
return messageHistory;
}
private Set<String> listExistImAccount(List<MessageTask.ReceivePerson> receivePersons,
MessageTask messageTask) {
Set<String> imAccounts = receivePersons.stream()
@ -241,61 +251,6 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
.collect(Collectors.toSet());
}
private void sendMessage(List<MessageHistory> messageHistories,
MessageBatchDispatchRequest batchDispatchRequest) {
// 三方接口调用频率120次/超限将限制1分钟使用所以抛出异常等待下一次xxlJob做重试补偿
if (!rateLimiter.tryAcquire(DEFAULT_TIME_OUT_MILLIS)) {
log.info("未获得令牌");
throw ACQUIRE_RATE_LIMITER_FAIL.toException();
}
log.info("获得令牌");
List<String> toAccids = messageHistories.stream()
.map(MessageHistory::getToAccount)
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(toAccids)) {
log.info("没有需要发送消息的账号");
return;
}
batchDispatchRequest.setToAccids(toAccids);
MessageBatchDispatchResponse response = imChannelProvider.dispatchBatchMessage(batchDispatchRequest);
if (response.isSuccess()) {
// 发送成功的IMAccountId -> msgId
Map<String, Long> msgids = response.getMsgids();
// unregister的账号
Set<String> unregister = Optional.ofNullable(response.getUnregister())
.orElseGet(Sets::newHashSet);
List<MessageHistory> updateMessageHistories = messageHistories.stream()
.peek(e -> {
if (unregister.contains(e.getToAccount())) {
e.setStatus(MessageHistory.Status.FAILED);
e.setResult("IM账号未在网易云信注册");
} else {
e.setStatus(MessageHistory.Status.SUCCEED);
e.setMessageId(msgids.get(e.getToAccount()).toString());
}
})
.collect(Collectors.toList());
messageHistoryService.saveBatch(updateMessageHistories);
return;
}
List<MessageHistory> failedMessageHistories = messageHistories.stream()
.peek(e -> {
e.setResult(response.getDesc());
e.setStatus(MessageHistory.Status.FAILED);
})
.collect(Collectors.toList());
messageHistoryService.saveBatch(failedMessageHistories);
}
private Map<String, String> listAccountRegisters(List<MessageTask.ReceivePerson> receivePersons) {
Set<String> personIds = receivePersons.stream()
.filter(receivePerson -> StringUtils.isNotBlank(receivePerson.getPersonId())
@ -313,19 +268,24 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
.collect(Collectors.toMap(accountRegister -> accountRegister.getAccountId() +
"_" + accountRegister.getAppType() + "_" + accountRegister.getOuId(), AccountRegister::getImAccount));
}
//
// private void sendBatchMessage() {
// MessageBatchDispatchRequest batchDispatchRequest = new MessageBatchDispatchRequest();
// batchDispatchRequest.setBody(messageRequest.getBody());
// batchDispatchRequest.setFromAccid(fromAccId);
// personPage.forEach(imAccountList -> {
// batchDispatchRequest.setToAccids(imAccountList);
// MessageBatchDispatchResponse batchResponse = imChannelProvider.dispatchBatchMessage(batchDispatchRequest);
// if (batchResponse != null) {
// Map<String, Long> userMsgResponseMap = batchResponse.getMsgids();
// }
//
private String resolveBody(MessageTask messageTask) {
private Map<String, AccountRegisterService.AccountRegisterDTO> listImAccount(List<MessageTask.ReceivePerson> receivePersons) {
Set<String> imAccounts = receivePersons.stream()
.filter(receivePerson -> StringUtils.isNotBlank(receivePerson.getImAccount()))
.map(MessageTask.ReceivePerson::getImAccount)
.collect(Collectors.toSet());
if (CollectionUtils.isEmpty(imAccounts)) {
return Collections.emptyMap();
}
return accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder()
.imAccounts(imAccounts)
.build())
.stream()
.collect(Collectors.toMap(AccountRegisterService.AccountRegisterDTO::getImAccount, Function.identity()));
}
private String resolveBody(MessageTask messageTask, String appType) {
MessageBody messageBody = new MessageBody();
messageBody.setMsgType(NimMsgTypeEnum.TEMPLATE.getCode());
@ -339,62 +299,44 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
defaultExtMap.put("msgTemplateId", bizData.getMsgTemplateId());
} else {
JSONObject msgBody = new JSONObject()
// 必填字段非模板消息的业务没有这个参数
.fluentPut("isHighlight", false)
.fluentPut("cardTitle", messageTask.getTitle())
.fluentPut("cardContent", messageTask.getContent())
.fluentPut("cardBannerUrl", messageTask.getCardBannerUrl())
.fluentPut("cardDetailButton", new JSONObject()
.fluentPut("title", "查看详情")
.fluentPut("action", "JUMP")
.fluentPut("actionPaths", bizData.getJumpData()));
.fluentPut("cardBannerUrl", messageTask.getCardBannerUrl());
if (!CollectionUtils.isEmpty(bizData.getJumpData())) {
List<JSONObject> actionPaths = bizData.getJumpData().stream()
.map(e -> {
String platform;
if (e.getPlatform() == SendMessageParam.JumpPlatform.PC) {
platform = SendMessageParam.JumpPlatform.PC.getOldPlatform();
} else if (Objects.equals(e.getPlatform().getAppType().getCode(), appType)) {
platform = e.getPlatform().getOldPlatform();
} else {
return null;
}
return new JSONObject()
.fluentPut("platform", platform)
.fluentPut("url", e.getUrl());
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
msgBody.fluentPut("cardDetailButton", new JSONObject()
.fluentPut("title", "查看详情")
.fluentPut("action", "JUMP")
.fluentPut("actionPaths", actionPaths));
}
messageBody.setMsgBody(msgBody.toJSONString());
}
if (messageTask.getExt() != null) {
defaultExtMap.putAll((Map) JSON.parseObject(JSONObject.toJSONString(messageTask.getExt())));
}
// 传入app版本号保证消息能被正常打开
defaultExtMap.putIfAbsent("minAppVersion", "2.1.0");
messageBody.setMessageExtension(defaultExtMap);
return JSONUtil.toJsonStr(messageBody);
}
// private void insertImMessage(List<MessageDispatchResp> messageRespList, String messageBody) {
// if (CollectionUtils.isEmpty(messageRespList)) {
// return;
// }
// String requestId = UUID.randomUUID().toString().replace("-", "");
// log.info("IM消息发送成功保持到数据库:{},请求Body信息:{}", JSONUtil.toJsonStr(messageRespList), messageBody);
// CompletableFuture.runAsync(() -> messageRespList.forEach(messageDispatchResp -> {
// MessageHistory messageHistory = new MessageHistory();
// messageHistory.setBizId(requestId);
// messageHistory.setFromAccount(messageDispatchResp.getFromImAccount());
// if (StringUtils.isNotBlank(messageDispatchResp.getToImAccount())) {
// messageHistory.setToAccount(messageDispatchResp.getToImAccount());
// } else {
// messageHistory.setToAccount("unregistered");
// }
// messageHistory.setChannel(imChannel.getProviderType());
// messageHistory.setAppType(messageDispatchResp.getAppType());
// messageHistory.setMessageBody(messageBody);
// messageHistory.setCreateAt(new Date());
// messageHistory.setUpdateAt(new Date());
// if (StringUtils.isNotEmpty(messageDispatchResp.getMsgid())) {
// messageHistory.setMessageId(messageDispatchResp.getMsgid());
// }
// try {
// messageHistoryDao.saveOrUpdate(messageHistory);
// Thread.sleep(50);
// } catch (Exception e) {
// log.error("持久化IM消息到数据库异常:{},", JSONUtil.toJsonStr(messageHistory), e);
// }
// }));
// }
@Override
public void afterPropertiesSet() throws Exception {
rateLimiter = rateLimiterClient.build(RateLimiterClient.RateLimiterReq.builder()
.windowType(RateLimiter.WindowType.SLIDING)
.limiterKey(LIMITER_KEY)
.rule(RateLimiter.LimitRule.builder()
.permits(permits)
.seconds(seconds)
.build())
.build());
}
}