diff --git a/im-center-api/pom.xml b/im-center-api/pom.xml
index 41578f6..22e0a22 100644
--- a/im-center-api/pom.xml
+++ b/im-center-api/pom.xml
@@ -34,6 +34,17 @@
cn.axzo.framework
axzo-common-web
+
+
+ cn.axzo.basics
+ basics-profiles-api
+
+
+
+ cn.axzo.maokai
+ maokai-api
+
+
diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/feign/AccountRegisterApi.java b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/AccountRegisterApi.java
new file mode 100644
index 0000000..c59a5f7
--- /dev/null
+++ b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/AccountRegisterApi.java
@@ -0,0 +1,145 @@
+package cn.axzo.im.center.api.feign;
+
+import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
+import cn.axzo.framework.domain.web.result.ApiListResult;
+import cn.axzo.framework.domain.web.result.ApiPageResult;
+import cn.axzo.im.center.common.enums.AppTypeEnum;
+import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+import org.springframework.cloud.openfeign.FeignClient;
+import org.springframework.web.bind.annotation.PostMapping;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+
+@FeignClient(name = "im-center", url = "${axzo.service.im-center:http://im-center:8080}")
+public interface AccountRegisterApi {
+
+ @PostMapping("/api/im/account/register/page")
+ ApiPageResult page(PageAccountRegisterParam param);
+
+ @PostMapping("/api/im/account/register/list")
+ ApiListResult list(ListAccountRegisterParam param);
+
+ @Data
+ @SuperBuilder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ class AccountRegisterDTO {
+
+ private Long id;
+
+ /**
+ * 账户 机器人robotId、普通用户userId
+ */
+ private String accountId;
+
+ /**
+ * 普通用户,通过appType和ouId包装
+ * 包装以后进行账户注册
+ */
+ private String accountWrapper;
+
+ /**
+ * IM账户
+ */
+ private String imAccount;
+
+ /**
+ * 终端类型
+ *
+ * @see AppTypeEnum
+ */
+ private String appType;
+
+ /**
+ * 网易云信appKey
+ */
+ private String appKey;
+
+ /**
+ * channel 服务提供商
+ */
+ private String channelProvider;
+
+ /**
+ * 账户类型:机器人、普通用户
+ */
+ private String accountType;
+
+ /**
+ * IM注册 token
+ */
+ private String token;
+
+ /**
+ * organizational_unit表的id
+ */
+ private Long ouId;
+
+ private Integer isDelete;
+
+ private Date createAt;
+
+ private Date updateAt;
+
+ private PersonProfileDto personProfile;
+
+ private OrganizationalUnitVO organizationalUnit;
+ }
+
+ @SuperBuilder
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ class ListAccountRegisterParam {
+
+ private List ids;
+
+ private String appType;
+
+ /**
+ * 注册用户ID唯一
+ * 普通用户personId、机器人robotId
+ */
+ private String accountId;
+
+ private Set accountIds;
+
+ /**
+ * 注册用户ID唯一
+ */
+ private String imAccount;
+
+ /**
+ * appType = AppTypeEnum.CMP时,因为网易云信无法对同一个账号做企业隔离,只能一个企业一个账号,
+ * 所以需要根据organizationalUnitId获取账号
+ */
+ private Long organizationalUnitId;
+
+ private String accountType;
+
+ private boolean needOuInfo;
+
+ private boolean needUserInfo;
+ }
+
+ @SuperBuilder
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ class PageAccountRegisterParam extends ListAccountRegisterParam {
+ Integer pageNumber;
+
+ Integer pageSize;
+
+ /**
+ * 排序:使用示例,createTime__DESC
+ */
+ List sort;
+ }
+}
diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageApi.java b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageApi.java
index 0ead321..6c2a57a 100644
--- a/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageApi.java
+++ b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageApi.java
@@ -1,6 +1,7 @@
package cn.axzo.im.center.api.feign;
import cn.axzo.framework.domain.web.result.ApiResult;
+import cn.axzo.im.center.api.vo.req.AsyncSendMessageParam;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendCustomMessageParam;
@@ -33,10 +34,10 @@ public interface MessageApi {
* @return
*/
@PostMapping("/api/im/message/async/send")
- ApiResult sendMessageAsync(@RequestBody @Validated SendMessageParam sendMessageParam);
+ ApiResult sendMessageAsync(@RequestBody @Validated AsyncSendMessageParam sendMessageParam);
/**
- * 同步发送消息,不建议使用,因为第三方接口有限流,会影响接口性能
+ * 同步发送消息,不建议使用,因为第三方接口有限流,会影响接口性能,只能给最多10个用户发送
* @param sendMessageParam
* @return
*/
diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageHistoryApi.java b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageHistoryApi.java
new file mode 100644
index 0000000..df9f61e
--- /dev/null
+++ b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageHistoryApi.java
@@ -0,0 +1,128 @@
+package cn.axzo.im.center.api.feign;
+
+import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
+import cn.axzo.framework.domain.web.result.ApiListResult;
+import cn.axzo.framework.domain.web.result.ApiPageResult;
+import cn.axzo.im.center.common.enums.AppTypeEnum;
+import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+import org.springframework.cloud.openfeign.FeignClient;
+import org.springframework.web.bind.annotation.PostMapping;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+
+@FeignClient(name = "im-center", url = "${axzo.service.im-center:http://im-center:8080}")
+public interface MessageHistoryApi {
+
+ @PostMapping("/api/im/message/history/page")
+ ApiPageResult page(PageMessageHistoryParam param);
+
+ @PostMapping("/api/im/message/history/list")
+ ApiListResult list(ListMessageHistoryParam param);
+
+ @SuperBuilder
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ class ListMessageHistoryParam {
+
+ private List ids;
+
+ private Long imMessageTaskId;
+
+ private Set receivePersonId;
+
+ private Set toAccount;
+
+ private boolean needReceiveOuInfo;
+
+ private boolean needReceiveUserInfo;
+ }
+
+ @SuperBuilder
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ class PageMessageHistoryParam extends ListMessageHistoryParam {
+ Integer pageNumber;
+
+ Integer pageSize;
+
+ /**
+ * 排序:使用示例,createTime__DESC
+ */
+ List sort;
+ }
+
+ @Data
+ @SuperBuilder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ class MessageHistoryDTO {
+
+ private Long id;
+
+ /**
+ * 上游业务请求ID
+ */
+ private String bizId;
+
+ /**
+ * 普通用户,通过appType包装
+ * 包装以后进行账户注册
+ */
+ private String messageId;
+
+ /**
+ * 发送者IM账户
+ */
+ private String fromAccount;
+
+
+ /**
+ * 发送者IM账户
+ */
+ private String toAccount;
+
+ /**
+ * 终端类型
+ *
+ * @see AppTypeEnum
+ */
+ private String appType;
+
+
+ /**
+ * channel 网易云信
+ */
+ private String channel;
+
+
+ private String messageBody;
+
+ private String result;
+
+ private Long imMessageTaskId;
+
+ private String receivePersonId;
+
+ private Long receiveOuId;
+
+ private String status;
+
+ private Integer isDelete;
+
+ private Date createAt;
+
+ private Date updateAt;
+
+ private PersonProfileDto receivePersonProfile;
+
+ private OrganizationalUnitVO receiveOrganizationalUnit;
+ }
+}
diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/AsyncSendMessageParam.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/AsyncSendMessageParam.java
new file mode 100644
index 0000000..2bee840
--- /dev/null
+++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/AsyncSendMessageParam.java
@@ -0,0 +1,133 @@
+package cn.axzo.im.center.api.vo.req;
+
+import cn.axzo.im.center.common.enums.AppTypeEnum;
+import com.alibaba.fastjson.JSONObject;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotEmpty;
+import java.util.List;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class AsyncSendMessageParam {
+ /**
+ * 发送者IM账号
+ */
+ @NotBlank(message = "sendImAccount不能为空")
+ private String sendImAccount;
+
+ /**
+ * 消息接收用户信息
+ */
+ private List receivePersons;
+
+ /**
+ * 给全员发送
+ */
+ private boolean allPerson;
+
+ /**
+ * 全员发送时需要指定发送消息到App端
+ * 工人端、企业端、服务器
+ * CM、CMP、SYSTEM
+ *
+ * @See cn.axzo.im.center.common.enums.AppTypeEnum
+ */
+ private List appTypes;
+
+ /**
+ * 消息标题
+ */
+ @NotBlank(message = "消息标题不能为空")
+ private String msgHeader;
+
+ /**
+ * 消息内容
+ */
+ @NotBlank(message = "消息内容不能为空")
+ private String msgContent;
+
+ /**
+ * 跳转配置信息
+ */
+ private List jumpDatas;
+
+ /**
+ * 封面图
+ */
+ private String cardBannerUrl;
+
+ /**
+ * 消息扩展信息
+ */
+ private JSONObject ext;
+
+ /**
+ * 业务的唯一ID,用于查询发送消息的记录和结果,不验证唯一
+ */
+ private String bizId;
+
+ @Data
+ @Builder
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class JumpData {
+
+ private SendMessageParam.JumpPlatform platform;
+
+ private String url;
+ }
+
+ @Getter
+ @AllArgsConstructor
+ public enum JumpPlatform {
+ WEB("PC"),
+ MINI_PROGRAM("安心筑小程"),
+ IOS("IOS"),
+ ANDROID("ANDROID"),
+ WEB_VIEW("H5"),
+ WECHAT_MINI_PROGRAM("微信小程序"),
+ ;
+
+ private String desc;
+ }
+
+ @Data
+ @Builder
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class ReceivePerson {
+
+ /**
+ * 接收消息的personId
+ */
+ private String personId;
+
+ /**
+ * appType = AppTypeEnum.CMP时,因为网易云信无法对同一个账号做企业隔离,只能一个企业一个账号,
+ * 所以需要根据organizationalUnitId获取账号
+ */
+ private Long ouId;
+
+ /**
+ * 发送消息到App端
+ * 工人端、企业端、服务器
+ * CM、CMP、SYSTEM
+ *
+ * @See cn.axzo.im.center.common.enums.AppTypeEnum
+ */
+ private AppTypeEnum appType;
+
+ /**
+ * im账号,可以personId和imAccount二选一
+ */
+ private String imAccount;
+ }
+}
diff --git a/im-center-server/src/main/java/cn/axzo/im/channel/netease/NimChannelService.java b/im-center-server/src/main/java/cn/axzo/im/channel/netease/NimChannelService.java
index e97d2b9..c9b1a71 100644
--- a/im-center-server/src/main/java/cn/axzo/im/channel/netease/NimChannelService.java
+++ b/im-center-server/src/main/java/cn/axzo/im/channel/netease/NimChannelService.java
@@ -54,7 +54,7 @@ public class NimChannelService implements IMChannelProvider {
private static final String NIM_MESSAGE_ATTACH_URL = "https://api.netease.im/nimserver/msg/sendAttachMsg.action";
- private static final int SUCCESS_CODE = 200;
+ public static final int SUCCESS_CODE = 200;
private static final int NIM_ACCOUNT_ALREADY_REGISTER = 414;
diff --git a/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBatchDispatchResponse.java b/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBatchDispatchResponse.java
index 2e9c3ec..2734a26 100644
--- a/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBatchDispatchResponse.java
+++ b/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBatchDispatchResponse.java
@@ -3,10 +3,12 @@ package cn.axzo.im.channel.netease.dto;
import lombok.Builder;
import lombok.Data;
-import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import static cn.axzo.im.channel.netease.NimChannelService.SUCCESS_CODE;
+
/**
* 批量发送消息返回响应
* 示例:
@@ -37,4 +39,7 @@ public class MessageBatchDispatchResponse {
private String desc;
+ public boolean isSuccess() {
+ return Objects.equals(this.getCode(), SUCCESS_CODE);
+ }
}
diff --git a/im-center-server/src/main/java/cn/axzo/im/config/BizResultCode.java b/im-center-server/src/main/java/cn/axzo/im/config/BizResultCode.java
index 5cf70b3..669b1fa 100644
--- a/im-center-server/src/main/java/cn/axzo/im/config/BizResultCode.java
+++ b/im-center-server/src/main/java/cn/axzo/im/config/BizResultCode.java
@@ -8,7 +8,13 @@ import lombok.Getter;
@AllArgsConstructor
public enum BizResultCode implements ResultCode {
- SEND_IM_ACCOUNT_NOT_FOUND("100", "发送者IM账号错误");
+ SEND_IM_ACCOUNT_NOT_FOUND("100", "发送者IM账号错误"),
+ SEND_IM_ACCOUNT_MAX("101", "同步接口只支持最多10个IM账号,请选择异步接口发送"),
+ SEND_PERSSON_ERROR("102", "接收人信息和全部接收人不能同时都为空"),
+ ALL_PERSSON_TYPE_NOT_EMPTY("103", "全员发送时,接收端不能为空"),
+ ACQUIRE_RATE_LIMITER_FAIL("104", "获取滑动窗口令牌失败"),
+ MESSAGE_TASK_STATUS_ERROR("105", "更新消息任务失败,状态异常"),
+ MESSAGE_TASK_NOT_FOUND("106", "消息任务不存在"),;
private String errorCode;
diff --git a/im-center-server/src/main/java/cn/axzo/im/controller/AccountRegisterController.java b/im-center-server/src/main/java/cn/axzo/im/controller/AccountRegisterController.java
new file mode 100644
index 0000000..068e7d4
--- /dev/null
+++ b/im-center-server/src/main/java/cn/axzo/im/controller/AccountRegisterController.java
@@ -0,0 +1,53 @@
+package cn.axzo.im.controller;
+
+import cn.axzo.framework.domain.web.result.ApiListResult;
+import cn.axzo.framework.domain.web.result.ApiPageResult;
+import cn.axzo.im.center.api.feign.AccountRegisterApi;
+import cn.axzo.im.service.AccountRegisterService;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeanUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+@RestController
+@RequiredArgsConstructor
+public class AccountRegisterController implements AccountRegisterApi {
+
+ @Autowired
+ private AccountRegisterService accountRegisterService;
+
+ @Override
+ public ApiPageResult page(PageAccountRegisterParam param) {
+ AccountRegisterService.PageAccountRegisterParam pageAccountRegisterParam = AccountRegisterService.PageAccountRegisterParam.builder().build();
+ BeanUtils.copyProperties(param, pageAccountRegisterParam);
+
+ Page page = accountRegisterService.page(pageAccountRegisterParam);
+
+ return ApiPageResult.ok(page.convert(record -> {
+ AccountRegisterDTO accountRegisterDTO = AccountRegisterDTO.builder().build();
+ BeanUtils.copyProperties(record, accountRegisterDTO);
+ return accountRegisterDTO;
+ }));
+ }
+
+ @Override
+ public ApiListResult list(ListAccountRegisterParam param) {
+ AccountRegisterService.ListAccountRegisterParam listAccountRegisterParam = AccountRegisterService.ListAccountRegisterParam.builder().build();
+ BeanUtils.copyProperties(param, listAccountRegisterParam);
+
+ List list = accountRegisterService.list(listAccountRegisterParam);
+ return ApiListResult.ok(list.stream()
+ .map(e -> {
+ AccountRegisterDTO accountRegisterDTO = AccountRegisterDTO.builder().build();
+ BeanUtils.copyProperties(e, accountRegisterDTO);
+ return accountRegisterDTO;
+ })
+ .collect(Collectors.toList()));
+ }
+}
diff --git a/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java b/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java
index b8ea91f..28317d9 100644
--- a/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java
+++ b/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java
@@ -4,6 +4,7 @@ import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.feign.MessageApi;
import cn.axzo.im.center.api.vo.req.AccountQuery;
+import cn.axzo.im.center.api.vo.req.AsyncSendMessageParam;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
@@ -27,6 +28,7 @@ import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -38,7 +40,10 @@ import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.List;
+import static cn.axzo.im.config.BizResultCode.ALL_PERSSON_TYPE_NOT_EMPTY;
+import static cn.axzo.im.config.BizResultCode.SEND_IM_ACCOUNT_MAX;
import static cn.axzo.im.config.BizResultCode.SEND_IM_ACCOUNT_NOT_FOUND;
+import static cn.axzo.im.config.BizResultCode.SEND_PERSSON_ERROR;
/**
* IM消息派发相关
@@ -91,7 +96,7 @@ public class MessageController implements MessageApi {
* @return
*/
@Override
- public ApiResult sendMessageAsync(SendMessageParam sendMessageParam) {
+ public ApiResult sendMessageAsync(AsyncSendMessageParam sendMessageParam) {
check(sendMessageParam);
MessageTask messageTask = messageTaskService.create(toMessageTask(sendMessageParam));
return ApiResult.ok(toMessageTaskResp(messageTask));
@@ -102,7 +107,6 @@ public class MessageController implements MessageApi {
check(sendMessageParam);
MessageTask messageTask = messageTaskService.create(toMessageTask(sendMessageParam));
-
return ApiResult.ok(toMessageTaskResp(messageTask));
}
@@ -114,12 +118,32 @@ public class MessageController implements MessageApi {
}
private void check(SendMessageParam sendMessageParam) {
- List accountRegisters = accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder()
+ List accountRegisters = accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder()
.imAccount(sendMessageParam.getSendImAccount())
.build());
Aassert.checkNotEmpty(accountRegisters, SEND_IM_ACCOUNT_NOT_FOUND);
+
+ Aassert.check(sendMessageParam.getReceivePersons().size() <= 10, SEND_IM_ACCOUNT_MAX);
}
+
+ private void check(AsyncSendMessageParam sendMessageParam) {
+ List accountRegisters = accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder()
+ .imAccount(sendMessageParam.getSendImAccount())
+ .build());
+
+ Aassert.checkNotEmpty(accountRegisters, SEND_IM_ACCOUNT_NOT_FOUND);
+
+ if (CollectionUtils.isEmpty(sendMessageParam.getReceivePersons())
+ && BooleanUtils.isNotTrue(sendMessageParam.isAllPerson())) {
+ throw SEND_PERSSON_ERROR.toException();
+ }
+
+ if (BooleanUtils.isTrue(sendMessageParam.isAllPerson())) {
+ Aassert.checkNotEmpty(sendMessageParam.getAppTypes(), ALL_PERSSON_TYPE_NOT_EMPTY);
+ }
+ }
+
private String check(SendTemplateMessageParam sendMessageParam) {
List robotIdList = robotMsgTemplateService.queryRobotIdByTemplate(sendMessageParam.getMsgTemplateId());
if (CollectionUtils.isEmpty(robotIdList)) {
@@ -154,6 +178,28 @@ public class MessageController implements MessageApi {
return messageTaskResp;
}
+ private MessageTask toMessageTask(AsyncSendMessageParam sendMessageParam) {
+
+ MessageTask.BizData bizData = MessageTask.BizData.builder()
+ .jumpDatas(sendMessageParam.getJumpDatas())
+ // 全员发送是不常用的场景,不应该由业务处理,所以把配置放在bizData里面
+ .allPerson(sendMessageParam.isAllPerson())
+ .appTypes(sendMessageParam.getAppTypes())
+ .build();
+ Date now = new Date();
+ return MessageTask.builder()
+ .bizId(sendMessageParam.getBizId())
+ .sendImAccount(sendMessageParam.getSendImAccount())
+ .receivePersons(JSONArray.parseArray(JSONObject.toJSONString(sendMessageParam.getReceivePersons()), MessageTask.ReceivePerson.class))
+ .status(MessageTask.Status.PENDING)
+ .title(sendMessageParam.getMsgHeader())
+ .content(sendMessageParam.getMsgContent())
+ .bizData(bizData)
+ .ext(sendMessageParam.getExt())
+ .planStartTime(now)
+ .createAt(now)
+ .build();
+ }
private MessageTask toMessageTask(SendMessageParam sendMessageParam) {
diff --git a/im-center-server/src/main/java/cn/axzo/im/controller/MessageHistoryController.java b/im-center-server/src/main/java/cn/axzo/im/controller/MessageHistoryController.java
new file mode 100644
index 0000000..11489a3
--- /dev/null
+++ b/im-center-server/src/main/java/cn/axzo/im/controller/MessageHistoryController.java
@@ -0,0 +1,54 @@
+package cn.axzo.im.controller;
+
+
+import cn.axzo.framework.domain.web.result.ApiListResult;
+import cn.axzo.framework.domain.web.result.ApiPageResult;
+import cn.axzo.im.center.api.feign.MessageHistoryApi;
+import cn.axzo.im.service.MessageHistoryService;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeanUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+@RestController
+@RequiredArgsConstructor
+public class MessageHistoryController implements MessageHistoryApi {
+
+ @Autowired
+ private MessageHistoryService messageHistoryService;
+
+ @Override
+ public ApiPageResult page(PageMessageHistoryParam param) {
+ MessageHistoryService.PageMessageHistoryParam pageMessageHistoryParam = MessageHistoryService.PageMessageHistoryParam.builder().build();
+ BeanUtils.copyProperties(param, pageMessageHistoryParam);
+
+ Page page = messageHistoryService.page(pageMessageHistoryParam);
+
+ return ApiPageResult.ok(page.convert(record -> {
+ MessageHistoryDTO messageHistoryDTO = MessageHistoryDTO.builder().build();
+ BeanUtils.copyProperties(record, messageHistoryDTO);
+ return messageHistoryDTO;
+ }));
+ }
+
+ @Override
+ public ApiListResult list(ListMessageHistoryParam param) {
+ MessageHistoryService.ListMessageHistoryParam listMessageHistoryParam = MessageHistoryService.ListMessageHistoryParam.builder().build();
+ BeanUtils.copyProperties(param, listMessageHistoryParam);
+
+ List list = messageHistoryService.list(listMessageHistoryParam);
+ return ApiListResult.ok(list.stream()
+ .map(e -> {
+ MessageHistoryDTO messageHistoryDTO = MessageHistoryDTO.builder().build();
+ BeanUtils.copyProperties(e, messageHistoryDTO);
+ return messageHistoryDTO;
+ })
+ .collect(Collectors.toList()));
+ }
+}
diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/AccountRegister.java b/im-center-server/src/main/java/cn/axzo/im/entity/AccountRegister.java
index 4792ee7..006c6f4 100644
--- a/im-center-server/src/main/java/cn/axzo/im/entity/AccountRegister.java
+++ b/im-center-server/src/main/java/cn/axzo/im/entity/AccountRegister.java
@@ -1,14 +1,18 @@
package cn.axzo.im.entity;
-import cn.axzo.framework.data.mybatisplus.model.BaseEntity;
import cn.axzo.im.center.common.enums.AppTypeEnum;
+import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
+import lombok.experimental.SuperBuilder;
import java.io.Serializable;
+import java.util.Date;
/**
* IM账户表
@@ -19,12 +23,17 @@ import java.io.Serializable;
*/
@TableName("im_account_register")
@Data
-@EqualsAndHashCode(callSuper = true)
+@SuperBuilder
@Accessors(chain = true)
-public class AccountRegister extends BaseEntity implements Serializable {
+@NoArgsConstructor
+@AllArgsConstructor
+public class AccountRegister implements Serializable {
private static final long serialVersionUID = 1L;
+ @TableId(type = IdType.AUTO)
+ private Long id;
+
/**
* 账户 机器人robotId、普通用户userId
*/
@@ -82,4 +91,13 @@ public class AccountRegister extends BaseEntity implements Ser
*/
@TableField("ou_id")
private Long ouId;
+
+ @TableField
+ private Integer isDelete;
+
+ @TableField
+ private Date createAt;
+
+ @TableField
+ private Date updateAt;
}
diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java b/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java
index 8a29341..a17f138 100644
--- a/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java
+++ b/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java
@@ -2,13 +2,19 @@ package cn.axzo.im.entity;
import cn.axzo.framework.data.mybatisplus.model.BaseEntity;
import cn.axzo.im.center.common.enums.AppTypeEnum;
+import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
+import lombok.experimental.SuperBuilder;
import java.io.Serializable;
+import java.util.Date;
/**
* IM消息模历史表
@@ -19,12 +25,17 @@ import java.io.Serializable;
*/
@TableName("im_message_history")
@Data
-@EqualsAndHashCode(callSuper = true)
+@SuperBuilder
@Accessors(chain = true)
-public class MessageHistory extends BaseEntity implements Serializable {
+@NoArgsConstructor
+@AllArgsConstructor
+public class MessageHistory implements Serializable {
private static final long serialVersionUID = 1L;
+ @TableId(type = IdType.AUTO)
+ private Long id;
+
/**
* 上游业务请求ID
*/
@@ -70,4 +81,33 @@ public class MessageHistory extends BaseEntity implements Seria
@TableField("message_body")
private String messageBody;
+ @TableField("result")
+ private String result;
+
+ @TableField("im_message_task_id")
+ private Long imMessageTaskId;
+
+ @TableField("receive_person_id")
+ private String receivePersonId;
+
+ @TableField("receive_ou_id")
+ private Long receiveOuId;
+
+ private Status status;
+
+ @TableField
+ private Integer isDelete;
+
+ @TableField
+ private Date createAt;
+
+ @TableField
+ private Date updateAt;
+
+ public enum Status {
+ PENDING,
+ SUCCEED,
+ FAILED,
+ ;
+ }
}
diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java b/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java
index e157c87..2b26f24 100644
--- a/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java
+++ b/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java
@@ -10,18 +10,24 @@ import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
+import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import lombok.experimental.SuperBuilder;
+import org.apache.commons.lang3.StringUtils;
import org.springframework.cglib.beans.BeanMap;
import java.util.Date;
import java.util.List;
import java.util.Optional;
+import static cn.axzo.im.config.BizResultCode.MESSAGE_TASK_STATUS_ERROR;
+
@Data
@SuperBuilder
@Accessors(chain = true)
@@ -130,6 +136,20 @@ public class MessageTask {
* 跳转信息
*/
private List jumpDatas;
+
+ /**
+ * 给全员发送
+ */
+ private boolean allPerson;
+
+ /**
+ * 全员发送时需要指定发送消息到App端
+ * 工人端、企业端、服务器
+ * CM、CMP、SYSTEM
+ *
+ * @See cn.axzo.im.center.common.enums.AppTypeEnum
+ */
+ private List appTypes;
}
@Data
@@ -162,7 +182,31 @@ public class MessageTask {
* im账号,可以personId和imAccount二选一
*/
private String imAccount;
+
+ public String buildKey() {
+ if (StringUtils.isNotBlank(this.getImAccount())) {
+ return this.getImAccount();
+ }
+ return this.getPersonId() + "_" + this.getAppType() + "_" + this.getOuId();
+ }
}
public static class ListReceivePersonTypeHandler extends BaseListTypeHandler {}
+
+ @Getter
+ @AllArgsConstructor
+ public enum ActionEnum {
+ SUCCESS,
+ ;
+
+ private static final Table STATUS_FLOWS = HashBasedTable.create();
+
+ static {
+ STATUS_FLOWS.put(Status.PENDING, SUCCESS, Status.SUCCEED);
+ }
+
+ public Status getNextStatus(Status oldStatus) {
+ return Optional.ofNullable(STATUS_FLOWS.get(oldStatus, this)).orElseThrow(MESSAGE_TASK_STATUS_ERROR::toException);
+ }
+ }
}
diff --git a/im-center-server/src/main/java/cn/axzo/im/job/CreateMessageHistoryJob.java b/im-center-server/src/main/java/cn/axzo/im/job/CreateMessageHistoryJob.java
new file mode 100644
index 0000000..6754a7c
--- /dev/null
+++ b/im-center-server/src/main/java/cn/axzo/im/job/CreateMessageHistoryJob.java
@@ -0,0 +1,78 @@
+package cn.axzo.im.job;
+
+import cn.axzo.im.entity.MessageTask;
+import cn.axzo.im.service.MessageTaskService;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.IJobHandler;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * 查询ImMessageTask中的PEDING数据,添加到messageHistory
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class CreateMessageHistoryJob extends IJobHandler {
+
+ @Autowired
+ private MessageTaskService messageTaskService;
+
+ private static final Integer DEFAULT_PAGE_SIZE = 500;
+
+ @Override
+ @XxlJob("sendMessageJob")
+ public ReturnT execute(String s) throws Exception {
+
+ log.info("start sendMessageJob,s:{}", s);
+ SendMessageParam sendMessageParam = Optional.ofNullable(s)
+ .map(e -> JSONObject.parseObject(e, SendMessageParam.class))
+ .orElseGet(() -> SendMessageParam.builder().build());
+ Integer pageNumber = 1;
+ Date now = new Date();
+ while (true) {
+ MessageTaskService.PageMessageTaskParam req = MessageTaskService.PageMessageTaskParam.builder()
+ .ids(sendMessageParam.getIds())
+ .planStartTimeLE(now)
+ .status(MessageTask.Status.PENDING)
+ .pageNumber(pageNumber++)
+ .pageSize(DEFAULT_PAGE_SIZE)
+ .build();
+
+ Page page = messageTaskService.page(req);
+ if (CollectionUtils.isNotEmpty(page.getRecords())) {
+ page.getRecords().forEach(messageTask -> {
+ messageTaskService.createMessageHistory(messageTask);
+ });
+ }
+
+ if (!page.hasNext()) {
+ break;
+ }
+ }
+ log.info("end sendMessageJob");
+ return ReturnT.SUCCESS;
+ }
+
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class SendMessageParam {
+ private List ids;
+ }
+}
diff --git a/im-center-server/src/main/java/cn/axzo/im/job/SendMessageJob.java b/im-center-server/src/main/java/cn/axzo/im/job/SendMessageJob.java
index b3a019b..5774d97 100644
--- a/im-center-server/src/main/java/cn/axzo/im/job/SendMessageJob.java
+++ b/im-center-server/src/main/java/cn/axzo/im/job/SendMessageJob.java
@@ -1,81 +1,37 @@
package cn.axzo.im.job;
-import cn.axzo.im.entity.MessageTask;
+import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.im.service.MessageTaskService;
-import cn.axzo.pokonyan.client.RateLimiter;
-import cn.axzo.pokonyan.client.RateLimiterClient;
-import com.alibaba.fastjson.JSONObject;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.google.common.collect.Maps;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
-import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
-import java.util.Date;
import java.util.List;
-import java.util.Optional;
+import java.util.Map;
-/**
- * 查询ImMessageTask中的PEDING数据,然后发送IM消息和push
- */
@Slf4j
@Component
@RequiredArgsConstructor
public class SendMessageJob extends IJobHandler {
@Autowired
- private MessageTaskService messageTaskService;
+ private MessageHistoryService messageHistoryService;
private static final Integer DEFAULT_PAGE_SIZE = 500;
@Override
- @XxlJob("sendMessageJob")
public ReturnT execute(String s) throws Exception {
-
- log.info("start sendMessageJob,s:{}", s);
- SendMessageParam sendMessageParam = Optional.ofNullable(s)
- .map(e -> JSONObject.parseObject(e, SendMessageParam.class))
- .orElseGet(() -> SendMessageParam.builder().build());
- Integer pageNumber = 1;
- Date now = new Date();
- while (true) {
- MessageTaskService.PageMessageTaskParam req = MessageTaskService.PageMessageTaskParam.builder()
- .ids(sendMessageParam.getIds())
- .planStartTimeLE(now)
- .status(MessageTask.Status.PENDING)
- .pageNumber(pageNumber++)
- .pageSize(DEFAULT_PAGE_SIZE)
- .build();
-
- Page page = messageTaskService.page(req);
- if (CollectionUtils.isNotEmpty(page.getRecords())) {
- sendMessage(page.getRecords());
- }
-
- if (!page.hasNext()) {
- break;
- }
- }
- log.info("end sendMessageJob");
- return ReturnT.SUCCESS;
+ return null;
}
- private void sendMessage(List messageTasks) {
-
- messageTasks.forEach(messageTask -> {
-
- });
-
- }
@Data
@Builder
diff --git a/im-center-server/src/main/java/cn/axzo/im/job/UpdateImAccountOuIdJob.java b/im-center-server/src/main/java/cn/axzo/im/job/UpdateImAccountOuIdJob.java
index 1337d31..5003c39 100644
--- a/im-center-server/src/main/java/cn/axzo/im/job/UpdateImAccountOuIdJob.java
+++ b/im-center-server/src/main/java/cn/axzo/im/job/UpdateImAccountOuIdJob.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@@ -68,7 +67,7 @@ public class UpdateImAccountOuIdJob extends IJobHandler {
.pageSize(DEFAULT_PAGE_SIZE)
.build();
- Page page = accountRegisterService.page(req);
+ Page page = accountRegisterService.page(req);
if (CollectionUtils.isNotEmpty(page.getRecords())) {
Map nodeUsers = listNodeUsers(page.getRecords());
@@ -83,7 +82,7 @@ public class UpdateImAccountOuIdJob extends IJobHandler {
return ReturnT.SUCCESS;
}
- private void updateAccountRegister(List accountRegisters, Map nodeUsers) {
+ private void updateAccountRegister(List accountRegisters, Map nodeUsers) {
List update = accountRegisters.stream()
.filter(accountRegister -> nodeUsers.get(accountRegister.getAccountId()) != null)
.map(accountRegister -> {
@@ -103,7 +102,7 @@ public class UpdateImAccountOuIdJob extends IJobHandler {
accountRegisterService.updateBatchById(update);
}
- private Map listNodeUsers(List accountRegisters) {
+ private Map listNodeUsers(List accountRegisters) {
if (CollectionUtils.isEmpty(accountRegisters)) {
return Collections.EMPTY_MAP;
}
diff --git a/im-center-server/src/main/java/cn/axzo/im/service/AccountRegisterService.java b/im-center-server/src/main/java/cn/axzo/im/service/AccountRegisterService.java
index 5dedbdb..8f27f23 100644
--- a/im-center-server/src/main/java/cn/axzo/im/service/AccountRegisterService.java
+++ b/im-center-server/src/main/java/cn/axzo/im/service/AccountRegisterService.java
@@ -1,6 +1,8 @@
package cn.axzo.im.service;
+import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
import cn.axzo.im.entity.AccountRegister;
+import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
import cn.axzo.pokonyan.dao.page.IPageParam;
import cn.axzo.pokonyan.dao.wrapper.CriteriaField;
import cn.axzo.pokonyan.dao.wrapper.Operator;
@@ -10,15 +12,17 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
+import org.springframework.beans.BeanUtils;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public interface AccountRegisterService extends IService {
- Page page(PageAccountRegisterParam param);
+ Page page(PageAccountRegisterParam param);
- List list(ListAccountRegisterParam param);
+ List list(ListAccountRegisterParam param);
@SuperBuilder
@Data
@@ -57,6 +61,12 @@ public interface AccountRegisterService extends IService {
@CriteriaField(field = "accountType", operator = Operator.EQ)
private String accountType;
+
+ @CriteriaField(ignore = true)
+ private boolean needOuInfo;
+
+ @CriteriaField(ignore = true)
+ private boolean needUserInfo;
}
@SuperBuilder
@@ -77,4 +87,25 @@ public interface AccountRegisterService extends IService {
List sort;
}
+ @Data
+ @SuperBuilder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ class AccountRegisterDTO extends AccountRegister {
+
+ private PersonProfileDto personProfile;
+
+ private OrganizationalUnitVO organizationalUnit;
+
+ public static AccountRegisterDTO from(AccountRegister accountRegister,
+ Map personProfiles,
+ Map organizationals) {
+ AccountRegisterDTO accountRegisterDTO = AccountRegisterDTO.builder().build();
+ BeanUtils.copyProperties(accountRegister, accountRegisterDTO);
+
+ accountRegisterDTO.setPersonProfile(personProfiles.get(Long.valueOf(accountRegisterDTO.getAccountId())));
+ accountRegisterDTO.setOrganizationalUnit(organizationals.get(accountRegisterDTO.getOuId()));
+ return accountRegisterDTO;
+ }
+ }
}
diff --git a/im-center-server/src/main/java/cn/axzo/im/service/AccountService.java b/im-center-server/src/main/java/cn/axzo/im/service/AccountService.java
index bdd99a5..a2f581d 100644
--- a/im-center-server/src/main/java/cn/axzo/im/service/AccountService.java
+++ b/im-center-server/src/main/java/cn/axzo/im/service/AccountService.java
@@ -348,7 +348,7 @@ public class AccountService {
if (appTypeEnum == AppTypeEnum.CMP) {
listAccountRegisterParam.setOrganizationalUnitId(accountAbsentQuery.getOrganizationalUnitId());
}
- List accountRegisters = accountRegisterService.list(listAccountRegisterParam);
+ List accountRegisters = accountRegisterService.list(listAccountRegisterParam);
if (CollectionUtils.isNotEmpty(accountRegisters)) {
userAccountAll.addAll(accountRegisters.stream()
diff --git a/im-center-server/src/main/java/cn/axzo/im/service/MessageHistoryService.java b/im-center-server/src/main/java/cn/axzo/im/service/MessageHistoryService.java
index 2544ada..05df687 100644
--- a/im-center-server/src/main/java/cn/axzo/im/service/MessageHistoryService.java
+++ b/im-center-server/src/main/java/cn/axzo/im/service/MessageHistoryService.java
@@ -1,7 +1,90 @@
package cn.axzo.im.service;
+import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
import cn.axzo.im.entity.MessageHistory;
+import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
+import cn.axzo.pokonyan.dao.page.IPageParam;
+import cn.axzo.pokonyan.dao.wrapper.CriteriaField;
+import cn.axzo.pokonyan.dao.wrapper.Operator;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+import org.springframework.beans.BeanUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
public interface MessageHistoryService extends IService {
+
+ Page page(PageMessageHistoryParam param);
+
+ List list(ListMessageHistoryParam param);
+
+ @SuperBuilder
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ class ListMessageHistoryParam {
+
+ @CriteriaField(field = "id", operator = Operator.IN)
+ private List ids;
+
+ @CriteriaField(field = "imMessageTaskId", operator = Operator.EQ)
+ private Long imMessageTaskId;
+
+ @CriteriaField(field = "receivePersonId", operator = Operator.IN)
+ private Set receivePersonId;
+
+ @CriteriaField(field = "toAccount", operator = Operator.IN)
+ private Set toAccount;
+
+ @CriteriaField(ignore = true)
+ private boolean needReceiveOuInfo;
+
+ @CriteriaField(ignore = true)
+ private boolean needReceiveUserInfo;
+ }
+
+ @SuperBuilder
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ class PageMessageHistoryParam extends ListMessageHistoryParam implements IPageParam {
+ @CriteriaField(ignore = true)
+ Integer pageNumber;
+
+ @CriteriaField(ignore = true)
+ Integer pageSize;
+
+ /**
+ * 排序:使用示例,createTime__DESC
+ */
+ @CriteriaField(ignore = true)
+ List sort;
+ }
+
+ @Data
+ @SuperBuilder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ class MessageHistoryDTO extends MessageHistory {
+ private PersonProfileDto receivePersonProfile;
+
+ private OrganizationalUnitVO receiveOrganizationalUnit;
+
+ public static MessageHistoryDTO from(MessageHistory messageHistory,
+ Map personProfiles,
+ Map organizationals) {
+ MessageHistoryDTO messageHistoryDTO = MessageHistoryDTO.builder().build();
+ BeanUtils.copyProperties(messageHistory, messageHistoryDTO);
+
+ messageHistoryDTO.setReceivePersonProfile(personProfiles.get(Long.valueOf(messageHistoryDTO.getReceivePersonId())));
+ messageHistoryDTO.setReceiveOrganizationalUnit(organizationals.get(messageHistoryDTO.getReceiveOuId()));
+ return messageHistoryDTO;
+ }
+ }
}
diff --git a/im-center-server/src/main/java/cn/axzo/im/service/MessageService.java b/im-center-server/src/main/java/cn/axzo/im/service/MessageService.java
index ae5cbb8..d0d2692 100644
--- a/im-center-server/src/main/java/cn/axzo/im/service/MessageService.java
+++ b/im-center-server/src/main/java/cn/axzo/im/service/MessageService.java
@@ -371,37 +371,37 @@ public class MessageService {
// return messageDispatchRespList;
// }
-// private void insertImMessage(List messageRespList, String messageBody) {
-// if (CollectionUtils.isEmpty(messageRespList)) {
-// return;
-// }
-// String requestId = UUID.randomUUID().toString().replace("-", "");
-// log.info("IM消息发送成功保持到数据库:{},请求Body信息:{}", JSONUtil.toJsonStr(messageRespList), messageBody);
-// CompletableFuture.runAsync(() -> messageRespList.forEach(messageDispatchResp -> {
-// MessageHistory messageHistory = new MessageHistory();
-// messageHistory.setBizId(requestId);
-// messageHistory.setFromAccount(messageDispatchResp.getFromImAccount());
-// if (StringUtils.isNotBlank(messageDispatchResp.getToImAccount())) {
-// messageHistory.setToAccount(messageDispatchResp.getToImAccount());
-// } else {
-// messageHistory.setToAccount("unregistered");
-// }
-// messageHistory.setChannel(imChannel.getProviderType());
-// messageHistory.setAppType(messageDispatchResp.getAppType());
-// messageHistory.setMessageBody(messageBody);
-// messageHistory.setCreateAt(new Date());
-// messageHistory.setUpdateAt(new Date());
-// if (StringUtils.isNotEmpty(messageDispatchResp.getMsgid())) {
-// messageHistory.setMessageId(messageDispatchResp.getMsgid());
-// }
-// try {
-// messageHistoryDao.saveOrUpdate(messageHistory);
-// Thread.sleep(50);
-// } catch (Exception e) {
-// log.error("持久化IM消息到数据库异常:{},", JSONUtil.toJsonStr(messageHistory), e);
-// }
-// }));
-// }
+ private void insertImMessage(List messageRespList, String messageBody) {
+ if (CollectionUtils.isEmpty(messageRespList)) {
+ return;
+ }
+ String requestId = UUID.randomUUID().toString().replace("-", "");
+ log.info("IM消息发送成功保持到数据库:{},请求Body信息:{}", JSONUtil.toJsonStr(messageRespList), messageBody);
+ CompletableFuture.runAsync(() -> messageRespList.forEach(messageDispatchResp -> {
+ MessageHistory messageHistory = new MessageHistory();
+ messageHistory.setBizId(requestId);
+ messageHistory.setFromAccount(messageDispatchResp.getFromImAccount());
+ if (StringUtils.isNotBlank(messageDispatchResp.getToImAccount())) {
+ messageHistory.setToAccount(messageDispatchResp.getToImAccount());
+ } else {
+ messageHistory.setToAccount("unregistered");
+ }
+ messageHistory.setChannel(imChannel.getProviderType());
+ messageHistory.setAppType(messageDispatchResp.getAppType());
+ messageHistory.setMessageBody(messageBody);
+ messageHistory.setCreateAt(new Date());
+ messageHistory.setUpdateAt(new Date());
+ if (StringUtils.isNotEmpty(messageDispatchResp.getMsgid())) {
+ messageHistory.setMessageId(messageDispatchResp.getMsgid());
+ }
+ try {
+ messageHistoryDao.saveOrUpdate(messageHistory);
+ Thread.sleep(50);
+ } catch (Exception e) {
+ log.error("持久化IM消息到数据库异常:{},", JSONUtil.toJsonStr(messageHistory), e);
+ }
+ }));
+ }
@Transactional(rollbackFor = Exception.class)
diff --git a/im-center-server/src/main/java/cn/axzo/im/service/MessageTaskService.java b/im-center-server/src/main/java/cn/axzo/im/service/MessageTaskService.java
index 5c34964..6b66daa 100644
--- a/im-center-server/src/main/java/cn/axzo/im/service/MessageTaskService.java
+++ b/im-center-server/src/main/java/cn/axzo/im/service/MessageTaskService.java
@@ -1,6 +1,5 @@
package cn.axzo.im.service;
-import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.pokonyan.dao.page.IPageParam;
import cn.axzo.pokonyan.dao.wrapper.CriteriaField;
@@ -8,6 +7,7 @@ import cn.axzo.pokonyan.dao.wrapper.Operator;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;
import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@@ -21,8 +21,33 @@ public interface MessageTaskService extends IService {
Page page(PageMessageTaskParam param);
- List sendMessage(MessageTask messageTask);
+ void createMessageHistory(MessageTask messageTask);
+ void update(UpdateMessageTaskParam param);
+
+ @Builder
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ class UpdateMessageTaskParam {
+
+ private Long id;
+
+ private MessageTask.ActionEnum action;
+
+ private Date startedTime;
+
+ private Date finishedTime;
+
+ public MessageTask to() {
+ return MessageTask.builder()
+ .id(this.getId())
+ .startedTime(this.getStartedTime())
+ .finishedTime(this.getFinishedTime())
+ .build();
+ }
+
+ }
@SuperBuilder
@Data
@NoArgsConstructor
diff --git a/im-center-server/src/main/java/cn/axzo/im/service/impl/AccountRegisterServiceImpl.java b/im-center-server/src/main/java/cn/axzo/im/service/impl/AccountRegisterServiceImpl.java
index a3c70c2..9bec97c 100644
--- a/im-center-server/src/main/java/cn/axzo/im/service/impl/AccountRegisterServiceImpl.java
+++ b/im-center-server/src/main/java/cn/axzo/im/service/impl/AccountRegisterServiceImpl.java
@@ -1,34 +1,67 @@
package cn.axzo.im.service.impl;
+import cn.axzo.basics.profiles.api.UserProfileServiceApi;
+import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
+import cn.axzo.im.center.common.enums.AccountTypeEnum;
+import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.dao.mapper.AccountRegisterMapper;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.service.AccountRegisterService;
+import cn.axzo.maokai.api.client.OrganizationalUnitApi;
+import cn.axzo.maokai.api.vo.request.OrganizationalUnitQuery;
+import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
import cn.axzo.pokonyan.dao.converter.PageConverter;
import cn.axzo.pokonyan.dao.mysql.QueryWrapperHelper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
@Slf4j
@Service
public class AccountRegisterServiceImpl extends ServiceImpl
implements AccountRegisterService {
+ @Autowired
+ private IMChannelProvider imChannelProvider;
+ @Autowired
+ private OrganizationalUnitApi organizationalUnitApi;
+ @Autowired
+ private UserProfileServiceApi userProfileServiceApi;
+
@Override
- public Page page(PageAccountRegisterParam param) {
+ public Page page(PageAccountRegisterParam param) {
QueryWrapper wrapper = QueryWrapperHelper.fromBean(param, AccountRegister.class);
+ // 只能取配置的appKey的数据,防止接口使用方没传appKey导致获取错乱的数据
+ wrapper.eq("app_key", imChannelProvider.getProviderAppKey());
wrapper.eq("is_delete", 0);
- return this.page(PageConverter.convertToMybatis(param, AccountRegister.class), wrapper);
+ Page page = this.page(PageConverter.convertToMybatis(param, AccountRegister.class), wrapper);
+
+ Map personProfiles = listUserPersonProfile(param, page.getRecords());
+
+ Map organizationals = listOrganizational(param, page.getRecords());
+
+ return PageConverter.convert(page, (record) -> AccountRegisterDTO.from(record,
+ personProfiles,
+ organizationals));
}
@Override
- public List list(ListAccountRegisterParam param) {
+ public List list(ListAccountRegisterParam param) {
return PageConverter.drainAll(pageNumber -> {
PageAccountRegisterParam pageParam = PageAccountRegisterParam.builder().build();
BeanUtils.copyProperties(param, pageParam);
@@ -37,4 +70,49 @@ public class AccountRegisterServiceImpl extends ServiceImpl listUserPersonProfile(PageAccountRegisterParam param,
+ List accountRegisters) {
+ if (CollectionUtils.isEmpty(accountRegisters) || BooleanUtils.isNotTrue(param.isNeedUserInfo())) {
+ return Collections.emptyMap();
+ }
+
+ List personIds = accountRegisters.stream()
+ .filter(e -> Objects.equals(e.getAccountType(), AccountTypeEnum.USER.getCode()))
+ .map(AccountRegister::getAccountId)
+ .filter(Objects::nonNull)
+ .filter(StringUtils::isNumeric)
+ .map(Long::valueOf)
+ .distinct()
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(personIds)) {
+ return Collections.emptyMap();
+ }
+
+ return userProfileServiceApi.postPersonProfiles(personIds).getData()
+ .stream()
+ .collect(Collectors.toMap(PersonProfileDto::getId, Function.identity()));
+ }
+
+ private Map listOrganizational(PageAccountRegisterParam param,
+ List accountRegisters) {
+ if (CollectionUtils.isEmpty(accountRegisters) || BooleanUtils.isNotTrue(param.isNeedOuInfo())) {
+ return Collections.emptyMap();
+ }
+
+ List ouIds = accountRegisters.stream()
+ .map(AccountRegister::getOuId)
+ .filter(Objects::nonNull)
+ .distinct()
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(ouIds)) {
+ return Collections.emptyMap();
+ }
+
+ return organizationalUnitApi.page(OrganizationalUnitQuery.builder().unitIds(ouIds).build())
+ .getData()
+ .getList()
+ .stream()
+ .collect(Collectors.toMap(OrganizationalUnitVO::getId, Function.identity(), (f, s) -> f));
+ }
}
diff --git a/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageHistoryServiceImpl.java b/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageHistoryServiceImpl.java
index 7c99190..819f5e7 100644
--- a/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageHistoryServiceImpl.java
+++ b/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageHistoryServiceImpl.java
@@ -1,4 +1,111 @@
package cn.axzo.im.service.impl;
-public class MessageHistoryServiceImpl {
+import cn.axzo.basics.profiles.api.UserProfileServiceApi;
+import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
+import cn.axzo.im.dao.mapper.MessageHistoryMapper;
+import cn.axzo.im.entity.MessageHistory;
+import cn.axzo.im.service.MessageHistoryService;
+import cn.axzo.maokai.api.client.OrganizationalUnitApi;
+import cn.axzo.maokai.api.vo.request.OrganizationalUnitQuery;
+import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
+import cn.axzo.pokonyan.dao.converter.PageConverter;
+import cn.axzo.pokonyan.dao.mysql.QueryWrapperHelper;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.BeanUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+public class MessageHistoryServiceImpl extends ServiceImpl
+ implements MessageHistoryService {
+
+ @Autowired
+ private OrganizationalUnitApi organizationalUnitApi;
+ @Autowired
+ private UserProfileServiceApi userProfileServiceApi;
+
+ @Override
+ public Page page(PageMessageHistoryParam param) {
+ QueryWrapper wrapper = QueryWrapperHelper.fromBean(param, MessageHistory.class);
+ // 只能取配置的appKey的数据,防止接口使用方没传appKey导致获取错乱的数据
+ wrapper.eq("is_delete", 0);
+
+ Page page = this.page(PageConverter.convertToMybatis(param, MessageHistory.class), wrapper);
+
+ Map personProfiles = listReceiveUserPersonProfile(param, page.getRecords());
+
+ Map organizationals = listReceiveOrganizational(param, page.getRecords());
+
+ return PageConverter.convert(page, (record) -> MessageHistoryDTO.from(record,
+ personProfiles,
+ organizationals));
+ }
+
+ @Override
+ public List list(ListMessageHistoryParam param) {
+ return PageConverter.drainAll(pageNumber -> {
+ PageMessageHistoryParam pageParam = PageMessageHistoryParam.builder().build();
+ BeanUtils.copyProperties(param, pageParam);
+ pageParam.setPageNumber(pageNumber);
+ pageParam.setPageSize(500);
+ return page(pageParam);
+ });
+ }
+
+ private Map listReceiveUserPersonProfile(PageMessageHistoryParam param,
+ List messageHistories) {
+ if (CollectionUtils.isEmpty(messageHistories) || BooleanUtils.isNotTrue(param.isNeedReceiveUserInfo())) {
+ return Collections.emptyMap();
+ }
+
+ List personIds = messageHistories.stream()
+ .map(MessageHistory::getReceivePersonId)
+ .filter(StringUtils::isNumeric)
+ .map(Long::valueOf)
+ .distinct()
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(personIds)) {
+ return Collections.emptyMap();
+ }
+
+ return userProfileServiceApi.postPersonProfiles(personIds).getData()
+ .stream()
+ .collect(Collectors.toMap(PersonProfileDto::getId, Function.identity()));
+ }
+
+ private Map listReceiveOrganizational(PageMessageHistoryParam param,
+ List messageHistories) {
+ if (CollectionUtils.isEmpty(messageHistories) || BooleanUtils.isNotTrue(param.isNeedReceiveOuInfo())) {
+ return Collections.emptyMap();
+ }
+
+ List ouIds = messageHistories.stream()
+ .map(MessageHistory::getReceiveOuId)
+ .filter(Objects::nonNull)
+ .distinct()
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(ouIds)) {
+ return Collections.emptyMap();
+ }
+
+ return organizationalUnitApi.page(OrganizationalUnitQuery.builder().unitIds(ouIds).build())
+ .getData()
+ .getList()
+ .stream()
+ .collect(Collectors.toMap(OrganizationalUnitVO::getId, Function.identity(), (f, s) -> f));
+ }
}
diff --git a/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java b/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java
index f048729..0ab1ac5 100644
--- a/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java
+++ b/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java
@@ -6,20 +6,26 @@ import cn.axzo.im.channel.netease.dto.MessageBatchDispatchRequest;
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchResponse;
import cn.axzo.im.channel.netease.dto.MessageBody;
import cn.axzo.im.dao.mapper.MessageTaskMapper;
+import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.service.AccountRegisterService;
+import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.im.service.MessageTaskService;
import cn.axzo.pokonyan.client.RateLimiter;
import cn.axzo.pokonyan.client.RateLimiterClient;
import cn.axzo.pokonyan.dao.converter.PageConverter;
import cn.axzo.pokonyan.dao.mysql.QueryWrapperHelper;
+import cn.axzo.pokonyan.exception.Aassert;
import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
@@ -27,10 +33,19 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.CollectionUtils;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static cn.axzo.im.config.BizResultCode.ACQUIRE_RATE_LIMITER_FAIL;
+import static cn.axzo.im.config.BizResultCode.MESSAGE_TASK_NOT_FOUND;
@Slf4j
@Service
@@ -43,6 +58,8 @@ public class MessageTaskServiceImpl extends ServiceImpl sendMessage(MessageTask messageTask) {
+ public void createMessageHistory(MessageTask messageTask) {
- if (rateLimiter.tryAcquire()) {
- log.info("获得令牌");
+ this.update(UpdateMessageTaskParam.builder()
+ .id(messageTask.getId())
+ .startedTime(new Date())
+ .build());
-// listAccountRegisters(messageTask);
-
-
- MessageBatchDispatchRequest batchDispatchRequest = new MessageBatchDispatchRequest();
- batchDispatchRequest.setFromAccid(messageTask.getSendImAccount());
- batchDispatchRequest.setBody(resolveBody(messageTask));
-// batchDispatchRequest.setToAccids(imAccountList);
- MessageBatchDispatchResponse batchResponse = imChannelProvider.dispatchBatchMessage(batchDispatchRequest);
-
-
- return null;
+ MessageTask.BizData bizData = messageTask.getBizData();
+ if (bizData.isAllPerson()) {
+ log.info("发送全员消息");
+ doSendAll(messageTask);
+ } else {
+ log.info("发送非全员消息");
+ doSendNotAll(messageTask);
}
-
-
- log.info("未获得令牌");
- return null;
+ this.update(UpdateMessageTaskParam.builder()
+ .id(messageTask.getId())
+ .action(MessageTask.ActionEnum.SUCCESS)
+ .finishedTime(new Date())
+ .build());
}
-// private Map listAccountRegisters(MessageTask messageTask) {
-// Set personIds = messageTask.getReceivePersons().stream()
-// .filter(receivePerson -> StringUtils.isNotBlank(receivePerson.getPersonId())
-// && StringUtils.isBlank(receivePerson.getImAccount()))
-// .map(MessageTask.ReceivePerson::getPersonId)
-// .collect(Collectors.toSet());
-// if (CollectionUtils.isEmpty(personIds)) {
-// return;
-// }
-//
-// accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder()
-// .accountIds(personIds)
-// .build());
-// }
+ @Override
+ public void update(UpdateMessageTaskParam param) {
+ MessageTask oldMessageTask = this.lambdaQuery()
+ .eq(MessageTask::getId, param.getId())
+ .last("for update")
+ .one();
+ Aassert.notNull(oldMessageTask, MESSAGE_TASK_NOT_FOUND);;
+
+ MessageTask updateMessageTask = param.to();
+ if (param.getAction() != null) {
+ updateMessageTask.setStatus(param.getAction().getNextStatus(oldMessageTask.getStatus()));
+ }
+
+ this.updateById(updateMessageTask);
+ }
+
+ private void doSendAll(MessageTask messageTask) {
+
+ }
+
+ private void doSendNotAll(MessageTask messageTask) {
+
+ List> receivePersons = Lists.partition(messageTask.getReceivePersons(), msgSendPersonOfOneBatch);
+ String messageBody = resolveBody(messageTask);
+ receivePersons.forEach(e -> saveMessageHistory(e, messageTask, messageBody));
+ }
+
+ private void saveMessageHistory(List receivePersons,
+ MessageTask messageTask,
+ String messageBody) {
+ // 排除已经发送成功的记录,防止重复发送
+ Set existPersons = listExistPerson(receivePersons, messageTask);
+ Set existImAccounts = listExistImAccount(receivePersons, messageTask);
+
+ List absentReceivePersons = receivePersons.stream()
+ .filter(e -> {
+ String key = e.buildKey();
+ return !existPersons.contains(key) && !existImAccounts.contains(key);
+ })
+ .collect(Collectors.toList());
+
+ if (CollectionUtils.isEmpty(absentReceivePersons)) {
+ log.info("messageTask,{}, receivePersons,{},已经存在", JSONObject.toJSONString(messageTask),
+ JSONObject.toJSONString(receivePersons));
+ return;
+ }
+
+ Map accountRegisters = listAccountRegisters(absentReceivePersons);
+
+ List messageHistories = absentReceivePersons.stream()
+ .map(receivePerson -> {
+ MessageHistory messageHistory = new MessageHistory();
+ messageHistory.setImMessageTaskId(messageTask.getId());
+ messageHistory.setFromAccount(messageTask.getSendImAccount());
+ messageHistory.setChannel(imChannelProvider.getProviderType());
+ messageHistory.setAppType(receivePerson.getAppType().name());
+ messageHistory.setMessageBody(messageBody);
+ messageHistory.setCreateAt(new Date());
+ messageHistory.setUpdateAt(new Date());
+ messageHistory.setReceiveOuId(receivePerson.getOuId());
+ messageHistory.setReceivePersonId(receivePerson.getPersonId());
+ if (StringUtils.isNotBlank(receivePerson.getImAccount())) {
+ messageHistory.setToAccount(receivePerson.getImAccount());
+ } else {
+ String key = receivePerson.buildKey();
+ messageHistory.setToAccount(accountRegisters.get(key));
+ messageHistory.setResult("未找到IM账号");
+ messageHistory.setStatus(MessageHistory.Status.FAILED);
+ }
+ return messageHistory;
+ })
+ .collect(Collectors.toList());
+ messageHistoryService.saveBatch(messageHistories);
+ }
+
+ private Set listExistImAccount(List receivePersons,
+ MessageTask messageTask) {
+ Set imAccounts = receivePersons.stream()
+ .map(MessageTask.ReceivePerson::getImAccount)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+
+ if (CollectionUtils.isEmpty(imAccounts)) {
+ return Collections.emptySet();
+ }
+
+ return messageHistoryService.list(MessageHistoryService.ListMessageHistoryParam.builder()
+ .imMessageTaskId(messageTask.getId())
+ .toAccount(imAccounts)
+ .build())
+ .stream()
+ .map(MessageHistory::getToAccount)
+ .collect(Collectors.toSet());
+ }
+
+ private Set listExistPerson(List receivePersons,
+ MessageTask messageTask) {
+ Set personIds = receivePersons.stream()
+ .map(MessageTask.ReceivePerson::getPersonId)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ if (CollectionUtils.isEmpty(personIds)) {
+ return Collections.emptySet();
+ }
+
+ return messageHistoryService.list(MessageHistoryService.ListMessageHistoryParam.builder()
+ .imMessageTaskId(messageTask.getId())
+ .receivePersonId(personIds)
+ .build())
+ .stream()
+ .map(e -> e.getReceivePersonId() + "_" + e.getAppType() + "_" + e.getReceiveOuId())
+ .collect(Collectors.toSet());
+ }
+
+
+ private void sendMessage(List messageHistories,
+ MessageBatchDispatchRequest batchDispatchRequest) {
+
+ // 三方接口调用频率120次/分,超限将限制1分钟使用,所以抛出异常,等待下一次xxlJob做重试补偿
+ if (!rateLimiter.tryAcquire(DEFAULT_TIME_OUT_MILLIS)) {
+ log.info("未获得令牌");
+ throw ACQUIRE_RATE_LIMITER_FAIL.toException();
+ }
+
+ log.info("获得令牌");
+ List toAccids = messageHistories.stream()
+ .map(MessageHistory::getToAccount)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(toAccids)) {
+ log.info("没有需要发送消息的账号");
+ return;
+ }
+
+ batchDispatchRequest.setToAccids(toAccids);
+ MessageBatchDispatchResponse response = imChannelProvider.dispatchBatchMessage(batchDispatchRequest);
+
+ if (response.isSuccess()) {
+ // 发送成功的IMAccountId -> msgId
+ Map msgids = response.getMsgids();
+
+ // unregister的账号
+ Set unregister = Optional.ofNullable(response.getUnregister())
+ .orElseGet(Sets::newHashSet);
+
+ List updateMessageHistories = messageHistories.stream()
+ .peek(e -> {
+ if (unregister.contains(e.getToAccount())) {
+ e.setStatus(MessageHistory.Status.FAILED);
+ e.setResult("IM账号未在网易云信注册");
+ } else {
+ e.setStatus(MessageHistory.Status.SUCCEED);
+ e.setMessageId(msgids.get(e.getToAccount()).toString());
+ }
+ })
+ .collect(Collectors.toList());
+ messageHistoryService.saveBatch(updateMessageHistories);
+ return;
+ }
+
+ List failedMessageHistories = messageHistories.stream()
+ .peek(e -> {
+ e.setResult(response.getDesc());
+ e.setStatus(MessageHistory.Status.FAILED);
+ })
+ .collect(Collectors.toList());
+ messageHistoryService.saveBatch(failedMessageHistories);
+ }
+
+ private Map listAccountRegisters(List receivePersons) {
+ Set personIds = receivePersons.stream()
+ .filter(receivePerson -> StringUtils.isNotBlank(receivePerson.getPersonId())
+ && StringUtils.isBlank(receivePerson.getImAccount()))
+ .map(MessageTask.ReceivePerson::getPersonId)
+ .collect(Collectors.toSet());
+ if (CollectionUtils.isEmpty(personIds)) {
+ return Collections.emptyMap();
+ }
+
+ return accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder()
+ .accountIds(personIds)
+ .build())
+ .stream()
+ .collect(Collectors.toMap(accountRegister -> accountRegister.getAccountId() +
+ "_" + accountRegister.getAppType() + "_" + accountRegister.getOuId(), AccountRegister::getImAccount));
+ }
+//
// private void sendBatchMessage() {
// MessageBatchDispatchRequest batchDispatchRequest = new MessageBatchDispatchRequest();
// batchDispatchRequest.setBody(messageRequest.getBody());
@@ -129,7 +324,7 @@ public class MessageTaskServiceImpl extends ServiceImpl userMsgResponseMap = batchResponse.getMsgids();
// }
-
+//
private String resolveBody(MessageTask messageTask) {
MessageBody messageBody = new MessageBody();
@@ -153,9 +348,44 @@ public class MessageTaskServiceImpl extends ServiceImpl messageRespList, String messageBody) {
+// if (CollectionUtils.isEmpty(messageRespList)) {
+// return;
+// }
+// String requestId = UUID.randomUUID().toString().replace("-", "");
+// log.info("IM消息发送成功保持到数据库:{},请求Body信息:{}", JSONUtil.toJsonStr(messageRespList), messageBody);
+// CompletableFuture.runAsync(() -> messageRespList.forEach(messageDispatchResp -> {
+// MessageHistory messageHistory = new MessageHistory();
+// messageHistory.setBizId(requestId);
+// messageHistory.setFromAccount(messageDispatchResp.getFromImAccount());
+// if (StringUtils.isNotBlank(messageDispatchResp.getToImAccount())) {
+// messageHistory.setToAccount(messageDispatchResp.getToImAccount());
+// } else {
+// messageHistory.setToAccount("unregistered");
+// }
+// messageHistory.setChannel(imChannel.getProviderType());
+// messageHistory.setAppType(messageDispatchResp.getAppType());
+// messageHistory.setMessageBody(messageBody);
+// messageHistory.setCreateAt(new Date());
+// messageHistory.setUpdateAt(new Date());
+// if (StringUtils.isNotEmpty(messageDispatchResp.getMsgid())) {
+// messageHistory.setMessageId(messageDispatchResp.getMsgid());
+// }
+// try {
+// messageHistoryDao.saveOrUpdate(messageHistory);
+// Thread.sleep(50);
+// } catch (Exception e) {
+// log.error("持久化IM消息到数据库异常:{},", JSONUtil.toJsonStr(messageHistory), e);
+// }
+// }));
+// }
+
@Override
public void afterPropertiesSet() throws Exception {
rateLimiter = rateLimiterClient.build(RateLimiterClient.RateLimiterReq.builder()
diff --git a/sql/init.sql b/sql/init.sql
index 13fc734..c4edff5 100644
--- a/sql/init.sql
+++ b/sql/init.sql
@@ -127,3 +127,8 @@ CREATE TABLE IF NOT EXISTS im_message_task
ALTER TABLE im_message_history ADD COLUMN `result` varchar(1024) NULL COMMENT 'result';
ALTER TABLE im_message_history ADD COLUMN `im_message_task_id` bigint NULL COMMENT '消息推送任务的id';
+ALTER TABLE im_message_history ADD COLUMN status varchar(32) not null default 'PENDING' comment '消息状态:PENDING、SUCCEED、FAILED';
+ALTER TABLE im_message_history ADD COLUMN receive_person_id varchar(100) not null default '' comment 'IM消息接收personId';
+ALTER TABLE im_message_history ADD COLUMN receive_ou_id bigint not null default 0 comment 'organizational_unit表的id';
+
+