feat:feature-REQ/2129 增加查询历史记录和用户的查询接口

This commit is contained in:
lilong 2024-03-23 21:46:28 +08:00
parent 28b81849c2
commit 45fb18d92b
26 changed files with 1418 additions and 142 deletions

View File

@ -34,6 +34,17 @@
<groupId>cn.axzo.framework</groupId>
<artifactId>axzo-common-web</artifactId>
</dependency>
<dependency>
<groupId>cn.axzo.basics</groupId>
<artifactId>basics-profiles-api</artifactId>
</dependency>
<dependency>
<groupId>cn.axzo.maokai</groupId>
<artifactId>maokai-api</artifactId>
</dependency>
</dependencies>
<repositories>
<repository>

View File

@ -0,0 +1,145 @@
package cn.axzo.im.center.api.feign;
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
import cn.axzo.framework.domain.web.result.ApiListResult;
import cn.axzo.framework.domain.web.result.ApiPageResult;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import java.util.Date;
import java.util.List;
import java.util.Set;
@FeignClient(name = "im-center", url = "${axzo.service.im-center:http://im-center:8080}")
public interface AccountRegisterApi {
@PostMapping("/api/im/account/register/page")
ApiPageResult<AccountRegisterDTO> page(PageAccountRegisterParam param);
@PostMapping("/api/im/account/register/list")
ApiListResult<AccountRegisterDTO> list(ListAccountRegisterParam param);
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
class AccountRegisterDTO {
private Long id;
/**
* 账户 机器人robotId普通用户userId
*/
private String accountId;
/**
* 普通用户通过appType和ouId包装
* 包装以后进行账户注册
*/
private String accountWrapper;
/**
* IM账户
*/
private String imAccount;
/**
* 终端类型
*
* @see AppTypeEnum
*/
private String appType;
/**
* 网易云信appKey
*/
private String appKey;
/**
* channel 服务提供商
*/
private String channelProvider;
/**
* 账户类型:机器人普通用户
*/
private String accountType;
/**
* IM注册 token
*/
private String token;
/**
* organizational_unit表的id
*/
private Long ouId;
private Integer isDelete;
private Date createAt;
private Date updateAt;
private PersonProfileDto personProfile;
private OrganizationalUnitVO organizationalUnit;
}
@SuperBuilder
@Data
@NoArgsConstructor
@AllArgsConstructor
class ListAccountRegisterParam {
private List<Long> ids;
private String appType;
/**
* 注册用户ID唯一
* 普通用户personId机器人robotId
*/
private String accountId;
private Set<String> accountIds;
/**
* 注册用户ID唯一
*/
private String imAccount;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long organizationalUnitId;
private String accountType;
private boolean needOuInfo;
private boolean needUserInfo;
}
@SuperBuilder
@Data
@NoArgsConstructor
@AllArgsConstructor
class PageAccountRegisterParam extends ListAccountRegisterParam {
Integer pageNumber;
Integer pageSize;
/**
* 排序使用示例createTime__DESC
*/
List<String> sort;
}
}

View File

