REQ-3057-card: a lot of staff

This commit is contained in:
yanglin 2024-11-06 14:19:29 +08:00
parent 4a937cf5ca
commit 50f4ec0bde
11 changed files with 258 additions and 41 deletions

View File

@ -16,4 +16,9 @@ public class HistoryRecordExt {
*/
private String imAccountInfo;
private Object customMessageBizInfo;
}
private boolean isUpdatableMessage;
private String bizMessageId;
private Long dataVersion;
private boolean isUpdateMessage;
}

View File

@ -1,6 +1,5 @@
package cn.axzo.im.entity;
import cn.axzo.im.enums.UpdatableMessageLogLogType;
import cn.axzo.im.enums.UpdatableMessageState;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.TableField;
@ -19,7 +18,6 @@ import java.util.Date;
@TableName(value = "im_updatable_message_log", autoResultMap = true)
public class UpdatableMessageLog {
private Long id;
private UpdatableMessageLogLogType type;
private String bizId;
private String fromAccount;
private String toAccount;

View File

@ -1,11 +0,0 @@
package cn.axzo.im.enums;
/**
* @author yanglin
*/
public enum UpdatableMessageLogLogType {
CREATE,
UPDATE
}

View File

@ -4,16 +4,22 @@ package cn.axzo.im.enums;
* @author yanglin
*/
public enum UpdatableMessageState {
// 已创建
CREATED,
// 消息已经放进队列
MESSAGE_QUEUED,
// 消息已经发送
MESSAGE_SENT,
// 消息已经发送成功
MESSAGE_SEND_SUCCESS,
// 消息已经发送失败
MESSAGE_SEND_FAIL,
// 更新已经放进队列
UPDATE_QUEUED,
// 更新已经发送
UPDATED,
// 更新已经发送成功
UPDATE_SEND_SUCCESS,
// 更新已经发送失败
UPDATE_SEND_FAIL,
// 未找到账号
ACCOUNT_NOT_FOUND
}

View File

@ -34,8 +34,7 @@ public class CreateMessageHistoryJob extends IJobHandler {
@Autowired
private MessageTaskService messageTaskService;
// 有数据量放大的可能, 设置小一些
private static final Integer DEFAULT_PAGE_SIZE = 100;
private static final Integer DEFAULT_PAGE_SIZE = 500;
@Override
@XxlJob("createMessageHistoryJob")

View File

@ -7,6 +7,7 @@ import cn.axzo.im.entity.HistoryRecordExt;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.service.impl.MessageHistoryServiceImpl;
import cn.axzo.im.updatable.UpdatableMessageManager;
import cn.axzo.im.utils.DateFormatUtil;
import cn.axzo.im.utils.ImProperties;
import cn.axzo.im.utils.ImProperties.SendMessageConfig;
@ -15,6 +16,7 @@ import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.ArrayList;
import java.util.Date;
@ -40,6 +42,8 @@ public class SendManager {
private final SendExec sendExec;
private final MessageHistoryMapper messageHistoryMapper;
private final MessageHistoryServiceImpl messageHistoryService;
private final UpdatableMessageManager updatableMessageManager;
private final TransactionTemplate transactionTemplate;
private final SendMessageConfig cfg;
private final Date maxCreateAt;
private final AsyncTasks<Void> asyncTasks;
@ -54,6 +58,8 @@ public class SendManager {
this.cfg = applicationContext.getBean(ImProperties.class).getSendMessage().copy();
this.messageHistoryMapper = applicationContext.getBean(MessageHistoryMapper.class);
this.messageHistoryService = applicationContext.getBean(MessageHistoryServiceImpl.class);
this.updatableMessageManager = applicationContext.getBean(UpdatableMessageManager.class);
this.transactionTemplate = applicationContext.getBean(TransactionTemplate.class);
this.queue = new SendQueue(applicationContext, sendExec.getApiChannel());
this.sendExec = sendExec;
this.maxCreateAt = getMaxCreateAt();
@ -183,7 +189,10 @@ public class SendManager {
return update;
})
.collect(toList());
messageHistoryService.updateBatch(updates);
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.updateBatch(updates);
updatableMessageManager.onHistorySend(successHistories);
});
queue.setSendComplete(successHistories);
}
for (Map.Entry<String, List<MessageHistory>> e : failHistories.entrySet()) {
@ -196,7 +205,10 @@ public class SendManager {
.status(MessageHistoryStatus.FAILED)
.build())
.collect(toList());
messageHistoryService.updateBatch(updates);
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.updateBatch(updates);
updatableMessageManager.onHistorySend(histories);
});
queue.setSendComplete(histories);
}
successHistories = new ArrayList<>();
@ -206,21 +218,30 @@ public class SendManager {
void setBatchSendSuccess(List<MessageHistory> histories,
MessageBatchDispatchResponse response,
HistoryRecordExt updateExt) {
messageHistoryService.setBatchSendSuccess(histories, response, updateExt);
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.setBatchSendSuccess(histories, response, updateExt);
updatableMessageManager.onHistorySend(histories);
});
queue.setSendComplete(histories);
}
void setBatchSendSuccess(List<MessageHistory> histories,
BatchSendCustomMessageResponse response,
HistoryRecordExt updateExt) {
messageHistoryService.setBatchSendSuccess(histories, response, updateExt);
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.setBatchSendSuccess(histories, response, updateExt);
updatableMessageManager.onHistorySend(histories);
});
queue.setSendComplete(histories);
}
void setSendFail(List<MessageHistory> histories,
String failReason,
HistoryRecordExt updateExt) {
messageHistoryService.setSendFail(histories, failReason, updateExt);
transactionTemplate.executeWithoutResult(unused -> {
messageHistoryService.setSendFail(histories, failReason, updateExt);
updatableMessageManager.onHistorySend(histories);
});
queue.setSendComplete(histories);
}

