diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageApi.java b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageApi.java index 324ac11..4e5c39e 100644 --- a/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageApi.java +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/MessageApi.java @@ -3,12 +3,19 @@ package cn.axzo.im.center.api.feign; import cn.axzo.framework.domain.web.result.ApiResult; import cn.axzo.im.center.api.vo.req.AsyncSendMessageParam; import cn.axzo.im.center.api.vo.req.CustomMessageInfo; +import cn.axzo.im.center.api.vo.req.FetchUpdatableMessageRequest; +import cn.axzo.im.center.api.vo.req.GetBizMessageIdsRequest; import cn.axzo.im.center.api.vo.req.MessageInfo; import cn.axzo.im.center.api.vo.req.SendMessageParam; import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam; +import cn.axzo.im.center.api.vo.req.UpdatableMessageAckRequest; +import cn.axzo.im.center.api.vo.req.UpdateMessageRequest; +import cn.axzo.im.center.api.vo.resp.FetchUpdatableMessageResponse; +import cn.axzo.im.center.api.vo.resp.GetBizMessageIdsResponse; import cn.axzo.im.center.api.vo.resp.MessageCustomResp; import cn.axzo.im.center.api.vo.resp.MessageDispatchResp; import cn.axzo.im.center.api.vo.resp.MessageTaskResp; +import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.PostMapping; @@ -54,7 +61,25 @@ public interface MessageApi { * @return */ @PostMapping("/api/im/template-message/async/send") - ApiResult sendTemplateMessageAsync(@RequestBody @Validated SendTemplateMessageParam sendMessageParam); + ApiResult sendTemplateMessageAsync( + @RequestBody @Validated SendTemplateMessageParam sendMessageParam); + + /** + * 更新消息 + */ + @PostMapping("/api/im/template-message/updatable/updateMessage") + ApiResult updateMessage(@RequestBody @Validated UpdateMessageRequest request); + + @PostMapping("/api/im/template-message/updatable/ack") + ApiResult ack(@RequestBody @Validated UpdatableMessageAckRequest request); + + @PostMapping("/api/im/template-message/updatable/fetchUpdatableMessage") + ApiResult fetchUpdatableMessage( + @RequestBody @Validated FetchUpdatableMessageRequest request); + + @PostMapping("/api/im/template-message/updatable/getBizMessageIds") + ApiResult getBizMessageIds( + @RequestBody @Validated GetBizMessageIdsRequest request); /** * diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/feign/SendPriority.java b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/SendPriority.java index 2503a02..ccffcc9 100644 --- a/im-center-api/src/main/java/cn/axzo/im/center/api/feign/SendPriority.java +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/SendPriority.java @@ -13,6 +13,7 @@ public class SendPriority { public static final SendPriority TEMPLATE_MESSAGE = create(1000); public static final SendPriority SYSTEM_CUSTOM_MESSAGE = create(5000); + public static final SendPriority UPDATE_MESSAGE = create(5500); public static final SendPriority OP_MESSAGE = create(500000); private final Integer value; diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/PersonAccountAttribute.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/PersonAccountAttribute.java new file mode 100644 index 0000000..94aba2b --- /dev/null +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/PersonAccountAttribute.java @@ -0,0 +1,51 @@ +package cn.axzo.im.center.api.vo; + +import cn.axzo.im.center.common.enums.AppTypeEnum; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; + +/** + * @author yanglin + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +// IMPORTANT: 不要删除这个注解, 避免@Data被@Setter, @Getter取代 +@EqualsAndHashCode +public class PersonAccountAttribute { + + /** + * 接收消息的personId + */ + @NotBlank(message = "personId不能为空") + private String personId; + + /** + * appType = AppTypeEnum.CMP时,因为网易云信无法对同一个账号做企业隔离,只能一个企业一个账号, + * 所以需要根据organizationalUnitId获取账号 + */ + private Long ouId; + + /** + * 项目id + */ + private Long workspaceId; + + /** + * 发送消息到App端 + * 工人端、企业端、服务器 + * CM、CMP、SYSTEM + * + * @See cn.axzo.im.center.common.enums.AppTypeEnum + */ + @NotNull(message = "appType不能为空") + private AppTypeEnum appType; + +} diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/AdvanceRetryRequest.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/AdvanceRetryRequest.java new file mode 100644 index 0000000..efc62e3 --- /dev/null +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/AdvanceRetryRequest.java @@ -0,0 +1,17 @@ +package cn.axzo.im.center.api.vo.req; + +import lombok.Getter; +import lombok.Setter; + +import java.util.List; + +/** + * @author yanglin + */ +@Setter +@Getter +public class AdvanceRetryRequest { + + private List bizMessageIds; + +} \ No newline at end of file diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/FetchUpdatableMessageRequest.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/FetchUpdatableMessageRequest.java new file mode 100644 index 0000000..2a9be61 --- /dev/null +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/FetchUpdatableMessageRequest.java @@ -0,0 +1,27 @@ +package cn.axzo.im.center.api.vo.req; + +import com.alibaba.fastjson.JSON; +import lombok.Getter; +import lombok.Setter; + +import javax.validation.constraints.NotEmpty; +import java.util.Set; + +/** + * @author yanglin + */ +@Setter +@Getter +public class FetchUpdatableMessageRequest { + + /** + * 初始消息ID列表(普通消息的网易云信ID). 列表最大数量: 500 + */ + @NotEmpty(message = "初始消息ID列表不能为空") + private Set nimMessageIds; + + @Override + public String toString() { + return JSON.toJSONString(this); + } +} diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/GetBizMessageIdsRequest.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/GetBizMessageIdsRequest.java new file mode 100644 index 0000000..024599a --- /dev/null +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/GetBizMessageIdsRequest.java @@ -0,0 +1,27 @@ +package cn.axzo.im.center.api.vo.req; + +import com.alibaba.fastjson.JSON; +import lombok.Getter; +import lombok.Setter; + +import javax.validation.constraints.NotEmpty; +import java.util.Set; + +/** + * @author yanglin + */ +@Setter +@Getter +public class GetBizMessageIdsRequest { + + /** + * 初始消息ID列表(普通消息的网易云信ID) + */ + @NotEmpty(message = "初始消息ID列表不能为空") + private Set nimMessageIds; + + @Override + public String toString() { + return JSON.toJSONString(this); + } +} \ No newline at end of file diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/MessageUpdateInfo.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/MessageUpdateInfo.java new file mode 100644 index 0000000..e19660d --- /dev/null +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/MessageUpdateInfo.java @@ -0,0 +1,32 @@ +package cn.axzo.im.center.api.vo.req; + +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.collections4.CollectionUtils; + +import javax.validation.constraints.NotNull; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static java.util.stream.Collectors.toList; + +/** + * @author yanglin + */ +public interface MessageUpdateInfo { + + static List collectBizMessageIds(Collection updates) { + if (CollectionUtils.isEmpty(updates)) + return Collections.emptyList(); + return updates.stream() + .map(MessageUpdateInfo::bizMessageId) + .distinct() + .collect(toList()); + } + + String bizMessageId(); + + @NotNull + JSONObject bizBody(); + +} \ No newline at end of file diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/SendTemplateMessageParam.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/SendTemplateMessageParam.java index 2684ff8..b81ec81 100644 --- a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/SendTemplateMessageParam.java +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/SendTemplateMessageParam.java @@ -1,6 +1,8 @@ package cn.axzo.im.center.api.vo.req; -import cn.axzo.im.center.common.enums.AppTypeEnum; +import cn.axzo.im.center.api.vo.PersonAccountAttribute; +import cn.axzo.im.center.common.enums.TemplatedMsgType; +import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.AllArgsConstructor; import lombok.Builder; @@ -10,8 +12,9 @@ import lombok.NoArgsConstructor; import javax.validation.Valid; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotEmpty; -import javax.validation.constraints.NotNull; +import java.util.HashSet; import java.util.List; +import java.util.Set; @Data @Builder @@ -19,29 +22,31 @@ import java.util.List; @AllArgsConstructor public class SendTemplateMessageParam { + /** + * 发送人 + */ + private PersonAccountAttribute sender; + /** * 消息接收用户信息 */ @NotEmpty(message = "消息接收用户信息不能为空") @Valid - private List receivePersons; + private List receivePersons; /** * 消息标题 */ - @NotBlank(message = "消息标题不能为空") private String msgHeader; /** * 消息内容 */ - @NotBlank(message = "消息内容不能为空") private String msgContent; /** * 消息模板ID */ - @NotBlank(message = "消息模板ID不能为空") private String msgTemplateId; /** @@ -62,6 +67,12 @@ public class SendTemplateMessageParam { private Integer sendPriority; + private TemplatedMsgType templatedMsgType = TemplatedMsgType.TEMPLATE; + + public boolean isSendByRobot() { + return sender == null; + } + /** * 推送消息 */ @@ -69,32 +80,28 @@ public class SendTemplateMessageParam { private List excludePushPayloads; - @Data - @Builder - @AllArgsConstructor - @NoArgsConstructor - public static class ReceivePerson { + public Long determineSenderPersonId() { + if (sender != null) + return Long.parseLong(sender.getPersonId()); + return 0L; + } - /** - * 接收消息的personId - */ - @NotBlank(message = "personId不能为空") - private String personId; + public Long determineSenderOuId() { + if (sender != null) + return sender.getOuId(); + return 0L; + } - /** - * appType = AppTypeEnum.CMP时,因为网易云信无法对同一个账号做企业隔离,只能一个企业一个账号, - * 所以需要根据organizationalUnitId获取账号 - */ - private Long ouId; + public boolean isUpdatable() { + return templatedMsgType.isUpdatable(); + } - /** - * 发送消息到App端 - * 工人端、企业端、服务器 - * CM、CMP、SYSTEM - * - * @See cn.axzo.im.center.common.enums.AppTypeEnum - */ - @NotNull(message = "appType不能为空") - private AppTypeEnum appType; + public Set uniqueReceivePersons() { + return new HashSet<>(receivePersons); + } + + @Override + public String toString() { + return JSON.toJSONString(this); } } diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/UpdatableMessageAckRequest.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/UpdatableMessageAckRequest.java new file mode 100644 index 0000000..42e65a9 --- /dev/null +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/UpdatableMessageAckRequest.java @@ -0,0 +1,70 @@ +package cn.axzo.im.center.api.vo.req; + +import com.alibaba.fastjson.JSON; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.collections4.CollectionUtils; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static java.util.stream.Collectors.toList; + +/** + * @author yanglin + */ +@Setter +@Getter +public class UpdatableMessageAckRequest { + + /** + * ack的消息信息列表, 同一个nimMessageId传需要ack的最大dataVersion. 每次最多值100条 + */ + @NotEmpty(message = "消息ACK不能为空") + private List acknowledgments; + + public List determineValidAcknowledgments() { + if (CollectionUtils.isEmpty(acknowledgments)) + return Collections.emptyList(); + HashMap id2Max = new HashMap<>(); + for (Acknowledgment ack : acknowledgments) { + Long max = id2Max.getOrDefault(ack.nimMessageId, 0L); + max = Math.max(max, ack.dataVersion); + id2Max.put(ack.nimMessageId, max); + } + return id2Max.entrySet().stream() + .map(e -> { + Acknowledgment ack = new Acknowledgment(); + ack.setNimMessageId(e.getKey()); + ack.setDataVersion(e.getValue()); + return ack; + }) + .collect(toList()); + } + + @Override + public String toString() { + return JSON.toJSONString(this); + } + + @Setter + @Getter + public static class Acknowledgment { + /** + * 初始消息ID(普通消息的网易云信ID) + */ + @NotBlank(message = "初始消息ID不能为空") + private String nimMessageId; + + /** + * 数据版本 + */ + @NotNull(message = "数据版本不能为空") + private Long dataVersion; + } + +} \ No newline at end of file diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/UpdateMessageRequest.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/UpdateMessageRequest.java new file mode 100644 index 0000000..27b672e --- /dev/null +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/UpdateMessageRequest.java @@ -0,0 +1,58 @@ +package cn.axzo.im.center.api.vo.req; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.lang3.StringUtils; + +import javax.validation.Valid; +import javax.validation.constraints.NotBlank; +import java.util.ArrayList; +import java.util.List; + +/** + * @author yanglin + */ +@Setter +@Getter +public class UpdateMessageRequest { + + @Valid + private List updates = new ArrayList<>(); + + public void addUpdate(Update update) { + this.updates.add(update); + } + + public List getBizMessageIds() { + return MessageUpdateInfo.collectBizMessageIds(updates); + } + + @Override + public String toString() { + return JSON.toJSONString(this); + } + + @Setter + @Getter + public static class Update implements MessageUpdateInfo { + + @NotBlank(message = "消息ID不能为空") + private String bizMessageId; + private String msgTemplateContent; + + @Override + public String bizMessageId() { + return bizMessageId; + } + + @Override + public JSONObject bizBody() { + return StringUtils.isBlank(msgTemplateContent) + ? new JSONObject() + : JSON.parseObject(msgTemplateContent); + } + } + +} \ No newline at end of file diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/FetchUpdatableMessageResponse.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/FetchUpdatableMessageResponse.java new file mode 100644 index 0000000..9aab45d --- /dev/null +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/FetchUpdatableMessageResponse.java @@ -0,0 +1,52 @@ +package cn.axzo.im.center.api.vo.resp; + +import com.alibaba.fastjson.JSON; +import lombok.Getter; +import lombok.Setter; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author yanglin + */ +@Setter +@Getter +public class FetchUpdatableMessageResponse { + + /** + * 消息列表 + */ + private List messages = new ArrayList<>(); + + public void addMessage(MessageInfo message) { + messages.add(message); + } + + @Override + public String toString() { + return JSON.toJSONString(this); + } + + @Setter + @Getter + public static class MessageInfo { + /** + * 消息类型. 和首次发次的保持一致. template: 模板消息, card: 卡片消息 + */ + private String msgType; + /** + * 消息体 + */ + private String msgBody; + /** + * 业务消息ID + */ + private String bizMessageId; + /** + * 数据版本 + */ + private Long dataVersion; + } + +} diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/GetBizMessageIdsResponse.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/GetBizMessageIdsResponse.java new file mode 100644 index 0000000..48a36d7 --- /dev/null +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/GetBizMessageIdsResponse.java @@ -0,0 +1,33 @@ +package cn.axzo.im.center.api.vo.resp; + +import com.alibaba.fastjson.JSON; +import lombok.Getter; +import lombok.Setter; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * @author yanglin + */ +@Setter +@Getter +public class GetBizMessageIdsResponse { + + private Map nimMessageId2BizMessageId = new HashMap<>(); + + public void addMessage(String nimMessageId, String bizMessageId) { + nimMessageId2BizMessageId.put(nimMessageId, bizMessageId); + } + + public Set bizMessageIds() { + return new HashSet<>(nimMessageId2BizMessageId.values()); + } + + @Override + public String toString() { + return JSON.toJSONString(this); + } +} \ No newline at end of file diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/MessageTaskResp.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/MessageTaskResp.java index c73fc87..d23da86 100644 --- a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/MessageTaskResp.java +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/MessageTaskResp.java @@ -1,5 +1,6 @@ package cn.axzo.im.center.api.vo.resp; +import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import lombok.AllArgsConstructor; @@ -8,6 +9,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import java.util.Date; +import java.util.List; @Data @Builder @@ -53,4 +55,11 @@ public class MessageTaskResp { private Date createAt; private Date updateAt; + + private List updatableMessageSendResults; + + @Override + public String toString() { + return JSON.toJSONString(this); + } } diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/MessageUpdateResponse.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/MessageUpdateResponse.java new file mode 100644 index 0000000..d709801 --- /dev/null +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/MessageUpdateResponse.java @@ -0,0 +1,66 @@ +package cn.axzo.im.center.api.vo.resp; + +import com.alibaba.fastjson.JSON; +import lombok.Getter; +import lombok.Setter; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author yanglin + */ +@Setter +@Getter +public class MessageUpdateResponse { + + /** + * 更新的业务消息ID + */ + private List updatedBizMessageIds = new ArrayList<>(); + + /** + * 未更新的业务消息 + */ + private List nonUpdatedMessages = new ArrayList<>(); + + public void addUpdatedBizMessageId(String bizMessageId) { + this.updatedBizMessageIds.add(bizMessageId); + } + + public void addNonUpdatedMessage(String bizMessageId, + NonUpdateMessageReason reason) { + addNonUpdatedMessage(bizMessageId, reason, null); + } + + public void addNonUpdatedMessage(String bizMessageId, + NonUpdateMessageReason reason, + Object description) { + NonUpdatedMessage nonUpdatedMessage = new NonUpdatedMessage(); + nonUpdatedMessage.setBizMessageId(bizMessageId); + nonUpdatedMessage.setReason(reason); + nonUpdatedMessage.setDescription(description); + this.nonUpdatedMessages.add(nonUpdatedMessage); + } + + @Override + public String toString() { + return JSON.toJSONString(this); + } + + @Setter + @Getter + public static class NonUpdatedMessage { + private String bizMessageId; + private NonUpdateMessageReason reason; + private Object description; + } + + public enum NonUpdateMessageReason { + // 找不到首次发送的消息 + CANT_FIND_INIT_MESSAGE, + // 首次投递消息的状态不允许更新. 此时查看NonUpdatedMessage#description字段 + MESSAGE_STATE_NOT_ALLOWED + } + +} \ No newline at end of file diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/UpdatableMessageSendResult.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/UpdatableMessageSendResult.java new file mode 100644 index 0000000..5ca6951 --- /dev/null +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/resp/UpdatableMessageSendResult.java @@ -0,0 +1,15 @@ +package cn.axzo.im.center.api.vo.resp; + +import cn.axzo.im.center.api.vo.PersonAccountAttribute; +import lombok.Getter; +import lombok.Setter; + +/** + * @author yanglin + */ +@Setter +@Getter +public class UpdatableMessageSendResult { + private String bizMessageId; + private PersonAccountAttribute account; +} \ No newline at end of file diff --git a/im-center-common/src/main/java/cn/axzo/im/center/common/enums/BizTypeEnum.java b/im-center-common/src/main/java/cn/axzo/im/center/common/enums/BizTypeEnum.java index 77c9c51..dd7c283 100644 --- a/im-center-common/src/main/java/cn/axzo/im/center/common/enums/BizTypeEnum.java +++ b/im-center-common/src/main/java/cn/axzo/im/center/common/enums/BizTypeEnum.java @@ -18,6 +18,8 @@ public enum BizTypeEnum { * 待办 */ PENDING("PENDING", "待办"), + + MESSAGE_UPDATE("MESSAGE_UPDATE", "消息更新") ; diff --git a/im-center-common/src/main/java/cn/axzo/im/center/common/enums/ImAppType.java b/im-center-common/src/main/java/cn/axzo/im/center/common/enums/ImAppType.java new file mode 100644 index 0000000..4d60a4e --- /dev/null +++ b/im-center-common/src/main/java/cn/axzo/im/center/common/enums/ImAppType.java @@ -0,0 +1,35 @@ +package cn.axzo.im.center.common.enums; + +/** + * @author yanglin + */ +public enum ImAppType { + + /** + * 工人端 + */ + CM, + + /** + * 企业管理端 + */ + CMP; + + public static ImAppType fromNimAppType(AppTypeEnum appType) { + if (appType == null) + return null; + if (appType == AppTypeEnum.CM) + return CM; + if (appType == AppTypeEnum.CMP) + return CMP; + throw new UnsupportedOperationException("Should never happen!"); + } + + public AppTypeEnum toNimAppType() { + if (this == CM) + return AppTypeEnum.CM; + if (this == CMP) + return AppTypeEnum.CMP; + throw new UnsupportedOperationException("Should never happen!"); + } +} \ No newline at end of file diff --git a/im-center-common/src/main/java/cn/axzo/im/center/common/enums/TemplatedMsgType.java b/im-center-common/src/main/java/cn/axzo/im/center/common/enums/TemplatedMsgType.java new file mode 100644 index 0000000..685eb30 --- /dev/null +++ b/im-center-common/src/main/java/cn/axzo/im/center/common/enums/TemplatedMsgType.java @@ -0,0 +1,22 @@ +package cn.axzo.im.center.common.enums; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * @author yanglin + */ +@Getter +@RequiredArgsConstructor +public enum TemplatedMsgType { + + TEMPLATE("template", "模板消息", false), + CARD("card", "卡片消息", true), + + ; + + private final String code; + private final String message; + private final boolean isUpdatable; + +} \ No newline at end of file diff --git a/im-center-common/src/main/java/cn/axzo/im/center/common/enums/YesOrNo.java b/im-center-common/src/main/java/cn/axzo/im/center/common/enums/YesOrNo.java new file mode 100644 index 0000000..7b836b5 --- /dev/null +++ b/im-center-common/src/main/java/cn/axzo/im/center/common/enums/YesOrNo.java @@ -0,0 +1,23 @@ +package cn.axzo.im.center.common.enums; + +import cn.axzo.basics.common.constant.enums.CodeDefinition; +import com.baomidou.mybatisplus.annotation.EnumValue; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * @author yanglin + */ +@Getter +@AllArgsConstructor(access = AccessLevel.PRIVATE) +public enum YesOrNo implements CodeDefinition { + YES("YES", "是"), + NO("NO", "是") + ; + + @EnumValue + private final String code; + private final String desc; + +} diff --git a/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBody.java b/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBody.java index 91398d9..a0ce53e 100644 --- a/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBody.java +++ b/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBody.java @@ -36,4 +36,19 @@ public class MessageBody { private Map messageExtension; + + /** + * 业务消息id, 用于接口拉取最新消息内容 + */ + private String bizMessageId; + + /** + * 数据版本 + */ + private Long dataVersion; + + /** + * 端信息 + */ + private Peer peer; } diff --git a/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageCustomBody.java b/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageCustomBody.java index 89aea4a..1800523 100644 --- a/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageCustomBody.java +++ b/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageCustomBody.java @@ -1,6 +1,7 @@ package cn.axzo.im.channel.netease.dto; import cn.axzo.im.center.common.enums.BizTypeEnum; +import cn.axzo.im.center.common.enums.TemplatedMsgType; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -37,4 +38,28 @@ public class MessageCustomBody { private String payload; + /** + * 业务消息id, 用于接口拉取最新消息内容 + */ + private String bizMessageId; + + /** + * 数据版本 + */ + private Long dataVersion; + + /** + * 最原始的网易云信消息id, 更新的哪条消息 + */ + private String initMessageId; + + /** + * 更新的消息类型 + */ + private TemplatedMsgType msgType; + + /** + * 端信息 + */ + private Peer peer; } diff --git a/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/Peer.java b/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/Peer.java new file mode 100644 index 0000000..d8fefaa --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/Peer.java @@ -0,0 +1,28 @@ +package cn.axzo.im.channel.netease.dto; + +import lombok.Getter; +import lombok.Setter; + +/** + * @author yanglin + */ +@Setter +@Getter +public class Peer { + + /** + * IM是否为机器人发送 + */ + private boolean isSenderRobot; + + /** + * IM发送者自然人id, 机器人发送时为0 + */ + private Long senderPersonId; + + /** + * IM接收者自然人id + */ + private Long receiverPersonId; + +} diff --git a/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java b/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java index 40bd4da..927bb7a 100644 --- a/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java +++ b/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java @@ -5,15 +5,25 @@ import cn.axzo.framework.domain.web.result.ApiResult; import cn.axzo.im.center.api.feign.MessageApi; import cn.axzo.im.center.api.feign.SendPriority; import cn.axzo.im.center.api.vo.ApiChannel; +import cn.axzo.im.center.api.vo.PersonAccountAttribute; +import cn.axzo.im.center.api.vo.req.AccountAbsentQuery; import cn.axzo.im.center.api.vo.req.AccountQuery; import cn.axzo.im.center.api.vo.req.AsyncSendMessageParam; import cn.axzo.im.center.api.vo.req.CustomMessageInfo; +import cn.axzo.im.center.api.vo.req.FetchUpdatableMessageRequest; +import cn.axzo.im.center.api.vo.req.GetBizMessageIdsRequest; import cn.axzo.im.center.api.vo.req.MessageInfo; import cn.axzo.im.center.api.vo.req.SendMessageParam; import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam; +import cn.axzo.im.center.api.vo.req.UpdatableMessageAckRequest; +import cn.axzo.im.center.api.vo.req.UpdateMessageRequest; +import cn.axzo.im.center.api.vo.resp.FetchUpdatableMessageResponse; +import cn.axzo.im.center.api.vo.resp.GetBizMessageIdsResponse; import cn.axzo.im.center.api.vo.resp.MessageCustomResp; import cn.axzo.im.center.api.vo.resp.MessageDispatchResp; import cn.axzo.im.center.api.vo.resp.MessageTaskResp; +import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse; +import cn.axzo.im.center.api.vo.resp.UpdatableMessageSendResult; import cn.axzo.im.center.api.vo.resp.UserAccountResp; import cn.axzo.im.center.common.enums.AppTypeEnum; import cn.axzo.im.entity.AccountRegister; @@ -26,6 +36,10 @@ import cn.axzo.im.service.CustomMessageService; import cn.axzo.im.service.MessageHistoryService; import cn.axzo.im.service.MessageTaskService; import cn.axzo.im.service.RobotMsgTemplateService; +import cn.axzo.im.updatable.UpdatableMessageManager; +import cn.axzo.im.updatable.UpdatableMessageQueryService; +import cn.axzo.im.updatable.UpdateSupport; +import cn.axzo.im.utils.BizAssertions; import cn.axzo.pokonyan.exception.Aassert; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; @@ -39,12 +53,15 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; +import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; +import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.Optional; import static cn.axzo.im.config.BizResultCode.ALL_PERSSON_TYPE_NOT_EMPTY; import static cn.axzo.im.config.BizResultCode.SEND_IM_ACCOUNT_MAX; @@ -76,6 +93,12 @@ public class MessageController implements MessageApi { private MessageHistoryService messageHistoryService; @Autowired private CustomMessageService customMessageService; + @Autowired + private UpdatableMessageManager updatableMessageManager; + @Autowired + private UpdateSupport updateSupport; + @Autowired + private UpdatableMessageQueryService updatableMessageQueryService; @Override @@ -150,32 +173,87 @@ public class MessageController implements MessageApi { } @Override - public ApiResult sendTemplateMessageAsync(SendTemplateMessageParam sendMessageParam) { - String sendImAccount = check(sendMessageParam); + @Transactional + public ApiResult sendTemplateMessageAsync(SendTemplateMessageParam request) { + log.info("sendTemplateMessageAsync, request={}", request); + PersonAccountAttribute sender = request.getSender(); + BizAssertions.assertTrue(sender != null || StringUtils.isNotBlank(request.getMsgTemplateId()), + "消息模板ID和发送人必须选其一"); + String sendImAccount; + if (sender != null) { + AccountAbsentQuery accountQuery = new AccountAbsentQuery(); + accountQuery.setAppType(sender.getAppType().getCode()); + accountQuery.setPersonId(sender.getPersonId()); + accountQuery.setOuId(sender.getOuId()); + List accounts = accountService.registerAccountIfAbsent(accountQuery); + sendImAccount = accounts.get(0).getImAccount(); + } else { + //todo: 根据类型验证模版是否配置了机器人 + sendImAccount = check(request); + } MessageTask.BizData bizData = MessageTask.BizData.builder() - .msgTemplateContent(sendMessageParam.getMsgTemplateContent()) - .msgTemplateId(sendMessageParam.getMsgTemplateId()) - .payload(sendMessageParam.getPayload()) - .excludePushPayloads(sendMessageParam.getExcludePushPayloads()) + .msgTemplateContent(request.getMsgTemplateContent()) + .msgTemplateId(request.getMsgTemplateId()) + .payload(request.getPayload()) + .excludePushPayloads(request.getExcludePushPayloads()) + .templatedMsgType(request.getTemplatedMsgType()) + .isSenderRobot(request.isSendByRobot()) + .senderPersonId(request.determineSenderPersonId()) .build(); Date now = new Date(); + List receivePersons = JSONArray.parseArray( + JSONObject.toJSONString(request.uniqueReceivePersons()), MessageTask.ReceivePerson.class); MessageTask messageTask = messageTaskService.create(MessageTask.builder() - .bizId(sendMessageParam.getBizId()) + .bizId(request.getBizId()) .sendImAccount(sendImAccount) - .receivePersons(JSONArray.parseArray(JSONObject.toJSONString(sendMessageParam.getReceivePersons()), MessageTask.ReceivePerson.class)) + .receivePersons(receivePersons) .status(MessageTaskStatus.PENDING) - .title(sendMessageParam.getMsgHeader()) - .content(sendMessageParam.getMsgContent()) + .title(request.getMsgHeader() == null ? "" : request.getMsgHeader()) + .content(request.getMsgContent() == null ? "" : request.getMsgContent()) .bizData(bizData) - .ext(sendMessageParam.getExt()) + .ext(request.getExt()) .planStartTime(now) .createAt(now) .sendPriority(SendPriority.TEMPLATE_MESSAGE - .determinePriority(sendMessageParam.getSendPriority())) + .determinePriority(request.getSendPriority())) .apiChannel(ApiChannel.COMMON_MESSAGE) .build()); - return ApiResult.ok(toMessageTaskResp(messageTask)); + List updatableMessageSendResults = Collections.emptyList(); + if (request.isUpdatable()) { + updatableMessageSendResults = updatableMessageManager.createUpdatableMessage(messageTask, request, receivePersons); + } + MessageTaskResp messageTaskResp = toMessageTaskResp(messageTask); + messageTaskResp.setUpdatableMessageSendResults(updatableMessageSendResults); + return ApiResult.ok(messageTaskResp); + } + + @Override + public ApiResult updateMessage(UpdateMessageRequest request) { + log.info("updateMessage, request={}", request); + MessageUpdateResponse resp = updatableMessageManager.updateMessage(request); + return ApiResult.ok(resp); + } + + @Override + public ApiResult ack(UpdatableMessageAckRequest request) { + log.info("ack, request={}", request); + updatableMessageManager.ack(request); + return ApiResult.ok(); + } + + @Override + public ApiResult fetchUpdatableMessage(FetchUpdatableMessageRequest request) { + log.info("fetchUpdatableMessage, request={}", request); + FetchUpdatableMessageResponse resp = updatableMessageQueryService.fetchUpdatableMessage(request); + return ApiResult.ok(resp); + } + + @Override + public ApiResult getBizMessageIds(GetBizMessageIdsRequest request) { + log.info("getBizMessageIds, request={}", request); + GetBizMessageIdsResponse response = updatableMessageQueryService.getBizMessageIds(request); + return ApiResult.ok(response); } private void check(SendMessageParam sendMessageParam) { @@ -233,6 +311,16 @@ public class MessageController implements MessageApi { return robotImAccount; } + private Optional findRobotAccount(String robotId) { + AccountQuery accountQuery = new AccountQuery(); + accountQuery.setAccountId(robotId); + accountQuery.setAppType(AppTypeEnum.SYSTEM.getCode()); + List accounts = accountService.queryAccountInfo(accountQuery); + if (CollectionUtils.isEmpty(accounts)) + return Optional.empty(); + return Optional.of(accounts.get(0)); + } + public MessageTaskResp toMessageTaskResp(MessageTask messageTask) { MessageTaskResp messageTaskResp = MessageTaskResp.builder().build(); BeanUtils.copyProperties(messageTask, messageTaskResp); diff --git a/im-center-server/src/main/java/cn/axzo/im/dao/mapper/AckRetryMapper.java b/im-center-server/src/main/java/cn/axzo/im/dao/mapper/AckRetryMapper.java deleted file mode 100644 index acc6ce4..0000000 --- a/im-center-server/src/main/java/cn/axzo/im/dao/mapper/AckRetryMapper.java +++ /dev/null @@ -1,10 +0,0 @@ -package cn.axzo.im.dao.mapper; - -import cn.axzo.im.entity.AckRetry; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; - -/** - * @author yanglin - */ -public interface AckRetryMapper extends BaseMapper { -} diff --git a/im-center-server/src/main/java/cn/axzo/im/dao/mapper/MessageUpdateRetryMapper.java b/im-center-server/src/main/java/cn/axzo/im/dao/mapper/MessageUpdateRetryMapper.java new file mode 100644 index 0000000..562d1e6 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/dao/mapper/MessageUpdateRetryMapper.java @@ -0,0 +1,24 @@ +package cn.axzo.im.dao.mapper; + +import cn.axzo.im.entity.MessageUpdateRetry; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Delete; +import org.apache.ibatis.annotations.Param; + +import java.util.Collection; + +/** + * @author yanglin + */ +public interface MessageUpdateRetryMapper extends BaseMapper { + + @Delete("") + void deleteByBizMessageIds( + @Param("bizMessageIds") Collection bizMessageIds); + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/dao/mapper/UpdatableMessageLogMapper.java b/im-center-server/src/main/java/cn/axzo/im/dao/mapper/UpdatableMessageLogMapper.java new file mode 100644 index 0000000..d700e5c --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/dao/mapper/UpdatableMessageLogMapper.java @@ -0,0 +1,18 @@ +package cn.axzo.im.dao.mapper; + +import cn.axzo.im.entity.UpdatableMessageLog; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Delete; +import org.apache.ibatis.annotations.Param; + +import java.util.Date; + +/** + * @author yanglin + */ +public interface UpdatableMessageLogMapper extends BaseMapper { + + @Delete("DELETE FROM im_updatable_message_log WHERE create_at <= #{until}") + int expunge(@Param("until") Date until); + +} diff --git a/im-center-server/src/main/java/cn/axzo/im/dao/mapper/UpdatableMessageMapper.java b/im-center-server/src/main/java/cn/axzo/im/dao/mapper/UpdatableMessageMapper.java new file mode 100644 index 0000000..bf780dd --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/dao/mapper/UpdatableMessageMapper.java @@ -0,0 +1,31 @@ +package cn.axzo.im.dao.mapper; + +import cn.axzo.im.entity.UpdatableMessage; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Update; + +import java.util.List; + +/** + * @author yanglin + */ +public interface UpdatableMessageMapper extends BaseMapper { + + @Update("") + void incrDataVersion(@Param("ids") List ids); + + @Update("") + void incrRetryCount(@Param("ids") List ids); + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/dao/repository/AckRetryDao.java b/im-center-server/src/main/java/cn/axzo/im/dao/repository/AckRetryDao.java deleted file mode 100644 index aa14860..0000000 --- a/im-center-server/src/main/java/cn/axzo/im/dao/repository/AckRetryDao.java +++ /dev/null @@ -1,13 +0,0 @@ -package cn.axzo.im.dao.repository; - -import cn.axzo.im.dao.mapper.AckRetryMapper; -import cn.axzo.im.entity.AckRetry; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import org.springframework.stereotype.Repository; - -/** - * @author yanglin - */ -@Repository("ackRetryDao") -public class AckRetryDao extends ServiceImpl { -} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/dao/repository/MessageHistoryDao.java b/im-center-server/src/main/java/cn/axzo/im/dao/repository/MessageHistoryDao.java index b1d0d20..f71b397 100644 --- a/im-center-server/src/main/java/cn/axzo/im/dao/repository/MessageHistoryDao.java +++ b/im-center-server/src/main/java/cn/axzo/im/dao/repository/MessageHistoryDao.java @@ -1,12 +1,14 @@ package cn.axzo.im.dao.repository; -import cn.axzo.im.dao.mapper.AccountRegisterMapper; import cn.axzo.im.dao.mapper.MessageHistoryMapper; -import cn.axzo.im.entity.AccountRegister; import cn.axzo.im.entity.MessageHistory; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.apache.commons.collections4.CollectionUtils; import org.springframework.stereotype.Repository; +import java.util.Collections; +import java.util.List; + /** * im-center * @@ -18,4 +20,15 @@ import org.springframework.stereotype.Repository; @Repository("messageHistoryDao") public class MessageHistoryDao extends ServiceImpl { + /** + * 不用自带的listByIds + */ + public List getByIds(List historyIds) { + if (CollectionUtils.isEmpty(historyIds)) + return Collections.emptyList(); + return lambdaQuery() + .in(MessageHistory::getId, historyIds) + .list(); + } + } diff --git a/im-center-server/src/main/java/cn/axzo/im/dao/repository/MessageUpdateRetryDao.java b/im-center-server/src/main/java/cn/axzo/im/dao/repository/MessageUpdateRetryDao.java new file mode 100644 index 0000000..8b13d9c --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/dao/repository/MessageUpdateRetryDao.java @@ -0,0 +1,13 @@ +package cn.axzo.im.dao.repository; + +import cn.axzo.im.dao.mapper.MessageUpdateRetryMapper; +import cn.axzo.im.entity.MessageUpdateRetry; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.springframework.stereotype.Repository; + +/** + * @author yanglin + */ +@Repository("messageUpdateRetryDao") +public class MessageUpdateRetryDao extends ServiceImpl { +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/dao/repository/UpdatableMessageDao.java b/im-center-server/src/main/java/cn/axzo/im/dao/repository/UpdatableMessageDao.java new file mode 100644 index 0000000..c6de775 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/dao/repository/UpdatableMessageDao.java @@ -0,0 +1,70 @@ +package cn.axzo.im.dao.repository; + +import cn.axzo.im.dao.mapper.UpdatableMessageMapper; +import cn.axzo.im.entity.UpdatableMessage; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.stereotype.Repository; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static java.util.stream.Collectors.toList; + +/** + * @author yanglin + */ +@Repository("updatableMessageDao") +public class UpdatableMessageDao extends ServiceImpl { + + public List getByNimMessageIdForUpdate(Collection nimMessageIds) { + if (CollectionUtils.isEmpty(nimMessageIds)) + return Collections.emptyList(); + List bizMessageIds = getByNimMessageIds(nimMessageIds) + .stream() + .map(UpdatableMessage::getBizMessageId) + .distinct() + .collect(toList()); + // 统一使用bizMessageIds加锁, 避免死锁 + return getByBizMessageIdsForUpdate(bizMessageIds); + } + + public List getByBizMessageIdsForUpdate(Collection bizMessageIds) { + if (CollectionUtils.isEmpty(bizMessageIds)) + return Collections.emptyList(); + // 避免死锁 + List sortedBizMessageIds = bizMessageIds.stream() + .sorted().collect(toList()); + return lambdaQuery() + .in(UpdatableMessage::getBizMessageId, sortedBizMessageIds) + // 避免ack更新出错 + .last("FOR UPDATE") + .list(); + } + + public List getByBizMessageIds(Collection bizMessageIds) { + if (CollectionUtils.isEmpty(bizMessageIds)) + return Collections.emptyList(); + return lambdaQuery() + .in(UpdatableMessage::getBizMessageId, bizMessageIds) + .list(); + } + + public List getByNimMessageIds(Collection nimMessageIds) { + if (CollectionUtils.isEmpty(nimMessageIds)) + return Collections.emptyList(); + return lambdaQuery() + .in(UpdatableMessage::getNimMessageId, nimMessageIds) + .list(); + } + + public List getByTaskIds(List taskIds) { + if (CollectionUtils.isEmpty(taskIds)) + return Collections.emptyList(); + return lambdaQuery() + .in(UpdatableMessage::getTaskId, taskIds) + .list(); + } + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/dao/repository/UpdatableMessageLogDao.java b/im-center-server/src/main/java/cn/axzo/im/dao/repository/UpdatableMessageLogDao.java new file mode 100644 index 0000000..8cc332f --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/dao/repository/UpdatableMessageLogDao.java @@ -0,0 +1,13 @@ +package cn.axzo.im.dao.repository; + +import cn.axzo.im.dao.mapper.UpdatableMessageLogMapper; +import cn.axzo.im.entity.UpdatableMessageLog; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.springframework.stereotype.Repository; + +/** + * @author yanglin + */ +@Repository("updatableMessageLogDao") +public class UpdatableMessageLogDao extends ServiceImpl { +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/HistoryRecordExt.java b/im-center-server/src/main/java/cn/axzo/im/entity/HistoryRecordExt.java index f19f305..bf5c6cf 100644 --- a/im-center-server/src/main/java/cn/axzo/im/entity/HistoryRecordExt.java +++ b/im-center-server/src/main/java/cn/axzo/im/entity/HistoryRecordExt.java @@ -7,6 +7,7 @@ import lombok.Data; */ @Data public class HistoryRecordExt { + private Boolean isSenderRobot; private String sendApi; private String sendRespDesc; private String batchSendId; @@ -24,4 +25,10 @@ public class HistoryRecordExt { private boolean payloadExcluded; private String sound; -} + private Boolean isUpdatableMessage; + private String bizMessageId; + private Long dataVersion; + private Boolean isUpdateMessage; + private Boolean isUpdateRetry; + private Long updateRetryCount; +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java b/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java index f3ce422..0d3dd0c 100644 --- a/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java +++ b/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java @@ -131,6 +131,32 @@ public class MessageHistory implements Serializable { return recordExt; } + public Long getDataVersion() { + return recordExt != null ? recordExt.getDataVersion() : null; + } + + public boolean isUpdatableMessage() { + return recordExt != null + && recordExt.getIsUpdatableMessage() != null + && recordExt.getIsUpdatableMessage(); + } + + public boolean isUpdateMessage() { + return recordExt != null + && recordExt.getIsUpdatableMessage() != null + && recordExt.getIsUpdatableMessage(); + } + + public boolean isUpdateRetry() { + return recordExt != null + && recordExt.getIsUpdateRetry() != null + && recordExt.getIsUpdateRetry(); + } + + public Long getUpdateRetryCount() { + return recordExt == null ? null : recordExt.getUpdateRetryCount(); + } + public Optional determineBatchNo() { String batchNo = this.batchNo; // 兼容在途数据 diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java b/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java index f4101e0..92adbaa 100644 --- a/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java +++ b/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java @@ -1,11 +1,12 @@ package cn.axzo.im.entity; +import cn.axzo.im.center.api.vo.ApiChannel; import cn.axzo.im.center.api.vo.req.ExcludePushPayload; import cn.axzo.im.center.api.vo.req.SendMessageParam; import cn.axzo.im.center.common.enums.AppTypeEnum; import cn.axzo.im.center.common.enums.BizTypeEnum; +import cn.axzo.im.center.common.enums.TemplatedMsgType; import cn.axzo.im.config.BaseListTypeHandler; -import cn.axzo.im.center.api.vo.ApiChannel; import cn.axzo.im.enums.MessageTaskStatus; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.annotation.IdType; @@ -18,11 +19,13 @@ import com.google.common.collect.Table; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; import lombok.experimental.SuperBuilder; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; import org.springframework.cglib.beans.BeanMap; import java.util.Date; @@ -116,6 +119,9 @@ public class MessageTask { @NoArgsConstructor @AllArgsConstructor public static class BizData { + + private TemplatedMsgType templatedMsgType = TemplatedMsgType.TEMPLATE; + private String msgTemplateId; /** @@ -154,12 +160,21 @@ public class MessageTask { private List appTypes; private List excludePushPayloads; + + private Long senderPersonId = 0L; + + private Boolean isSenderRobot; + + public TemplatedMsgType determineTemplatedMsgType() { + return templatedMsgType == null ? TemplatedMsgType.TEMPLATE : templatedMsgType; + } } @Data @Builder @AllArgsConstructor @NoArgsConstructor + @EqualsAndHashCode public static class ReceivePerson { /** @@ -189,6 +204,12 @@ public class MessageTask { private Long workspaceId; + public Long personIdAsLong() { + if (NumberUtils.isDigits(personId)) + return Long.parseLong(personId); + return 0L; + } + public String buildKey(Map ouIdMap) { if (StringUtils.isNotBlank(this.getImAccount())) { return this.getImAccount(); diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/AckRetry.java b/im-center-server/src/main/java/cn/axzo/im/entity/MessageUpdateRetry.java similarity index 74% rename from im-center-server/src/main/java/cn/axzo/im/entity/AckRetry.java rename to im-center-server/src/main/java/cn/axzo/im/entity/MessageUpdateRetry.java index 07b77dc..7b80449 100644 --- a/im-center-server/src/main/java/cn/axzo/im/entity/AckRetry.java +++ b/im-center-server/src/main/java/cn/axzo/im/entity/MessageUpdateRetry.java @@ -11,16 +11,14 @@ import java.util.Date; */ @Setter @Getter -@TableName(value = "im_ack_retry", autoResultMap = true) -public class AckRetry { +@TableName(value = "im_message_update_retry", autoResultMap = true) +public class MessageUpdateRetry { private Long id; private String bizMessageId; private Long initHistoryId; private Date nextRetryTime; private Long dataVersion; - private Long retryHistoryId; - private Integer retryCount; private Long isDelete; private Date createAt; private Date updateAt; diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/UpdatableMessage.java b/im-center-server/src/main/java/cn/axzo/im/entity/UpdatableMessage.java new file mode 100644 index 0000000..2a241a1 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/entity/UpdatableMessage.java @@ -0,0 +1,114 @@ +package cn.axzo.im.entity; + +import cn.axzo.im.center.api.vo.PersonAccountAttribute; +import cn.axzo.im.center.api.vo.req.MessageUpdateInfo; +import cn.axzo.im.center.common.enums.AppTypeEnum; +import cn.axzo.im.center.common.enums.ImAppType; +import cn.axzo.im.center.common.enums.TemplatedMsgType; +import cn.axzo.im.center.common.enums.YesOrNo; +import cn.axzo.im.enums.UpdatableMessageState; +import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; + +import java.util.Date; + +/** + * @author yanglin + */ +@Setter +@Getter +@TableName(value = "im_updatable_message", autoResultMap = true) +public class UpdatableMessage implements MessageUpdateInfo { + + private Long id; + private String batchNo; + private String templateId; + private String bizId; + private Long taskId; + private String fromAccount; + private String toAccount; + private String receiverPersonId; + private Long receiverOuId; + private String senderPersonId; + private Long senderOuId; + private YesOrNo isSenderRobot; + private AppTypeEnum appType; + private TemplatedMsgType msgType; + private UpdatableMessageState state; + private String bizMessageId; + private String nimMessageId; + private Long initHistoryId; + private Long updateHistoryId; + private Long retryHistoryId; + @TableField(typeHandler = FastjsonTypeHandler.class) + private JSONObject messageBody; + @TableField(typeHandler = FastjsonTypeHandler.class) + private JSONObject bizBody; + private Long dataVersion; + private Long ackDataVersion; + private Long retryCount; + @TableField(typeHandler = FastjsonTypeHandler.class) + private RecordExt recordExt; + private Long isDelete; + private Date createAt; + private Date updateAt; + + public UpdatableMessageLog toMessageLog() { + UpdatableMessageLog messageLog = new UpdatableMessageLog(); + messageLog.setBizId(bizId); + messageLog.setFromAccount(fromAccount); + messageLog.setToAccount(toAccount); + messageLog.setBizMessageId(bizMessageId); + messageLog.setMessageState(state); + messageLog.setInitHistoryId(initHistoryId); + messageLog.setMessageBody(messageBody); + messageLog.setBizBody(bizBody); + messageLog.setRetryCount(retryCount); + messageLog.setDataVersion(dataVersion); + // messageLog.setContext(null); + // messageLog.setContextHistoryId(null); + return messageLog; + } + + public PersonAccountAttribute parsePersonAccount() { + PersonAccountAttribute person = new PersonAccountAttribute(); + person.setPersonId(receiverPersonId); + person.setOuId(receiverOuId); + person.setAppType(appType); + return person; + } + + public Long senderPersonIdAsLong() { + if (NumberUtils.isDigits(senderPersonId)) + return NumberUtils.toLong(senderPersonId); + return 0L; + } + + public Long receiverPersonIdAsLong() { + if (NumberUtils.isDigits(receiverPersonId)) + return NumberUtils.toLong(receiverPersonId); + return 0L; + } + + @Override + public String bizMessageId() { + return bizMessageId; + } + + @Override + public JSONObject bizBody() { + return bizBody; + } + + @Getter + @Setter + public static class RecordExt { + } + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/UpdatableMessageLog.java b/im-center-server/src/main/java/cn/axzo/im/entity/UpdatableMessageLog.java new file mode 100644 index 0000000..dddfa48 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/entity/UpdatableMessageLog.java @@ -0,0 +1,58 @@ +package cn.axzo.im.entity; + +import cn.axzo.im.enums.UpdatableMessageState; +import cn.axzo.im.utils.IgnorePropsJsonTypeHandler; +import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler; +import lombok.Getter; +import lombok.Setter; + +import java.util.Date; + +/** + * @author yanglin + */ +@Setter +@Getter +@TableName(value = "im_updatable_message_log", autoResultMap = true) +public class UpdatableMessageLog { + + private Long id; + private String bizId; + private String fromAccount; + private String toAccount; + private String bizMessageId; + private UpdatableMessageState messageState; + private Long initHistoryId; + private String context; + private Long contextHistoryId; + @TableField(typeHandler = FastjsonTypeHandler.class) + private JSONObject messageBody; + @TableField(typeHandler = FastjsonTypeHandler.class) + private JSONObject bizBody; + private Long retryCount; + private Long dataVersion; + @TableField(typeHandler = IgnorePropsJsonTypeHandler.class) + private RecordExt recordExt; + private Long isDelete; + private Date createAt; + private Date updateAt; + + @Setter + @Getter + public static class RecordExt { + private AckInfo ack; + } + + @Setter + @Getter + public static class AckInfo { + private Boolean ackSuccess; + private String ackDescription; + private Long requestDataVersion; + private Long messageDataVersion; + } + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/enums/UpdatableMessageState.java b/im-center-server/src/main/java/cn/axzo/im/enums/UpdatableMessageState.java new file mode 100644 index 0000000..35d8f55 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/enums/UpdatableMessageState.java @@ -0,0 +1,36 @@ +package cn.axzo.im.enums; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * @author yanglin + */ +@Getter +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +public enum UpdatableMessageState { + + // 已创建 + TASK_CREATED(false, false), + // 消息已经放进队列 + INIT_MESSAGE_QUEUED(false, false), + // 消息已经发送成功 + INIT_MESSAGE_SEND_SUCCESS(true, false), + // 消息已经发送失败 + INIT_MESSAGE_SEND_FAIL(false, false), + // 更新已经放进队列 + UPDATE_MESSAGE_QUEUED(true, false), + // 更新已经发送成功 + UPDATE_MESSAGE_SEND_SUCCESS(true, true), + // 更新已经发送失败 + UPDATE_MESSAGE_SEND_FAIL(true, false), + // 更新ACK + UPDATE_ACK(true, false), + // 未找到账号 + ACCOUNT_NOT_FOUND(false, false); + + private final boolean isUpdateMessageAllowed; + private final boolean isUpdateAckAllowed; + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/send/SendExecutor.java b/im-center-server/src/main/java/cn/axzo/im/send/SendExecutor.java index cf88a01..dbf7a78 100644 --- a/im-center-server/src/main/java/cn/axzo/im/send/SendExecutor.java +++ b/im-center-server/src/main/java/cn/axzo/im/send/SendExecutor.java @@ -97,13 +97,13 @@ public class SendExecutor implements Supplier { queue().log(message, args); } - public void scheduleRetrySend(MessageHistory history, HistoryRecordExt ext) { - scheduleRetrySend(Collections.singletonList(history), ext); + public void scheduleRetrySend(MessageHistory history, HistoryRecordExt updateExt) { + scheduleRetrySend(Collections.singletonList(history), updateExt); } - public void scheduleRetrySend(List histories, HistoryRecordExt ext) { + public void scheduleRetrySend(List histories, HistoryRecordExt updateExt) { if (CollectionUtils.isEmpty(histories)) return; - queue().scheduleRetrySend(histories, ext); + queue().scheduleRetrySend(histories, updateExt); scheduleRetryCount.addAndGet(histories.size()); } @@ -121,22 +121,25 @@ public class SendExecutor implements Supplier { sendManager.submitSetSendFail(history, failReason); } - public void setBatchSendSuccess( - List histories, MessageBatchDispatchResponse response, HistoryRecordExt ext) { + public void setBatchSendSuccess(List histories, + MessageBatchDispatchResponse response, + HistoryRecordExt updateExt) { sendCount.addAndGet(histories.size()); - sendManager.setBatchSendSuccess(histories, response, ext); + sendManager.setBatchSendSuccess(histories, response, updateExt); } - public void setBatchSendSuccess( - List histories, BatchSendCustomMessageResponse response, HistoryRecordExt ext) { + public void setBatchSendSuccess(List histories, + BatchSendCustomMessageResponse response, + HistoryRecordExt updateExt) { sendCount.addAndGet(histories.size()); - sendManager.setBatchSendSuccess(histories, response, ext); + sendManager.setBatchSendSuccess(histories, response, updateExt); } - public void setSendFail( - List histories, String failReason, HistoryRecordExt ext) { + public void setSendFail(List histories, + String failReason, + HistoryRecordExt updateExt) { sendCount.addAndGet(histories.size()); - sendManager.setSendFail(histories, failReason, ext); + sendManager.setSendFail(histories, failReason, updateExt); } private static class Stage { diff --git a/im-center-server/src/main/java/cn/axzo/im/send/SendManager.java b/im-center-server/src/main/java/cn/axzo/im/send/SendManager.java index 174b119..14846f6 100644 --- a/im-center-server/src/main/java/cn/axzo/im/send/SendManager.java +++ b/im-center-server/src/main/java/cn/axzo/im/send/SendManager.java @@ -7,6 +7,7 @@ import cn.axzo.im.entity.HistoryRecordExt; import cn.axzo.im.entity.MessageHistory; import cn.axzo.im.enums.MessageHistoryStatus; import cn.axzo.im.service.impl.MessageHistoryServiceImpl; +import cn.axzo.im.updatable.UpdatableMessageManager; import cn.axzo.im.utils.DateFormatUtil; import cn.axzo.im.utils.ImProperties; import cn.axzo.im.utils.ImProperties.SendMessageConfig; @@ -15,6 +16,7 @@ import lombok.Getter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; +import org.springframework.transaction.support.TransactionTemplate; import java.util.ArrayList; import java.util.Date; @@ -40,6 +42,8 @@ public class SendManager { private final SendExec sendExec; private final MessageHistoryMapper messageHistoryMapper; private final MessageHistoryServiceImpl messageHistoryService; + private final UpdatableMessageManager updatableMessageManager; + private final TransactionTemplate transactionTemplate; private final SendMessageConfig cfg; private final Date maxCreateAt; private final AsyncTasks asyncTasks; @@ -54,6 +58,8 @@ public class SendManager { this.cfg = applicationContext.getBean(ImProperties.class).getSendMessage().copy(); this.messageHistoryMapper = applicationContext.getBean(MessageHistoryMapper.class); this.messageHistoryService = applicationContext.getBean(MessageHistoryServiceImpl.class); + this.updatableMessageManager = applicationContext.getBean(UpdatableMessageManager.class); + this.transactionTemplate = applicationContext.getBean(TransactionTemplate.class); this.queue = new SendQueue(applicationContext, sendExec.getApiChannel()); this.sendExec = sendExec; this.maxCreateAt = getMaxCreateAt(); @@ -183,7 +189,10 @@ public class SendManager { return update; }) .collect(toList()); - messageHistoryService.updateBatch(updates); + transactionTemplate.executeWithoutResult(unused -> { + messageHistoryService.updateBatch(updates); + updatableMessageManager.onHistorySend(successHistories); + }); queue.setSendComplete(successHistories); } for (Map.Entry> e : failHistories.entrySet()) { @@ -196,28 +205,43 @@ public class SendManager { .status(MessageHistoryStatus.FAILED) .build()) .collect(toList()); - messageHistoryService.updateBatch(updates); + transactionTemplate.executeWithoutResult(unused -> { + messageHistoryService.updateBatch(updates); + updatableMessageManager.onHistorySend(histories); + }); queue.setSendComplete(histories); } successHistories = new ArrayList<>(); failHistories = new HashMap<>(); } - void setBatchSendSuccess( - List histories, MessageBatchDispatchResponse response, HistoryRecordExt ext) { - messageHistoryService.setBatchSendSuccess(histories, response, ext); + void setBatchSendSuccess(List histories, + MessageBatchDispatchResponse response, + HistoryRecordExt updateExt) { + transactionTemplate.executeWithoutResult(unused -> { + messageHistoryService.setBatchSendSuccess(histories, response, updateExt); + updatableMessageManager.onHistorySend(histories); + }); queue.setSendComplete(histories); } - void setBatchSendSuccess( - List histories, BatchSendCustomMessageResponse response, HistoryRecordExt ext) { - messageHistoryService.setBatchSendSuccess(histories, response, ext); + void setBatchSendSuccess(List histories, + BatchSendCustomMessageResponse response, + HistoryRecordExt updateExt) { + transactionTemplate.executeWithoutResult(unused -> { + messageHistoryService.setBatchSendSuccess(histories, response, updateExt); + updatableMessageManager.onHistorySend(histories); + }); queue.setSendComplete(histories); } - void setSendFail( - List histories, String failReason, HistoryRecordExt ext) { - messageHistoryService.setSendFail(histories, failReason, ext); + void setSendFail(List histories, + String failReason, + HistoryRecordExt updateExt) { + transactionTemplate.executeWithoutResult(unused -> { + messageHistoryService.setSendFail(histories, failReason, updateExt); + updatableMessageManager.onHistorySend(histories); + }); queue.setSendComplete(histories); } diff --git a/im-center-server/src/main/java/cn/axzo/im/send/SendQueue.java b/im-center-server/src/main/java/cn/axzo/im/send/SendQueue.java index 7dc7b7e..7e055b7 100644 --- a/im-center-server/src/main/java/cn/axzo/im/send/SendQueue.java +++ b/im-center-server/src/main/java/cn/axzo/im/send/SendQueue.java @@ -2,12 +2,14 @@ package cn.axzo.im.send; import cn.axzo.im.center.api.vo.ApiChannel; import cn.axzo.im.dao.mapper.MessageHistoryMapper; +import cn.axzo.im.dao.repository.MessageHistoryDao; import cn.axzo.im.entity.HistoryRecordExt; import cn.axzo.im.entity.MessageHistory; import cn.axzo.im.enums.MessageHistoryStatus; import cn.axzo.im.utils.BizAssertions; import cn.axzo.im.utils.ImProperties; import cn.axzo.im.utils.ImProperties.SendMessageConfig; +import cn.axzo.im.utils.PropsUtils; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.Getter; @@ -46,6 +48,7 @@ public class SendQueue { private final ApiChannel apiChannel; private final SendMessageConfig cfg; private final MessageHistoryMapper messageHistoryMapper; + private final MessageHistoryDao messageHistoryDao; @Getter private final BlockingQueue logQueue = new ArrayBlockingQueue<>(2048); private final LinkedList records = new LinkedList<>(); private final Date queueCreateTime = new Date(); @@ -60,6 +63,7 @@ public class SendQueue { this.apiChannel = apiChannel; cfg = applicationContext.getBean(ImProperties.class).getSendMessage().copy(); messageHistoryMapper = applicationContext.getBean(MessageHistoryMapper.class); + messageHistoryDao = applicationContext.getBean(MessageHistoryDao.class); totalCount = messageHistoryMapper.selectCount(recordsQuery()); } @@ -208,7 +212,7 @@ public class SendQueue { lastLoadEmpty = true; } - public void scheduleRetrySend(List histories, HistoryRecordExt ext) { + public void scheduleRetrySend(List histories, HistoryRecordExt updateExt) { if (CollectionUtils.isEmpty(histories)) return; List ids = histories.stream() .map(MessageHistory::getId) @@ -221,11 +225,15 @@ public class SendQueue { Date newTimestamp = DateTime.now() .plusSeconds(delaySeconds) .toDate(); - MessageHistory update = new MessageHistory(); - update.setRecordExt(ext); - update.setTimestampForSend(newTimestamp); - messageHistoryMapper.update(update, query(MessageHistory.class) - .in(MessageHistory::getId, ids)); + ArrayList updates = new ArrayList<>(); + for (MessageHistory history : histories) { + MessageHistory update = new MessageHistory(); + updates.add(update); + update.setId(history.getId()); + update.setRecordExt(PropsUtils.updateProperties(history.getRecordExt(), updateExt)); + update.setTimestampForSend(newTimestamp); + } + messageHistoryDao.updateBatchById(updates); setSendComplete(histories); } diff --git a/im-center-server/src/main/java/cn/axzo/im/send/job/SendCustomMessageJob.java b/im-center-server/src/main/java/cn/axzo/im/send/job/SendCustomMessageJob.java index 79d58b4..1c85ba4 100644 --- a/im-center-server/src/main/java/cn/axzo/im/send/job/SendCustomMessageJob.java +++ b/im-center-server/src/main/java/cn/axzo/im/send/job/SendCustomMessageJob.java @@ -23,7 +23,7 @@ public class SendCustomMessageJob extends SendMessageExecInstance { } @XxlJob("sendCustomMessageJob") - ReturnT execute(String param) { + public ReturnT execute(String param) { try { scanAndSend(); return ReturnT.SUCCESS; diff --git a/im-center-server/src/main/java/cn/axzo/im/send/job/SendMessageJob.java b/im-center-server/src/main/java/cn/axzo/im/send/job/SendMessageJob.java index 2066211..435d1ab 100644 --- a/im-center-server/src/main/java/cn/axzo/im/send/job/SendMessageJob.java +++ b/im-center-server/src/main/java/cn/axzo/im/send/job/SendMessageJob.java @@ -23,7 +23,7 @@ public class SendMessageJob extends SendMessageExecInstance { } @XxlJob("sendMessageJob") - ReturnT execute(String param) { + public ReturnT execute(String param) { try { scanAndSend(); return ReturnT.SUCCESS; diff --git a/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageHistoryServiceImpl.java b/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageHistoryServiceImpl.java index 8c4c735..a03f76e 100644 --- a/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageHistoryServiceImpl.java +++ b/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageHistoryServiceImpl.java @@ -17,6 +17,7 @@ import cn.axzo.im.event.payload.MessageHistoryCreatedPayload; import cn.axzo.im.event.payload.MessageHistoryUpdatedPayload; import cn.axzo.im.service.AccountService; import cn.axzo.im.service.MessageHistoryService; +import cn.axzo.im.utils.PropsUtils; import cn.axzo.maokai.api.client.OrganizationalUnitApi; import cn.axzo.maokai.api.vo.request.OrganizationalUnitQuery; import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO; @@ -218,13 +219,15 @@ public class MessageHistoryServiceImpl extends ServiceImpl histories, BatchSendCustomMessageResponse response, HistoryRecordExt ext) { + public void setBatchSendSuccess(List histories, + BatchSendCustomMessageResponse response, + HistoryRecordExt updateExt) { ArrayList updates = new ArrayList<>(histories.size()); for (MessageHistory history : histories) { MessageHistory update = new MessageHistory(); updates.add(update); update.setId(history.getId()); - update.setRecordExt(ext); + update.setRecordExt(PropsUtils.updateProperties(history.getRecordExt(), updateExt)); MessageHistoryStatus status = response.getUnregister().contains(history.getToAccount()) ? MessageHistoryStatus.FAILED : MessageHistoryStatus.SUCCEED; @@ -237,7 +240,9 @@ public class MessageHistoryServiceImpl extends ServiceImpl histories, MessageBatchDispatchResponse response, HistoryRecordExt ext) { + public void setBatchSendSuccess(List histories, + MessageBatchDispatchResponse response, + HistoryRecordExt updateExt) { // 发送成功的IMAccountId -> msgId Map msgids = response.getMsgids(); // unregister的账号 @@ -252,7 +257,7 @@ public class MessageHistoryServiceImpl extends ServiceImpl { MessageHistory messageHistory = MessageHistory.builder() .id(history.getId()) - .recordExt(ext) + .recordExt(PropsUtils.updateProperties(history.getRecordExt(), updateExt)) .build(); if (finalUnregister.contains(history.getToAccount())) { messageHistory.setStatus(MessageHistoryStatus.FAILED); @@ -284,13 +289,15 @@ public class MessageHistoryServiceImpl extends ServiceImpl histories, String failReason, HistoryRecordExt ext) { + public void setSendFail(List histories, + String failReason, + HistoryRecordExt updateExt) { List updates = histories.stream() .map(e -> MessageHistory.builder() .id(e.getId()) .result(failReason) .status(MessageHistoryStatus.FAILED) - .recordExt(ext) + .recordExt(PropsUtils.updateProperties(e.getRecordExt(), updateExt)) .build()) .collect(toList()); this.updateBatch(updates); diff --git a/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java b/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java index a1aeb7e..827b5dc 100644 --- a/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java +++ b/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java @@ -5,8 +5,8 @@ import cn.axzo.im.center.api.vo.req.SendMessageParam; import cn.axzo.im.center.common.enums.AccountTypeEnum; import cn.axzo.im.center.common.enums.AppTypeEnum; import cn.axzo.im.channel.IMChannelProvider; -import cn.axzo.im.channel.netease.NimMsgTypeEnum; import cn.axzo.im.channel.netease.dto.MessageBody; +import cn.axzo.im.channel.netease.dto.Peer; import cn.axzo.im.dao.mapper.MessageTaskMapper; import cn.axzo.im.entity.AccountRegister; import cn.axzo.im.entity.MessageHistory; @@ -17,6 +17,7 @@ import cn.axzo.im.service.AccountRegisterService.AccountRegisterDTO; import cn.axzo.im.service.AccountService; import cn.axzo.im.service.MessageHistoryService; import cn.axzo.im.service.MessageTaskService; +import cn.axzo.im.updatable.UpdatableMessageManager; import cn.axzo.im.utils.UUIDUtil; import cn.axzo.maokai.api.client.OrganizationalTeamOuRelationApi; import cn.axzo.maokai.api.vo.request.OrganizationalTeamOuRelationReq; @@ -64,6 +65,8 @@ public class MessageTaskServiceImpl extends ServiceImpl resolveMessageHistory(batchNo, messageTask, receivePerson, imAccounts, accountRegisters, ouIdMap)) .collect(Collectors.toList()); messageHistoryService.createBatch(messageHistories); + List historyIds = messageHistories.stream() + .map(MessageHistory::getId) + .collect(Collectors.toList()); + updatableMessageManager.onHistoryCreated(historyIds); } private MessageHistory resolveMessageHistory(String batchNo, @@ -238,6 +246,9 @@ public class MessageTaskServiceImpl extends ServiceImpl excludePushPayloads = bizData.getExcludePushPayloads() == null ? Collections.emptyList() @@ -339,14 +350,19 @@ public class MessageTaskServiceImpl extends ServiceImpl defaultExtMap = Maps.newHashMap(); - MessageTask.BizData bizData = messageTask.getBizData(); if (StringUtils.isNotBlank(bizData.getMsgTemplateContent())) { messageBody.setMsgBody(bizData.getMsgTemplateContent()); defaultExtMap.put("msgTemplateId", bizData.getMsgTemplateId()); diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/AddUpdateHistoryResult.java b/im-center-server/src/main/java/cn/axzo/im/updatable/AddUpdateHistoryResult.java new file mode 100644 index 0000000..5cfafed --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/AddUpdateHistoryResult.java @@ -0,0 +1,27 @@ +package cn.axzo.im.updatable; + +import java.util.Collection; +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.Optional; + +/** + * @author yanglin + */ +public class AddUpdateHistoryResult { + + private final Map messageId2HistoryId = new IdentityHashMap<>(); + + public Optional findHistoryId(Long messageId) { + return Optional.ofNullable(messageId2HistoryId.get(messageId)); + } + + public void addHistoryId(Long messageId, Long historyId) { + messageId2HistoryId.put(messageId, historyId); + } + + public Collection getMessageIds() { + return messageId2HistoryId.keySet(); + } + +} diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/HistoryAndMessage.java b/im-center-server/src/main/java/cn/axzo/im/updatable/HistoryAndMessage.java new file mode 100644 index 0000000..47c801d --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/HistoryAndMessage.java @@ -0,0 +1,25 @@ +package cn.axzo.im.updatable; + +import cn.axzo.im.entity.MessageHistory; +import cn.axzo.im.entity.UpdatableMessage; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.List; + +/** + * @author yanglin + */ +@Getter +@RequiredArgsConstructor +public class HistoryAndMessage { + + private final List histories; + private final UpdatableMessage message; + + public boolean hasDataVersionMatchHistories() { + return histories.stream() + .anyMatch(history -> history.getDataVersion().equals(message.getDataVersion())); + } + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/InitHistories.java b/im-center-server/src/main/java/cn/axzo/im/updatable/InitHistories.java new file mode 100644 index 0000000..8fbdd27 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/InitHistories.java @@ -0,0 +1,61 @@ +package cn.axzo.im.updatable; + +import cn.axzo.basics.common.constant.enums.CodeDefinition; +import cn.axzo.im.center.api.vo.PersonAccountAttribute; +import cn.axzo.im.center.common.enums.AppTypeEnum; +import cn.axzo.im.entity.MessageHistory; +import cn.axzo.im.entity.UpdatableMessage; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; + +/** + * @author yanglin + */ +class InitHistories { + + private final Map account2histories; + + InitHistories(List histories) { + this.account2histories = histories.stream() + .filter(history -> { + AppTypeEnum appType = CodeDefinition.findByCode( + AppTypeEnum.class, history.getAppType()).orElse(null); + return appType != null; + }) + .collect(toMap(history -> { + PersonAccountAttribute person = new PersonAccountAttribute(); + person.setPersonId(history.getReceivePersonId()); + person.setOuId(history.getReceiveOuId()); + AppTypeEnum appType = CodeDefinition.findByCode( + AppTypeEnum.class, history.getAppType()).orElse(null); + person.setAppType(appType); + return new HistoryTaskAccount(history.getImMessageTaskId(), person); + }, identity(), (oldValue, newValue) -> oldValue)); + } + + public Optional findHistory(UpdatableMessage message) { + HistoryTaskAccount account = new HistoryTaskAccount( + message.getTaskId(), message.parsePersonAccount()); + return Optional.ofNullable(account2histories.get(account)); + } + + @Setter + @Getter + @RequiredArgsConstructor + // IMPORTANT: 不要删除这个注解 + @EqualsAndHashCode + private static class HistoryTaskAccount { + final Long taskId; + final PersonAccountAttribute person; + } + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/MessageBodyJsonObject.java b/im-center-server/src/main/java/cn/axzo/im/updatable/MessageBodyJsonObject.java new file mode 100644 index 0000000..41389df --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/MessageBodyJsonObject.java @@ -0,0 +1,25 @@ +package cn.axzo.im.updatable; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import lombok.Getter; +import org.apache.commons.lang3.StringUtils; + +/** + * @author yanglin + */ +@Getter +public class MessageBodyJsonObject { + + private final JSONObject messageBody; + private final JSONObject bizBody; + + public MessageBodyJsonObject(String json) { + messageBody = JSON.parseObject(json); + String bizBodyJson = messageBody.getString("msgBody"); + if (StringUtils.isBlank(bizBodyJson)) + bizBodyJson = messageBody.getString("payload"); + bizBody = JSON.parseObject(bizBodyJson); + } + +} diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/UpdatableMessageManager.java b/im-center-server/src/main/java/cn/axzo/im/updatable/UpdatableMessageManager.java new file mode 100644 index 0000000..0f413f3 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/UpdatableMessageManager.java @@ -0,0 +1,341 @@ +package cn.axzo.im.updatable; + +import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam; +import cn.axzo.im.center.api.vo.req.UpdatableMessageAckRequest; +import cn.axzo.im.center.api.vo.req.UpdatableMessageAckRequest.Acknowledgment; +import cn.axzo.im.center.api.vo.req.UpdateMessageRequest; +import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse; +import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse.NonUpdateMessageReason; +import cn.axzo.im.center.api.vo.resp.UpdatableMessageSendResult; +import cn.axzo.im.center.common.enums.YesOrNo; +import cn.axzo.im.channel.netease.dto.MessageBody; +import cn.axzo.im.dao.repository.MessageHistoryDao; +import cn.axzo.im.dao.repository.UpdatableMessageDao; +import cn.axzo.im.entity.HistoryRecordExt; +import cn.axzo.im.entity.MessageHistory; +import cn.axzo.im.entity.MessageTask; +import cn.axzo.im.entity.MessageTask.ReceivePerson; +import cn.axzo.im.entity.UpdatableMessage; +import cn.axzo.im.entity.UpdatableMessageLog; +import cn.axzo.im.enums.MessageHistoryStatus; +import cn.axzo.im.enums.UpdatableMessageState; +import cn.axzo.im.updatable.collector.CardManipulateCollector; +import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory; +import cn.axzo.im.updatable.handler.InitMessageHandler; +import cn.axzo.im.updatable.handler.UpdateMessageHandler; +import cn.axzo.im.updatable.retry.MessageUpdateRetryService; +import cn.axzo.im.utils.BizAssertions; +import cn.axzo.im.utils.ImProperties; +import cn.axzo.im.utils.PropsUtils; +import cn.axzo.im.utils.UUIDUtil; +import com.alibaba.fastjson.JSON; +import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +import static cn.axzo.im.center.api.vo.req.MessageUpdateInfo.collectBizMessageIds; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; + +/** + * @author yanglin + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class UpdatableMessageManager { + + private final UpdatableMessageDao updatableMessageDao; + private final MessageHistoryDao messageHistoryDao; + private final InitMessageHandler initMessageHandler; + private final UpdateMessageHandler updateMessageHandler; + private final MessageUpdateRetryService messageUpdateRetryService; + private final UpdateSupport updateSupport; + private final CardManipulateCollectorFactory cardManipulateCollectorFactory; + private final TransactionTemplate transactionTemplate; + private final ImProperties props; + + // !! schedule in task + + public List createUpdatableMessage( + MessageTask task, SendTemplateMessageParam request, List receivePersons) { + if (CollectionUtils.isEmpty(receivePersons)) return Collections.emptyList(); + String batchNo = UUIDUtil.uuidString(); + ArrayList sendResults = new ArrayList<>(); + CardManipulateCollector collector = cardManipulateCollectorFactory.create(); + for (ReceivePerson person : receivePersons) { + UpdatableMessage message = new UpdatableMessage(); + collector.addMessage(message); + message.setBatchNo(batchNo); + message.setTemplateId(request.getMsgTemplateId()); + message.setBizId(request.getBizId()); + message.setTaskId(task.getId()); + message.setReceiverPersonId(person.getPersonId()); + message.setReceiverOuId(person.getOuId()); + message.setAppType(person.getAppType()); + message.setMsgType(request.getTemplatedMsgType()); + message.setState(UpdatableMessageState.TASK_CREATED); + message.setBizMessageId(UUIDUtil.uuidString()); + message.setDataVersion(1L); + message.setSenderPersonId(request.determineSenderPersonId() + "0"); + message.setSenderOuId(request.determineSenderOuId()); + message.setIsSenderRobot(request.isSendByRobot() ? YesOrNo.YES : YesOrNo.NO); + + UpdatableMessageLog messageLog = message.toMessageLog(); + collector.addLog(messageLog); + messageLog.setContext("scheduleInTask"); + + UpdatableMessageSendResult sendResult = new UpdatableMessageSendResult(); + sendResults.add(sendResult); + sendResult.setBizMessageId(message.bizMessageId()); + sendResult.setAccount(message.parsePersonAccount()); + } + collector.finish(); + return sendResults; + } + + // !! init history created + + public void onHistoryCreated(List historyIds) { + if (CollectionUtils.isEmpty(historyIds)) return; + List histories = messageHistoryDao.getByIds(historyIds); + List taskIds = histories.stream() + .map(MessageHistory::getImMessageTaskId) + .distinct() + .collect(toList()); + List messages = updatableMessageDao.getByTaskIds(taskIds); + log.info("onHistoryCreated, taskIdSize={}, messageSize={}", taskIds.size(), messages.size()); + InitHistories initHistories = new InitHistories(histories); + CardManipulateCollector collector = cardManipulateCollectorFactory.create(); + for (UpdatableMessage message : messages) { + MessageHistory history = initHistories.findHistory(message).orElse(null); + if (history == null) continue; + MessageBodyJsonObject object = new MessageBodyJsonObject(history.getMessageBody()); + + UpdatableMessage messageUpdate = new UpdatableMessage(); + collector.updateMessage(messageUpdate); + messageUpdate.setId(message.getId()); + messageUpdate.setInitHistoryId(history.getId()); + messageUpdate.setMessageBody(object.getMessageBody()); + messageUpdate.setBizBody(object.getBizBody()); + messageUpdate.setFromAccount(history.getFromAccount()); + messageUpdate.setToAccount(history.getToAccount()); + messageUpdate.setState(history.getStatus() == MessageHistoryStatus.PENDING + ? UpdatableMessageState.INIT_MESSAGE_QUEUED + : UpdatableMessageState.ACCOUNT_NOT_FOUND); + + UpdatableMessageLog messageLog = message.toMessageLog(); + collector.addLog(messageLog); + messageLog.setBizId(history.getBizId()); + messageLog.setFromAccount(history.getFromAccount()); + messageLog.setToAccount(history.getToAccount()); + messageLog.setMessageState(messageUpdate.getState()); + messageLog.setInitHistoryId(history.getId()); + messageLog.setMessageBody(object.getMessageBody()); + messageLog.setBizBody(object.getBizBody()); + messageLog.setContext("initHistoryCreated"); + messageLog.setContextHistoryId(history.getId()); + + MessageHistory historyUpdate = new MessageHistory(); + collector.updateHistory(historyUpdate); + + historyUpdate.setId(history.getId()); + MessageBody messageBody = JSON.parseObject(history.getMessageBody(), MessageBody.class); + messageBody.setBizMessageId(message.bizMessageId()); + messageBody.setDataVersion(message.getDataVersion()); + historyUpdate.setMessageBody(JSON.toJSONString(messageBody)); + + HistoryRecordExt updateExt = new HistoryRecordExt(); + updateExt.setIsUpdatableMessage(true); + updateExt.setBizMessageId(message.bizMessageId()); + updateExt.setDataVersion(message.getDataVersion()); + updateExt.setIsUpdateMessage(false); + updateExt.setIsUpdateRetry(false); + updateExt.setUpdateRetryCount(message.getRetryCount()); + historyUpdate.setRecordExt(PropsUtils.updateProperties(history.getRecordExt(), updateExt)); + } + collector.finish(); + } + + // !! update message + + public MessageUpdateResponse updateMessage(UpdateMessageRequest request) { + BizAssertions.assertNotEmpty(request.getUpdates(), "更新消息不能为空"); + List messages = updatableMessageDao + .getByBizMessageIds(collectBizMessageIds(request.getUpdates())); + BizAssertions.assertNotEmpty(messages, "未找到任何需要更新的消息"); + MessageUpdateResponse response = new MessageUpdateResponse(); + List> batches = Lists.partition( + request.getUpdates(), props.getUpdatableMessageMaxLockRecords()); + for (List batch : batches) + transactionTemplate.executeWithoutResult(unused -> updateMessageImpl(batch, response)); + return response; + } + + private void updateMessageImpl(List requestUpdates, + MessageUpdateResponse response) { + List requestMessages = updatableMessageDao + .getByBizMessageIdsForUpdate(collectBizMessageIds(requestUpdates)); + Map bizMessageId2Message = requestMessages.stream() + .collect(toMap(UpdatableMessage::bizMessageId, identity())); + List validUpdates = new ArrayList<>(); + List updateIds = new ArrayList<>(); + for (UpdateMessageRequest.Update update : requestUpdates) { + UpdatableMessage message = bizMessageId2Message.get(update.getBizMessageId()); + if (message == null) { + response.addNonUpdatedMessage( + update.getBizMessageId(), + NonUpdateMessageReason.CANT_FIND_INIT_MESSAGE); + } else if (!message.getState().isUpdateMessageAllowed()) { + response.addNonUpdatedMessage( + update.getBizMessageId(), + NonUpdateMessageReason.MESSAGE_STATE_NOT_ALLOWED, + message.getState()); + } else { + validUpdates.add(update); + updateIds.add(message.getId()); + response.addUpdatedBizMessageId(message.getBizMessageId()); + } + } + if (!validUpdates.isEmpty()) { + // incr data version + reset retry count + updatableMessageDao.getBaseMapper().incrDataVersion(updateIds); + messageUpdateRetryService.scheduleNextRetry(updateIds); + AddUpdateHistoryResult result = updateSupport + .addUpdateHistories("updateHistoryCreated", validUpdates); + updateSupport.updateHistoryId(result, UpdatableMessage::setUpdateHistoryId); + } + } + + // !! init & update message sent + + @Transactional + public void onHistorySend(List maybeUpdatedHistory) { + List historyIds = maybeUpdatedHistory.stream() + .filter(MessageHistory::isUpdatableMessage) + .map(MessageHistory::getId) + .collect(toList()); + if (CollectionUtils.isEmpty(historyIds)) return; + List histories = messageHistoryDao.getByIds(historyIds); + // 避免ack更新出错 + List messages = updatableMessageDao + .getByBizMessageIdsForUpdate(histories.stream() + .map(h -> h.getRecordExt().getBizMessageId()) + .distinct() + .sorted() + .collect(toList())); + HashMap id2Messages = new HashMap<>(); + messages.forEach(message -> id2Messages.put(message.getId(), message)); + BiFunction> historyAndMessageBuilder = + (isUpdateMessage, isSuccess) -> { + List historyAndMessages = new ArrayList<>(); + for (UpdatableMessage message : new ArrayList<>(id2Messages.values())) { + List stateHistories = histories.stream() + .filter(history -> isSuccess + ? history.getStatus() == MessageHistoryStatus.SUCCEED + : history.getStatus() == MessageHistoryStatus.FAILED) + .filter(history -> { + boolean isHistoryUpdateMessage = history.getRecordExt().getIsUpdateMessage(); + return isUpdateMessage && isHistoryUpdateMessage + || !isUpdateMessage && !isHistoryUpdateMessage; + }) + .collect(toList()); + if (stateHistories.isEmpty()) + continue; + // 一次只会处理一种情况, 避免无用的遍历 + id2Messages.remove(message.getId()); + historyAndMessages.add(new HistoryAndMessage(stateHistories, message)); + } + return historyAndMessages; + }; + // send success + List sendSuccess = historyAndMessageBuilder.apply(false, true); + if (!sendSuccess.isEmpty()) + initMessageHandler.onSuccess(sendSuccess); + // send fail + List sendFail = historyAndMessageBuilder.apply(false, false); + if (!sendFail.isEmpty()) + initMessageHandler.onFail(sendFail); + // update success + List updateSuccess = historyAndMessageBuilder.apply(true, true); + if (!updateSuccess.isEmpty()) + updateMessageHandler.onSuccess(updateSuccess); + // update fail + List updateFail = historyAndMessageBuilder.apply(true, false); + if (!updateFail.isEmpty()) + updateMessageHandler.onFail(updateFail); + } + + @Transactional + public void ack(UpdatableMessageAckRequest request) { + List acknowledgments = request.determineValidAcknowledgments(); + if (CollectionUtils.isEmpty(acknowledgments)) return; + List> batches = Lists.partition( + acknowledgments, props.getUpdatableMessageMaxLockRecords()); + for (List batch : batches) + transactionTemplate.executeWithoutResult(unused -> ackImpl(batch)); + } + + private void ackImpl(List acknowledgments) { + List nimMessageIds = acknowledgments.stream() + .map(Acknowledgment::getNimMessageId) + .collect(toList()); + Map nimMessageId2Ack = acknowledgments.stream() + .collect(toMap(Acknowledgment::getNimMessageId, identity())); + List messages = updatableMessageDao.getByNimMessageIdForUpdate(nimMessageIds); + CardManipulateCollector collector = cardManipulateCollectorFactory.create(); + for (UpdatableMessage message : messages) { + Long dataVersion = nimMessageId2Ack.get(message.getNimMessageId()).getDataVersion(); + if (dataVersion == null) dataVersion = -1L; + UpdatableMessageLog messageLog = message.toMessageLog(); + collector.addLog(messageLog); + messageLog.setContext("ack"); + messageLog.setContextHistoryId(0L); + messageLog.setRetryCount(0L); + messageLog.setMessageBody(null); + messageLog.setBizBody(null); + messageLog.setDataVersion(dataVersion); + UpdatableMessageLog.RecordExt logExt = new UpdatableMessageLog.RecordExt(); + messageLog.setRecordExt(logExt); + + UpdatableMessage messageUpdate = new UpdatableMessage(); + collector.updateMessage(messageUpdate); + messageUpdate.setId(message.getId()); + Long ackDataVersion = dataVersion >= message.getAckDataVersion() + ? dataVersion + : message.getAckDataVersion(); + messageUpdate.setAckDataVersion(ackDataVersion); + UpdatableMessageLog.AckInfo ackInfo = new UpdatableMessageLog.AckInfo(); + logExt.setAck(ackInfo); + if (!dataVersion.equals(message.getDataVersion())) { + ackInfo.setAckSuccess(false); + ackInfo.setAckDescription("数据版本不匹配"); + ackInfo.setRequestDataVersion(dataVersion); + ackInfo.setMessageDataVersion(message.getDataVersion()); + continue; + } + // 避免前端有bug + if (!message.getState().isUpdateAckAllowed()) { + ackInfo.setAckSuccess(false); + ackInfo.setAckDescription("消息状态不允许ack"); + continue; + } + ackInfo.setAckSuccess(true); + messageUpdate.setState(UpdatableMessageState.UPDATE_ACK); + } + collector.finish(); + } + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/UpdatableMessageQueryService.java b/im-center-server/src/main/java/cn/axzo/im/updatable/UpdatableMessageQueryService.java new file mode 100644 index 0000000..6da20b7 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/UpdatableMessageQueryService.java @@ -0,0 +1,52 @@ +package cn.axzo.im.updatable; + +import cn.axzo.im.center.api.vo.req.FetchUpdatableMessageRequest; +import cn.axzo.im.center.api.vo.req.GetBizMessageIdsRequest; +import cn.axzo.im.center.api.vo.resp.FetchUpdatableMessageResponse; +import cn.axzo.im.center.api.vo.resp.GetBizMessageIdsResponse; +import cn.axzo.im.dao.repository.UpdatableMessageDao; +import cn.axzo.im.entity.UpdatableMessage; +import cn.axzo.im.utils.BizAssertions; +import cn.axzo.im.utils.ImProperties; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author yanglin + */ +@Component +@RequiredArgsConstructor +public class UpdatableMessageQueryService { + + private final UpdatableMessageDao updatableMessageDao; + private final ImProperties props; + + public FetchUpdatableMessageResponse fetchUpdatableMessage(FetchUpdatableMessageRequest request) { + int maxSize = props.getUpdatable().getFetchMaxSize(); + BizAssertions.assertTrue(request.getNimMessageIds().size() <= maxSize, + "超过了最大的消息数量, 最大: {}", maxSize); + List messages = updatableMessageDao + .getByNimMessageIds(request.getNimMessageIds()); + FetchUpdatableMessageResponse response = new FetchUpdatableMessageResponse(); + for (UpdatableMessage message : messages) { + FetchUpdatableMessageResponse.MessageInfo imMessage = new FetchUpdatableMessageResponse.MessageInfo(); + imMessage.setMsgType(message.getMsgType().getCode()); + imMessage.setMsgBody(message.getBizBody().toJSONString()); + imMessage.setBizMessageId(message.getBizMessageId()); + imMessage.setDataVersion(message.getDataVersion()); + response.addMessage(imMessage); + } + return response; + } + + public GetBizMessageIdsResponse getBizMessageIds(GetBizMessageIdsRequest request) { + List messages = updatableMessageDao.getByNimMessageIds(request.getNimMessageIds()); + GetBizMessageIdsResponse response = new GetBizMessageIdsResponse(); + for (UpdatableMessage message : messages) + response.addMessage(message.getNimMessageId(), message.getBizMessageId()); + return response; + } + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/UpdateSupport.java b/im-center-server/src/main/java/cn/axzo/im/updatable/UpdateSupport.java new file mode 100644 index 0000000..be0bec0 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/UpdateSupport.java @@ -0,0 +1,146 @@ +package cn.axzo.im.updatable; + +import cn.axzo.im.center.api.feign.SendPriority; +import cn.axzo.im.center.api.vo.ApiChannel; +import cn.axzo.im.center.api.vo.req.MessageUpdateInfo; +import cn.axzo.im.center.common.enums.BizTypeEnum; +import cn.axzo.im.center.common.enums.YesOrNo; +import cn.axzo.im.channel.IMChannelProvider; +import cn.axzo.im.channel.netease.dto.MessageCustomBody; +import cn.axzo.im.channel.netease.dto.Peer; +import cn.axzo.im.dao.repository.UpdatableMessageDao; +import cn.axzo.im.entity.HistoryRecordExt; +import cn.axzo.im.entity.MessageHistory; +import cn.axzo.im.entity.UpdatableMessage; +import cn.axzo.im.entity.UpdatableMessageLog; +import cn.axzo.im.enums.MessageHistoryStatus; +import cn.axzo.im.enums.UpdatableMessageState; +import cn.axzo.im.updatable.collector.CardManipulateCollector; +import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory; +import cn.axzo.im.utils.UUIDUtil; +import com.alibaba.fastjson.JSON; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +import static cn.axzo.im.center.api.vo.req.MessageUpdateInfo.collectBizMessageIds; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; + +/** + * @author yanglin + */ +@Component +@RequiredArgsConstructor +public class UpdateSupport { + + private final UpdatableMessageDao updatableMessageDao; + private final IMChannelProvider imChannel; + private final CardManipulateCollectorFactory cardManipulateCollectorFactory; + + public AddUpdateHistoryResult addUpdateHistories( + String context, List updates) { + List messages = updatableMessageDao + .getByBizMessageIds(collectBizMessageIds(updates)); + Map bizMessageId2Message = messages + .stream().collect(toMap(UpdatableMessage::getBizMessageId, identity())); + String batchNo = UUIDUtil.uuidString(); + Map messageLog2History = new IdentityHashMap<>(); + CardManipulateCollector collector = cardManipulateCollectorFactory.create(); + for (MessageUpdateInfo update : updates) { + UpdatableMessage message = bizMessageId2Message.get(update.bizMessageId()); + if (message == null) continue; + UpdatableMessage messageUpdate = new UpdatableMessage(); + collector.updateMessage(messageUpdate); + messageUpdate.setId(message.getId()); + messageUpdate.setState(UpdatableMessageState.UPDATE_MESSAGE_QUEUED); + + MessageHistory history = new MessageHistory(); + collector.addHistory(history); + history.setBizId(message.getBizId()); + history.setFromAccount(message.getFromAccount()); + history.setToAccount(message.getToAccount()); + history.setAppType(message.getAppType().getCode()); + history.setChannel(imChannel.getProviderType()); + MessageCustomBody messageBody = new MessageCustomBody(); + messageBody.setToImAccount(message.getToAccount()); + messageBody.setPersonId(message.getReceiverPersonId()); + messageBody.setBizType(BizTypeEnum.MESSAGE_UPDATE); + messageBody.setPayload("{}"); + messageBody.setBizMessageId(message.bizMessageId()); + messageBody.setDataVersion(messageUpdate.getDataVersion()); + messageBody.setInitMessageId(message.getNimMessageId()); + messageBody.setMsgType(message.getMsgType()); + + messageBody.setPeer(new Peer()); + messageBody.getPeer().setSenderRobot(message.getIsSenderRobot() == YesOrNo.YES); + messageBody.getPeer().setSenderPersonId(message.senderPersonIdAsLong()); + messageBody.getPeer().setReceiverPersonId(message.receiverPersonIdAsLong()); + + history.setMessageBody(JSON.toJSONString(messageBody)); + messageUpdate.setMessageBody(JSON.parseObject(history.getMessageBody())); + messageUpdate.setBizBody(update.bizBody()); + + history.setImMessageTaskId(0L); + history.setReceivePersonId(message.getReceiverPersonId()); + history.setReceiveOuId(message.getReceiverOuId()); + history.setStatus(MessageHistoryStatus.PENDING); + history.setBatchNo(batchNo); + history.setSendPriority(SendPriority.UPDATE_MESSAGE.getPriority()); + history.setApiChannel(ApiChannel.CUSTOM_MESSAGE); + HistoryRecordExt recordExt = new HistoryRecordExt(); + recordExt.setIsUpdatableMessage(true); + recordExt.setBizMessageId(message.bizMessageId()); + recordExt.setDataVersion(message.getDataVersion()); + recordExt.setIsUpdateMessage(true); + Long retryCount = message.getRetryCount(); + recordExt.setIsUpdateRetry(retryCount > 0); + recordExt.setUpdateRetryCount(retryCount); + history.setRecordExt(recordExt); + history.setTimestampForSend(new Date()); + + UpdatableMessageLog messageLog = message.toMessageLog(); + collector.addLog(messageLog); + messageLog2History.put(messageLog, history); + messageLog.setDataVersion(message.getDataVersion()); + messageLog.setMessageState(messageUpdate.getState()); + messageLog.setMessageBody(messageUpdate.getMessageBody()); + messageLog.setBizBody(messageUpdate.getBizBody()); + messageLog.setRetryCount(message.getRetryCount()); + } + collector.finishAddHistory(); + for (UpdatableMessageLog messageLog : collector.getUpdatableMessageLogsToAdd()) { + MessageHistory history = messageLog2History.get(messageLog); + messageLog.setContext(context); + messageLog.setContextHistoryId(history.getId()); + } + collector.finish(); + AddUpdateHistoryResult result = new AddUpdateHistoryResult(); + for (int i = 0; i < messages.size(); i++) { + Long messageId = messages.get(i).getId(); + MessageHistory history = collector.getMessageHistoriesToAdd().get(i); + result.addHistoryId(messageId, history.getId()); + } + return result; + } + + public void updateHistoryId(AddUpdateHistoryResult result, + BiConsumer historyIdSetter) { + CardManipulateCollector collector = cardManipulateCollectorFactory.create(); + for (Long messageId : result.getMessageIds()) { + result.findHistoryId(messageId).ifPresent(historyId -> { + UpdatableMessage messageUpdate = new UpdatableMessage(); + messageUpdate.setId(messageId); + historyIdSetter.accept(messageUpdate, historyId); + collector.updateMessage(messageUpdate); + }); + } + collector.finish(); + } + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/collector/CardManipulateCollector.java b/im-center-server/src/main/java/cn/axzo/im/updatable/collector/CardManipulateCollector.java new file mode 100644 index 0000000..c6f9ee7 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/collector/CardManipulateCollector.java @@ -0,0 +1,102 @@ +package cn.axzo.im.updatable.collector; + +import cn.axzo.im.dao.repository.MessageHistoryDao; +import cn.axzo.im.dao.repository.UpdatableMessageDao; +import cn.axzo.im.dao.repository.UpdatableMessageLogDao; +import cn.axzo.im.entity.MessageHistory; +import cn.axzo.im.entity.UpdatableMessage; +import cn.axzo.im.entity.UpdatableMessageLog; +import com.google.common.collect.Lists; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import org.apache.commons.collections4.CollectionUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +/** + * @author yanglin + */ +@RequiredArgsConstructor(access = AccessLevel.PACKAGE) +public class CardManipulateCollector { + + private static final int BATCH_SIZE = 1000; + + private final UpdatableMessageDao updatableMessageDao; + private final UpdatableMessageLogDao updatableMessageLogDao; + private final MessageHistoryDao messageHistoryDao; + + private final State updatableMessagesToAdd = new State<>(); + private final State updatableMessagesToUpdate = new State<>(); + private final State updatableMessageLogsToAdd = new State<>(); + private final State messageHistoriesToAdd = new State<>(); + private final State messageHistoriesToUpdate = new State<>(); + + public List getUpdatableMessageLogsToAdd() { + return updatableMessageLogsToAdd.elementsOrEmpty(); + } + + public List getMessageHistoriesToAdd() { + return messageHistoriesToAdd.elementsOrEmpty(); + } + + public void addMessage(UpdatableMessage message) { + updatableMessagesToAdd.add(message); + } + + public void updateMessage(UpdatableMessage message) { + updatableMessagesToUpdate.add(message); + } + + public void addLog(UpdatableMessageLog log) { + updatableMessageLogsToAdd.add(log); + } + + public void addHistory(MessageHistory history) { + messageHistoriesToAdd.add(history); + } + + public void updateHistory(MessageHistory history) { + messageHistoriesToUpdate.add(history); + } + + public void finishAddHistory() { + messageHistoriesToAdd.batched(messageHistoryDao::saveBatch); + } + + public void finish() { + updatableMessagesToAdd.batched(updatableMessageDao::saveBatch); + updatableMessagesToUpdate.batched(updatableMessageDao::updateBatchById); + updatableMessageLogsToAdd.batched(updatableMessageLogDao::saveBatch); + messageHistoriesToAdd.batched(messageHistoryDao::saveBatch); + messageHistoriesToUpdate.batched(messageHistoryDao::updateBatchById); + } + + private static class State { + private boolean finished; + private List elements; + + void add(T element) { + if (elements == null) + elements = new ArrayList<>(); + elements.add(element); + } + + List elementsOrEmpty() { + return elements == null ? Collections.emptyList() : elements; + } + + void batched(Consumer> action) { + if (CollectionUtils.isEmpty(elements)) + return; + if (finished) + return; + Lists.partition(elements, BATCH_SIZE).forEach(action); + finished = true; + } + + } + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/collector/CardManipulateCollectorFactory.java b/im-center-server/src/main/java/cn/axzo/im/updatable/collector/CardManipulateCollectorFactory.java new file mode 100644 index 0000000..af38ebc --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/collector/CardManipulateCollectorFactory.java @@ -0,0 +1,27 @@ +package cn.axzo.im.updatable.collector; + +import cn.axzo.im.dao.repository.MessageHistoryDao; +import cn.axzo.im.dao.repository.UpdatableMessageDao; +import cn.axzo.im.dao.repository.UpdatableMessageLogDao; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +/** + * @author yanglin + */ +@Component +@RequiredArgsConstructor +public class CardManipulateCollectorFactory { + + private final UpdatableMessageDao updatableMessageDao; + private final UpdatableMessageLogDao updatableMessageLogDao; + private final MessageHistoryDao messageHistoryDao; + + public CardManipulateCollector create() { + return new CardManipulateCollector( + updatableMessageDao, + updatableMessageLogDao, + messageHistoryDao); + } + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/handler/InitMessageHandler.java b/im-center-server/src/main/java/cn/axzo/im/updatable/handler/InitMessageHandler.java new file mode 100644 index 0000000..3805f2f --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/handler/InitMessageHandler.java @@ -0,0 +1,58 @@ +package cn.axzo.im.updatable.handler; + +import cn.axzo.im.entity.MessageHistory; +import cn.axzo.im.entity.UpdatableMessage; +import cn.axzo.im.entity.UpdatableMessageLog; +import cn.axzo.im.enums.UpdatableMessageState; +import cn.axzo.im.updatable.HistoryAndMessage; +import cn.axzo.im.updatable.collector.CardManipulateCollector; +import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author yanglin + */ +@Component +@RequiredArgsConstructor +public class InitMessageHandler implements StateHandler { + + private final CardManipulateCollectorFactory cardManipulateCollectorFactory; + + @Override + public void onSuccess(List historyAndMessages) { + updateState(historyAndMessages, true); + } + + @Override + public void onFail(List historyAndMessages) { + updateState(historyAndMessages, false); + } + + private void updateState(List historyAndMessages, boolean isSuccess) { + CardManipulateCollector collector = cardManipulateCollectorFactory.create(); + for (HistoryAndMessage historyAndMessage : historyAndMessages) { + UpdatableMessage message = historyAndMessage.getMessage(); + MessageHistory history = historyAndMessage.getHistories().get(0); + + UpdatableMessage messageUpdate = new UpdatableMessage(); + collector.updateMessage(messageUpdate); + messageUpdate.setId(message.getId()); + if (isSuccess) + messageUpdate.setNimMessageId(history.getMessageId()); + messageUpdate.setState(isSuccess + ? UpdatableMessageState.INIT_MESSAGE_SEND_SUCCESS + : UpdatableMessageState.INIT_MESSAGE_SEND_FAIL); + UpdatableMessageLog messageLog = message.toMessageLog(); + collector.addLog(messageLog); + messageLog.setMessageState(messageUpdate.getState()); + messageLog.setInitHistoryId(history.getId()); + messageLog.setContext("sendInitMessage"); + messageLog.setContextHistoryId(history.getId()); + } + collector.finish(); + } + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/handler/StateHandler.java b/im-center-server/src/main/java/cn/axzo/im/updatable/handler/StateHandler.java new file mode 100644 index 0000000..14410ec --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/handler/StateHandler.java @@ -0,0 +1,16 @@ +package cn.axzo.im.updatable.handler; + +import cn.axzo.im.updatable.HistoryAndMessage; + +import java.util.List; + +/** + * @author yanglin + */ +public interface StateHandler { + + void onSuccess(List historyAndMessages); + + void onFail(List historyAndMessages); + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/handler/UpdateMessageHandler.java b/im-center-server/src/main/java/cn/axzo/im/updatable/handler/UpdateMessageHandler.java new file mode 100644 index 0000000..b1e4173 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/handler/UpdateMessageHandler.java @@ -0,0 +1,69 @@ +package cn.axzo.im.updatable.handler; + +import cn.axzo.im.entity.MessageHistory; +import cn.axzo.im.entity.UpdatableMessage; +import cn.axzo.im.entity.UpdatableMessageLog; +import cn.axzo.im.enums.UpdatableMessageState; +import cn.axzo.im.updatable.HistoryAndMessage; +import cn.axzo.im.updatable.MessageBodyJsonObject; +import cn.axzo.im.updatable.collector.CardManipulateCollector; +import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author yanglin + */ +@Component +@RequiredArgsConstructor +public class UpdateMessageHandler implements StateHandler { + + private final CardManipulateCollectorFactory cardManipulateCollectorFactory; + + @Override + public void onSuccess(List historyAndMessages) { + updateState(historyAndMessages, true); + } + + @Override + public void onFail(List historyAndMessages) { + updateState(historyAndMessages, false); + } + + private void updateState(List historyAndMessages, boolean isSuccess) { + CardManipulateCollector collector = cardManipulateCollectorFactory.create(); + for (HistoryAndMessage historyAndMessage : historyAndMessages) { + UpdatableMessageState state = historyAndMessage.getMessage().getState(); + UpdatableMessage message = historyAndMessage.getMessage(); + boolean queuedOrFail = state == UpdatableMessageState.UPDATE_MESSAGE_QUEUED + || state == UpdatableMessageState.UPDATE_MESSAGE_SEND_FAIL; + if (queuedOrFail && historyAndMessage.hasDataVersionMatchHistories()) { + UpdatableMessage messageUpdate = new UpdatableMessage(); + collector.updateMessage(messageUpdate); + messageUpdate.setId(historyAndMessage.getMessage().getId()); + messageUpdate.setState(isSuccess + ? UpdatableMessageState.UPDATE_MESSAGE_SEND_SUCCESS + : UpdatableMessageState.UPDATE_MESSAGE_SEND_FAIL); + } + + for (MessageHistory history : historyAndMessage.getHistories()) { + UpdatableMessageLog messageLog = message.toMessageLog(); + collector.addLog(messageLog); + messageLog.setMessageState(isSuccess + ? UpdatableMessageState.UPDATE_MESSAGE_SEND_SUCCESS + : UpdatableMessageState.UPDATE_MESSAGE_SEND_FAIL); + messageLog.setContextHistoryId(history.getId()); + MessageBodyJsonObject object = new MessageBodyJsonObject(history.getMessageBody()); + messageLog.setMessageBody(object.getMessageBody()); + messageLog.setBizBody(object.getBizBody()); + messageLog.setDataVersion(history.getDataVersion()); + messageLog.setContextHistoryId(history.getId()); + messageLog.setContext(history.isUpdateRetry() ? "retrySendUpdateMessage" : "sendUpdateMessage"); + messageLog.setRetryCount(history.getUpdateRetryCount()); + } + } + collector.finish(); + } +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/retry/ExpungeUpdatableMessageLogJob.java b/im-center-server/src/main/java/cn/axzo/im/updatable/retry/ExpungeUpdatableMessageLogJob.java new file mode 100644 index 0000000..e10cab4 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/retry/ExpungeUpdatableMessageLogJob.java @@ -0,0 +1,58 @@ +package cn.axzo.im.updatable.retry; + +import cn.axzo.im.dao.mapper.UpdatableMessageLogMapper; +import cn.axzo.im.entity.SendJobInfo; +import cn.axzo.im.entity.UpdatableMessage; +import cn.axzo.im.utils.DateFormatUtil; +import cn.axzo.im.utils.JSONObjectUtil; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.joda.time.DateTime; +import org.springframework.stereotype.Component; + +import java.util.Date; + +import static cn.axzo.im.utils.Queries.query; + +/** + * @author yanglin + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class ExpungeUpdatableMessageLogJob { + + private final UpdatableMessageLogMapper updatableMessageLogMapper; + + @XxlJob("expungeUpdatableMessageLogJob") + public ReturnT execute(String paramStr) throws Exception { + log.info("start - run job with param={}", paramStr); + try { + Param param = StringUtils.isBlank(paramStr) + ? new Param() : + JSONObjectUtil.parseObject(paramStr, Param.class); + expunge(param); + log.info("end - run job with param={}", param); + return ReturnT.SUCCESS; + } catch (Exception e) { + log.warn("job failed. param={}", paramStr, e); + return ReturnT.FAIL; + } + } + + private void expunge(Param param) { + Date until = DateTime.now().minusDays(param.daysAgo).toDate(); + log.info("going to delete until={}", DateFormatUtil.toReadableString(until)); + int count = updatableMessageLogMapper.expunge(until); + log.info("deleted count={}", count); + } + + @Data + public static class Param { + private int daysAgo = 7; + } +} diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/retry/MessageUpdateRetryJob.java b/im-center-server/src/main/java/cn/axzo/im/updatable/retry/MessageUpdateRetryJob.java new file mode 100644 index 0000000..68108bc --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/retry/MessageUpdateRetryJob.java @@ -0,0 +1,68 @@ +package cn.axzo.im.updatable.retry; + +import cn.axzo.im.dao.repository.MessageUpdateRetryDao; +import cn.axzo.im.entity.MessageUpdateRetry; +import cn.axzo.maokai.api.util.Ref; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.List; +import java.util.function.Supplier; + +import static java.util.stream.Collectors.toList; + +/** + * @author yanglin + */ + +@Slf4j +@Component +@RequiredArgsConstructor +public class MessageUpdateRetryJob { + + private final MessageUpdateRetryDao messageUpdateRetryDao; + private final MessageUpdateRetryService messageUpdateRetryService; + + @XxlJob("messageUpdateRetryJob") + public ReturnT execute(String paramStr) throws Exception { + try { + log.info("start - run job with param={}", paramStr); + executeImpl(); + log.info("end - run job with param={}", paramStr); + return ReturnT.SUCCESS; + } catch (Exception e) { + log.warn("job failed. param={}", paramStr, e); + return ReturnT.FAIL; + } + } + + private void executeImpl() { + Supplier> cursor = retryCursor(); + for (List retries = cursor.get(); !retries.isEmpty(); retries = cursor.get()) { + List bizMessageIds = retries.stream() + .map(MessageUpdateRetry::getBizMessageId) + .collect(toList()); + messageUpdateRetryService.advanceRetry(bizMessageIds); + } + } + + private Supplier> retryCursor() { + Ref maxId = Ref.create(0L); + return () -> { + List retries = messageUpdateRetryDao.lambdaQuery() + .le(MessageUpdateRetry::getNextRetryTime, new Date()) + .gt(MessageUpdateRetry::getId, maxId.get()) + .orderByAsc(MessageUpdateRetry::getId) + .last("LIMIT 1000") + .list(); + if (!retries.isEmpty()) + maxId.set(retries.get(retries.size() - 1).getId()); + return retries; + }; + } + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/retry/MessageUpdateRetryService.java b/im-center-server/src/main/java/cn/axzo/im/updatable/retry/MessageUpdateRetryService.java new file mode 100644 index 0000000..18ba4b7 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/retry/MessageUpdateRetryService.java @@ -0,0 +1,100 @@ +package cn.axzo.im.updatable.retry; + +import cn.axzo.im.dao.repository.MessageUpdateRetryDao; +import cn.axzo.im.dao.repository.UpdatableMessageDao; +import cn.axzo.im.entity.MessageUpdateRetry; +import cn.axzo.im.entity.UpdatableMessage; +import cn.axzo.im.entity.UpdatableMessageLog; +import cn.axzo.im.updatable.AddUpdateHistoryResult; +import cn.axzo.im.updatable.UpdateSupport; +import cn.axzo.im.updatable.collector.CardManipulateCollector; +import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory; +import cn.axzo.im.utils.ImProperties; +import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; +import org.apache.commons.collections4.CollectionUtils; +import org.joda.time.DateTime; +import org.springframework.stereotype.Service; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; + +import static java.util.stream.Collectors.toList; + +/** + * @author yanglin + */ +@Service +@RequiredArgsConstructor +public class MessageUpdateRetryService { + + private final MessageUpdateRetryDao messageUpdateRetryDao; + private final UpdatableMessageDao updatableMessageDao; + private final UpdateSupport updateSupport; + private final TransactionTemplate transactionTemplate; + private final CardManipulateCollectorFactory cardManipulateCollectorFactory; + private final ImProperties props; + + public void removePrevRetryByBizMessageIds(Collection bizMessageIds) { + if (CollectionUtils.isNotEmpty(bizMessageIds)) + messageUpdateRetryDao.getBaseMapper().deleteByBizMessageIds(bizMessageIds); + } + + public void advanceRetry(List bizMessageIds) { + List> batches = Lists.partition( + bizMessageIds, props.getUpdatableMessageMaxLockRecords()); + for (List batch : batches) + transactionTemplate.executeWithoutResult(unused -> advanceRetryImpl(batch)); + } + + private void advanceRetryImpl(List bizMessageIds) { + removePrevRetryByBizMessageIds(bizMessageIds); + List messages = updatableMessageDao.getByBizMessageIdsForUpdate(bizMessageIds); + UpdateAckState state = new UpdateAckState(messages, props.getMessageUpdateAckMaxRetryCount()); + // 先发本次生试, 再调度下一次重试 + retryUpdateMessage(state.getNonAckMessageIds()); + scheduleNextRetry(state.getScheduleNextRetryMessages()); + } + + public void scheduleNextRetry(List messageIds) { + if (CollectionUtils.isEmpty(messageIds)) return; + List messages = updatableMessageDao.listByIds(messageIds); + List bizMessageIds = messages.stream() + .map(UpdatableMessage::getBizMessageId) + .collect(toList()); + removePrevRetryByBizMessageIds(bizMessageIds); + ArrayList retries = new ArrayList<>(); + CardManipulateCollector collector = cardManipulateCollectorFactory.create(); + Date nextRetryTime = DateTime.now() + .plusSeconds(props.getMessageUpdateAckRetryIntervalSeconds()) + .toDate(); + for (UpdatableMessage message : messages) { + MessageUpdateRetry retry = new MessageUpdateRetry(); + retry.setBizMessageId(message.bizMessageId()); + retry.setInitHistoryId(message.getInitHistoryId()); + retry.setNextRetryTime(nextRetryTime); + retry.setDataVersion(message.getDataVersion()); + retries.add(retry); + + UpdatableMessageLog messageLog = message.toMessageLog(); + collector.addLog(messageLog); + messageLog.setRetryCount(message.getRetryCount() + 1); + messageLog.setContext("scheduleNextRetrySendUpdateMessage"); + } + messageUpdateRetryDao.saveBatch(retries); + collector.finish(); + } + + public void retryUpdateMessage(List messageIds) { + if (CollectionUtils.isEmpty(messageIds)) return; + updatableMessageDao.getBaseMapper().incrRetryCount(messageIds); + List messages = updatableMessageDao.listByIds(messageIds); + AddUpdateHistoryResult result = updateSupport + .addUpdateHistories("retryUpdateHistoryCreated", messages); + updateSupport.updateHistoryId(result, UpdatableMessage::setRetryHistoryId); + } + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/retry/UpdateAckState.java b/im-center-server/src/main/java/cn/axzo/im/updatable/retry/UpdateAckState.java new file mode 100644 index 0000000..4f8b31b --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/retry/UpdateAckState.java @@ -0,0 +1,35 @@ +package cn.axzo.im.updatable.retry; + +import cn.axzo.im.entity.UpdatableMessage; +import cn.axzo.im.enums.UpdatableMessageState; +import lombok.RequiredArgsConstructor; + +import java.util.List; + +import static java.util.stream.Collectors.toList; + +/** + * @author yanglin + */ +@RequiredArgsConstructor +public class UpdateAckState { + + private final List messages; + private final int maxRetryCount; + + List getScheduleNextRetryMessages() { + return messages.stream() + .filter(message -> message.getRetryCount() < maxRetryCount) + .filter(message -> message.getState() != UpdatableMessageState.UPDATE_ACK) + .map(UpdatableMessage::getId) + .collect(toList()); + } + + List getNonAckMessageIds() { + return messages.stream() + .filter(message -> message.getState() != UpdatableMessageState.UPDATE_ACK) + .map(UpdatableMessage::getId) + .collect(toList()); + } + +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/utils/ImProperties.java b/im-center-server/src/main/java/cn/axzo/im/utils/ImProperties.java index 8062cf7..b74abc0 100644 --- a/im-center-server/src/main/java/cn/axzo/im/utils/ImProperties.java +++ b/im-center-server/src/main/java/cn/axzo/im/utils/ImProperties.java @@ -3,6 +3,8 @@ package cn.axzo.im.utils; import cn.axzo.basics.common.BeanMapper; import com.google.common.collect.Sets; import lombok.Data; +import lombok.Getter; +import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.annotation.Configuration; @@ -28,8 +30,16 @@ public class ImProperties { private String controllerToken = "123442"; + private int messageUpdateAckMaxRetryCount = 3; + + private int messageUpdateAckRetryIntervalSeconds = 10; + + private int updatableMessageMaxLockRecords = 5; + private SendMessageConfig sendMessage = new SendMessageConfig(); + private UpdatableMessageConfig updatable = new UpdatableMessageConfig(); + private Set genImAccountJobCodes = Sets.newHashSet( // 企业班组长, 项目班组长 "entTeamLeader", "projTeamLeader", @@ -40,6 +50,14 @@ public class ImProperties { // 带班长 "projectTeamManager"); + @Setter + @Getter + public static class UpdatableMessageConfig { + + private int fetchMaxSize = 500; + + } + @Data public static class SendMessageConfig { private int memoryQueueQueryLimitSize = 1000; diff --git a/im-center-server/src/main/java/cn/axzo/im/utils/PropsUtils.java b/im-center-server/src/main/java/cn/axzo/im/utils/PropsUtils.java new file mode 100644 index 0000000..2eb13db --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/utils/PropsUtils.java @@ -0,0 +1,36 @@ +package cn.axzo.im.utils; + +import java.lang.reflect.Field; + +/** + * @author yanglin + */ +public class PropsUtils { + + /** + * 只合并第一层 + */ + public static T updateProperties(T oldObj, T newObj) { + if (oldObj == null) return newObj; + if (newObj == null) return oldObj; + try { + @SuppressWarnings("unchecked") + T merged = (T) oldObj.getClass().newInstance(); + for (Field field : merged.getClass().getDeclaredFields()) { + boolean accessible = field.isAccessible(); + field.setAccessible(true); + try { + Object oldFieldValue = field.get(oldObj); + Object newFieldValue = field.get(newObj); + field.set(merged, newFieldValue == null ? oldFieldValue : newFieldValue); + } finally { + field.setAccessible(accessible); + } + } + return merged; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} \ No newline at end of file diff --git a/im-center-server/src/test/java/cn/axzo/im/send/job/SendMessageJobTest.java b/im-center-server/src/test/java/cn/axzo/im/send/job/SendMessageJobTest.java index 405608b..0da37ef 100644 --- a/im-center-server/src/test/java/cn/axzo/im/send/job/SendMessageJobTest.java +++ b/im-center-server/src/test/java/cn/axzo/im/send/job/SendMessageJobTest.java @@ -18,7 +18,8 @@ class SendMessageJobTest { private final SendMessageJob sendMessageJob; @Test - void foo() { + void exec() { + sendMessageJob.execute(null);; } } \ No newline at end of file diff --git a/im-center-server/src/test/java/cn/axzo/im/updatable/retry/ExpungeUpdatableMessageLogJobTest.java b/im-center-server/src/test/java/cn/axzo/im/updatable/retry/ExpungeUpdatableMessageLogJobTest.java new file mode 100644 index 0000000..851d622 --- /dev/null +++ b/im-center-server/src/test/java/cn/axzo/im/updatable/retry/ExpungeUpdatableMessageLogJobTest.java @@ -0,0 +1,24 @@ +package cn.axzo.im.updatable.retry; + +import cn.axzo.im.Application; +import lombok.RequiredArgsConstructor; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * @author yanglin + */ +@SpringBootTest(classes = Application.class) +@RequiredArgsConstructor(onConstructor_ = @Autowired) +class ExpungeUpdatableMessageLogJobTest { + + private final ExpungeUpdatableMessageLogJob expungeUpdatableMessageLogJob; + + @Test + void exec() { + } + +} \ No newline at end of file diff --git a/im-center-server/src/test/java/cn/axzo/im/updatable/retry/MessageUpdateRetryJobTest.java b/im-center-server/src/test/java/cn/axzo/im/updatable/retry/MessageUpdateRetryJobTest.java new file mode 100644 index 0000000..9848869 --- /dev/null +++ b/im-center-server/src/test/java/cn/axzo/im/updatable/retry/MessageUpdateRetryJobTest.java @@ -0,0 +1,22 @@ +package cn.axzo.im.updatable.retry; + +import cn.axzo.im.Application; +import lombok.RequiredArgsConstructor; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +/** + * @author yanglin + */ +@SpringBootTest(classes = Application.class) +@RequiredArgsConstructor(onConstructor_ = @Autowired) +class MessageUpdateRetryJobTest { + + private final MessageUpdateRetryJob messageUpdateRetryJob; + + @Test + void exec() { + } + +} \ No newline at end of file