REQ-2481: 记录发送执行id

This commit is contained in:
yanglin 2024-06-04 10:10:13 +08:00
parent e1bb342d2f
commit 609f327234
7 changed files with 31 additions and 2 deletions

View File

@ -10,6 +10,7 @@ public class HistoryRecordExt {
private String sendApi;
private String sendRespDesc;
private String batchSendId;
private String sendExecId;
/**
* 因为还没有生成对应的IM账号时, 预期发送的IM账号
*/

View File

@ -45,6 +45,7 @@ public class ScanAndSendService {
}
private void scanAndSendImpl(SendExec sendExec) throws Exception {
log.info("sendExecId: {}", sendExec.getExecId());
if (isAppShuttingDown) {
log.warn("application is shutting down... skip execution");
return;
@ -55,7 +56,7 @@ public class ScanAndSendService {
log.info("扫描并发送消息总耗时: {}", MiscUtils.formatDuration(deltaMs));
return deltaMs;
};
SendManager sendManager = new SendManager(applicationContext);
SendManager sendManager = new SendManager(applicationContext, sendExec);
sendExec.setSendManager(sendManager);
int estimatedCount = sendManager.queue().totalCount();
if (estimatedCount == 0) {

View File

@ -3,9 +3,14 @@ package cn.axzo.im.send;
import cn.axzo.im.send.handler.CommonSendBatchHandler;
import cn.axzo.im.send.handler.CommonSendOneHandler;
import cn.axzo.im.send.handler.SendHandler;
import cn.axzo.im.utils.UUIDUtil;
import com.xxl.job.core.log.XxlJobFileAppender;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
@ -21,6 +26,16 @@ public class SendExec {
CommonSendBatchHandler.class, CommonSendOneHandler.class));
}
@Getter private final String execId = determineExecId();
private final List<Class<? extends SendHandler<?>>> channels;
private volatile SendManager sendManager;
private static String determineExecId() {
String logFile = XxlJobFileAppender.contextHolder.get();
if (StringUtils.isBlank(logFile))
return UUIDUtil.uuidString();
String name = Paths.get(logFile).toFile().getName();
return name.split("\\.")[0];
}
}

View File

@ -115,6 +115,10 @@ public class SendExecutor<T> implements Supplier<ExecResult> {
return sendManager.queue();
}
public SendExec sendExec() {
return sendManager.sendExec();
}
public void log(String message, Object... args) {
queue().log(message, args);
}

View File

@ -19,6 +19,7 @@ import java.util.concurrent.CompletableFuture;
public class SendManager {
private final SendQueue queue;
private final SendExec sendExec;
private final MessageHistoryMapper messageHistoryMapper;
private final SendMessageConfig cfg;
private final Date maxCreateAt;
@ -27,10 +28,11 @@ public class SendManager {
private volatile boolean execInterrupted;
@Getter private volatile boolean someoneJumpedTheQueue;
SendManager(ApplicationContext applicationContext) {
SendManager(ApplicationContext applicationContext, SendExec sendExec) {
this.cfg = applicationContext.getBean(ImProperties.class).getSendMessage();
this.messageHistoryMapper = applicationContext.getBean(MessageHistoryMapper.class);
this.queue = new SendQueue(applicationContext);
this.sendExec = sendExec;
this.maxCreateAt = getMaxCreateAt();
this.asyncTasks = new AsyncTasks<>();
this.lastCheckJumpTheQueueTime = new Date();
@ -87,6 +89,10 @@ public class SendManager {
return queue;
}
SendExec sendExec() {
return sendExec;
}
private Date getMaxCreateAt() {
QueryWrapper<MessageHistory> query = new QueryWrapper<MessageHistory>()
.select("MAX(create_at) AS create_at");

View File

@ -42,6 +42,7 @@ public class CommonSendBatchHandler extends SendBatchHandler {
executor.submitUpdateTask(histories, () -> {
HistoryRecordExt ext = new HistoryRecordExt();
ext.setSendApi("batchSendMessage");
ext.setSendExecId(executor.sendExec().getExecId());
ext.setBatchSendId(UUIDUtil.uuidString());
ext.setSendRespDesc(StringUtils.isBlank(response.getDesc()) ? "" : response.getDesc());
if (response.isRateLimited())

View File

@ -38,6 +38,7 @@ public class CommonSendOneHandler extends SendOneHandler {
executor.submitUpdateTask(history, () -> {
HistoryRecordExt ext = new HistoryRecordExt();
ext.setSendApi("sendMessage");
ext.setSendExecId(executor.sendExec().getExecId());
ext.setSendRespDesc(StringUtils.isBlank(response.getDesc()) ? "" : response.getDesc());
if (response.isRateLimited())
executor.scheduleRetrySend(history, ext);