diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/feign/SendPriority.java b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/SendPriority.java index 2503a02..ccffcc9 100644 --- a/im-center-api/src/main/java/cn/axzo/im/center/api/feign/SendPriority.java +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/feign/SendPriority.java @@ -13,6 +13,7 @@ public class SendPriority { public static final SendPriority TEMPLATE_MESSAGE = create(1000); public static final SendPriority SYSTEM_CUSTOM_MESSAGE = create(5000); + public static final SendPriority UPDATE_MESSAGE = create(5500); public static final SendPriority OP_MESSAGE = create(500000); private final Integer value; diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/UpdateMessageRequest.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/UpdateMessageRequest.java index fcbe90d..780f3e4 100644 --- a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/UpdateMessageRequest.java +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/UpdateMessageRequest.java @@ -3,10 +3,31 @@ package cn.axzo.im.center.api.vo.req; import lombok.Getter; import lombok.Setter; +import javax.validation.Valid; +import javax.validation.constraints.NotBlank; +import java.util.ArrayList; +import java.util.List; + /** * @author yanglin */ @Setter @Getter public class UpdateMessageRequest { + + @Valid + private List updates = new ArrayList<>(); + + public void addUpdate(Update update) { + this.updates.add(update); + } + + @Setter + @Getter + public static class Update { + @NotBlank(message = "消息ID不能为空") + private String bizMessageId; + private String msgTemplateContent; + } + } \ No newline at end of file diff --git a/im-center-common/src/main/java/cn/axzo/im/center/common/enums/BizTypeEnum.java b/im-center-common/src/main/java/cn/axzo/im/center/common/enums/BizTypeEnum.java index 77c9c51..dd7c283 100644 --- a/im-center-common/src/main/java/cn/axzo/im/center/common/enums/BizTypeEnum.java +++ b/im-center-common/src/main/java/cn/axzo/im/center/common/enums/BizTypeEnum.java @@ -18,6 +18,8 @@ public enum BizTypeEnum { * 待办 */ PENDING("PENDING", "待办"), + + MESSAGE_UPDATE("MESSAGE_UPDATE", "消息更新") ; diff --git a/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBody.java b/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBody.java index 91398d9..22122b2 100644 --- a/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBody.java +++ b/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageBody.java @@ -36,4 +36,15 @@ public class MessageBody { private Map messageExtension; + + /** + * 业务消息id, 用于接口拉取最新消息内容 + */ + private String bizMessageId; + + /** + * 数据版本 + */ + private Long dataVersion; + } diff --git a/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageCustomBody.java b/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageCustomBody.java index 89aea4a..3f3333e 100644 --- a/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageCustomBody.java +++ b/im-center-server/src/main/java/cn/axzo/im/channel/netease/dto/MessageCustomBody.java @@ -1,6 +1,7 @@ package cn.axzo.im.channel.netease.dto; import cn.axzo.im.center.common.enums.BizTypeEnum; +import cn.axzo.im.center.common.enums.TemplatedMsgType; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -37,4 +38,24 @@ public class MessageCustomBody { private String payload; + /** + * 业务消息id, 用于接口拉取最新消息内容 + */ + private String bizMessageId; + + /** + * 数据版本 + */ + private Long dataVersion; + + /** + * 最原始的网易云信消息id, 更新的哪条消息 + */ + private Long initMessageId; + + /** + * 更新的消息类型 + */ + private TemplatedMsgType msgType; + } diff --git a/im-center-server/src/main/java/cn/axzo/im/dao/repository/UpdatableMessageDao.java b/im-center-server/src/main/java/cn/axzo/im/dao/repository/UpdatableMessageDao.java index 918292f..2231126 100644 --- a/im-center-server/src/main/java/cn/axzo/im/dao/repository/UpdatableMessageDao.java +++ b/im-center-server/src/main/java/cn/axzo/im/dao/repository/UpdatableMessageDao.java @@ -15,6 +15,14 @@ import java.util.List; @Repository("updatableMessageDao") public class UpdatableMessageDao extends ServiceImpl { + public List getByBizMessageIdsForUpdate(List bizMessageIds) { + return lambdaQuery() + .in(UpdatableMessage::getBizMessageId, bizMessageIds) + // 避免ack更新出错 + .last("FOR UPDATE") + .list(); + } + public List getByTaskIds(List taskIds) { if (CollectionUtils.isEmpty(taskIds)) return Collections.emptyList(); diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java b/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java index 9c7b548..4513b39 100644 --- a/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java +++ b/im-center-server/src/main/java/cn/axzo/im/entity/MessageHistory.java @@ -131,6 +131,18 @@ public class MessageHistory implements Serializable { return recordExt; } + public Long getDataVersion() { + return recordExt != null ? recordExt.getDataVersion() : null; + } + + public boolean isUpdatableMessage() { + return recordExt != null && recordExt.isUpdatableMessage(); + } + + public boolean isUpdateMessage() { + return recordExt != null && recordExt.isUpdateMessage(); + } + public Optional determineBatchNo() { String batchNo = this.batchNo; // 兼容在途数据 diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/UpdatableMessage.java b/im-center-server/src/main/java/cn/axzo/im/entity/UpdatableMessage.java index 485841a..fef42a5 100644 --- a/im-center-server/src/main/java/cn/axzo/im/entity/UpdatableMessage.java +++ b/im-center-server/src/main/java/cn/axzo/im/entity/UpdatableMessage.java @@ -2,6 +2,7 @@ package cn.axzo.im.entity; import cn.axzo.im.center.api.vo.PersonAccountAttribute; import cn.axzo.im.center.common.enums.AppTypeEnum; +import cn.axzo.im.center.common.enums.TemplatedMsgType; import cn.axzo.im.enums.UpdatableMessageState; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.annotation.TableField; @@ -31,6 +32,7 @@ public class UpdatableMessage { private String receiverPersonId; private Long receiverOuId; private AppTypeEnum appType; + private TemplatedMsgType msgType; private UpdatableMessageState state; private String bizMessageId; private Long initHistoryId; diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/HistoryAndMessage.java b/im-center-server/src/main/java/cn/axzo/im/updatable/HistoryAndMessage.java index bcf4f6b..1f2e91f 100644 --- a/im-center-server/src/main/java/cn/axzo/im/updatable/HistoryAndMessage.java +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/HistoryAndMessage.java @@ -7,12 +7,22 @@ import lombok.RequiredArgsConstructor; import java.util.List; +import static java.util.stream.Collectors.toList; + /** * @author yanglin */ @Getter @RequiredArgsConstructor class HistoryAndMessage { + private final List histories; private final UpdatableMessage message; + + public List getDataVersionMatchHistories() { + return histories.stream() + .filter(history -> history.getDataVersion().equals(message.getDataVersion())) + .collect(toList()); + } + } \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/MessageBodyJsonObject.java b/im-center-server/src/main/java/cn/axzo/im/updatable/MessageBodyJsonObject.java new file mode 100644 index 0000000..c4e14c2 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/MessageBodyJsonObject.java @@ -0,0 +1,21 @@ +package cn.axzo.im.updatable; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import lombok.Getter; + +/** + * @author yanglin + */ +@Getter +public class MessageBodyJsonObject { + + private final JSONObject messageBody; + private final JSONObject bizBody; + + public MessageBodyJsonObject(String json) { + messageBody = JSON.parseObject(json); + bizBody = JSON.parseObject(messageBody.getString("msgBody")); + } + +} diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/SendStateHandler.java b/im-center-server/src/main/java/cn/axzo/im/updatable/SendStateHandler.java index 12d3eb4..96a2a4f 100644 --- a/im-center-server/src/main/java/cn/axzo/im/updatable/SendStateHandler.java +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/SendStateHandler.java @@ -33,8 +33,8 @@ class SendStateHandler implements StateHandler { } private void updateState(List historyAndMessages, boolean isSuccess) { - ArrayList messageUpdates = new ArrayList<>(); - ArrayList messageLogs = new ArrayList<>(); + List messageUpdates = new ArrayList<>(); + List messageLogs = new ArrayList<>(); for (HistoryAndMessage historyAndMessage : historyAndMessages) { UpdatableMessage messageUpdate = new UpdatableMessage(); messageUpdates.add(messageUpdate); diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/UpdatableMessageManager.java b/im-center-server/src/main/java/cn/axzo/im/updatable/UpdatableMessageManager.java index 239709e..fa58bf0 100644 --- a/im-center-server/src/main/java/cn/axzo/im/updatable/UpdatableMessageManager.java +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/UpdatableMessageManager.java @@ -1,9 +1,16 @@ package cn.axzo.im.updatable; +import cn.axzo.framework.domain.ServiceException; +import cn.axzo.im.center.api.feign.SendPriority; +import cn.axzo.im.center.api.vo.ApiChannel; import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam; import cn.axzo.im.center.api.vo.req.UpdateMessageRequest; import cn.axzo.im.center.api.vo.resp.MessageUpdateResp; import cn.axzo.im.center.api.vo.resp.UpdatableMessageSendResult; +import cn.axzo.im.center.common.enums.BizTypeEnum; +import cn.axzo.im.channel.IMChannelProvider; +import cn.axzo.im.channel.netease.dto.MessageBody; +import cn.axzo.im.channel.netease.dto.MessageCustomBody; import cn.axzo.im.dao.repository.MessageHistoryDao; import cn.axzo.im.dao.repository.UpdatableMessageDao; import cn.axzo.im.dao.repository.UpdatableMessageLogDao; @@ -15,21 +22,27 @@ import cn.axzo.im.entity.UpdatableMessage; import cn.axzo.im.entity.UpdatableMessageLog; import cn.axzo.im.enums.MessageHistoryStatus; import cn.axzo.im.enums.UpdatableMessageState; +import cn.axzo.im.utils.BizAssertions; import cn.axzo.im.utils.PropsUtils; import cn.axzo.im.utils.UUIDUtil; import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; +import java.util.Map; import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.stream.Collectors.toList; @@ -48,8 +61,9 @@ public class UpdatableMessageManager { private final MessageHistoryDao messageHistoryDao; private final SendStateHandler sendStateHandler; private final UpdateStateHandler updateStateHandler; + private final IMChannelProvider imChannel; - // !! 创建消息 + // !! schedule public List createUpdatableMessage( MessageTask task, SendTemplateMessageParam request, List receivePersons) { @@ -68,8 +82,9 @@ public class UpdatableMessageManager { message.setReceiverPersonId(person.getPersonId()); message.setReceiverOuId(person.getOuId()); message.setAppType(person.getAppType()); + message.setMsgType(request.getTemplatedMsgType()); message.setState(UpdatableMessageState.CREATED); - message.setBizMessageId(message.getBizMessageId()); + message.setBizMessageId(UUIDUtil.uuidString()); message.setDataVersion(1L); UpdatableMessageSendResult sendResult = new UpdatableMessageSendResult(); @@ -94,6 +109,8 @@ public class UpdatableMessageManager { return sendResults; } + // !! enqueue + public void onHistoryCreated(List histories) { if (CollectionUtils.isEmpty(histories)) return; List taskIds = histories.stream() @@ -109,15 +126,14 @@ public class UpdatableMessageManager { for (UpdatableMessage message : messages) { MessageHistory history = initHistories.findHistory(message).orElse(null); if (history == null) continue; - String msgBodyJson = message.getMessageBody().getString("msgBody"); - JSONObject bizBody = JSON.parseObject(msgBodyJson); + MessageBodyJsonObject object = new MessageBodyJsonObject(history.getMessageBody()); UpdatableMessage messageUpdate = new UpdatableMessage(); messageUpdates.add(messageUpdate); messageUpdate.setId(message.getId()); messageUpdate.setInitHistoryId(history.getImMessageTaskId()); - messageUpdate.setMessageBody(message.getMessageBody()); - messageUpdate.setBizBody(bizBody); + messageUpdate.setMessageBody(object.getMessageBody()); + messageUpdate.setBizBody(object.getBizBody()); messageUpdate.setFromAccount(history.getFromAccount()); messageUpdate.setToAccount(history.getToAccount()); messageUpdate.setState(history.getStatus() == MessageHistoryStatus.FAILED @@ -132,13 +148,19 @@ public class UpdatableMessageManager { messageLog.setBizMessageId(message.getBizMessageId()); messageLog.setMessageState(messageUpdate.getState()); messageLog.setInitHistoryId(history.getImMessageTaskId()); - messageLog.setMessageBody(message.getMessageBody()); - messageLog.setBizBody(bizBody); + messageLog.setMessageBody(object.getMessageBody()); + messageLog.setBizBody(object.getBizBody()); messageLog.setDataVersion(message.getDataVersion()); MessageHistory historyUpdate = new MessageHistory(); updatableHistories.add(historyUpdate); + historyUpdate.setId(history.getId()); + MessageBody messageBody = JSON.parseObject(history.getMessageBody(), MessageBody.class); + messageBody.setBizMessageId(message.getBizMessageId()); + messageBody.setDataVersion(message.getDataVersion()); + historyUpdate.setMessageBody(JSON.toJSONString(messageBody)); + HistoryRecordExt updateExt = new HistoryRecordExt(); updateExt.setUpdatableMessage(true); updateExt.setBizMessageId(message.getBizMessageId()); @@ -152,40 +174,122 @@ public class UpdatableMessageManager { } if (CollectionUtils.isNotEmpty(messageLogUpdates)) { for (List batch : Lists.partition(messageLogUpdates, BATCH_UPDATE_SIZE)) - updatableMessageLogDao.updateBatchById(batch); + updatableMessageLogDao.saveBatch(batch); } if (CollectionUtils.isNotEmpty(updatableHistories)) { for (List batch : Lists.partition(updatableHistories, BATCH_UPDATE_SIZE)) - messageHistoryDao.saveBatch(batch); + messageHistoryDao.updateBatchById(batch); } } - // !! 更新消息 + // !! update message + @Transactional public MessageUpdateResp updateMessage(UpdateMessageRequest request) { - return null; + BizAssertions.assertNotEmpty(request.getUpdates(), "更新消息不能为空"); + Map bizMessageId2Message = updatableMessageDao + .getByBizMessageIdsForUpdate( + request.getUpdates().stream() + .map(UpdateMessageRequest.Update::getBizMessageId) + .distinct() + .collect(toList())).stream() + .collect(Collectors.toMap(UpdatableMessage::getBizMessageId, Function.identity())); + if (bizMessageId2Message.isEmpty()) + throw new ServiceException("未找到需要更新的消息"); + String batchNo = UUIDUtil.uuidString(); + List histories = new ArrayList<>(); + List messageUpdates = new ArrayList<>(); + List messageLogs = new ArrayList<>(); + Map message2History = new IdentityHashMap<>(); + Map messageLog2History = new IdentityHashMap<>(); + for (UpdateMessageRequest.Update update : request.getUpdates()) { + UpdatableMessage message = bizMessageId2Message.get(update.getBizMessageId()); + if (message == null) continue; + + UpdatableMessage messageUpdate = new UpdatableMessage(); + messageUpdates.add(messageUpdate); + messageUpdate.setId(message.getId()); + messageUpdate.setDataVersion(messageUpdate.getDataVersion() + 1); + messageUpdate.setState(UpdatableMessageState.UPDATE_QUEUED); + + MessageHistory history = new MessageHistory(); + histories.add(history); + message2History.put(message, history); + history.setBizId(message.getBizId()); + history.setFromAccount(message.getFromAccount()); + history.setToAccount(message.getToAccount()); + history.setAppType(message.getAppType().getCode()); + history.setChannel(imChannel.getProviderType()); + MessageCustomBody messageBody = new MessageCustomBody(); + messageBody.setToImAccount(message.getToAccount()); + messageBody.setPersonId(message.getReceiverPersonId()); + messageBody.setBizType(BizTypeEnum.MESSAGE_UPDATE); + messageBody.setPayload(update.getMsgTemplateContent()); + messageBody.setBizMessageId(message.getBizMessageId()); + messageBody.setDataVersion(messageUpdate.getDataVersion()); + messageBody.setInitMessageId(message.getInitHistoryId()); + messageBody.setMsgType(message.getMsgType()); + history.setMessageBody(JSON.toJSONString(messageBody)); + messageUpdate.setMessageBody(JSON.parseObject(history.getMessageBody())); + messageUpdate.setBizBody(JSON.parseObject(update.getMsgTemplateContent())); + + history.setImMessageTaskId(0L); + history.setReceivePersonId(message.getReceiverPersonId()); + history.setReceiveOuId(message.getReceiverOuId()); + history.setStatus(MessageHistoryStatus.PENDING); + history.setBatchNo(batchNo); + history.setSendPriority(SendPriority.UPDATE_MESSAGE.getPriority()); + history.setApiChannel(ApiChannel.CUSTOM_MESSAGE); + HistoryRecordExt recordExt = new HistoryRecordExt(); + recordExt.setUpdatableMessage(true); + recordExt.setBizMessageId(message.getBizMessageId()); + recordExt.setDataVersion(message.getDataVersion()); + recordExt.setUpdateMessage(true); + history.setRecordExt(recordExt); + history.setTimestampForSend(new Date()); + + UpdatableMessageLog messageLog = new UpdatableMessageLog(); + messageLogs.add(messageLog); + messageLog2History.put(messageLog, history); + messageLog.setBizId(message.getBizId()); + messageLog.setFromAccount(message.getFromAccount()); + messageLog.setToAccount(message.getToAccount()); + messageLog.setBizMessageId(message.getBizMessageId()); + messageLog.setMessageState(messageUpdate.getState()); + messageLog.setInitHistoryId(message.getInitHistoryId()); + messageLog.setMessageBody(messageUpdate.getMessageBody()); + messageLog.setBizBody(messageUpdate.getBizBody()); + messageLog.setDataVersion(messageUpdate.getDataVersion()); + } + messageHistoryDao.saveBatch(histories); + for (UpdatableMessage messageUpdate : messageUpdates) { + MessageHistory history = message2History.get(messageUpdate); + messageUpdate.setUpdateHistoryId(history.getId()); + } + for (UpdatableMessageLog messageLog : messageLogs) { + MessageHistory history = messageLog2History.get(messageLog); + messageLog.setUpdateHistoryId(history.getId()); + } + updatableMessageLogDao.saveBatch(messageLogs); + updatableMessageDao.updateBatchById(messageUpdates); + return new MessageUpdateResp(); } + @Transactional public void onHistorySend(List maybeUpdatedHistory) { List historyIds = maybeUpdatedHistory.stream() - .filter(history -> { - HistoryRecordExt recordExt = history.getRecordExt(); - return recordExt != null && recordExt.isUpdateMessage(); - }) + .filter(MessageHistory::isUpdatableMessage) .map(MessageHistory::getId) .collect(toList()); if (CollectionUtils.isEmpty(historyIds)) return; List histories = messageHistoryDao.listByIds(historyIds); - List bizMessageIds = histories.stream() - .map(history -> history.getRecordExt().getBizMessageId()) - .distinct() - .sorted() - .collect(toList()); - List messages = updatableMessageDao.lambdaQuery() - .in(UpdatableMessage::getBizMessageId, bizMessageIds) - // 避免ack更新出错 - .last("FOR UPDATE") - .list(); + // 避免ack更新出错 + List messages = updatableMessageDao + .getByBizMessageIdsForUpdate(histories.stream() + .map(h -> h.getRecordExt().getBizMessageId()) + .distinct() + .sorted() + .collect(toList())); HashMap id2Messages = new HashMap<>(); messages.forEach(message -> id2Messages.put(message.getId(), message)); BiFunction> historyAndMessageBuilder = diff --git a/im-center-server/src/main/java/cn/axzo/im/updatable/UpdateStateHandler.java b/im-center-server/src/main/java/cn/axzo/im/updatable/UpdateStateHandler.java index 2e6456c..25f0b3f 100644 --- a/im-center-server/src/main/java/cn/axzo/im/updatable/UpdateStateHandler.java +++ b/im-center-server/src/main/java/cn/axzo/im/updatable/UpdateStateHandler.java @@ -1,21 +1,72 @@ package cn.axzo.im.updatable; +import cn.axzo.im.dao.repository.UpdatableMessageDao; +import cn.axzo.im.dao.repository.UpdatableMessageLogDao; +import cn.axzo.im.entity.MessageHistory; +import cn.axzo.im.entity.UpdatableMessage; +import cn.axzo.im.entity.UpdatableMessageLog; +import cn.axzo.im.enums.UpdatableMessageState; +import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; +import java.util.ArrayList; import java.util.List; /** * @author yanglin */ @Component +@RequiredArgsConstructor class UpdateStateHandler implements StateHandler { + private final UpdatableMessageDao updatableMessageDao; + private final UpdatableMessageLogDao updatableMessageLogDao; + @Override public void onSuccess(List historyAndMessages) { + updateState(historyAndMessages, true); } @Override public void onFail(List historyAndMessages) { + updateState(historyAndMessages, false); } + private void updateState(List historyAndMessages, boolean isSuccess) { + List messageUpdates = new ArrayList<>(); + List messageLogs = new ArrayList<>(); + for (HistoryAndMessage historyAndMessage : historyAndMessages) { + // 1. 第一次更新(或成功, 或失败); 2. 有一次失败, 后面ack重试成功 (针对同一次更新); + if (historyAndMessage.getMessage().getState() != UpdatableMessageState.UPDATE_SEND_SUCCESS + && !historyAndMessage.getDataVersionMatchHistories().isEmpty()) { + UpdatableMessage messageUpdate = new UpdatableMessage(); + messageUpdates.add(messageUpdate); + messageUpdate.setState(isSuccess + ? UpdatableMessageState.UPDATE_SEND_SUCCESS + : UpdatableMessageState.UPDATE_SEND_FAIL); + } + + UpdatableMessage message = historyAndMessage.getMessage(); + for (MessageHistory history : historyAndMessage.getHistories()) { + UpdatableMessageLog messageLog = new UpdatableMessageLog(); + messageLogs.add(messageLog); + messageLog.setBizId(message.getBizId()); + messageLog.setFromAccount(message.getFromAccount()); + messageLog.setToAccount(message.getToAccount()); + messageLog.setBizMessageId(message.getBizMessageId()); + messageLog.setMessageState(isSuccess + ? UpdatableMessageState.UPDATE_SEND_SUCCESS + : UpdatableMessageState.UPDATE_SEND_FAIL); + messageLog.setInitHistoryId(message.getInitHistoryId()); + messageLog.setUpdateHistoryId(history.getId()); + MessageBodyJsonObject object = new MessageBodyJsonObject(history.getMessageBody()); + messageLog.setMessageBody(object.getMessageBody()); + messageLog.setBizBody(object.getBizBody()); + messageLog.setDataVersion(history.getDataVersion()); + } + } + if (!messageUpdates.isEmpty()) + updatableMessageDao.updateBatchById(messageUpdates); + updatableMessageLogDao.saveBatch(messageLogs); + } } \ No newline at end of file