Merge branch 'feature/REQ-3201'

This commit is contained in:
yanglin 2025-01-08 09:43:29 +08:00
commit 076f7cf7fe
107 changed files with 3940 additions and 203 deletions

View File

@ -3,12 +3,19 @@ package cn.axzo.im.center.api.feign;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.vo.req.AsyncSendMessageParam;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.FetchUpdatableMessageRequest;
import cn.axzo.im.center.api.vo.req.GetMessageDetailRequest;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.req.UpdatableMessageAckRequest;
import cn.axzo.im.center.api.vo.req.UpdateMessageRequest;
import cn.axzo.im.center.api.vo.resp.FetchUpdatableMessageResponse;
import cn.axzo.im.center.api.vo.resp.GetMessageDetailResponse;
import cn.axzo.im.center.api.vo.resp.MessageCustomResp;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.center.api.vo.resp.MessageTaskResp;
import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
@ -54,7 +61,25 @@ public interface MessageApi {
* @return
*/
@PostMapping("/api/im/template-message/async/send")
ApiResult<MessageTaskResp> sendTemplateMessageAsync(@RequestBody @Validated SendTemplateMessageParam sendMessageParam);
ApiResult<MessageTaskResp> sendTemplateMessageAsync(
@RequestBody @Validated SendTemplateMessageParam sendMessageParam);
/**
* 更新消息
*/
@PostMapping("/api/im/template-message/updatable/updateMessage")
ApiResult<MessageUpdateResponse> updateMessage(@RequestBody @Validated UpdateMessageRequest request);
@PostMapping("/api/im/template-message/updatable/ack")
ApiResult<Void> ack(@RequestBody @Validated UpdatableMessageAckRequest request);
@PostMapping("/api/im/template-message/updatable/fetchUpdatableMessage")
ApiResult<FetchUpdatableMessageResponse> fetchUpdatableMessage(
@RequestBody @Validated FetchUpdatableMessageRequest request);
@PostMapping("/api/im/template-message/updatable/getMessageDetails")
ApiResult<GetMessageDetailResponse> getMessageDetails(
@RequestBody @Validated GetMessageDetailRequest request);
/**
*

View File

@ -0,0 +1,20 @@
package cn.axzo.im.center.api.feign;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.vo.req.RevokeMessageRequest;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import javax.validation.Valid;
/**
* @author yanglin
*/
@FeignClient(name = "im-center", url = "${axzo.service.im-center:http://im-center:8080}")
public interface MessageTool {
@PostMapping("/api/im/message/tool/revoke")
ApiResult<String> revoke(@RequestBody @Valid RevokeMessageRequest request);
}

View File

@ -13,6 +13,8 @@ public class SendPriority {
public static final SendPriority TEMPLATE_MESSAGE = create(1000);
public static final SendPriority SYSTEM_CUSTOM_MESSAGE = create(5000);
public static final SendPriority UPDATE_MESSAGE = create(5500);
public static final SendPriority UPDATE_MESSAGE_RETRY = create(5510); // lower priority than UPDATE_MESSAGE
public static final SendPriority OP_MESSAGE = create(500000);
private final Integer value;

View File

@ -0,0 +1,20 @@
package cn.axzo.im.center.api.vo;
import cn.axzo.basics.common.constant.enums.CodeDefinition;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
* @author yanglin
*/
@Getter
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public enum ImErrorCodes implements CodeDefinition<Integer> {
TEMPLATE_NO_ROBOT(4000),
;
private final Integer code;
}

View File

@ -0,0 +1,51 @@
package cn.axzo.im.center.api.vo;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
/**
* @author yanglin
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
// IMPORTANT: 不要删除这个注解, 避免@Data被@Setter, @Getter取代
@EqualsAndHashCode
public class PersonAccountAttribute {
/**
* 接收消息的personId
*/
@NotBlank(message = "personId不能为空")
private String personId;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long ouId;
/**
* 项目id
*/
private Long workspaceId;
/**
* 发送消息到App端
* 工人端企业端服务器
* CMCMPSYSTEM
*
* @See cn.axzo.im.center.common.enums.AppTypeEnum
*/
@NotNull(message = "appType不能为空")
private AppTypeEnum appType;
}

View File

@ -0,0 +1,17 @@
package cn.axzo.im.center.api.vo.req;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
/**
* @author yanglin
*/
@Setter
@Getter
public class AdvanceRetryRequest {
private List<String> bizMessageIds;
}

View File

@ -74,6 +74,8 @@ public class AsyncSendMessageParam {
private Integer sendPriority;
private PushContent pushContent;
@Data
@Builder
@AllArgsConstructor

View File

@ -59,7 +59,7 @@ public class CustomMessageInfo {
*/
private Map<String, Object> personId2BizInfo = new HashMap<>();
private boolean isPush = false;
private PushContent pushContent;
public void addBizInfo(String personId, Object bizInfo) {
if (personId2BizInfo == null)

View File

@ -0,0 +1,55 @@
package cn.axzo.im.center.api.vo.req;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import javax.validation.constraints.NotBlank;
import java.util.Date;
/**
* @author yanglin
*/
@Setter
@Getter
public class FetchUpdatableMessageRequest {
public static final Long DEFAULT_MAX_UPDATE_TIME = 0L;
public static final Long DEFAULT_MIN_CURSOR_ID = Long.MAX_VALUE;
/**
* 接收者IM账号
*/
@NotBlank(message = "接收者账号不能为空")
private String toAccount;
/**
* 用于增量更新. 上次增量更新时最大的时间戳, 客户端每次增量更新结束需要按账号缓存该值
* 默认为1970年xxxx, 比较逻辑为IM消息更新大于该时间戳
*/
private Long maxUpdateTime = DEFAULT_MAX_UPDATE_TIME;
/**
* 用于批量(分页)查询. 游标ID, 上一页查询时返回的IM消息的最小ID
* 默认为最大整数值, 比较逻辑为IM消息ID小于该ID, 也就是先返回最新的消息
*/
private Long minCursorId = DEFAULT_MIN_CURSOR_ID;
public Date determineMaxUpdateTime() {
if (maxUpdateTime == null || maxUpdateTime <= 0)
return new Date(DEFAULT_MAX_UPDATE_TIME);
return new Date(maxUpdateTime);
}
public Long determineMinCursorId() {
if (minCursorId == null || minCursorId <= 0)
return DEFAULT_MIN_CURSOR_ID;
return minCursorId;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,22 @@
package cn.axzo.im.center.api.vo.req;
import lombok.Getter;
import lombok.Setter;
import javax.validation.constraints.NotEmpty;
import java.util.Set;
/**
* @author yanglin
*/
@Setter
@Getter
public class GetMessageDetailRequest {
/**
* 业务消息ID列表每次最大数量100
*/
@NotEmpty(message = "消息ID不能为空")
private Set<String> bizMessageIds;
}

View File

@ -0,0 +1,32 @@
package cn.axzo.im.center.api.vo.req;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections4.CollectionUtils;
import javax.validation.constraints.NotNull;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
public interface MessageUpdateInfo {
static List<String> collectBizMessageIds(Collection<? extends MessageUpdateInfo> updates) {
if (CollectionUtils.isEmpty(updates))
return Collections.emptyList();
return updates.stream()
.map(MessageUpdateInfo::bizMessageId)
.distinct()
.collect(toList());
}
String bizMessageId();
@NotNull
JSONObject bizBody();
}

View File

@ -0,0 +1,68 @@
package cn.axzo.im.center.api.vo.req;
import lombok.Getter;
import lombok.Setter;
import java.util.HashMap;
import java.util.Map;
/**
* @author yanglin
*/
@Setter
@Getter
public class PushContent {
/**
* 主标题
*/
private String title;
/**
* 推送内容
*/
private String content;
/**
* android点击push的跳转链接
*/
private String workerAndroidPushUrl;
/**
* ios点击push的跳转链接
*/
private String workerIosPushUrl;
/**
* android点击push的跳转链接
*/
private String mangerAndroidPushUrl;
/**
* ios点击push的跳转链接
*/
private String managerIosPushUrl;
/**
* push消息类型. SYSTEM: 系统消息, OP: 运营消息
*/
private PushMessageTye messageTye;
/**
* 自定义提示音
*/
private String customSoundFile;
private Map<String, String> intent = new HashMap<>();
public void addIntent(String key, String value) {
intent.put(key, value);
}
public String determineAndroidCategory() {
return messageTye == PushMessageTye.OP
? "MARKETING"
: "IM";
}
}

View File

@ -0,0 +1,24 @@
package cn.axzo.im.center.api.vo.req;
import cn.axzo.basics.common.constant.enums.CodeDefinition;
import lombok.RequiredArgsConstructor;
/**
* @author yanglin
*/
@RequiredArgsConstructor
public enum PushMessageTye implements CodeDefinition<String> {
SYSTEM("system", "系统消息"),
OP("op", "运营消息"),
;
private final String code;
private final String description;
@Override
public String getCode() {
return code;
}
}

View File

@ -0,0 +1,33 @@
package cn.axzo.im.center.api.vo.req;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Getter;
import lombok.Setter;
import javax.validation.constraints.NotNull;
import java.util.Date;
import java.util.Set;
/**
* @author yanglin
*/
@Setter
@Getter
public class RevokeMessageRequest {
private Set<String> bizIds;
private String pattern;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
@NotNull(message = "since不能为空")
private Date since;
private int parallelism = 25;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -1,6 +1,8 @@
package cn.axzo.im.center.api.vo.req;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.center.common.enums.TemplatedMsgType;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Builder;
@ -11,7 +13,9 @@ import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Data
@Builder
@ -19,23 +23,27 @@ import java.util.List;
@AllArgsConstructor
public class SendTemplateMessageParam {
/**
* 发送人
*/
@NotNull(message = "发送人不能为空")
private PersonAccountAttribute sender;
/**
* 消息接收用户信息
*/
@NotEmpty(message = "消息接收用户信息不能为空")
@Valid
private List<ReceivePerson> receivePersons;
private List<PersonAccountAttribute> receivePersons;
/**
* 消息标题
*/
@NotBlank(message = "消息标题不能为空")
private String msgHeader;
/**
* 消息内容
*/
@NotBlank(message = "消息内容不能为空")
private String msgContent;
/**
@ -62,39 +70,43 @@ public class SendTemplateMessageParam {
private Integer sendPriority;
private TemplatedMsgType templatedMsgType = TemplatedMsgType.TEMPLATE;
/**
* 推送消息
* 消息是否可更新
*/
private String payload;
private boolean isUpdatable;
public boolean isSendByRobot() {
return sender.getAppType() == null;
}
private PushContent pushContent;
private List<ExcludePushPayload> excludePushPayloads;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class ReceivePerson {
public Long determineSenderPersonId() {
if (sender != null)
return Long.parseLong(sender.getPersonId());
return 0L;
}
/**
* 接收消息的personId
*/
@NotBlank(message = "personId不能为空")
private String personId;
public Long determineSenderOuId() {
if (sender != null)
return sender.getOuId();
return 0L;
}
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long ouId;
public boolean isUpdatable() {
return isUpdatable;
}
/**
* 发送消息到App端
* 工人端企业端服务器
* CMCMPSYSTEM
*
* @See cn.axzo.im.center.common.enums.AppTypeEnum
*/
@NotNull(message = "appType不能为空")
private AppTypeEnum appType;
public Set<PersonAccountAttribute> uniqueReceivePersons() {
return new HashSet<>(receivePersons);
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,70 @@
package cn.axzo.im.center.api.vo.req;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@Setter
@Getter
public class UpdatableMessageAckRequest {
/**
* ack的消息信息列表, 同一个bizMessageId传需要ack的最大dataVersion. 每次最多值100条
*/
@NotEmpty(message = "消息ACK不能为空")
private List<Acknowledgment> acknowledgments;
public List<Acknowledgment> determineValidAcknowledgments() {
if (CollectionUtils.isEmpty(acknowledgments))
return Collections.emptyList();
HashMap<String, Long> id2Max = new HashMap<>();
for (Acknowledgment ack : acknowledgments) {
Long max = id2Max.getOrDefault(ack.bizMessageId, 0L);
max = Math.max(max, ack.dataVersion);
id2Max.put(ack.bizMessageId, max);
}
return id2Max.entrySet().stream()
.map(e -> {
Acknowledgment ack = new Acknowledgment();
ack.setBizMessageId(e.getKey());
ack.setDataVersion(e.getValue());
return ack;
})
.collect(toList());
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
@Setter
@Getter
public static class Acknowledgment {
/**
* 业务消息ID
*/
@NotBlank(message = "业务消息ID不能为空")
private String bizMessageId;
/**
* 数据版本
*/
@NotNull(message = "数据版本不能为空")
private Long dataVersion;
}
}

View File

@ -0,0 +1,58 @@
package cn.axzo.im.center.api.vo.req;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import java.util.ArrayList;
import java.util.List;
/**
* @author yanglin
*/
@Setter
@Getter
public class UpdateMessageRequest {
@Valid
private List<Update> updates = new ArrayList<>();
public void addUpdate(Update update) {
this.updates.add(update);
}
public List<String> getBizMessageIds() {
return MessageUpdateInfo.collectBizMessageIds(updates);
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
@Setter
@Getter
public static class Update implements MessageUpdateInfo {
@NotBlank(message = "消息ID不能为空")
private String bizMessageId;
private String msgTemplateContent;
@Override
public String bizMessageId() {
return bizMessageId;
}
@Override
public JSONObject bizBody() {
return StringUtils.isBlank(msgTemplateContent)
? new JSONObject()
: JSON.parseObject(msgTemplateContent);
}
}
}

View File

@ -0,0 +1,45 @@
package cn.axzo.im.center.api.vo.resp;
import cn.axzo.im.center.api.vo.req.FetchUpdatableMessageRequest;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.List;
/**
* @author yanglin
*/
@Setter
@Getter
public class FetchUpdatableMessageResponse {
/**
* 消息列表, 返回的列表为空时表示没有更多满足条件的消息.
*/
private List<UpdatableMessageInfo> messages = new ArrayList<>();
/**
* 当前批次IM消息的最小ID
*/
private Long minCursorId;
public void addMessage(UpdatableMessageInfo message) {
messages.add(message);
}
public void setQueryProgress() {
if (messages == null) return;
minCursorId = messages.stream()
.mapToLong(UpdatableMessageInfo::getId)
.min()
.orElse(FetchUpdatableMessageRequest.DEFAULT_MIN_CURSOR_ID);
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,25 @@
package cn.axzo.im.center.api.vo.resp;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.List;
/**
* @author yanglin
*/
@Setter
@Getter
public class GetMessageDetailResponse {
/**
* 消息列表
*/
private List<UpdatableMessageInfo> messages = new ArrayList<>();
public void addMessage(UpdatableMessageInfo message) {
messages.add(message);
}
}

View File

@ -1,5 +1,6 @@
package cn.axzo.im.center.api.vo.resp;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
@ -8,6 +9,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
import java.util.List;
@Data
@Builder
@ -53,4 +55,11 @@ public class MessageTaskResp {
private Date createAt;
private Date updateAt;
private List<UpdatableMessageSendResult> updatableMessageSendResults;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,66 @@
package cn.axzo.im.center.api.vo.resp;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.List;
/**
* @author yanglin
*/
@Setter
@Getter
public class MessageUpdateResponse {
/**
* 更新的业务消息ID
*/
private List<String> updatedBizMessageIds = new ArrayList<>();
/**
* 未更新的业务消息
*/
private List<NonUpdatedMessage> nonUpdatedMessages = new ArrayList<>();
public void addUpdatedBizMessageId(String bizMessageId) {
this.updatedBizMessageIds.add(bizMessageId);
}
public void addNonUpdatedMessage(String bizMessageId,
NonUpdateMessageReason reason) {
addNonUpdatedMessage(bizMessageId, reason, null);
}
public void addNonUpdatedMessage(String bizMessageId,
NonUpdateMessageReason reason,
Object description) {
NonUpdatedMessage nonUpdatedMessage = new NonUpdatedMessage();
nonUpdatedMessage.setBizMessageId(bizMessageId);
nonUpdatedMessage.setReason(reason);
nonUpdatedMessage.setDescription(description);
this.nonUpdatedMessages.add(nonUpdatedMessage);
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
@Setter
@Getter
public static class NonUpdatedMessage {
private String bizMessageId;
private NonUpdateMessageReason reason;
private Object description;
}
public enum NonUpdateMessageReason {
// 找不到首次发送的消息
CANT_FIND_INIT_MESSAGE,
// 首次投递消息的状态不允许更新. 此时查看NonUpdatedMessage#description字段
MESSAGE_STATE_NOT_ALLOWED
}
}

View File

@ -0,0 +1,26 @@
package cn.axzo.im.center.api.vo.resp;
import lombok.Getter;
import lombok.Setter;
/**
* @author yanglin
*/
@Setter
@Getter
public class UpdatableMessageInfo {
/**
* 消息体
*/
private String msgBody;
/**
* 消息ID
*/
private Long id;
/**
* 更新时间
*/
private Long updateTime;
}

View File

@ -0,0 +1,15 @@
package cn.axzo.im.center.api.vo.resp;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import lombok.Getter;
import lombok.Setter;
/**
* @author yanglin
*/
@Setter
@Getter
public class UpdatableMessageSendResult {
private String bizMessageId;
private PersonAccountAttribute account;
}

View File

@ -18,6 +18,8 @@ public enum BizTypeEnum {
* 待办
*/
PENDING("PENDING", "待办"),
MESSAGE_UPDATE("MESSAGE_UPDATE", "消息更新")
;

View File

@ -0,0 +1,35 @@
package cn.axzo.im.center.common.enums;
/**
* @author yanglin
*/
public enum ImAppType {
/**
* 工人端
*/
CM,
/**
* 企业管理端
*/
CMP;
public static ImAppType fromNimAppType(AppTypeEnum appType) {
if (appType == null)
return null;
if (appType == AppTypeEnum.CM)
return CM;
if (appType == AppTypeEnum.CMP)
return CMP;
throw new UnsupportedOperationException("Should never happen!");
}
public AppTypeEnum toNimAppType() {
if (this == CM)
return AppTypeEnum.CM;
if (this == CMP)
return AppTypeEnum.CMP;
throw new UnsupportedOperationException("Should never happen!");
}
}

View File

@ -0,0 +1,21 @@
package cn.axzo.im.center.common.enums;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
* @author yanglin
*/
@Getter
@RequiredArgsConstructor
public enum TemplatedMsgType {
TEMPLATE("template", "模板消息", false)
;
private final String code;
private final String message;
private final boolean isUpdatable;
}

View File

@ -0,0 +1,23 @@
package cn.axzo.im.center.common.enums;
import cn.axzo.basics.common.constant.enums.CodeDefinition;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author yanglin
*/
@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public enum YesOrNo implements CodeDefinition<String> {
YES("YES", ""),
NO("NO", "")
;
@EnumValue
private final String code;
private final String desc;
}

View File

@ -2,8 +2,12 @@ package cn.axzo.im.channel.netease.client;
import cn.axzo.im.channel.netease.dto.BatchSendCustomMessageRequest;
import cn.axzo.im.channel.netease.dto.BatchSendCustomMessageResponse;
import cn.axzo.im.channel.netease.dto.DismissGroupRequest;
import cn.axzo.im.channel.netease.dto.DismissGroupResponse;
import cn.axzo.im.channel.netease.dto.GetAccountInfoRequest;
import cn.axzo.im.channel.netease.dto.GetAccountInfoResponse;
import cn.axzo.im.channel.netease.dto.GetGroupInfoRequest;
import cn.axzo.im.channel.netease.dto.GetGroupInfoResponse;
import cn.axzo.im.channel.netease.dto.QueryEventRequest;
import cn.axzo.im.channel.netease.dto.QueryEventResponse;
import cn.axzo.im.channel.netease.dto.QueryMessageRequest;
@ -56,6 +60,12 @@ public interface NimClient {
@PostMapping(value = "/user/updateUinfo.action")
UpdateAccountInfoResponse updateAccountInfo(UpdateAccountInfoRequest request);
@PostMapping(value = "/team/remove.action")
DismissGroupResponse dismissGroup(DismissGroupRequest request);
@PostMapping(value = "/team/queryDetail.action")
GetGroupInfoResponse getGroupInfo(GetGroupInfoRequest request);
@Data
class CodeResponse {
private Integer code;

View File

@ -0,0 +1,18 @@
package cn.axzo.im.channel.netease.dto;
import cn.axzo.im.channel.netease.client.FormRequest;
import lombok.Data;
import javax.validation.constraints.NotBlank;
/**
* @author yanglin
*/
@Data
@FormRequest
public class DismissGroupRequest {
@NotBlank(message = "群id不能为空")
private String tid;
@NotBlank(message = "群主id不能为空")
private String owner;
}

View File

@ -0,0 +1,13 @@
package cn.axzo.im.channel.netease.dto;
import cn.axzo.im.channel.netease.client.NimClient;
import lombok.Getter;
import lombok.Setter;
/**
* @author yanglin
*/
@Setter
@Getter
public class DismissGroupResponse extends NimClient.CodeResponse {
}

View File

@ -0,0 +1,18 @@
package cn.axzo.im.channel.netease.dto;
import cn.axzo.im.channel.netease.client.FormRequest;
import lombok.Data;
import javax.validation.constraints.NotBlank;
/**
* @author yanglin
*/
@Data
@FormRequest
public class GetGroupInfoRequest {
@NotBlank(message = "群id不能为空")
private String tid;
}

View File

@ -0,0 +1,16 @@
package cn.axzo.im.channel.netease.dto;
import cn.axzo.im.channel.netease.client.NimClient;
import lombok.Getter;
import lombok.Setter;
/**
* @author yanglin
*/
@Setter
@Getter
public class GetGroupInfoResponse extends NimClient.CodeResponse {
private Object tinfo;
}

View File

@ -1,6 +1,9 @@
package cn.axzo.im.channel.netease.dto;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
@ -36,4 +39,15 @@ public class MessageBody {
private Map<String,String> messageExtension;
/**
* 端信息
*/
private Peer peer;
public JSONObject parseMsgBody() {
if (StringUtils.isBlank(msgBody))
return new JSONObject();
return JSON.parseObject(msgBody);
}
}

View File

@ -36,5 +36,4 @@ public class MessageCustomBody {
*/
private String payload;
}

View File

@ -0,0 +1,28 @@
package cn.axzo.im.channel.netease.dto;
import lombok.Getter;
import lombok.Setter;
/**
* @author yanglin
*/
@Setter
@Getter
public class Peer {
/**
* IM是否为机器人发送
*/
private boolean isSenderRobot;
/**
* IM发送者自然人id, 机器人发送时为0
*/
private Long senderPersonId;
/**
* IM接收者自然人id
*/
private Long receiverPersonId;
}

View File

@ -5,15 +5,25 @@ import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.feign.MessageApi;
import cn.axzo.im.center.api.feign.SendPriority;
import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.center.api.vo.ImErrorCodes;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.center.api.vo.req.AccountQuery;
import cn.axzo.im.center.api.vo.req.AsyncSendMessageParam;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.FetchUpdatableMessageRequest;
import cn.axzo.im.center.api.vo.req.GetMessageDetailRequest;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.req.UpdatableMessageAckRequest;
import cn.axzo.im.center.api.vo.req.UpdateMessageRequest;
import cn.axzo.im.center.api.vo.resp.FetchUpdatableMessageResponse;
import cn.axzo.im.center.api.vo.resp.GetMessageDetailResponse;
import cn.axzo.im.center.api.vo.resp.MessageCustomResp;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.center.api.vo.resp.MessageTaskResp;
import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse;
import cn.axzo.im.center.api.vo.resp.UpdatableMessageSendResult;
import cn.axzo.im.center.api.vo.resp.UserAccountResp;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.entity.AccountRegister;
@ -26,6 +36,9 @@ import cn.axzo.im.service.CustomMessageService;
import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.im.service.MessageTaskService;
import cn.axzo.im.service.RobotMsgTemplateService;
import cn.axzo.im.updatable.UpdatableMessageManager;
import cn.axzo.im.updatable.UpdatableMessageQueryService;
import cn.axzo.im.updatable.UpdateSupport;
import cn.axzo.pokonyan.exception.Aassert;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
@ -39,12 +52,15 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import static cn.axzo.im.config.BizResultCode.ALL_PERSSON_TYPE_NOT_EMPTY;
import static cn.axzo.im.config.BizResultCode.SEND_IM_ACCOUNT_MAX;
@ -76,6 +92,12 @@ public class MessageController implements MessageApi {
private MessageHistoryService messageHistoryService;
@Autowired
private CustomMessageService customMessageService;
@Autowired
private UpdatableMessageManager updatableMessageManager;
@Autowired
private UpdateSupport updateSupport;
@Autowired
private UpdatableMessageQueryService updatableMessageQueryService;
@Override
@ -112,6 +134,7 @@ public class MessageController implements MessageApi {
// 全员发送是不常用的场景不应该由业务处理所以把配置放在bizData里面
.allPerson(sendMessageParam.isAllPerson())
.appTypes(sendMessageParam.getAppTypes())
.pushContent(sendMessageParam.getPushContent())
.build();
Date now = new Date();
MessageTask messageTask = messageTaskService.create(MessageTask.builder()
@ -150,32 +173,80 @@ public class MessageController implements MessageApi {
}
@Override
public ApiResult<MessageTaskResp> sendTemplateMessageAsync(SendTemplateMessageParam sendMessageParam) {
String sendImAccount = check(sendMessageParam);
@Transactional
public ApiResult<MessageTaskResp> sendTemplateMessageAsync(SendTemplateMessageParam request) {
log.info("sendTemplateMessageAsync, request={}", request);
PersonAccountAttribute sender = request.getSender();
String sendImAccount;
if (sender.getAppType() != null) {
sendImAccount = accountService.registerAccountIfAbsent(
sender.getPersonId(), sender.getOuId(), sender.getAppType());
} else {
sendImAccount = check(request);
}
MessageTask.BizData bizData = MessageTask.BizData.builder()
.msgTemplateContent(sendMessageParam.getMsgTemplateContent())
.msgTemplateId(sendMessageParam.getMsgTemplateId())
.payload(sendMessageParam.getPayload())
.excludePushPayloads(sendMessageParam.getExcludePushPayloads())
.msgTemplateContent(request.getMsgTemplateContent())
.msgTemplateId(request.getMsgTemplateId())
.pushContent(request.getPushContent())
.excludePushPayloads(request.getExcludePushPayloads())
.templatedMsgType(request.getTemplatedMsgType())
.isSenderRobot(request.isSendByRobot())
.senderPersonId(request.determineSenderPersonId())
.build();
Date now = new Date();
List<MessageTask.ReceivePerson> receivePersons = JSONArray.parseArray(
JSONObject.toJSONString(request.uniqueReceivePersons()), MessageTask.ReceivePerson.class);
MessageTask messageTask = messageTaskService.create(MessageTask.builder()
.bizId(sendMessageParam.getBizId())
.bizId(request.getBizId())
.sendImAccount(sendImAccount)
.receivePersons(JSONArray.parseArray(JSONObject.toJSONString(sendMessageParam.getReceivePersons()), MessageTask.ReceivePerson.class))
.receivePersons(receivePersons)
.status(MessageTaskStatus.PENDING)
.title(sendMessageParam.getMsgHeader())
.content(sendMessageParam.getMsgContent())
.title(request.getMsgHeader() == null ? "" : request.getMsgHeader())
.content(request.getMsgContent() == null ? "" : request.getMsgContent())
.bizData(bizData)
.ext(sendMessageParam.getExt())
.ext(request.getExt())
.planStartTime(now)
.createAt(now)
.sendPriority(SendPriority.TEMPLATE_MESSAGE
.determinePriority(sendMessageParam.getSendPriority()))
.determinePriority(request.getSendPriority()))
.apiChannel(ApiChannel.COMMON_MESSAGE)
.build());
return ApiResult.ok(toMessageTaskResp(messageTask));
List<UpdatableMessageSendResult> updatableMessageSendResults = Collections.emptyList();
if (request.isUpdatable()) {
updatableMessageSendResults = updatableMessageManager.createUpdatableMessage(messageTask, request, receivePersons);
}
MessageTaskResp messageTaskResp = toMessageTaskResp(messageTask);
messageTaskResp.setUpdatableMessageSendResults(updatableMessageSendResults);
return ApiResult.ok(messageTaskResp);
}
@Override
public ApiResult<MessageUpdateResponse> updateMessage(UpdateMessageRequest request) {
log.info("updateMessage, request={}", request);
MessageUpdateResponse resp = updatableMessageManager.updateMessage(request);
return ApiResult.ok(resp);
}
@Override
public ApiResult<Void> ack(UpdatableMessageAckRequest request) {
log.info("ack, request={}", request);
updatableMessageManager.ack(request);
return ApiResult.ok();
}
@Override
public ApiResult<FetchUpdatableMessageResponse> fetchUpdatableMessage(FetchUpdatableMessageRequest request) {
log.info("fetchUpdatableMessage, request={}", request);
FetchUpdatableMessageResponse resp = updatableMessageQueryService.fetchUpdatableMessage(request);
return ApiResult.ok(resp);
}
@Override
public ApiResult<GetMessageDetailResponse> getMessageDetails(GetMessageDetailRequest request) {
log.info("getMessageDetail, request={}", request);
GetMessageDetailResponse resp = updatableMessageQueryService.getMessageDetails(request);
return ApiResult.ok(resp);
}
private void check(SendMessageParam sendMessageParam) {
@ -208,7 +279,8 @@ public class MessageController implements MessageApi {
private String check(SendTemplateMessageParam sendMessageParam) {
List<String> robotIdList = robotMsgTemplateService.queryRobotIdByTemplate(sendMessageParam.getMsgTemplateId());
if (CollectionUtils.isEmpty(robotIdList)) {
throw new ServiceException("消息模板ID[" + sendMessageParam.getMsgTemplateId() + "],还未维护机器人账户!");
throw new ServiceException(ImErrorCodes.TEMPLATE_NO_ROBOT.getCode(),
"消息模板ID[" + sendMessageParam.getMsgTemplateId() + "],还未维护机器人账户!");
}
if (CollectionUtils.size(robotIdList) > 1) {
throw new ServiceException("消息模板ID[" + sendMessageParam.getMsgTemplateId() + "],关联了多个机器人!");
@ -233,6 +305,16 @@ public class MessageController implements MessageApi {
return robotImAccount;
}
private Optional<UserAccountResp> findRobotAccount(String robotId) {
AccountQuery accountQuery = new AccountQuery();
accountQuery.setAccountId(robotId);
accountQuery.setAppType(AppTypeEnum.SYSTEM.getCode());
List<UserAccountResp> accounts = accountService.queryAccountInfo(accountQuery);
if (CollectionUtils.isEmpty(accounts))
return Optional.empty();
return Optional.of(accounts.get(0));
}
public MessageTaskResp toMessageTaskResp(MessageTask messageTask) {
MessageTaskResp messageTaskResp = MessageTaskResp.builder().build();
BeanUtils.copyProperties(messageTask, messageTaskResp);

View File

@ -0,0 +1,34 @@
package cn.axzo.im.controller;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.feign.MessageTool;
import cn.axzo.im.center.api.vo.req.RevokeMessageRequest;
import cn.axzo.im.updatable.RevokeService;
import cn.axzo.im.utils.BizAssertions;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.RestController;
/**
* @author yanglin
*/
@Slf4j
@RestController
@RequiredArgsConstructor
public class MessageToolController implements MessageTool {
private final RevokeService revokeService;
@Override
public ApiResult<String> revoke(RevokeMessageRequest request) {
log.info("Revoke message request:{}", request);
BizAssertions.assertTrue(
CollectionUtils.isNotEmpty(request.getBizIds())
|| StringUtils.isNotBlank(request.getPattern()),
"bizIds and pattern can't be blank at the same time");
return ApiResult.ok(revokeService.revoke(request));
}
}

View File

@ -2,6 +2,8 @@ package cn.axzo.im.controller;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.channel.netease.client.NimClient;
import cn.axzo.im.channel.netease.dto.DismissGroupRequest;
import cn.axzo.im.channel.netease.dto.GetGroupInfoRequest;
import cn.axzo.im.channel.netease.dto.QueryEventRequest;
import cn.axzo.im.channel.netease.dto.QueryMessageRequest;
import cn.axzo.im.channel.netease.dto.RevokeMessageRequest;
@ -80,4 +82,15 @@ public class PrivateController {
int count = expungeImTaskJob.expunge(param);
return CommonResponse.success(count);
}
@PostMapping("/private/group/dismissGroup")
public Object dismissGroup(@Valid @RequestBody DismissGroupRequest request) {
return CommonResponse.success(nimClient.dismissGroup(request));
}
@PostMapping("/private/group/getGroupInfo")
public Object getGroupInfo(@Valid @RequestBody GetGroupInfoRequest request) {
return CommonResponse.success(nimClient.getGroupInfo(request));
}
}

View File

@ -1,10 +0,0 @@
package cn.axzo.im.dao.mapper;
import cn.axzo.im.entity.AckRetry;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @author yanglin
*/
public interface AckRetryMapper extends BaseMapper<AckRetry> {
}

View File

@ -0,0 +1,24 @@
package cn.axzo.im.dao.mapper;
import cn.axzo.im.entity.MessageUpdateRetry;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Param;
import java.util.Collection;
/**
* @author yanglin
*/
public interface MessageUpdateRetryMapper extends BaseMapper<MessageUpdateRetry> {
@Delete("<script>" +
"DELETE FROM im_message_update_retry WHERE biz_message_id IN\n" +
" <foreach collection='bizMessageIds' item='bizMessageId' open='(' close=')' separator=','>\n" +
" #{bizMessageId}\n" +
" </foreach>" +
"</script>")
void deleteByBizMessageIds(
@Param("bizMessageIds") Collection<String> bizMessageIds);
}

View File

@ -0,0 +1,18 @@
package cn.axzo.im.dao.mapper;
import cn.axzo.im.entity.UpdatableMessageLog;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
/**
* @author yanglin
*/
public interface UpdatableMessageLogMapper extends BaseMapper<UpdatableMessageLog> {
@Delete("DELETE FROM im_updatable_message_log WHERE create_at <= #{until}")
int expunge(@Param("until") Date until);
}

View File

@ -0,0 +1,31 @@
package cn.axzo.im.dao.mapper;
import cn.axzo.im.entity.UpdatableMessage;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;
import java.util.List;
/**
* @author yanglin
*/
public interface UpdatableMessageMapper extends BaseMapper<UpdatableMessage> {
@Update("<script>" +
"UPDATE im_updatable_message SET data_version = data_version + 1, retry_count = 0 WHERE id IN\n" +
" <foreach collection='ids' item='id' open='(' close=')' separator=','>\n" +
" #{id}\n" +
" </foreach>\n" +
"</script>")
void incrDataVersion(@Param("ids") List<Long> ids);
@Update("<script>" +
"UPDATE im_updatable_message SET retry_count = retry_count + 1 WHERE id IN\n" +
" <foreach collection='ids' item='id' open='(' close=')' separator=','>\n" +
" #{id}\n" +
" </foreach>\n" +
"</script>")
void incrRetryCount(@Param("ids") List<Long> ids);
}

View File

@ -1,13 +0,0 @@
package cn.axzo.im.dao.repository;
import cn.axzo.im.dao.mapper.AckRetryMapper;
import cn.axzo.im.entity.AckRetry;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Repository;
/**
* @author yanglin
*/
@Repository("ackRetryDao")
public class AckRetryDao extends ServiceImpl<AckRetryMapper, AckRetry> {
}

View File

@ -1,12 +1,14 @@
package cn.axzo.im.dao.repository;
import cn.axzo.im.dao.mapper.AccountRegisterMapper;
import cn.axzo.im.dao.mapper.MessageHistoryMapper;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.entity.MessageHistory;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Repository;
import java.util.Collections;
import java.util.List;
/**
* im-center
*
@ -18,4 +20,15 @@ import org.springframework.stereotype.Repository;
@Repository("messageHistoryDao")
public class MessageHistoryDao extends ServiceImpl<MessageHistoryMapper, MessageHistory> {
/**
* 不用自带的listByIds
*/
public List<MessageHistory> getByIds(List<Long> historyIds) {
if (CollectionUtils.isEmpty(historyIds))
return Collections.emptyList();
return lambdaQuery()
.in(MessageHistory::getId, historyIds)
.list();
}
}

View File

@ -0,0 +1,13 @@
package cn.axzo.im.dao.repository;
import cn.axzo.im.dao.mapper.MessageUpdateRetryMapper;
import cn.axzo.im.entity.MessageUpdateRetry;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Repository;
/**
* @author yanglin
*/
@Repository("messageUpdateRetryDao")
public class MessageUpdateRetryDao extends ServiceImpl<MessageUpdateRetryMapper, MessageUpdateRetry> {
}

View File

@ -0,0 +1,50 @@
package cn.axzo.im.dao.repository;
import cn.axzo.im.dao.mapper.UpdatableMessageMapper;
import cn.axzo.im.entity.UpdatableMessage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Repository;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@Repository("updatableMessageDao")
public class UpdatableMessageDao extends ServiceImpl<UpdatableMessageMapper, UpdatableMessage> {
public List<UpdatableMessage> getByBizMessageIdsForUpdate(Collection<String> bizMessageIds) {
if (CollectionUtils.isEmpty(bizMessageIds))
return Collections.emptyList();
// 避免死锁
List<String> sortedBizMessageIds = bizMessageIds.stream()
.sorted().collect(toList());
return lambdaQuery()
.in(UpdatableMessage::getBizMessageId, sortedBizMessageIds)
// 避免ack更新出现竞争
.last("FOR UPDATE")
.list();
}
public List<UpdatableMessage> getByBizMessageIds(Collection<String> bizMessageIds) {
if (CollectionUtils.isEmpty(bizMessageIds))
return Collections.emptyList();
return lambdaQuery()
.in(UpdatableMessage::getBizMessageId, bizMessageIds)
.list();
}
public List<UpdatableMessage> getByTaskIds(List<Long> taskIds) {
if (CollectionUtils.isEmpty(taskIds))
return Collections.emptyList();
return lambdaQuery()
.in(UpdatableMessage::getTaskId, taskIds)
.list();
}
}

View File

@ -0,0 +1,13 @@
package cn.axzo.im.dao.repository;
import cn.axzo.im.dao.mapper.UpdatableMessageLogMapper;
import cn.axzo.im.entity.UpdatableMessageLog;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Repository;
/**
* @author yanglin
*/
@Repository("updatableMessageLogDao")
public class UpdatableMessageLogDao extends ServiceImpl<UpdatableMessageLogMapper, UpdatableMessageLog> {
}

View File

@ -2,11 +2,14 @@ package cn.axzo.im.entity;
import lombok.Data;
import java.util.Map;
/**
* @author yanglin
*/
@Data
public class HistoryRecordExt {
private Boolean isSenderRobot;
private String sendApi;
private String sendRespDesc;
private String batchSendId;
@ -24,4 +27,12 @@ public class HistoryRecordExt {
private boolean payloadExcluded;
private String sound;
}
private Boolean isUpdatableMessage;
private String bizMessageId;
private Long dataVersion;
private Boolean isUpdateMessage;
private Boolean isUpdateRetry;
private Long updateRetryCount;
private Map<String, String> initMessageExt;
private Long workspaceId;
}

View File

@ -32,7 +32,7 @@ import java.util.Optional;
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public class MessageHistory implements Serializable {
public class MessageHistory implements Serializable, NimMessageHistory {
private static final long serialVersionUID = 1L;
@ -131,7 +131,35 @@ public class MessageHistory implements Serializable {
return recordExt;
}
public Long getDataVersion() {
return recordExt != null ? recordExt.getDataVersion() : null;
}
public boolean isUpdatableMessage() {
return recordExt != null
&& recordExt.getIsUpdatableMessage() != null
&& recordExt.getIsUpdatableMessage();
}
public boolean isUpdateMessage() {
return recordExt != null
&& recordExt.getIsUpdatableMessage() != null
&& recordExt.getIsUpdatableMessage();
}
public boolean isUpdateRetry() {
return recordExt != null
&& recordExt.getIsUpdateRetry() != null
&& recordExt.getIsUpdateRetry();
}
public Long getUpdateRetryCount() {
return recordExt == null ? null : recordExt.getUpdateRetryCount();
}
public Optional<String> determineBatchNo() {
if (isUpdatableMessage())
return Optional.empty();
String batchNo = this.batchNo;
// 兼容在途数据
if (StringUtils.isBlank(batchNo) && imMessageTaskId != null)

View File

@ -30,7 +30,7 @@ import java.util.Date;
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public class MessageHistoryCold implements Serializable {
public class MessageHistoryCold implements Serializable, NimMessageHistory {
private static final long serialVersionUID = 1L;

View File

@ -1,11 +1,13 @@
package cn.axzo.im.entity;
import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.center.api.vo.req.ExcludePushPayload;
import cn.axzo.im.center.api.vo.req.PushContent;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.BizTypeEnum;
import cn.axzo.im.center.common.enums.TemplatedMsgType;
import cn.axzo.im.config.BaseListTypeHandler;
import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.enums.MessageTaskStatus;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.IdType;
@ -18,11 +20,13 @@ import com.google.common.collect.Table;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.cglib.beans.BeanMap;
import java.util.Date;
@ -116,6 +120,9 @@ public class MessageTask {
@NoArgsConstructor
@AllArgsConstructor
public static class BizData {
private TemplatedMsgType templatedMsgType = TemplatedMsgType.TEMPLATE;
private String msgTemplateId;
/**
@ -128,11 +135,7 @@ public class MessageTask {
*/
private BizTypeEnum bizType;
/**
* 网易云信-自定义消息使用
* 推送内容 - 业务数据json格式
*/
private String payload;
private PushContent pushContent;
/**
* 跳转信息
@ -154,12 +157,25 @@ public class MessageTask {
private List<AppTypeEnum> appTypes;
private List<ExcludePushPayload> excludePushPayloads;
private Long senderPersonId = 0L;
private Boolean isSenderRobot;
public boolean determineIsSenderRobot() {
return isSenderRobot != null && isSenderRobot;
}
public TemplatedMsgType determineTemplatedMsgType() {
return templatedMsgType == null ? TemplatedMsgType.TEMPLATE : templatedMsgType;
}
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode
public static class ReceivePerson {
/**
@ -189,6 +205,12 @@ public class MessageTask {
private Long workspaceId;
public Long personIdAsLong() {
if (NumberUtils.isDigits(personId))
return Long.parseLong(personId);
return 0L;
}
public String buildKey(Map<Long, Long> ouIdMap) {
if (StringUtils.isNotBlank(this.getImAccount())) {
return this.getImAccount();

View File

@ -11,16 +11,14 @@ import java.util.Date;
*/
@Setter
@Getter
@TableName(value = "im_ack_retry", autoResultMap = true)
public class AckRetry {
@TableName(value = "im_message_update_retry", autoResultMap = true)
public class MessageUpdateRetry {
private Long id;
private String bizMessageId;
private Long initHistoryId;
private Date nextRetryTime;
private Long dataVersion;
private Long retryHistoryId;
private Integer retryCount;
private Long isDelete;
private Date createAt;
private Date updateAt;

View File

@ -0,0 +1,14 @@
package cn.axzo.im.entity;
/**
* @author yanglin
*/
public interface NimMessageHistory {
String getMessageId();
String getFromAccount();
String getToAccount();
}

View File

@ -0,0 +1,118 @@
package cn.axzo.im.entity;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.center.api.vo.req.MessageUpdateInfo;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.TemplatedMsgType;
import cn.axzo.im.center.common.enums.YesOrNo;
import cn.axzo.im.enums.UpdatableMessageState;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.math.NumberUtils;
import java.util.Date;
/**
* @author yanglin
*/
@Setter
@Getter
@TableName(value = "im_updatable_message", autoResultMap = true)
public class UpdatableMessage implements MessageUpdateInfo {
private Long id;
private String batchNo;
private String templateId;
private String bizId;
private Long taskId;
private String fromAccount;
private String toAccount;
private String receiverPersonId;
private Long receiverOuId;
private String senderPersonId;
private Long senderOuId;
private YesOrNo isSenderRobot;
private AppTypeEnum appType;
private TemplatedMsgType msgType;
private UpdatableMessageState state;
private String bizMessageId;
private String nimMessageId;
private Long initHistoryId;
private Long updateHistoryId;
private Long retryHistoryId;
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONObject bizBody;
private Long dataVersion;
private Long ackDataVersion;
private Long retryCount;
@TableField(typeHandler = FastjsonTypeHandler.class)
private RecordExt recordExt;
private Long isDelete;
private Date createAt;
private Date updateAt;
public RecordExt getOrCreateRecordExt() {
if (recordExt == null)
recordExt = new RecordExt();
return recordExt;
}
public UpdatableMessageLog toMessageLog(Object request) {
UpdatableMessageLog messageLog = new UpdatableMessageLog();
messageLog.setBizId(bizId);
messageLog.setFromAccount(fromAccount);
messageLog.setToAccount(toAccount);
messageLog.setBizMessageId(bizMessageId);
messageLog.setMessageState(state);
messageLog.setInitHistoryId(initHistoryId);
messageLog.setBizBody(bizBody);
messageLog.setRetryCount(retryCount);
messageLog.setDataVersion(dataVersion);
messageLog.addLogContent("request", request);
// messageLog.setContext(null);
// messageLog.setContextHistoryId(null);
return messageLog;
}
public PersonAccountAttribute parsePersonAccount() {
PersonAccountAttribute person = new PersonAccountAttribute();
person.setPersonId(receiverPersonId);
person.setOuId(receiverOuId);
person.setAppType(appType);
person.setWorkspaceId(getOrCreateRecordExt().getReceiverWorkspaceId());
return person;
}
public Long senderPersonIdAsLong() {
if (NumberUtils.isDigits(senderPersonId))
return NumberUtils.toLong(senderPersonId);
return 0L;
}
public Long receiverPersonIdAsLong() {
if (NumberUtils.isDigits(receiverPersonId))
return NumberUtils.toLong(receiverPersonId);
return 0L;
}
@Override
public String bizMessageId() {
return bizMessageId;
}
@Override
public JSONObject bizBody() {
return bizBody;
}
@Getter
@Setter
public static class RecordExt {
private Long receiverWorkspaceId;
}
}

View File

@ -0,0 +1,64 @@
package cn.axzo.im.entity;
import cn.axzo.im.enums.UpdatableMessageState;
import cn.axzo.im.utils.IgnorePropsJsonTypeHandler;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
import lombok.Getter;
import lombok.Setter;
import java.util.Date;
/**
* @author yanglin
*/
@Setter
@Getter
@TableName(value = "im_updatable_message_log", autoResultMap = true)
public class UpdatableMessageLog {
private Long id;
private String bizId;
private String fromAccount;
private String toAccount;
private String bizMessageId;
private UpdatableMessageState messageState;
private Long initHistoryId;
private String context;
private Long contextHistoryId;
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONObject bizBody;
private Long retryCount;
private Long dataVersion;
@TableField(typeHandler = IgnorePropsJsonTypeHandler.class)
private JSONObject logContent;
@TableField(typeHandler = IgnorePropsJsonTypeHandler.class)
private RecordExt recordExt;
private Long isDelete;
private Date createAt;
private Date updateAt;
public void addLogContent(String name, Object value) {
if (logContent == null)
logContent = new JSONObject();
logContent.put(name, value);
}
@Setter
@Getter
public static class RecordExt {
private AckInfo ack;
}
@Setter
@Getter
public static class AckInfo {
private Boolean ackSuccess;
private String ackDescription;
private Long requestDataVersion;
private Long messageDataVersion;
}
}

View File

@ -0,0 +1,35 @@
package cn.axzo.im.enums;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
* @author yanglin
*/
@Getter
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public enum UpdatableMessageState {
// 已创建
TASK_CREATED(true),
// 消息已经放进队列
INIT_MESSAGE_QUEUED(true),
// 消息已经发送成功
INIT_MESSAGE_SEND_SUCCESS(true),
// 消息已经发送失败
INIT_MESSAGE_SEND_FAIL(true),
// 更新已经放进队列
UPDATE_MESSAGE_QUEUED(true),
// 更新已经发送成功
UPDATE_MESSAGE_SEND_SUCCESS(true),
// 更新已经发送失败
UPDATE_MESSAGE_SEND_FAIL(true),
// 更新ACK
UPDATE_ACK(true),
// 未找到账号
ACCOUNT_NOT_FOUND(false);
private final boolean isUpdateMessageAllowed;
}

View File

@ -1,29 +0,0 @@
package cn.axzo.im.gateway;
import cn.axzo.maokai.api.client.OrganizationalNodeApi;
import cn.axzo.maokai.api.vo.request.OrganizationalNodeBatchQueryVO;
import cn.axzo.maokai.api.vo.response.OrganizationalNodeVO;
import cn.axzo.pokonyan.util.RpcUtil;
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
@Service("organizationalNodeApiGateway")
@Slf4j
@RequiredArgsConstructor
public class OrganizationalNodeApiGateway {
private final OrganizationalNodeApi organizationalNodeApi;
public List<OrganizationalNodeVO> fetchNodesByNodeIds(List<Long> nodeIdList) {
OrganizationalNodeBatchQueryVO organizationalNodeBatchQueryVO = new OrganizationalNodeBatchQueryVO();
organizationalNodeBatchQueryVO.setNodeIds(nodeIdList);
organizationalNodeBatchQueryVO.setContainsDeleted(false);
List<OrganizationalNodeVO> nodeVOList = organizationalNodeApi.listNode(organizationalNodeBatchQueryVO).getData();
return RpcUtil.rpcApiResultProcessor(() -> organizationalNodeApi.listNode(organizationalNodeBatchQueryVO), "fetchNodesByNodeIds", JSON.toJSONString(nodeVOList));
}
}

View File

@ -0,0 +1,61 @@
package cn.axzo.im.push;
import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.center.api.vo.req.PushContent;
import cn.axzo.im.push.payload.PushPayloadBuilder;
import cn.axzo.im.push.payload.intent.Intent;
import cn.axzo.im.push.payload.intent.IntentValue;
import cn.axzo.im.utils.BizAssertions;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.function.Consumer;
/**
* @author yanglin
*/
@Service
public class NimPushService {
private final PushPayloadBuilder<?>[] payloadBuilders;
@SuppressWarnings("rawtypes")
public NimPushService(ObjectProvider<PushPayloadBuilder[]> payloadBuilders) {
this.payloadBuilders = payloadBuilders.getIfAvailable();
}
@SuppressWarnings({"rawtypes", "unchecked"})
public String buildPayload(PushContent content, PushPeer peer) {
Consumer<Intent<?>> intentPopulator = intent -> {
intent.setValue(Intent.INTENT_TYPE,
peer.getApiChannel() == ApiChannel.CUSTOM_MESSAGE
? IntentValue.TYPE_SERVER
: IntentValue.TYPE_IM);
intent.setValue(Intent.INTENT_SESSION_TYPE, IntentValue.CONSTANT_SESSION_TYPE);
intent.setValue(Intent.INTENT_SESSION_ID, IntentValue.create(peer.getSenderImAccount()));
if (peer.getOuId() != null && peer.getOuId() != 0L)
intent.setValue(Intent.INTENT_OU_ID, IntentValue.create(peer.getOuId()));
if (peer.getWorkspaceId() != null && peer.getWorkspaceId() != 0L)
intent.setValue(Intent.INTENT_WORKSPACE_ID, IntentValue.create(peer.getWorkspaceId()));
if (StringUtils.isNotBlank(content.getCustomSoundFile()))
intent.setValue(Intent.INTENT_SOUND, IntentValue.create(content.getCustomSoundFile()));
for (Map.Entry<String, String> e : content.getIntent().entrySet()) {
if (e.getValue() == null) continue;
intent.setValue(e.getKey(), IntentValue.create(e.getValue()));
}
};
JSONObject payload = new JSONObject();
payload.put("pushTitle", content.getTitle());
for (PushPayloadBuilder builder : payloadBuilders) {
Intent intent = builder.createIntent(peer, content);
BizAssertions.assertNotNull(intent, "intent can't be null");
intentPopulator.accept(intent);
builder.build(peer, content, intent, payload);
}
return payload.toJSONString();
}
}

View File

@ -0,0 +1,49 @@
package cn.axzo.im.push;
import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.center.api.vo.req.PushContent;
import cn.axzo.im.center.api.vo.req.PushMessageTye;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.push.PushProps.ChannelConfig;
import lombok.Getter;
import lombok.Setter;
/**
* @author yanglin
*/
@Setter
@Getter
public class PushPeer {
private Long ouId;
private Long workspaceId;
private String senderImAccount;
private AppTypeEnum appType;
private ApiChannel apiChannel;
public String determineAndroidUrl(PushContent content) {
if (appType == AppTypeEnum.CM)
return content.getWorkerAndroidPushUrl();
else if (appType == AppTypeEnum.CMP)
return content.getMangerAndroidPushUrl();
return null;
}
public String determineIosUrl(PushContent content) {
if (appType == AppTypeEnum.CM)
return content.getWorkerIosPushUrl();
else if (appType == AppTypeEnum.CMP)
return content.getManagerIosPushUrl();
return null;
}
public String determineChannelId(ChannelConfig config,
PushContent content) {
PushProps.ChannelIds channelIds = appType == AppTypeEnum.CM
? config.getWorkerIds()
: config.getManagerIds();
return content.getMessageTye() == PushMessageTye.OP
? channelIds.getOpMessageChannelId()
: channelIds.getWorkMessageChannelId();
}
}

View File

@ -0,0 +1,96 @@
package cn.axzo.im.push;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
/**
* @author yanglin
*/
@Data
@RefreshScope
@Configuration
@ConfigurationProperties(prefix = "upush.channels")
public class PushProps {
/**
* oppo渠道配置
*/
private ChannelConfig oppoChannelConfig = new ChannelConfig(
"oppo渠道配置",
// 管理端
//new ChannelIds("cmp", "67a686e9", "38a2db69"),
// 工人端
//new ChannelIds("cm", "203be53b", "3163b193"),
new ChannelIds("cm", "12a12b80", "12a12b80"),
new ChannelIds("cm", "12a12b80", "12a12b80"));
/**
* 小米渠道配置
*/
private ChannelConfig xiaomiChannelConfig = new ChannelConfig(
"小米渠道配置",
// 管理端
new ChannelIds("cmp", "122345", "122346"),
// 工人端
new ChannelIds("cm", "122431", "122432"));
/**
* 不同app的渠道配置
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class ChannelConfig {
/**
* 渠道名称
*/
private String channelName;
/**
* 管理端配置
*/
private ChannelIds managerIds;
/**
* 工人端配置
*/
private ChannelIds workerIds;
public String toString() {
return JSON.toJSONString(this);
}
}
/**
* 不同渠道的channelId配置
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class ChannelIds {
private String appType;
/**
* 个人工作信息渠道
*/
private String workMessageChannelId;
/**
* 推广信息运营信息渠道
*/
private String opMessageChannelId;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
}

View File

@ -0,0 +1,28 @@
package cn.axzo.im.push.payload;
import cn.axzo.im.center.api.vo.req.PushContent;
import cn.axzo.im.push.PushPeer;
import cn.axzo.im.push.payload.intent.Intent;
import cn.axzo.im.push.payload.intent.IntentValue;
import cn.axzo.im.utils.BizAssertions;
import org.apache.commons.lang3.StringUtils;
/**
* @author yanglin
*/
abstract class AndroidPushPayloadBuilder<T extends Intent<?>>
implements PushPayloadBuilder<T> {
@Override
public final T createIntent(PushPeer peer, PushContent content) {
T intent = createIntent();
BizAssertions.assertNotNull(intent, "intent can't be null");
String url = peer.determineAndroidUrl(content);
if (StringUtils.isNotBlank(url))
Intent.setRouter(intent, IntentValue.create(url).urlEncode());
return intent;
}
abstract T createIntent();
}

View File

@ -0,0 +1,45 @@
package cn.axzo.im.push.payload;
import cn.axzo.im.center.api.vo.req.PushContent;
import cn.axzo.im.push.PushPeer;
import cn.axzo.im.push.payload.intent.UriIntent;
import com.alibaba.fastjson.JSONObject;
import org.springframework.stereotype.Component;
/**
* 华为
*
* @author yanglin
*/
@Component
class HWPushPayloadBuilder extends AndroidPushPayloadBuilder<UriIntent> {
@Override
UriIntent createIntent() {
return new UriIntent();
}
@Override
public void build(PushPeer peer, PushContent content,
UriIntent intent, JSONObject payload) {
// 点击事件的内容
JSONObject clickAction = new JSONObject();
clickAction.put("type", 1);
clickAction.put("intent", intent.build());
// 通知的内容
JSONObject androidConfig = new JSONObject();
androidConfig.put("category", content.determineAndroidCategory());
JSONObject hwField = new JSONObject();
hwField.put("android", androidConfig);
hwField.put("style", 1);
hwField.put("big_title", content.getTitle());
hwField.put("big_body", content.getContent());
hwField.put("click_action", clickAction);
payload.put("hwField", hwField);
}
}

View File

@ -0,0 +1,40 @@
package cn.axzo.im.push.payload;
import cn.axzo.im.center.api.vo.req.PushContent;
import cn.axzo.im.push.PushPeer;
import cn.axzo.im.push.PushProps;
import cn.axzo.im.push.payload.intent.JsonIntent;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
class OppoPushPayloadBuilder extends AndroidPushPayloadBuilder<JsonIntent> {
private final PushProps pushProps;
@Override
public JsonIntent createIntent() {
return new JsonIntent();
}
@Override
public void build(PushPeer peer, PushContent content,
JsonIntent intent, JSONObject payload) {
PushProps.ChannelConfig xmCfg = pushProps.getOppoChannelConfig();
JSONObject oppoField = new JSONObject();
oppoField.put("channel_id", peer.determineChannelId(xmCfg, content));
oppoField.put("category", content.determineAndroidCategory());
oppoField.put("notify_level", 2);
oppoField.put("click_action_type", 1);
oppoField.put("click_action_activity", "com.oppo.codelabpush.intent.action.test");
oppoField.put("action_parameters", intent.build().toJSONString());
payload.put("oppoField", oppoField);
}
}

View File

@ -0,0 +1,47 @@
package cn.axzo.im.push.payload;
import cn.axzo.im.center.api.vo.req.PushContent;
import cn.axzo.im.push.PushPeer;
import cn.axzo.im.push.payload.intent.Intent;
import cn.axzo.im.push.payload.intent.IntentValue;
import cn.axzo.im.push.payload.intent.JsonIntent;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
/**
* 苹果
*
* @author yanglin
*/
@Component
class PGPushPayloadBuilder implements PushPayloadBuilder<JsonIntent> {
@Override
public JsonIntent createIntent(PushPeer peer, PushContent content) {
JsonIntent intent = new JsonIntent();
String url = peer.determineIosUrl(content);
if (StringUtils.isNotBlank(url))
Intent.setRouter(intent, IntentValue
.create(url));
return intent;
}
@Override
public void build(PushPeer peer, PushContent content,
JsonIntent intent, JSONObject payload) {
JSONObject userInfo = new JSONObject();
userInfo.putAll(intent.build());
JSONObject alert = new JSONObject();
alert.put("title", content.getTitle());
alert.put("body", content.getContent());
alert.put("userInfo", userInfo);
JSONObject apsField = new JSONObject();
apsField.put("alert", alert);
payload.put("apsField", apsField);
}
}

View File

@ -0,0 +1,23 @@
package cn.axzo.im.push.payload;
import cn.axzo.im.center.api.vo.req.PushContent;
import cn.axzo.im.push.PushPeer;
import cn.axzo.im.push.payload.intent.Intent;
import com.alibaba.fastjson.JSONObject;
/**
* <a href="https://doc.yunxin.163.com/messaging/server-apis/DQyNjc5NjE?platform=server#apns%E6%8E%A8%E9%80%81%E6%B6%88%E6%81%AF">云信推送</a>
* <a href="https://doc.yunxin.163.com/messaging/guide/TY4MzU5MDc?platform=android#%E8%AE%BE%E7%BD%AE%E6%8E%A8%E9%80%81%E9%80%9A%E7%9F%A5%E6%A0%8F%E8%B7%B3%E8%BD%AC%E6%96%B9%E5%BC%8F">各端推送配置</a>
* <p/>
* 文字, 链接声音
*
* @author yanglin
*/
public interface PushPayloadBuilder<T extends Intent<?>> {
T createIntent(PushPeer peer, PushContent content);
void build(PushPeer peer, PushContent content,
T intent, JSONObject payload);
}

View File

@ -0,0 +1,45 @@
package cn.axzo.im.push.payload;
import cn.axzo.im.center.api.vo.req.PushContent;
import cn.axzo.im.center.api.vo.req.PushMessageTye;
import cn.axzo.im.push.PushPeer;
import cn.axzo.im.push.payload.intent.UriIntent;
import com.alibaba.fastjson.JSONObject;
import org.springframework.stereotype.Component;
/**
* 荣耀
*
* @author yanglin
*/
@Component
class RYPushPayloadBuilder extends AndroidPushPayloadBuilder<UriIntent> {
@Override
public UriIntent createIntent() {
return new UriIntent();
}
@Override
public void build(PushPeer peer, PushContent content,
UriIntent intent, JSONObject payload) {
// 点击事件的内容
JSONObject clickAction = new JSONObject();
clickAction.put("type", 1);
clickAction.put("intent", intent.build());
// 通知的内容
JSONObject notification = new JSONObject();
notification.put("title", content.getTitle());
notification.put("body", content.getContent());
notification.put("clickAction", clickAction);
notification.put("importance",
content.getMessageTye() == PushMessageTye.OP ? "LOW" : "NORMAL");
JSONObject honorField = new JSONObject();
honorField.put("notification", notification);
payload.put("honorField", honorField);
}
}

View File

@ -0,0 +1,41 @@
package cn.axzo.im.push.payload;
import cn.axzo.im.center.api.vo.req.PushContent;
import cn.axzo.im.center.api.vo.req.PushMessageTye;
import cn.axzo.im.push.PushPeer;
import cn.axzo.im.push.payload.intent.UriIntent;
import com.alibaba.fastjson.JSONObject;
import org.springframework.stereotype.Component;
/**
* VIVO
*
* @author yanglin
*/
@Component
class VivoPushPayloadBuilder extends AndroidPushPayloadBuilder<UriIntent> {
@Override
public UriIntent createIntent() {
return new UriIntent();
}
/**
* 没找到自定义声音相关的字段
*/
@Override
public void build(PushPeer peer, PushContent content,
UriIntent intent, JSONObject payload) {
JSONObject vivoField = new JSONObject();
vivoField.put("content", content.getContent());
vivoField.put("classification",
content.getMessageTye() == PushMessageTye.OP ? 0 : 1);
vivoField.put("skipType", 4);
vivoField.put("networkType", -1);
vivoField.put("category", content.determineAndroidCategory());
vivoField.put("skipContent", intent.build());
payload.put("vivoField", vivoField);
}
}

View File

@ -0,0 +1,37 @@
package cn.axzo.im.push.payload;
import cn.axzo.im.center.api.vo.req.PushContent;
import cn.axzo.im.push.PushPeer;
import cn.axzo.im.push.PushProps;
import cn.axzo.im.push.payload.intent.UriIntent;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* 小米
*
* @author yanglin
*/
@Component
@RequiredArgsConstructor
class XMPushPayloadBuilder extends AndroidPushPayloadBuilder<UriIntent> {
private final PushProps pushProps;
@Override
public UriIntent createIntent() {
return new UriIntent();
}
@Override
public void build(PushPeer peer, PushContent content,
UriIntent intent, JSONObject payload) {
PushProps.ChannelConfig xmCfg = pushProps.getXiaomiChannelConfig();
payload.put("channel_id", peer.determineChannelId(xmCfg, content));
payload.put("notify_foreground", "1");
payload.put("notify_effect", "2");
payload.put("intent_uri", intent.build());
}
}

View File

@ -0,0 +1,26 @@
package cn.axzo.im.push.payload.intent;
/**
* @author yanglin
*/
public interface Intent<T> {
String INTENT_ROUTER = "router";
String INTENT_SOUND = "sound";
String INTENT_TYPE = "type";
String INTENT_SESSION_TYPE = "sessionType";
String INTENT_SESSION_ID = "sessionId";
String INTENT_WORKSPACE_ID = "workspaceId";
String INTENT_OU_ID = "ouId";
void setValue(String key, IntentValue value);
T build();
// !! helper
static void setRouter(Intent<?> intent, IntentValue router) {
intent.setValue(INTENT_ROUTER, router);
}
}

View File

@ -0,0 +1,52 @@
package cn.axzo.im.push.payload.intent;
import cn.axzo.basics.common.exception.ServiceException;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.function.Consumer;
/**
* @author yanglin
*/
@Slf4j
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class IntentValue {
private final Object value;
private static final IntentValue NULL = new IntentValue(null);
public static final IntentValue CONSTANT_SESSION_TYPE = new IntentValue("0");
public static final IntentValue TYPE_SERVER = new IntentValue("server");
public static final IntentValue TYPE_IM = new IntentValue("im");
public static IntentValue create(Object value) {
if (value == null) return NULL;
return new IntentValue(value);
}
/**
* 和app约定好了, intent里面都使用string类型的值
*/
public void consume(Consumer<String> consumer) {
if (value == null) return;
consumer.accept(String.valueOf(value));
}
public IntentValue urlEncode() {
if (value == null) return NULL;
try {
if (value instanceof String)
return create(URLEncoder.encode((String) value, "UTF-8"));
else
throw new ServiceException("only string can be url encoded");
} catch (UnsupportedEncodingException e) {
log.warn("url encode failed, intent value: {}", value, e);
throw new ServiceException(String.format("url encode failed, intent value: %s", value), e);
}
}
}

View File

@ -0,0 +1,25 @@
package cn.axzo.im.push.payload.intent;
import cn.axzo.im.utils.BizAssertions;
import com.alibaba.fastjson.JSONObject;
/**
* @author yanglin
*/
public class JsonIntent implements Intent<JSONObject> {
private final JSONObject values = new JSONObject();
@Override
public void setValue(String key, IntentValue value) {
BizAssertions.assertNotBlank(key, "key is required");
BizAssertions.assertNotNull(value, "value is required");
value.consume(stringValue -> values.put(key, stringValue));
}
@Override
public JSONObject build() {
return values;
}
}

View File

@ -0,0 +1,41 @@
package cn.axzo.im.push.payload.intent;
import cn.axzo.im.utils.BizAssertions;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* @author yanglin
*/
public class UriIntent implements Intent<String> {
private static final String INTENT_PREFIX =
"intent://cn.axzo.codelabpush/deeplink?#Intent;scheme=pushschema;launchFlags=0x4000000;";
private final LinkedHashMap<String, IntentValue> values = new LinkedHashMap<>();
@Override
public void setValue(String key, IntentValue value) {
BizAssertions.assertNotBlank(key, "key is required");
BizAssertions.assertNotNull(value, "value is required");
values.put(key, value);
}
@Override
public String build() {
StringBuilder buf = new StringBuilder(INTENT_PREFIX);
for (Map.Entry<String, IntentValue> entry : values.entrySet()) {
entry.getValue().consume(stringValue ->
buf.append("S.")
.append(entry.getKey())
.append("=")
.append(stringValue)
.append(";"));
}
buf.append("end");
return buf.toString();
}
}

View File

@ -97,13 +97,13 @@ public class SendExecutor<T> implements Supplier<ExecResult> {
queue().log(message, args);
}
public void scheduleRetrySend(MessageHistory history, HistoryRecordExt ext) {
scheduleRetrySend(Collections.singletonList(history), ext);
public void scheduleRetrySend(MessageHistory history, HistoryRecordExt updateExt) {
scheduleRetrySend(Collections.singletonList(history), updateExt);
}
public void scheduleRetrySend(List<MessageHistory> histories, HistoryRecordExt ext) {
public void scheduleRetrySend(List<MessageHistory> histories, HistoryRecordExt updateExt) {
if (CollectionUtils.isEmpty(histories)) return;
queue().scheduleRetrySend(histories, ext);
queue().scheduleRetrySend(histories, updateExt);
scheduleRetryCount.addAndGet(histories.size());
}
@ -121,22 +121,25 @@ public class SendExecutor<T> implements Supplier<ExecResult> {
sendManager.submitSetSendFail(history, failReason);
}
public void setBatchSendSuccess(
List<MessageHistory> histories, MessageBatchDispatchResponse response, HistoryRecordExt ext) {
public void setBatchSendSuccess(List<MessageHistory> histories,
MessageBatchDispatchResponse response,
HistoryRecordExt updateExt) {
sendCount.addAndGet(histories.size());
sendManager.setBatchSendSuccess(histories, response, ext);
sendManager.setBatchSendSuccess(histories, response, updateExt);
}
public void setBatchSendSuccess(
List<MessageHistory> histories, BatchSendCustomMessageResponse response, HistoryRecordExt ext) {
public void setBatchSendSuccess(List<MessageHistory> histories,
BatchSendCustomMessageResponse response,
HistoryRecordExt updateExt) {
sendCount.addAndGet(histories.size());
sendManager.setBatchSendSuccess(histories, response, ext);
sendManager.setBatchSendSuccess(histories, response, updateExt);
}
public void setSendFail(
List<MessageHistory> histories, String failReason, HistoryRecordExt ext) {
public void setSendFail(List<MessageHistory> histories,
String failReason,
HistoryRecordExt updateExt) {
sendCount.addAndGet(histories.size());
sendManager.setSendFail(histories, failReason, ext);
sendManager.setSendFail(histories, failReason, updateExt);
}
private static class Stage {

View File

@ -7,6 +7,7 @@ import cn.axzo.im.entity.HistoryRecordExt;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.service.impl.MessageHistoryServiceImpl;
import cn.axzo.im.updatable.UpdatableMessageManager;
import cn.axzo.im.utils.DateFormatUtil;
import cn.axzo.im.utils.ImProperties;
import cn.axzo.im.utils.ImProperties.SendMessageConfig;
@ -15,6 +16,7 @@ import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.ArrayList;
import java.util.Date;
@ -40,6 +42,8 @@ public class SendManager {
private final SendExec sendExec;
private final MessageHistoryMapper messageHistoryMapper;
private final MessageHistoryServiceImpl messageHistoryService;
private final UpdatableMessageManager updatableMessageManager;
private final TransactionTemplate transactionTemplate;
private final SendMessageConfig cfg;
private final Date maxCreateAt;
private final AsyncTasks<Void> asyncTasks;
@ -54,6 +58,8 @@ public class SendManager {
this.cfg = applicationContext.getBean(ImProperties.class).getSendMessage().copy();
this.messageHistoryMapper = applicationContext.getBean(MessageHistoryMapper.class);
this.messageHistoryService = applicationContext.getBean(MessageHistoryServiceImpl.class);
this.updatableMessageManager = applicationContext.getBean(UpdatableMessageManager.class);
this.transactionTemplate = applicationContext.getBean(TransactionTemplate.class);
this.queue = new SendQueue(applicationContext, sendExec.getApiChannel());
this.sendExec = sendExec;
this.maxCreateAt = getMaxCreateAt();
@ -183,7 +189,10 @@ public class SendManager {
return update;
})
.collect(toList());
messageHistoryService.updateBatch(updates);
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.updateBatch(updates);
updatableMessageManager.onHistorySend(successHistories);
});
queue.setSendComplete(successHistories);
}
for (Map.Entry<String, List<MessageHistory>> e : failHistories.entrySet()) {
@ -196,28 +205,43 @@ public class SendManager {
.status(MessageHistoryStatus.FAILED)
.build())
.collect(toList());
messageHistoryService.updateBatch(updates);
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.updateBatch(updates);
updatableMessageManager.onHistorySend(histories);
});
queue.setSendComplete(histories);
}
successHistories = new ArrayList<>();
failHistories = new HashMap<>();
}
void setBatchSendSuccess(
List<MessageHistory> histories, MessageBatchDispatchResponse response, HistoryRecordExt ext) {
messageHistoryService.setBatchSendSuccess(histories, response, ext);
void setBatchSendSuccess(List<MessageHistory> histories,
MessageBatchDispatchResponse response,
HistoryRecordExt updateExt) {
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.setBatchSendSuccess(histories, response, updateExt);
updatableMessageManager.onHistorySend(histories);
});
queue.setSendComplete(histories);
}
void setBatchSendSuccess(
List<MessageHistory> histories, BatchSendCustomMessageResponse response, HistoryRecordExt ext) {
messageHistoryService.setBatchSendSuccess(histories, response, ext);
void setBatchSendSuccess(List<MessageHistory> histories,
BatchSendCustomMessageResponse response,
HistoryRecordExt updateExt) {
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.setBatchSendSuccess(histories, response, updateExt);
updatableMessageManager.onHistorySend(histories);
});
queue.setSendComplete(histories);
}
void setSendFail(
List<MessageHistory> histories, String failReason, HistoryRecordExt ext) {
messageHistoryService.setSendFail(histories, failReason, ext);
void setSendFail(List<MessageHistory> histories,
String failReason,
HistoryRecordExt updateExt) {
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.setSendFail(histories, failReason, updateExt);
updatableMessageManager.onHistorySend(histories);
});
queue.setSendComplete(histories);
}

View File

@ -2,12 +2,14 @@ package cn.axzo.im.send;
import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.dao.mapper.MessageHistoryMapper;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.entity.HistoryRecordExt;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.im.utils.ImProperties;
import cn.axzo.im.utils.ImProperties.SendMessageConfig;
import cn.axzo.im.utils.PropsUtils;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.Getter;
@ -46,6 +48,7 @@ public class SendQueue {
private final ApiChannel apiChannel;
private final SendMessageConfig cfg;
private final MessageHistoryMapper messageHistoryMapper;
private final MessageHistoryDao messageHistoryDao;
@Getter private final BlockingQueue<String> logQueue = new ArrayBlockingQueue<>(2048);
private final LinkedList<Record> records = new LinkedList<>();
private final Date queueCreateTime = new Date();
@ -60,6 +63,7 @@ public class SendQueue {
this.apiChannel = apiChannel;
cfg = applicationContext.getBean(ImProperties.class).getSendMessage().copy();
messageHistoryMapper = applicationContext.getBean(MessageHistoryMapper.class);
messageHistoryDao = applicationContext.getBean(MessageHistoryDao.class);
totalCount = messageHistoryMapper.selectCount(recordsQuery());
}
@ -208,7 +212,7 @@ public class SendQueue {
lastLoadEmpty = true;
}
public void scheduleRetrySend(List<MessageHistory> histories, HistoryRecordExt ext) {
public void scheduleRetrySend(List<MessageHistory> histories, HistoryRecordExt updateExt) {
if (CollectionUtils.isEmpty(histories)) return;
List<Long> ids = histories.stream()
.map(MessageHistory::getId)
@ -221,11 +225,15 @@ public class SendQueue {
Date newTimestamp = DateTime.now()
.plusSeconds(delaySeconds)
.toDate();
MessageHistory update = new MessageHistory();
update.setRecordExt(ext);
update.setTimestampForSend(newTimestamp);
messageHistoryMapper.update(update, query(MessageHistory.class)
.in(MessageHistory::getId, ids));
ArrayList<MessageHistory> updates = new ArrayList<>();
for (MessageHistory history : histories) {
MessageHistory update = new MessageHistory();
updates.add(update);
update.setId(history.getId());
update.setRecordExt(PropsUtils.updateProperties(history.getRecordExt(), updateExt));
update.setTimestampForSend(newTimestamp);
}
messageHistoryDao.updateBatchById(updates);
setSendComplete(histories);
}

View File

@ -23,7 +23,7 @@ public class SendCustomMessageJob extends SendMessageExecInstance {
}
@XxlJob("sendCustomMessageJob")
ReturnT<String> execute(String param) {
public ReturnT<String> execute(String param) {
try {
scanAndSend();
return ReturnT.SUCCESS;

View File

@ -23,7 +23,7 @@ public class SendMessageJob extends SendMessageExecInstance {
}
@XxlJob("sendMessageJob")
ReturnT<String> execute(String param) {
public ReturnT<String> execute(String param) {
try {
scanAndSend();
return ReturnT.SUCCESS;

View File

@ -377,6 +377,15 @@ public class AccountService {
return Lists.newArrayList();
}
public String registerAccountIfAbsent(String personId, Long ouId, AppTypeEnum appType) {
AccountAbsentQuery accountQuery = new AccountAbsentQuery();
accountQuery.setPersonId(personId);
accountQuery.setOuId(ouId);
accountQuery.setAppType(appType.getCode());
List<UserAccountResp> accounts = registerAccountIfAbsent(accountQuery);
return accounts.get(0).getImAccount();
}
/**
* 查询用户注册IM账户信息如果没有则进行注册
*

View File

@ -13,6 +13,8 @@ import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.push.NimPushService;
import cn.axzo.im.push.PushPeer;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.im.utils.UUIDUtil;
import com.alibaba.fastjson.JSON;
@ -44,6 +46,7 @@ public class CustomMessageService {
private final AccountService accountService;
private final AccountRegisterDao accountRegisterDao;
private final MessageHistoryDao messageHistoryDao;
private final NimPushService nimPushService;
@Transactional(rollbackFor = Exception.class)
public List<MessageCustomResp> saveAsHistoryRecords(CustomMessageInfo request) {
@ -126,10 +129,17 @@ public class CustomMessageService {
history.setToAccount(account.getImAccount());
history.setStatus(MessageHistoryStatus.PENDING);
}
if (request.isPush()) {
history.setMessageBody(request.getPayload());
history.getOrCreateRecordExt().setPayload(request.getPayload());
history.getOrCreateRecordExt().setSound(request.getSound());
if (request.getPushContent() != null) {
PushPeer peer = new PushPeer();
peer.setOuId(request.getOuId());
peer.setWorkspaceId(null);
peer.setSenderImAccount(customSendAccount.getImAccount());
peer.setAppType(appType);
peer.setApiChannel(ApiChannel.CUSTOM_MESSAGE);
String payload = nimPushService.buildPayload(request.getPushContent(), peer);
history.setMessageBody(payload);
history.getOrCreateRecordExt().setPayload(payload);
history.getOrCreateRecordExt().setSound(payload);
} else {
BizAssertions.assertNotNull(request.getBizType(), "业务类型不能为空");
history.setMessageBody(JSON.toJSONString(messageBody));

View File

@ -17,6 +17,7 @@ import cn.axzo.im.event.payload.MessageHistoryCreatedPayload;
import cn.axzo.im.event.payload.MessageHistoryUpdatedPayload;
import cn.axzo.im.service.AccountService;
import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.im.utils.PropsUtils;
import cn.axzo.maokai.api.client.OrganizationalUnitApi;
import cn.axzo.maokai.api.vo.request.OrganizationalUnitQuery;
import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
@ -218,13 +219,15 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
}
@Transactional(rollbackFor = Exception.class)
public void setBatchSendSuccess(List<? extends MessageHistory> histories, BatchSendCustomMessageResponse response, HistoryRecordExt ext) {
public void setBatchSendSuccess(List<? extends MessageHistory> histories,
BatchSendCustomMessageResponse response,
HistoryRecordExt updateExt) {
ArrayList<MessageHistory> updates = new ArrayList<>(histories.size());
for (MessageHistory history : histories) {
MessageHistory update = new MessageHistory();
updates.add(update);
update.setId(history.getId());
update.setRecordExt(ext);
update.setRecordExt(PropsUtils.updateProperties(history.getRecordExt(), updateExt));
MessageHistoryStatus status = response.getUnregister().contains(history.getToAccount())
? MessageHistoryStatus.FAILED
: MessageHistoryStatus.SUCCEED;
@ -237,7 +240,9 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
}
@Transactional(rollbackFor = Exception.class)
public void setBatchSendSuccess(List<? extends MessageHistory> histories, MessageBatchDispatchResponse response, HistoryRecordExt ext) {
public void setBatchSendSuccess(List<? extends MessageHistory> histories,
MessageBatchDispatchResponse response,
HistoryRecordExt updateExt) {
// 发送成功的IMAccountId -> msgId
Map<String, Long> msgids = response.getMsgids();
// unregister的账号
@ -252,7 +257,7 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
.map(history -> {
MessageHistory messageHistory = MessageHistory.builder()
.id(history.getId())
.recordExt(ext)
.recordExt(PropsUtils.updateProperties(history.getRecordExt(), updateExt))
.build();
if (finalUnregister.contains(history.getToAccount())) {
messageHistory.setStatus(MessageHistoryStatus.FAILED);
@ -284,13 +289,15 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
}
@Transactional(rollbackFor = Exception.class)
public void setSendFail(List<? extends MessageHistory> histories, String failReason, HistoryRecordExt ext) {
public void setSendFail(List<? extends MessageHistory> histories,
String failReason,
HistoryRecordExt updateExt) {
List<MessageHistory> updates = histories.stream()
.map(e -> MessageHistory.builder()
.id(e.getId())
.result(failReason)
.status(MessageHistoryStatus.FAILED)
.recordExt(ext)
.recordExt(PropsUtils.updateProperties(e.getRecordExt(), updateExt))
.build())
.collect(toList());
this.updateBatch(updates);

View File

@ -1,22 +1,26 @@
package cn.axzo.im.service.impl;
import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.center.api.vo.req.ExcludePushPayload;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.common.enums.AccountTypeEnum;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.NimMsgTypeEnum;
import cn.axzo.im.channel.netease.dto.MessageBody;
import cn.axzo.im.channel.netease.dto.Peer;
import cn.axzo.im.dao.mapper.MessageTaskMapper;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.push.NimPushService;
import cn.axzo.im.push.PushPeer;
import cn.axzo.im.service.AccountRegisterService;
import cn.axzo.im.service.AccountRegisterService.AccountRegisterDTO;
import cn.axzo.im.service.AccountService;
import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.im.service.MessageTaskService;
import cn.axzo.im.updatable.UpdatableMessageManager;
import cn.axzo.im.utils.UUIDUtil;
import cn.axzo.maokai.api.client.OrganizationalTeamOuRelationApi;
import cn.axzo.maokai.api.vo.request.OrganizationalTeamOuRelationReq;
@ -32,6 +36,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -64,6 +69,14 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
private MessageHistoryService messageHistoryService;
@Autowired
private OrganizationalTeamOuRelationApi organizationalTeamOuRelationApi;
@Autowired
private UpdatableMessageManager updatableMessageManager;
@Autowired
private NimPushService nimPushService;
@Autowired
private AccountService accountService;
private final RateLimiter createRateLimiter = RateLimiter.create(2);
private static final Integer DEFAULT_PAGE_SIZE = 500;
@ -88,6 +101,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
}
@Override
@Transactional
public void createMessageHistory(MessageTask messageTask) {
this.update(UpdateMessageTaskParam.builder()
@ -142,7 +156,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
List<MessageTask.ReceivePerson> receivePersons = page.getRecords().stream()
.map(e -> MessageTask.ReceivePerson.builder().imAccount(e.getImAccount()).build())
.collect(Collectors.toList());
saveMessageHistory(batchNo, receivePersons, messageTask);
saveMessageHistory(batchNo, receivePersons, messageTask, false);
}
if (!page.hasNext()) {
@ -152,14 +166,15 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
}
private void doSendNotAll(MessageTask messageTask) {
int totalPersonSize = messageTask.getReceivePersons().size();
String batchNo = UUIDUtil.uuidString();
// 防止sql过长
List<List<MessageTask.ReceivePerson>> receivePersons = Lists.partition(messageTask.getReceivePersons(), DEFAULT_PAGE_SIZE);
receivePersons.forEach(e -> saveMessageHistory(batchNo, e, messageTask));
receivePersons.forEach(e -> saveMessageHistory(batchNo, e, messageTask, totalPersonSize <= 2));
}
private void saveMessageHistory(String batchNo, List<MessageTask.ReceivePerson> receivePersons,
MessageTask messageTask) {
MessageTask messageTask, boolean tryCreateAccount) {
// 排除已经发送成功的记录防止重复发送
Set<String> existPersons = listExistPerson(receivePersons, messageTask);
Set<String> existImAccounts = listExistImAccount(receivePersons, messageTask);
@ -187,9 +202,13 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
Map<String, AccountRegisterDTO> imAccounts = listImAccount(absentReceivePersons);
List<MessageHistory> messageHistories = absentReceivePersons.stream()
.map(receivePerson -> resolveMessageHistory(batchNo, messageTask, receivePerson, imAccounts, accountRegisters, ouIdMap))
.map(receivePerson -> resolveMessageHistory(batchNo, messageTask, receivePerson, imAccounts, accountRegisters, ouIdMap, tryCreateAccount))
.collect(Collectors.toList());
messageHistoryService.createBatch(messageHistories);
List<Long> historyIds = messageHistories.stream()
.map(MessageHistory::getId)
.collect(Collectors.toList());
updatableMessageManager.onHistoryCreated(historyIds);
}
private MessageHistory resolveMessageHistory(String batchNo,
@ -197,7 +216,8 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
MessageTask.ReceivePerson receivePerson,
Map<String, AccountRegisterDTO> imAccounts,
Map<String, String> accountRegisters,
Map<Long, Long> ouIdMap) {
Map<Long, Long> ouIdMap,
boolean tryCreateAccount) {
MessageHistory messageHistory = new MessageHistory();
messageHistory.setBizId(Optional.ofNullable(messageTask.getBizId()).orElseGet(() -> messageTask.getId().toString()));
messageHistory.setImMessageTaskId(messageTask.getId());
@ -230,16 +250,43 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
messageHistory.setReceivePersonId(receivePerson.getPersonId());
messageHistory.setAppType(receivePerson.getAppType().getCode());
messageHistory.setToAccount(imAccount);
String externalMessage = "";
boolean ouValid = messageHistory.getReceiveOuId() != null && messageHistory.getReceiveOuId() > 0;
if (StringUtils.isBlank(imAccount)
&& receivePerson.getAppType() == AppTypeEnum.CMP
&& tryCreateAccount
&& ouValid) {
if (createRateLimiter.tryAcquire()) {
try {
imAccount = accountService.registerAccountIfAbsent(
messageHistory.getReceivePersonId(),
messageHistory.getReceiveOuId(),
receivePerson.getAppType());
messageHistory.setToAccount(imAccount);
} catch (Exception e) {
log.warn("创建账号失败", e);
externalMessage = "(尝试创建账号失败: " + e.getMessage() + ")";
}
} else {
externalMessage = "(未尝试创建账号: 限流)";
}
}
if (StringUtils.isBlank(imAccount)) {
messageHistory.setToAccount("");
messageHistory.setResult(AccountService.NO_IM_ACCOUNT_MESSAGE);
messageHistory.setResult(AccountService.NO_IM_ACCOUNT_MESSAGE + externalMessage);
messageHistory.setStatus(MessageHistoryStatus.FAILED);
messageHistory.getOrCreateRecordExt().setImAccountInfo(key);
}
}
MessageTask.BizData bizData = messageTask.getBizData();
if (bizData != null) {
messageHistory.getOrCreateRecordExt().setIsSenderRobot(bizData.getIsSenderRobot());
}
List<ExcludePushPayload> excludePushPayloads = bizData.getExcludePushPayloads() == null
List<ExcludePushPayload> excludePushPayloads =
bizData == null || bizData.getExcludePushPayloads() == null
? Collections.emptyList()
: bizData.getExcludePushPayloads();
boolean excludePayload = excludePushPayloads.stream()
@ -249,9 +296,19 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
return personIdMatches && appTypeMatches;
});
messageHistory.getOrCreateRecordExt().setPayloadExcluded(excludePayload);
if (!excludePayload)
messageHistory.getOrCreateRecordExt().setPayload(bizData.getPayload());
messageHistory.setMessageBody(resolveBody(receivePerson, messageTask, messageHistory.getAppType()));
messageHistory.getOrCreateRecordExt().setWorkspaceId(receivePerson.getWorkspaceId());
if (bizData != null && bizData.getPushContent() != null && !excludePayload) {
PushPeer peer = new PushPeer();
peer.setOuId(receivePerson.getOuId());
peer.setWorkspaceId(receivePerson.getWorkspaceId());
peer.setSenderImAccount(messageHistory.getFromAccount());
peer.setAppType(AppTypeEnum.isValidAppType(messageHistory.getAppType()));
peer.setApiChannel(ApiChannel.COMMON_MESSAGE);
String payload = nimPushService.buildPayload(bizData.getPushContent(), peer);
messageHistory.getOrCreateRecordExt().setPayload(payload);
}
messageHistory.setMessageBody(resolveBody(receivePerson, messageTask,
messageHistory));
return messageHistory;
}
@ -338,15 +395,21 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
return nimAccount2Account;
}
private String resolveBody(MessageTask.ReceivePerson receivePerson, MessageTask messageTask, String appType) {
private String resolveBody(MessageTask.ReceivePerson receivePerson,
MessageTask messageTask, MessageHistory history) {
MessageTask.BizData bizData = messageTask.getBizData();
MessageBody messageBody = new MessageBody();
messageBody.setMsgType(NimMsgTypeEnum.TEMPLATE.getCode());
messageBody.setMsgType(bizData.determineTemplatedMsgType().getCode());
messageBody.setMsgContent(messageTask.getContent());
messageBody.setMsgHeader(messageTask.getTitle());
messageBody.setPeer(new Peer());
messageBody.getPeer().setSenderRobot(bizData.determineIsSenderRobot());
messageBody.getPeer().setSenderPersonId(bizData.getSenderPersonId());
messageBody.getPeer().setReceiverPersonId(receivePerson.personIdAsLong());
Map<String, String> defaultExtMap = Maps.newHashMap();
MessageTask.BizData bizData = messageTask.getBizData();
if (StringUtils.isNotBlank(bizData.getMsgTemplateContent())) {
messageBody.setMsgBody(bizData.getMsgTemplateContent());
defaultExtMap.put("msgTemplateId", bizData.getMsgTemplateId());
@ -362,7 +425,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
String platform;
if (e.getPlatform() == SendMessageParam.JumpPlatform.PC) {
platform = SendMessageParam.JumpPlatform.PC.getOldPlatform();
} else if (Objects.equals(e.getPlatform().getAppType().getCode(), appType)) {
} else if (Objects.equals(e.getPlatform().getAppType().getCode(), history.getAppType())) {
platform = e.getPlatform().getOldPlatform();
} else {
return null;
@ -395,6 +458,8 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
}
messageBody.setMessageExtension(defaultExtMap);
history.getOrCreateRecordExt().setInitMessageExt(defaultExtMap);
return JSONUtil.toJsonStr(messageBody);
}

View File

@ -0,0 +1,27 @@
package cn.axzo.im.updatable;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Optional;
/**
* @author yanglin
*/
public class AddUpdateHistoryResult {
private final Map<Long, Long> messageId2HistoryId = new IdentityHashMap<>();
public Optional<Long> findHistoryId(Long messageId) {
return Optional.ofNullable(messageId2HistoryId.get(messageId));
}
public void addHistoryId(Long messageId, Long historyId) {
messageId2HistoryId.put(messageId, historyId);
}
public Collection<Long> getMessageIds() {
return messageId2HistoryId.keySet();
}
}

View File

@ -0,0 +1,25 @@
package cn.axzo.im.updatable;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.List;
/**
* @author yanglin
*/
@Getter
@RequiredArgsConstructor
public class HistoryAndMessage {
private final List<MessageHistory> histories;
private final UpdatableMessage message;
public boolean hasDataVersionMatchHistories() {
return histories.stream()
.anyMatch(history -> history.getDataVersion().equals(message.getDataVersion()));
}
}

View File

@ -0,0 +1,62 @@
package cn.axzo.im.updatable;
import cn.axzo.basics.common.constant.enums.CodeDefinition;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
/**
* @author yanglin
*/
class InitHistories {
private final Map<HistoryTaskAccount, MessageHistory> account2histories;
InitHistories(List<MessageHistory> histories) {
this.account2histories = histories.stream()
.filter(history -> {
AppTypeEnum appType = CodeDefinition.findByCode(
AppTypeEnum.class, history.getAppType()).orElse(null);
return appType != null;
})
.collect(toMap(history -> {
PersonAccountAttribute person = new PersonAccountAttribute();
person.setPersonId(history.getReceivePersonId());
person.setOuId(history.getReceiveOuId());
AppTypeEnum appType = CodeDefinition.findByCode(
AppTypeEnum.class, history.getAppType()).orElse(null);
person.setAppType(appType);
person.setWorkspaceId(history.getOrCreateRecordExt().getWorkspaceId());
return new HistoryTaskAccount(history.getImMessageTaskId(), person);
}, identity(), (oldValue, newValue) -> oldValue));
}
public Optional<MessageHistory> findHistory(UpdatableMessage message) {
HistoryTaskAccount account = new HistoryTaskAccount(
message.getTaskId(), message.parsePersonAccount());
return Optional.ofNullable(account2histories.get(account));
}
@Setter
@Getter
@RequiredArgsConstructor
// IMPORTANT: 不要删除这个注解
@EqualsAndHashCode
private static class HistoryTaskAccount {
final Long taskId;
final PersonAccountAttribute person;
}
}

View File

@ -0,0 +1,25 @@
package cn.axzo.im.updatable;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
/**
* @author yanglin
*/
@Getter
public class MessageBodyJsonObject {
private final JSONObject messageBody;
private final JSONObject bizBody;
public MessageBodyJsonObject(String json) {
messageBody = JSON.parseObject(json);
String bizBodyJson = messageBody.getString("msgBody");
if (StringUtils.isBlank(bizBodyJson))
bizBodyJson = messageBody.getString("payload");
bizBody = JSON.parseObject(bizBodyJson);
}
}

View File

@ -0,0 +1,123 @@
package cn.axzo.im.updatable;
import cn.axzo.im.center.api.vo.req.RevokeMessageRequest;
import cn.axzo.im.channel.netease.client.NimClient;
import cn.axzo.im.dao.repository.MessageHistoryColdDao;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageHistoryCold;
import cn.axzo.im.entity.NimMessageHistory;
import cn.axzo.im.utils.RecordCursor;
import cn.axzo.im.utils.ThrowableConsumer;
import com.google.common.util.concurrent.RateLimiter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author yanglin
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class RevokeService {
private final NimClient nimClient;
private final MessageHistoryDao messageHistoryDao;
private final MessageHistoryColdDao messageHistoryColdDao;
private volatile boolean isRunning = false;
public String revoke(RevokeMessageRequest request) {
if (isRunning)
return "revoke is running...";
synchronized (this) {
if (isRunning)
return "revoke is running...";
isRunning = true;
}
try {
log.info("revoke start...");
revokeImpl(request);
log.info("revoke done normally...");
} catch (Exception e) {
log.error("revoke failed", e);
return "revoke failed: " + e.getMessage();
} finally {
log.info("revoke done...");
synchronized (this) {
isRunning = false;
}
}
return "done...";
}
private void revokeImpl(RevokeMessageRequest request) throws Exception {
int threadSize = request.getParallelism() + 3;
ThreadPoolExecutor executor = new ThreadPoolExecutor(threadSize, threadSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
RateLimiter rateLimiter = RateLimiter.create(request.getParallelism());
ThrowableConsumer<RecordCursor<? extends NimMessageHistory>> runner = cursor -> {
for (List<? extends NimMessageHistory> histories : cursor) {
for (NimMessageHistory history : histories) {
if (StringUtils.isBlank(history.getMessageId()))
continue;
executor.submit(() -> {
try {
rateLimiter.acquire();
revoke(history);
} catch (Exception e) {
log.error("revoke failed", e);
}
});
}
}
};
runner.accept(hotCursor(request));
log.info("hot cursor submitted...");
runner.accept(coldCursor(request));
log.info("cold cursor submitted...");
executor.shutdown();
executor.awaitTermination(1, TimeUnit.HOURS);
}
private void revoke(NimMessageHistory history) {
cn.axzo.im.channel.netease.dto.RevokeMessageRequest nimRequest =
new cn.axzo.im.channel.netease.dto.RevokeMessageRequest();
nimRequest.setMessageId(history.getMessageId());
nimRequest.setFrom(history.getFromAccount());
nimRequest.setTo(history.getToAccount());
NimClient.CodeResponse resp = nimClient.revoke(nimRequest);
log.info("revoke messageId={}, code={}, desc={}", history.getMessageId(), resp.getCode(), resp.getDesc());
}
private RecordCursor<? extends NimMessageHistory> hotCursor(RevokeMessageRequest request) {
return new RecordCursor<>(MessageHistory::getId, () -> messageHistoryDao.lambdaQuery()
.select(MessageHistory::getMessageId,
MessageHistory::getFromAccount,
MessageHistory::getToAccount)
.in(CollectionUtils.isNotEmpty(request.getBizIds()), MessageHistory::getBizId, request.getBizIds())
.likeRight(StringUtils.isNoneBlank(request.getPattern()), MessageHistory::getBizId, request.getPattern()));
}
private RecordCursor<? extends NimMessageHistory> coldCursor(RevokeMessageRequest request) {
return new RecordCursor<>(MessageHistoryCold::getId, () -> messageHistoryColdDao.lambdaQuery()
.select(MessageHistoryCold::getMessageId,
MessageHistoryCold::getFromAccount,
MessageHistoryCold::getToAccount)
.in(CollectionUtils.isNotEmpty(request.getBizIds()), MessageHistoryCold::getBizId, request.getBizIds())
.likeRight(StringUtils.isNoneBlank(request.getPattern()), MessageHistoryCold::getBizId, request.getPattern()));
}
}

View File

@ -0,0 +1,346 @@
package cn.axzo.im.updatable;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.req.UpdatableMessageAckRequest;
import cn.axzo.im.center.api.vo.req.UpdatableMessageAckRequest.Acknowledgment;
import cn.axzo.im.center.api.vo.req.UpdateMessageRequest;
import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse;
import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse.NonUpdateMessageReason;
import cn.axzo.im.center.api.vo.resp.UpdatableMessageSendResult;
import cn.axzo.im.center.common.enums.YesOrNo;
import cn.axzo.im.channel.netease.dto.MessageBody;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.entity.HistoryRecordExt;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.entity.MessageTask.ReceivePerson;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.enums.UpdatableMessageState;
import cn.axzo.im.updatable.collector.CardManipulateCollector;
import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory;
import cn.axzo.im.updatable.handler.InitMessageHandler;
import cn.axzo.im.updatable.handler.UpdateMessageHandler;
import cn.axzo.im.updatable.retry.MessageUpdateRetryService;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.im.utils.ImProperties;
import cn.axzo.im.utils.PropsUtils;
import cn.axzo.im.utils.UUIDUtil;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import static cn.axzo.im.center.api.vo.req.MessageUpdateInfo.collectBizMessageIds;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
/**
* @author yanglin
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class UpdatableMessageManager {
private final UpdatableMessageDao updatableMessageDao;
private final MessageHistoryDao messageHistoryDao;
private final InitMessageHandler initMessageHandler;
private final UpdateMessageHandler updateMessageHandler;
private final MessageUpdateRetryService messageUpdateRetryService;
private final UpdateSupport updateSupport;
private final CardManipulateCollectorFactory cardManipulateCollectorFactory;
private final TransactionTemplate transactionTemplate;
private final ImProperties props;
// !! schedule in task
public List<UpdatableMessageSendResult> createUpdatableMessage(
MessageTask task, SendTemplateMessageParam request, List<ReceivePerson> receivePersons) {
if (CollectionUtils.isEmpty(receivePersons)) return Collections.emptyList();
String batchNo = UUIDUtil.uuidString();
ArrayList<UpdatableMessageSendResult> sendResults = new ArrayList<>();
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
for (ReceivePerson person : receivePersons) {
UpdatableMessage message = new UpdatableMessage();
collector.addMessage(message);
message.setBatchNo(batchNo);
message.setTemplateId(request.getMsgTemplateId());
message.setBizId(request.getBizId());
message.setTaskId(task.getId());
message.setReceiverPersonId(person.getPersonId());
message.setReceiverOuId(person.getOuId());
message.setAppType(person.getAppType());
message.setMsgType(request.getTemplatedMsgType());
message.setState(UpdatableMessageState.TASK_CREATED);
message.setBizMessageId(UUIDUtil.uuidString());
message.setDataVersion(1L);
message.setSenderPersonId(request.determineSenderPersonId() + "");
message.setSenderOuId(request.determineSenderOuId());
message.setIsSenderRobot(request.isSendByRobot() ? YesOrNo.YES : YesOrNo.NO);
message.getOrCreateRecordExt().setReceiverWorkspaceId(person.getWorkspaceId());
UpdatableMessageLog messageLog = message.toMessageLog(request);
collector.addLog(messageLog);
messageLog.setContext("scheduleInTask");
UpdatableMessageSendResult sendResult = new UpdatableMessageSendResult();
sendResults.add(sendResult);
sendResult.setBizMessageId(message.bizMessageId());
sendResult.setAccount(message.parsePersonAccount());
}
collector.finish();
return sendResults;
}
// !! init history created
public void onHistoryCreated(List<Long> historyIds) {
if (CollectionUtils.isEmpty(historyIds)) return;
List<MessageHistory> histories = messageHistoryDao.getByIds(historyIds);
List<Long> taskIds = histories.stream()
.map(MessageHistory::getImMessageTaskId)
.distinct()
.collect(toList());
List<UpdatableMessage> messages = updatableMessageDao.getByTaskIds(taskIds);
log.info("onHistoryCreated, taskIdSize={}, messageSize={}", taskIds.size(), messages.size());
InitHistories initHistories = new InitHistories(histories);
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
for (UpdatableMessage message : messages) {
MessageHistory history = initHistories.findHistory(message).orElse(null);
if (history == null) continue;
MessageBodyJsonObject object = new MessageBodyJsonObject(history.getMessageBody());
UpdatableMessage messageUpdate = new UpdatableMessage();
collector.updateMessage(messageUpdate);
messageUpdate.setId(message.getId());
messageUpdate.setInitHistoryId(history.getId());
messageUpdate.setBizBody(object.getBizBody());
messageUpdate.setFromAccount(history.getFromAccount());
messageUpdate.setToAccount(history.getToAccount());
messageUpdate.setState(history.getStatus() == MessageHistoryStatus.PENDING
? UpdatableMessageState.INIT_MESSAGE_QUEUED
: UpdatableMessageState.ACCOUNT_NOT_FOUND);
UpdatableMessageLog messageLog = message.toMessageLog(null);
collector.addLog(messageLog);
messageLog.setBizId(history.getBizId());
messageLog.setFromAccount(history.getFromAccount());
messageLog.setToAccount(history.getToAccount());
messageLog.setMessageState(messageUpdate.getState());
messageLog.setInitHistoryId(history.getId());
messageLog.setBizBody(object.getBizBody());
messageLog.setContext("initHistoryCreated");
messageLog.setContextHistoryId(history.getId());
MessageHistory historyUpdate = new MessageHistory();
collector.updateHistory(historyUpdate);
historyUpdate.setId(history.getId());
MessageBody messageBody = JSON.parseObject(history.getMessageBody(), MessageBody.class);
messageBody.setMsgBody(updateSupport
.injectUpdatableFields(messageBody.parseMsgBody(), message)
.toJSONString());
historyUpdate.setMessageBody(JSON.toJSONString(messageBody));
HistoryRecordExt updateExt = new HistoryRecordExt();
updateExt.setIsUpdatableMessage(true);
updateExt.setBizMessageId(message.bizMessageId());
updateExt.setDataVersion(message.getDataVersion());
updateExt.setIsUpdateMessage(false);
updateExt.setIsUpdateRetry(false);
updateExt.setUpdateRetryCount(message.getRetryCount());
historyUpdate.setRecordExt(PropsUtils.updateProperties(history.getRecordExt(), updateExt));
}
collector.finish();
}
// !! update message
public MessageUpdateResponse updateMessage(UpdateMessageRequest request) {
BizAssertions.assertNotEmpty(request.getUpdates(), "更新消息不能为空");
List<UpdatableMessage> messages = updatableMessageDao
.getByBizMessageIds(collectBizMessageIds(request.getUpdates()));
BizAssertions.assertNotEmpty(messages, "未找到任何需要更新的消息");
MessageUpdateResponse response = new MessageUpdateResponse();
List<List<UpdateMessageRequest.Update>> batches = Lists.partition(
request.getUpdates(), props.getUpdatableMessageMaxLockRecords());
for (List<UpdateMessageRequest.Update> batch : batches)
transactionTemplate.executeWithoutResult(unused -> updateMessageImpl(request, batch, response));
return response;
}
private void updateMessageImpl(UpdateMessageRequest request,
List<UpdateMessageRequest.Update> requestUpdates,
MessageUpdateResponse response) {
List<UpdatableMessage> requestMessages = updatableMessageDao
.getByBizMessageIdsForUpdate(collectBizMessageIds(requestUpdates));
Map<String, UpdatableMessage> bizMessageId2Message = requestMessages.stream()
.collect(toMap(UpdatableMessage::bizMessageId, identity()));
List<UpdateMessageRequest.Update> validUpdates = new ArrayList<>();
List<Long> updateIds = new ArrayList<>();
for (UpdateMessageRequest.Update update : requestUpdates) {
UpdatableMessage message = bizMessageId2Message.get(update.getBizMessageId());
if (message == null) {
response.addNonUpdatedMessage(
update.getBizMessageId(),
NonUpdateMessageReason.CANT_FIND_INIT_MESSAGE);
} else if (!message.getState().isUpdateMessageAllowed()) {
response.addNonUpdatedMessage(
update.getBizMessageId(),
NonUpdateMessageReason.MESSAGE_STATE_NOT_ALLOWED,
message.getState());
} else {
validUpdates.add(update);
updateIds.add(message.getId());
response.addUpdatedBizMessageId(message.getBizMessageId());
}
}
if (!validUpdates.isEmpty()) {
// incr data version + reset retry count
updatableMessageDao.getBaseMapper().incrDataVersion(updateIds);
messageUpdateRetryService.scheduleNextRetry(updateIds);
AddUpdateHistoryResult result = updateSupport
.addUpdateHistories(request, "createUpdateHistory", validUpdates, false);
updateSupport.updateHistoryId(result, UpdatableMessage::setUpdateHistoryId);
}
}
// !! init & update message sent
@Transactional
public void onHistorySend(List<MessageHistory> maybeUpdatedHistory) {
List<Long> historyIds = maybeUpdatedHistory.stream()
.filter(MessageHistory::isUpdatableMessage)
.map(MessageHistory::getId)
.collect(toList());
if (CollectionUtils.isEmpty(historyIds)) return;
List<MessageHistory> histories = messageHistoryDao.getByIds(historyIds);
// 避免ack更新出错
List<UpdatableMessage> messages = updatableMessageDao
.getByBizMessageIdsForUpdate(histories.stream()
.map(h -> h.getRecordExt().getBizMessageId())
.distinct()
.sorted()
.collect(toList()));
HashMap<Long, UpdatableMessage> id2Messages = new HashMap<>();
messages.forEach(message -> id2Messages.put(message.getId(), message));
BiFunction<Boolean, Boolean, List<HistoryAndMessage>> historyAndMessageBuilder =
(isUpdateMessage, isSuccess) -> {
List<HistoryAndMessage> historyAndMessages = new ArrayList<>();
for (UpdatableMessage message : new ArrayList<>(id2Messages.values())) {
List<MessageHistory> stateHistories = histories.stream()
.filter(history -> message.getBizMessageId().equals(history.getRecordExt().getBizMessageId()))
.filter(history -> isSuccess
? history.getStatus() == MessageHistoryStatus.SUCCEED
: history.getStatus() == MessageHistoryStatus.FAILED)
.filter(history -> {
boolean isHistoryUpdateMessage = history.getRecordExt().getIsUpdateMessage();
return isUpdateMessage && isHistoryUpdateMessage
|| !isUpdateMessage && !isHistoryUpdateMessage;
})
.collect(toList());
if (stateHistories.isEmpty())
continue;
// 一次只会处理一种情况, 避免无用的遍历
id2Messages.remove(message.getId());
historyAndMessages.add(new HistoryAndMessage(stateHistories, message));
}
return historyAndMessages;
};
// send success
List<HistoryAndMessage> sendSuccess = historyAndMessageBuilder.apply(false, true);
if (!sendSuccess.isEmpty())
initMessageHandler.onSuccess(sendSuccess);
// send fail
List<HistoryAndMessage> sendFail = historyAndMessageBuilder.apply(false, false);
if (!sendFail.isEmpty())
initMessageHandler.onFail(sendFail);
// update success
List<HistoryAndMessage> updateSuccess = historyAndMessageBuilder.apply(true, true);
if (!updateSuccess.isEmpty())
updateMessageHandler.onSuccess(updateSuccess);
// update fail
List<HistoryAndMessage> updateFail = historyAndMessageBuilder.apply(true, false);
if (!updateFail.isEmpty())
updateMessageHandler.onFail(updateFail);
}
public void ack(UpdatableMessageAckRequest request) {
List<Acknowledgment> acknowledgments = request.determineValidAcknowledgments();
if (CollectionUtils.isEmpty(acknowledgments)) return;
List<List<Acknowledgment>> batches = Lists.partition(
acknowledgments, props.getUpdatableMessageMaxLockRecords());
for (List<Acknowledgment> batch : batches)
transactionTemplate.executeWithoutResult(unused -> ackImpl(request, batch));
}
private void ackImpl(UpdatableMessageAckRequest request, List<Acknowledgment> acknowledgments) {
List<String> bizMessageIds = acknowledgments.stream()
.map(Acknowledgment::getBizMessageId)
.collect(toList());
Map<String, Acknowledgment> bizMessageId2Ack = acknowledgments.stream()
.collect(toMap(Acknowledgment::getBizMessageId, identity()));
List<UpdatableMessage> messages = updatableMessageDao.getByBizMessageIdsForUpdate(bizMessageIds);
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
ArrayList<String> ackedBizMessageIds = new ArrayList<>();
for (UpdatableMessage message : messages) {
Long ackDataVersion = bizMessageId2Ack.get(message.getBizMessageId()).getDataVersion();
UpdatableMessageLog messageLog = message.toMessageLog(request);
collector.addLog(messageLog);
messageLog.setContext("ack");
messageLog.setContextHistoryId(0L);
messageLog.setRetryCount(0L);
messageLog.setBizBody(null);
messageLog.setDataVersion(message.getDataVersion());
BiConsumer<Boolean, String> ackLogger = (ackSuccess, ackDescription) -> {
HashMap<String, Object> ackInfo = new HashMap<>();
ackInfo.put("ackDataVersion", ackDataVersion);
ackInfo.put("dataVersion", message.getDataVersion());
ackInfo.put("messageState", message.getState());
ackInfo.put("ackSuccess", ackSuccess);
ackInfo.put("ackDescription", ackDescription);
messageLog.addLogContent("ackInfo", ackInfo);
};
if (ackDataVersion == null) {
ackLogger.accept(false, "ackDataVersion为空");
continue;
}
if (ackDataVersion > message.getDataVersion()) {
ackLogger.accept(false, "ackDataVersion大于当前dataVersion");
continue;
}
if (ackDataVersion <= message.getAckDataVersion()) {
ackLogger.accept(false, "ackDataVersion小于等于上次ackDataVersion");
continue;
}
ackLogger.accept(true, "ack成功");
UpdatableMessage messageUpdate = new UpdatableMessage();
collector.updateMessage(messageUpdate);
messageUpdate.setId(message.getId());
messageUpdate.setAckDataVersion(ackDataVersion);
if (ackDataVersion.equals(message.getDataVersion()))
messageUpdate.setState(UpdatableMessageState.UPDATE_ACK);
ackedBizMessageIds.add(message.bizMessageId());
}
collector.finish();
// 只需要通过bizMessageId去删除就保证安全了
messageUpdateRetryService.removeRetryByBizMessageIds(ackedBizMessageIds);
}
}

View File

@ -0,0 +1,66 @@
package cn.axzo.im.updatable;
import cn.axzo.im.center.api.vo.req.FetchUpdatableMessageRequest;
import cn.axzo.im.center.api.vo.req.GetMessageDetailRequest;
import cn.axzo.im.center.api.vo.resp.FetchUpdatableMessageResponse;
import cn.axzo.im.center.api.vo.resp.GetMessageDetailResponse;
import cn.axzo.im.center.api.vo.resp.UpdatableMessageInfo;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.im.utils.ImProperties;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
public class UpdatableMessageQueryService {
private final UpdatableMessageDao updatableMessageDao;
private final ImProperties props;
private final UpdateSupport updateSupport;
public GetMessageDetailResponse getMessageDetails(GetMessageDetailRequest request) {
BizAssertions.assertTrue(request.getBizMessageIds().size() <= 100, "消息ID数量不能超过100");
List<UpdatableMessage> messages = updatableMessageDao.lambdaQuery()
.in(UpdatableMessage::getBizMessageId, request.getBizMessageIds())
.list();
GetMessageDetailResponse response = new GetMessageDetailResponse();
for (UpdatableMessage message : messages)
response.addMessage(toMessageInfo(message));
return response;
}
public FetchUpdatableMessageResponse fetchUpdatableMessage(FetchUpdatableMessageRequest request) {
int maxSize = props.getUpdatable().getFetchMaxSize();
List<UpdatableMessage> messages = updatableMessageDao.lambdaQuery()
.eq(UpdatableMessage::getToAccount, request.getToAccount())
.gt(UpdatableMessage::getUpdateAt, request.determineMaxUpdateTime())
.lt(UpdatableMessage::getId, request.determineMinCursorId())
.orderByDesc(UpdatableMessage::getId)
.last("LIMIT " + maxSize)
.list();
FetchUpdatableMessageResponse response = new FetchUpdatableMessageResponse();
for (UpdatableMessage message : messages)
response.addMessage(toMessageInfo(message));
response.setQueryProgress();
return response;
}
private UpdatableMessageInfo toMessageInfo(UpdatableMessage message) {
UpdatableMessageInfo imMessage = new UpdatableMessageInfo();
JSONObject bizBody = message.getBizBody();
updateSupport.injectUpdatableFields(bizBody, message);
imMessage.setMsgBody(bizBody.toJSONString());
imMessage.setId(message.getId());
imMessage.setUpdateTime(message.getUpdateAt().getTime());
return imMessage;
}
}

View File

@ -0,0 +1,153 @@
package cn.axzo.im.updatable;
import cn.axzo.im.center.api.feign.SendPriority;
import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.center.api.vo.req.MessageUpdateInfo;
import cn.axzo.im.center.common.enums.BizTypeEnum;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.dto.MessageCustomBody;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.entity.HistoryRecordExt;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.enums.UpdatableMessageState;
import cn.axzo.im.updatable.collector.CardManipulateCollector;
import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory;
import cn.axzo.im.updatable.domain.MessageUpdateBody;
import cn.axzo.im.utils.UUIDUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import static cn.axzo.im.center.api.vo.req.MessageUpdateInfo.collectBizMessageIds;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
public class UpdateSupport {
private final UpdatableMessageDao updatableMessageDao;
private final IMChannelProvider imChannel;
private final CardManipulateCollectorFactory cardManipulateCollectorFactory;
public AddUpdateHistoryResult addUpdateHistories(
Object request,
String context,
List<? extends MessageUpdateInfo> updates,
boolean isRetry) {
List<UpdatableMessage> messages = updatableMessageDao
.getByBizMessageIds(collectBizMessageIds(updates));
Map<String, UpdatableMessage> bizMessageId2Message = messages
.stream().collect(toMap(UpdatableMessage::getBizMessageId, identity()));
String batchNo = UUIDUtil.uuidString();
Map<UpdatableMessageLog, MessageHistory> messageLog2History = new IdentityHashMap<>();
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
for (MessageUpdateInfo update : updates) {
UpdatableMessage message = bizMessageId2Message.get(update.bizMessageId());
if (message == null) continue;
UpdatableMessage messageUpdate = new UpdatableMessage();
collector.updateMessage(messageUpdate);
messageUpdate.setId(message.getId());
messageUpdate.setState(UpdatableMessageState.UPDATE_MESSAGE_QUEUED);
MessageHistory history = new MessageHistory();
collector.addHistory(history);
history.setBizId(message.getBizId());
history.setFromAccount(message.getFromAccount());
history.setToAccount(message.getToAccount());
history.setAppType(message.getAppType().getCode());
history.setChannel(imChannel.getProviderType());
MessageCustomBody messageBody = new MessageCustomBody();
messageBody.setToImAccount(message.getToAccount());
messageBody.setPersonId(message.getReceiverPersonId());
messageBody.setBizType(BizTypeEnum.MESSAGE_UPDATE);
MessageUpdateBody imBody = new MessageUpdateBody();
imBody.setBizMessageId(message.bizMessageId());
imBody.setDataVersion(message.getDataVersion());
imBody.setNimMessageId(message.getNimMessageId());
messageBody.setPayload(JSON.toJSONString(imBody));
history.setMessageBody(JSON.toJSONString(messageBody));
messageUpdate.setBizBody(injectUpdatableFields(update.bizBody(), message));
history.setImMessageTaskId(0L);
history.setReceivePersonId(message.getReceiverPersonId());
history.setReceiveOuId(message.getReceiverOuId());
history.setStatus(MessageHistoryStatus.PENDING);
history.setBatchNo(batchNo);
SendPriority priority = isRetry ? SendPriority.UPDATE_MESSAGE_RETRY : SendPriority.UPDATE_MESSAGE;
history.setSendPriority(priority.getPriority());
history.setApiChannel(ApiChannel.CUSTOM_MESSAGE);
HistoryRecordExt recordExt = new HistoryRecordExt();
recordExt.setIsUpdatableMessage(true);
recordExt.setBizMessageId(message.bizMessageId());
recordExt.setDataVersion(message.getDataVersion());
recordExt.setIsUpdateMessage(true);
Long retryCount = message.getRetryCount();
recordExt.setIsUpdateRetry(retryCount > 0);
recordExt.setUpdateRetryCount(retryCount);
history.setRecordExt(recordExt);
history.setTimestampForSend(new Date());
UpdatableMessageLog messageLog = message.toMessageLog(request);
collector.addLog(messageLog);
messageLog2History.put(messageLog, history);
messageLog.setDataVersion(message.getDataVersion());
messageLog.setMessageState(messageUpdate.getState());
messageLog.setBizBody(messageUpdate.getBizBody());
messageLog.setRetryCount(message.getRetryCount());
}
collector.finishAddHistory();
for (UpdatableMessageLog messageLog : collector.getUpdatableMessageLogsToAdd()) {
MessageHistory history = messageLog2History.get(messageLog);
messageLog.setContext(context);
messageLog.setContextHistoryId(history.getId());
}
collector.finish();
AddUpdateHistoryResult result = new AddUpdateHistoryResult();
for (int i = 0; i < messages.size(); i++) {
Long messageId = messages.get(i).getId();
MessageHistory history = collector.getMessageHistoriesToAdd().get(i);
result.addHistoryId(messageId, history.getId());
}
return result;
}
public void updateHistoryId(AddUpdateHistoryResult result,
BiConsumer<UpdatableMessage, Long> historyIdSetter) {
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
for (Long messageId : result.getMessageIds()) {
result.findHistoryId(messageId).ifPresent(historyId -> {
UpdatableMessage messageUpdate = new UpdatableMessage();
messageUpdate.setId(messageId);
historyIdSetter.accept(messageUpdate, historyId);
collector.updateMessage(messageUpdate);
});
}
collector.finish();
}
public JSONObject injectUpdatableFields(JSONObject bizBody, UpdatableMessage message) {
MessageUpdateBody body = new MessageUpdateBody();
body.setNimMessageId(message.getNimMessageId());
body.setBizMessageId(message.getBizMessageId());
body.setDataVersion(message.getDataVersion());
bizBody.putAll(JSON.parseObject(JSON.toJSONString(body)));
return bizBody;
}
}

View File

@ -0,0 +1,100 @@
package cn.axzo.im.updatable.collector;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.dao.repository.UpdatableMessageLogDao;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import com.google.common.collect.Lists;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
/**
* @author yanglin
*/
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
public class CardManipulateCollector {
private static final int BATCH_SIZE = 1000;
private final UpdatableMessageDao updatableMessageDao;
private final UpdatableMessageLogDao updatableMessageLogDao;
private final MessageHistoryDao messageHistoryDao;
private final State<UpdatableMessage> updatableMessagesToAdd = new State<>();
private final State<UpdatableMessage> updatableMessagesToUpdate = new State<>();
private final State<UpdatableMessageLog> updatableMessageLogsToAdd = new State<>();
private final State<MessageHistory> messageHistoriesToAdd = new State<>();
private final State<MessageHistory> messageHistoriesToUpdate = new State<>();
public List<UpdatableMessageLog> getUpdatableMessageLogsToAdd() {
return updatableMessageLogsToAdd.elementsOrEmpty();
}
public List<MessageHistory> getMessageHistoriesToAdd() {
return messageHistoriesToAdd.elementsOrEmpty();
}
public void addMessage(UpdatableMessage message) {
updatableMessagesToAdd.add(message);
}
public void updateMessage(UpdatableMessage message) {
updatableMessagesToUpdate.add(message);
}
public void addLog(UpdatableMessageLog log) {
updatableMessageLogsToAdd.add(log);
}
public void addHistory(MessageHistory history) {
messageHistoriesToAdd.add(history);
}
public void updateHistory(MessageHistory history) {
messageHistoriesToUpdate.add(history);
}
public void finishAddHistory() {
messageHistoriesToAdd.batched(messageHistoryDao::saveBatch);
}
public void finish() {
updatableMessagesToAdd.batched(updatableMessageDao::saveBatch);
updatableMessagesToUpdate.batched(updatableMessageDao::updateBatchById);
updatableMessageLogsToAdd.batched(updatableMessageLogDao::saveBatch);
messageHistoriesToAdd.batched(messageHistoryDao::saveBatch);
messageHistoriesToUpdate.batched(messageHistoryDao::updateBatchById);
}
private static class State<T> {
private boolean finished;
private List<T> elements;
void add(T element) {
if (elements == null)
elements = new ArrayList<>();
elements.add(element);
}
List<T> elementsOrEmpty() {
return elements == null ? Collections.emptyList() : elements;
}
void batched(Consumer<List<T>> action) {
if (finished) return;
if (CollectionUtils.isEmpty(elements)) return;
Lists.partition(elements, BATCH_SIZE).forEach(action);
finished = true;
}
}
}

View File

@ -0,0 +1,27 @@
package cn.axzo.im.updatable.collector;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.dao.repository.UpdatableMessageLogDao;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
public class CardManipulateCollectorFactory {
private final UpdatableMessageDao updatableMessageDao;
private final UpdatableMessageLogDao updatableMessageLogDao;
private final MessageHistoryDao messageHistoryDao;
public CardManipulateCollector create() {
return new CardManipulateCollector(
updatableMessageDao,
updatableMessageLogDao,
messageHistoryDao);
}
}

View File

@ -0,0 +1,21 @@
package cn.axzo.im.updatable.domain;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
/**
* @author yanglin
*/
@Setter
@Getter
public class MessageUpdateBody {
private String bizMessageId;
private Long dataVersion;
private String nimMessageId;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,58 @@
package cn.axzo.im.updatable.handler;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import cn.axzo.im.enums.UpdatableMessageState;
import cn.axzo.im.updatable.HistoryAndMessage;
import cn.axzo.im.updatable.collector.CardManipulateCollector;
import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
public class InitMessageHandler implements StateHandler {
private final CardManipulateCollectorFactory cardManipulateCollectorFactory;
@Override
public void onSuccess(List<HistoryAndMessage> historyAndMessages) {
updateState(historyAndMessages, true);
}
@Override
public void onFail(List<HistoryAndMessage> historyAndMessages) {
updateState(historyAndMessages, false);
}
private void updateState(List<HistoryAndMessage> historyAndMessages, boolean isSuccess) {
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
for (HistoryAndMessage historyAndMessage : historyAndMessages) {
UpdatableMessage message = historyAndMessage.getMessage();
MessageHistory history = historyAndMessage.getHistories().get(0);
UpdatableMessage messageUpdate = new UpdatableMessage();
collector.updateMessage(messageUpdate);
messageUpdate.setId(message.getId());
if (isSuccess)
messageUpdate.setNimMessageId(history.getMessageId());
messageUpdate.setState(isSuccess
? UpdatableMessageState.INIT_MESSAGE_SEND_SUCCESS
: UpdatableMessageState.INIT_MESSAGE_SEND_FAIL);
UpdatableMessageLog messageLog = message.toMessageLog(null);
collector.addLog(messageLog);
messageLog.setMessageState(messageUpdate.getState());
messageLog.setInitHistoryId(history.getId());
messageLog.setContext("sendInitMessage");
messageLog.setContextHistoryId(history.getId());
}
collector.finish();
}
}

View File

@ -0,0 +1,16 @@
package cn.axzo.im.updatable.handler;
import cn.axzo.im.updatable.HistoryAndMessage;
import java.util.List;
/**
* @author yanglin
*/
public interface StateHandler {
void onSuccess(List<HistoryAndMessage> historyAndMessages);
void onFail(List<HistoryAndMessage> historyAndMessages);
}

View File

@ -0,0 +1,68 @@
package cn.axzo.im.updatable.handler;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import cn.axzo.im.enums.UpdatableMessageState;
import cn.axzo.im.updatable.HistoryAndMessage;
import cn.axzo.im.updatable.MessageBodyJsonObject;
import cn.axzo.im.updatable.collector.CardManipulateCollector;
import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
public class UpdateMessageHandler implements StateHandler {
private final CardManipulateCollectorFactory cardManipulateCollectorFactory;
@Override
public void onSuccess(List<HistoryAndMessage> historyAndMessages) {
updateState(historyAndMessages, true);
}
@Override
public void onFail(List<HistoryAndMessage> historyAndMessages) {
updateState(historyAndMessages, false);
}
private void updateState(List<HistoryAndMessage> historyAndMessages, boolean isSuccess) {
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
for (HistoryAndMessage historyAndMessage : historyAndMessages) {
UpdatableMessageState state = historyAndMessage.getMessage().getState();
UpdatableMessage message = historyAndMessage.getMessage();
boolean queuedOrFail = state == UpdatableMessageState.UPDATE_MESSAGE_QUEUED
|| state == UpdatableMessageState.UPDATE_MESSAGE_SEND_FAIL;
if (queuedOrFail && historyAndMessage.hasDataVersionMatchHistories()) {
UpdatableMessage messageUpdate = new UpdatableMessage();
collector.updateMessage(messageUpdate);
messageUpdate.setId(historyAndMessage.getMessage().getId());
messageUpdate.setState(isSuccess
? UpdatableMessageState.UPDATE_MESSAGE_SEND_SUCCESS
: UpdatableMessageState.UPDATE_MESSAGE_SEND_FAIL);
}
for (MessageHistory history : historyAndMessage.getHistories()) {
UpdatableMessageLog messageLog = message.toMessageLog(null);
collector.addLog(messageLog);
messageLog.setMessageState(isSuccess
? UpdatableMessageState.UPDATE_MESSAGE_SEND_SUCCESS
: UpdatableMessageState.UPDATE_MESSAGE_SEND_FAIL);
messageLog.setContextHistoryId(history.getId());
MessageBodyJsonObject object = new MessageBodyJsonObject(history.getMessageBody());
messageLog.setBizBody(object.getBizBody());
messageLog.setDataVersion(history.getDataVersion());
messageLog.setContextHistoryId(history.getId());
messageLog.setContext(history.isUpdateRetry() ? "retrySendUpdateMessage" : "sendUpdateMessage");
messageLog.setRetryCount(history.getUpdateRetryCount());
}
}
collector.finish();
}
}

View File

@ -0,0 +1,58 @@
package cn.axzo.im.updatable.retry;
import cn.axzo.im.dao.mapper.UpdatableMessageLogMapper;
import cn.axzo.im.entity.SendJobInfo;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.utils.DateFormatUtil;
import cn.axzo.im.utils.JSONObjectUtil;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.springframework.stereotype.Component;
import java.util.Date;
import static cn.axzo.im.utils.Queries.query;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ExpungeUpdatableMessageLogJob {
private final UpdatableMessageLogMapper updatableMessageLogMapper;
@XxlJob("expungeUpdatableMessageLogJob")
public ReturnT<String> execute(String paramStr) throws Exception {
log.info("start - run job with param={}", paramStr);
try {
Param param = StringUtils.isBlank(paramStr)
? new Param() :
JSONObjectUtil.parseObject(paramStr, Param.class);
expunge(param);
log.info("end - run job with param={}", param);
return ReturnT.SUCCESS;
} catch (Exception e) {
log.warn("job failed. param={}", paramStr, e);
return ReturnT.FAIL;
}
}
private void expunge(Param param) {
Date until = DateTime.now().minusDays(param.daysAgo).toDate();
log.info("going to delete until={}", DateFormatUtil.toReadableString(until));
int count = updatableMessageLogMapper.expunge(until);
log.info("deleted count={}", count);
}
@Data
public static class Param {
private int daysAgo = 7;
}
}

View File

@ -0,0 +1,68 @@
package cn.axzo.im.updatable.retry;
import cn.axzo.im.dao.repository.MessageUpdateRetryDao;
import cn.axzo.im.entity.MessageUpdateRetry;
import cn.axzo.maokai.api.util.Ref;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
import java.util.function.Supplier;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageUpdateRetryJob {
private final MessageUpdateRetryDao messageUpdateRetryDao;
private final MessageUpdateRetryService messageUpdateRetryService;
@XxlJob("messageUpdateRetryJob")
public ReturnT<String> execute(String paramStr) throws Exception {
try {
log.info("start - run job with param={}", paramStr);
executeImpl();
log.info("end - run job with param={}", paramStr);
return ReturnT.SUCCESS;
} catch (Exception e) {
log.warn("job failed. param={}", paramStr, e);
return ReturnT.FAIL;
}
}
private void executeImpl() {
Supplier<List<MessageUpdateRetry>> cursor = retryCursor();
for (List<MessageUpdateRetry> retries = cursor.get(); !retries.isEmpty(); retries = cursor.get()) {
List<String> bizMessageIds = retries.stream()
.map(MessageUpdateRetry::getBizMessageId)
.collect(toList());
messageUpdateRetryService.advanceRetry(bizMessageIds);
}
}
private Supplier<List<MessageUpdateRetry>> retryCursor() {
Ref<Long> maxId = Ref.create(0L);
return () -> {
List<MessageUpdateRetry> retries = messageUpdateRetryDao.lambdaQuery()
.le(MessageUpdateRetry::getNextRetryTime, new Date())
.gt(MessageUpdateRetry::getId, maxId.get())
.orderByAsc(MessageUpdateRetry::getId)
.last("LIMIT 1000")
.list();
if (!retries.isEmpty())
maxId.set(retries.get(retries.size() - 1).getId());
return retries;
};
}
}

View File

@ -0,0 +1,99 @@
package cn.axzo.im.updatable.retry;
import cn.axzo.im.dao.repository.MessageUpdateRetryDao;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.entity.MessageUpdateRetry;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import cn.axzo.im.updatable.AddUpdateHistoryResult;
import cn.axzo.im.updatable.UpdateSupport;
import cn.axzo.im.updatable.collector.CardManipulateCollector;
import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory;
import cn.axzo.im.utils.ImProperties;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
import org.joda.time.DateTime;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@Service
@RequiredArgsConstructor
public class MessageUpdateRetryService {
private final MessageUpdateRetryDao messageUpdateRetryDao;
private final UpdatableMessageDao updatableMessageDao;
private final UpdateSupport updateSupport;
private final TransactionTemplate transactionTemplate;
private final CardManipulateCollectorFactory cardManipulateCollectorFactory;
private final ImProperties props;
public void removeRetryByBizMessageIds(Collection<String> bizMessageIds) {
if (CollectionUtils.isNotEmpty(bizMessageIds))
messageUpdateRetryDao.getBaseMapper().deleteByBizMessageIds(bizMessageIds);
}
public void advanceRetry(List<String> bizMessageIds) {
List<List<String>> batches = Lists.partition(
bizMessageIds, props.getUpdatableMessageMaxLockRecords());
for (List<String> batch : batches)
transactionTemplate.executeWithoutResult(unused -> advanceRetryImpl(batch));
}
private void advanceRetryImpl(List<String> bizMessageIds) {
List<UpdatableMessage> messages = updatableMessageDao.getByBizMessageIdsForUpdate(bizMessageIds);
removeRetryByBizMessageIds(bizMessageIds);
UpdateAckState state = new UpdateAckState(messages, props.getMessageUpdateAckMaxRetryCount());
// 先发本次生试, 再调度下一次重试
retryUpdateMessage(state.getNonAckMessageIds());
scheduleNextRetry(state.getScheduleNextRetryMessages());
}
public void scheduleNextRetry(List<Long> messageIds) {
if (CollectionUtils.isEmpty(messageIds)) return;
List<UpdatableMessage> messages = updatableMessageDao.listByIds(messageIds);
List<String> bizMessageIds = messages.stream()
.map(UpdatableMessage::getBizMessageId)
.collect(toList());
removeRetryByBizMessageIds(bizMessageIds);
ArrayList<MessageUpdateRetry> retries = new ArrayList<>();
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
Date nextRetryTime = DateTime.now()
.plusSeconds(props.getMessageUpdateAckRetryIntervalSeconds())
.toDate();
for (UpdatableMessage message : messages) {
MessageUpdateRetry retry = new MessageUpdateRetry();
retry.setBizMessageId(message.bizMessageId());
retry.setInitHistoryId(message.getInitHistoryId());
retry.setNextRetryTime(nextRetryTime);
retry.setDataVersion(message.getDataVersion());
retries.add(retry);
UpdatableMessageLog messageLog = message.toMessageLog(null);
collector.addLog(messageLog);
messageLog.setRetryCount(message.getRetryCount() + 1);
messageLog.setContext("scheduleNextRetrySendUpdateMessage");
}
messageUpdateRetryDao.saveBatch(retries);
collector.finish();
}
public void retryUpdateMessage(List<Long> messageIds) {
if (CollectionUtils.isEmpty(messageIds)) return;
updatableMessageDao.getBaseMapper().incrRetryCount(messageIds);
List<UpdatableMessage> messages = updatableMessageDao.listByIds(messageIds);
AddUpdateHistoryResult result = updateSupport
.addUpdateHistories(null, "createRetryUpdateHistory", messages, true);
updateSupport.updateHistoryId(result, UpdatableMessage::setRetryHistoryId);
}
}

View File

@ -0,0 +1,35 @@
package cn.axzo.im.updatable.retry;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.enums.UpdatableMessageState;
import lombok.RequiredArgsConstructor;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@RequiredArgsConstructor
public class UpdateAckState {
private final List<UpdatableMessage> messages;
private final int maxRetryCount;
List<Long> getScheduleNextRetryMessages() {
return messages.stream()
.filter(message -> message.getRetryCount() < maxRetryCount)
.filter(message -> message.getState() != UpdatableMessageState.UPDATE_ACK)
.map(UpdatableMessage::getId)
.collect(toList());
}
List<Long> getNonAckMessageIds() {
return messages.stream()
.filter(message -> message.getState() != UpdatableMessageState.UPDATE_ACK)
.map(UpdatableMessage::getId)
.collect(toList());
}
}

View File

@ -49,13 +49,6 @@ public class BizAssertions {
AssertUtil.notEmpty(actual, MessageFormatter.arrayFormat(message, args).getMessage());
}
/**
* 断言集合为空
*/
public static void assertEmpty(Collection<?> actual, String message, Object... args) {
AssertUtil.isEmpty(actual, MessageFormatter.arrayFormat(message, args).getMessage());
}
/**
* 断言值为真
*/
@ -88,18 +81,26 @@ public class BizAssertions {
}
}
public static void assertNotBlank(String content, String message, Object... args) {
if (StringUtils.isBlank(content)) {
throw new ServiceException(MessageFormatter.arrayFormat(message, args).getMessage());
}
}
public static void assertBlank(String content, String message, Object... args) {
if (StringUtils.isNotBlank(content)) {
throw new ServiceException(MessageFormatter.arrayFormat(message, args).getMessage());
}
}
public static <T> T assertResponse(CommonResponse<T> response) {
return assertResponse(response, "error resp={}", JSON.toJSONString(response));
}
public static <T> T assertResponse(CommonResponse<T> response, String message, Object... args) {
if (response.getCode() != HttpStatus.HTTP_OK) {
String finalMsg = MessageFormatter.arrayFormat(message, args).getMessage();
if (StringUtils.isNotBlank(response.getMsg())) {
finalMsg += ": " + response.getMsg();
}
ServiceException e = new ServiceException(finalMsg);
log.warn("remote call response with error. response={}", JSON.toJSONString(response), e);
if (response == null || response.getCode() != HttpStatus.HTTP_OK) {
ServiceException e = new ServiceException(MessageFormatter.arrayFormat(message, args).getMessage());
log.warn("remote call response with error", e);
throw e;
}
return response.getData();
@ -111,12 +112,8 @@ public class BizAssertions {
public static <T> T assertResponse(ApiResult<T> response, String message, Object... args) {
if (!response.isSuccess()) {
String finalMsg = MessageFormatter.arrayFormat(message, args).getMessage();
if (StringUtils.isNotBlank(response.getMsg())) {
finalMsg += ": " + response.getMsg();
}
ServiceException e = new ServiceException(finalMsg);
log.warn("remote call response with error. response={}", JSON.toJSONString(response), e);
ServiceException e = new ServiceException(MessageFormatter.arrayFormat(message, args).getMessage());
log.warn("remote call response with error", e);
throw e;
}
return response.getData();
@ -128,12 +125,8 @@ public class BizAssertions {
public static <T> List<T> assertResponse(ApiListResult<T> response, String message, Object... args) {
if (!response.isSuccess()) {
String finalMsg = MessageFormatter.arrayFormat(message, args).getMessage();
if (StringUtils.isNotBlank(response.getMsg())) {
finalMsg += ": " + response.getMsg();
}
ServiceException e = new ServiceException(finalMsg);
log.warn("remote call response with error. response={}", JSON.toJSONString(response), e);
ServiceException e = new ServiceException(MessageFormatter.arrayFormat(message, args).getMessage());
log.warn("remote call response with error", e);
throw e;
}
return response.getData();

Some files were not shown because too many files have changed in this diff Show More