Merge branch 'feature/REQ-3057-card' into feature/REQ-3201

# Conflicts:
#	im-center-api/pom.xml
#	im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/SendTemplateMessageParam.java
#	im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java
#	im-center-server/src/main/java/cn/axzo/im/entity/HistoryRecordExt.java
#	im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java
#	im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java
#	im-center-server/src/main/java/cn/axzo/im/entity/MessageUpdateRetry.java
#	im-center-server/src/main/java/cn/axzo/im/send/handler/CommonSendBatchHandler.java
#	im-center-server/src/main/java/cn/axzo/im/send/handler/CommonSendOneHandler.java
#	im-center-server/src/main/java/cn/axzo/im/send/handler/CustomSendBatchHandler.java
#	im-center-server/src/main/java/cn/axzo/im/send/handler/CustomSendOneHandler.java
#	im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java
This commit is contained in:
yanglin 2024-12-05 17:23:50 +08:00
commit 9f0d482b2f
67 changed files with 2633 additions and 117 deletions

View File

@ -3,12 +3,19 @@ package cn.axzo.im.center.api.feign;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.vo.req.AsyncSendMessageParam;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.FetchUpdatableMessageRequest;
import cn.axzo.im.center.api.vo.req.GetBizMessageIdsRequest;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.req.UpdatableMessageAckRequest;
import cn.axzo.im.center.api.vo.req.UpdateMessageRequest;
import cn.axzo.im.center.api.vo.resp.FetchUpdatableMessageResponse;
import cn.axzo.im.center.api.vo.resp.GetBizMessageIdsResponse;
import cn.axzo.im.center.api.vo.resp.MessageCustomResp;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.center.api.vo.resp.MessageTaskResp;
import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
@ -54,7 +61,25 @@ public interface MessageApi {
* @return
*/
@PostMapping("/api/im/template-message/async/send")
ApiResult<MessageTaskResp> sendTemplateMessageAsync(@RequestBody @Validated SendTemplateMessageParam sendMessageParam);
ApiResult<MessageTaskResp> sendTemplateMessageAsync(
@RequestBody @Validated SendTemplateMessageParam sendMessageParam);
/**
* 更新消息
*/
@PostMapping("/api/im/template-message/updatable/updateMessage")
ApiResult<MessageUpdateResponse> updateMessage(@RequestBody @Validated UpdateMessageRequest request);
@PostMapping("/api/im/template-message/updatable/ack")
ApiResult<Void> ack(@RequestBody @Validated UpdatableMessageAckRequest request);
@PostMapping("/api/im/template-message/updatable/fetchUpdatableMessage")
ApiResult<FetchUpdatableMessageResponse> fetchUpdatableMessage(
@RequestBody @Validated FetchUpdatableMessageRequest request);
@PostMapping("/api/im/template-message/updatable/getBizMessageIds")
ApiResult<GetBizMessageIdsResponse> getBizMessageIds(
@RequestBody @Validated GetBizMessageIdsRequest request);
/**
*

View File

@ -13,6 +13,7 @@ public class SendPriority {
public static final SendPriority TEMPLATE_MESSAGE = create(1000);
public static final SendPriority SYSTEM_CUSTOM_MESSAGE = create(5000);
public static final SendPriority UPDATE_MESSAGE = create(5500);
public static final SendPriority OP_MESSAGE = create(500000);
private final Integer value;

View File

@ -0,0 +1,51 @@
package cn.axzo.im.center.api.vo;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
/**
* @author yanglin
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
// IMPORTANT: 不要删除这个注解, 避免@Data被@Setter, @Getter取代
@EqualsAndHashCode
public class PersonAccountAttribute {
/**
* 接收消息的personId
*/
@NotBlank(message = "personId不能为空")
private String personId;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long ouId;
/**
* 项目id
*/
private Long workspaceId;
/**
* 发送消息到App端
* 工人端企业端服务器
* CMCMPSYSTEM
*
* @See cn.axzo.im.center.common.enums.AppTypeEnum
*/
@NotNull(message = "appType不能为空")
private AppTypeEnum appType;
}

View File

@ -0,0 +1,17 @@
package cn.axzo.im.center.api.vo.req;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
/**
* @author yanglin
*/
@Setter
@Getter
public class AdvanceRetryRequest {
private List<String> bizMessageIds;
}

View File

