REQ-3057-card: a lot of staff

This commit is contained in:
yanglin 2024-11-06 16:34:55 +08:00
parent ff57393c59
commit c926e7868e
13 changed files with 293 additions and 29 deletions

View File

@ -13,6 +13,7 @@ public class SendPriority {
public static final SendPriority TEMPLATE_MESSAGE = create(1000);
public static final SendPriority SYSTEM_CUSTOM_MESSAGE = create(5000);
public static final SendPriority UPDATE_MESSAGE = create(5500);
public static final SendPriority OP_MESSAGE = create(500000);
private final Integer value;

View File

@ -3,10 +3,31 @@ package cn.axzo.im.center.api.vo.req;
import lombok.Getter;
import lombok.Setter;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import java.util.ArrayList;
import java.util.List;
/**
* @author yanglin
*/
@Setter
@Getter
public class UpdateMessageRequest {
@Valid
private List<Update> updates = new ArrayList<>();
public void addUpdate(Update update) {
this.updates.add(update);
}
@Setter
@Getter
public static class Update {
@NotBlank(message = "消息ID不能为空")
private String bizMessageId;
private String msgTemplateContent;
}
}

View File

@ -18,6 +18,8 @@ public enum BizTypeEnum {
* 待办
*/
PENDING("PENDING", "待办"),
MESSAGE_UPDATE("MESSAGE_UPDATE", "消息更新")
;

View File

@ -36,4 +36,15 @@ public class MessageBody {
private Map<String,String> messageExtension;
/**
* 业务消息id, 用于接口拉取最新消息内容
*/
private String bizMessageId;
/**
* 数据版本
*/
private Long dataVersion;
}

View File

@ -1,6 +1,7 @@
package cn.axzo.im.channel.netease.dto;
import cn.axzo.im.center.common.enums.BizTypeEnum;
import cn.axzo.im.center.common.enums.TemplatedMsgType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@ -37,4 +38,24 @@ public class MessageCustomBody {
private String payload;
/**
* 业务消息id, 用于接口拉取最新消息内容
*/
private String bizMessageId;
/**
* 数据版本
*/
private Long dataVersion;
/**
* 最原始的网易云信消息id, 更新的哪条消息
*/
private Long initMessageId;
/**
* 更新的消息类型
*/
private TemplatedMsgType msgType;
}

View File

@ -15,6 +15,14 @@ import java.util.List;
@Repository("updatableMessageDao")
public class UpdatableMessageDao extends ServiceImpl<UpdatableMessageMapper, UpdatableMessage> {
public List<UpdatableMessage> getByBizMessageIdsForUpdate(List<String> bizMessageIds) {
return lambdaQuery()
.in(UpdatableMessage::getBizMessageId, bizMessageIds)
// 避免ack更新出错
.last("FOR UPDATE")
.list();
}
public List<UpdatableMessage> getByTaskIds(List<Long> taskIds) {
if (CollectionUtils.isEmpty(taskIds))
return Collections.emptyList();

View File

@ -131,6 +131,18 @@ public class MessageHistory implements Serializable {
return recordExt;
}
public Long getDataVersion() {
return recordExt != null ? recordExt.getDataVersion() : null;
}
public boolean isUpdatableMessage() {
return recordExt != null && recordExt.isUpdatableMessage();
}
public boolean isUpdateMessage() {
return recordExt != null && recordExt.isUpdateMessage();
}
public Optional<String> determineBatchNo() {
String batchNo = this.batchNo;
// 兼容在途数据

View File

@ -2,6 +2,7 @@ package cn.axzo.im.entity;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.TemplatedMsgType;
import cn.axzo.im.enums.UpdatableMessageState;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.TableField;
@ -31,6 +32,7 @@ public class UpdatableMessage {
private String receiverPersonId;
private Long receiverOuId;
private AppTypeEnum appType;
private TemplatedMsgType msgType;
private UpdatableMessageState state;
private String bizMessageId;
private Long initHistoryId;

View File

@ -7,12 +7,22 @@ import lombok.RequiredArgsConstructor;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@Getter
@RequiredArgsConstructor
class HistoryAndMessage {
private final List<MessageHistory> histories;
private final UpdatableMessage message;
public List<MessageHistory> getDataVersionMatchHistories() {
return histories.stream()
.filter(history -> history.getDataVersion().equals(message.getDataVersion()))
.collect(toList());
}
}

View File

@ -0,0 +1,21 @@
package cn.axzo.im.updatable;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
/**
* @author yanglin
*/
@Getter
public class MessageBodyJsonObject {
private final JSONObject messageBody;
private final JSONObject bizBody;
public MessageBodyJsonObject(String json) {
messageBody = JSON.parseObject(json);
bizBody = JSON.parseObject(messageBody.getString("msgBody"));
}
}

View File

@ -33,8 +33,8 @@ class SendStateHandler implements StateHandler {
}
private void updateState(List<HistoryAndMessage> historyAndMessages, boolean isSuccess) {
ArrayList<UpdatableMessage> messageUpdates = new ArrayList<>();
ArrayList<UpdatableMessageLog> messageLogs = new ArrayList<>();
List<UpdatableMessage> messageUpdates = new ArrayList<>();
List<UpdatableMessageLog> messageLogs = new ArrayList<>();
for (HistoryAndMessage historyAndMessage : historyAndMessages) {
UpdatableMessage messageUpdate = new UpdatableMessage();
messageUpdates.add(messageUpdate);

View File

@ -1,9 +1,16 @@
package cn.axzo.im.updatable;
import cn.axzo.framework.domain.ServiceException;
import cn.axzo.im.center.api.feign.SendPriority;
import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.req.UpdateMessageRequest;
import cn.axzo.im.center.api.vo.resp.MessageUpdateResp;
import cn.axzo.im.center.api.vo.resp.UpdatableMessageSendResult;
import cn.axzo.im.center.common.enums.BizTypeEnum;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.dto.MessageBody;
import cn.axzo.im.channel.netease.dto.MessageCustomBody;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.dao.repository.UpdatableMessageLogDao;
@ -15,21 +22,27 @@ import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.enums.UpdatableMessageState;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.im.utils.PropsUtils;
import cn.axzo.im.utils.UUIDUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toList;
@ -48,8 +61,9 @@ public class UpdatableMessageManager {
private final MessageHistoryDao messageHistoryDao;
private final SendStateHandler sendStateHandler;
private final UpdateStateHandler updateStateHandler;
private final IMChannelProvider imChannel;
// !! 创建消息
// !! schedule
public List<UpdatableMessageSendResult> createUpdatableMessage(
MessageTask task, SendTemplateMessageParam request, List<ReceivePerson> receivePersons) {
@ -68,8 +82,9 @@ public class UpdatableMessageManager {
message.setReceiverPersonId(person.getPersonId());
message.setReceiverOuId(person.getOuId());
message.setAppType(person.getAppType());
message.setMsgType(request.getTemplatedMsgType());
message.setState(UpdatableMessageState.CREATED);
message.setBizMessageId(message.getBizMessageId());
message.setBizMessageId(UUIDUtil.uuidString());
message.setDataVersion(1L);
UpdatableMessageSendResult sendResult = new UpdatableMessageSendResult();
@ -94,6 +109,8 @@ public class UpdatableMessageManager {
return sendResults;
}
// !! enqueue
public void onHistoryCreated(List<MessageHistory> histories) {
if (CollectionUtils.isEmpty(histories)) return;
List<Long> taskIds = histories.stream()
@ -109,15 +126,14 @@ public class UpdatableMessageManager {
for (UpdatableMessage message : messages) {
MessageHistory history = initHistories.findHistory(message).orElse(null);
if (history == null) continue;
String msgBodyJson = message.getMessageBody().getString("msgBody");
JSONObject bizBody = JSON.parseObject(msgBodyJson);
MessageBodyJsonObject object = new MessageBodyJsonObject(history.getMessageBody());
UpdatableMessage messageUpdate = new UpdatableMessage();
messageUpdates.add(messageUpdate);
messageUpdate.setId(message.getId());
messageUpdate.setInitHistoryId(history.getImMessageTaskId());
messageUpdate.setMessageBody(message.getMessageBody());
messageUpdate.setBizBody(bizBody);
messageUpdate.setMessageBody(object.getMessageBody());
messageUpdate.setBizBody(object.getBizBody());
messageUpdate.setFromAccount(history.getFromAccount());
messageUpdate.setToAccount(history.getToAccount());
messageUpdate.setState(history.getStatus() == MessageHistoryStatus.FAILED
@ -132,13 +148,19 @@ public class UpdatableMessageManager {
messageLog.setBizMessageId(message.getBizMessageId());
messageLog.setMessageState(messageUpdate.getState());
messageLog.setInitHistoryId(history.getImMessageTaskId());
messageLog.setMessageBody(message.getMessageBody());
messageLog.setBizBody(bizBody);
messageLog.setMessageBody(object.getMessageBody());
messageLog.setBizBody(object.getBizBody());
messageLog.setDataVersion(message.getDataVersion());
MessageHistory historyUpdate = new MessageHistory();
updatableHistories.add(historyUpdate);
historyUpdate.setId(history.getId());
MessageBody messageBody = JSON.parseObject(history.getMessageBody(), MessageBody.class);
messageBody.setBizMessageId(message.getBizMessageId());
messageBody.setDataVersion(message.getDataVersion());
historyUpdate.setMessageBody(JSON.toJSONString(messageBody));
HistoryRecordExt updateExt = new HistoryRecordExt();
updateExt.setUpdatableMessage(true);
updateExt.setBizMessageId(message.getBizMessageId());
@ -152,40 +174,122 @@ public class UpdatableMessageManager {
}
if (CollectionUtils.isNotEmpty(messageLogUpdates)) {
for (List<UpdatableMessageLog> batch : Lists.partition(messageLogUpdates, BATCH_UPDATE_SIZE))
updatableMessageLogDao.updateBatchById(batch);
updatableMessageLogDao.saveBatch(batch);
}
if (CollectionUtils.isNotEmpty(updatableHistories)) {
for (List<MessageHistory> batch : Lists.partition(updatableHistories, BATCH_UPDATE_SIZE))
messageHistoryDao.saveBatch(batch);
messageHistoryDao.updateBatchById(batch);
}
}
// !! 更新消息
// !! update message
@Transactional
public MessageUpdateResp updateMessage(UpdateMessageRequest request) {
return null;
BizAssertions.assertNotEmpty(request.getUpdates(), "更新消息不能为空");
Map<String, UpdatableMessage> bizMessageId2Message = updatableMessageDao
.getByBizMessageIdsForUpdate(
request.getUpdates().stream()
.map(UpdateMessageRequest.Update::getBizMessageId)
.distinct()
.collect(toList())).stream()
.collect(Collectors.toMap(UpdatableMessage::getBizMessageId, Function.identity()));
if (bizMessageId2Message.isEmpty())
throw new ServiceException("未找到需要更新的消息");
String batchNo = UUIDUtil.uuidString();
List<MessageHistory> histories = new ArrayList<>();
List<UpdatableMessage> messageUpdates = new ArrayList<>();
List<UpdatableMessageLog> messageLogs = new ArrayList<>();
Map<UpdatableMessage, MessageHistory> message2History = new IdentityHashMap<>();
Map<UpdatableMessageLog, MessageHistory> messageLog2History = new IdentityHashMap<>();
for (UpdateMessageRequest.Update update : request.getUpdates()) {
UpdatableMessage message = bizMessageId2Message.get(update.getBizMessageId());
if (message == null) continue;
UpdatableMessage messageUpdate = new UpdatableMessage();
messageUpdates.add(messageUpdate);
messageUpdate.setId(message.getId());
messageUpdate.setDataVersion(messageUpdate.getDataVersion() + 1);
messageUpdate.setState(UpdatableMessageState.UPDATE_QUEUED);
MessageHistory history = new MessageHistory();
histories.add(history);
message2History.put(message, history);
history.setBizId(message.getBizId());
history.setFromAccount(message.getFromAccount());
history.setToAccount(message.getToAccount());
history.setAppType(message.getAppType().getCode());
history.setChannel(imChannel.getProviderType());
MessageCustomBody messageBody = new MessageCustomBody();
messageBody.setToImAccount(message.getToAccount());
messageBody.setPersonId(message.getReceiverPersonId());
messageBody.setBizType(BizTypeEnum.MESSAGE_UPDATE);
messageBody.setPayload(update.getMsgTemplateContent());
messageBody.setBizMessageId(message.getBizMessageId());
messageBody.setDataVersion(messageUpdate.getDataVersion());
messageBody.setInitMessageId(message.getInitHistoryId());
messageBody.setMsgType(message.getMsgType());
history.setMessageBody(JSON.toJSONString(messageBody));
messageUpdate.setMessageBody(JSON.parseObject(history.getMessageBody()));
messageUpdate.setBizBody(JSON.parseObject(update.getMsgTemplateContent()));
history.setImMessageTaskId(0L);
history.setReceivePersonId(message.getReceiverPersonId());
history.setReceiveOuId(message.getReceiverOuId());
history.setStatus(MessageHistoryStatus.PENDING);
history.setBatchNo(batchNo);
history.setSendPriority(SendPriority.UPDATE_MESSAGE.getPriority());
history.setApiChannel(ApiChannel.CUSTOM_MESSAGE);
HistoryRecordExt recordExt = new HistoryRecordExt();
recordExt.setUpdatableMessage(true);
recordExt.setBizMessageId(message.getBizMessageId());
recordExt.setDataVersion(message.getDataVersion());
recordExt.setUpdateMessage(true);
history.setRecordExt(recordExt);
history.setTimestampForSend(new Date());
UpdatableMessageLog messageLog = new UpdatableMessageLog();
messageLogs.add(messageLog);
messageLog2History.put(messageLog, history);
messageLog.setBizId(message.getBizId());
messageLog.setFromAccount(message.getFromAccount());
messageLog.setToAccount(message.getToAccount());
messageLog.setBizMessageId(message.getBizMessageId());
messageLog.setMessageState(messageUpdate.getState());
messageLog.setInitHistoryId(message.getInitHistoryId());
messageLog.setMessageBody(messageUpdate.getMessageBody());
messageLog.setBizBody(messageUpdate.getBizBody());
messageLog.setDataVersion(messageUpdate.getDataVersion());
}
messageHistoryDao.saveBatch(histories);
for (UpdatableMessage messageUpdate : messageUpdates) {
MessageHistory history = message2History.get(messageUpdate);
messageUpdate.setUpdateHistoryId(history.getId());
}
for (UpdatableMessageLog messageLog : messageLogs) {
MessageHistory history = messageLog2History.get(messageLog);
messageLog.setUpdateHistoryId(history.getId());
}
updatableMessageLogDao.saveBatch(messageLogs);
updatableMessageDao.updateBatchById(messageUpdates);
return new MessageUpdateResp();
}
@Transactional
public void onHistorySend(List<MessageHistory> maybeUpdatedHistory) {
List<Long> historyIds = maybeUpdatedHistory.stream()
.filter(history -> {
HistoryRecordExt recordExt = history.getRecordExt();
return recordExt != null && recordExt.isUpdateMessage();
})
.filter(MessageHistory::isUpdatableMessage)
.map(MessageHistory::getId)
.collect(toList());
if (CollectionUtils.isEmpty(historyIds)) return;
List<MessageHistory> histories = messageHistoryDao.listByIds(historyIds);
List<String> bizMessageIds = histories.stream()
.map(history -> history.getRecordExt().getBizMessageId())
.distinct()
.sorted()
.collect(toList());
List<UpdatableMessage> messages = updatableMessageDao.lambdaQuery()
.in(UpdatableMessage::getBizMessageId, bizMessageIds)
// 避免ack更新出错
.last("FOR UPDATE")
.list();
// 避免ack更新出错
List<UpdatableMessage> messages = updatableMessageDao
.getByBizMessageIdsForUpdate(histories.stream()
.map(h -> h.getRecordExt().getBizMessageId())
.distinct()
.sorted()
.collect(toList()));
HashMap<Long, UpdatableMessage> id2Messages = new HashMap<>();
messages.forEach(message -> id2Messages.put(message.getId(), message));
BiFunction<Boolean, Boolean, List<HistoryAndMessage>> historyAndMessageBuilder =

View File

@ -1,21 +1,72 @@
package cn.axzo.im.updatable;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.dao.repository.UpdatableMessageLogDao;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import cn.axzo.im.enums.UpdatableMessageState;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
class UpdateStateHandler implements StateHandler {
private final UpdatableMessageDao updatableMessageDao;
private final UpdatableMessageLogDao updatableMessageLogDao;
@Override
public void onSuccess(List<HistoryAndMessage> historyAndMessages) {
updateState(historyAndMessages, true);
}
@Override
public void onFail(List<HistoryAndMessage> historyAndMessages) {
updateState(historyAndMessages, false);
}
private void updateState(List<HistoryAndMessage> historyAndMessages, boolean isSuccess) {
List<UpdatableMessage> messageUpdates = new ArrayList<>();
List<UpdatableMessageLog> messageLogs = new ArrayList<>();
for (HistoryAndMessage historyAndMessage : historyAndMessages) {
// 1. 第一次更新(或成功, 或失败); 2. 有一次失败, 后面ack重试成功 (针对同一次更新);
if (historyAndMessage.getMessage().getState() != UpdatableMessageState.UPDATE_SEND_SUCCESS
&& !historyAndMessage.getDataVersionMatchHistories().isEmpty()) {
UpdatableMessage messageUpdate = new UpdatableMessage();
messageUpdates.add(messageUpdate);
messageUpdate.setState(isSuccess
? UpdatableMessageState.UPDATE_SEND_SUCCESS
: UpdatableMessageState.UPDATE_SEND_FAIL);
}
UpdatableMessage message = historyAndMessage.getMessage();
for (MessageHistory history : historyAndMessage.getHistories()) {
UpdatableMessageLog messageLog = new UpdatableMessageLog();
messageLogs.add(messageLog);
messageLog.setBizId(message.getBizId());
messageLog.setFromAccount(message.getFromAccount());
messageLog.setToAccount(message.getToAccount());
messageLog.setBizMessageId(message.getBizMessageId());
messageLog.setMessageState(isSuccess
? UpdatableMessageState.UPDATE_SEND_SUCCESS
: UpdatableMessageState.UPDATE_SEND_FAIL);
messageLog.setInitHistoryId(message.getInitHistoryId());
messageLog.setUpdateHistoryId(history.getId());
MessageBodyJsonObject object = new MessageBodyJsonObject(history.getMessageBody());
messageLog.setMessageBody(object.getMessageBody());
messageLog.setBizBody(object.getBizBody());
messageLog.setDataVersion(history.getDataVersion());
}
}
if (!messageUpdates.isEmpty())
updatableMessageDao.updateBatchById(messageUpdates);
updatableMessageLogDao.saveBatch(messageLogs);
}
}