View File

@ -0,0 +1,18 @@
package cn.axzo.im.updatable;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.List;
/**
* @author yanglin
*/
@Getter
@RequiredArgsConstructor
class HistoryAndMessage {
private final List<MessageHistory> histories;
private final UpdatableMessage message;
}

View File

@ -0,0 +1,62 @@
package cn.axzo.im.updatable;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.dao.repository.UpdatableMessageLogDao;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import cn.axzo.im.enums.UpdatableMessageState;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
class SendStateHandler implements StateHandler {
private final UpdatableMessageDao updatableMessageDao;
private final UpdatableMessageLogDao updatableMessageLogDao;
@Override
public void onSuccess(List<HistoryAndMessage> historyAndMessages) {
updateState(historyAndMessages, true);
}
@Override
public void onFail(List<HistoryAndMessage> historyAndMessages) {
updateState(historyAndMessages, false);
}
private void updateState(List<HistoryAndMessage> historyAndMessages, boolean isSuccess) {
ArrayList<UpdatableMessage> messageUpdates = new ArrayList<>();
ArrayList<UpdatableMessageLog> messageLogs = new ArrayList<>();
for (HistoryAndMessage historyAndMessage : historyAndMessages) {
UpdatableMessage messageUpdate = new UpdatableMessage();
messageUpdates.add(messageUpdate);
messageUpdate.setState(isSuccess
? UpdatableMessageState.MESSAGE_SEND_SUCCESS
: UpdatableMessageState.MESSAGE_SEND_FAIL);
UpdatableMessageLog messageLog = new UpdatableMessageLog();
messageLogs.add(messageLog);
UpdatableMessage message = historyAndMessage.getMessage();
messageLog.setBizId(message.getBizId());
messageLog.setFromAccount(message.getFromAccount());
messageLog.setToAccount(message.getToAccount());
messageLog.setBizMessageId(message.getBizMessageId());
messageLog.setMessageState(messageUpdate.getState());
MessageHistory history = historyAndMessage.getHistories().get(0);
messageLog.setInitHistoryId(history.getId());
messageLog.setMessageBody(message.getMessageBody());
messageLog.setBizBody(message.getBizBody());
messageLog.setDataVersion(message.getDataVersion());
}
updatableMessageDao.updateBatchById(messageUpdates);
updatableMessageLogDao.saveBatch(messageLogs);
}
}

View File

@ -0,0 +1,14 @@
package cn.axzo.im.updatable;
import java.util.List;
/**
* @author yanglin
*/
interface StateHandler {
void onSuccess(List<HistoryAndMessage> historyAndMessages);
void onFail(List<HistoryAndMessage> historyAndMessages);
}

View File