@ -0,0 +1,27 @@
package cn.axzo.im.center.api.vo.req;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import javax.validation.constraints.NotEmpty;
import java.util.Set;
/**
* @author yanglin
*/
@Setter
@Getter
public class FetchUpdatableMessageRequest {
/**
* 初始消息ID列表(普通消息的网易云信ID). 列表最大数量: 500
*/
@NotEmpty(message = "初始消息ID列表不能为空")
private Set<String> nimMessageIds;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,27 @@
package cn.axzo.im.center.api.vo.req;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import javax.validation.constraints.NotEmpty;
import java.util.Set;
/**
* @author yanglin
*/
@Setter
@Getter
public class GetBizMessageIdsRequest {
/**
* 初始消息ID列表(普通消息的网易云信ID)
*/
@NotEmpty(message = "初始消息ID列表不能为空")
private Set<String> nimMessageIds;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,32 @@
package cn.axzo.im.center.api.vo.req;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections4.CollectionUtils;
import javax.validation.constraints.NotNull;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
public interface MessageUpdateInfo {
static List<String> collectBizMessageIds(Collection<? extends MessageUpdateInfo> updates) {
if (CollectionUtils.isEmpty(updates))
return Collections.emptyList();
return updates.stream()
.map(MessageUpdateInfo::bizMessageId)
.distinct()
.collect(toList());
}
String bizMessageId();
@NotNull
JSONObject bizBody();
}

View File

@ -1,6 +1,8 @@
package cn.axzo.im.center.api.vo.req;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.center.common.enums.TemplatedMsgType;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Builder;
@ -10,8 +12,9 @@ import lombok.NoArgsConstructor;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Data
@Builder
@ -19,29 +22,31 @@ import java.util.List;
@AllArgsConstructor
public class SendTemplateMessageParam {
/**
* 发送人
*/
private PersonAccountAttribute sender;
/**
* 消息接收用户信息
*/
@NotEmpty(message = "消息接收用户信息不能为空")
@Valid
private List<ReceivePerson> receivePersons;
private List<PersonAccountAttribute> receivePersons;
/**
* 消息标题
*/
@NotBlank(message = "消息标题不能为空")
private String msgHeader;
/**
* 消息内容
*/
@NotBlank(message = "消息内容不能为空")
private String msgContent;
/**
* 消息模板ID
*/
@NotBlank(message = "消息模板ID不能为空")
private String msgTemplateId;
/**
@ -62,6 +67,12 @@ public class SendTemplateMessageParam {
private Integer sendPriority;
private TemplatedMsgType templatedMsgType = TemplatedMsgType.TEMPLATE;
public boolean isSendByRobot() {
return sender == null;
}
/**
* 推送消息
*/
@ -69,32 +80,28 @@ public class SendTemplateMessageParam {
private List<ExcludePushPayload> excludePushPayloads;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class ReceivePerson {
public Long determineSenderPersonId() {
if (sender != null)
return Long.parseLong(sender.getPersonId());
return 0L;
}
/**
* 接收消息的personId
*/
@NotBlank(message = "personId不能为空")
private String personId;
public Long determineSenderOuId() {
if (sender != null)
return sender.getOuId();
return 0L;
}
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long ouId;
public boolean isUpdatable() {
return templatedMsgType.isUpdatable();
}
/**
* 发送消息到App端
* 工人端企业端服务器
* CMCMPSYSTEM
*
* @See cn.axzo.im.center.common.enums.AppTypeEnum
*/
@NotNull(message = "appType不能为空")
private AppTypeEnum appType;
public Set<PersonAccountAttribute> uniqueReceivePersons() {
return new HashSet<>(receivePersons);
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,70 @@
package cn.axzo.im.center.api.vo.req;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@Setter
@Getter
public class UpdatableMessageAckRequest {
/**
* ack的消息信息列表, 同一个nimMessageId传需要ack的最大dataVersion. 每次最多值100条
*/
@NotEmpty(message = "消息ACK不能为空")
private List<Acknowledgment> acknowledgments;
public List<Acknowledgment> determineValidAcknowledgments() {
if (CollectionUtils.isEmpty(acknowledgments))
return Collections.emptyList();
HashMap<String, Long> id2Max = new HashMap<>();
for (Acknowledgment ack : acknowledgments) {
Long max = id2Max.getOrDefault(ack.nimMessageId, 0L);
max = Math.max(max, ack.dataVersion);
id2Max.put(ack.nimMessageId, max);
}
return id2Max.entrySet().stream()
.map(e -> {
Acknowledgment ack = new Acknowledgment();
ack.setNimMessageId(e.getKey());
ack.setDataVersion(e.getValue());
return ack;
})
.collect(toList());
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
@Setter
@Getter
public static class Acknowledgment {
/**
* 初始消息ID(普通消息的网易云信ID)
*/
@NotBlank(message = "初始消息ID不能为空")
private String nimMessageId;
/**
* 数据版本
*/
@NotNull(message = "数据版本不能为空")
private Long dataVersion;
}
}

View File

@ -0,0 +1,58 @@
package cn.axzo.im.center.api.vo.req;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import java.util.ArrayList;
import java.util.List;
/**
* @author yanglin
*/
@Setter
@Getter
public class UpdateMessageRequest {
@Valid
private List<Update> updates = new ArrayList<>();
public void addUpdate(Update update) {
this.updates.add(update);
}
public List<String> getBizMessageIds() {
return MessageUpdateInfo.collectBizMessageIds(updates);
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
@Setter
@Getter
public static class Update implements MessageUpdateInfo {
@NotBlank(message = "消息ID不能为空")
private String bizMessageId;
private String msgTemplateContent;
@Override
public String bizMessageId() {
return bizMessageId;
}
@Override
public JSONObject bizBody() {
return StringUtils.isBlank(msgTemplateContent)
? new JSONObject()
: JSON.parseObject(msgTemplateContent);
}
}
}

View File

@ -0,0 +1,52 @@
package cn.axzo.im.center.api.vo.resp;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.List;
/**
* @author yanglin
*/
@Setter
@Getter
public class FetchUpdatableMessageResponse {
/**
* 消息列表
*/
private List<MessageInfo> messages = new ArrayList<>();
public void addMessage(MessageInfo message) {
messages.add(message);
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
@Setter
@Getter
public static class MessageInfo {
/**
* 消息类型. 和首次发次的保持一致. template: 模板消息, card: 卡片消息
*/
private String msgType;
/**
* 消息体
*/
private String msgBody;
/**
* 业务消息ID
*/
private String bizMessageId;
/**
* 数据版本
*/
private Long dataVersion;
}
}

View File

@ -0,0 +1,33 @@
package cn.axzo.im.center.api.vo.resp;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* @author yanglin
*/
@Setter
@Getter
public class GetBizMessageIdsResponse {
private Map<String, String> nimMessageId2BizMessageId = new HashMap<>();
public void addMessage(String nimMessageId, String bizMessageId) {
nimMessageId2BizMessageId.put(nimMessageId, bizMessageId);
}
public Set<String> bizMessageIds() {
return new HashSet<>(nimMessageId2BizMessageId.values());
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -1,5 +1,6 @@
package cn.axzo.im.center.api.vo.resp;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
@ -8,6 +9,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
import java.util.List;
@Data
@Builder
@ -53,4 +55,11 @@ public class MessageTaskResp {
private Date createAt;
private Date updateAt;
private List<UpdatableMessageSendResult> updatableMessageSendResults;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,66 @@
package cn.axzo.im.center.api.vo.resp;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.List;
/**
* @author yanglin
*/
@Setter
@Getter
public class MessageUpdateResponse {
/**
* 更新的业务消息ID
*/
private List<String> updatedBizMessageIds = new ArrayList<>();
/**
* 未更新的业务消息
*/
private List<NonUpdatedMessage> nonUpdatedMessages = new ArrayList<>();
public void addUpdatedBizMessageId(String bizMessageId) {
this.updatedBizMessageIds.add(bizMessageId);
}
public void addNonUpdatedMessage(String bizMessageId,
NonUpdateMessageReason reason) {
addNonUpdatedMessage(bizMessageId, reason, null);
}
public void addNonUpdatedMessage(String bizMessageId,
NonUpdateMessageReason reason,
Object description) {
NonUpdatedMessage nonUpdatedMessage = new NonUpdatedMessage();
nonUpdatedMessage.setBizMessageId(bizMessageId);
nonUpdatedMessage.setReason(reason);
nonUpdatedMessage.setDescription(description);
this.nonUpdatedMessages.add(nonUpdatedMessage);
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
@Setter
@Getter
public static class NonUpdatedMessage {
private String bizMessageId;
private NonUpdateMessageReason reason;
private Object description;
}
public enum NonUpdateMessageReason {
// 找不到首次发送的消息
CANT_FIND_INIT_MESSAGE,
// 首次投递消息的状态不允许更新. 此时查看NonUpdatedMessage#description字段
MESSAGE_STATE_NOT_ALLOWED
}
}

View File

@ -0,0 +1,15 @@
package cn.axzo.im.center.api.vo.resp;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import lombok.Getter;
import lombok.Setter;
/**
* @author yanglin
*/
@Setter
@Getter
public class UpdatableMessageSendResult {
private String bizMessageId;
private PersonAccountAttribute account;
}

View File

@ -18,6 +18,8 @@ public enum BizTypeEnum {
* 待办
*/
PENDING("PENDING", "待办"),
MESSAGE_UPDATE("MESSAGE_UPDATE", "消息更新")
;

View File

@ -0,0 +1,35 @@
package cn.axzo.im.center.common.enums;
/**
* @author yanglin
*/
public enum ImAppType {
/**
* 工人端
*/
CM,
/**
* 企业管理端
*/
CMP;
public static ImAppType fromNimAppType(AppTypeEnum appType) {
if (appType == null)
return null;
if (appType == AppTypeEnum.CM)
return CM;
if (appType == AppTypeEnum.CMP)
return CMP;
throw new UnsupportedOperationException("Should never happen!");
}
public AppTypeEnum toNimAppType() {
if (this == CM)
return AppTypeEnum.CM;
if (this == CMP)
return AppTypeEnum.CMP;
throw new UnsupportedOperationException("Should never happen!");
}
}

View File

@ -0,0 +1,22 @@
package cn.axzo.im.center.common.enums;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
* @author yanglin
*/
@Getter
@RequiredArgsConstructor
public enum TemplatedMsgType {
TEMPLATE("template", "模板消息", false),
CARD("card", "卡片消息", true),
;
private final String code;
private final String message;
private final boolean isUpdatable;
}

View File

@ -0,0 +1,23 @@
package cn.axzo.im.center.common.enums;
import cn.axzo.basics.common.constant.enums.CodeDefinition;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author yanglin
*/
@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public enum YesOrNo implements CodeDefinition<String> {
YES("YES", ""),
NO("NO", "")
;
@EnumValue
private final String code;
private final String desc;
}

View File

@ -36,4 +36,19 @@ public class MessageBody {
private Map<String,String> messageExtension;
/**
* 业务消息id, 用于接口拉取最新消息内容
*/
private String bizMessageId;
/**
* 数据版本
*/
private Long dataVersion;
/**
* 端信息
*/
private Peer peer;
}

View File

@ -1,6 +1,7 @@
package cn.axzo.im.channel.netease.dto;
import cn.axzo.im.center.common.enums.BizTypeEnum;
import cn.axzo.im.center.common.enums.TemplatedMsgType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@ -37,4 +38,28 @@ public class MessageCustomBody {
private String payload;
/**
* 业务消息id, 用于接口拉取最新消息内容
*/
private String bizMessageId;
/**
* 数据版本
*/
private Long dataVersion;
/**
* 最原始的网易云信消息id, 更新的哪条消息
*/
private String initMessageId;
/**
* 更新的消息类型
*/
private TemplatedMsgType msgType;
/**
* 端信息
*/
private Peer peer;
}

View File

@ -0,0 +1,28 @@
package cn.axzo.im.channel.netease.dto;
import lombok.Getter;
import lombok.Setter;
/**
* @author yanglin
*/
@Setter
@Getter
public class Peer {
/**
* IM是否为机器人发送
*/
private boolean isSenderRobot;
/**
* IM发送者自然人id, 机器人发送时为0
*/
private Long senderPersonId;
/**
* IM接收者自然人id
*/
private Long receiverPersonId;
}

View File

@ -5,15 +5,25 @@ import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.feign.MessageApi;
import cn.axzo.im.center.api.feign.SendPriority;
import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.center.api.vo.req.AccountAbsentQuery;
import cn.axzo.im.center.api.vo.req.AccountQuery;
import cn.axzo.im.center.api.vo.req.AsyncSendMessageParam;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.FetchUpdatableMessageRequest;
import cn.axzo.im.center.api.vo.req.GetBizMessageIdsRequest;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.req.UpdatableMessageAckRequest;
import cn.axzo.im.center.api.vo.req.UpdateMessageRequest;
import cn.axzo.im.center.api.vo.resp.FetchUpdatableMessageResponse;
import cn.axzo.im.center.api.vo.resp.GetBizMessageIdsResponse;
import cn.axzo.im.center.api.vo.resp.MessageCustomResp;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.center.api.vo.resp.MessageTaskResp;
import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse;
import cn.axzo.im.center.api.vo.resp.UpdatableMessageSendResult;
import cn.axzo.im.center.api.vo.resp.UserAccountResp;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.entity.AccountRegister;
@ -26,6 +36,10 @@ import cn.axzo.im.service.CustomMessageService;
import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.im.service.MessageTaskService;
import cn.axzo.im.service.RobotMsgTemplateService;
import cn.axzo.im.updatable.UpdatableMessageManager;
import cn.axzo.im.updatable.UpdatableMessageQueryService;
import cn.axzo.im.updatable.UpdateSupport;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.pokonyan.exception.Aassert;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
@ -39,12 +53,15 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import static cn.axzo.im.config.BizResultCode.ALL_PERSSON_TYPE_NOT_EMPTY;
import static cn.axzo.im.config.BizResultCode.SEND_IM_ACCOUNT_MAX;
@ -76,6 +93,12 @@ public class MessageController implements MessageApi {
private MessageHistoryService messageHistoryService;
@Autowired
private CustomMessageService customMessageService;
@Autowired
private UpdatableMessageManager updatableMessageManager;
@Autowired
private UpdateSupport updateSupport;
@Autowired
private UpdatableMessageQueryService updatableMessageQueryService;
@Override
@ -150,32 +173,87 @@ public class MessageController implements MessageApi {
}
@Override
public ApiResult<MessageTaskResp> sendTemplateMessageAsync(SendTemplateMessageParam sendMessageParam) {
String sendImAccount = check(sendMessageParam);
@Transactional
public ApiResult<MessageTaskResp> sendTemplateMessageAsync(SendTemplateMessageParam request) {
log.info("sendTemplateMessageAsync, request={}", request);
PersonAccountAttribute sender = request.getSender();
BizAssertions.assertTrue(sender != null || StringUtils.isNotBlank(request.getMsgTemplateId()),
"消息模板ID和发送人必须选其一");
String sendImAccount;
if (sender != null) {
AccountAbsentQuery accountQuery = new AccountAbsentQuery();
accountQuery.setAppType(sender.getAppType().getCode());
accountQuery.setPersonId(sender.getPersonId());
accountQuery.setOuId(sender.getOuId());
List<UserAccountResp> accounts = accountService.registerAccountIfAbsent(accountQuery);
sendImAccount = accounts.get(0).getImAccount();
} else {
//todo: 根据类型验证模版是否配置了机器人
sendImAccount = check(request);
}
MessageTask.BizData bizData = MessageTask.BizData.builder()
.msgTemplateContent(sendMessageParam.getMsgTemplateContent())
.msgTemplateId(sendMessageParam.getMsgTemplateId())
.payload(sendMessageParam.getPayload())
.excludePushPayloads(sendMessageParam.getExcludePushPayloads())
.msgTemplateContent(request.getMsgTemplateContent())
.msgTemplateId(request.getMsgTemplateId())
.payload(request.getPayload())
.excludePushPayloads(request.getExcludePushPayloads())
.templatedMsgType(request.getTemplatedMsgType())
.isSenderRobot(request.isSendByRobot())
.senderPersonId(request.determineSenderPersonId())
.build();
Date now = new Date();
List<MessageTask.ReceivePerson> receivePersons = JSONArray.parseArray(
JSONObject.toJSONString(request.uniqueReceivePersons()), MessageTask.ReceivePerson.class);
MessageTask messageTask = messageTaskService.create(MessageTask.builder()
.bizId(sendMessageParam.getBizId())
.bizId(request.getBizId())
.sendImAccount(sendImAccount)
.receivePersons(JSONArray.parseArray(JSONObject.toJSONString(sendMessageParam.getReceivePersons()), MessageTask.ReceivePerson.class))
.receivePersons(receivePersons)
.status(MessageTaskStatus.PENDING)
.title(sendMessageParam.getMsgHeader())
.content(sendMessageParam.getMsgContent())
.title(request.getMsgHeader() == null ? "" : request.getMsgHeader())
.content(request.getMsgContent() == null ? "" : request.getMsgContent())
.bizData(bizData)
.ext(sendMessageParam.getExt())
.ext(request.getExt())
.planStartTime(now)
.createAt(now)
.sendPriority(SendPriority.TEMPLATE_MESSAGE
.determinePriority(sendMessageParam.getSendPriority()))
.determinePriority(request.getSendPriority()))
.apiChannel(ApiChannel.COMMON_MESSAGE)
.build());
return ApiResult.ok(toMessageTaskResp(messageTask));
List<UpdatableMessageSendResult> updatableMessageSendResults = Collections.emptyList();
if (request.isUpdatable()) {
updatableMessageSendResults = updatableMessageManager.createUpdatableMessage(messageTask, request, receivePersons);
}
MessageTaskResp messageTaskResp = toMessageTaskResp(messageTask);
messageTaskResp.setUpdatableMessageSendResults(updatableMessageSendResults);
return ApiResult.ok(messageTaskResp);
}
@Override
public ApiResult<MessageUpdateResponse> updateMessage(UpdateMessageRequest request) {
log.info("updateMessage, request={}", request);
MessageUpdateResponse resp = updatableMessageManager.updateMessage(request);
return ApiResult.ok(resp);
}
@Override
public ApiResult<Void> ack(UpdatableMessageAckRequest request) {
log.info("ack, request={}", request);
updatableMessageManager.ack(request);
return ApiResult.ok();
}
@Override
public ApiResult<FetchUpdatableMessageResponse> fetchUpdatableMessage(FetchUpdatableMessageRequest request) {
log.info("fetchUpdatableMessage, request={}", request);
FetchUpdatableMessageResponse resp = updatableMessageQueryService.fetchUpdatableMessage(request);
return ApiResult.ok(resp);
}
@Override
public ApiResult<GetBizMessageIdsResponse> getBizMessageIds(GetBizMessageIdsRequest request) {
log.info("getBizMessageIds, request={}", request);
GetBizMessageIdsResponse response = updatableMessageQueryService.getBizMessageIds(request);
return ApiResult.ok(response);
}
private void check(SendMessageParam sendMessageParam) {
@ -233,6 +311,16 @@ public class MessageController implements MessageApi {
return robotImAccount;
}
private Optional<UserAccountResp> findRobotAccount(String robotId) {
AccountQuery accountQuery = new AccountQuery();
accountQuery.setAccountId(robotId);
accountQuery.setAppType(AppTypeEnum.SYSTEM.getCode());
List<UserAccountResp> accounts = accountService.queryAccountInfo(accountQuery);
if (CollectionUtils.isEmpty(accounts))
return Optional.empty();
return Optional.of(accounts.get(0));
}
public MessageTaskResp toMessageTaskResp(MessageTask messageTask) {
MessageTaskResp messageTaskResp = MessageTaskResp.builder().build();
BeanUtils.copyProperties(messageTask, messageTaskResp);

View File

@ -1,10 +0,0 @@
package cn.axzo.im.dao.mapper;
import cn.axzo.im.entity.AckRetry;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @author yanglin
*/
public interface AckRetryMapper extends BaseMapper<AckRetry> {
}

View File

@ -0,0 +1,24 @@
package cn.axzo.im.dao.mapper;
import cn.axzo.im.entity.MessageUpdateRetry;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Param;
import java.util.Collection;
/**
* @author yanglin
*/
public interface MessageUpdateRetryMapper extends BaseMapper<MessageUpdateRetry> {
@Delete("<script>" +
"DELETE FROM im_message_update_retry WHERE biz_message_id IN\n" +
" <foreach collection='bizMessageIds' item='bizMessageId' open='(' close=')' separator=','>\n" +
" #{bizMessageId}\n" +
" </foreach>" +
"</script>")
void deleteByBizMessageIds(
@Param("bizMessageIds") Collection<String> bizMessageIds);
}

View File

@ -0,0 +1,18 @@
package cn.axzo.im.dao.mapper;
import cn.axzo.im.entity.UpdatableMessageLog;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
/**
* @author yanglin
*/
public interface UpdatableMessageLogMapper extends BaseMapper<UpdatableMessageLog> {
@Delete("DELETE FROM im_updatable_message_log WHERE create_at <= #{until}")
int expunge(@Param("until") Date until);
}

View File

@ -0,0 +1,31 @@
package cn.axzo.im.dao.mapper;
import cn.axzo.im.entity.UpdatableMessage;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;
import java.util.List;
/**
* @author yanglin
*/
public interface UpdatableMessageMapper extends BaseMapper<UpdatableMessage> {
@Update("<script>" +
"UPDATE im_updatable_message SET data_version = data_version + 1, retry_count = 0 WHERE id IN\n" +
" <foreach collection='ids' item='id' open='(' close=')' separator=','>\n" +
" #{id}\n" +
" </foreach>\n" +
"</script>")
void incrDataVersion(@Param("ids") List<Long> ids);
@Update("<script>" +
"UPDATE im_updatable_message SET retry_count = retry_count + 1 WHERE id IN\n" +
" <foreach collection='ids' item='id' open='(' close=')' separator=','>\n" +
" #{id}\n" +
" </foreach>\n" +
"</script>")
void incrRetryCount(@Param("ids") List<Long> ids);
}

View File

@ -1,13 +0,0 @@
package cn.axzo.im.dao.repository;
import cn.axzo.im.dao.mapper.AckRetryMapper;
import cn.axzo.im.entity.AckRetry;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Repository;
/**
* @author yanglin
*/
@Repository("ackRetryDao")
public class AckRetryDao extends ServiceImpl<AckRetryMapper, AckRetry> {
}

View File

@ -1,12 +1,14 @@
package cn.axzo.im.dao.repository;
import cn.axzo.im.dao.mapper.AccountRegisterMapper;
import cn.axzo.im.dao.mapper.MessageHistoryMapper;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.entity.MessageHistory;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Repository;
import java.util.Collections;
import java.util.List;
/**
* im-center
*
@ -18,4 +20,15 @@ import org.springframework.stereotype.Repository;
@Repository("messageHistoryDao")
public class MessageHistoryDao extends ServiceImpl<MessageHistoryMapper, MessageHistory> {
/**
* 不用自带的listByIds
*/
public List<MessageHistory> getByIds(List<Long> historyIds) {
if (CollectionUtils.isEmpty(historyIds))
return Collections.emptyList();
return lambdaQuery()
.in(MessageHistory::getId, historyIds)
.list();
}
}

View File

@ -0,0 +1,13 @@
package cn.axzo.im.dao.repository;
import cn.axzo.im.dao.mapper.MessageUpdateRetryMapper;
import cn.axzo.im.entity.MessageUpdateRetry;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Repository;
/**
* @author yanglin
*/
@Repository("messageUpdateRetryDao")
public class MessageUpdateRetryDao extends ServiceImpl<MessageUpdateRetryMapper, MessageUpdateRetry> {
}

View File

@ -0,0 +1,70 @@
package cn.axzo.im.dao.repository;
import cn.axzo.im.dao.mapper.UpdatableMessageMapper;
import cn.axzo.im.entity.UpdatableMessage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Repository;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@Repository("updatableMessageDao")
public class UpdatableMessageDao extends ServiceImpl<UpdatableMessageMapper, UpdatableMessage> {
public List<UpdatableMessage> getByNimMessageIdForUpdate(Collection<String> nimMessageIds) {
if (CollectionUtils.isEmpty(nimMessageIds))
return Collections.emptyList();
List<String> bizMessageIds = getByNimMessageIds(nimMessageIds)
.stream()
.map(UpdatableMessage::getBizMessageId)
.distinct()
.collect(toList());
// 统一使用bizMessageIds加锁, 避免死锁
return getByBizMessageIdsForUpdate(bizMessageIds);
}
public List<UpdatableMessage> getByBizMessageIdsForUpdate(Collection<String> bizMessageIds) {
if (CollectionUtils.isEmpty(bizMessageIds))
return Collections.emptyList();
// 避免死锁
List<String> sortedBizMessageIds = bizMessageIds.stream()
.sorted().collect(toList());
return lambdaQuery()
.in(UpdatableMessage::getBizMessageId, sortedBizMessageIds)
// 避免ack更新出错
.last("FOR UPDATE")
.list();
}
public List<UpdatableMessage> getByBizMessageIds(Collection<String> bizMessageIds) {
if (CollectionUtils.isEmpty(bizMessageIds))
return Collections.emptyList();
return lambdaQuery()
.in(UpdatableMessage::getBizMessageId, bizMessageIds)
.list();
}
public List<UpdatableMessage> getByNimMessageIds(Collection<String> nimMessageIds) {
if (CollectionUtils.isEmpty(nimMessageIds))
return Collections.emptyList();
return lambdaQuery()
.in(UpdatableMessage::getNimMessageId, nimMessageIds)
.list();
}
public List<UpdatableMessage> getByTaskIds(List<Long> taskIds) {
if (CollectionUtils.isEmpty(taskIds))
return Collections.emptyList();
return lambdaQuery()
.in(UpdatableMessage::getTaskId, taskIds)
.list();
}
}

View File

@ -0,0 +1,13 @@
package cn.axzo.im.dao.repository;
import cn.axzo.im.dao.mapper.UpdatableMessageLogMapper;
import cn.axzo.im.entity.UpdatableMessageLog;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Repository;
/**
* @author yanglin
*/
@Repository("updatableMessageLogDao")
public class UpdatableMessageLogDao extends ServiceImpl<UpdatableMessageLogMapper, UpdatableMessageLog> {
}

View File

@ -7,6 +7,7 @@ import lombok.Data;
*/
@Data
public class HistoryRecordExt {
private Boolean isSenderRobot;
private String sendApi;
private String sendRespDesc;
private String batchSendId;
@ -24,4 +25,10 @@ public class HistoryRecordExt {
private boolean payloadExcluded;
private String sound;
private Boolean isUpdatableMessage;
private String bizMessageId;
private Long dataVersion;
private Boolean isUpdateMessage;
private Boolean isUpdateRetry;
private Long updateRetryCount;
}

View File

@ -131,6 +131,32 @@ public class MessageHistory implements Serializable {
return recordExt;
}
public Long getDataVersion() {
return recordExt != null ? recordExt.getDataVersion() : null;
}
public boolean isUpdatableMessage() {
return recordExt != null
&& recordExt.getIsUpdatableMessage() != null
&& recordExt.getIsUpdatableMessage();
}
public boolean isUpdateMessage() {
return recordExt != null
&& recordExt.getIsUpdatableMessage() != null
&& recordExt.getIsUpdatableMessage();
}
public boolean isUpdateRetry() {
return recordExt != null
&& recordExt.getIsUpdateRetry() != null
&& recordExt.getIsUpdateRetry();
}
public Long getUpdateRetryCount() {
return recordExt == null ? null : recordExt.getUpdateRetryCount();
}
public Optional<String> determineBatchNo() {
String batchNo = this.batchNo;
// 兼容在途数据

View File

@ -1,11 +1,12 @@
package cn.axzo.im.entity;
import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.center.api.vo.req.ExcludePushPayload;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.BizTypeEnum;
import cn.axzo.im.center.common.enums.TemplatedMsgType;
import cn.axzo.im.config.BaseListTypeHandler;
import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.enums.MessageTaskStatus;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.IdType;
@ -18,11 +19,13 @@ import com.google.common.collect.Table;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.cglib.beans.BeanMap;
import java.util.Date;
@ -116,6 +119,9 @@ public class MessageTask {
@NoArgsConstructor
@AllArgsConstructor
public static class BizData {
private TemplatedMsgType templatedMsgType = TemplatedMsgType.TEMPLATE;
private String msgTemplateId;
/**
@ -154,12 +160,21 @@ public class MessageTask {
private List<AppTypeEnum> appTypes;
private List<ExcludePushPayload> excludePushPayloads;
private Long senderPersonId = 0L;
private Boolean isSenderRobot;
public TemplatedMsgType determineTemplatedMsgType() {
return templatedMsgType == null ? TemplatedMsgType.TEMPLATE : templatedMsgType;
}
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode
public static class ReceivePerson {
/**
@ -189,6 +204,12 @@ public class MessageTask {
private Long workspaceId;
public Long personIdAsLong() {
if (NumberUtils.isDigits(personId))
return Long.parseLong(personId);
return 0L;
}
public String buildKey(Map<Long, Long> ouIdMap) {
if (StringUtils.isNotBlank(this.getImAccount())) {
return this.getImAccount();

View File

@ -11,16 +11,14 @@ import java.util.Date;
*/
@Setter
@Getter
@TableName(value = "im_ack_retry", autoResultMap = true)
public class AckRetry {
@TableName(value = "im_message_update_retry", autoResultMap = true)
public class MessageUpdateRetry {
private Long id;
private String bizMessageId;
private Long initHistoryId;
private Date nextRetryTime;
private Long dataVersion;
private Long retryHistoryId;
private Integer retryCount;
private Long isDelete;
private Date createAt;
private Date updateAt;

View File

@ -0,0 +1,114 @@
package cn.axzo.im.entity;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.center.api.vo.req.MessageUpdateInfo;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.ImAppType;
import cn.axzo.im.center.common.enums.TemplatedMsgType;
import cn.axzo.im.center.common.enums.YesOrNo;
import cn.axzo.im.enums.UpdatableMessageState;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import java.util.Date;
/**
* @author yanglin
*/
@Setter
@Getter
@TableName(value = "im_updatable_message", autoResultMap = true)
public class UpdatableMessage implements MessageUpdateInfo {
private Long id;
private String batchNo;
private String templateId;
private String bizId;
private Long taskId;
private String fromAccount;
private String toAccount;
private String receiverPersonId;
private Long receiverOuId;
private String senderPersonId;
private Long senderOuId;
private YesOrNo isSenderRobot;
private AppTypeEnum appType;
private TemplatedMsgType msgType;
private UpdatableMessageState state;
private String bizMessageId;
private String nimMessageId;
private Long initHistoryId;
private Long updateHistoryId;
private Long retryHistoryId;
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONObject messageBody;
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONObject bizBody;
private Long dataVersion;
private Long ackDataVersion;
private Long retryCount;
@TableField(typeHandler = FastjsonTypeHandler.class)
private RecordExt recordExt;
private Long isDelete;
private Date createAt;
private Date updateAt;
public UpdatableMessageLog toMessageLog() {
UpdatableMessageLog messageLog = new UpdatableMessageLog();
messageLog.setBizId(bizId);
messageLog.setFromAccount(fromAccount);
messageLog.setToAccount(toAccount);
messageLog.setBizMessageId(bizMessageId);
messageLog.setMessageState(state);
messageLog.setInitHistoryId(initHistoryId);
messageLog.setMessageBody(messageBody);
messageLog.setBizBody(bizBody);
messageLog.setRetryCount(retryCount);
messageLog.setDataVersion(dataVersion);
// messageLog.setContext(null);
// messageLog.setContextHistoryId(null);
return messageLog;
}
public PersonAccountAttribute parsePersonAccount() {
PersonAccountAttribute person = new PersonAccountAttribute();
person.setPersonId(receiverPersonId);
person.setOuId(receiverOuId);
person.setAppType(appType);
return person;
}
public Long senderPersonIdAsLong() {
if (NumberUtils.isDigits(senderPersonId))
return NumberUtils.toLong(senderPersonId);
return 0L;
}
public Long receiverPersonIdAsLong() {
if (NumberUtils.isDigits(receiverPersonId))
return NumberUtils.toLong(receiverPersonId);
return 0L;
}
@Override
public String bizMessageId() {
return bizMessageId;
}
@Override
public JSONObject bizBody() {
return bizBody;
}
@Getter
@Setter
public static class RecordExt {
}
}

View File

@ -0,0 +1,58 @@
package cn.axzo.im.entity;
import cn.axzo.im.enums.UpdatableMessageState;
import cn.axzo.im.utils.IgnorePropsJsonTypeHandler;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
import lombok.Getter;
import lombok.Setter;
import java.util.Date;
/**
* @author yanglin
*/
@Setter
@Getter
@TableName(value = "im_updatable_message_log", autoResultMap = true)
public class UpdatableMessageLog {
private Long id;
private String bizId;
private String fromAccount;
private String toAccount;
private String bizMessageId;
private UpdatableMessageState messageState;
private Long initHistoryId;
private String context;
private Long contextHistoryId;
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONObject messageBody;
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONObject bizBody;
private Long retryCount;
private Long dataVersion;
@TableField(typeHandler = IgnorePropsJsonTypeHandler.class)
private RecordExt recordExt;
private Long isDelete;
private Date createAt;
private Date updateAt;
@Setter
@Getter
public static class RecordExt {
private AckInfo ack;
}
@Setter
@Getter
public static class AckInfo {
private Boolean ackSuccess;
private String ackDescription;
private Long requestDataVersion;
private Long messageDataVersion;
}
}

View File

@ -0,0 +1,36 @@
package cn.axzo.im.enums;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
* @author yanglin
*/
@Getter
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public enum UpdatableMessageState {
// 已创建
TASK_CREATED(false, false),
// 消息已经放进队列
INIT_MESSAGE_QUEUED(false, false),
// 消息已经发送成功
INIT_MESSAGE_SEND_SUCCESS(true, false),
// 消息已经发送失败
INIT_MESSAGE_SEND_FAIL(false, false),
// 更新已经放进队列
UPDATE_MESSAGE_QUEUED(true, false),
// 更新已经发送成功
UPDATE_MESSAGE_SEND_SUCCESS(true, true),
// 更新已经发送失败
UPDATE_MESSAGE_SEND_FAIL(true, false),
// 更新ACK
UPDATE_ACK(true, false),
// 未找到账号
ACCOUNT_NOT_FOUND(false, false);
private final boolean isUpdateMessageAllowed;
private final boolean isUpdateAckAllowed;
}

View File

@ -97,13 +97,13 @@ public class SendExecutor<T> implements Supplier<ExecResult> {
queue().log(message, args);
}
public void scheduleRetrySend(MessageHistory history, HistoryRecordExt ext) {
scheduleRetrySend(Collections.singletonList(history), ext);
public void scheduleRetrySend(MessageHistory history, HistoryRecordExt updateExt) {
scheduleRetrySend(Collections.singletonList(history), updateExt);
}
public void scheduleRetrySend(List<MessageHistory> histories, HistoryRecordExt ext) {
public void scheduleRetrySend(List<MessageHistory> histories, HistoryRecordExt updateExt) {
if (CollectionUtils.isEmpty(histories)) return;
queue().scheduleRetrySend(histories, ext);
queue().scheduleRetrySend(histories, updateExt);
scheduleRetryCount.addAndGet(histories.size());
}
@ -121,22 +121,25 @@ public class SendExecutor<T> implements Supplier<ExecResult> {
sendManager.submitSetSendFail(history, failReason);
}
public void setBatchSendSuccess(
List<MessageHistory> histories, MessageBatchDispatchResponse response, HistoryRecordExt ext) {
public void setBatchSendSuccess(List<MessageHistory> histories,
MessageBatchDispatchResponse response,
HistoryRecordExt updateExt) {
sendCount.addAndGet(histories.size());
sendManager.setBatchSendSuccess(histories, response, ext);
sendManager.setBatchSendSuccess(histories, response, updateExt);
}
public void setBatchSendSuccess(
List<MessageHistory> histories, BatchSendCustomMessageResponse response, HistoryRecordExt ext) {
public void setBatchSendSuccess(List<MessageHistory> histories,
BatchSendCustomMessageResponse response,
HistoryRecordExt updateExt) {
sendCount.addAndGet(histories.size());
sendManager.setBatchSendSuccess(histories, response, ext);
sendManager.setBatchSendSuccess(histories, response, updateExt);
}
public void setSendFail(
List<MessageHistory> histories, String failReason, HistoryRecordExt ext) {
public void setSendFail(List<MessageHistory> histories,
String failReason,
HistoryRecordExt updateExt) {
sendCount.addAndGet(histories.size());
sendManager.setSendFail(histories, failReason, ext);
sendManager.setSendFail(histories, failReason, updateExt);
}
private static class Stage {

View File

@ -7,6 +7,7 @@ import cn.axzo.im.entity.HistoryRecordExt;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.service.impl.MessageHistoryServiceImpl;
import cn.axzo.im.updatable.UpdatableMessageManager;
import cn.axzo.im.utils.DateFormatUtil;
import cn.axzo.im.utils.ImProperties;
import cn.axzo.im.utils.ImProperties.SendMessageConfig;
@ -15,6 +16,7 @@ import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.ArrayList;
import java.util.Date;
@ -40,6 +42,8 @@ public class SendManager {
private final SendExec sendExec;
private final MessageHistoryMapper messageHistoryMapper;
private final MessageHistoryServiceImpl messageHistoryService;
private final UpdatableMessageManager updatableMessageManager;
private final TransactionTemplate transactionTemplate;
private final SendMessageConfig cfg;
private final Date maxCreateAt;
private final AsyncTasks<Void> asyncTasks;
@ -54,6 +58,8 @@ public class SendManager {
this.cfg = applicationContext.getBean(ImProperties.class).getSendMessage().copy();
this.messageHistoryMapper = applicationContext.getBean(MessageHistoryMapper.class);
this.messageHistoryService = applicationContext.getBean(MessageHistoryServiceImpl.class);
this.updatableMessageManager = applicationContext.getBean(UpdatableMessageManager.class);
this.transactionTemplate = applicationContext.getBean(TransactionTemplate.class);
this.queue = new SendQueue(applicationContext, sendExec.getApiChannel());
this.sendExec = sendExec;
this.maxCreateAt = getMaxCreateAt();
@ -183,7 +189,10 @@ public class SendManager {
return update;
})
.collect(toList());
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.updateBatch(updates);
updatableMessageManager.onHistorySend(successHistories);
});
queue.setSendComplete(successHistories);
}
for (Map.Entry<String, List<MessageHistory>> e : failHistories.entrySet()) {
@ -196,28 +205,43 @@ public class SendManager {
.status(MessageHistoryStatus.FAILED)
.build())
.collect(toList());
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.updateBatch(updates);
updatableMessageManager.onHistorySend(histories);
});
queue.setSendComplete(histories);
}
successHistories = new ArrayList<>();
failHistories = new HashMap<>();
}
void setBatchSendSuccess(
List<MessageHistory> histories, MessageBatchDispatchResponse response, HistoryRecordExt ext) {
messageHistoryService.setBatchSendSuccess(histories, response, ext);
void setBatchSendSuccess(List<MessageHistory> histories,
MessageBatchDispatchResponse response,
HistoryRecordExt updateExt) {
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.setBatchSendSuccess(histories, response, updateExt);
updatableMessageManager.onHistorySend(histories);
});
queue.setSendComplete(histories);
}
void setBatchSendSuccess(
List<MessageHistory> histories, BatchSendCustomMessageResponse response, HistoryRecordExt ext) {
messageHistoryService.setBatchSendSuccess(histories, response, ext);
void setBatchSendSuccess(List<MessageHistory> histories,
BatchSendCustomMessageResponse response,
HistoryRecordExt updateExt) {
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.setBatchSendSuccess(histories, response, updateExt);
updatableMessageManager.onHistorySend(histories);
});
queue.setSendComplete(histories);
}
void setSendFail(
List<MessageHistory> histories, String failReason, HistoryRecordExt ext) {
messageHistoryService.setSendFail(histories, failReason, ext);
void setSendFail(List<MessageHistory> histories,
String failReason,
HistoryRecordExt updateExt) {
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.setSendFail(histories, failReason, updateExt);
updatableMessageManager.onHistorySend(histories);
});
queue.setSendComplete(histories);
}

View File

@ -2,12 +2,14 @@ package cn.axzo.im.send;
import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.dao.mapper.MessageHistoryMapper;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.entity.HistoryRecordExt;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.im.utils.ImProperties;
import cn.axzo.im.utils.ImProperties.SendMessageConfig;
import cn.axzo.im.utils.PropsUtils;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.Getter;
@ -46,6 +48,7 @@ public class SendQueue {
private final ApiChannel apiChannel;
private final SendMessageConfig cfg;
private final MessageHistoryMapper messageHistoryMapper;
private final MessageHistoryDao messageHistoryDao;
@Getter private final BlockingQueue<String> logQueue = new ArrayBlockingQueue<>(2048);
private final LinkedList<Record> records = new LinkedList<>();
private final Date queueCreateTime = new Date();
@ -60,6 +63,7 @@ public class SendQueue {
this.apiChannel = apiChannel;
cfg = applicationContext.getBean(ImProperties.class).getSendMessage().copy();
messageHistoryMapper = applicationContext.getBean(MessageHistoryMapper.class);
messageHistoryDao = applicationContext.getBean(MessageHistoryDao.class);
totalCount = messageHistoryMapper.selectCount(recordsQuery());
}
@ -208,7 +212,7 @@ public class SendQueue {
lastLoadEmpty = true;
}
public void scheduleRetrySend(List<MessageHistory> histories, HistoryRecordExt ext) {
public void scheduleRetrySend(List<MessageHistory> histories, HistoryRecordExt updateExt) {
if (CollectionUtils.isEmpty(histories)) return;
List<Long> ids = histories.stream()
.map(MessageHistory::getId)
@ -221,11 +225,15 @@ public class SendQueue {
Date newTimestamp = DateTime.now()
.plusSeconds(delaySeconds)
.toDate();
ArrayList<MessageHistory> updates = new ArrayList<>();
for (MessageHistory history : histories) {
MessageHistory update = new MessageHistory();
update.setRecordExt(ext);
updates.add(update);
update.setId(history.getId());
update.setRecordExt(PropsUtils.updateProperties(history.getRecordExt(), updateExt));
update.setTimestampForSend(newTimestamp);
messageHistoryMapper.update(update, query(MessageHistory.class)
.in(MessageHistory::getId, ids));
}
messageHistoryDao.updateBatchById(updates);
setSendComplete(histories);
}

View File

@ -23,7 +23,7 @@ public class SendCustomMessageJob extends SendMessageExecInstance {
}
@XxlJob("sendCustomMessageJob")
ReturnT<String> execute(String param) {
public ReturnT<String> execute(String param) {
try {
scanAndSend();
return ReturnT.SUCCESS;

View File

@ -23,7 +23,7 @@ public class SendMessageJob extends SendMessageExecInstance {
}
@XxlJob("sendMessageJob")
ReturnT<String> execute(String param) {
public ReturnT<String> execute(String param) {
try {
scanAndSend();
return ReturnT.SUCCESS;

View File

@ -17,6 +17,7 @@ import cn.axzo.im.event.payload.MessageHistoryCreatedPayload;
import cn.axzo.im.event.payload.MessageHistoryUpdatedPayload;
import cn.axzo.im.service.AccountService;
import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.im.utils.PropsUtils;
import cn.axzo.maokai.api.client.OrganizationalUnitApi;
import cn.axzo.maokai.api.vo.request.OrganizationalUnitQuery;
import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
@ -218,13 +219,15 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
}
@Transactional(rollbackFor = Exception.class)
public void setBatchSendSuccess(List<? extends MessageHistory> histories, BatchSendCustomMessageResponse response, HistoryRecordExt ext) {
public void setBatchSendSuccess(List<? extends MessageHistory> histories,
BatchSendCustomMessageResponse response,
HistoryRecordExt updateExt) {
ArrayList<MessageHistory> updates = new ArrayList<>(histories.size());
for (MessageHistory history : histories) {
MessageHistory update = new MessageHistory();
updates.add(update);
update.setId(history.getId());
update.setRecordExt(ext);
update.setRecordExt(PropsUtils.updateProperties(history.getRecordExt(), updateExt));
MessageHistoryStatus status = response.getUnregister().contains(history.getToAccount())
? MessageHistoryStatus.FAILED
: MessageHistoryStatus.SUCCEED;
@ -237,7 +240,9 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
}
@Transactional(rollbackFor = Exception.class)
public void setBatchSendSuccess(List<? extends MessageHistory> histories, MessageBatchDispatchResponse response, HistoryRecordExt ext) {
public void setBatchSendSuccess(List<? extends MessageHistory> histories,
MessageBatchDispatchResponse response,
HistoryRecordExt updateExt) {
// 发送成功的IMAccountId -> msgId
Map<String, Long> msgids = response.getMsgids();
// unregister的账号
@ -252,7 +257,7 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
.map(history -> {
MessageHistory messageHistory = MessageHistory.builder()
.id(history.getId())
.recordExt(ext)
.recordExt(PropsUtils.updateProperties(history.getRecordExt(), updateExt))
.build();
if (finalUnregister.contains(history.getToAccount())) {
messageHistory.setStatus(MessageHistoryStatus.FAILED);
@ -284,13 +289,15 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
}
@Transactional(rollbackFor = Exception.class)
public void setSendFail(List<? extends MessageHistory> histories, String failReason, HistoryRecordExt ext) {
public void setSendFail(List<? extends MessageHistory> histories,
String failReason,
HistoryRecordExt updateExt) {
List<MessageHistory> updates = histories.stream()
.map(e -> MessageHistory.builder()
.id(e.getId())
.result(failReason)
.status(MessageHistoryStatus.FAILED)
.recordExt(ext)
.recordExt(PropsUtils.updateProperties(e.getRecordExt(), updateExt))
.build())
.collect(toList());
this.updateBatch(updates);

View File

@ -5,8 +5,8 @@ import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.common.enums.AccountTypeEnum;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.NimMsgTypeEnum;
import cn.axzo.im.channel.netease.dto.MessageBody;
import cn.axzo.im.channel.netease.dto.Peer;
import cn.axzo.im.dao.mapper.MessageTaskMapper;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.entity.MessageHistory;
@ -17,6 +17,7 @@ import cn.axzo.im.service.AccountRegisterService.AccountRegisterDTO;
import cn.axzo.im.service.AccountService;
import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.im.service.MessageTaskService;
import cn.axzo.im.updatable.UpdatableMessageManager;
import cn.axzo.im.utils.UUIDUtil;
import cn.axzo.maokai.api.client.OrganizationalTeamOuRelationApi;
import cn.axzo.maokai.api.vo.request.OrganizationalTeamOuRelationReq;
@ -64,6 +65,8 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
private MessageHistoryService messageHistoryService;
@Autowired
private OrganizationalTeamOuRelationApi organizationalTeamOuRelationApi;
@Autowired
private UpdatableMessageManager updatableMessageManager;
private static final Integer DEFAULT_PAGE_SIZE = 500;
@ -88,6 +91,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
}
@Override
@Transactional
public void createMessageHistory(MessageTask messageTask) {
this.update(UpdateMessageTaskParam.builder()
@ -190,6 +194,10 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
.map(receivePerson -> resolveMessageHistory(batchNo, messageTask, receivePerson, imAccounts, accountRegisters, ouIdMap))
.collect(Collectors.toList());
messageHistoryService.createBatch(messageHistories);
List<Long> historyIds = messageHistories.stream()
.map(MessageHistory::getId)
.collect(Collectors.toList());
updatableMessageManager.onHistoryCreated(historyIds);
}
private MessageHistory resolveMessageHistory(String batchNo,
@ -238,6 +246,9 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
}
}
MessageTask.BizData bizData = messageTask.getBizData();
if (bizData != null) {
messageHistory.getOrCreateRecordExt().setIsSenderRobot(bizData.getIsSenderRobot());
}
List<ExcludePushPayload> excludePushPayloads = bizData.getExcludePushPayloads() == null
? Collections.emptyList()
@ -339,14 +350,19 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
}
private String resolveBody(MessageTask.ReceivePerson receivePerson, MessageTask messageTask, String appType) {
MessageTask.BizData bizData = messageTask.getBizData();
MessageBody messageBody = new MessageBody();
messageBody.setMsgType(NimMsgTypeEnum.TEMPLATE.getCode());
messageBody.setMsgType(bizData.determineTemplatedMsgType().getCode());
messageBody.setMsgContent(messageTask.getContent());
messageBody.setMsgHeader(messageTask.getTitle());
messageBody.setPeer(new Peer());
messageBody.getPeer().setSenderRobot(bizData.getIsSenderRobot());
messageBody.getPeer().setSenderPersonId(bizData.getSenderPersonId());
messageBody.getPeer().setReceiverPersonId(receivePerson.personIdAsLong());
Map<String, String> defaultExtMap = Maps.newHashMap();
MessageTask.BizData bizData = messageTask.getBizData();
if (StringUtils.isNotBlank(bizData.getMsgTemplateContent())) {
messageBody.setMsgBody(bizData.getMsgTemplateContent());
defaultExtMap.put("msgTemplateId", bizData.getMsgTemplateId());

View File

@ -0,0 +1,27 @@
package cn.axzo.im.updatable;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Optional;
/**
* @author yanglin
*/
public class AddUpdateHistoryResult {
private final Map<Long, Long> messageId2HistoryId = new IdentityHashMap<>();
public Optional<Long> findHistoryId(Long messageId) {
return Optional.ofNullable(messageId2HistoryId.get(messageId));
}
public void addHistoryId(Long messageId, Long historyId) {
messageId2HistoryId.put(messageId, historyId);
}
public Collection<Long> getMessageIds() {
return messageId2HistoryId.keySet();
}
}

View File

@ -0,0 +1,25 @@
package cn.axzo.im.updatable;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.List;
/**
* @author yanglin
*/
@Getter
@RequiredArgsConstructor
public class HistoryAndMessage {
private final List<MessageHistory> histories;
private final UpdatableMessage message;
public boolean hasDataVersionMatchHistories() {
return histories.stream()
.anyMatch(history -> history.getDataVersion().equals(message.getDataVersion()));
}
}

View File

@ -0,0 +1,61 @@
package cn.axzo.im.updatable;
import cn.axzo.basics.common.constant.enums.CodeDefinition;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
/**
* @author yanglin
*/
class InitHistories {
private final Map<HistoryTaskAccount, MessageHistory> account2histories;
InitHistories(List<MessageHistory> histories) {
this.account2histories = histories.stream()
.filter(history -> {
AppTypeEnum appType = CodeDefinition.findByCode(
AppTypeEnum.class, history.getAppType()).orElse(null);
return appType != null;
})
.collect(toMap(history -> {
PersonAccountAttribute person = new PersonAccountAttribute();
person.setPersonId(history.getReceivePersonId());
person.setOuId(history.getReceiveOuId());
AppTypeEnum appType = CodeDefinition.findByCode(
AppTypeEnum.class, history.getAppType()).orElse(null);
person.setAppType(appType);
return new HistoryTaskAccount(history.getImMessageTaskId(), person);
}, identity(), (oldValue, newValue) -> oldValue));
}
public Optional<MessageHistory> findHistory(UpdatableMessage message) {
HistoryTaskAccount account = new HistoryTaskAccount(
message.getTaskId(), message.parsePersonAccount());
return Optional.ofNullable(account2histories.get(account));
}
@Setter
@Getter
@RequiredArgsConstructor
// IMPORTANT: 不要删除这个注解
@EqualsAndHashCode
private static class HistoryTaskAccount {
final Long taskId;
final PersonAccountAttribute person;
}
}

View File

@ -0,0 +1,25 @@
package cn.axzo.im.updatable;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
/**
* @author yanglin
*/
@Getter
public class MessageBodyJsonObject {
private final JSONObject messageBody;
private final JSONObject bizBody;
public MessageBodyJsonObject(String json) {
messageBody = JSON.parseObject(json);
String bizBodyJson = messageBody.getString("msgBody");
if (StringUtils.isBlank(bizBodyJson))
bizBodyJson = messageBody.getString("payload");
bizBody = JSON.parseObject(bizBodyJson);
}
}

View File

@ -0,0 +1,341 @@
package cn.axzo.im.updatable;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.req.UpdatableMessageAckRequest;
import cn.axzo.im.center.api.vo.req.UpdatableMessageAckRequest.Acknowledgment;
import cn.axzo.im.center.api.vo.req.UpdateMessageRequest;
import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse;
import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse.NonUpdateMessageReason;
import cn.axzo.im.center.api.vo.resp.UpdatableMessageSendResult;
import cn.axzo.im.center.common.enums.YesOrNo;
import cn.axzo.im.channel.netease.dto.MessageBody;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.entity.HistoryRecordExt;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.entity.MessageTask.ReceivePerson;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.enums.UpdatableMessageState;
import cn.axzo.im.updatable.collector.CardManipulateCollector;
import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory;
import cn.axzo.im.updatable.handler.InitMessageHandler;
import cn.axzo.im.updatable.handler.UpdateMessageHandler;
import cn.axzo.im.updatable.retry.MessageUpdateRetryService;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.im.utils.ImProperties;
import cn.axzo.im.utils.PropsUtils;
import cn.axzo.im.utils.UUIDUtil;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import static cn.axzo.im.center.api.vo.req.MessageUpdateInfo.collectBizMessageIds;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
/**
* @author yanglin
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class UpdatableMessageManager {
private final UpdatableMessageDao updatableMessageDao;
private final MessageHistoryDao messageHistoryDao;
private final InitMessageHandler initMessageHandler;
private final UpdateMessageHandler updateMessageHandler;
private final MessageUpdateRetryService messageUpdateRetryService;
private final UpdateSupport updateSupport;
private final CardManipulateCollectorFactory cardManipulateCollectorFactory;
private final TransactionTemplate transactionTemplate;
private final ImProperties props;
// !! schedule in task
public List<UpdatableMessageSendResult> createUpdatableMessage(
MessageTask task, SendTemplateMessageParam request, List<ReceivePerson> receivePersons) {
if (CollectionUtils.isEmpty(receivePersons)) return Collections.emptyList();
String batchNo = UUIDUtil.uuidString();
ArrayList<UpdatableMessageSendResult> sendResults = new ArrayList<>();
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
for (ReceivePerson person : receivePersons) {
UpdatableMessage message = new UpdatableMessage();
collector.addMessage(message);
message.setBatchNo(batchNo);
message.setTemplateId(request.getMsgTemplateId());
message.setBizId(request.getBizId());
message.setTaskId(task.getId());
message.setReceiverPersonId(person.getPersonId());
message.setReceiverOuId(person.getOuId());
message.setAppType(person.getAppType());
message.setMsgType(request.getTemplatedMsgType());
message.setState(UpdatableMessageState.TASK_CREATED);
message.setBizMessageId(UUIDUtil.uuidString());
message.setDataVersion(1L);
message.setSenderPersonId(request.determineSenderPersonId() + "0");
message.setSenderOuId(request.determineSenderOuId());
message.setIsSenderRobot(request.isSendByRobot() ? YesOrNo.YES : YesOrNo.NO);
UpdatableMessageLog messageLog = message.toMessageLog();
collector.addLog(messageLog);
messageLog.setContext("scheduleInTask");
UpdatableMessageSendResult sendResult = new UpdatableMessageSendResult();
sendResults.add(sendResult);
sendResult.setBizMessageId(message.bizMessageId());
sendResult.setAccount(message.parsePersonAccount());
}
collector.finish();
return sendResults;
}
// !! init history created
public void onHistoryCreated(List<Long> historyIds) {
if (CollectionUtils.isEmpty(historyIds)) return;
List<MessageHistory> histories = messageHistoryDao.getByIds(historyIds);
List<Long> taskIds = histories.stream()
.map(MessageHistory::getImMessageTaskId)
.distinct()
.collect(toList());
List<UpdatableMessage> messages = updatableMessageDao.getByTaskIds(taskIds);
log.info("onHistoryCreated, taskIdSize={}, messageSize={}", taskIds.size(), messages.size());
InitHistories initHistories = new InitHistories(histories);
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
for (UpdatableMessage message : messages) {
MessageHistory history = initHistories.findHistory(message).orElse(null);
if (history == null) continue;
MessageBodyJsonObject object = new MessageBodyJsonObject(history.getMessageBody());
UpdatableMessage messageUpdate = new UpdatableMessage();
collector.updateMessage(messageUpdate);
messageUpdate.setId(message.getId());
messageUpdate.setInitHistoryId(history.getId());
messageUpdate.setMessageBody(object.getMessageBody());
messageUpdate.setBizBody(object.getBizBody());
messageUpdate.setFromAccount(history.getFromAccount());
messageUpdate.setToAccount(history.getToAccount());
messageUpdate.setState(history.getStatus() == MessageHistoryStatus.PENDING
? UpdatableMessageState.INIT_MESSAGE_QUEUED
: UpdatableMessageState.ACCOUNT_NOT_FOUND);
UpdatableMessageLog messageLog = message.toMessageLog();
collector.addLog(messageLog);
messageLog.setBizId(history.getBizId());
messageLog.setFromAccount(history.getFromAccount());
messageLog.setToAccount(history.getToAccount());
messageLog.setMessageState(messageUpdate.getState());
messageLog.setInitHistoryId(history.getId());
messageLog.setMessageBody(object.getMessageBody());
messageLog.setBizBody(object.getBizBody());
messageLog.setContext("initHistoryCreated");
messageLog.setContextHistoryId(history.getId());
MessageHistory historyUpdate = new MessageHistory();
collector.updateHistory(historyUpdate);
historyUpdate.setId(history.getId());
MessageBody messageBody = JSON.parseObject(history.getMessageBody(), MessageBody.class);
messageBody.setBizMessageId(message.bizMessageId());
messageBody.setDataVersion(message.getDataVersion());
historyUpdate.setMessageBody(JSON.toJSONString(messageBody));
HistoryRecordExt updateExt = new HistoryRecordExt();
updateExt.setIsUpdatableMessage(true);
updateExt.setBizMessageId(message.bizMessageId());
updateExt.setDataVersion(message.getDataVersion());
updateExt.setIsUpdateMessage(false);
updateExt.setIsUpdateRetry(false);
updateExt.setUpdateRetryCount(message.getRetryCount());
historyUpdate.setRecordExt(PropsUtils.updateProperties(history.getRecordExt(), updateExt));
}
collector.finish();
}
// !! update message
public MessageUpdateResponse updateMessage(UpdateMessageRequest request) {
BizAssertions.assertNotEmpty(request.getUpdates(), "更新消息不能为空");
List<UpdatableMessage> messages = updatableMessageDao
.getByBizMessageIds(collectBizMessageIds(request.getUpdates()));
BizAssertions.assertNotEmpty(messages, "未找到任何需要更新的消息");
MessageUpdateResponse response = new MessageUpdateResponse();
List<List<UpdateMessageRequest.Update>> batches = Lists.partition(
request.getUpdates(), props.getUpdatableMessageMaxLockRecords());
for (List<UpdateMessageRequest.Update> batch : batches)
transactionTemplate.executeWithoutResult(unused -> updateMessageImpl(batch, response));
return response;
}
private void updateMessageImpl(List<UpdateMessageRequest.Update> requestUpdates,
MessageUpdateResponse response) {
List<UpdatableMessage> requestMessages = updatableMessageDao
.getByBizMessageIdsForUpdate(collectBizMessageIds(requestUpdates));
Map<String, UpdatableMessage> bizMessageId2Message = requestMessages.stream()
.collect(toMap(UpdatableMessage::bizMessageId, identity()));
List<UpdateMessageRequest.Update> validUpdates = new ArrayList<>();
List<Long> updateIds = new ArrayList<>();
for (UpdateMessageRequest.Update update : requestUpdates) {
UpdatableMessage message = bizMessageId2Message.get(update.getBizMessageId());
if (message == null) {
response.addNonUpdatedMessage(
update.getBizMessageId(),
NonUpdateMessageReason.CANT_FIND_INIT_MESSAGE);
} else if (!message.getState().isUpdateMessageAllowed()) {
response.addNonUpdatedMessage(
update.getBizMessageId(),
NonUpdateMessageReason.MESSAGE_STATE_NOT_ALLOWED,
message.getState());
} else {
validUpdates.add(update);
updateIds.add(message.getId());
response.addUpdatedBizMessageId(message.getBizMessageId());
}
}
if (!validUpdates.isEmpty()) {
// incr data version + reset retry count
updatableMessageDao.getBaseMapper().incrDataVersion(updateIds);
messageUpdateRetryService.scheduleNextRetry(updateIds);
AddUpdateHistoryResult result = updateSupport
.addUpdateHistories("updateHistoryCreated", validUpdates);
updateSupport.updateHistoryId(result, UpdatableMessage::setUpdateHistoryId);
}
}
// !! init & update message sent
@Transactional
public void onHistorySend(List<MessageHistory> maybeUpdatedHistory) {
List<Long> historyIds = maybeUpdatedHistory.stream()
.filter(MessageHistory::isUpdatableMessage)
.map(MessageHistory::getId)
.collect(toList());
if (CollectionUtils.isEmpty(historyIds)) return;
List<MessageHistory> histories = messageHistoryDao.getByIds(historyIds);
// 避免ack更新出错
List<UpdatableMessage> messages = updatableMessageDao
.getByBizMessageIdsForUpdate(histories.stream()
.map(h -> h.getRecordExt().getBizMessageId())
.distinct()
.sorted()
.collect(toList()));
HashMap<Long, UpdatableMessage> id2Messages = new HashMap<>();
messages.forEach(message -> id2Messages.put(message.getId(), message));
BiFunction<Boolean, Boolean, List<HistoryAndMessage>> historyAndMessageBuilder =
(isUpdateMessage, isSuccess) -> {
List<HistoryAndMessage> historyAndMessages = new ArrayList<>();
for (UpdatableMessage message : new ArrayList<>(id2Messages.values())) {
List<MessageHistory> stateHistories = histories.stream()
.filter(history -> isSuccess
? history.getStatus() == MessageHistoryStatus.SUCCEED
: history.getStatus() == MessageHistoryStatus.FAILED)
.filter(history -> {
boolean isHistoryUpdateMessage = history.getRecordExt().getIsUpdateMessage();
return isUpdateMessage && isHistoryUpdateMessage
|| !isUpdateMessage && !isHistoryUpdateMessage;
})
.collect(toList());
if (stateHistories.isEmpty())
continue;
// 一次只会处理一种情况, 避免无用的遍历
id2Messages.remove(message.getId());
historyAndMessages.add(new HistoryAndMessage(stateHistories, message));
}
return historyAndMessages;
};
// send success
List<HistoryAndMessage> sendSuccess = historyAndMessageBuilder.apply(false, true);
if (!sendSuccess.isEmpty())
initMessageHandler.onSuccess(sendSuccess);
// send fail
List<HistoryAndMessage> sendFail = historyAndMessageBuilder.apply(false, false);
if (!sendFail.isEmpty())
initMessageHandler.onFail(sendFail);
// update success
List<HistoryAndMessage> updateSuccess = historyAndMessageBuilder.apply(true, true);
if (!updateSuccess.isEmpty())
updateMessageHandler.onSuccess(updateSuccess);
// update fail
List<HistoryAndMessage> updateFail = historyAndMessageBuilder.apply(true, false);
if (!updateFail.isEmpty())
updateMessageHandler.onFail(updateFail);
}
@Transactional
public void ack(UpdatableMessageAckRequest request) {
List<Acknowledgment> acknowledgments = request.determineValidAcknowledgments();
if (CollectionUtils.isEmpty(acknowledgments)) return;
List<List<Acknowledgment>> batches = Lists.partition(
acknowledgments, props.getUpdatableMessageMaxLockRecords());
for (List<Acknowledgment> batch : batches)
transactionTemplate.executeWithoutResult(unused -> ackImpl(batch));
}
private void ackImpl(List<Acknowledgment> acknowledgments) {
List<String> nimMessageIds = acknowledgments.stream()
.map(Acknowledgment::getNimMessageId)
.collect(toList());
Map<String, Acknowledgment> nimMessageId2Ack = acknowledgments.stream()
.collect(toMap(Acknowledgment::getNimMessageId, identity()));
List<UpdatableMessage> messages = updatableMessageDao.getByNimMessageIdForUpdate(nimMessageIds);
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
for (UpdatableMessage message : messages) {
Long dataVersion = nimMessageId2Ack.get(message.getNimMessageId()).getDataVersion();
if (dataVersion == null) dataVersion = -1L;
UpdatableMessageLog messageLog = message.toMessageLog();
collector.addLog(messageLog);
messageLog.setContext("ack");
messageLog.setContextHistoryId(0L);
messageLog.setRetryCount(0L);
messageLog.setMessageBody(null);
messageLog.setBizBody(null);
messageLog.setDataVersion(dataVersion);
UpdatableMessageLog.RecordExt logExt = new UpdatableMessageLog.RecordExt();
messageLog.setRecordExt(logExt);
UpdatableMessage messageUpdate = new UpdatableMessage();
collector.updateMessage(messageUpdate);
messageUpdate.setId(message.getId());
Long ackDataVersion = dataVersion >= message.getAckDataVersion()
? dataVersion
: message.getAckDataVersion();
messageUpdate.setAckDataVersion(ackDataVersion);
UpdatableMessageLog.AckInfo ackInfo = new UpdatableMessageLog.AckInfo();
logExt.setAck(ackInfo);
if (!dataVersion.equals(message.getDataVersion())) {
ackInfo.setAckSuccess(false);
ackInfo.setAckDescription("数据版本不匹配");
ackInfo.setRequestDataVersion(dataVersion);
ackInfo.setMessageDataVersion(message.getDataVersion());
continue;
}
// 避免前端有bug
if (!message.getState().isUpdateAckAllowed()) {
ackInfo.setAckSuccess(false);
ackInfo.setAckDescription("消息状态不允许ack");
continue;
}
ackInfo.setAckSuccess(true);
messageUpdate.setState(UpdatableMessageState.UPDATE_ACK);
}
collector.finish();
}
}

View File

@ -0,0 +1,52 @@
package cn.axzo.im.updatable;
import cn.axzo.im.center.api.vo.req.FetchUpdatableMessageRequest;
import cn.axzo.im.center.api.vo.req.GetBizMessageIdsRequest;
import cn.axzo.im.center.api.vo.resp.FetchUpdatableMessageResponse;
import cn.axzo.im.center.api.vo.resp.GetBizMessageIdsResponse;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.im.utils.ImProperties;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
public class UpdatableMessageQueryService {
private final UpdatableMessageDao updatableMessageDao;
private final ImProperties props;
public FetchUpdatableMessageResponse fetchUpdatableMessage(FetchUpdatableMessageRequest request) {
int maxSize = props.getUpdatable().getFetchMaxSize();
BizAssertions.assertTrue(request.getNimMessageIds().size() <= maxSize,
"超过了最大的消息数量, 最大: {}", maxSize);
List<UpdatableMessage> messages = updatableMessageDao
.getByNimMessageIds(request.getNimMessageIds());
FetchUpdatableMessageResponse response = new FetchUpdatableMessageResponse();
for (UpdatableMessage message : messages) {
FetchUpdatableMessageResponse.MessageInfo imMessage = new FetchUpdatableMessageResponse.MessageInfo();
imMessage.setMsgType(message.getMsgType().getCode());
imMessage.setMsgBody(message.getBizBody().toJSONString());
imMessage.setBizMessageId(message.getBizMessageId());
imMessage.setDataVersion(message.getDataVersion());
response.addMessage(imMessage);
}
return response;
}
public GetBizMessageIdsResponse getBizMessageIds(GetBizMessageIdsRequest request) {
List<UpdatableMessage> messages = updatableMessageDao.getByNimMessageIds(request.getNimMessageIds());
GetBizMessageIdsResponse response = new GetBizMessageIdsResponse();
for (UpdatableMessage message : messages)
response.addMessage(message.getNimMessageId(), message.getBizMessageId());
return response;
}
}

View File

@ -0,0 +1,146 @@
package cn.axzo.im.updatable;
import cn.axzo.im.center.api.feign.SendPriority;
import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.center.api.vo.req.MessageUpdateInfo;
import cn.axzo.im.center.common.enums.BizTypeEnum;
import cn.axzo.im.center.common.enums.YesOrNo;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.dto.MessageCustomBody;
import cn.axzo.im.channel.netease.dto.Peer;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.entity.HistoryRecordExt;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.enums.UpdatableMessageState;
import cn.axzo.im.updatable.collector.CardManipulateCollector;
import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory;
import cn.axzo.im.utils.UUIDUtil;
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import static cn.axzo.im.center.api.vo.req.MessageUpdateInfo.collectBizMessageIds;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
public class UpdateSupport {
private final UpdatableMessageDao updatableMessageDao;
private final IMChannelProvider imChannel;
private final CardManipulateCollectorFactory cardManipulateCollectorFactory;
public AddUpdateHistoryResult addUpdateHistories(
String context, List<? extends MessageUpdateInfo> updates) {
List<UpdatableMessage> messages = updatableMessageDao
.getByBizMessageIds(collectBizMessageIds(updates));
Map<String, UpdatableMessage> bizMessageId2Message = messages
.stream().collect(toMap(UpdatableMessage::getBizMessageId, identity()));
String batchNo = UUIDUtil.uuidString();
Map<UpdatableMessageLog, MessageHistory> messageLog2History = new IdentityHashMap<>();
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
for (MessageUpdateInfo update : updates) {
UpdatableMessage message = bizMessageId2Message.get(update.bizMessageId());
if (message == null) continue;
UpdatableMessage messageUpdate = new UpdatableMessage();
collector.updateMessage(messageUpdate);
messageUpdate.setId(message.getId());
messageUpdate.setState(UpdatableMessageState.UPDATE_MESSAGE_QUEUED);
MessageHistory history = new MessageHistory();
collector.addHistory(history);
history.setBizId(message.getBizId());
history.setFromAccount(message.getFromAccount());
history.setToAccount(message.getToAccount());
history.setAppType(message.getAppType().getCode());
history.setChannel(imChannel.getProviderType());
MessageCustomBody messageBody = new MessageCustomBody();
messageBody.setToImAccount(message.getToAccount());
messageBody.setPersonId(message.getReceiverPersonId());
messageBody.setBizType(BizTypeEnum.MESSAGE_UPDATE);
messageBody.setPayload("{}");
messageBody.setBizMessageId(message.bizMessageId());
messageBody.setDataVersion(messageUpdate.getDataVersion());
messageBody.setInitMessageId(message.getNimMessageId());
messageBody.setMsgType(message.getMsgType());
messageBody.setPeer(new Peer());
messageBody.getPeer().setSenderRobot(message.getIsSenderRobot() == YesOrNo.YES);
messageBody.getPeer().setSenderPersonId(message.senderPersonIdAsLong());
messageBody.getPeer().setReceiverPersonId(message.receiverPersonIdAsLong());
history.setMessageBody(JSON.toJSONString(messageBody));
messageUpdate.setMessageBody(JSON.parseObject(history.getMessageBody()));
messageUpdate.setBizBody(update.bizBody());
history.setImMessageTaskId(0L);
history.setReceivePersonId(message.getReceiverPersonId());
history.setReceiveOuId(message.getReceiverOuId());
history.setStatus(MessageHistoryStatus.PENDING);
history.setBatchNo(batchNo);
history.setSendPriority(SendPriority.UPDATE_MESSAGE.getPriority());
history.setApiChannel(ApiChannel.CUSTOM_MESSAGE);
HistoryRecordExt recordExt = new HistoryRecordExt();
recordExt.setIsUpdatableMessage(true);
recordExt.setBizMessageId(message.bizMessageId());
recordExt.setDataVersion(message.getDataVersion());
recordExt.setIsUpdateMessage(true);
Long retryCount = message.getRetryCount();
recordExt.setIsUpdateRetry(retryCount > 0);
recordExt.setUpdateRetryCount(retryCount);
history.setRecordExt(recordExt);
history.setTimestampForSend(new Date());
UpdatableMessageLog messageLog = message.toMessageLog();
collector.addLog(messageLog);
messageLog2History.put(messageLog, history);
messageLog.setDataVersion(message.getDataVersion());
messageLog.setMessageState(messageUpdate.getState());
messageLog.setMessageBody(messageUpdate.getMessageBody());
messageLog.setBizBody(messageUpdate.getBizBody());
messageLog.setRetryCount(message.getRetryCount());
}
collector.finishAddHistory();
for (UpdatableMessageLog messageLog : collector.getUpdatableMessageLogsToAdd()) {
MessageHistory history = messageLog2History.get(messageLog);
messageLog.setContext(context);
messageLog.setContextHistoryId(history.getId());
}
collector.finish();
AddUpdateHistoryResult result = new AddUpdateHistoryResult();
for (int i = 0; i < messages.size(); i++) {
Long messageId = messages.get(i).getId();
MessageHistory history = collector.getMessageHistoriesToAdd().get(i);
result.addHistoryId(messageId, history.getId());
}
return result;
}
public void updateHistoryId(AddUpdateHistoryResult result,
BiConsumer<UpdatableMessage, Long> historyIdSetter) {
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
for (Long messageId : result.getMessageIds()) {
result.findHistoryId(messageId).ifPresent(historyId -> {
UpdatableMessage messageUpdate = new UpdatableMessage();
messageUpdate.setId(messageId);
historyIdSetter.accept(messageUpdate, historyId);
collector.updateMessage(messageUpdate);
});
}
collector.finish();
}
}

View File

@ -0,0 +1,102 @@
package cn.axzo.im.updatable.collector;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.dao.repository.UpdatableMessageLogDao;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import com.google.common.collect.Lists;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
/**
* @author yanglin
*/
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
public class CardManipulateCollector {
private static final int BATCH_SIZE = 1000;
private final UpdatableMessageDao updatableMessageDao;
private final UpdatableMessageLogDao updatableMessageLogDao;
private final MessageHistoryDao messageHistoryDao;
private final State<UpdatableMessage> updatableMessagesToAdd = new State<>();
private final State<UpdatableMessage> updatableMessagesToUpdate = new State<>();
private final State<UpdatableMessageLog> updatableMessageLogsToAdd = new State<>();
private final State<MessageHistory> messageHistoriesToAdd = new State<>();
private final State<MessageHistory> messageHistoriesToUpdate = new State<>();
public List<UpdatableMessageLog> getUpdatableMessageLogsToAdd() {
return updatableMessageLogsToAdd.elementsOrEmpty();
}
public List<MessageHistory> getMessageHistoriesToAdd() {
return messageHistoriesToAdd.elementsOrEmpty();
}
public void addMessage(UpdatableMessage message) {
updatableMessagesToAdd.add(message);
}
public void updateMessage(UpdatableMessage message) {
updatableMessagesToUpdate.add(message);
}
public void addLog(UpdatableMessageLog log) {
updatableMessageLogsToAdd.add(log);
}
public void addHistory(MessageHistory history) {
messageHistoriesToAdd.add(history);
}
public void updateHistory(MessageHistory history) {
messageHistoriesToUpdate.add(history);
}
public void finishAddHistory() {
messageHistoriesToAdd.batched(messageHistoryDao::saveBatch);
}
public void finish() {
updatableMessagesToAdd.batched(updatableMessageDao::saveBatch);
updatableMessagesToUpdate.batched(updatableMessageDao::updateBatchById);
updatableMessageLogsToAdd.batched(updatableMessageLogDao::saveBatch);
messageHistoriesToAdd.batched(messageHistoryDao::saveBatch);
messageHistoriesToUpdate.batched(messageHistoryDao::updateBatchById);
}
private static class State<T> {
private boolean finished;
private List<T> elements;
void add(T element) {
if (elements == null)
elements = new ArrayList<>();
elements.add(element);
}
List<T> elementsOrEmpty() {
return elements == null ? Collections.emptyList() : elements;
}
void batched(Consumer<List<T>> action) {
if (CollectionUtils.isEmpty(elements))
return;
if (finished)
return;
Lists.partition(elements, BATCH_SIZE).forEach(action);
finished = true;
}
}
}

View File

@ -0,0 +1,27 @@
package cn.axzo.im.updatable.collector;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.dao.repository.UpdatableMessageLogDao;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
public class CardManipulateCollectorFactory {
private final UpdatableMessageDao updatableMessageDao;
private final UpdatableMessageLogDao updatableMessageLogDao;
private final MessageHistoryDao messageHistoryDao;
public CardManipulateCollector create() {
return new CardManipulateCollector(
updatableMessageDao,
updatableMessageLogDao,
messageHistoryDao);
}
}

View File

@ -0,0 +1,58 @@
package cn.axzo.im.updatable.handler;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import cn.axzo.im.enums.UpdatableMessageState;
import cn.axzo.im.updatable.HistoryAndMessage;
import cn.axzo.im.updatable.collector.CardManipulateCollector;
import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
public class InitMessageHandler implements StateHandler {
private final CardManipulateCollectorFactory cardManipulateCollectorFactory;
@Override
public void onSuccess(List<HistoryAndMessage> historyAndMessages) {
updateState(historyAndMessages, true);
}
@Override
public void onFail(List<HistoryAndMessage> historyAndMessages) {
updateState(historyAndMessages, false);
}
private void updateState(List<HistoryAndMessage> historyAndMessages, boolean isSuccess) {
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
for (HistoryAndMessage historyAndMessage : historyAndMessages) {
UpdatableMessage message = historyAndMessage.getMessage();
MessageHistory history = historyAndMessage.getHistories().get(0);
UpdatableMessage messageUpdate = new UpdatableMessage();
collector.updateMessage(messageUpdate);
messageUpdate.setId(message.getId());
if (isSuccess)
messageUpdate.setNimMessageId(history.getMessageId());
messageUpdate.setState(isSuccess
? UpdatableMessageState.INIT_MESSAGE_SEND_SUCCESS
: UpdatableMessageState.INIT_MESSAGE_SEND_FAIL);
UpdatableMessageLog messageLog = message.toMessageLog();
collector.addLog(messageLog);
messageLog.setMessageState(messageUpdate.getState());
messageLog.setInitHistoryId(history.getId());
messageLog.setContext("sendInitMessage");
messageLog.setContextHistoryId(history.getId());
}
collector.finish();
}
}

View File

@ -0,0 +1,16 @@
package cn.axzo.im.updatable.handler;
import cn.axzo.im.updatable.HistoryAndMessage;
import java.util.List;
/**
* @author yanglin
*/
public interface StateHandler {
void onSuccess(List<HistoryAndMessage> historyAndMessages);
void onFail(List<HistoryAndMessage> historyAndMessages);
}

View File

@ -0,0 +1,69 @@
package cn.axzo.im.updatable.handler;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import cn.axzo.im.enums.UpdatableMessageState;
import cn.axzo.im.updatable.HistoryAndMessage;
import cn.axzo.im.updatable.MessageBodyJsonObject;
import cn.axzo.im.updatable.collector.CardManipulateCollector;
import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
public class UpdateMessageHandler implements StateHandler {
private final CardManipulateCollectorFactory cardManipulateCollectorFactory;
@Override
public void onSuccess(List<HistoryAndMessage> historyAndMessages) {
updateState(historyAndMessages, true);
}
@Override
public void onFail(List<HistoryAndMessage> historyAndMessages) {
updateState(historyAndMessages, false);
}
private void updateState(List<HistoryAndMessage> historyAndMessages, boolean isSuccess) {
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
for (HistoryAndMessage historyAndMessage : historyAndMessages) {
UpdatableMessageState state = historyAndMessage.getMessage().getState();
UpdatableMessage message = historyAndMessage.getMessage();
boolean queuedOrFail = state == UpdatableMessageState.UPDATE_MESSAGE_QUEUED
|| state == UpdatableMessageState.UPDATE_MESSAGE_SEND_FAIL;
if (queuedOrFail && historyAndMessage.hasDataVersionMatchHistories()) {
UpdatableMessage messageUpdate = new UpdatableMessage();
collector.updateMessage(messageUpdate);
messageUpdate.setId(historyAndMessage.getMessage().getId());
messageUpdate.setState(isSuccess
? UpdatableMessageState.UPDATE_MESSAGE_SEND_SUCCESS
: UpdatableMessageState.UPDATE_MESSAGE_SEND_FAIL);
}
for (MessageHistory history : historyAndMessage.getHistories()) {
UpdatableMessageLog messageLog = message.toMessageLog();
collector.addLog(messageLog);
messageLog.setMessageState(isSuccess
? UpdatableMessageState.UPDATE_MESSAGE_SEND_SUCCESS
: UpdatableMessageState.UPDATE_MESSAGE_SEND_FAIL);
messageLog.setContextHistoryId(history.getId());
MessageBodyJsonObject object = new MessageBodyJsonObject(history.getMessageBody());
messageLog.setMessageBody(object.getMessageBody());
messageLog.setBizBody(object.getBizBody());
messageLog.setDataVersion(history.getDataVersion());
messageLog.setContextHistoryId(history.getId());
messageLog.setContext(history.isUpdateRetry() ? "retrySendUpdateMessage" : "sendUpdateMessage");
messageLog.setRetryCount(history.getUpdateRetryCount());
}
}
collector.finish();
}
}

View File

@ -0,0 +1,58 @@
package cn.axzo.im.updatable.retry;
import cn.axzo.im.dao.mapper.UpdatableMessageLogMapper;
import cn.axzo.im.entity.SendJobInfo;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.utils.DateFormatUtil;
import cn.axzo.im.utils.JSONObjectUtil;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.springframework.stereotype.Component;
import java.util.Date;
import static cn.axzo.im.utils.Queries.query;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ExpungeUpdatableMessageLogJob {
private final UpdatableMessageLogMapper updatableMessageLogMapper;
@XxlJob("expungeUpdatableMessageLogJob")
public ReturnT<String> execute(String paramStr) throws Exception {
log.info("start - run job with param={}", paramStr);
try {
Param param = StringUtils.isBlank(paramStr)
? new Param() :
JSONObjectUtil.parseObject(paramStr, Param.class);
expunge(param);
log.info("end - run job with param={}", param);
return ReturnT.SUCCESS;
} catch (Exception e) {
log.warn("job failed. param={}", paramStr, e);
return ReturnT.FAIL;
}
}
private void expunge(Param param) {
Date until = DateTime.now().minusDays(param.daysAgo).toDate();
log.info("going to delete until={}", DateFormatUtil.toReadableString(until));
int count = updatableMessageLogMapper.expunge(until);
log.info("deleted count={}", count);
}
@Data
public static class Param {
private int daysAgo = 7;
}
}

View File

@ -0,0 +1,68 @@
package cn.axzo.im.updatable.retry;
import cn.axzo.im.dao.repository.MessageUpdateRetryDao;
import cn.axzo.im.entity.MessageUpdateRetry;
import cn.axzo.maokai.api.util.Ref;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
import java.util.function.Supplier;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageUpdateRetryJob {
private final MessageUpdateRetryDao messageUpdateRetryDao;
private final MessageUpdateRetryService messageUpdateRetryService;
@XxlJob("messageUpdateRetryJob")
public ReturnT<String> execute(String paramStr) throws Exception {
try {
log.info("start - run job with param={}", paramStr);
executeImpl();
log.info("end - run job with param={}", paramStr);
return ReturnT.SUCCESS;
} catch (Exception e) {
log.warn("job failed. param={}", paramStr, e);
return ReturnT.FAIL;
}
}
private void executeImpl() {
Supplier<List<MessageUpdateRetry>> cursor = retryCursor();
for (List<MessageUpdateRetry> retries = cursor.get(); !retries.isEmpty(); retries = cursor.get()) {
List<String> bizMessageIds = retries.stream()
.map(MessageUpdateRetry::getBizMessageId)
.collect(toList());
messageUpdateRetryService.advanceRetry(bizMessageIds);
}
}
private Supplier<List<MessageUpdateRetry>> retryCursor() {
Ref<Long> maxId = Ref.create(0L);
return () -> {
List<MessageUpdateRetry> retries = messageUpdateRetryDao.lambdaQuery()
.le(MessageUpdateRetry::getNextRetryTime, new Date())
.gt(MessageUpdateRetry::getId, maxId.get())
.orderByAsc(MessageUpdateRetry::getId)
.last("LIMIT 1000")
.list();
if (!retries.isEmpty())
maxId.set(retries.get(retries.size() - 1).getId());
return retries;
};
}
}

View File

@ -0,0 +1,100 @@
package cn.axzo.im.updatable.retry;
import cn.axzo.im.dao.repository.MessageUpdateRetryDao;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.entity.MessageUpdateRetry;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import cn.axzo.im.updatable.AddUpdateHistoryResult;
import cn.axzo.im.updatable.UpdateSupport;
import cn.axzo.im.updatable.collector.CardManipulateCollector;
import cn.axzo.im.updatable.collector.CardManipulateCollectorFactory;
import cn.axzo.im.utils.ImProperties;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
import org.joda.time.DateTime;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@Service
@RequiredArgsConstructor
public class MessageUpdateRetryService {
private final MessageUpdateRetryDao messageUpdateRetryDao;
private final UpdatableMessageDao updatableMessageDao;
private final UpdateSupport updateSupport;
private final TransactionTemplate transactionTemplate;
private final CardManipulateCollectorFactory cardManipulateCollectorFactory;
private final ImProperties props;
public void removePrevRetryByBizMessageIds(Collection<String> bizMessageIds) {
if (CollectionUtils.isNotEmpty(bizMessageIds))
messageUpdateRetryDao.getBaseMapper().deleteByBizMessageIds(bizMessageIds);
}
public void advanceRetry(List<String> bizMessageIds) {
List<List<String>> batches = Lists.partition(
bizMessageIds, props.getUpdatableMessageMaxLockRecords());
for (List<String> batch : batches)
transactionTemplate.executeWithoutResult(unused -> advanceRetryImpl(batch));
}
private void advanceRetryImpl(List<String> bizMessageIds) {
removePrevRetryByBizMessageIds(bizMessageIds);
List<UpdatableMessage> messages = updatableMessageDao.getByBizMessageIdsForUpdate(bizMessageIds);
UpdateAckState state = new UpdateAckState(messages, props.getMessageUpdateAckMaxRetryCount());
// 先发本次生试, 再调度下一次重试
retryUpdateMessage(state.getNonAckMessageIds());
scheduleNextRetry(state.getScheduleNextRetryMessages());
}
public void scheduleNextRetry(List<Long> messageIds) {
if (CollectionUtils.isEmpty(messageIds)) return;
List<UpdatableMessage> messages = updatableMessageDao.listByIds(messageIds);
List<String> bizMessageIds = messages.stream()
.map(UpdatableMessage::getBizMessageId)
.collect(toList());
removePrevRetryByBizMessageIds(bizMessageIds);
ArrayList<MessageUpdateRetry> retries = new ArrayList<>();
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
Date nextRetryTime = DateTime.now()
.plusSeconds(props.getMessageUpdateAckRetryIntervalSeconds())
.toDate();
for (UpdatableMessage message : messages) {
MessageUpdateRetry retry = new MessageUpdateRetry();
retry.setBizMessageId(message.bizMessageId());
retry.setInitHistoryId(message.getInitHistoryId());
retry.setNextRetryTime(nextRetryTime);
retry.setDataVersion(message.getDataVersion());
retries.add(retry);
UpdatableMessageLog messageLog = message.toMessageLog();
collector.addLog(messageLog);
messageLog.setRetryCount(message.getRetryCount() + 1);
messageLog.setContext("scheduleNextRetrySendUpdateMessage");
}
messageUpdateRetryDao.saveBatch(retries);
collector.finish();
}
public void retryUpdateMessage(List<Long> messageIds) {
if (CollectionUtils.isEmpty(messageIds)) return;
updatableMessageDao.getBaseMapper().incrRetryCount(messageIds);
List<UpdatableMessage> messages = updatableMessageDao.listByIds(messageIds);
AddUpdateHistoryResult result = updateSupport
.addUpdateHistories("retryUpdateHistoryCreated", messages);
updateSupport.updateHistoryId(result, UpdatableMessage::setRetryHistoryId);
}
}

View File

@ -0,0 +1,35 @@
package cn.axzo.im.updatable.retry;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.enums.UpdatableMessageState;
import lombok.RequiredArgsConstructor;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@RequiredArgsConstructor
public class UpdateAckState {
private final List<UpdatableMessage> messages;
private final int maxRetryCount;
List<Long> getScheduleNextRetryMessages() {
return messages.stream()
.filter(message -> message.getRetryCount() < maxRetryCount)
.filter(message -> message.getState() != UpdatableMessageState.UPDATE_ACK)
.map(UpdatableMessage::getId)
.collect(toList());
}
List<Long> getNonAckMessageIds() {
return messages.stream()
.filter(message -> message.getState() != UpdatableMessageState.UPDATE_ACK)
.map(UpdatableMessage::getId)
.collect(toList());
}
}

View File

@ -3,6 +3,8 @@ package cn.axzo.im.utils;
import cn.axzo.basics.common.BeanMapper;
import com.google.common.collect.Sets;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
@ -28,8 +30,16 @@ public class ImProperties {
private String controllerToken = "123442";
private int messageUpdateAckMaxRetryCount = 3;
private int messageUpdateAckRetryIntervalSeconds = 10;
private int updatableMessageMaxLockRecords = 5;
private SendMessageConfig sendMessage = new SendMessageConfig();
private UpdatableMessageConfig updatable = new UpdatableMessageConfig();
private Set<String> genImAccountJobCodes = Sets.newHashSet(
// 企业班组长, 项目班组长
"entTeamLeader", "projTeamLeader",
@ -40,6 +50,14 @@ public class ImProperties {
// 带班长
"projectTeamManager");
@Setter
@Getter
public static class UpdatableMessageConfig {
private int fetchMaxSize = 500;
}
@Data
public static class SendMessageConfig {
private int memoryQueueQueryLimitSize = 1000;

View File

@ -0,0 +1,36 @@
package cn.axzo.im.utils;
import java.lang.reflect.Field;
/**
* @author yanglin
*/
public class PropsUtils {
/**
* 只合并第一层
*/
public static <T> T updateProperties(T oldObj, T newObj) {
if (oldObj == null) return newObj;
if (newObj == null) return oldObj;
try {
@SuppressWarnings("unchecked")
T merged = (T) oldObj.getClass().newInstance();
for (Field field : merged.getClass().getDeclaredFields()) {
boolean accessible = field.isAccessible();
field.setAccessible(true);
try {
Object oldFieldValue = field.get(oldObj);
Object newFieldValue = field.get(newObj);
field.set(merged, newFieldValue == null ? oldFieldValue : newFieldValue);
} finally {
field.setAccessible(accessible);
}
}
return merged;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -18,7 +18,8 @@ class SendMessageJobTest {
private final SendMessageJob sendMessageJob;
@Test
void foo() {
void exec() {
sendMessageJob.execute(null);;
}
}

View File

@ -0,0 +1,24 @@
package cn.axzo.im.updatable.retry;
import cn.axzo.im.Application;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.junit.jupiter.api.Assertions.*;
/**
* @author yanglin
*/
@SpringBootTest(classes = Application.class)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class ExpungeUpdatableMessageLogJobTest {
private final ExpungeUpdatableMessageLogJob expungeUpdatableMessageLogJob;
@Test
void exec() {
}
}

View File

@ -0,0 +1,22 @@
package cn.axzo.im.updatable.retry;
import cn.axzo.im.Application;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author yanglin
*/
@SpringBootTest(classes = Application.class)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class MessageUpdateRetryJobTest {
private final MessageUpdateRetryJob messageUpdateRetryJob;
@Test
void exec() {
}
}