REQ-2481: 支持动态调整优先级

This commit is contained in:
yanglin 2024-06-03 20:24:51 +08:00
parent 00e9c12e52
commit c8a913405e
4 changed files with 29 additions and 32 deletions

View File

@ -2,6 +2,7 @@ package cn.axzo.im.job;
import cn.axzo.im.send.ScanAndSendService;
import cn.axzo.im.send.SendExec;
import cn.axzo.im.send.SendManager;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
@ -18,38 +19,36 @@ import org.springframework.stereotype.Component;
public class SendMessageJob extends IJobHandler {
private final ScanAndSendService scanAndSendService;
private volatile SendExec exec;
private volatile SendManager sendManager;
@Override
@XxlJob("sendMessageJob")
public ReturnT<String> execute(String param) {
SendExec exec = this.exec;
if (exec != null)
SendManager sendManager = this.sendManager;
if (sendManager != null)
throw new RuntimeException("无法同时执行多个操作...");
synchronized (this) {
exec = this.exec;
if (exec != null)
sendManager = this.sendManager;
if (sendManager != null)
throw new RuntimeException("无法同时执行多个操作...");
this.exec = exec = SendExec.commonChannels();
}
try {
scanAndSendService.scanAndSend(exec);
return ReturnT.SUCCESS;
} catch (Exception e) {
log.error("执行失败. para={}", param, e);
return ReturnT.FAIL;
} finally {
synchronized (this) {
this.exec = null;
try {
this.sendManager = scanAndSendService.scanAndSend(SendExec.commonChannels());
return ReturnT.SUCCESS;
} catch (Exception e) {
log.error("执行失败. para={}", param, e);
return ReturnT.FAIL;
} finally {
this.sendManager = null;
}
}
}
public synchronized boolean interceptExec() {
SendExec exec = this.exec;
if (exec == null) return false;
exec.setExecIntercepted(true);
this.exec = null;
SendManager sendManager = this.sendManager;
if (sendManager == null) return false;
sendManager.interrupt();
sendManager.awaitTasksTermination();
this.sendManager = null;
log.info("中断job执行");
return true;
}

View File

@ -35,19 +35,19 @@ public class ScanAndSendService {
private final SendJobInfoMapper sendJobInfoMapper;
private final ForkJoinPool threadPool = new ForkJoinPool(15);
public final void scanAndSend(SendExec exec) throws Exception {
public final SendManager scanAndSend(SendExec exec) throws Exception {
try {
scanAndSendImpl(exec);
return scanAndSendImpl(exec);
} catch (Exception e) {
log.error("执行异常", e);
throw e;
}
}
private void scanAndSendImpl(SendExec exec) throws Exception {
private SendManager scanAndSendImpl(SendExec exec) throws Exception {
if (isAppShuttingDown) {
log.warn("application is shutting down... skip execution");
return;
return null;
}
long execStart = System.currentTimeMillis();
Supplier<Long> execTimeFun = () -> {
@ -60,7 +60,7 @@ public class ScanAndSendService {
if (estimatedCount == 0) {
log.info("没有IM消息需要发送, 跳过执行");
execTimeFun.get();
return;
return sendManager;
}
SendJobInfo sendJobInfo = trySaveSendJobInfo(estimatedCount);
AsyncTasks<ExecResult> tasks = new AsyncTasks<>(false);
@ -91,7 +91,7 @@ public class ScanAndSendService {
}
sendManager.awaitTasksTermination();
Long execTimeMs = execTimeFun.get();
if (sendJobInfo == null) return;
if (sendJobInfo == null) return sendManager;
// 记录发送任务信息
sendJobInfo.setSendCount(sendManager.queue().getSendCount());
sendJobInfo.setInterrupted(
@ -109,6 +109,7 @@ public class ScanAndSendService {
}
}
sendJobInfoMapper.updateById(sendJobInfo);
return sendManager;
}
private void drainLogs(BlockingQueue<String> logQueue) {

View File

@ -22,5 +22,4 @@ public class SendExec {
}
private final List<Class<? extends SendHandler<?>>> channels;
private volatile boolean execIntercepted;
}

View File

@ -16,7 +16,7 @@ import java.util.concurrent.CompletableFuture;
/**
* @author yanglin
*/
class SendManager {
public class SendManager {
private final SendQueue queue;
private final SendExec exec;
@ -39,8 +39,6 @@ class SendManager {
}
public boolean isInterrupted() {
if (exec.isExecIntercepted())
return true;
if (execInterrupted || someoneJumpedTheQueue)
return true;
if (ScanAndSendService.isAppShuttingDown)
@ -83,7 +81,7 @@ class SendManager {
asyncTasks.add(future);
}
void awaitTasksTermination() {
public void awaitTasksTermination() {
asyncTasks.awaitTermination();
}
@ -98,7 +96,7 @@ class SendManager {
return record == null ? null : record.getCreateAt();
}
void interrupt() {
public void interrupt() {
execInterrupted = true;
}
}