REQ-2481: 支持动态调整优先级

This commit is contained in:
yanglin 2024-06-03 19:56:28 +08:00
parent a093e1dd96
commit 00e9c12e52
14 changed files with 113 additions and 54 deletions

View File

@ -4,9 +4,9 @@ import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.vo.req.AsyncSendMessageParam;
import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendCustomMessageParam;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.req.UpdateTemplateSendPriorityRequest;
import cn.axzo.im.center.api.vo.resp.MessageCustomResp;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.center.api.vo.resp.MessageTaskResp;
@ -71,5 +71,10 @@ public interface MessageApi {
@PostMapping("api/im/custom-message/send")
ApiResult<List<MessageCustomResp>> sendCustomMessage(@RequestBody @Validated CustomMessageInfo messageInfo);
@PostMapping("api/im/custom-message/updateSendPriority")
ApiResult<Boolean> updateSendPriority(
@RequestBody @Validated UpdateTemplateSendPriorityRequest request);
@PostMapping("api/im/custom-message/interceptSend")
ApiResult<Boolean> interceptSend();
}

View File

@ -0,0 +1,17 @@
package cn.axzo.im.center.api.vo.req;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
/**
* @author yanglin
*/
@Data
public class UpdateTemplateSendPriorityRequest {
@NotBlank(message = "模板业务前缀不能为空")
private String bizIdPrefix;
@NotNull(message = "发送优先级不能为空")
private Integer sendPriority;
}

View File

