REQ-3201: ack

This commit is contained in:
yanglin 2024-12-13 09:47:02 +08:00
parent 7e391133f7
commit 2ebb78afe0
4 changed files with 41 additions and 35 deletions

View File

@ -26,7 +26,7 @@ public class UpdatableMessageDao extends ServiceImpl<UpdatableMessageMapper, Upd
.sorted().collect(toList());
return lambdaQuery()
.in(UpdatableMessage::getBizMessageId, sortedBizMessageIds)
// 避免ack更新出
// 避免ack更新出现竞争
.last("FOR UPDATE")
.list();
}

View File

@ -12,19 +12,19 @@ import lombok.RequiredArgsConstructor;
public enum UpdatableMessageState {
// 已创建
TASK_CREATED(false, false),
TASK_CREATED(true, false),
// 消息已经放进队列
INIT_MESSAGE_QUEUED(false, false),
INIT_MESSAGE_QUEUED(true, false),
// 消息已经发送成功
INIT_MESSAGE_SEND_SUCCESS(true, false),
// 消息已经发送失败
INIT_MESSAGE_SEND_FAIL(false, false),
INIT_MESSAGE_SEND_FAIL(true, false),
// 更新已经放进队列
UPDATE_MESSAGE_QUEUED(true, false),
// 更新已经发送成功
UPDATE_MESSAGE_SEND_SUCCESS(true, true),
// 更新已经发送失败
UPDATE_MESSAGE_SEND_FAIL(true, false),
UPDATE_MESSAGE_SEND_FAIL(true, true),
// 更新ACK
UPDATE_ACK(true, false),
// 未找到账号

View File

@ -42,6 +42,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import static cn.axzo.im.center.api.vo.req.MessageUpdateInfo.collectBizMessageIds;
@ -290,49 +291,55 @@ public class UpdatableMessageManager {
List<String> bizMessageIds = acknowledgments.stream()
.map(Acknowledgment::getBizMessageId)
.collect(toList());
Map<String, Acknowledgment> nimMessageId2Ack = acknowledgments.stream()
Map<String, Acknowledgment> bizMessageId2Ack = acknowledgments.stream()
.collect(toMap(Acknowledgment::getBizMessageId, identity()));
List<UpdatableMessage> messages = updatableMessageDao.getByBizMessageIdsForUpdate(bizMessageIds);
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
ArrayList<String> ackedBizMessageIds = new ArrayList<>();
for (UpdatableMessage message : messages) {
Long dataVersion = nimMessageId2Ack.get(message.getNimMessageId()).getDataVersion();
if (dataVersion == null) dataVersion = -1L;
Long ackDataVersion = bizMessageId2Ack.get(message.getBizMessageId()).getDataVersion();
UpdatableMessageLog messageLog = message.toMessageLog(request);
collector.addLog(messageLog);
messageLog.setContext("ack");
messageLog.setContextHistoryId(0L);
messageLog.setRetryCount(0L);
messageLog.setBizBody(null);
messageLog.setDataVersion(dataVersion);
UpdatableMessageLog.RecordExt logExt = new UpdatableMessageLog.RecordExt();
messageLog.setRecordExt(logExt);
messageLog.setDataVersion(message.getDataVersion());
messageLog.addLogContent("ackDataVersion", ackDataVersion);
messageLog.addLogContent("dataVersion", message.getDataVersion());
messageLog.addLogContent("messageState", message.getState());
BiConsumer<Boolean, String> ackLogger = (ackSuccess, ackDescription) -> {
messageLog.addLogContent("ackSuccess", ackSuccess);
messageLog.addLogContent("ackDescription", ackDescription);
};
if (ackDataVersion == null) {
ackLogger.accept(false, "ackDataVersion为空");
continue;
}
if (ackDataVersion > message.getDataVersion()) {
ackLogger.accept(false, "ackDataVersion大于当前dataVersion");
continue;
}
if (!ackDataVersion.equals(message.getDataVersion())) {
ackLogger.accept(false, "数据版本不匹配");
continue;
}
// 避免前端有bug或者有新的消息更新
if (!message.getState().isUpdateAckAllowed()) {
ackLogger.accept(false, "消息状态不允许ack");
continue;
}
ackLogger.accept(true, "ack成功");
UpdatableMessage messageUpdate = new UpdatableMessage();
collector.updateMessage(messageUpdate);
messageUpdate.setId(message.getId());
Long ackDataVersion = dataVersion >= message.getAckDataVersion()
? dataVersion
: message.getAckDataVersion();
messageUpdate.setAckDataVersion(ackDataVersion);
UpdatableMessageLog.AckInfo ackInfo = new UpdatableMessageLog.AckInfo();
logExt.setAck(ackInfo);
if (!dataVersion.equals(message.getDataVersion())) {
ackInfo.setAckSuccess(false);
ackInfo.setAckDescription("数据版本不匹配");
ackInfo.setRequestDataVersion(dataVersion);
ackInfo.setMessageDataVersion(message.getDataVersion());
continue;
}
// 避免前端有bug
if (!message.getState().isUpdateAckAllowed()) {
ackInfo.setAckSuccess(false);
ackInfo.setAckDescription("消息状态不允许ack");
continue;
}
ackInfo.setAckSuccess(true);
messageUpdate.setState(UpdatableMessageState.UPDATE_ACK);
ackedBizMessageIds.add(message.bizMessageId());
}
collector.finish();
// 只需要通过bizMessageId去删除就保证安全了
messageUpdateRetryService.removeRetryByBizMessageIds(ackedBizMessageIds);
}
}

View File

@ -38,7 +38,7 @@ public class MessageUpdateRetryService {
private final CardManipulateCollectorFactory cardManipulateCollectorFactory;
private final ImProperties props;
public void removePrevRetryByBizMessageIds(Collection<String> bizMessageIds) {
public void removeRetryByBizMessageIds(Collection<String> bizMessageIds) {
if (CollectionUtils.isNotEmpty(bizMessageIds))
messageUpdateRetryDao.getBaseMapper().deleteByBizMessageIds(bizMessageIds);
}
@ -51,8 +51,8 @@ public class MessageUpdateRetryService {
}
private void advanceRetryImpl(List<String> bizMessageIds) {
removePrevRetryByBizMessageIds(bizMessageIds);
List<UpdatableMessage> messages = updatableMessageDao.getByBizMessageIdsForUpdate(bizMessageIds);
removeRetryByBizMessageIds(bizMessageIds);
UpdateAckState state = new UpdateAckState(messages, props.getMessageUpdateAckMaxRetryCount());
// 先发本次生试, 再调度下一次重试
retryUpdateMessage(state.getNonAckMessageIds());
@ -65,7 +65,7 @@ public class MessageUpdateRetryService {
List<String> bizMessageIds = messages.stream()
.map(UpdatableMessage::getBizMessageId)
.collect(toList());
removePrevRetryByBizMessageIds(bizMessageIds);
removeRetryByBizMessageIds(bizMessageIds);
ArrayList<MessageUpdateRetry> retries = new ArrayList<>();
CardManipulateCollector collector = cardManipulateCollectorFactory.create();
Date nextRetryTime = DateTime.now()
@ -96,5 +96,4 @@ public class MessageUpdateRetryService {
.addUpdateHistories("retryUpdateHistoryCreated", messages);
updateSupport.updateHistoryId(result, UpdatableMessage::setRetryHistoryId);
}
}