REQ-3345: 同步消息

This commit is contained in:
yanglin 2025-01-23 13:32:33 +08:00
parent b42deed486
commit cafc969b02
39 changed files with 1181 additions and 162 deletions

View File

@ -0,0 +1,27 @@
package cn.axzo.im.center.api.feign;
import cn.axzo.framework.domain.web.result.ApiPageResult;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.vo.req.GroupCreateRequest;
import cn.axzo.im.center.api.vo.req.GroupMessagePageQueryRequest;
import cn.axzo.im.center.api.vo.resp.GroupCreateResponse;
import cn.axzo.im.center.api.vo.resp.GroupMessagePageQueryResponse;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
* @author yanglin
*/
@FeignClient(name = "im-center", url = "${axzo.service.im-center:http://im-center:8080}")
public interface GroupMessageApi {
/**
* 查询群聊历史消息
*/
@PostMapping("/api/im/group/message/pageQuery")
ApiPageResult<GroupMessagePageQueryResponse> pageQuery(
@RequestBody @Validated GroupMessagePageQueryRequest request);
}

View File

@ -6,6 +6,7 @@ import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.FetchUpdatableMessageRequest;
import cn.axzo.im.center.api.vo.req.GetMessageDetailRequest;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendChatMessageRequest;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.req.UpdatableMessageAckRequest;
@ -64,6 +65,10 @@ public interface MessageApi {
ApiResult<MessageTaskResp> sendTemplateMessageAsync(
@RequestBody @Validated SendTemplateMessageParam sendMessageParam);
@PostMapping("/api/im/template-message/async/send/chatMessage")
ApiResult<Long> sendChatMessage(
@RequestBody @Validated SendChatMessageRequest request);
/**
* 更新消息
*/

View File

@ -12,6 +12,7 @@ import lombok.RequiredArgsConstructor;
public class SendPriority {
public static final SendPriority TEMPLATE_MESSAGE = create(1000);
public static final SendPriority CHAT_MESSAGE = create(1500);
public static final SendPriority SYSTEM_CUSTOM_MESSAGE = create(5000);
public static final SendPriority UPDATE_MESSAGE = create(5500);
public static final SendPriority UPDATE_MESSAGE_RETRY = create(5510); // lower priority than UPDATE_MESSAGE

View File

@ -11,6 +11,7 @@ import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
@ -59,6 +60,11 @@ public class GroupCreateRequest {
*/
private String avatar;
/**
* 群聊业务扩展信息, 透传到群属性中
*/
private Map<String, Object> bizAttachment;
@JSONField(serialize = false, deserialize = false)
public Set<PersonAccountAttribute> getOwnerAndMembers() {
Set<PersonAccountAttribute> ownerAndMembers = new HashSet<>(members);

View File

@ -0,0 +1,22 @@
package cn.axzo.im.center.api.vo.req;
import lombok.Getter;
import lombok.Setter;
import javax.validation.constraints.NotNull;
/**
* @author yanglin
*/
@Setter
@Getter
public class GroupMessagePageQueryRequest {
@NotNull(message = "群ID不能为空")
private Long tid;
private Integer page;
private Integer pageSize;
}

View File

@ -0,0 +1,78 @@
package cn.axzo.im.center.api.vo.req;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.center.common.enums.NimMessageType;
import lombok.Getter;
import lombok.Setter;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* @author yanglin
*/
@Setter
@Getter
public class SendChatMessageRequest extends SendMessageRequest {
/**
* 消息类型
*/
@NotNull(message = "消息类型不能为空")
private NimMessageType messageType;
/**
* 消息内容
*/
@NotEmpty(message = "消息体不能为空")
private Map<String, Object> messageBody;
/**
* 发送文本消息
*
* @param text 消息内容
*/
public void setAsTextMessage(String text) {
messageType = NimMessageType.TEXT;
addToMessageBody("msg", text);
}
/**
* 发送图片消息
*
* @param name 图片名称
* @param md5 图片文件 md5按照字节流加密
* @param url url
* @param ext 图片后缀
* @param width 单位为像素
* @param height 单位为像素
* @param size 图片文件大小单位为字节Byte
*/
public void setAsImageMessage(String name,
String md5,
String url,
String ext,
int width,
int height,
int size) {
messageType = NimMessageType.IMAGE;
addToMessageBody("name", name);
addToMessageBody("md5", md5);
addToMessageBody("url", url);
addToMessageBody("ext", ext);
addToMessageBody("w", width);
addToMessageBody("h", height);
addToMessageBody("size", size);
}
private void addToMessageBody(String key, Object value) {
if (messageBody == null)
messageBody = new java.util.HashMap<>();
messageBody.put(key, value);
}
}

View File

@ -0,0 +1,71 @@
package cn.axzo.im.center.api.vo.req;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import lombok.Getter;
import lombok.Setter;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* @author yanglin
*/
@Setter
@Getter
public class SendMessageRequest {
/**
* 业务id
*/
@NotBlank(message = "业务id不能为空")
private String bizId;
/**
* 发送人
*/
@NotNull(message = "发送人不能为空")
private PersonAccountAttribute sender;
/**
* 消息接收用户信息
*/
@Valid
private Set<PersonAccountAttribute> receivePersons;
/**
* 消息接收IM账号或群id
*/
private Set<String> imReceiveAccounts;
public Long determineSenderPersonId() {
if (sender != null)
return Long.parseLong(sender.getPersonId());
return 0L;
}
public boolean isSendByRobot() {
return sender.getAppType() == null;
}
public Set<PersonAccountAttribute> receivePersonsOrEmpty() {
if (receivePersons == null)
return Collections.emptySet();
return new HashSet<>(receivePersons);
}
public Set<String> imReceiveAccountsOrEmpty() {
return imReceiveAccounts == null ? Collections.emptySet() : imReceiveAccounts;
}
public Long determineSenderOuId() {
if (sender != null)
return sender.ouIdOrDefault();
return 0L;
}
}

View File

@ -1,44 +1,17 @@
package cn.axzo.im.center.api.vo.req;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.center.common.enums.TemplatedMsgType;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SendTemplateMessageParam {
/**
* 发送人
*/
@NotNull(message = "发送人不能为空")
private PersonAccountAttribute sender;
/**
* 消息接收用户信息
*/
@Valid
private Set<PersonAccountAttribute> receivePersons;
/**
* 消息接收IM账号或群id
*/
private Set<String> imReceiveAccounts;
@Setter
@Getter
public class SendTemplateMessageParam extends SendMessageRequest {
/**
* 消息标题
@ -67,11 +40,6 @@ public class SendTemplateMessageParam {
*/
private JSONObject ext;
/**
* 业务的唯一ID用于查询发送消息的记录和结果不验证唯一
*/
private String bizId;
private Integer sendPriority;
private TemplatedMsgType templatedMsgType = TemplatedMsgType.TEMPLATE;
@ -81,40 +49,14 @@ public class SendTemplateMessageParam {
*/
private boolean isUpdatable;
public boolean isSendByRobot() {
return sender.getAppType() == null;
}
private PushContent pushContent;
private List<ExcludePushPayload> excludePushPayloads;
public Long determineSenderPersonId() {
if (sender != null)
return Long.parseLong(sender.getPersonId());
return 0L;
}
public Long determineSenderOuId() {
if (sender != null)
return sender.ouIdOrDefault();
return 0L;
}
public boolean isUpdatable() {
return isUpdatable;
}
public Set<PersonAccountAttribute> uniqueReceivePersons() {
if (receivePersons == null)
return Collections.emptySet();
return new HashSet<>(receivePersons);
}
public Set<String> imReceiveAccountsOrEmpty() {
return imReceiveAccounts == null ? Collections.emptySet() : imReceiveAccounts;
}
@Override
public String toString() {
return JSON.toJSONString(this);

View File

@ -0,0 +1,72 @@
package cn.axzo.im.center.api.vo.resp;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.NimFromClientType;
import cn.axzo.im.center.common.enums.NimMessageType;
import cn.axzo.im.center.common.enums.YesOrNo;
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.Setter;
import java.util.Date;
/**
* @author yanglin
*/
@Setter @Getter
public class GroupMessagePageQueryResponse {
/**
* 服务器消息id
*/
private Long id;
/**
* 群id
*/
private Long tid;
/**
* 发送者IM账号
*/
private String fromAccount;
/**
* 发送者自然人ID
*/
private Long fromPersonId;
/**
* 发送者OU ID
*/
private Long fromPersonOuId;
/**
* 发送者应用类型
*/
private AppTypeEnum fromPersonAppType;
/**
* 是否来自机器人
*/
private YesOrNo isFromRobot;
/**
* 云信消息ID
*/
private Long messageId;
/**
* 消息类型. TEXT: 文本, IMAGE: 图片, SPEECH: 语音, VIDEO: 视频, POSITION: 地理位置, FILE: 文件, CUSTOM: 自定义消息
*/
private NimMessageType messageType;
/**
* 发送者的客户端类型. ANDROID, IOS, PC, WEB, REST, MAC
*/
private NimFromClientType fromClientType;
/**
* 发送时间
*/
private Date sendTime;
/**
* 消息体. 具体格式根据messageType来定, 字段参考云信文档的body部分: https://doc.yunxin.163.com/messaging/server-apis/DE0MTk0OTY?platform=server#%E5%8E%86%E5%8F%B2%E6%B6%88%E6%81%AF%E6%9F%A5%E8%AF%A2%E8%BF%94%E5%9B%9E%E7%9A%84%E6%B6%88%E6%81%AF%E6%A0%BC%E5%BC%8F%E8%AF%B4%E6%98%8E
*/
private JSONObject body;
/**
* 单位名称
*/
private String unitName;
}

View File

@ -7,17 +7,19 @@ import lombok.RequiredArgsConstructor;
* @author yanglin
*/
@RequiredArgsConstructor
public enum NimFromClientType implements CodeDefinition<String> {
ANDROID,
IOS,
PC,
WEB,
REST,
MAC
public enum NimFromClientType implements CodeDefinition<Integer> {
ANDROID(1),
IOS(2),
PC(4),
WEB(16),
REST(32),
MAC(64)
;
private final Integer nimCode;
@Override
public String getCode() {
return name();
public Integer getCode() {
return nimCode;
}
}

View File

@ -1,28 +1,29 @@
package cn.axzo.im.center.common.enums;
import cn.axzo.basics.common.constant.enums.CodeDefinition;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
* @author yanglin
*/
@Getter
@RequiredArgsConstructor
public enum NimMessageType implements CodeDefinition<String> {
TEXT("文本"),
IMAGE("图片"),
SPEECH("语音"),
VIDEO("视频"),
POSITION("地理位置"),
FILE("文件"),
CUSTOM("第三方自定义消息"),
GROUP_NOTIFICATION("群通知"),
CHAT_GROUP_NOTIFICATION("聊天室通知"),
public enum NimMessageType implements CodeDefinition<Integer> {
TEXT("文本", 0),
IMAGE("图片", 1),
SPEECH("语音", 2),
VIDEO("视频", 3),
POSITION("地理位置", 4),
FILE("文件", 6),
CUSTOM("自定义", 100),
;
private final String description;
private final String name;
private final int nimCode;
@Override
public String getCode() {
return name();
public Integer getCode() {
return nimCode;
}
}

View File

@ -12,6 +12,8 @@ import cn.axzo.im.channel.netease.dto.NimGroupDismissRequest;
import cn.axzo.im.channel.netease.dto.NimGroupDismissResponse;
import cn.axzo.im.channel.netease.dto.NimGroupGetInfoRequest;
import cn.axzo.im.channel.netease.dto.NimGroupGetInfoResponse;
import cn.axzo.im.channel.netease.dto.NimGroupGetMessagesRequest;
import cn.axzo.im.channel.netease.dto.NimGroupGetMessagesResponse;
import cn.axzo.im.channel.netease.dto.NimGroupRemoveMembersRequest;
import cn.axzo.im.channel.netease.dto.NimGroupRemoveMembersResponse;
import cn.axzo.im.channel.netease.dto.NimQueryEventRequest;
@ -76,10 +78,13 @@ public interface NimClient {
NimGroupCreateResponse createGroup(NimGroupCreateRequest request);
@PostMapping(value = "/team/add.action")
NimGroupAddMembersResponse addMembers(NimGroupAddMembersRequest request);
NimGroupAddMembersResponse addGroupMembers(NimGroupAddMembersRequest request);
@PostMapping(value = "/team/kick.action")
NimGroupRemoveMembersResponse removeMembers(NimGroupRemoveMembersRequest request);
NimGroupRemoveMembersResponse removeGroupMembers(NimGroupRemoveMembersRequest request);
@PostMapping(value = "/history/queryTeamMsg.action")
NimGroupGetMessagesResponse getGroupMessages(NimGroupGetMessagesRequest request);
@Data
class NimCodeResponse {

View File

@ -0,0 +1,34 @@
package cn.axzo.im.channel.netease.dto;
import cn.axzo.im.channel.netease.client.FormRequest;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
/**
* @author yanglin
*/
@Setter
@Getter
@FormRequest
public class NimGroupGetMessagesRequest {
private Long tid;
private String accid;
private Long begintime;
private Long endtime;
private Long limit;
// 1 按时间正序排列2 按时间降序排列其它返回参数 414 错误默认是按降序排列即时间戳最晚的消息排在最前面
private Integer reverse;
// 查询指定的多个消息类型类型之间用","分割不设置该参数则查询全部类型消息格式示例0,1,2,3
//类型支持0:文本1:图片2:语音3:视频4:地理位置5:通知6:文件10:提示11:Robot100:自定义
private String type;
private Boolean checkTeamValid;
private Boolean includeNoSenseMsg;
// 结束查询的最后一条消息的 msgid不包含在查询结果中用于定位锚点
private Long excludeMsgid;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,35 @@
package cn.axzo.im.channel.netease.dto;
import cn.axzo.im.channel.netease.client.NimClient;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
/**
* @author yanglin
*/
@Setter
@Getter
public class NimGroupGetMessagesResponse extends NimClient.NimCodeResponse {
private List<NimGroupMessage> msgs;
@Setter
@Getter
public static class NimGroupMessage {
private String from;
private Long msgid;
private Long sendtime;
private Integer type;
private Integer fromclienttype;
private String msgidclient;
private JSONObject body;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -13,6 +13,7 @@ import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.FetchUpdatableMessageRequest;
import cn.axzo.im.center.api.vo.req.GetMessageDetailRequest;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendChatMessageRequest;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.req.UpdatableMessageAckRequest;
@ -39,7 +40,9 @@ import cn.axzo.im.service.RobotMsgTemplateService;
import cn.axzo.im.updatable.UpdatableMessageManager;
import cn.axzo.im.updatable.UpdatableMessageQueryService;
import cn.axzo.im.updatable.UpdateSupport;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.pokonyan.exception.Aassert;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Sets;
@ -197,7 +200,7 @@ public class MessageController implements MessageApi {
.build();
Date now = new Date();
List<MessageTask.ReceivePerson> requestReceivePersons = JSONArray.parseArray(
JSONObject.toJSONString(request.uniqueReceivePersons()), MessageTask.ReceivePerson.class);
JSONObject.toJSONString(request.receivePersonsOrEmpty()), MessageTask.ReceivePerson.class);
List<MessageTask.ReceivePerson> receivePersons = new ArrayList<>(requestReceivePersons);
if (CollectionUtils.isNotEmpty(request.getImReceiveAccounts())) {
for (String account : request.getImReceiveAccounts()) {
@ -231,6 +234,43 @@ public class MessageController implements MessageApi {
return ApiResult.ok(messageTaskResp);
}
@Override
public ApiResult<Long> sendChatMessage(SendChatMessageRequest request) {
PersonAccountAttribute sender = request.getSender();
BizAssertions.assertNotNull(sender.getAppType(), "发送人appType不能为空");
String sendImAccount = accountService.registerAccountIfAbsent(
sender.getPersonId(), sender.getOuId(), sender.getAppType());
MessageTask.BizData bizData = MessageTask.BizData.builder()
.messageBody(JSON.toJSONString(request.getMessageBody()))
.isSenderRobot(false)
.senderPersonId(request.determineSenderPersonId())
.nimMessageType(request.getMessageType())
.build();
Date now = new Date();
List<MessageTask.ReceivePerson> requestReceivePersons = JSONArray.parseArray(
JSONObject.toJSONString(request.receivePersonsOrEmpty()), MessageTask.ReceivePerson.class);
List<MessageTask.ReceivePerson> receivePersons = new ArrayList<>(requestReceivePersons);
if (CollectionUtils.isNotEmpty(request.getImReceiveAccounts())) {
for (String account : request.getImReceiveAccounts()) {
receivePersons.add(MessageTask.ReceivePerson.builder()
.imAccount(account)
.build());
}
}
MessageTask messageTask = messageTaskService.create(MessageTask.builder()
.bizId(request.getBizId())
.sendImAccount(sendImAccount)
.receivePersons(receivePersons)
.status(MessageTaskStatus.PENDING)
.bizData(bizData)
.planStartTime(now)
.createAt(now)
.sendPriority(SendPriority.CHAT_MESSAGE.getPriority())
.apiChannel(ApiChannel.COMMON_MESSAGE)
.build());
return ApiResult.ok(messageTask.getId());
}
@Override
public ApiResult<MessageUpdateResponse> updateMessage(UpdateMessageRequest request) {
log.info("updateMessage, request={}", request);

View File

@ -11,6 +11,7 @@ import cn.axzo.im.channel.netease.dto.NimQueryEventRequest;
import cn.axzo.im.channel.netease.dto.NimQueryMessageRequest;
import cn.axzo.im.channel.netease.dto.NimRevokeMessageRequest;
import cn.axzo.im.group.GroupManager;
import cn.axzo.im.group.message.GroupMessageSyncService;
import cn.axzo.im.job.CreateMessageHistoryJob;
import cn.axzo.im.job.ExpungeImTaskJob;
import cn.axzo.im.job.RevokeAllMessagesJob;
@ -39,6 +40,7 @@ public class PrivateController {
private final MessageController messageController;
private final ExpungeImTaskJob expungeImTaskJob;
private final GroupManager groupManager;
private final GroupMessageSyncService groupMessageSyncService;
@PostMapping("/private/revoke")
public Object revoke(@Valid @RequestBody NimRevokeMessageRequest request) {
@ -109,4 +111,10 @@ public class PrivateController {
return CommonResponse.success(nimClient.getGroupInfo(request));
}
@PostMapping("/private/group/syncGroupMessages")
public Object syncGroupMessages() throws Exception {
groupMessageSyncService.syncAndWait();
return "done";
}
}

View File

@ -7,6 +7,7 @@ import cn.axzo.im.entity.Group;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Repository;
import java.util.Date;
import java.util.Optional;
/**
@ -49,6 +50,7 @@ public class GroupDao extends ServiceImpl<GroupMapper, Group> {
lambdaUpdate()
.eq(Group::getTid, tid)
.set(Group::getIsDismissed, YesOrNo.YES)
.set(Group::getDismissedAt, new Date())
.update();
}

View File

@ -3,11 +3,31 @@ package cn.axzo.im.dao.repository;
import cn.axzo.im.dao.mapper.GroupMessageMapper;
import cn.axzo.im.entity.GroupMessage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Repository;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* @author yanglin
*/
@Repository("groupMessageDao")
public class GroupMessageDao extends ServiceImpl<GroupMessageMapper, GroupMessage> {
public List<GroupMessage> reload(Collection<GroupMessage> messages) {
if (CollectionUtils.isEmpty(messages))
return Collections.emptyList();
return lambdaQuery()
.nested(wrapper -> {
for (GroupMessage message : messages) {
wrapper.or().nested(nested -> nested
.eq(GroupMessage::getTid, message.getTid())
.eq(GroupMessage::getMessageId, message.getMessageId()));
}
})
.list();
}
}

View File

@ -2,6 +2,7 @@ package cn.axzo.im.entity;
import cn.axzo.im.center.common.enums.GroupType;
import cn.axzo.im.center.common.enums.YesOrNo;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
@ -19,6 +20,8 @@ import java.util.Date;
public class Group {
public static final String ATTACHMENT_GROUP_TYPE = "groupType";
public static final String ATTACHMENT_BIZ_CODE = "bizCode";
public static final String ATTACHMENT_BIZ_ATTACHMENT = "bizAttachment";
private Long id;
private Long tid;
@ -31,6 +34,7 @@ public class Group {
private String ownerAccount;
private Long ownerPersonId;
private Long createPersonId;
private Date dismissedAt;
private Long isDelete;
private YesOrNo isDismissed;
private Date createAt;
@ -47,6 +51,11 @@ public class Group {
return getIsDismissed() == YesOrNo.YES;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
@Setter
@Getter
public static class RecordExt {

View File

@ -8,7 +8,6 @@ import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
import com.dingtalk.api.request.OapiAtsResumeAddRequest;
import lombok.Getter;
import lombok.Setter;
@ -28,8 +27,10 @@ public class GroupMessage {
private Long fromPersonOuId;
private AppTypeEnum fromPersonAppType;
private YesOrNo isFromRobot;
private Long messageId;
private NimMessageType messageType;
private NimFromClientType fromClientType;
private String messageIdClient;
private Date sendTime;
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONObject body;
@ -39,6 +40,12 @@ public class GroupMessage {
private Date createAt;
private Date updateAt;
public RecordExt getOrCreateRecordExt() {
if (recordExt == null)
recordExt = new RecordExt();
return recordExt;
}
@Setter
@Getter
public static class RecordExt {

View File

@ -35,4 +35,6 @@ public class HistoryRecordExt {
private Long updateRetryCount;
private Map<String, String> initMessageExt;
private Long workspaceId;
private Integer nimMessageType;
}

View File

@ -6,6 +6,7 @@ import cn.axzo.im.center.api.vo.req.PushContent;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.BizTypeEnum;
import cn.axzo.im.center.common.enums.NimMessageType;
import cn.axzo.im.center.common.enums.TemplatedMsgType;
import cn.axzo.im.config.BaseListTypeHandler;
import cn.axzo.im.enums.MessageTaskStatus;
@ -130,6 +131,11 @@ public class MessageTask {
*/
private String msgTemplateContent;
/**
* 直接发送的消息内容
*/
private String messageBody;
/**
* 网易云信-自定义消息使用
*/
@ -162,6 +168,8 @@ public class MessageTask {
private Boolean isSenderRobot;
private NimMessageType nimMessageType;
public boolean determineIsSenderRobot() {
return isSenderRobot != null && isSenderRobot;
}

View File

@ -14,8 +14,8 @@ import cn.axzo.im.center.api.vo.resp.GroupAddMembersResponse;
import cn.axzo.im.center.api.vo.resp.GroupCreateResponse;
import cn.axzo.im.center.api.vo.resp.GroupGetMembersResponse;
import cn.axzo.im.center.api.vo.resp.GroupGetOwnerResponse;
import cn.axzo.im.dao.repository.GroupMemberDao;
import cn.axzo.im.entity.GroupMember;
import cn.axzo.im.group.member.GroupMemberQueryService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RestController;
@ -31,8 +31,7 @@ import java.util.List;
public class GroupController implements GroupApi {
private final GroupManager groupManager;
private final GroupMemberManager groupMemberManager;
private final GroupMemberDao groupMemberDao;
private final GroupMemberQueryService groupMemberQueryService;
@Override
public ApiResult<GroupCreateResponse> createGroup(GroupCreateRequest request) {
@ -58,7 +57,7 @@ public class GroupController implements GroupApi {
@Override
public ApiResult<GroupGetMembersResponse> getMembers(GroupGetMembersRequest request) {
List<GroupMember> members = groupMemberManager.getMembers(request.getTid());
List<GroupMember> members = groupMemberQueryService.getMembers(request.getTid());
GroupGetMembersResponse response = new GroupGetMembersResponse();
response.setMember(BeanMapper.copyList(members, GroupMemberInfo.class));
return ApiResult.ok(response);
@ -66,7 +65,7 @@ public class GroupController implements GroupApi {
@Override
public ApiResult<GroupGetOwnerResponse> getOwner(GroupGetOwnerRequest request) {
GroupMember owner = groupMemberManager.getOwner(request.getTid());
GroupMember owner = groupMemberQueryService.getOwner(request.getTid());
GroupGetOwnerResponse response = new GroupGetOwnerResponse();
response.setOwner(BeanMapper.copyBean(owner, GroupMemberInfo.class));
return ApiResult.ok(response);

View File

@ -21,6 +21,7 @@ import cn.axzo.im.dao.repository.GroupDao;
import cn.axzo.im.dao.repository.GroupMemberDao;
import cn.axzo.im.entity.Group;
import cn.axzo.im.entity.GroupMember;
import cn.axzo.im.group.member.GroupMemberSyncer;
import cn.axzo.im.service.AccountService;
import cn.axzo.im.service.domain.ImAccounts;
import cn.axzo.im.utils.BizAssertions;
@ -52,7 +53,7 @@ public class GroupManager {
private final GroupSupport groupSupport;
private final AccountService accountService;
private final GroupBroadcaster groupBroadcaster;
private final GroupMemberManager groupMemberManager;
private final GroupMemberSyncer groupMemberSyncer;
@Transactional
public GroupCreateResponse createGroup(GroupCreateRequest request) {
@ -74,7 +75,7 @@ public class GroupManager {
BizAssertions.assertTrue(nimResponse.isSuccess(), "创建群失败: {}", nimResponse.getDesc());
groupDao.updateTid(group.getId(), nimResponse.getTid());
group = groupDao.getById(group.getId());
groupMemberManager.syncGroupMembers(group);
groupMemberSyncer.syncMembers(group);
groupBroadcaster.fireMembersAdded(group, groupMemberDao.getByTid(nimResponse.getTid()));
GroupCreateResponse response = new GroupCreateResponse();
response.setTid(nimResponse.getTid());
@ -101,14 +102,14 @@ public class GroupManager {
Group group = findGroupForUpdateOrThrow(request.getTid());
BizAssertions.assertFalse(group.isDismissed(), "群已经解散: {}", group.getTid());
// sync members 1
groupMemberManager.syncGroupMembers(group);
groupMemberSyncer.syncMembers(group);
// prepare add members
Set<PersonAccountAttribute> preMembers = groupMemberDao.getGroupPersons(group.getTid());
List<PersonAccountAttribute> toAddMembers = request.getMembers().stream()
.filter(member -> !preMembers.contains(member))
.collect(toList());
if (group.isMemberLimitReached(preMembers.size() + toAddMembers.size()))
throw new ServiceException("群聊人数上限" + group.getMemberLimit() +"人, 请删除部分已选人员");
throw new ServiceException("群聊人数上限" + group.getMemberLimit() + "人, 请删除部分已选人员");
ImAccounts imAccounts = accountService.getAccountsByPersons(toAddMembers);
if (imAccounts.isAccountEmpty()) {
groupSupport.log("添加群成员[TID:{}], 有效群成员IM账号列表为空. 请求成员信息: {}",
@ -118,11 +119,11 @@ public class GroupManager {
NimGroupAddMembersRequest nimRequest = groupSupport
.buildAddMembersRequest(group, group.getOwnerAccount(), imAccounts);
// add members
NimGroupAddMembersResponse nimResponse = nimClient.addMembers(nimRequest);
NimGroupAddMembersResponse nimResponse = nimClient.addGroupMembers(nimRequest);
log.info("添加群成员, request={}, response={}", nimRequest, nimResponse);
BizAssertions.assertTrue(nimResponse.isSuccess(), "添加群成员失败: {}", nimResponse.getDesc());
// sync members 2
groupMemberManager.syncGroupMembers(group);
groupMemberSyncer.syncMembers(group);
groupBroadcaster.fireMembersAdded(group, groupMemberDao
.getByPersons(group.getTid(), toAddMembers));
GroupAddMembersResponse response = new GroupAddMembersResponse();
@ -137,24 +138,23 @@ public class GroupManager {
BizAssertions.assertFalse(group.isDismissed(), "群已经解散: {}", group.getTid());
ImAccounts imAccounts = accountService.getAccountsByPersons(request.getMembers());
if (imAccounts.isAccountEmpty()) {
groupSupport.log("添加群成员[TID:{}], 有效群成员IM账号列表为空. 请求成员信息: {}",
groupSupport.log("移除群成员[TID:{}], 有效群成员IM账号列表为空. 请求成员信息: {}",
group.getTid(), JSON.toJSONString(request.getMembers()));
return;
}
groupMemberManager.syncGroupMembers(group);
groupMemberSyncer.syncMembers(group);
List<GroupMember> toRemoveMembers = groupMemberDao.getByPersons(
group.getTid(), request.getMembers());
if (CollectionUtils.isEmpty(toRemoveMembers))
return;
NimGroupRemoveMembersRequest nimRequest = groupSupport
.buildRemoveMembersRequest(group, group.getOwnerAccount(), imAccounts);
NimGroupRemoveMembersResponse nimResponse = nimClient.removeMembers(nimRequest);
// 不判断NIM响应状态, 因为前端可能已经调用app sdk移除过成员了(支持重复移除)
NimGroupRemoveMembersResponse nimResponse = nimClient.removeGroupMembers(nimRequest);
log.info("移除群成员, request={}, response={}", nimRequest, nimResponse);
if (nimResponse.isSuccess()) {
groupMemberManager.syncGroupMembers(group);
// 不比较直接发消息
groupBroadcaster.fireMembersRemoved(group, toRemoveMembers);
}
groupMemberSyncer.syncMembers(group);
// 不比较直接发消息
groupBroadcaster.fireMembersRemoved(group, toRemoveMembers);
}
@NotNull

View File

@ -1,30 +1,21 @@
package cn.axzo.im.group;
import cn.axzo.im.center.api.feign.GroupApi;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.center.api.vo.req.GroupCreateRequest;
import cn.axzo.im.center.common.enums.GroupMemberType;
import cn.axzo.im.center.common.enums.YesOrNo;
import cn.axzo.im.channel.netease.dto.NimGroupAddMembersRequest;
import cn.axzo.im.channel.netease.dto.NimGroupCreateRequest;
import cn.axzo.im.channel.netease.dto.NimGroupDismissRequest;
import cn.axzo.im.channel.netease.dto.NimGroupInfo;
import cn.axzo.im.channel.netease.dto.NimGroupMemberInfo;
import cn.axzo.im.channel.netease.dto.NimGroupRemoveMembersRequest;
import cn.axzo.im.entity.Group;
import cn.axzo.im.entity.GroupMember;
import cn.axzo.im.service.ChatGroupService;
import cn.axzo.im.service.domain.ImAccounts;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.im.utils.ImAccountParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.helpers.MessageFormatter;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* @author yanglin
*/
@ -66,6 +57,8 @@ public class GroupSupport implements GroupLogger {
nimRequest.setIntroduceMessage(GroupApi.INTRODUCE_MESSAGE);
nimRequest.setIcon(request.getAvatar());
nimRequest.addAttachment(Group.ATTACHMENT_GROUP_TYPE, request.getGroupType());
nimRequest.addAttachment(Group.ATTACHMENT_BIZ_CODE, request.getBizCode());
nimRequest.addAttachment(Group.ATTACHMENT_BIZ_ATTACHMENT, request.getBizAttachment());
return nimRequest;
}
@ -105,29 +98,4 @@ public class GroupSupport implements GroupLogger {
}
}
List<GroupMember> parseGroupMembers(Group group, NimGroupInfo groupInfo) {
ArrayList<GroupMember> accounts = new ArrayList<>();
for (NimGroupMemberInfo member : groupInfo.getOwnerAndMembers()) {
PersonAccountAttribute person = ImAccountParser.parsePerson(member.getAccid()).orElse(null);
GroupMember account = new GroupMember();
accounts.add(account);
account.setTid(group.getTid());
account.setImAccount(member.getAccid());
account.setMemberType(groupInfo.getOwner().equals(member)
? GroupMemberType.OWNER : GroupMemberType.MEMBER);
if (person == null) {
account.setPersonId(0L);
account.setPersonOuId(0L);
account.setAppType(null);
account.setIsRobot(YesOrNo.YES);
} else {
account.setPersonId(Long.valueOf(person.getPersonId()));
account.setPersonOuId(person.ouIdOrDefault());
account.setAppType(person.getAppType());
account.setIsRobot(YesOrNo.NO);
}
}
return accounts;
}
}

View File

@ -0,0 +1,33 @@
package cn.axzo.im.group.member;
import cn.axzo.im.center.common.enums.GroupMemberType;
import cn.axzo.im.dao.repository.GroupMemberDao;
import cn.axzo.im.entity.GroupMember;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class GroupMemberQueryService {
private final GroupMemberDao groupMemberDao;
public List<GroupMember> getMembers(Long tid) {
return groupMemberDao.getByTid(tid);
}
public GroupMember getOwner(Long tid) {
return groupMemberDao.lambdaQuery()
.eq(GroupMember::getTid, tid)
.eq(GroupMember::getMemberType, GroupMemberType.OWNER)
.one();
}
}

View File

@ -1,20 +1,25 @@
package cn.axzo.im.group;
package cn.axzo.im.group.member;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.center.common.enums.GroupMemberType;
import cn.axzo.im.center.common.enums.YesOrNo;
import cn.axzo.im.channel.netease.client.NimClient;
import cn.axzo.im.channel.netease.dto.NimGroupGetInfoRequest;
import cn.axzo.im.channel.netease.dto.NimGroupGetInfoResponse;
import cn.axzo.im.channel.netease.dto.NimGroupInfo;
import cn.axzo.im.channel.netease.dto.NimGroupMemberInfo;
import cn.axzo.im.dao.repository.GroupDao;
import cn.axzo.im.dao.repository.GroupMemberDao;
import cn.axzo.im.entity.Group;
import cn.axzo.im.entity.GroupMember;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.im.utils.ImAccountParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@ -24,29 +29,17 @@ import java.util.Optional;
@Slf4j
@Component
@RequiredArgsConstructor
public class GroupMemberManager {
public class GroupMemberSyncer {
private final GroupMemberDao groupMemberDao;
private final GroupSupport groupSupport;
private final GroupDao groupDao;
private final NimClient nimClient;
List<GroupMember> getMembers(Long tid) {
return groupMemberDao.getByTid(tid);
}
GroupMember getOwner(Long tid) {
return groupMemberDao.lambdaQuery()
.eq(GroupMember::getTid, tid)
.eq(GroupMember::getMemberType, GroupMemberType.OWNER)
.one();
}
void syncGroupMembers(Group group) {
public void syncMembers(Group group) {
NimGroupInfo groupInfo = fetchGroupInfo(group).orElse(null);
if (groupInfo == null) return;
groupMemberDao.deleteAccounts(group.getTid());
List<GroupMember> accounts = groupSupport.parseGroupMembers(group, groupInfo);
List<GroupMember> accounts = parseGroupMembers(group, groupInfo);
if (CollectionUtils.isNotEmpty(accounts))
groupMemberDao.saveBatch(accounts);
groupDao.updateMembersCount(group.getTid(), accounts.size());
@ -65,4 +58,30 @@ public class GroupMemberManager {
"获取群信息失败: {}", nimResponse.getDesc());
return Optional.of(nimResponse.getTinfo());
}
private List<GroupMember> parseGroupMembers(Group group, NimGroupInfo groupInfo) {
ArrayList<GroupMember> accounts = new ArrayList<>();
for (NimGroupMemberInfo member : groupInfo.getOwnerAndMembers()) {
PersonAccountAttribute person = ImAccountParser.parsePerson(member.getAccid()).orElse(null);
GroupMember account = new GroupMember();
accounts.add(account);
account.setTid(group.getTid());
account.setImAccount(member.getAccid());
account.setMemberType(groupInfo.getOwner().equals(member)
? GroupMemberType.OWNER : GroupMemberType.MEMBER);
if (person == null) {
account.setPersonId(0L);
account.setPersonOuId(0L);
account.setAppType(null);
account.setIsRobot(YesOrNo.YES);
} else {
account.setPersonId(Long.valueOf(person.getPersonId()));
account.setPersonOuId(person.ouIdOrDefault());
account.setAppType(person.getAppType());
account.setIsRobot(YesOrNo.NO);
}
}
return accounts;
}
}

View File

@ -0,0 +1,57 @@
package cn.axzo.im.group.message;
import cn.axzo.basics.common.exception.ServiceException;
import com.alibaba.fastjson.JSON;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class GroupMessageSyncJob {
private final GroupMessageSyncService groupMessageSyncService;
private volatile boolean isRunning = false;
@XxlJob("groupMessageSyncJob")
public ReturnT<String> execute(String jsonStr) throws Exception {
if (isRunning)
throw new ServiceException("任务正在执行中,请稍后再试...");
synchronized (this) {
if (isRunning)
throw new ServiceException("任务正在执行中,请稍后再试...");
isRunning = true;
}
Param param = StringUtils.isBlank(jsonStr)
? new Param() : JSON.parseObject(jsonStr, Param.class);
CountDownLatch latch = new CountDownLatch(1);
groupMessageSyncService.sync(param.tps, () -> {
synchronized (GroupMessageSyncJob.this) {
isRunning = false;
}
latch.countDown();
log.info("group message sync job finished");
});
latch.await();
return ReturnT.SUCCESS;
}
@Setter
@Getter
public static class Param {
private int tps = 30;
}
}

View File

@ -0,0 +1,122 @@
package cn.axzo.im.group.message;
import cn.axzo.im.center.common.enums.YesOrNo;
import cn.axzo.im.channel.netease.client.NimClient;
import cn.axzo.im.channel.netease.dto.NimGroupGetMessagesRequest;
import cn.axzo.im.channel.netease.dto.NimGroupGetMessagesResponse;
import cn.axzo.im.dao.repository.GroupDao;
import cn.axzo.im.dao.repository.GroupMessageDao;
import cn.axzo.im.entity.Group;
import cn.axzo.im.entity.GroupMessage;
import cn.axzo.im.utils.RecordCursor;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.joda.time.DateTime;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author yanglin
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class GroupMessageSyncService {
private final GroupDao groupDao;
private final GroupMessageDao groupMessageDao;
private final NimClient nimClient;
public void syncAndWait() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
sync(5, latch::countDown);
latch.await();
}
void sync(int tps, Runnable onComplete) {
AtomicBoolean completed = new AtomicBoolean(false);
Runnable runOnce = () -> {
if (completed.compareAndSet(false, true))
onComplete.run();
};
try {
syncImpl(tps, runOnce);
} catch (Exception e) {
log.warn("sync group message failed", e);
runOnce.run();
}
}
private void syncImpl(int tps, Runnable onComplete) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(tps + 1);
MessageSyncController controller = new MessageSyncController(tps, onComplete);
Date twoDayAgo = new DateTime(new Date()).minusDays(2).toDate();
RecordCursor<Group> cursor = new RecordCursor<>(Group::getId, () ->
groupDao.lambdaQuery()
.nested(w1 -> w1
.eq(Group::getIsDismissed, YesOrNo.NO)
.or().nested(w2 -> w2
.eq(Group::getIsDismissed, YesOrNo.YES)
.gt(Group::getDismissedAt, twoDayAgo)))
);
for (List<Group> groups : cursor) {
for (Group group : groups) {
controller.acquireSubmitGroup();
executor.execute(new GroupMessageSyncer(this, controller, group) {
@Override
public void run() {
try {
super.run();
} catch (Exception e) {
log.warn("sync group message failed: {}", group.getId(), e);
} finally {
controller.setGroupCompleted(group);
}
}
});
controller.setGroupSubmitted(group);
}
}
controller.setSubmitFinished();
}
void saveMessagesBatch(Collection<GroupMessage> messages) {
if (CollectionUtils.isEmpty(messages))
return;
groupMessageDao.saveBatch(messages);
}
void saveMessage(GroupMessage message) {
groupMessageDao.save(message);
}
List<GroupMessage> reloadMessages(Collection<GroupMessage> messages) {
return groupMessageDao.reload(messages);
}
Optional<Date> getMaxSendTime(Long tid) {
GroupMessage groupMessage = groupMessageDao.getBaseMapper()
.selectOne(new QueryWrapper<GroupMessage>()
.select("MAX(send_time) AS send_time")
.lambda()
.eq(GroupMessage::getTid, tid));
return groupMessage == null
? Optional.empty()
: Optional.of(groupMessage.getSendTime());
}
NimGroupGetMessagesResponse getGroupMessages(NimGroupGetMessagesRequest request) {
return nimClient.getGroupMessages(request);
}
}

View File

@ -0,0 +1,129 @@
package cn.axzo.im.group.message;
import cn.axzo.basics.common.constant.enums.CodeDefinition;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.NimFromClientType;
import cn.axzo.im.center.common.enums.NimMessageType;
import cn.axzo.im.center.common.enums.YesOrNo;
import cn.axzo.im.channel.netease.dto.NimGroupGetMessagesRequest;
import cn.axzo.im.channel.netease.dto.NimGroupGetMessagesResponse;
import cn.axzo.im.channel.netease.dto.NimGroupGetMessagesResponse.NimGroupMessage;
import cn.axzo.im.entity.Group;
import cn.axzo.im.entity.GroupMessage;
import cn.axzo.im.group.message.timeline.TimeNode;
import cn.axzo.im.group.message.timeline.Timeline;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.im.utils.ImAccountParser;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.joda.time.DateTime;
import org.springframework.dao.DuplicateKeyException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@Slf4j
@RequiredArgsConstructor
class GroupMessageSyncer implements Runnable {
private final GroupMessageSyncService service;
private final MessageSyncController controller;
private final Group group;
@Override
public void run() {
Long beginMs = service.getMaxSendTime(group.getTid()).orElse(oneAgo()).getTime();
Long endMs = System.currentTimeMillis();
Timeline timeline = new Timeline(beginMs, endMs);
long limit = 100L;
TimeNode node = timeline.peek();
while (node != null) {
NimGroupGetMessagesResponse response = fetchMessages(node, limit, null);
BizAssertions.assertTrue(response.isSuccess(), "fetch group messages failed");
List<NimGroupMessage> msgList = response.getMsgs();
if (CollectionUtils.isEmpty(msgList)) {
timeline.consume();
node = timeline.peek();
continue;
}
if (response.getSize() > limit && msgList.size() == limit && node.isSplittable()) {
timeline.split();
node = timeline.peek();
continue;
}
saveMessages(asGroupMessages(msgList));
timeline.consume();
node = timeline.peek();
}
}
private NimGroupGetMessagesResponse fetchMessages(TimeNode node, Long limit, Long lastMessageId) {
NimGroupGetMessagesRequest request = new NimGroupGetMessagesRequest();
request.setTid(group.getTid());
request.setAccid(group.getOwnerAccount());
request.setBegintime(node.getBeginMs());
request.setEndtime(node.getEndMs());
request.setLimit(limit);
request.setReverse(1);
request.setCheckTeamValid(false);
request.setIncludeNoSenseMsg(false);
request.setExcludeMsgid(lastMessageId);
request.setType("0,1,2,3,4,6,100");
controller.acquireFetchMessages();
NimGroupGetMessagesResponse response = service.getGroupMessages(request);
log.info("fetch group messages, request={}, response={}", request, response);
return response;
}
private Collection<GroupMessage> asGroupMessages(List<NimGroupMessage> msgList) {
ArrayList<GroupMessage> messages = new ArrayList<>();
for (NimGroupMessage nimMessage : msgList) {
PersonAccountAttribute person = ImAccountParser.parsePerson(nimMessage.getFrom()).orElse(null);
GroupMessage message = new GroupMessage();
messages.add(message);
message.setTid(group.getTid());
message.setFromAccount(nimMessage.getFrom());
message.setFromPersonId(person == null ? 0L : Long.parseLong(person.getPersonId()));
message.setFromPersonOuId(person == null ? 0L : person.getOuId());
message.setFromPersonAppType(person == null ? AppTypeEnum.NONE : person.getAppType());
message.setIsFromRobot(YesOrNo.of(person == null));
message.setMessageId(nimMessage.getMsgid());
message.setMessageType(CodeDefinition.findByCode(
NimMessageType.class, nimMessage.getType()).orElse(null));
message.setFromClientType(CodeDefinition.findByCode(
NimFromClientType.class, nimMessage.getFromclienttype()).orElse(null));
message.setSendTime(new Date(nimMessage.getSendtime()));
message.setBody(nimMessage.getBody());
message.setMessageIdClient(nimMessage.getMsgidclient());
}
return messages;
}
private void saveMessages(Collection<GroupMessage> messages) {
if (CollectionUtils.isEmpty(messages))
return;
try {
service.saveMessagesBatch(messages);
} catch (DuplicateKeyException ignored) {
List<GroupMessage> savedMessages = service.reloadMessages(messages);
List<GroupMessage> notSaved = Sets.difference(
UniqueMessage.from(messages), UniqueMessage.from(savedMessages))
.stream().map(UniqueMessage::getGroupMessage).collect(toList());
saveMessages(notSaved);
}
}
private Date oneAgo() {
return DateTime.now().minusDays(1).toDate();
}
}

View File

@ -0,0 +1,80 @@
package cn.axzo.im.group.message;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.im.entity.Group;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
/**
* @author yanglin
*/
@Slf4j
class MessageSyncController {
private static final Object SUBMITTED = new Object();
private final RateLimiter rateLimiter;
private final Semaphore submitGroupSemaphore;
private final Runnable onComplete;
private final Map<Long, Object> groups = new ConcurrentHashMap<>();
private int submittedGroup = 0;
private int completedGroup = 0;
private boolean submitFinished = false;
MessageSyncController(int tps, Runnable onComplete) {
this.rateLimiter = RateLimiter.create(tps);
this.submitGroupSemaphore = new Semaphore(tps);
this.onComplete = onComplete;
}
void acquireSubmitGroup() throws Exception {
submitGroupSemaphore.acquire();
}
void setGroupSubmitted(Group group) {
Object submitted = groups.putIfAbsent(group.getId(), SUBMITTED);
if (submitted != null)
throw new ServiceException("group already submitted: " + group.getId());
synchronized (this) {
if (submitFinished)
throw new ServiceException("submit finished");
submittedGroup++;
}
log.info("submitted group: {}", group.getId());
}
void setGroupCompleted(Group group) {
Object submitted = groups.remove(group.getId());
if (submitted == null) return;
submitGroupSemaphore.release();
synchronized (this) {
completedGroup++;
maybeComplete();
}
log.info("completed group: {}", group.getId());
}
void setSubmitFinished() {
synchronized (this) {
submitFinished = true;
}
log.info("submit finished");
maybeComplete();
}
private void maybeComplete() {
if (submitFinished && completedGroup == submittedGroup)
onComplete.run();
}
void acquireFetchMessages() {
rateLimiter.acquire();
}
}

View File

@ -0,0 +1,51 @@
package cn.axzo.im.group.message;
import cn.axzo.im.entity.GroupMessage;
import lombok.RequiredArgsConstructor;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
import static java.util.stream.Collectors.toSet;
/**
* @author yanglin
*/
@RequiredArgsConstructor
class UniqueMessage {
private final GroupMessage message;
public static Set<UniqueMessage> from(Collection<GroupMessage> messages) {
return messages.stream()
.map(UniqueMessage::new)
.collect(toSet());
}
public Long getTid() {
return message.getTid();
}
public Long getMessageId() {
return message.getMessageId();
}
public GroupMessage getGroupMessage() {
return message;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof UniqueMessage)) return false;
UniqueMessage that = (UniqueMessage) o;
return Objects.equals(getTid(), that.getTid())
&& Objects.equals(getMessageId(), that.getMessageId());
}
@Override
public int hashCode() {
return Objects.hash(getTid(), getMessageId());
}
}

View File

@ -0,0 +1,26 @@
package cn.axzo.im.group.message.timeline;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
* @author yanglin
*/
@Getter
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
public class TimeNode {
private final Long beginMs;
private final Long endMs;
public boolean isSplittable() {
return endMs - beginMs > 0;
}
@Override
public String toString() {
return String.format("%s~%s", beginMs, endMs);
}
}

View File

@ -0,0 +1,83 @@
package cn.axzo.im.group.message.timeline;
import cn.axzo.im.utils.BizAssertions;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import static java.util.stream.Collectors.joining;
/**
* @author yanglin
*/
public class Timeline {
private final LinkedList<TimeNode> nodes;
private int splitCount = 1;
public Timeline(Long beginMs, Long endMs) {
BizAssertions.assertTrue(beginMs <= endMs, "beginMs must be less than or equal to endMs");
nodes = new LinkedList<>();
prepend(split(beginMs, endMs, splitCount));
}
private void prepend(List<TimeNode> nodes) {
this.nodes.addAll(0, nodes);
}
public void split() {
TimeNode node = nodes.peek();
BizAssertions.assertNotNull(node, "timeline is empty");
//noinspection DataFlowIssue
BizAssertions.assertTrue(node.isSplittable(), "node is not splittable");
nodes.removeFirst();
splitCount *= 2;
prepend(split(node.getBeginMs(), node.getEndMs(), splitCount));
}
public void consume() {
TimeNode node = nodes.peek();
BizAssertions.assertNotNull(node, "timeline is empty");
nodes.removeFirst();
splitCount = 1;
}
@Nullable
public TimeNode peek() {
return nodes.peek();
}
private List<TimeNode> split(Long beginMs, Long endMs, int splitCount) {
BizAssertions.assertTrue(beginMs <= endMs, "beginMs must be less than or equal to endMs");
List<TimeNode> nodes = new ArrayList<>();
long totalDuration = endMs - beginMs;
if (totalDuration == 0) {
nodes.add(new TimeNode(beginMs, endMs));
return nodes;
}
long duration = totalDuration / splitCount;
if (duration == 0)
duration = 1;
for (int i = 0; i < splitCount; i++) {
long start = beginMs + i * duration;
long end = duration == 1
? (start)
: ((i == splitCount - 1) ? endMs : start + duration - 1);
nodes.add(new TimeNode(start, end));
if (duration == 1 && end == endMs)
break;
if (duration != 1 && end >= endMs - 1)
break;
}
return nodes;
}
@Override
public String toString() {
String nodes = this.nodes.stream()
.map(TimeNode::toString).collect(joining(","));
return String.format("[%s]", nodes);
}
}

View File

@ -2,7 +2,6 @@ package cn.axzo.im.send.handler;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.ChannelMsgTypeEnum;
import cn.axzo.im.center.common.enums.YesOrNo;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.dto.MessageDispatchRequest;
import cn.axzo.im.channel.netease.dto.MessageDispatchResponse;
@ -36,7 +35,11 @@ public class CommonSendOneHandler extends SendOneHandler {
&& NumberUtils.isDigits(history.getToAccount());
sendRequest.setOpe(isSendToGroup ? 1 : 0);
sendRequest.setTo(history.getToAccount());
sendRequest.setType(ChannelMsgTypeEnum.CUSTOM.getCode());
Integer nimMessageType = history.getOrCreateRecordExt().getNimMessageType();
if (nimMessageType != null)
sendRequest.setType(nimMessageType);
else
sendRequest.setType(ChannelMsgTypeEnum.CUSTOM.getCode());
sendRequest.setBody(history.getMessageBody());
sendRequest.setPayload(history.getOrCreateRecordExt().getPayload());
sendRequest.populateOption();

View File

@ -297,6 +297,10 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
MessageTask.BizData bizData = messageTask.getBizData();
if (bizData != null) {
messageHistory.getOrCreateRecordExt().setIsSenderRobot(bizData.getIsSenderRobot());
if (bizData.getNimMessageType() != null)
messageHistory.getOrCreateRecordExt().setNimMessageType(
bizData.getNimMessageType().getNimCode());
}
List<ExcludePushPayload> excludePushPayloads =
@ -423,6 +427,8 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
private String resolveBody(MessageTask.ReceivePerson receivePerson,
MessageTask messageTask, MessageHistory history) {
MessageTask.BizData bizData = messageTask.getBizData();
if (StringUtils.isNotBlank(bizData.getMessageBody()))
return bizData.getMessageBody();
MessageBody messageBody = new MessageBody();
messageBody.setMsgType(bizData.determineTemplatedMsgType().getCode());

View File

@ -20,7 +20,7 @@ mybatis-plus:
configuration:
auto-mapping-behavior: full
map-underscore-to-camel-case: true
#log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
global-config:
db-config:
id-type: auto

View File

@ -0,0 +1,23 @@
package cn.axzo.im.group;
import cn.axzo.im.Application;
import cn.axzo.im.group.message.GroupMessageSyncJob;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author yanglin
*/
@SpringBootTest(classes = Application.class)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class GroupMessageSyncJobTest {
private final GroupMessageSyncJob groupMessageSyncJob;
@Test
void exec() {
}
}

View File

@ -0,0 +1,23 @@
package cn.axzo.im.group.message;
import cn.axzo.im.Application;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author yanglin
*/
@SpringBootTest(classes = Application.class)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class GroupMessageSyncServiceTest {
private final GroupMessageSyncService groupMessageSyncService;
@Test
void sync() throws Exception {
groupMessageSyncService.syncAndWait();
}
}