@ -10,6 +10,7 @@ import cn.axzo.im.center.api.vo.req.CustomMessageInfo;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendMessageParam;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.req.UpdateTemplateSendPriorityRequest;
import cn.axzo.im.center.api.vo.resp.MessageCustomResp;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.center.api.vo.resp.MessageTaskResp;
@ -19,6 +20,7 @@ import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.enums.ApiChannel;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.enums.MessageTaskStatus;
import cn.axzo.im.job.SendMessageJob;
import cn.axzo.im.service.AccountRegisterService;
import cn.axzo.im.service.AccountService;
import cn.axzo.im.service.MessageHistoryService;
@ -75,6 +77,8 @@ public class MessageController implements MessageApi {
private AccountRegisterService accountRegisterService;
@Autowired
private MessageHistoryService messageHistoryService;
@Autowired
private SendMessageJob sendMessageJob;
@Override
@ -89,6 +93,17 @@ public class MessageController implements MessageApi {
return ApiResult.ok(messageRespList);
}
@Override
public ApiResult<Boolean> updateSendPriority(UpdateTemplateSendPriorityRequest request) {
boolean updated = messageHistoryService.updateSendPriority(request);
return ApiResult.ok(updated);
}
@Override
public ApiResult<Boolean> interceptSend() {
return ApiResult.ok(sendMessageJob.interceptExec());
}
@ExceptionHandler({ RequestNotPermitted.class })
@ResponseStatus(HttpStatus.TOO_MANY_REQUESTS)
public ApiResult<String> handleRequestNotPermitted() {

View File

@ -1,7 +1,7 @@
package cn.axzo.im.job;
import cn.axzo.im.send.ScanAndSendService;
import cn.axzo.im.send.handler.SendHandler;
import cn.axzo.im.send.SendExec;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
@ -18,29 +18,39 @@ import org.springframework.stereotype.Component;
public class SendMessageJob extends IJobHandler {
private final ScanAndSendService scanAndSendService;
private volatile boolean isRunning = false;
private volatile SendExec exec;
@Override
@XxlJob("sendMessageJob")
public ReturnT<String> execute(String param) {
if (isRunning)
SendExec exec = this.exec;
if (exec != null)
throw new RuntimeException("无法同时执行多个操作...");
synchronized (this) {
if (isRunning)
exec = this.exec;
if (exec != null)
throw new RuntimeException("无法同时执行多个操作...");
isRunning = true;
this.exec = exec = SendExec.commonChannels();
}
try {
scanAndSendService.scanAndSend(SendHandler.COMMON_MSG_CHANNELS);
scanAndSendService.scanAndSend(exec);
return ReturnT.SUCCESS;
} catch (Exception e) {
log.error("执行失败. para={}", param, e);
return ReturnT.FAIL;
} finally {
synchronized (this) {
isRunning = false;
this.exec = null;
}
}
}
public synchronized boolean interceptExec() {
SendExec exec = this.exec;
if (exec == null) return false;
exec.setExecIntercepted(true);
this.exec = null;
log.info("中断job执行");
return true;
}
}

View File

@ -14,7 +14,6 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
@ -36,16 +35,16 @@ public class ScanAndSendService {
private final SendJobInfoMapper sendJobInfoMapper;
private final ForkJoinPool threadPool = new ForkJoinPool(15);
public final void scanAndSend(List<Class<? extends SendHandler<?>>> channels) throws Exception {
public final void scanAndSend(SendExec exec) throws Exception {
try {
scanAndSendImpl(channels);
scanAndSendImpl(exec);
} catch (Exception e) {
log.error("执行异常", e);
throw e;
}
}
private void scanAndSendImpl(List<Class<? extends SendHandler<?>>> channels) throws Exception {
private void scanAndSendImpl(SendExec exec) throws Exception {
if (isAppShuttingDown) {
log.warn("application is shutting down... skip execution");
return;
@ -56,7 +55,7 @@ public class ScanAndSendService {
log.info("扫描并发送消息总耗时: {}", MiscUtils.formatDuration(deltaMs));
return deltaMs;
};
SendManager sendManager = new SendManager(applicationContext);
SendManager sendManager = new SendManager(applicationContext, exec);
int estimatedCount = sendManager.queue().totalCount();
if (estimatedCount == 0) {
log.info("没有IM消息需要发送, 跳过执行");
@ -65,7 +64,7 @@ public class ScanAndSendService {
}
SendJobInfo sendJobInfo = trySaveSendJobInfo(estimatedCount);
AsyncTasks<ExecResult> tasks = new AsyncTasks<>(false);
for (Class<? extends SendHandler<?>> channel : channels) {
for (Class<? extends SendHandler<?>> channel : exec.getChannels()) {
SendHandler<?> sendHandler = applicationContext.getBean(channel);
tasks.add(CompletableFuture.supplyAsync(
new SendExecutor<>(sendManager, sendHandler), threadPool));
@ -84,7 +83,6 @@ public class ScanAndSendService {
} catch (InterruptedException e) {
sendManager.interrupt();
log.warn("执行被中断", e);
MiscUtils.sleepQuietly(2, TimeUnit.SECONDS);
} finally {
log.info("扫描并发送消息结束, 预计发送消息数量: {}, 实际发送条数: {}," +
" 是否中断: {}, 是否有新消息插队: {}",
@ -101,11 +99,11 @@ public class ScanAndSendService {
sendJobInfo.getRecordExt().setExecTime(MiscUtils.formatDuration(execTimeMs));
sendJobInfo.getRecordExt().setSomeoneJumpedTheQueue(
sendManager.isSomeoneJumpedTheQueue() ? YesNo.YES : YesNo.NO);
for (int i = 0; i < channels.size(); i++) {
for (int i = 0; i < exec.getChannels().size(); i++) {
try {
ExecResult execResult = tasks.getFuture(i).get(1, TimeUnit.SECONDS);
sendJobInfo.getRecordExt().addSummary(
channels.get(i).getSimpleName(), execResult.getSummaries());
exec.getChannels().get(i).getSimpleName(), execResult.getSummaries());
} catch (Exception e) {
log.warn("发送任务异常", e);
}

View File

@ -0,0 +1,26 @@
package cn.axzo.im.send;
import cn.axzo.im.send.handler.CommonSendBatchHandler;
import cn.axzo.im.send.handler.CommonSendOneHandler;
import cn.axzo.im.send.handler.SendHandler;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import java.util.Arrays;
import java.util.List;
/**
* @author yanglin
*/
@Data
@RequiredArgsConstructor
public class SendExec {
public static SendExec commonChannels() {
return new SendExec(Arrays.asList(
CommonSendBatchHandler.class, CommonSendOneHandler.class));
}
private final List<Class<? extends SendHandler<?>>> channels;
private volatile boolean execIntercepted;
}

View File

@ -19,6 +19,7 @@ import java.util.concurrent.CompletableFuture;
class SendManager {
private final SendQueue queue;
private final SendExec exec;
private final MessageHistoryMapper messageHistoryMapper;
private final SendMessageConfig cfg;
private final Date maxCreateAt;
@ -27,16 +28,19 @@ class SendManager {
private volatile boolean execInterrupted;
@Getter private volatile boolean someoneJumpedTheQueue;
SendManager(ApplicationContext applicationContext) {
SendManager(ApplicationContext applicationContext, SendExec exec) {
this.cfg = applicationContext.getBean(ImProperties.class).getSendMessage();
this.messageHistoryMapper = applicationContext.getBean(MessageHistoryMapper.class);
this.queue = new SendQueue(applicationContext);
this.exec = exec;
this.maxCreateAt = getMaxCreateAt();
this.asyncTasks = new AsyncTasks<>();
this.lastCheckJumpTheQueueTime = new Date();
}
public boolean isInterrupted() {
if (exec.isExecIntercepted())
return true;
if (execInterrupted || someoneJumpedTheQueue)
return true;
if (ScanAndSendService.isAppShuttingDown)

View File

@ -22,7 +22,7 @@ import java.util.List;
*/
@Component
@RequiredArgsConstructor
class CommonSendBatchHandler extends SendBatchHandler {
public class CommonSendBatchHandler extends SendBatchHandler {
private final ImProperties props;
private final IMChannelProvider imChannelProvider;

View File

@ -18,7 +18,7 @@ import org.springframework.stereotype.Component;
*/
@Component
@RequiredArgsConstructor
class CommonSendOneHandler extends SendOneHandler {
public class CommonSendOneHandler extends SendOneHandler {
private final ImProperties props;
private final IMChannelProvider imChannelProvider;

View File

@ -3,21 +3,17 @@ package cn.axzo.im.send.handler;
import cn.axzo.im.send.SendExecutor;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
/**
* @author yanglin
*/
public interface SendHandler<T> {
List<Class<? extends SendHandler<?>>> COMMON_MSG_CHANNELS = Arrays.asList(
CommonSendBatchHandler.class, CommonSendOneHandler.class);
@Nullable
T getSendRecord(SendExecutor<T> executor);
void sendAndSubmitUpdate(SendExecutor<T> executor, T record);
double getTps();
}

View File

@ -1,6 +1,8 @@
package cn.axzo.im.service;
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.vo.req.UpdateTemplateSendPriorityRequest;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.maokai.api.vo.response.OrganizationalUnitVO;
import cn.axzo.pokonyan.dao.page.IPageParam;
@ -13,6 +15,8 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.springframework.beans.BeanUtils;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List;
import java.util.Map;
@ -30,6 +34,8 @@ public interface MessageHistoryService extends IService<MessageHistory> {
void updateBatch(List<MessageHistory> messageHistories);
boolean updateSendPriority(UpdateTemplateSendPriorityRequest request);
@SuperBuilder
@Data
@NoArgsConstructor

View File

@ -3,6 +3,7 @@ package cn.axzo.im.service.impl;
import cn.axzo.basics.profiles.api.UserProfileServiceApi;
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.im.center.api.vo.req.UpdateTemplateSendPriorityRequest;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchRequest;
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchResponse;
@ -88,6 +89,15 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
private static final String TARGET_TYPE = "messageHistoryId";
@Override
public boolean updateSendPriority(UpdateTemplateSendPriorityRequest request) {
return lambdaUpdate()
.likeRight(MessageHistory::getBizId, request.getBizIdPrefix())
.eq(MessageHistory::getStatus, MessageHistoryStatus.PENDING)
.set(MessageHistory::getSendPriority, request.getSendPriority())
.update();
}
@Override
public Page<MessageHistoryDTO> page(PageMessageHistoryParam param) {
QueryWrapper<MessageHistory> wrapper = QueryWrapperHelper.fromBean(param, MessageHistory.class);

View File

@ -8,7 +8,6 @@ import cn.axzo.im.channel.netease.dto.MessageDispatchResponse;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.send.handler.SendHandler;
import cn.axzo.im.utils.MiscUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -44,7 +43,7 @@ class ScanAndSendServiceTest {
.set(MessageHistory::getResult, "")
.set(MessageHistory::getTimestampForSend, new Date())
.update();
scanAndSendService.scanAndSend(SendHandler.COMMON_MSG_CHANNELS);
scanAndSendService.scanAndSend(SendExec.commonChannels());
}
@Test

View File

@ -1,27 +0,0 @@
package cn.axzo.im.send;
import cn.axzo.im.Application;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContext;
/**
* @author yanglin
*/
@SpringBootTest(classes = Application.class)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class SendManagerTest {
private final ApplicationContext applicationContext;
@Test
void foo() throws Exception {
SendManager sendManager = new SendManager(applicationContext);
System.out.println("#########################");
Thread.sleep(20 * 1000);
System.out.println("----> " + sendManager.isInterrupted());
}
}