From 45fb18d92b33efbfabd46dcc5dddc19f5314439d Mon Sep 17 00:00:00 2001 From: lilong Date: Sat, 23 Mar 2024 21:46:28 +0800 Subject: [PATCH] =?UTF-8?q?feat:feature-REQ/2129=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E5=8E=86=E5=8F=B2=E8=AE=B0=E5=BD=95=E5=92=8C?= =?UTF-8?q?=E7=94=A8=E6=88=B7=E7=9A=84=E6=9F=A5=E8=AF=A2=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- im-center-api/pom.xml | 11 + .../center/api/feign/AccountRegisterApi.java | 145 +++++++++ .../axzo/im/center/api/feign/MessageApi.java | 5 +- .../center/api/feign/MessageHistoryApi.java | 128 ++++++++ .../api/vo/req/AsyncSendMessageParam.java | 133 ++++++++ .../im/channel/netease/NimChannelService.java | 2 +- .../dto/MessageBatchDispatchResponse.java | 7 +- .../java/cn/axzo/im/config/BizResultCode.java | 8 +- .../controller/AccountRegisterController.java | 53 ++++ .../axzo/im/controller/MessageController.java | 52 ++- .../controller/MessageHistoryController.java | 54 ++++ .../cn/axzo/im/entity/AccountRegister.java | 26 +- .../cn/axzo/im/entity/MessageHistory.java | 44 ++- .../java/cn/axzo/im/entity/MessageTask.java | 44 +++ .../axzo/im/job/CreateMessageHistoryJob.java | 78 +++++ .../java/cn/axzo/im/job/SendMessageJob.java | 54 +--- .../axzo/im/job/UpdateImAccountOuIdJob.java | 7 +- .../im/service/AccountRegisterService.java | 35 +- .../cn/axzo/im/service/AccountService.java | 2 +- .../im/service/MessageHistoryService.java | 83 +++++ .../cn/axzo/im/service/MessageService.java | 62 ++-- .../axzo/im/service/MessageTaskService.java | 29 +- .../impl/AccountRegisterServiceImpl.java | 84 ++++- .../impl/MessageHistoryServiceImpl.java | 109 ++++++- .../service/impl/MessageTaskServiceImpl.java | 300 ++++++++++++++++-- sql/init.sql | 5 + 26 files changed, 1418 insertions(+), 142 deletions(-) create mode 100644 im-center-api/src/main/java/cn/axzo/im/center/api/feign/AccountRegisterApi.java create mode 100644 im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageHistoryApi.java create mode 100644 im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/AsyncSendMessageParam.java create mode 100644 im-center-server/src/main/java/cn/axzo/im/controller/AccountRegisterController.java create mode 100644 im-center-server/src/main/java/cn/axzo/im/controller/MessageHistoryController.java create mode 100644 im-center-server/src/main/java/cn/axzo/im/job/CreateMessageHistoryJob.java diff --git a/im-center-api/pom.xml b/im-center-api/pom.xml index 41578f6..22e0a22 100644 --- a/im-center-api/pom.xml +++ b/im-center-api/pom.xml @@ -34,6 +34,17 @@ cn.axzo.framework axzo-common-web + + + cn.axzo.basics + basics-profiles-api + + + + cn.axzo.maokai + maokai-api + + diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/feign/AccountRegisterApi.java b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/AccountRegisterApi.java new file mode 100644 index 0000000..c59a5f7 --- /dev/null +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/AccountRegisterApi.java @@ -0,0 +1,145 @@ +package cn.axzo.im.center.api.feign; + +import cn.axzo.basics.profiles.dto.basic.PersonProfileDto; +import cn.axzo.framework.domain.web.result.ApiListResult; +import cn.axzo.framework.domain.web.result.ApiPageResult; +import cn.axzo.im.center.common.enums.AppTypeEnum; +import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; + +import java.util.Date; +import java.util.List; +import java.util.Set; + +@FeignClient(name = "im-center", url = "${axzo.service.im-center:http://im-center:8080}") +public interface AccountRegisterApi { + + @PostMapping("/api/im/account/register/page") + ApiPageResult page(PageAccountRegisterParam param); + + @PostMapping("/api/im/account/register/list") + ApiListResult list(ListAccountRegisterParam param); + + @Data + @SuperBuilder + @NoArgsConstructor + @AllArgsConstructor + class AccountRegisterDTO { + + private Long id; + + /** + * 账户 机器人robotId、普通用户userId + */ + private String accountId; + + /** + * 普通用户,通过appType和ouId包装 + * 包装以后进行账户注册 + */ + private String accountWrapper; + + /** + * IM账户 + */ + private String imAccount; + + /** + * 终端类型 + * + * @see AppTypeEnum + */ + private String appType; + + /** + * 网易云信appKey + */ + private String appKey; + + /** + * channel 服务提供商 + */ + private String channelProvider; + + /** + * 账户类型:机器人、普通用户 + */ + private String accountType; + + /** + * IM注册 token + */ + private String token; + + /** + * organizational_unit表的id + */ + private Long ouId; + + private Integer isDelete; + + private Date createAt; + + private Date updateAt; + + private PersonProfileDto personProfile; + + private OrganizationalUnitVO organizationalUnit; + } + + @SuperBuilder + @Data + @NoArgsConstructor + @AllArgsConstructor + class ListAccountRegisterParam { + + private List ids; + + private String appType; + + /** + * 注册用户ID唯一 + * 普通用户personId、机器人robotId + */ + private String accountId; + + private Set accountIds; + + /** + * 注册用户ID唯一 + */ + private String imAccount; + + /** + * appType = AppTypeEnum.CMP时,因为网易云信无法对同一个账号做企业隔离,只能一个企业一个账号, + * 所以需要根据organizationalUnitId获取账号 + */ + private Long organizationalUnitId; + + private String accountType; + + private boolean needOuInfo; + + private boolean needUserInfo; + } + + @SuperBuilder + @Data + @NoArgsConstructor + @AllArgsConstructor + class PageAccountRegisterParam extends ListAccountRegisterParam { + Integer pageNumber; + + Integer pageSize; + + /** + * 排序:使用示例,createTime__DESC + */ + List sort; + } +} diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageApi.java b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageApi.java index 0ead321..6c2a57a 100644 --- a/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageApi.java +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageApi.java @@ -1,6 +1,7 @@ package cn.axzo.im.center.api.feign; import cn.axzo.framework.domain.web.result.ApiResult; +import cn.axzo.im.center.api.vo.req.AsyncSendMessageParam; import cn.axzo.im.center.api.vo.req.CustomMessageInfo; import cn.axzo.im.center.api.vo.req.MessageInfo; import cn.axzo.im.center.api.vo.req.SendCustomMessageParam; @@ -33,10 +34,10 @@ public interface MessageApi { * @return */ @PostMapping("/api/im/message/async/send") - ApiResult sendMessageAsync(@RequestBody @Validated SendMessageParam sendMessageParam); + ApiResult sendMessageAsync(@RequestBody @Validated AsyncSendMessageParam sendMessageParam); /** - * 同步发送消息,不建议使用,因为第三方接口有限流,会影响接口性能 + * 同步发送消息,不建议使用,因为第三方接口有限流,会影响接口性能,只能给最多10个用户发送 * @param sendMessageParam * @return */ diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageHistoryApi.java b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageHistoryApi.java new file mode 100644 index 0000000..df9f61e --- /dev/null +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageHistoryApi.java @@ -0,0 +1,128 @@ +package cn.axzo.im.center.api.feign; + +import cn.axzo.basics.profiles.dto.basic.PersonProfileDto; +import cn.axzo.framework.domain.web.result.ApiListResult; +import cn.axzo.framework.domain.web.result.ApiPageResult; +import cn.axzo.im.center.common.enums.AppTypeEnum; +import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; + +import java.util.Date; +import java.util.List; +import java.util.Set; + +@FeignClient(name = "im-center", url = "${axzo.service.im-center:http://im-center:8080}") +public interface MessageHistoryApi { + + @PostMapping("/api/im/message/history/page") + ApiPageResult page(PageMessageHistoryParam param); + + @PostMapping("/api/im/message/history/list") + ApiListResult list(ListMessageHistoryParam param); + + @SuperBuilder + @Data + @NoArgsConstructor + @AllArgsConstructor + class ListMessageHistoryParam { + + private List ids; + + private Long imMessageTaskId; + + private Set receivePersonId; + + private Set toAccount; + + private boolean needReceiveOuInfo; + + private boolean needReceiveUserInfo; + } + + @SuperBuilder + @Data + @NoArgsConstructor + @AllArgsConstructor + class PageMessageHistoryParam extends ListMessageHistoryParam { + Integer pageNumber; + + Integer pageSize; + + /** + * 排序:使用示例,createTime__DESC + */ + List sort; + } + + @Data + @SuperBuilder + @NoArgsConstructor + @AllArgsConstructor + class MessageHistoryDTO { + + private Long id; + + /** + * 上游业务请求ID + */ + private String bizId; + + /** + * 普通用户,通过appType包装 + * 包装以后进行账户注册 + */ + private String messageId; + + /** + * 发送者IM账户 + */ + private String fromAccount; + + + /** + * 发送者IM账户 + */ + private String toAccount; + + /** + * 终端类型 + * + * @see AppTypeEnum + */ + private String appType; + + + /** + * channel 网易云信 + */ + private String channel; + + + private String messageBody; + + private String result; + + private Long imMessageTaskId; + + private String receivePersonId; + + private Long receiveOuId; + + private String status; + + private Integer isDelete; + + private Date createAt; + + private Date updateAt; + + private PersonProfileDto receivePersonProfile; + + private OrganizationalUnitVO receiveOrganizationalUnit; + } +} diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/AsyncSendMessageParam.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/AsyncSendMessageParam.java new file mode 100644 index 0000000..2bee840 --- /dev/null +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/AsyncSendMessageParam.java @@ -0,0 +1,133 @@ +package cn.axzo.im.center.api.vo.req; + +import cn.axzo.im.center.common.enums.AppTypeEnum; +import com.alibaba.fastjson.JSONObject; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotEmpty; +import java.util.List; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class AsyncSendMessageParam { + /** + * 发送者IM账号 + */ + @NotBlank(message = "sendImAccount不能为空") + private String sendImAccount; + + /** + * 消息接收用户信息 + */ + private List receivePersons; + + /** + * 给全员发送 + */ + private boolean allPerson; + + /** + * 全员发送时需要指定发送消息到App端 + * 工人端、企业端、服务器 + * CM、CMP、SYSTEM + * + * @See cn.axzo.im.center.common.enums.AppTypeEnum + */ + private List appTypes; + + /** + * 消息标题 + */ + @NotBlank(message = "消息标题不能为空") + private String msgHeader; + + /** + * 消息内容 + */ + @NotBlank(message = "消息内容不能为空") + private String msgContent; + + /** + * 跳转配置信息 + */ + private List jumpDatas; + + /** + * 封面图 + */ + private String cardBannerUrl; + + /** + * 消息扩展信息 + */ + private JSONObject ext; + + /** + * 业务的唯一ID,用于查询发送消息的记录和结果,不验证唯一 + */ + private String bizId; + + @Data + @Builder + @AllArgsConstructor + @NoArgsConstructor + public static class JumpData { + + private SendMessageParam.JumpPlatform platform; + + private String url; + } + + @Getter + @AllArgsConstructor + public enum JumpPlatform { + WEB("PC"), + MINI_PROGRAM("安心筑小程"), + IOS("IOS"), + ANDROID("ANDROID"), + WEB_VIEW("H5"), + WECHAT_MINI_PROGRAM("微信小程序"), + ; + + private String desc; + } + + @Data + @Builder + @AllArgsConstructor + @NoArgsConstructor + public static class ReceivePerson { + + /** + * 接收消息的personId + */ + private String personId; + + /** + * appType = AppTypeEnum.CMP时,因为网易云信无法对同一个账号做企业隔离,只能一个企业一个账号, + * 所以需要根据organizationalUnitId获取账号 + */ + private Long ouId; + + /** + * 发送消息到App端 + * 工人端、企业端、服务器 + * CM、CMP、SYSTEM + * + * @See cn.axzo.im.center.common.enums.AppTypeEnum + */ + private AppTypeEnum appType; + + /** + * im账号,可以personId和imAccount二选一 + */ + private String imAccount; + } +} diff --git a/im-center-server/src/main/java/cn/axzo/im/channel/netease/NimChannelService.java b/im-center-server/src/main/java/cn/axzo/im/channel/netease/NimChannelService.java index e97d2b9..c9b1a71 100644 --- a/im-center-server/src/main/java/cn/axzo/im/channel/netease/NimChannelService.java +++ b/im-center-server/src/main/java/cn/axzo/im/channel/netease/NimChannelService.java @@ -54,7 +54,7 @@ public class NimChannelService implements IMChannelProvider { private static final String NIM_MESSAGE_ATTACH_URL = "https://api.netease.im/nimserver/msg/sendAttachMsg.action"; - private static final int SUCCESS_CODE = 200; + public static final int SUCCESS_CODE = 200; private static final int NIM_ACCOUNT_ALREADY_REGISTER = 414; diff --git a/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBatchDispatchResponse.java b/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBatchDispatchResponse.java index 2e9c3ec..2734a26 100644 --- a/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBatchDispatchResponse.java +++ b/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBatchDispatchResponse.java @@ -3,10 +3,12 @@ package cn.axzo.im.channel.netease.dto; import lombok.Builder; import lombok.Data; -import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; +import static cn.axzo.im.channel.netease.NimChannelService.SUCCESS_CODE; + /** * 批量发送消息返回响应 * 示例: @@ -37,4 +39,7 @@ public class MessageBatchDispatchResponse { private String desc; + public boolean isSuccess() { + return Objects.equals(this.getCode(), SUCCESS_CODE); + } } diff --git a/im-center-server/src/main/java/cn/axzo/im/config/BizResultCode.java b/im-center-server/src/main/java/cn/axzo/im/config/BizResultCode.java index 5cf70b3..669b1fa 100644 --- a/im-center-server/src/main/java/cn/axzo/im/config/BizResultCode.java +++ b/im-center-server/src/main/java/cn/axzo/im/config/BizResultCode.java @@ -8,7 +8,13 @@ import lombok.Getter; @AllArgsConstructor public enum BizResultCode implements ResultCode { - SEND_IM_ACCOUNT_NOT_FOUND("100", "发送者IM账号错误"); + SEND_IM_ACCOUNT_NOT_FOUND("100", "发送者IM账号错误"), + SEND_IM_ACCOUNT_MAX("101", "同步接口只支持最多10个IM账号,请选择异步接口发送"), + SEND_PERSSON_ERROR("102", "接收人信息和全部接收人不能同时都为空"), + ALL_PERSSON_TYPE_NOT_EMPTY("103", "全员发送时,接收端不能为空"), + ACQUIRE_RATE_LIMITER_FAIL("104", "获取滑动窗口令牌失败"), + MESSAGE_TASK_STATUS_ERROR("105", "更新消息任务失败,状态异常"), + MESSAGE_TASK_NOT_FOUND("106", "消息任务不存在"),; private String errorCode; diff --git a/im-center-server/src/main/java/cn/axzo/im/controller/AccountRegisterController.java b/im-center-server/src/main/java/cn/axzo/im/controller/AccountRegisterController.java new file mode 100644 index 0000000..068e7d4 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/controller/AccountRegisterController.java @@ -0,0 +1,53 @@ +package cn.axzo.im.controller; + +import cn.axzo.framework.domain.web.result.ApiListResult; +import cn.axzo.framework.domain.web.result.ApiPageResult; +import cn.axzo.im.center.api.feign.AccountRegisterApi; +import cn.axzo.im.service.AccountRegisterService; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@RestController +@RequiredArgsConstructor +public class AccountRegisterController implements AccountRegisterApi { + + @Autowired + private AccountRegisterService accountRegisterService; + + @Override + public ApiPageResult page(PageAccountRegisterParam param) { + AccountRegisterService.PageAccountRegisterParam pageAccountRegisterParam = AccountRegisterService.PageAccountRegisterParam.builder().build(); + BeanUtils.copyProperties(param, pageAccountRegisterParam); + + Page page = accountRegisterService.page(pageAccountRegisterParam); + + return ApiPageResult.ok(page.convert(record -> { + AccountRegisterDTO accountRegisterDTO = AccountRegisterDTO.builder().build(); + BeanUtils.copyProperties(record, accountRegisterDTO); + return accountRegisterDTO; + })); + } + + @Override + public ApiListResult list(ListAccountRegisterParam param) { + AccountRegisterService.ListAccountRegisterParam listAccountRegisterParam = AccountRegisterService.ListAccountRegisterParam.builder().build(); + BeanUtils.copyProperties(param, listAccountRegisterParam); + + List list = accountRegisterService.list(listAccountRegisterParam); + return ApiListResult.ok(list.stream() + .map(e -> { + AccountRegisterDTO accountRegisterDTO = AccountRegisterDTO.builder().build(); + BeanUtils.copyProperties(e, accountRegisterDTO); + return accountRegisterDTO; + }) + .collect(Collectors.toList())); + } +} diff --git a/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java b/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java index b8ea91f..28317d9 100644 --- a/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java +++ b/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java @@ -4,6 +4,7 @@ import cn.axzo.basics.common.exception.ServiceException; import cn.axzo.framework.domain.web.result.ApiResult; import cn.axzo.im.center.api.feign.MessageApi; import cn.axzo.im.center.api.vo.req.AccountQuery; +import cn.axzo.im.center.api.vo.req.AsyncSendMessageParam; import cn.axzo.im.center.api.vo.req.CustomMessageInfo; import cn.axzo.im.center.api.vo.req.MessageInfo; import cn.axzo.im.center.api.vo.req.SendMessageParam; @@ -27,6 +28,7 @@ import io.github.resilience4j.ratelimiter.RequestNotPermitted; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -38,7 +40,10 @@ import org.springframework.web.bind.annotation.RestController; import java.util.Date; import java.util.List; +import static cn.axzo.im.config.BizResultCode.ALL_PERSSON_TYPE_NOT_EMPTY; +import static cn.axzo.im.config.BizResultCode.SEND_IM_ACCOUNT_MAX; import static cn.axzo.im.config.BizResultCode.SEND_IM_ACCOUNT_NOT_FOUND; +import static cn.axzo.im.config.BizResultCode.SEND_PERSSON_ERROR; /** * IM消息派发相关 @@ -91,7 +96,7 @@ public class MessageController implements MessageApi { * @return */ @Override - public ApiResult sendMessageAsync(SendMessageParam sendMessageParam) { + public ApiResult sendMessageAsync(AsyncSendMessageParam sendMessageParam) { check(sendMessageParam); MessageTask messageTask = messageTaskService.create(toMessageTask(sendMessageParam)); return ApiResult.ok(toMessageTaskResp(messageTask)); @@ -102,7 +107,6 @@ public class MessageController implements MessageApi { check(sendMessageParam); MessageTask messageTask = messageTaskService.create(toMessageTask(sendMessageParam)); - return ApiResult.ok(toMessageTaskResp(messageTask)); } @@ -114,12 +118,32 @@ public class MessageController implements MessageApi { } private void check(SendMessageParam sendMessageParam) { - List accountRegisters = accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder() + List accountRegisters = accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder() .imAccount(sendMessageParam.getSendImAccount()) .build()); Aassert.checkNotEmpty(accountRegisters, SEND_IM_ACCOUNT_NOT_FOUND); + + Aassert.check(sendMessageParam.getReceivePersons().size() <= 10, SEND_IM_ACCOUNT_MAX); } + + private void check(AsyncSendMessageParam sendMessageParam) { + List accountRegisters = accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder() + .imAccount(sendMessageParam.getSendImAccount()) + .build()); + + Aassert.checkNotEmpty(accountRegisters, SEND_IM_ACCOUNT_NOT_FOUND); + + if (CollectionUtils.isEmpty(sendMessageParam.getReceivePersons()) + && BooleanUtils.isNotTrue(sendMessageParam.isAllPerson())) { + throw SEND_PERSSON_ERROR.toException(); + } + + if (BooleanUtils.isTrue(sendMessageParam.isAllPerson())) { + Aassert.checkNotEmpty(sendMessageParam.getAppTypes(), ALL_PERSSON_TYPE_NOT_EMPTY); + } + } + private String check(SendTemplateMessageParam sendMessageParam) { List robotIdList = robotMsgTemplateService.queryRobotIdByTemplate(sendMessageParam.getMsgTemplateId()); if (CollectionUtils.isEmpty(robotIdList)) { @@ -154,6 +178,28 @@ public class MessageController implements MessageApi { return messageTaskResp; } + private MessageTask toMessageTask(AsyncSendMessageParam sendMessageParam) { + + MessageTask.BizData bizData = MessageTask.BizData.builder() + .jumpDatas(sendMessageParam.getJumpDatas()) + // 全员发送是不常用的场景,不应该由业务处理,所以把配置放在bizData里面 + .allPerson(sendMessageParam.isAllPerson()) + .appTypes(sendMessageParam.getAppTypes()) + .build(); + Date now = new Date(); + return MessageTask.builder() + .bizId(sendMessageParam.getBizId()) + .sendImAccount(sendMessageParam.getSendImAccount()) + .receivePersons(JSONArray.parseArray(JSONObject.toJSONString(sendMessageParam.getReceivePersons()), MessageTask.ReceivePerson.class)) + .status(MessageTask.Status.PENDING) + .title(sendMessageParam.getMsgHeader()) + .content(sendMessageParam.getMsgContent()) + .bizData(bizData) + .ext(sendMessageParam.getExt()) + .planStartTime(now) + .createAt(now) + .build(); + } private MessageTask toMessageTask(SendMessageParam sendMessageParam) { diff --git a/im-center-server/src/main/java/cn/axzo/im/controller/MessageHistoryController.java b/im-center-server/src/main/java/cn/axzo/im/controller/MessageHistoryController.java new file mode 100644 index 0000000..11489a3 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/controller/MessageHistoryController.java @@ -0,0 +1,54 @@ +package cn.axzo.im.controller; + + +import cn.axzo.framework.domain.web.result.ApiListResult; +import cn.axzo.framework.domain.web.result.ApiPageResult; +import cn.axzo.im.center.api.feign.MessageHistoryApi; +import cn.axzo.im.service.MessageHistoryService; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@RestController +@RequiredArgsConstructor +public class MessageHistoryController implements MessageHistoryApi { + + @Autowired + private MessageHistoryService messageHistoryService; + + @Override + public ApiPageResult page(PageMessageHistoryParam param) { + MessageHistoryService.PageMessageHistoryParam pageMessageHistoryParam = MessageHistoryService.PageMessageHistoryParam.builder().build(); + BeanUtils.copyProperties(param, pageMessageHistoryParam); + + Page page = messageHistoryService.page(pageMessageHistoryParam); + + return ApiPageResult.ok(page.convert(record -> { + MessageHistoryDTO messageHistoryDTO = MessageHistoryDTO.builder().build(); + BeanUtils.copyProperties(record, messageHistoryDTO); + return messageHistoryDTO; + })); + } + + @Override + public ApiListResult list(ListMessageHistoryParam param) { + MessageHistoryService.ListMessageHistoryParam listMessageHistoryParam = MessageHistoryService.ListMessageHistoryParam.builder().build(); + BeanUtils.copyProperties(param, listMessageHistoryParam); + + List list = messageHistoryService.list(listMessageHistoryParam); + return ApiListResult.ok(list.stream() + .map(e -> { + MessageHistoryDTO messageHistoryDTO = MessageHistoryDTO.builder().build(); + BeanUtils.copyProperties(e, messageHistoryDTO); + return messageHistoryDTO; + }) + .collect(Collectors.toList())); + } +} diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/AccountRegister.java b/im-center-server/src/main/java/cn/axzo/im/entity/AccountRegister.java index 4792ee7..006c6f4 100644 --- a/im-center-server/src/main/java/cn/axzo/im/entity/AccountRegister.java +++ b/im-center-server/src/main/java/cn/axzo/im/entity/AccountRegister.java @@ -1,14 +1,18 @@ package cn.axzo.im.entity; -import cn.axzo.framework.data.mybatisplus.model.BaseEntity; import cn.axzo.im.center.common.enums.AppTypeEnum; +import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; +import lombok.AllArgsConstructor; import lombok.Data; -import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; import lombok.experimental.Accessors; +import lombok.experimental.SuperBuilder; import java.io.Serializable; +import java.util.Date; /** * IM账户表 @@ -19,12 +23,17 @@ import java.io.Serializable; */ @TableName("im_account_register") @Data -@EqualsAndHashCode(callSuper = true) +@SuperBuilder @Accessors(chain = true) -public class AccountRegister extends BaseEntity implements Serializable { +@NoArgsConstructor +@AllArgsConstructor +public class AccountRegister implements Serializable { private static final long serialVersionUID = 1L; + @TableId(type = IdType.AUTO) + private Long id; + /** * 账户 机器人robotId、普通用户userId */ @@ -82,4 +91,13 @@ public class AccountRegister extends BaseEntity implements Ser */ @TableField("ou_id") private Long ouId; + + @TableField + private Integer isDelete; + + @TableField + private Date createAt; + + @TableField + private Date updateAt; } diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java b/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java index 8a29341..a17f138 100644 --- a/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java +++ b/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java @@ -2,13 +2,19 @@ package cn.axzo.im.entity; import cn.axzo.framework.data.mybatisplus.model.BaseEntity; import cn.axzo.im.center.common.enums.AppTypeEnum; +import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; +import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; import lombok.experimental.Accessors; +import lombok.experimental.SuperBuilder; import java.io.Serializable; +import java.util.Date; /** * IM消息模历史表 @@ -19,12 +25,17 @@ import java.io.Serializable; */ @TableName("im_message_history") @Data -@EqualsAndHashCode(callSuper = true) +@SuperBuilder @Accessors(chain = true) -public class MessageHistory extends BaseEntity implements Serializable { +@NoArgsConstructor +@AllArgsConstructor +public class MessageHistory implements Serializable { private static final long serialVersionUID = 1L; + @TableId(type = IdType.AUTO) + private Long id; + /** * 上游业务请求ID */ @@ -70,4 +81,33 @@ public class MessageHistory extends BaseEntity implements Seria @TableField("message_body") private String messageBody; + @TableField("result") + private String result; + + @TableField("im_message_task_id") + private Long imMessageTaskId; + + @TableField("receive_person_id") + private String receivePersonId; + + @TableField("receive_ou_id") + private Long receiveOuId; + + private Status status; + + @TableField + private Integer isDelete; + + @TableField + private Date createAt; + + @TableField + private Date updateAt; + + public enum Status { + PENDING, + SUCCEED, + FAILED, + ; + } } diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java b/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java index e157c87..2b26f24 100644 --- a/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java +++ b/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java @@ -10,18 +10,24 @@ import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.Getter; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; import lombok.experimental.SuperBuilder; +import org.apache.commons.lang3.StringUtils; import org.springframework.cglib.beans.BeanMap; import java.util.Date; import java.util.List; import java.util.Optional; +import static cn.axzo.im.config.BizResultCode.MESSAGE_TASK_STATUS_ERROR; + @Data @SuperBuilder @Accessors(chain = true) @@ -130,6 +136,20 @@ public class MessageTask { * 跳转信息 */ private List jumpDatas; + + /** + * 给全员发送 + */ + private boolean allPerson; + + /** + * 全员发送时需要指定发送消息到App端 + * 工人端、企业端、服务器 + * CM、CMP、SYSTEM + * + * @See cn.axzo.im.center.common.enums.AppTypeEnum + */ + private List appTypes; } @Data @@ -162,7 +182,31 @@ public class MessageTask { * im账号,可以personId和imAccount二选一 */ private String imAccount; + + public String buildKey() { + if (StringUtils.isNotBlank(this.getImAccount())) { + return this.getImAccount(); + } + return this.getPersonId() + "_" + this.getAppType() + "_" + this.getOuId(); + } } public static class ListReceivePersonTypeHandler extends BaseListTypeHandler {} + + @Getter + @AllArgsConstructor + public enum ActionEnum { + SUCCESS, + ; + + private static final Table STATUS_FLOWS = HashBasedTable.create(); + + static { + STATUS_FLOWS.put(Status.PENDING, SUCCESS, Status.SUCCEED); + } + + public Status getNextStatus(Status oldStatus) { + return Optional.ofNullable(STATUS_FLOWS.get(oldStatus, this)).orElseThrow(MESSAGE_TASK_STATUS_ERROR::toException); + } + } } diff --git a/im-center-server/src/main/java/cn/axzo/im/job/CreateMessageHistoryJob.java b/im-center-server/src/main/java/cn/axzo/im/job/CreateMessageHistoryJob.java new file mode 100644 index 0000000..6754a7c --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/job/CreateMessageHistoryJob.java @@ -0,0 +1,78 @@ +package cn.axzo.im.job; + +import cn.axzo.im.entity.MessageTask; +import cn.axzo.im.service.MessageTaskService; +import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.List; +import java.util.Optional; + +/** + * 查询ImMessageTask中的PEDING数据,添加到messageHistory + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class CreateMessageHistoryJob extends IJobHandler { + + @Autowired + private MessageTaskService messageTaskService; + + private static final Integer DEFAULT_PAGE_SIZE = 500; + + @Override + @XxlJob("sendMessageJob") + public ReturnT execute(String s) throws Exception { + + log.info("start sendMessageJob,s:{}", s); + SendMessageParam sendMessageParam = Optional.ofNullable(s) + .map(e -> JSONObject.parseObject(e, SendMessageParam.class)) + .orElseGet(() -> SendMessageParam.builder().build()); + Integer pageNumber = 1; + Date now = new Date(); + while (true) { + MessageTaskService.PageMessageTaskParam req = MessageTaskService.PageMessageTaskParam.builder() + .ids(sendMessageParam.getIds()) + .planStartTimeLE(now) + .status(MessageTask.Status.PENDING) + .pageNumber(pageNumber++) + .pageSize(DEFAULT_PAGE_SIZE) + .build(); + + Page page = messageTaskService.page(req); + if (CollectionUtils.isNotEmpty(page.getRecords())) { + page.getRecords().forEach(messageTask -> { + messageTaskService.createMessageHistory(messageTask); + }); + } + + if (!page.hasNext()) { + break; + } + } + log.info("end sendMessageJob"); + return ReturnT.SUCCESS; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class SendMessageParam { + private List ids; + } +} diff --git a/im-center-server/src/main/java/cn/axzo/im/job/SendMessageJob.java b/im-center-server/src/main/java/cn/axzo/im/job/SendMessageJob.java index b3a019b..5774d97 100644 --- a/im-center-server/src/main/java/cn/axzo/im/job/SendMessageJob.java +++ b/im-center-server/src/main/java/cn/axzo/im/job/SendMessageJob.java @@ -1,81 +1,37 @@ package cn.axzo.im.job; -import cn.axzo.im.entity.MessageTask; +import cn.axzo.im.service.MessageHistoryService; import cn.axzo.im.service.MessageTaskService; -import cn.axzo.pokonyan.client.RateLimiter; -import cn.axzo.pokonyan.client.RateLimiterClient; -import com.alibaba.fastjson.JSONObject; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.google.common.collect.Maps; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.IJobHandler; -import com.xxl.job.core.handler.annotation.XxlJob; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import java.util.Date; import java.util.List; -import java.util.Optional; +import java.util.Map; -/** - * 查询ImMessageTask中的PEDING数据,然后发送IM消息和push - */ @Slf4j @Component @RequiredArgsConstructor public class SendMessageJob extends IJobHandler { @Autowired - private MessageTaskService messageTaskService; + private MessageHistoryService messageHistoryService; private static final Integer DEFAULT_PAGE_SIZE = 500; @Override - @XxlJob("sendMessageJob") public ReturnT execute(String s) throws Exception { - - log.info("start sendMessageJob,s:{}", s); - SendMessageParam sendMessageParam = Optional.ofNullable(s) - .map(e -> JSONObject.parseObject(e, SendMessageParam.class)) - .orElseGet(() -> SendMessageParam.builder().build()); - Integer pageNumber = 1; - Date now = new Date(); - while (true) { - MessageTaskService.PageMessageTaskParam req = MessageTaskService.PageMessageTaskParam.builder() - .ids(sendMessageParam.getIds()) - .planStartTimeLE(now) - .status(MessageTask.Status.PENDING) - .pageNumber(pageNumber++) - .pageSize(DEFAULT_PAGE_SIZE) - .build(); - - Page page = messageTaskService.page(req); - if (CollectionUtils.isNotEmpty(page.getRecords())) { - sendMessage(page.getRecords()); - } - - if (!page.hasNext()) { - break; - } - } - log.info("end sendMessageJob"); - return ReturnT.SUCCESS; + return null; } - private void sendMessage(List messageTasks) { - - messageTasks.forEach(messageTask -> { - - }); - - } @Data @Builder diff --git a/im-center-server/src/main/java/cn/axzo/im/job/UpdateImAccountOuIdJob.java b/im-center-server/src/main/java/cn/axzo/im/job/UpdateImAccountOuIdJob.java index 1337d31..5003c39 100644 --- a/im-center-server/src/main/java/cn/axzo/im/job/UpdateImAccountOuIdJob.java +++ b/im-center-server/src/main/java/cn/axzo/im/job/UpdateImAccountOuIdJob.java @@ -23,7 +23,6 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.time.LocalDateTime; import java.util.Collections; import java.util.Date; import java.util.List; @@ -68,7 +67,7 @@ public class UpdateImAccountOuIdJob extends IJobHandler { .pageSize(DEFAULT_PAGE_SIZE) .build(); - Page page = accountRegisterService.page(req); + Page page = accountRegisterService.page(req); if (CollectionUtils.isNotEmpty(page.getRecords())) { Map nodeUsers = listNodeUsers(page.getRecords()); @@ -83,7 +82,7 @@ public class UpdateImAccountOuIdJob extends IJobHandler { return ReturnT.SUCCESS; } - private void updateAccountRegister(List accountRegisters, Map nodeUsers) { + private void updateAccountRegister(List accountRegisters, Map nodeUsers) { List update = accountRegisters.stream() .filter(accountRegister -> nodeUsers.get(accountRegister.getAccountId()) != null) .map(accountRegister -> { @@ -103,7 +102,7 @@ public class UpdateImAccountOuIdJob extends IJobHandler { accountRegisterService.updateBatchById(update); } - private Map listNodeUsers(List accountRegisters) { + private Map listNodeUsers(List accountRegisters) { if (CollectionUtils.isEmpty(accountRegisters)) { return Collections.EMPTY_MAP; } diff --git a/im-center-server/src/main/java/cn/axzo/im/service/AccountRegisterService.java b/im-center-server/src/main/java/cn/axzo/im/service/AccountRegisterService.java index 5dedbdb..8f27f23 100644 --- a/im-center-server/src/main/java/cn/axzo/im/service/AccountRegisterService.java +++ b/im-center-server/src/main/java/cn/axzo/im/service/AccountRegisterService.java @@ -1,6 +1,8 @@ package cn.axzo.im.service; +import cn.axzo.basics.profiles.dto.basic.PersonProfileDto; import cn.axzo.im.entity.AccountRegister; +import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO; import cn.axzo.pokonyan.dao.page.IPageParam; import cn.axzo.pokonyan.dao.wrapper.CriteriaField; import cn.axzo.pokonyan.dao.wrapper.Operator; @@ -10,15 +12,17 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; +import org.springframework.beans.BeanUtils; import java.util.List; +import java.util.Map; import java.util.Set; public interface AccountRegisterService extends IService { - Page page(PageAccountRegisterParam param); + Page page(PageAccountRegisterParam param); - List list(ListAccountRegisterParam param); + List list(ListAccountRegisterParam param); @SuperBuilder @Data @@ -57,6 +61,12 @@ public interface AccountRegisterService extends IService { @CriteriaField(field = "accountType", operator = Operator.EQ) private String accountType; + + @CriteriaField(ignore = true) + private boolean needOuInfo; + + @CriteriaField(ignore = true) + private boolean needUserInfo; } @SuperBuilder @@ -77,4 +87,25 @@ public interface AccountRegisterService extends IService { List sort; } + @Data + @SuperBuilder + @NoArgsConstructor + @AllArgsConstructor + class AccountRegisterDTO extends AccountRegister { + + private PersonProfileDto personProfile; + + private OrganizationalUnitVO organizationalUnit; + + public static AccountRegisterDTO from(AccountRegister accountRegister, + Map personProfiles, + Map organizationals) { + AccountRegisterDTO accountRegisterDTO = AccountRegisterDTO.builder().build(); + BeanUtils.copyProperties(accountRegister, accountRegisterDTO); + + accountRegisterDTO.setPersonProfile(personProfiles.get(Long.valueOf(accountRegisterDTO.getAccountId()))); + accountRegisterDTO.setOrganizationalUnit(organizationals.get(accountRegisterDTO.getOuId())); + return accountRegisterDTO; + } + } } diff --git a/im-center-server/src/main/java/cn/axzo/im/service/AccountService.java b/im-center-server/src/main/java/cn/axzo/im/service/AccountService.java index bdd99a5..a2f581d 100644 --- a/im-center-server/src/main/java/cn/axzo/im/service/AccountService.java +++ b/im-center-server/src/main/java/cn/axzo/im/service/AccountService.java @@ -348,7 +348,7 @@ public class AccountService { if (appTypeEnum == AppTypeEnum.CMP) { listAccountRegisterParam.setOrganizationalUnitId(accountAbsentQuery.getOrganizationalUnitId()); } - List accountRegisters = accountRegisterService.list(listAccountRegisterParam); + List accountRegisters = accountRegisterService.list(listAccountRegisterParam); if (CollectionUtils.isNotEmpty(accountRegisters)) { userAccountAll.addAll(accountRegisters.stream() diff --git a/im-center-server/src/main/java/cn/axzo/im/service/MessageHistoryService.java b/im-center-server/src/main/java/cn/axzo/im/service/MessageHistoryService.java index 2544ada..05df687 100644 --- a/im-center-server/src/main/java/cn/axzo/im/service/MessageHistoryService.java +++ b/im-center-server/src/main/java/cn/axzo/im/service/MessageHistoryService.java @@ -1,7 +1,90 @@ package cn.axzo.im.service; +import cn.axzo.basics.profiles.dto.basic.PersonProfileDto; import cn.axzo.im.entity.MessageHistory; +import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO; +import cn.axzo.pokonyan.dao.page.IPageParam; +import cn.axzo.pokonyan.dao.wrapper.CriteriaField; +import cn.axzo.pokonyan.dao.wrapper.Operator; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.IService; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import org.springframework.beans.BeanUtils; + +import java.util.List; +import java.util.Map; +import java.util.Set; public interface MessageHistoryService extends IService { + + Page page(PageMessageHistoryParam param); + + List list(ListMessageHistoryParam param); + + @SuperBuilder + @Data + @NoArgsConstructor + @AllArgsConstructor + class ListMessageHistoryParam { + + @CriteriaField(field = "id", operator = Operator.IN) + private List ids; + + @CriteriaField(field = "imMessageTaskId", operator = Operator.EQ) + private Long imMessageTaskId; + + @CriteriaField(field = "receivePersonId", operator = Operator.IN) + private Set receivePersonId; + + @CriteriaField(field = "toAccount", operator = Operator.IN) + private Set toAccount; + + @CriteriaField(ignore = true) + private boolean needReceiveOuInfo; + + @CriteriaField(ignore = true) + private boolean needReceiveUserInfo; + } + + @SuperBuilder + @Data + @NoArgsConstructor + @AllArgsConstructor + class PageMessageHistoryParam extends ListMessageHistoryParam implements IPageParam { + @CriteriaField(ignore = true) + Integer pageNumber; + + @CriteriaField(ignore = true) + Integer pageSize; + + /** + * 排序:使用示例,createTime__DESC + */ + @CriteriaField(ignore = true) + List sort; + } + + @Data + @SuperBuilder + @NoArgsConstructor + @AllArgsConstructor + class MessageHistoryDTO extends MessageHistory { + private PersonProfileDto receivePersonProfile; + + private OrganizationalUnitVO receiveOrganizationalUnit; + + public static MessageHistoryDTO from(MessageHistory messageHistory, + Map personProfiles, + Map organizationals) { + MessageHistoryDTO messageHistoryDTO = MessageHistoryDTO.builder().build(); + BeanUtils.copyProperties(messageHistory, messageHistoryDTO); + + messageHistoryDTO.setReceivePersonProfile(personProfiles.get(Long.valueOf(messageHistoryDTO.getReceivePersonId()))); + messageHistoryDTO.setReceiveOrganizationalUnit(organizationals.get(messageHistoryDTO.getReceiveOuId())); + return messageHistoryDTO; + } + } } diff --git a/im-center-server/src/main/java/cn/axzo/im/service/MessageService.java b/im-center-server/src/main/java/cn/axzo/im/service/MessageService.java index ae5cbb8..d0d2692 100644 --- a/im-center-server/src/main/java/cn/axzo/im/service/MessageService.java +++ b/im-center-server/src/main/java/cn/axzo/im/service/MessageService.java @@ -371,37 +371,37 @@ public class MessageService { // return messageDispatchRespList; // } -// private void insertImMessage(List messageRespList, String messageBody) { -// if (CollectionUtils.isEmpty(messageRespList)) { -// return; -// } -// String requestId = UUID.randomUUID().toString().replace("-", ""); -// log.info("IM消息发送成功保持到数据库:{},请求Body信息:{}", JSONUtil.toJsonStr(messageRespList), messageBody); -// CompletableFuture.runAsync(() -> messageRespList.forEach(messageDispatchResp -> { -// MessageHistory messageHistory = new MessageHistory(); -// messageHistory.setBizId(requestId); -// messageHistory.setFromAccount(messageDispatchResp.getFromImAccount()); -// if (StringUtils.isNotBlank(messageDispatchResp.getToImAccount())) { -// messageHistory.setToAccount(messageDispatchResp.getToImAccount()); -// } else { -// messageHistory.setToAccount("unregistered"); -// } -// messageHistory.setChannel(imChannel.getProviderType()); -// messageHistory.setAppType(messageDispatchResp.getAppType()); -// messageHistory.setMessageBody(messageBody); -// messageHistory.setCreateAt(new Date()); -// messageHistory.setUpdateAt(new Date()); -// if (StringUtils.isNotEmpty(messageDispatchResp.getMsgid())) { -// messageHistory.setMessageId(messageDispatchResp.getMsgid()); -// } -// try { -// messageHistoryDao.saveOrUpdate(messageHistory); -// Thread.sleep(50); -// } catch (Exception e) { -// log.error("持久化IM消息到数据库异常:{},", JSONUtil.toJsonStr(messageHistory), e); -// } -// })); -// } + private void insertImMessage(List messageRespList, String messageBody) { + if (CollectionUtils.isEmpty(messageRespList)) { + return; + } + String requestId = UUID.randomUUID().toString().replace("-", ""); + log.info("IM消息发送成功保持到数据库:{},请求Body信息:{}", JSONUtil.toJsonStr(messageRespList), messageBody); + CompletableFuture.runAsync(() -> messageRespList.forEach(messageDispatchResp -> { + MessageHistory messageHistory = new MessageHistory(); + messageHistory.setBizId(requestId); + messageHistory.setFromAccount(messageDispatchResp.getFromImAccount()); + if (StringUtils.isNotBlank(messageDispatchResp.getToImAccount())) { + messageHistory.setToAccount(messageDispatchResp.getToImAccount()); + } else { + messageHistory.setToAccount("unregistered"); + } + messageHistory.setChannel(imChannel.getProviderType()); + messageHistory.setAppType(messageDispatchResp.getAppType()); + messageHistory.setMessageBody(messageBody); + messageHistory.setCreateAt(new Date()); + messageHistory.setUpdateAt(new Date()); + if (StringUtils.isNotEmpty(messageDispatchResp.getMsgid())) { + messageHistory.setMessageId(messageDispatchResp.getMsgid()); + } + try { + messageHistoryDao.saveOrUpdate(messageHistory); + Thread.sleep(50); + } catch (Exception e) { + log.error("持久化IM消息到数据库异常:{},", JSONUtil.toJsonStr(messageHistory), e); + } + })); + } @Transactional(rollbackFor = Exception.class) diff --git a/im-center-server/src/main/java/cn/axzo/im/service/MessageTaskService.java b/im-center-server/src/main/java/cn/axzo/im/service/MessageTaskService.java index 5c34964..6b66daa 100644 --- a/im-center-server/src/main/java/cn/axzo/im/service/MessageTaskService.java +++ b/im-center-server/src/main/java/cn/axzo/im/service/MessageTaskService.java @@ -1,6 +1,5 @@ package cn.axzo.im.service; -import cn.axzo.im.entity.MessageHistory; import cn.axzo.im.entity.MessageTask; import cn.axzo.pokonyan.dao.page.IPageParam; import cn.axzo.pokonyan.dao.wrapper.CriteriaField; @@ -8,6 +7,7 @@ import cn.axzo.pokonyan.dao.wrapper.Operator; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.IService; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; @@ -21,8 +21,33 @@ public interface MessageTaskService extends IService { Page page(PageMessageTaskParam param); - List sendMessage(MessageTask messageTask); + void createMessageHistory(MessageTask messageTask); + void update(UpdateMessageTaskParam param); + + @Builder + @Data + @NoArgsConstructor + @AllArgsConstructor + class UpdateMessageTaskParam { + + private Long id; + + private MessageTask.ActionEnum action; + + private Date startedTime; + + private Date finishedTime; + + public MessageTask to() { + return MessageTask.builder() + .id(this.getId()) + .startedTime(this.getStartedTime()) + .finishedTime(this.getFinishedTime()) + .build(); + } + + } @SuperBuilder @Data @NoArgsConstructor diff --git a/im-center-server/src/main/java/cn/axzo/im/service/impl/AccountRegisterServiceImpl.java b/im-center-server/src/main/java/cn/axzo/im/service/impl/AccountRegisterServiceImpl.java index a3c70c2..9bec97c 100644 --- a/im-center-server/src/main/java/cn/axzo/im/service/impl/AccountRegisterServiceImpl.java +++ b/im-center-server/src/main/java/cn/axzo/im/service/impl/AccountRegisterServiceImpl.java @@ -1,34 +1,67 @@ package cn.axzo.im.service.impl; +import cn.axzo.basics.profiles.api.UserProfileServiceApi; +import cn.axzo.basics.profiles.dto.basic.PersonProfileDto; +import cn.axzo.im.center.common.enums.AccountTypeEnum; +import cn.axzo.im.channel.IMChannelProvider; import cn.axzo.im.dao.mapper.AccountRegisterMapper; import cn.axzo.im.entity.AccountRegister; import cn.axzo.im.service.AccountRegisterService; +import cn.axzo.maokai.api.client.OrganizationalUnitApi; +import cn.axzo.maokai.api.vo.request.OrganizationalUnitQuery; +import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO; import cn.axzo.pokonyan.dao.converter.PageConverter; import cn.axzo.pokonyan.dao.mysql.QueryWrapperHelper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; @Slf4j @Service public class AccountRegisterServiceImpl extends ServiceImpl implements AccountRegisterService { + @Autowired + private IMChannelProvider imChannelProvider; + @Autowired + private OrganizationalUnitApi organizationalUnitApi; + @Autowired + private UserProfileServiceApi userProfileServiceApi; + @Override - public Page page(PageAccountRegisterParam param) { + public Page page(PageAccountRegisterParam param) { QueryWrapper wrapper = QueryWrapperHelper.fromBean(param, AccountRegister.class); + // 只能取配置的appKey的数据,防止接口使用方没传appKey导致获取错乱的数据 + wrapper.eq("app_key", imChannelProvider.getProviderAppKey()); wrapper.eq("is_delete", 0); - return this.page(PageConverter.convertToMybatis(param, AccountRegister.class), wrapper); + Page page = this.page(PageConverter.convertToMybatis(param, AccountRegister.class), wrapper); + + Map personProfiles = listUserPersonProfile(param, page.getRecords()); + + Map organizationals = listOrganizational(param, page.getRecords()); + + return PageConverter.convert(page, (record) -> AccountRegisterDTO.from(record, + personProfiles, + organizationals)); } @Override - public List list(ListAccountRegisterParam param) { + public List list(ListAccountRegisterParam param) { return PageConverter.drainAll(pageNumber -> { PageAccountRegisterParam pageParam = PageAccountRegisterParam.builder().build(); BeanUtils.copyProperties(param, pageParam); @@ -37,4 +70,49 @@ public class AccountRegisterServiceImpl extends ServiceImpl listUserPersonProfile(PageAccountRegisterParam param, + List accountRegisters) { + if (CollectionUtils.isEmpty(accountRegisters) || BooleanUtils.isNotTrue(param.isNeedUserInfo())) { + return Collections.emptyMap(); + } + + List personIds = accountRegisters.stream() + .filter(e -> Objects.equals(e.getAccountType(), AccountTypeEnum.USER.getCode())) + .map(AccountRegister::getAccountId) + .filter(Objects::nonNull) + .filter(StringUtils::isNumeric) + .map(Long::valueOf) + .distinct() + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(personIds)) { + return Collections.emptyMap(); + } + + return userProfileServiceApi.postPersonProfiles(personIds).getData() + .stream() + .collect(Collectors.toMap(PersonProfileDto::getId, Function.identity())); + } + + private Map listOrganizational(PageAccountRegisterParam param, + List accountRegisters) { + if (CollectionUtils.isEmpty(accountRegisters) || BooleanUtils.isNotTrue(param.isNeedOuInfo())) { + return Collections.emptyMap(); + } + + List ouIds = accountRegisters.stream() + .map(AccountRegister::getOuId) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(ouIds)) { + return Collections.emptyMap(); + } + + return organizationalUnitApi.page(OrganizationalUnitQuery.builder().unitIds(ouIds).build()) + .getData() + .getList() + .stream() + .collect(Collectors.toMap(OrganizationalUnitVO::getId, Function.identity(), (f, s) -> f)); + } } diff --git a/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageHistoryServiceImpl.java b/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageHistoryServiceImpl.java index 7c99190..819f5e7 100644 --- a/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageHistoryServiceImpl.java +++ b/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageHistoryServiceImpl.java @@ -1,4 +1,111 @@ package cn.axzo.im.service.impl; -public class MessageHistoryServiceImpl { +import cn.axzo.basics.profiles.api.UserProfileServiceApi; +import cn.axzo.basics.profiles.dto.basic.PersonProfileDto; +import cn.axzo.im.dao.mapper.MessageHistoryMapper; +import cn.axzo.im.entity.MessageHistory; +import cn.axzo.im.service.MessageHistoryService; +import cn.axzo.maokai.api.client.OrganizationalUnitApi; +import cn.axzo.maokai.api.vo.request.OrganizationalUnitQuery; +import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO; +import cn.axzo.pokonyan.dao.converter.PageConverter; +import cn.axzo.pokonyan.dao.mysql.QueryWrapperHelper; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; + +@Slf4j +@Service +public class MessageHistoryServiceImpl extends ServiceImpl + implements MessageHistoryService { + + @Autowired + private OrganizationalUnitApi organizationalUnitApi; + @Autowired + private UserProfileServiceApi userProfileServiceApi; + + @Override + public Page page(PageMessageHistoryParam param) { + QueryWrapper wrapper = QueryWrapperHelper.fromBean(param, MessageHistory.class); + // 只能取配置的appKey的数据,防止接口使用方没传appKey导致获取错乱的数据 + wrapper.eq("is_delete", 0); + + Page page = this.page(PageConverter.convertToMybatis(param, MessageHistory.class), wrapper); + + Map personProfiles = listReceiveUserPersonProfile(param, page.getRecords()); + + Map organizationals = listReceiveOrganizational(param, page.getRecords()); + + return PageConverter.convert(page, (record) -> MessageHistoryDTO.from(record, + personProfiles, + organizationals)); + } + + @Override + public List list(ListMessageHistoryParam param) { + return PageConverter.drainAll(pageNumber -> { + PageMessageHistoryParam pageParam = PageMessageHistoryParam.builder().build(); + BeanUtils.copyProperties(param, pageParam); + pageParam.setPageNumber(pageNumber); + pageParam.setPageSize(500); + return page(pageParam); + }); + } + + private Map listReceiveUserPersonProfile(PageMessageHistoryParam param, + List messageHistories) { + if (CollectionUtils.isEmpty(messageHistories) || BooleanUtils.isNotTrue(param.isNeedReceiveUserInfo())) { + return Collections.emptyMap(); + } + + List personIds = messageHistories.stream() + .map(MessageHistory::getReceivePersonId) + .filter(StringUtils::isNumeric) + .map(Long::valueOf) + .distinct() + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(personIds)) { + return Collections.emptyMap(); + } + + return userProfileServiceApi.postPersonProfiles(personIds).getData() + .stream() + .collect(Collectors.toMap(PersonProfileDto::getId, Function.identity())); + } + + private Map listReceiveOrganizational(PageMessageHistoryParam param, + List messageHistories) { + if (CollectionUtils.isEmpty(messageHistories) || BooleanUtils.isNotTrue(param.isNeedReceiveOuInfo())) { + return Collections.emptyMap(); + } + + List ouIds = messageHistories.stream() + .map(MessageHistory::getReceiveOuId) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(ouIds)) { + return Collections.emptyMap(); + } + + return organizationalUnitApi.page(OrganizationalUnitQuery.builder().unitIds(ouIds).build()) + .getData() + .getList() + .stream() + .collect(Collectors.toMap(OrganizationalUnitVO::getId, Function.identity(), (f, s) -> f)); + } } diff --git a/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java b/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java index f048729..0ab1ac5 100644 --- a/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java +++ b/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java @@ -6,20 +6,26 @@ import cn.axzo.im.channel.netease.dto.MessageBatchDispatchRequest; import cn.axzo.im.channel.netease.dto.MessageBatchDispatchResponse; import cn.axzo.im.channel.netease.dto.MessageBody; import cn.axzo.im.dao.mapper.MessageTaskMapper; +import cn.axzo.im.entity.AccountRegister; import cn.axzo.im.entity.MessageHistory; import cn.axzo.im.entity.MessageTask; import cn.axzo.im.service.AccountRegisterService; +import cn.axzo.im.service.MessageHistoryService; import cn.axzo.im.service.MessageTaskService; import cn.axzo.pokonyan.client.RateLimiter; import cn.axzo.pokonyan.client.RateLimiterClient; import cn.axzo.pokonyan.dao.converter.PageConverter; import cn.axzo.pokonyan.dao.mysql.QueryWrapperHelper; +import cn.axzo.pokonyan.exception.Aassert; import cn.hutool.json.JSONUtil; +import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.InitializingBean; @@ -27,10 +33,19 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.CollectionUtils; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static cn.axzo.im.config.BizResultCode.ACQUIRE_RATE_LIMITER_FAIL; +import static cn.axzo.im.config.BizResultCode.MESSAGE_TASK_NOT_FOUND; @Slf4j @Service @@ -43,6 +58,8 @@ public class MessageTaskServiceImpl extends ServiceImpl sendMessage(MessageTask messageTask) { + public void createMessageHistory(MessageTask messageTask) { - if (rateLimiter.tryAcquire()) { - log.info("获得令牌"); + this.update(UpdateMessageTaskParam.builder() + .id(messageTask.getId()) + .startedTime(new Date()) + .build()); -// listAccountRegisters(messageTask); - - - MessageBatchDispatchRequest batchDispatchRequest = new MessageBatchDispatchRequest(); - batchDispatchRequest.setFromAccid(messageTask.getSendImAccount()); - batchDispatchRequest.setBody(resolveBody(messageTask)); -// batchDispatchRequest.setToAccids(imAccountList); - MessageBatchDispatchResponse batchResponse = imChannelProvider.dispatchBatchMessage(batchDispatchRequest); - - - return null; + MessageTask.BizData bizData = messageTask.getBizData(); + if (bizData.isAllPerson()) { + log.info("发送全员消息"); + doSendAll(messageTask); + } else { + log.info("发送非全员消息"); + doSendNotAll(messageTask); } - - - log.info("未获得令牌"); - return null; + this.update(UpdateMessageTaskParam.builder() + .id(messageTask.getId()) + .action(MessageTask.ActionEnum.SUCCESS) + .finishedTime(new Date()) + .build()); } -// private Map listAccountRegisters(MessageTask messageTask) { -// Set personIds = messageTask.getReceivePersons().stream() -// .filter(receivePerson -> StringUtils.isNotBlank(receivePerson.getPersonId()) -// && StringUtils.isBlank(receivePerson.getImAccount())) -// .map(MessageTask.ReceivePerson::getPersonId) -// .collect(Collectors.toSet()); -// if (CollectionUtils.isEmpty(personIds)) { -// return; -// } -// -// accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder() -// .accountIds(personIds) -// .build()); -// } + @Override + public void update(UpdateMessageTaskParam param) { + MessageTask oldMessageTask = this.lambdaQuery() + .eq(MessageTask::getId, param.getId()) + .last("for update") + .one(); + Aassert.notNull(oldMessageTask, MESSAGE_TASK_NOT_FOUND);; + + MessageTask updateMessageTask = param.to(); + if (param.getAction() != null) { + updateMessageTask.setStatus(param.getAction().getNextStatus(oldMessageTask.getStatus())); + } + + this.updateById(updateMessageTask); + } + + private void doSendAll(MessageTask messageTask) { + + } + + private void doSendNotAll(MessageTask messageTask) { + + List> receivePersons = Lists.partition(messageTask.getReceivePersons(), msgSendPersonOfOneBatch); + String messageBody = resolveBody(messageTask); + receivePersons.forEach(e -> saveMessageHistory(e, messageTask, messageBody)); + } + + private void saveMessageHistory(List receivePersons, + MessageTask messageTask, + String messageBody) { + // 排除已经发送成功的记录,防止重复发送 + Set existPersons = listExistPerson(receivePersons, messageTask); + Set existImAccounts = listExistImAccount(receivePersons, messageTask); + + List absentReceivePersons = receivePersons.stream() + .filter(e -> { + String key = e.buildKey(); + return !existPersons.contains(key) && !existImAccounts.contains(key); + }) + .collect(Collectors.toList()); + + if (CollectionUtils.isEmpty(absentReceivePersons)) { + log.info("messageTask,{}, receivePersons,{},已经存在", JSONObject.toJSONString(messageTask), + JSONObject.toJSONString(receivePersons)); + return; + } + + Map accountRegisters = listAccountRegisters(absentReceivePersons); + + List messageHistories = absentReceivePersons.stream() + .map(receivePerson -> { + MessageHistory messageHistory = new MessageHistory(); + messageHistory.setImMessageTaskId(messageTask.getId()); + messageHistory.setFromAccount(messageTask.getSendImAccount()); + messageHistory.setChannel(imChannelProvider.getProviderType()); + messageHistory.setAppType(receivePerson.getAppType().name()); + messageHistory.setMessageBody(messageBody); + messageHistory.setCreateAt(new Date()); + messageHistory.setUpdateAt(new Date()); + messageHistory.setReceiveOuId(receivePerson.getOuId()); + messageHistory.setReceivePersonId(receivePerson.getPersonId()); + if (StringUtils.isNotBlank(receivePerson.getImAccount())) { + messageHistory.setToAccount(receivePerson.getImAccount()); + } else { + String key = receivePerson.buildKey(); + messageHistory.setToAccount(accountRegisters.get(key)); + messageHistory.setResult("未找到IM账号"); + messageHistory.setStatus(MessageHistory.Status.FAILED); + } + return messageHistory; + }) + .collect(Collectors.toList()); + messageHistoryService.saveBatch(messageHistories); + } + + private Set listExistImAccount(List receivePersons, + MessageTask messageTask) { + Set imAccounts = receivePersons.stream() + .map(MessageTask.ReceivePerson::getImAccount) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + if (CollectionUtils.isEmpty(imAccounts)) { + return Collections.emptySet(); + } + + return messageHistoryService.list(MessageHistoryService.ListMessageHistoryParam.builder() + .imMessageTaskId(messageTask.getId()) + .toAccount(imAccounts) + .build()) + .stream() + .map(MessageHistory::getToAccount) + .collect(Collectors.toSet()); + } + + private Set listExistPerson(List receivePersons, + MessageTask messageTask) { + Set personIds = receivePersons.stream() + .map(MessageTask.ReceivePerson::getPersonId) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + if (CollectionUtils.isEmpty(personIds)) { + return Collections.emptySet(); + } + + return messageHistoryService.list(MessageHistoryService.ListMessageHistoryParam.builder() + .imMessageTaskId(messageTask.getId()) + .receivePersonId(personIds) + .build()) + .stream() + .map(e -> e.getReceivePersonId() + "_" + e.getAppType() + "_" + e.getReceiveOuId()) + .collect(Collectors.toSet()); + } + + + private void sendMessage(List messageHistories, + MessageBatchDispatchRequest batchDispatchRequest) { + + // 三方接口调用频率120次/分,超限将限制1分钟使用,所以抛出异常,等待下一次xxlJob做重试补偿 + if (!rateLimiter.tryAcquire(DEFAULT_TIME_OUT_MILLIS)) { + log.info("未获得令牌"); + throw ACQUIRE_RATE_LIMITER_FAIL.toException(); + } + + log.info("获得令牌"); + List toAccids = messageHistories.stream() + .map(MessageHistory::getToAccount) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(toAccids)) { + log.info("没有需要发送消息的账号"); + return; + } + + batchDispatchRequest.setToAccids(toAccids); + MessageBatchDispatchResponse response = imChannelProvider.dispatchBatchMessage(batchDispatchRequest); + + if (response.isSuccess()) { + // 发送成功的IMAccountId -> msgId + Map msgids = response.getMsgids(); + + // unregister的账号 + Set unregister = Optional.ofNullable(response.getUnregister()) + .orElseGet(Sets::newHashSet); + + List updateMessageHistories = messageHistories.stream() + .peek(e -> { + if (unregister.contains(e.getToAccount())) { + e.setStatus(MessageHistory.Status.FAILED); + e.setResult("IM账号未在网易云信注册"); + } else { + e.setStatus(MessageHistory.Status.SUCCEED); + e.setMessageId(msgids.get(e.getToAccount()).toString()); + } + }) + .collect(Collectors.toList()); + messageHistoryService.saveBatch(updateMessageHistories); + return; + } + + List failedMessageHistories = messageHistories.stream() + .peek(e -> { + e.setResult(response.getDesc()); + e.setStatus(MessageHistory.Status.FAILED); + }) + .collect(Collectors.toList()); + messageHistoryService.saveBatch(failedMessageHistories); + } + + private Map listAccountRegisters(List receivePersons) { + Set personIds = receivePersons.stream() + .filter(receivePerson -> StringUtils.isNotBlank(receivePerson.getPersonId()) + && StringUtils.isBlank(receivePerson.getImAccount())) + .map(MessageTask.ReceivePerson::getPersonId) + .collect(Collectors.toSet()); + if (CollectionUtils.isEmpty(personIds)) { + return Collections.emptyMap(); + } + + return accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder() + .accountIds(personIds) + .build()) + .stream() + .collect(Collectors.toMap(accountRegister -> accountRegister.getAccountId() + + "_" + accountRegister.getAppType() + "_" + accountRegister.getOuId(), AccountRegister::getImAccount)); + } +// // private void sendBatchMessage() { // MessageBatchDispatchRequest batchDispatchRequest = new MessageBatchDispatchRequest(); // batchDispatchRequest.setBody(messageRequest.getBody()); @@ -129,7 +324,7 @@ public class MessageTaskServiceImpl extends ServiceImpl userMsgResponseMap = batchResponse.getMsgids(); // } - +// private String resolveBody(MessageTask messageTask) { MessageBody messageBody = new MessageBody(); @@ -153,9 +348,44 @@ public class MessageTaskServiceImpl extends ServiceImpl messageRespList, String messageBody) { +// if (CollectionUtils.isEmpty(messageRespList)) { +// return; +// } +// String requestId = UUID.randomUUID().toString().replace("-", ""); +// log.info("IM消息发送成功保持到数据库:{},请求Body信息:{}", JSONUtil.toJsonStr(messageRespList), messageBody); +// CompletableFuture.runAsync(() -> messageRespList.forEach(messageDispatchResp -> { +// MessageHistory messageHistory = new MessageHistory(); +// messageHistory.setBizId(requestId); +// messageHistory.setFromAccount(messageDispatchResp.getFromImAccount()); +// if (StringUtils.isNotBlank(messageDispatchResp.getToImAccount())) { +// messageHistory.setToAccount(messageDispatchResp.getToImAccount()); +// } else { +// messageHistory.setToAccount("unregistered"); +// } +// messageHistory.setChannel(imChannel.getProviderType()); +// messageHistory.setAppType(messageDispatchResp.getAppType()); +// messageHistory.setMessageBody(messageBody); +// messageHistory.setCreateAt(new Date()); +// messageHistory.setUpdateAt(new Date()); +// if (StringUtils.isNotEmpty(messageDispatchResp.getMsgid())) { +// messageHistory.setMessageId(messageDispatchResp.getMsgid()); +// } +// try { +// messageHistoryDao.saveOrUpdate(messageHistory); +// Thread.sleep(50); +// } catch (Exception e) { +// log.error("持久化IM消息到数据库异常:{},", JSONUtil.toJsonStr(messageHistory), e); +// } +// })); +// } + @Override public void afterPropertiesSet() throws Exception { rateLimiter = rateLimiterClient.build(RateLimiterClient.RateLimiterReq.builder() diff --git a/sql/init.sql b/sql/init.sql index 13fc734..c4edff5 100644 --- a/sql/init.sql +++ b/sql/init.sql @@ -127,3 +127,8 @@ CREATE TABLE IF NOT EXISTS im_message_task ALTER TABLE im_message_history ADD COLUMN `result` varchar(1024) NULL COMMENT 'result'; ALTER TABLE im_message_history ADD COLUMN `im_message_task_id` bigint NULL COMMENT '消息推送任务的id'; +ALTER TABLE im_message_history ADD COLUMN status varchar(32) not null default 'PENDING' comment '消息状态:PENDING、SUCCEED、FAILED'; +ALTER TABLE im_message_history ADD COLUMN receive_person_id varchar(100) not null default '' comment 'IM消息接收personId'; +ALTER TABLE im_message_history ADD COLUMN receive_ou_id bigint not null default 0 comment 'organizational_unit表的id'; + +