@ -4,16 +4,18 @@ import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.req.UpdateMessageRequest;
import cn.axzo.im.center.api.vo.resp.MessageUpdateResp;
import cn.axzo.im.center.api.vo.resp.UpdatableMessageSendResult;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.dao.repository.UpdatableMessageDao;
import cn.axzo.im.dao.repository.UpdatableMessageLogDao;
import cn.axzo.im.entity.HistoryRecordExt;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.entity.MessageTask.ReceivePerson;
import cn.axzo.im.entity.UpdatableMessage;
import cn.axzo.im.entity.UpdatableMessageLog;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.enums.UpdatableMessageLogLogType;
import cn.axzo.im.enums.UpdatableMessageState;
import cn.axzo.im.utils.PropsUtils;
import cn.axzo.im.utils.UUIDUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
@ -22,11 +24,12 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.function.BiFunction;
import static java.util.stream.Collectors.toList;
@ -42,8 +45,12 @@ public class UpdatableMessageManager {
private final UpdatableMessageDao updatableMessageDao;
private final UpdatableMessageLogDao updatableMessageLogDao;
private final MessageHistoryDao messageHistoryDao;
private final SendStateHandler sendStateHandler;
private final UpdateStateHandler updateStateHandler;
// !! 创建消息
@Transactional
public List<UpdatableMessageSendResult> createUpdatableMessage(
MessageTask task, SendTemplateMessageParam request, List<ReceivePerson> receivePersons) {
if (CollectionUtils.isEmpty(receivePersons)) return Collections.emptyList();
@ -77,9 +84,8 @@ public class UpdatableMessageManager {
for (UpdatableMessage message : messages) {
UpdatableMessageLog log = new UpdatableMessageLog();
logs.add(log);
log.setType(UpdatableMessageLogLogType.CREATE);
log.setMessageState(UpdatableMessageState.CREATED);
log.setBizId(message.getBizId());
log.setMessageState(message.getState());
log.setBizMessageId(message.getBizMessageId());
log.setDataVersion(message.getDataVersion());
}
@ -88,7 +94,6 @@ public class UpdatableMessageManager {
return sendResults;
}
@Transactional
public void onHistoryCreated(List<MessageHistory> histories) {
if (CollectionUtils.isEmpty(histories)) return;
List<Long> taskIds = histories.stream()
@ -98,8 +103,9 @@ public class UpdatableMessageManager {
List<UpdatableMessage> messages = updatableMessageDao.getByTaskIds(taskIds);
log.info("onHistoryCreated, taskIdSize={}, messageSize={}", taskIds.size(), messages.size());
InitHistories initHistories = new InitHistories(histories);
ArrayList<UpdatableMessage> messageUpdates = new ArrayList<>();
ArrayList<UpdatableMessageLog> messageLogUpdates = new ArrayList<>();
List<UpdatableMessage> messageUpdates = new ArrayList<>();
List<UpdatableMessageLog> messageLogUpdates = new ArrayList<>();
List<MessageHistory> updatableHistories = new ArrayList<>();
for (UpdatableMessage message : messages) {
MessageHistory history = initHistories.findHistory(message).orElse(null);
if (history == null) continue;
@ -118,14 +124,27 @@ public class UpdatableMessageManager {
? UpdatableMessageState.MESSAGE_QUEUED
: UpdatableMessageState.ACCOUNT_NOT_FOUND);
UpdatableMessageLog messageLogUpdate = new UpdatableMessageLog();
messageLogUpdates.add(messageLogUpdate);
messageLogUpdate.setFromAccount(history.getFromAccount());
messageLogUpdate.setToAccount(history.getToAccount());
messageLogUpdate.setMessageBody(message.getMessageBody());
messageLogUpdate.setBizBody(bizBody);
messageLogUpdate.setInitHistoryId(history.getImMessageTaskId());
messageLogUpdate.setMessageState(messageUpdate.getState());
UpdatableMessageLog messageLog = new UpdatableMessageLog();
messageLogUpdates.add(messageLog);
messageLog.setBizId(history.getBizId());
messageLog.setFromAccount(history.getFromAccount());
messageLog.setToAccount(history.getToAccount());
messageLog.setBizMessageId(message.getBizMessageId());
messageLog.setMessageState(messageUpdate.getState());
messageLog.setInitHistoryId(history.getImMessageTaskId());
messageLog.setMessageBody(message.getMessageBody());
messageLog.setBizBody(bizBody);
messageLog.setDataVersion(message.getDataVersion());
MessageHistory historyUpdate = new MessageHistory();
updatableHistories.add(historyUpdate);
historyUpdate.setId(history.getId());
HistoryRecordExt updateExt = new HistoryRecordExt();
updateExt.setUpdatableMessage(true);
updateExt.setBizMessageId(message.getBizMessageId());
updateExt.setDataVersion(message.getDataVersion());
updateExt.setUpdateMessage(false);
historyUpdate.setRecordExt(PropsUtils.updateProperties(history.getRecordExt(), updateExt));
}
if (CollectionUtils.isNotEmpty(messageUpdates)) {
for (List<UpdatableMessage> batch : Lists.partition(messageUpdates, BATCH_UPDATE_SIZE))
@ -135,9 +154,74 @@ public class UpdatableMessageManager {
for (List<UpdatableMessageLog> batch : Lists.partition(messageLogUpdates, BATCH_UPDATE_SIZE))
updatableMessageLogDao.updateBatchById(batch);
}
if (CollectionUtils.isNotEmpty(updatableHistories)) {
for (List<MessageHistory> batch : Lists.partition(updatableHistories, BATCH_UPDATE_SIZE))
messageHistoryDao.saveBatch(batch);
}
}
// !! 更新消息
public MessageUpdateResp updateMessage(UpdateMessageRequest request) {
return null;
}
public void onHistorySend(List<MessageHistory> maybeUpdatedHistory) {
List<Long> historyIds = maybeUpdatedHistory.stream()
.filter(history -> {
HistoryRecordExt recordExt = history.getRecordExt();
return recordExt != null && recordExt.isUpdateMessage();
})
.map(MessageHistory::getId)
.collect(toList());
if (CollectionUtils.isEmpty(historyIds)) return;
List<MessageHistory> histories = messageHistoryDao.listByIds(historyIds);
List<String> bizMessageIds = histories.stream()
.map(history -> history.getRecordExt().getBizMessageId())
.distinct()
.sorted()
.collect(toList());
List<UpdatableMessage> messages = updatableMessageDao.lambdaQuery()
.in(UpdatableMessage::getBizMessageId, bizMessageIds)
// 避免ack更新出错
.last("FOR UPDATE")
.list();
HashMap<Long, UpdatableMessage> id2Messages = new HashMap<>();
messages.forEach(message -> id2Messages.put(message.getId(), message));
BiFunction<Boolean, Boolean, List<HistoryAndMessage>> historyAndMessageBuilder =
(isUpdateMessage, isSuccess) -> {
List<HistoryAndMessage> historyAndMessages = new ArrayList<>();
for (UpdatableMessage message : new ArrayList<>(id2Messages.values())) {
List<MessageHistory> stateHistories = histories.stream()
.filter(history -> isSuccess
? history.getStatus() == MessageHistoryStatus.SUCCEED
: history.getStatus() == MessageHistoryStatus.FAILED)
.filter(history -> isUpdateMessage && history.getRecordExt().isUpdateMessage())
.collect(toList());
if (stateHistories.isEmpty())
continue;
// 一次只会处理一种情况, 避免无用的遍历
id2Messages.remove(message.getId());
historyAndMessages.add(new HistoryAndMessage(stateHistories, message));
}
return historyAndMessages;
};
// send success
List<HistoryAndMessage> sendSuccess = historyAndMessageBuilder.apply(true, true);
if (!sendSuccess.isEmpty())
sendStateHandler.onSuccess(sendSuccess);
// send fail
List<HistoryAndMessage> sendFail = historyAndMessageBuilder.apply(true, false);
if (!sendFail.isEmpty())
sendStateHandler.onFail(sendFail);
// update success
List<HistoryAndMessage> updateSuccess = historyAndMessageBuilder.apply(false, true);
if (!updateSuccess.isEmpty())
updateStateHandler.onSuccess(updateSuccess);
// update fail
List<HistoryAndMessage> updateFail = historyAndMessageBuilder.apply(false, false);
if (!updateFail.isEmpty())
updateStateHandler.onFail(updateFail);
}
}

View File

@ -0,0 +1,21 @@
package cn.axzo.im.updatable;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author yanglin
*/
@Component
class UpdateStateHandler implements StateHandler {
@Override
public void onSuccess(List<HistoryAndMessage> historyAndMessages) {
}
@Override
public void onFail(List<HistoryAndMessage> historyAndMessages) {
}
}