REQ-3201: ack

This commit is contained in:
yanglin 2024-12-13 10:12:15 +08:00
parent 2ebb78afe0
commit b15d95881e
2 changed files with 20 additions and 23 deletions

View File

@ -12,25 +12,24 @@ import lombok.RequiredArgsConstructor;
public enum UpdatableMessageState { public enum UpdatableMessageState {
// 已创建 // 已创建
TASK_CREATED(true, false), TASK_CREATED(true),
// 消息已经放进队列 // 消息已经放进队列
INIT_MESSAGE_QUEUED(true, false), INIT_MESSAGE_QUEUED(true),
// 消息已经发送成功 // 消息已经发送成功
INIT_MESSAGE_SEND_SUCCESS(true, false), INIT_MESSAGE_SEND_SUCCESS(true),
// 消息已经发送失败 // 消息已经发送失败
INIT_MESSAGE_SEND_FAIL(true, false), INIT_MESSAGE_SEND_FAIL(true),
// 更新已经放进队列 // 更新已经放进队列
UPDATE_MESSAGE_QUEUED(true, false), UPDATE_MESSAGE_QUEUED(true),
// 更新已经发送成功 // 更新已经发送成功
UPDATE_MESSAGE_SEND_SUCCESS(true, true), UPDATE_MESSAGE_SEND_SUCCESS(true),
// 更新已经发送失败 // 更新已经发送失败
UPDATE_MESSAGE_SEND_FAIL(true, true), UPDATE_MESSAGE_SEND_FAIL(true),
// 更新ACK // 更新ACK
UPDATE_ACK(true, false), UPDATE_ACK(true),
// 未找到账号 // 未找到账号
ACCOUNT_NOT_FOUND(false, false); ACCOUNT_NOT_FOUND(false);
private final boolean isUpdateMessageAllowed; private final boolean isUpdateMessageAllowed;
private final boolean isUpdateAckAllowed;
} }

View File

@ -305,12 +305,14 @@ public class UpdatableMessageManager {
messageLog.setRetryCount(0L); messageLog.setRetryCount(0L);
messageLog.setBizBody(null); messageLog.setBizBody(null);
messageLog.setDataVersion(message.getDataVersion()); messageLog.setDataVersion(message.getDataVersion());
messageLog.addLogContent("ackDataVersion", ackDataVersion);
messageLog.addLogContent("dataVersion", message.getDataVersion());
messageLog.addLogContent("messageState", message.getState());
BiConsumer<Boolean, String> ackLogger = (ackSuccess, ackDescription) -> { BiConsumer<Boolean, String> ackLogger = (ackSuccess, ackDescription) -> {
messageLog.addLogContent("ackSuccess", ackSuccess); HashMap<String, Object> ackInfo = new HashMap<>();
messageLog.addLogContent("ackDescription", ackDescription); ackInfo.put("ackDataVersion", ackDataVersion);
ackInfo.put("dataVersion", message.getDataVersion());
ackInfo.put("messageState", message.getState());
ackInfo.put("ackSuccess", ackSuccess);
ackInfo.put("ackDescription", ackDescription);
messageLog.addLogContent("ackInfo", ackInfo);
}; };
if (ackDataVersion == null) { if (ackDataVersion == null) {
ackLogger.accept(false, "ackDataVersion为空"); ackLogger.accept(false, "ackDataVersion为空");
@ -320,13 +322,8 @@ public class UpdatableMessageManager {
ackLogger.accept(false, "ackDataVersion大于当前dataVersion"); ackLogger.accept(false, "ackDataVersion大于当前dataVersion");
continue; continue;
} }
if (!ackDataVersion.equals(message.getDataVersion())) { if (ackDataVersion < message.getAckDataVersion()) {
ackLogger.accept(false, "数据版本不匹配"); ackLogger.accept(false, "ackDataVersion小于上次ackDataVersion");
continue;
}
// 避免前端有bug或者有新的消息更新
if (!message.getState().isUpdateAckAllowed()) {
ackLogger.accept(false, "消息状态不允许ack");
continue; continue;
} }
ackLogger.accept(true, "ack成功"); ackLogger.accept(true, "ack成功");
@ -334,7 +331,8 @@ public class UpdatableMessageManager {
collector.updateMessage(messageUpdate); collector.updateMessage(messageUpdate);
messageUpdate.setId(message.getId()); messageUpdate.setId(message.getId());
messageUpdate.setAckDataVersion(ackDataVersion); messageUpdate.setAckDataVersion(ackDataVersion);
messageUpdate.setState(UpdatableMessageState.UPDATE_ACK); if (ackDataVersion.equals(message.getDataVersion()))
messageUpdate.setState(UpdatableMessageState.UPDATE_ACK);
ackedBizMessageIds.add(message.bizMessageId()); ackedBizMessageIds.add(message.bizMessageId());
} }
collector.finish(); collector.finish();