@ -1,6 +1,7 @@
package cn.axzo.im.center.api.feign;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.vo.req.AsyncSendMessageParam;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendCustomMessageParam;
@ -33,10 +34,10 @@ public interface MessageApi {
* @return
*/
@PostMapping("/api/im/message/async/send")
ApiResult<MessageTaskResp> sendMessageAsync(@RequestBody @Validated SendMessageParam sendMessageParam);
ApiResult<MessageTaskResp> sendMessageAsync(@RequestBody @Validated AsyncSendMessageParam sendMessageParam);
/**
* 同步发送消息不建议使用因为第三方接口有限流会影响接口性能
* 同步发送消息不建议使用因为第三方接口有限流会影响接口性能只能给最多10个用户发送
* @param sendMessageParam
* @return
*/

View File

@ -0,0 +1,128 @@
package cn.axzo.im.center.api.feign;
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
import cn.axzo.framework.domain.web.result.ApiListResult;
import cn.axzo.framework.domain.web.result.ApiPageResult;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import java.util.Date;
import java.util.List;
import java.util.Set;
@FeignClient(name = "im-center", url = "${axzo.service.im-center:http://im-center:8080}")
public interface MessageHistoryApi {
@PostMapping("/api/im/message/history/page")
ApiPageResult<MessageHistoryDTO> page(PageMessageHistoryParam param);
@PostMapping("/api/im/message/history/list")
ApiListResult<MessageHistoryDTO> list(ListMessageHistoryParam param);
@SuperBuilder
@Data
@NoArgsConstructor
@AllArgsConstructor
class ListMessageHistoryParam {
private List<Long> ids;
private Long imMessageTaskId;
private Set<String> receivePersonId;
private Set<String> toAccount;
private boolean needReceiveOuInfo;
private boolean needReceiveUserInfo;
}
@SuperBuilder
@Data
@NoArgsConstructor
@AllArgsConstructor
class PageMessageHistoryParam extends ListMessageHistoryParam {
Integer pageNumber;
Integer pageSize;
/**
* 排序使用示例createTime__DESC
*/
List<String> sort;
}
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
class MessageHistoryDTO {
private Long id;
/**
* 上游业务请求ID
*/
private String bizId;
/**
* 普通用户通过appType包装
* 包装以后进行账户注册
*/
private String messageId;
/**
* 发送者IM账户
*/
private String fromAccount;
/**
* 发送者IM账户
*/
private String toAccount;
/**
* 终端类型
*
* @see AppTypeEnum
*/
private String appType;
/**
* channel 网易云信
*/
private String channel;
private String messageBody;
private String result;
private Long imMessageTaskId;
private String receivePersonId;
private Long receiveOuId;
private String status;
private Integer isDelete;
private Date createAt;
private Date updateAt;
private PersonProfileDto receivePersonProfile;
private OrganizationalUnitVO receiveOrganizationalUnit;
}
}

View File

@ -0,0 +1,133 @@
package cn.axzo.im.center.api.vo.req;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import java.util.List;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AsyncSendMessageParam {
/**
* 发送者IM账号
*/
@NotBlank(message = "sendImAccount不能为空")
private String sendImAccount;
/**
* 消息接收用户信息
*/
private List<SendMessageParam.ReceivePerson> receivePersons;
/**
* 给全员发送
*/
private boolean allPerson;
/**
* 全员发送时需要指定发送消息到App端
* 工人端企业端服务器
* CMCMPSYSTEM
*
* @See cn.axzo.im.center.common.enums.AppTypeEnum
*/
private List<AppTypeEnum> appTypes;
/**
* 消息标题
*/
@NotBlank(message = "消息标题不能为空")
private String msgHeader;
/**
* 消息内容
*/
@NotBlank(message = "消息内容不能为空")
private String msgContent;
/**
* 跳转配置信息
*/
private List<SendMessageParam.JumpData> jumpDatas;
/**
* 封面图
*/
private String cardBannerUrl;
/**
* 消息扩展信息
*/
private JSONObject ext;
/**
* 业务的唯一ID用于查询发送消息的记录和结果不验证唯一
*/
private String bizId;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class JumpData {
private SendMessageParam.JumpPlatform platform;
private String url;
}
@Getter
@AllArgsConstructor
public enum JumpPlatform {
WEB("PC"),
MINI_PROGRAM("安心筑小程"),
IOS("IOS"),
ANDROID("ANDROID"),
WEB_VIEW("H5"),
WECHAT_MINI_PROGRAM("微信小程序"),
;
private String desc;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class ReceivePerson {
/**
* 接收消息的personId
*/
private String personId;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long ouId;
/**
* 发送消息到App端
* 工人端企业端服务器
* CMCMPSYSTEM
*
* @See cn.axzo.im.center.common.enums.AppTypeEnum
*/
private AppTypeEnum appType;
/**
* im账号可以personId和imAccount二选一
*/
private String imAccount;
}
}

View File

@ -54,7 +54,7 @@ public class NimChannelService implements IMChannelProvider {
private static final String NIM_MESSAGE_ATTACH_URL = "https://api.netease.im/nimserver/msg/sendAttachMsg.action";
private static final int SUCCESS_CODE = 200;
public static final int SUCCESS_CODE = 200;
private static final int NIM_ACCOUNT_ALREADY_REGISTER = 414;

View File

@ -3,10 +3,12 @@ package cn.axzo.im.channel.netease.dto;
import lombok.Builder;
import lombok.Data;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static cn.axzo.im.channel.netease.NimChannelService.SUCCESS_CODE;
/**
* 批量发送消息返回响应
* 示例
@ -37,4 +39,7 @@ public class MessageBatchDispatchResponse {
private String desc;
public boolean isSuccess() {
return Objects.equals(this.getCode(), SUCCESS_CODE);
}
}

View File

@ -8,7 +8,13 @@ import lombok.Getter;
@AllArgsConstructor
public enum BizResultCode implements ResultCode {
SEND_IM_ACCOUNT_NOT_FOUND("100", "发送者IM账号错误");
SEND_IM_ACCOUNT_NOT_FOUND("100", "发送者IM账号错误"),
SEND_IM_ACCOUNT_MAX("101", "同步接口只支持最多10个IM账号请选择异步接口发送"),
SEND_PERSSON_ERROR("102", "接收人信息和全部接收人不能同时都为空"),
ALL_PERSSON_TYPE_NOT_EMPTY("103", "全员发送时,接收端不能为空"),
ACQUIRE_RATE_LIMITER_FAIL("104", "获取滑动窗口令牌失败"),
MESSAGE_TASK_STATUS_ERROR("105", "更新消息任务失败,状态异常"),
MESSAGE_TASK_NOT_FOUND("106", "消息任务不存在"),;
private String errorCode;

View File

@ -0,0 +1,53 @@
package cn.axzo.im.controller;
import cn.axzo.framework.domain.web.result.ApiListResult;
import cn.axzo.framework.domain.web.result.ApiPageResult;
import cn.axzo.im.center.api.feign.AccountRegisterApi;
import cn.axzo.im.service.AccountRegisterService;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@RestController
@RequiredArgsConstructor
public class AccountRegisterController implements AccountRegisterApi {
@Autowired
private AccountRegisterService accountRegisterService;
@Override
public ApiPageResult<AccountRegisterDTO> page(PageAccountRegisterParam param) {
AccountRegisterService.PageAccountRegisterParam pageAccountRegisterParam = AccountRegisterService.PageAccountRegisterParam.builder().build();
BeanUtils.copyProperties(param, pageAccountRegisterParam);
Page<AccountRegisterService.AccountRegisterDTO> page = accountRegisterService.page(pageAccountRegisterParam);
return ApiPageResult.ok(page.convert(record -> {
AccountRegisterDTO accountRegisterDTO = AccountRegisterDTO.builder().build();
BeanUtils.copyProperties(record, accountRegisterDTO);
return accountRegisterDTO;
}));
}
@Override
public ApiListResult<AccountRegisterDTO> list(ListAccountRegisterParam param) {
AccountRegisterService.ListAccountRegisterParam listAccountRegisterParam = AccountRegisterService.ListAccountRegisterParam.builder().build();
BeanUtils.copyProperties(param, listAccountRegisterParam);
List<AccountRegisterService.AccountRegisterDTO> list = accountRegisterService.list(listAccountRegisterParam);
return ApiListResult.ok(list.stream()
.map(e -> {
AccountRegisterDTO accountRegisterDTO = AccountRegisterDTO.builder().build();
BeanUtils.copyProperties(e, accountRegisterDTO);
return accountRegisterDTO;
})
.collect(Collectors.toList()));
}
}

View File

@ -4,6 +4,7 @@ import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.feign.MessageApi;
import cn.axzo.im.center.api.vo.req.AccountQuery;
import cn.axzo.im.center.api.vo.req.AsyncSendMessageParam;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
@ -27,6 +28,7 @@ import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -38,7 +40,10 @@ import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.List;
import static cn.axzo.im.config.BizResultCode.ALL_PERSSON_TYPE_NOT_EMPTY;
import static cn.axzo.im.config.BizResultCode.SEND_IM_ACCOUNT_MAX;
import static cn.axzo.im.config.BizResultCode.SEND_IM_ACCOUNT_NOT_FOUND;
import static cn.axzo.im.config.BizResultCode.SEND_PERSSON_ERROR;
/**
* IM消息派发相关
@ -91,7 +96,7 @@ public class MessageController implements MessageApi {
* @return
*/
@Override
public ApiResult<MessageTaskResp> sendMessageAsync(SendMessageParam sendMessageParam) {
public ApiResult<MessageTaskResp> sendMessageAsync(AsyncSendMessageParam sendMessageParam) {
check(sendMessageParam);
MessageTask messageTask = messageTaskService.create(toMessageTask(sendMessageParam));
return ApiResult.ok(toMessageTaskResp(messageTask));
@ -102,7 +107,6 @@ public class MessageController implements MessageApi {
check(sendMessageParam);
MessageTask messageTask = messageTaskService.create(toMessageTask(sendMessageParam));
return ApiResult.ok(toMessageTaskResp(messageTask));
}
@ -114,12 +118,32 @@ public class MessageController implements MessageApi {
}
private void check(SendMessageParam sendMessageParam) {
List<AccountRegister> accountRegisters = accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder()
List<AccountRegisterService.AccountRegisterDTO> accountRegisters = accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder()
.imAccount(sendMessageParam.getSendImAccount())
.build());
Aassert.checkNotEmpty(accountRegisters, SEND_IM_ACCOUNT_NOT_FOUND);
Aassert.check(sendMessageParam.getReceivePersons().size() <= 10, SEND_IM_ACCOUNT_MAX);
}
private void check(AsyncSendMessageParam sendMessageParam) {
List<AccountRegisterService.AccountRegisterDTO> accountRegisters = accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder()
.imAccount(sendMessageParam.getSendImAccount())
.build());
Aassert.checkNotEmpty(accountRegisters, SEND_IM_ACCOUNT_NOT_FOUND);
if (CollectionUtils.isEmpty(sendMessageParam.getReceivePersons())
&& BooleanUtils.isNotTrue(sendMessageParam.isAllPerson())) {
throw SEND_PERSSON_ERROR.toException();
}
if (BooleanUtils.isTrue(sendMessageParam.isAllPerson())) {
Aassert.checkNotEmpty(sendMessageParam.getAppTypes(), ALL_PERSSON_TYPE_NOT_EMPTY);
}
}
private String check(SendTemplateMessageParam sendMessageParam) {
List<String> robotIdList = robotMsgTemplateService.queryRobotIdByTemplate(sendMessageParam.getMsgTemplateId());
if (CollectionUtils.isEmpty(robotIdList)) {
@ -154,6 +178,28 @@ public class MessageController implements MessageApi {
return messageTaskResp;
}
private MessageTask toMessageTask(AsyncSendMessageParam sendMessageParam) {
MessageTask.BizData bizData = MessageTask.BizData.builder()
.jumpDatas(sendMessageParam.getJumpDatas())
// 全员发送是不常用的场景不应该由业务处理所以把配置放在bizData里面
.allPerson(sendMessageParam.isAllPerson())
.appTypes(sendMessageParam.getAppTypes())
.build();
Date now = new Date();
return MessageTask.builder()
.bizId(sendMessageParam.getBizId())
.sendImAccount(sendMessageParam.getSendImAccount())
.receivePersons(JSONArray.parseArray(JSONObject.toJSONString(sendMessageParam.getReceivePersons()), MessageTask.ReceivePerson.class))
.status(MessageTask.Status.PENDING)
.title(sendMessageParam.getMsgHeader())
.content(sendMessageParam.getMsgContent())
.bizData(bizData)
.ext(sendMessageParam.getExt())
.planStartTime(now)
.createAt(now)
.build();
}
private MessageTask toMessageTask(SendMessageParam sendMessageParam) {

View File

@ -0,0 +1,54 @@
package cn.axzo.im.controller;
import cn.axzo.framework.domain.web.result.ApiListResult;
import cn.axzo.framework.domain.web.result.ApiPageResult;
import cn.axzo.im.center.api.feign.MessageHistoryApi;
import cn.axzo.im.service.MessageHistoryService;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@RestController
@RequiredArgsConstructor
public class MessageHistoryController implements MessageHistoryApi {
@Autowired
private MessageHistoryService messageHistoryService;
@Override
public ApiPageResult<MessageHistoryDTO> page(PageMessageHistoryParam param) {
MessageHistoryService.PageMessageHistoryParam pageMessageHistoryParam = MessageHistoryService.PageMessageHistoryParam.builder().build();
BeanUtils.copyProperties(param, pageMessageHistoryParam);
Page<MessageHistoryService.MessageHistoryDTO> page = messageHistoryService.page(pageMessageHistoryParam);
return ApiPageResult.ok(page.convert(record -> {
MessageHistoryDTO messageHistoryDTO = MessageHistoryDTO.builder().build();
BeanUtils.copyProperties(record, messageHistoryDTO);
return messageHistoryDTO;
}));
}
@Override
public ApiListResult<MessageHistoryDTO> list(ListMessageHistoryParam param) {
MessageHistoryService.ListMessageHistoryParam listMessageHistoryParam = MessageHistoryService.ListMessageHistoryParam.builder().build();
BeanUtils.copyProperties(param, listMessageHistoryParam);
List<MessageHistoryService.MessageHistoryDTO> list = messageHistoryService.list(listMessageHistoryParam);
return ApiListResult.ok(list.stream()
.map(e -> {
MessageHistoryDTO messageHistoryDTO = MessageHistoryDTO.builder().build();
BeanUtils.copyProperties(e, messageHistoryDTO);
return messageHistoryDTO;
})
.collect(Collectors.toList()));
}
}

View File

@ -1,14 +1,18 @@
package cn.axzo.im.entity;
import cn.axzo.framework.data.mybatisplus.model.BaseEntity;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import lombok.experimental.SuperBuilder;
import java.io.Serializable;
import java.util.Date;
/**
* IM账户表
@ -19,12 +23,17 @@ import java.io.Serializable;
*/
@TableName("im_account_register")
@Data
@EqualsAndHashCode(callSuper = true)
@SuperBuilder
@Accessors(chain = true)
public class AccountRegister extends BaseEntity<AccountRegister> implements Serializable {
@NoArgsConstructor
@AllArgsConstructor
public class AccountRegister implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
private Long id;
/**
* 账户 机器人robotId普通用户userId
*/
@ -82,4 +91,13 @@ public class AccountRegister extends BaseEntity<AccountRegister> implements Ser
*/
@TableField("ou_id")
private Long ouId;
@TableField
private Integer isDelete;
@TableField
private Date createAt;
@TableField
private Date updateAt;
}

View File

@ -2,13 +2,19 @@ package cn.axzo.im.entity;
import cn.axzo.framework.data.mybatisplus.model.BaseEntity;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import lombok.experimental.SuperBuilder;
import java.io.Serializable;
import java.util.Date;
/**
* IM消息模历史表
@ -19,12 +25,17 @@ import java.io.Serializable;
*/
@TableName("im_message_history")
@Data
@EqualsAndHashCode(callSuper = true)
@SuperBuilder
@Accessors(chain = true)
public class MessageHistory extends BaseEntity<MessageHistory> implements Serializable {
@NoArgsConstructor
@AllArgsConstructor
public class MessageHistory implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
private Long id;
/**
* 上游业务请求ID
*/
@ -70,4 +81,33 @@ public class MessageHistory extends BaseEntity<MessageHistory> implements Seria
@TableField("message_body")
private String messageBody;
@TableField("result")
private String result;
@TableField("im_message_task_id")
private Long imMessageTaskId;
@TableField("receive_person_id")
private String receivePersonId;
@TableField("receive_ou_id")
private Long receiveOuId;
private Status status;
@TableField
private Integer isDelete;
@TableField
private Date createAt;
@TableField
private Date updateAt;
public enum Status {
PENDING,
SUCCEED,
FAILED,
;
}
}

View File

@ -10,18 +10,24 @@ import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cglib.beans.BeanMap;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import static cn.axzo.im.config.BizResultCode.MESSAGE_TASK_STATUS_ERROR;
@Data
@SuperBuilder
@Accessors(chain = true)
@ -130,6 +136,20 @@ public class MessageTask {
* 跳转信息
*/
private List<SendMessageParam.JumpData> jumpDatas;
/**
* 给全员发送
*/
private boolean allPerson;
/**
* 全员发送时需要指定发送消息到App端
* 工人端企业端服务器
* CMCMPSYSTEM
*
* @See cn.axzo.im.center.common.enums.AppTypeEnum
*/
private List<AppTypeEnum> appTypes;
}
@Data
@ -162,7 +182,31 @@ public class MessageTask {
* im账号可以personId和imAccount二选一
*/
private String imAccount;
public String buildKey() {
if (StringUtils.isNotBlank(this.getImAccount())) {
return this.getImAccount();
}
return this.getPersonId() + "_" + this.getAppType() + "_" + this.getOuId();
}
}
public static class ListReceivePersonTypeHandler extends BaseListTypeHandler<ReceivePerson> {}
@Getter
@AllArgsConstructor
public enum ActionEnum {
SUCCESS,
;
private static final Table<Status, ActionEnum, Status> STATUS_FLOWS = HashBasedTable.create();
static {
STATUS_FLOWS.put(Status.PENDING, SUCCESS, Status.SUCCEED);
}
public Status getNextStatus(Status oldStatus) {
return Optional.ofNullable(STATUS_FLOWS.get(oldStatus, this)).orElseThrow(MESSAGE_TASK_STATUS_ERROR::toException);
}
}
}

View File

@ -0,0 +1,78 @@
package cn.axzo.im.job;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.service.MessageTaskService;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
import java.util.Optional;
/**
* 查询ImMessageTask中的PEDING数据添加到messageHistory
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class CreateMessageHistoryJob extends IJobHandler {
@Autowired
private MessageTaskService messageTaskService;
private static final Integer DEFAULT_PAGE_SIZE = 500;
@Override
@XxlJob("sendMessageJob")
public ReturnT<String> execute(String s) throws Exception {
log.info("start sendMessageJob,s:{}", s);
SendMessageParam sendMessageParam = Optional.ofNullable(s)
.map(e -> JSONObject.parseObject(e, SendMessageParam.class))
.orElseGet(() -> SendMessageParam.builder().build());
Integer pageNumber = 1;
Date now = new Date();
while (true) {
MessageTaskService.PageMessageTaskParam req = MessageTaskService.PageMessageTaskParam.builder()
.ids(sendMessageParam.getIds())
.planStartTimeLE(now)
.status(MessageTask.Status.PENDING)
.pageNumber(pageNumber++)
.pageSize(DEFAULT_PAGE_SIZE)
.build();
Page<MessageTask> page = messageTaskService.page(req);
if (CollectionUtils.isNotEmpty(page.getRecords())) {
page.getRecords().forEach(messageTask -> {
messageTaskService.createMessageHistory(messageTask);
});
}
if (!page.hasNext()) {
break;
}
}
log.info("end sendMessageJob");
return ReturnT.SUCCESS;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class SendMessageParam {
private List<Long> ids;
}
}

View File

@ -1,81 +1,37 @@
package cn.axzo.im.job;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.im.service.MessageTaskService;
import cn.axzo.pokonyan.client.RateLimiter;
import cn.axzo.pokonyan.client.RateLimiterClient;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Maps;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Map;
/**
* 查询ImMessageTask中的PEDING数据然后发送IM消息和push
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class SendMessageJob extends IJobHandler {
@Autowired
private MessageTaskService messageTaskService;
private MessageHistoryService messageHistoryService;
private static final Integer DEFAULT_PAGE_SIZE = 500;
@Override
@XxlJob("sendMessageJob")
public ReturnT<String> execute(String s) throws Exception {
log.info("start sendMessageJob,s:{}", s);
SendMessageParam sendMessageParam = Optional.ofNullable(s)
.map(e -> JSONObject.parseObject(e, SendMessageParam.class))
.orElseGet(() -> SendMessageParam.builder().build());
Integer pageNumber = 1;
Date now = new Date();
while (true) {
MessageTaskService.PageMessageTaskParam req = MessageTaskService.PageMessageTaskParam.builder()
.ids(sendMessageParam.getIds())
.planStartTimeLE(now)
.status(MessageTask.Status.PENDING)
.pageNumber(pageNumber++)
.pageSize(DEFAULT_PAGE_SIZE)
.build();
Page<MessageTask> page = messageTaskService.page(req);
if (CollectionUtils.isNotEmpty(page.getRecords())) {
sendMessage(page.getRecords());
}
if (!page.hasNext()) {
break;
}
}
log.info("end sendMessageJob");
return ReturnT.SUCCESS;
return null;
}
private void sendMessage(List<MessageTask> messageTasks) {
messageTasks.forEach(messageTask -> {
});
}
@Data
@Builder

View File

@ -23,7 +23,6 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@ -68,7 +67,7 @@ public class UpdateImAccountOuIdJob extends IJobHandler {
.pageSize(DEFAULT_PAGE_SIZE)
.build();
Page<AccountRegister> page = accountRegisterService.page(req);
Page<AccountRegisterService.AccountRegisterDTO> page = accountRegisterService.page(req);
if (CollectionUtils.isNotEmpty(page.getRecords())) {
Map<String, OrganizationalNodeUserBasicVO> nodeUsers = listNodeUsers(page.getRecords());
@ -83,7 +82,7 @@ public class UpdateImAccountOuIdJob extends IJobHandler {
return ReturnT.SUCCESS;
}
private void updateAccountRegister(List<AccountRegister> accountRegisters, Map<String, OrganizationalNodeUserBasicVO> nodeUsers) {
private void updateAccountRegister(List<AccountRegisterService.AccountRegisterDTO> accountRegisters, Map<String, OrganizationalNodeUserBasicVO> nodeUsers) {
List<AccountRegister> update = accountRegisters.stream()
.filter(accountRegister -> nodeUsers.get(accountRegister.getAccountId()) != null)
.map(accountRegister -> {
@ -103,7 +102,7 @@ public class UpdateImAccountOuIdJob extends IJobHandler {
accountRegisterService.updateBatchById(update);
}
private Map<String, OrganizationalNodeUserBasicVO> listNodeUsers(List<AccountRegister> accountRegisters) {
private Map<String, OrganizationalNodeUserBasicVO> listNodeUsers(List<AccountRegisterService.AccountRegisterDTO> accountRegisters) {
if (CollectionUtils.isEmpty(accountRegisters)) {
return Collections.EMPTY_MAP;
}

View File

@ -1,6 +1,8 @@
package cn.axzo.im.service;
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
import cn.axzo.pokonyan.dao.page.IPageParam;
import cn.axzo.pokonyan.dao.wrapper.CriteriaField;
import cn.axzo.pokonyan.dao.wrapper.Operator;
@ -10,15 +12,17 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.springframework.beans.BeanUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface AccountRegisterService extends IService<AccountRegister> {
Page<AccountRegister> page(PageAccountRegisterParam param);
Page<AccountRegisterDTO> page(PageAccountRegisterParam param);
List<AccountRegister> list(ListAccountRegisterParam param);
List<AccountRegisterDTO> list(ListAccountRegisterParam param);
@SuperBuilder
@Data
@ -57,6 +61,12 @@ public interface AccountRegisterService extends IService<AccountRegister> {
@CriteriaField(field = "accountType", operator = Operator.EQ)
private String accountType;
@CriteriaField(ignore = true)
private boolean needOuInfo;
@CriteriaField(ignore = true)
private boolean needUserInfo;
}
@SuperBuilder
@ -77,4 +87,25 @@ public interface AccountRegisterService extends IService<AccountRegister> {
List<String> sort;
}
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
class AccountRegisterDTO extends AccountRegister {
private PersonProfileDto personProfile;
private OrganizationalUnitVO organizationalUnit;
public static AccountRegisterDTO from(AccountRegister accountRegister,
Map<Long, PersonProfileDto> personProfiles,
Map<Long, OrganizationalUnitVO> organizationals) {
AccountRegisterDTO accountRegisterDTO = AccountRegisterDTO.builder().build();
BeanUtils.copyProperties(accountRegister, accountRegisterDTO);
accountRegisterDTO.setPersonProfile(personProfiles.get(Long.valueOf(accountRegisterDTO.getAccountId())));
accountRegisterDTO.setOrganizationalUnit(organizationals.get(accountRegisterDTO.getOuId()));
return accountRegisterDTO;
}
}
}

View File

@ -348,7 +348,7 @@ public class AccountService {
if (appTypeEnum == AppTypeEnum.CMP) {
listAccountRegisterParam.setOrganizationalUnitId(accountAbsentQuery.getOrganizationalUnitId());
}
List<AccountRegister> accountRegisters = accountRegisterService.list(listAccountRegisterParam);
List<AccountRegisterService.AccountRegisterDTO> accountRegisters = accountRegisterService.list(listAccountRegisterParam);
if (CollectionUtils.isNotEmpty(accountRegisters)) {
userAccountAll.addAll(accountRegisters.stream()

View File

@ -1,7 +1,90 @@
package cn.axzo.im.service;
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
import cn.axzo.pokonyan.dao.page.IPageParam;
import cn.axzo.pokonyan.dao.wrapper.CriteriaField;
import cn.axzo.pokonyan.dao.wrapper.Operator;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.springframework.beans.BeanUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface MessageHistoryService extends IService<MessageHistory> {
Page<MessageHistoryDTO> page(PageMessageHistoryParam param);
List<MessageHistoryDTO> list(ListMessageHistoryParam param);
@SuperBuilder
@Data
@NoArgsConstructor
@AllArgsConstructor
class ListMessageHistoryParam {
@CriteriaField(field = "id", operator = Operator.IN)
private List<Long> ids;
@CriteriaField(field = "imMessageTaskId", operator = Operator.EQ)
private Long imMessageTaskId;
@CriteriaField(field = "receivePersonId", operator = Operator.IN)
private Set<String> receivePersonId;
@CriteriaField(field = "toAccount", operator = Operator.IN)
private Set<String> toAccount;
@CriteriaField(ignore = true)
private boolean needReceiveOuInfo;
@CriteriaField(ignore = true)
private boolean needReceiveUserInfo;
}
@SuperBuilder
@Data
@NoArgsConstructor
@AllArgsConstructor
class PageMessageHistoryParam extends ListMessageHistoryParam implements IPageParam {
@CriteriaField(ignore = true)
Integer pageNumber;
@CriteriaField(ignore = true)
Integer pageSize;
/**
* 排序使用示例createTime__DESC
*/
@CriteriaField(ignore = true)
List<String> sort;
}
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
class MessageHistoryDTO extends MessageHistory {
private PersonProfileDto receivePersonProfile;
private OrganizationalUnitVO receiveOrganizationalUnit;
public static MessageHistoryDTO from(MessageHistory messageHistory,
Map<Long, PersonProfileDto> personProfiles,
Map<Long, OrganizationalUnitVO> organizationals) {
MessageHistoryDTO messageHistoryDTO = MessageHistoryDTO.builder().build();
BeanUtils.copyProperties(messageHistory, messageHistoryDTO);
messageHistoryDTO.setReceivePersonProfile(personProfiles.get(Long.valueOf(messageHistoryDTO.getReceivePersonId())));
messageHistoryDTO.setReceiveOrganizationalUnit(organizationals.get(messageHistoryDTO.getReceiveOuId()));
return messageHistoryDTO;
}
}
}

View File

@ -371,37 +371,37 @@ public class MessageService {
// return messageDispatchRespList;
// }
// private void insertImMessage(List<MessageDispatchResp> messageRespList, String messageBody) {
// if (CollectionUtils.isEmpty(messageRespList)) {
// return;
// }
// String requestId = UUID.randomUUID().toString().replace("-", "");
// log.info("IM消息发送成功保持到数据库:{},请求Body信息:{}", JSONUtil.toJsonStr(messageRespList), messageBody);
// CompletableFuture.runAsync(() -> messageRespList.forEach(messageDispatchResp -> {
// MessageHistory messageHistory = new MessageHistory();
// messageHistory.setBizId(requestId);
// messageHistory.setFromAccount(messageDispatchResp.getFromImAccount());
// if (StringUtils.isNotBlank(messageDispatchResp.getToImAccount())) {
// messageHistory.setToAccount(messageDispatchResp.getToImAccount());
// } else {
// messageHistory.setToAccount("unregistered");
// }
// messageHistory.setChannel(imChannel.getProviderType());
// messageHistory.setAppType(messageDispatchResp.getAppType());
// messageHistory.setMessageBody(messageBody);
// messageHistory.setCreateAt(new Date());
// messageHistory.setUpdateAt(new Date());
// if (StringUtils.isNotEmpty(messageDispatchResp.getMsgid())) {
// messageHistory.setMessageId(messageDispatchResp.getMsgid());
// }
// try {
// messageHistoryDao.saveOrUpdate(messageHistory);
// Thread.sleep(50);
// } catch (Exception e) {
// log.error("持久化IM消息到数据库异常:{},", JSONUtil.toJsonStr(messageHistory), e);
// }
// }));
// }
private void insertImMessage(List<MessageDispatchResp> messageRespList, String messageBody) {
if (CollectionUtils.isEmpty(messageRespList)) {
return;
}
String requestId = UUID.randomUUID().toString().replace("-", "");
log.info("IM消息发送成功保持到数据库:{},请求Body信息:{}", JSONUtil.toJsonStr(messageRespList), messageBody);
CompletableFuture.runAsync(() -> messageRespList.forEach(messageDispatchResp -> {
MessageHistory messageHistory = new MessageHistory();
messageHistory.setBizId(requestId);
messageHistory.setFromAccount(messageDispatchResp.getFromImAccount());
if (StringUtils.isNotBlank(messageDispatchResp.getToImAccount())) {
messageHistory.setToAccount(messageDispatchResp.getToImAccount());
} else {
messageHistory.setToAccount("unregistered");
}
messageHistory.setChannel(imChannel.getProviderType());
messageHistory.setAppType(messageDispatchResp.getAppType());
messageHistory.setMessageBody(messageBody);
messageHistory.setCreateAt(new Date());
messageHistory.setUpdateAt(new Date());
if (StringUtils.isNotEmpty(messageDispatchResp.getMsgid())) {
messageHistory.setMessageId(messageDispatchResp.getMsgid());
}
try {
messageHistoryDao.saveOrUpdate(messageHistory);
Thread.sleep(50);
} catch (Exception e) {
log.error("持久化IM消息到数据库异常:{},", JSONUtil.toJsonStr(messageHistory), e);
}
}));
}
@Transactional(rollbackFor = Exception.class)

View File

@ -1,6 +1,5 @@
package cn.axzo.im.service;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.pokonyan.dao.page.IPageParam;
import cn.axzo.pokonyan.dao.wrapper.CriteriaField;
@ -8,6 +7,7 @@ import cn.axzo.pokonyan.dao.wrapper.Operator;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@ -21,8 +21,33 @@ public interface MessageTaskService extends IService<MessageTask> {
Page<MessageTask> page(PageMessageTaskParam param);
List<MessageHistory> sendMessage(MessageTask messageTask);
void createMessageHistory(MessageTask messageTask);
void update(UpdateMessageTaskParam param);
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
class UpdateMessageTaskParam {
private Long id;
private MessageTask.ActionEnum action;
private Date startedTime;
private Date finishedTime;
public MessageTask to() {
return MessageTask.builder()
.id(this.getId())
.startedTime(this.getStartedTime())
.finishedTime(this.getFinishedTime())
.build();
}
}
@SuperBuilder
@Data
@NoArgsConstructor

View File

@ -1,34 +1,67 @@
package cn.axzo.im.service.impl;
import cn.axzo.basics.profiles.api.UserProfileServiceApi;
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
import cn.axzo.im.center.common.enums.AccountTypeEnum;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.dao.mapper.AccountRegisterMapper;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.service.AccountRegisterService;
import cn.axzo.maokai.api.client.OrganizationalUnitApi;
import cn.axzo.maokai.api.vo.request.OrganizationalUnitQuery;
import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
import cn.axzo.pokonyan.dao.converter.PageConverter;
import cn.axzo.pokonyan.dao.mysql.QueryWrapperHelper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@Service
public class AccountRegisterServiceImpl extends ServiceImpl<AccountRegisterMapper, AccountRegister>
implements AccountRegisterService {
@Autowired
private IMChannelProvider imChannelProvider;
@Autowired
private OrganizationalUnitApi organizationalUnitApi;
@Autowired
private UserProfileServiceApi userProfileServiceApi;
@Override
public Page<AccountRegister> page(PageAccountRegisterParam param) {
public Page<AccountRegisterDTO> page(PageAccountRegisterParam param) {
QueryWrapper<AccountRegister> wrapper = QueryWrapperHelper.fromBean(param, AccountRegister.class);
// 只能取配置的appKey的数据防止接口使用方没传appKey导致获取错乱的数据
wrapper.eq("app_key", imChannelProvider.getProviderAppKey());
wrapper.eq("is_delete", 0);
return this.page(PageConverter.convertToMybatis(param, AccountRegister.class), wrapper);
Page<AccountRegister> page = this.page(PageConverter.convertToMybatis(param, AccountRegister.class), wrapper);
Map<Long, PersonProfileDto> personProfiles = listUserPersonProfile(param, page.getRecords());
Map<Long, OrganizationalUnitVO> organizationals = listOrganizational(param, page.getRecords());
return PageConverter.convert(page, (record) -> AccountRegisterDTO.from(record,
personProfiles,
organizationals));
}
@Override
public List<AccountRegister> list(ListAccountRegisterParam param) {
public List<AccountRegisterDTO> list(ListAccountRegisterParam param) {
return PageConverter.drainAll(pageNumber -> {
PageAccountRegisterParam pageParam = PageAccountRegisterParam.builder().build();
BeanUtils.copyProperties(param, pageParam);
@ -37,4 +70,49 @@ public class AccountRegisterServiceImpl extends ServiceImpl<AccountRegisterMappe
return page(pageParam);
});
}
private Map<Long, PersonProfileDto> listUserPersonProfile(PageAccountRegisterParam param,
List<AccountRegister> accountRegisters) {
if (CollectionUtils.isEmpty(accountRegisters) || BooleanUtils.isNotTrue(param.isNeedUserInfo())) {
return Collections.emptyMap();
}
List<Long> personIds = accountRegisters.stream()
.filter(e -> Objects.equals(e.getAccountType(), AccountTypeEnum.USER.getCode()))
.map(AccountRegister::getAccountId)
.filter(Objects::nonNull)
.filter(StringUtils::isNumeric)
.map(Long::valueOf)
.distinct()
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(personIds)) {
return Collections.emptyMap();
}
return userProfileServiceApi.postPersonProfiles(personIds).getData()
.stream()
.collect(Collectors.toMap(PersonProfileDto::getId, Function.identity()));
}
private Map<Long, OrganizationalUnitVO> listOrganizational(PageAccountRegisterParam param,
List<AccountRegister> accountRegisters) {
if (CollectionUtils.isEmpty(accountRegisters) || BooleanUtils.isNotTrue(param.isNeedOuInfo())) {
return Collections.emptyMap();
}
List<Long> ouIds = accountRegisters.stream()
.map(AccountRegister::getOuId)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(ouIds)) {
return Collections.emptyMap();
}
return organizationalUnitApi.page(OrganizationalUnitQuery.builder().unitIds(ouIds).build())
.getData()
.getList()
.stream()
.collect(Collectors.toMap(OrganizationalUnitVO::getId, Function.identity(), (f, s) -> f));
}
}

View File

@ -1,4 +1,111 @@
package cn.axzo.im.service.impl;
public class MessageHistoryServiceImpl {
import cn.axzo.basics.profiles.api.UserProfileServiceApi;
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
import cn.axzo.im.dao.mapper.MessageHistoryMapper;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.maokai.api.client.OrganizationalUnitApi;
import cn.axzo.maokai.api.vo.request.OrganizationalUnitQuery;
import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
import cn.axzo.pokonyan.dao.converter.PageConverter;
import cn.axzo.pokonyan.dao.mysql.QueryWrapperHelper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@Service
public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper, MessageHistory>
implements MessageHistoryService {
@Autowired
private OrganizationalUnitApi organizationalUnitApi;
@Autowired
private UserProfileServiceApi userProfileServiceApi;
@Override
public Page<MessageHistoryDTO> page(PageMessageHistoryParam param) {
QueryWrapper<MessageHistory> wrapper = QueryWrapperHelper.fromBean(param, MessageHistory.class);
// 只能取配置的appKey的数据防止接口使用方没传appKey导致获取错乱的数据
wrapper.eq("is_delete", 0);
Page<MessageHistory> page = this.page(PageConverter.convertToMybatis(param, MessageHistory.class), wrapper);
Map<Long, PersonProfileDto> personProfiles = listReceiveUserPersonProfile(param, page.getRecords());
Map<Long, OrganizationalUnitVO> organizationals = listReceiveOrganizational(param, page.getRecords());
return PageConverter.convert(page, (record) -> MessageHistoryDTO.from(record,
personProfiles,
organizationals));
}
@Override
public List<MessageHistoryDTO> list(ListMessageHistoryParam param) {
return PageConverter.drainAll(pageNumber -> {
PageMessageHistoryParam pageParam = PageMessageHistoryParam.builder().build();
BeanUtils.copyProperties(param, pageParam);
pageParam.setPageNumber(pageNumber);
pageParam.setPageSize(500);
return page(pageParam);
});
}
private Map<Long, PersonProfileDto> listReceiveUserPersonProfile(PageMessageHistoryParam param,
List<MessageHistory> messageHistories) {
if (CollectionUtils.isEmpty(messageHistories) || BooleanUtils.isNotTrue(param.isNeedReceiveUserInfo())) {
return Collections.emptyMap();
}
List<Long> personIds = messageHistories.stream()
.map(MessageHistory::getReceivePersonId)
.filter(StringUtils::isNumeric)
.map(Long::valueOf)
.distinct()
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(personIds)) {
return Collections.emptyMap();
}
return userProfileServiceApi.postPersonProfiles(personIds).getData()
.stream()
.collect(Collectors.toMap(PersonProfileDto::getId, Function.identity()));
}
private Map<Long, OrganizationalUnitVO> listReceiveOrganizational(PageMessageHistoryParam param,
List<MessageHistory> messageHistories) {
if (CollectionUtils.isEmpty(messageHistories) || BooleanUtils.isNotTrue(param.isNeedReceiveOuInfo())) {
return Collections.emptyMap();
}
List<Long> ouIds = messageHistories.stream()
.map(MessageHistory::getReceiveOuId)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(ouIds)) {
return Collections.emptyMap();
}
return organizationalUnitApi.page(OrganizationalUnitQuery.builder().unitIds(ouIds).build())
.getData()
.getList()
.stream()
.collect(Collectors.toMap(OrganizationalUnitVO::getId, Function.identity(), (f, s) -> f));
}
}

View File

@ -6,20 +6,26 @@ import cn.axzo.im.channel.netease.dto.MessageBatchDispatchRequest;
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchResponse;
import cn.axzo.im.channel.netease.dto.MessageBody;
import cn.axzo.im.dao.mapper.MessageTaskMapper;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.service.AccountRegisterService;
import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.im.service.MessageTaskService;
import cn.axzo.pokonyan.client.RateLimiter;
import cn.axzo.pokonyan.client.RateLimiterClient;
import cn.axzo.pokonyan.dao.converter.PageConverter;
import cn.axzo.pokonyan.dao.mysql.QueryWrapperHelper;
import cn.axzo.pokonyan.exception.Aassert;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
@ -27,10 +33,19 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static cn.axzo.im.config.BizResultCode.ACQUIRE_RATE_LIMITER_FAIL;
import static cn.axzo.im.config.BizResultCode.MESSAGE_TASK_NOT_FOUND;
@Slf4j
@Service
@ -43,6 +58,8 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
private IMChannelProvider imChannelProvider;
@Autowired
private AccountRegisterService accountRegisterService;
@Autowired
private MessageHistoryService messageHistoryService;
@Value("${send.message.limiter.permits:1}")
private int permits;
@ -53,12 +70,18 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
/**
* 网易云信IM批量发送-每批次发送给多少用户
*/
@Value("${im-center.message.batch.receiver.once}")
public int msgSendPersonOfOneBatch = 100;
@Value("${im-center.message.batch.receiver.once:10}")
public int msgSendPersonOfOneBatch;
private RateLimiter rateLimiter;
private static final String LIMITER_KEY = "im-center:sendMessage";
/**
* 默认超时时间毫秒
*/
private static final long DEFAULT_TIME_OUT_MILLIS = 60 * 1000;
@Override
@Transactional
public MessageTask create(MessageTask param) {
@ -80,45 +103,217 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
}
@Override
public List<MessageHistory> sendMessage(MessageTask messageTask) {
public void createMessageHistory(MessageTask messageTask) {
if (rateLimiter.tryAcquire()) {
log.info("获得令牌");
this.update(UpdateMessageTaskParam.builder()
.id(messageTask.getId())
.startedTime(new Date())
.build());
// listAccountRegisters(messageTask);
MessageBatchDispatchRequest batchDispatchRequest = new MessageBatchDispatchRequest();
batchDispatchRequest.setFromAccid(messageTask.getSendImAccount());
batchDispatchRequest.setBody(resolveBody(messageTask));
// batchDispatchRequest.setToAccids(imAccountList);
MessageBatchDispatchResponse batchResponse = imChannelProvider.dispatchBatchMessage(batchDispatchRequest);
return null;
MessageTask.BizData bizData = messageTask.getBizData();
if (bizData.isAllPerson()) {
log.info("发送全员消息");
doSendAll(messageTask);
} else {
log.info("发送非全员消息");
doSendNotAll(messageTask);
}
log.info("未获得令牌");
return null;
this.update(UpdateMessageTaskParam.builder()
.id(messageTask.getId())
.action(MessageTask.ActionEnum.SUCCESS)
.finishedTime(new Date())
.build());
}
// private Map<String, AccountRegister> listAccountRegisters(MessageTask messageTask) {
// Set<String> personIds = messageTask.getReceivePersons().stream()
// .filter(receivePerson -> StringUtils.isNotBlank(receivePerson.getPersonId())
// && StringUtils.isBlank(receivePerson.getImAccount()))
// .map(MessageTask.ReceivePerson::getPersonId)
// .collect(Collectors.toSet());
// if (CollectionUtils.isEmpty(personIds)) {
// return;
// }
//
// accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder()
// .accountIds(personIds)
// .build());
// }
@Override
public void update(UpdateMessageTaskParam param) {
MessageTask oldMessageTask = this.lambdaQuery()
.eq(MessageTask::getId, param.getId())
.last("for update")
.one();
Aassert.notNull(oldMessageTask, MESSAGE_TASK_NOT_FOUND);;
MessageTask updateMessageTask = param.to();
if (param.getAction() != null) {
updateMessageTask.setStatus(param.getAction().getNextStatus(oldMessageTask.getStatus()));
}
this.updateById(updateMessageTask);
}
private void doSendAll(MessageTask messageTask) {
}
private void doSendNotAll(MessageTask messageTask) {
List<List<MessageTask.ReceivePerson>> receivePersons = Lists.partition(messageTask.getReceivePersons(), msgSendPersonOfOneBatch);
String messageBody = resolveBody(messageTask);
receivePersons.forEach(e -> saveMessageHistory(e, messageTask, messageBody));
}
private void saveMessageHistory(List<MessageTask.ReceivePerson> receivePersons,
MessageTask messageTask,
String messageBody) {
// 排除已经发送成功的记录防止重复发送
Set<String> existPersons = listExistPerson(receivePersons, messageTask);
Set<String> existImAccounts = listExistImAccount(receivePersons, messageTask);
List<MessageTask.ReceivePerson> absentReceivePersons = receivePersons.stream()
.filter(e -> {
String key = e.buildKey();
return !existPersons.contains(key) && !existImAccounts.contains(key);
})
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(absentReceivePersons)) {
log.info("messageTask,{}, receivePersons,{},已经存在", JSONObject.toJSONString(messageTask),
JSONObject.toJSONString(receivePersons));
return;
}
Map<String, String> accountRegisters = listAccountRegisters(absentReceivePersons);
List<MessageHistory> messageHistories = absentReceivePersons.stream()
.map(receivePerson -> {
MessageHistory messageHistory = new MessageHistory();
messageHistory.setImMessageTaskId(messageTask.getId());
messageHistory.setFromAccount(messageTask.getSendImAccount());
messageHistory.setChannel(imChannelProvider.getProviderType());
messageHistory.setAppType(receivePerson.getAppType().name());
messageHistory.setMessageBody(messageBody);
messageHistory.setCreateAt(new Date());
messageHistory.setUpdateAt(new Date());
messageHistory.setReceiveOuId(receivePerson.getOuId());
messageHistory.setReceivePersonId(receivePerson.getPersonId());
if (StringUtils.isNotBlank(receivePerson.getImAccount())) {
messageHistory.setToAccount(receivePerson.getImAccount());
} else {
String key = receivePerson.buildKey();
messageHistory.setToAccount(accountRegisters.get(key));
messageHistory.setResult("未找到IM账号");
messageHistory.setStatus(MessageHistory.Status.FAILED);
}
return messageHistory;
})
.collect(Collectors.toList());
messageHistoryService.saveBatch(messageHistories);
}
private Set<String> listExistImAccount(List<MessageTask.ReceivePerson> receivePersons,
MessageTask messageTask) {
Set<String> imAccounts = receivePersons.stream()
.map(MessageTask.ReceivePerson::getImAccount)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
if (CollectionUtils.isEmpty(imAccounts)) {
return Collections.emptySet();
}
return messageHistoryService.list(MessageHistoryService.ListMessageHistoryParam.builder()
.imMessageTaskId(messageTask.getId())
.toAccount(imAccounts)
.build())
.stream()
.map(MessageHistory::getToAccount)
.collect(Collectors.toSet());
}
private Set<String> listExistPerson(List<MessageTask.ReceivePerson> receivePersons,
MessageTask messageTask) {
Set<String> personIds = receivePersons.stream()
.map(MessageTask.ReceivePerson::getPersonId)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
if (CollectionUtils.isEmpty(personIds)) {
return Collections.emptySet();
}
return messageHistoryService.list(MessageHistoryService.ListMessageHistoryParam.builder()
.imMessageTaskId(messageTask.getId())
.receivePersonId(personIds)
.build())
.stream()
.map(e -> e.getReceivePersonId() + "_" + e.getAppType() + "_" + e.getReceiveOuId())
.collect(Collectors.toSet());
}
private void sendMessage(List<MessageHistory> messageHistories,
MessageBatchDispatchRequest batchDispatchRequest) {
// 三方接口调用频率120次/超限将限制1分钟使用所以抛出异常等待下一次xxlJob做重试补偿
if (!rateLimiter.tryAcquire(DEFAULT_TIME_OUT_MILLIS)) {
log.info("未获得令牌");
throw ACQUIRE_RATE_LIMITER_FAIL.toException();
}
log.info("获得令牌");
List<String> toAccids = messageHistories.stream()
.map(MessageHistory::getToAccount)
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(toAccids)) {
log.info("没有需要发送消息的账号");
return;
}
batchDispatchRequest.setToAccids(toAccids);
MessageBatchDispatchResponse response = imChannelProvider.dispatchBatchMessage(batchDispatchRequest);
if (response.isSuccess()) {
// 发送成功的IMAccountId -> msgId
Map<String, Long> msgids = response.getMsgids();
// unregister的账号
Set<String> unregister = Optional.ofNullable(response.getUnregister())
.orElseGet(Sets::newHashSet);
List<MessageHistory> updateMessageHistories = messageHistories.stream()
.peek(e -> {
if (unregister.contains(e.getToAccount())) {
e.setStatus(MessageHistory.Status.FAILED);
e.setResult("IM账号未在网易云信注册");
} else {
e.setStatus(MessageHistory.Status.SUCCEED);
e.setMessageId(msgids.get(e.getToAccount()).toString());
}
})
.collect(Collectors.toList());
messageHistoryService.saveBatch(updateMessageHistories);
return;
}
List<MessageHistory> failedMessageHistories = messageHistories.stream()
.peek(e -> {
e.setResult(response.getDesc());
e.setStatus(MessageHistory.Status.FAILED);
})
.collect(Collectors.toList());
messageHistoryService.saveBatch(failedMessageHistories);
}
private Map<String, String> listAccountRegisters(List<MessageTask.ReceivePerson> receivePersons) {
Set<String> personIds = receivePersons.stream()
.filter(receivePerson -> StringUtils.isNotBlank(receivePerson.getPersonId())
&& StringUtils.isBlank(receivePerson.getImAccount()))
.map(MessageTask.ReceivePerson::getPersonId)
.collect(Collectors.toSet());
if (CollectionUtils.isEmpty(personIds)) {
return Collections.emptyMap();
}
return accountRegisterService.list(AccountRegisterService.ListAccountRegisterParam.builder()
.accountIds(personIds)
.build())
.stream()
.collect(Collectors.toMap(accountRegister -> accountRegister.getAccountId() +
"_" + accountRegister.getAppType() + "_" + accountRegister.getOuId(), AccountRegister::getImAccount));
}
//
// private void sendBatchMessage() {
// MessageBatchDispatchRequest batchDispatchRequest = new MessageBatchDispatchRequest();
// batchDispatchRequest.setBody(messageRequest.getBody());
@ -129,7 +324,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
// if (batchResponse != null) {
// Map<String, Long> userMsgResponseMap = batchResponse.getMsgids();
// }
//
private String resolveBody(MessageTask messageTask) {
MessageBody messageBody = new MessageBody();
@ -153,9 +348,44 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
.fluentPut("actionPaths", bizData.getJumpDatas()));
messageBody.setMsgBody(msgBody.toJSONString());
}
if (messageTask.getExt() != null) {
defaultExtMap.putAll((Map) JSON.parseObject(JSONObject.toJSONString(messageTask.getExt())));
}
messageBody.setMessageExtension(defaultExtMap);
return JSONUtil.toJsonStr(messageBody);
}
// private void insertImMessage(List<MessageDispatchResp> messageRespList, String messageBody) {
// if (CollectionUtils.isEmpty(messageRespList)) {
// return;
// }
// String requestId = UUID.randomUUID().toString().replace("-", "");
// log.info("IM消息发送成功保持到数据库:{},请求Body信息:{}", JSONUtil.toJsonStr(messageRespList), messageBody);
// CompletableFuture.runAsync(() -> messageRespList.forEach(messageDispatchResp -> {
// MessageHistory messageHistory = new MessageHistory();
// messageHistory.setBizId(requestId);
// messageHistory.setFromAccount(messageDispatchResp.getFromImAccount());
// if (StringUtils.isNotBlank(messageDispatchResp.getToImAccount())) {
// messageHistory.setToAccount(messageDispatchResp.getToImAccount());
// } else {
// messageHistory.setToAccount("unregistered");
// }
// messageHistory.setChannel(imChannel.getProviderType());
// messageHistory.setAppType(messageDispatchResp.getAppType());
// messageHistory.setMessageBody(messageBody);
// messageHistory.setCreateAt(new Date());
// messageHistory.setUpdateAt(new Date());
// if (StringUtils.isNotEmpty(messageDispatchResp.getMsgid())) {
// messageHistory.setMessageId(messageDispatchResp.getMsgid());
// }
// try {
// messageHistoryDao.saveOrUpdate(messageHistory);
// Thread.sleep(50);
// } catch (Exception e) {
// log.error("持久化IM消息到数据库异常:{},", JSONUtil.toJsonStr(messageHistory), e);
// }
// }));
// }
@Override
public void afterPropertiesSet() throws Exception {
rateLimiter = rateLimiterClient.build(RateLimiterClient.RateLimiterReq.builder()

View File

@ -127,3 +127,8 @@ CREATE TABLE IF NOT EXISTS im_message_task
ALTER TABLE im_message_history ADD COLUMN `result` varchar(1024) NULL COMMENT 'result';
ALTER TABLE im_message_history ADD COLUMN `im_message_task_id` bigint NULL COMMENT '消息推送任务的id';
ALTER TABLE im_message_history ADD COLUMN status varchar(32) not null default 'PENDING' comment '消息状态PENDING、SUCCEED、FAILED';
ALTER TABLE im_message_history ADD COLUMN receive_person_id varchar(100) not null default '' comment 'IM消息接收personId';
ALTER TABLE im_message_history ADD COLUMN receive_ou_id bigint not null default 0 comment 'organizational_unit表的id';