REQ-3345: 同步消息

This commit is contained in:
yanglin 2025-01-24 11:26:26 +08:00
parent 7f1caa9274
commit 51e87a8db7
5 changed files with 17 additions and 7 deletions

View File

@ -14,6 +14,7 @@ import lombok.NoArgsConstructor;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import java.io.Serializable; import java.io.Serializable;
import java.util.Date; import java.util.Date;
@ -160,6 +161,8 @@ public class MessageHistory implements Serializable, NimMessageHistory {
public Optional<String> determineBatchNo() { public Optional<String> determineBatchNo() {
if (isUpdatableMessage()) if (isUpdatableMessage())
return Optional.empty(); return Optional.empty();
if (isSendToGroup())
return Optional.empty();
String batchNo = this.batchNo; String batchNo = this.batchNo;
// 兼容在途数据 // 兼容在途数据
if (StringUtils.isBlank(batchNo) && imMessageTaskId != null) if (StringUtils.isBlank(batchNo) && imMessageTaskId != null)
@ -168,4 +171,9 @@ public class MessageHistory implements Serializable, NimMessageHistory {
batchNo = null; batchNo = null;
return Optional.ofNullable(batchNo); return Optional.ofNullable(batchNo);
} }
public boolean isSendToGroup() {
return AppTypeEnum.NONE.is(getAppType())
&& NumberUtils.isDigits(getToAccount());
}
} }

View File

@ -58,6 +58,7 @@ public class SendQueue {
private int sendCount = 0; private int sendCount = 0;
private boolean lastLoadEmpty = false; private boolean lastLoadEmpty = false;
private long lastPrintCompleted = 0; private long lastPrintCompleted = 0;
private boolean seenSendToGroupMessage = false;
SendQueue(ApplicationContext applicationContext, ApiChannel apiChannel) { SendQueue(ApplicationContext applicationContext, ApiChannel apiChannel) {
this.apiChannel = apiChannel; this.apiChannel = apiChannel;
@ -70,6 +71,8 @@ public class SendQueue {
@NotNull @NotNull
public synchronized List<MessageHistory> pollBatch(int batchSize) { public synchronized List<MessageHistory> pollBatch(int batchSize) {
BizAssertions.assertTrue(batchSize > 0, "batchSize必须大于0"); BizAssertions.assertTrue(batchSize > 0, "batchSize必须大于0");
if (seenSendToGroupMessage)
return Collections.emptyList();
if (determineNoMoreRecords()) if (determineNoMoreRecords())
return Collections.emptyList(); return Collections.emptyList();
// 绝对的优先级优先 // 绝对的优先级优先
@ -210,6 +213,8 @@ public class SendQueue {
} }
if (this.records.isEmpty()) if (this.records.isEmpty())
lastLoadEmpty = true; lastLoadEmpty = true;
this.seenSendToGroupMessage = this.seenSendToGroupMessage
|| records.stream().anyMatch(MessageHistory::isSendToGroup);
} }
public void scheduleRetrySend(List<MessageHistory> histories, HistoryRecordExt updateExt) { public void scheduleRetrySend(List<MessageHistory> histories, HistoryRecordExt updateExt) {

View File

@ -18,7 +18,6 @@ import java.util.List;
* @author yanglin * @author yanglin
*/ */
@Component @Component
@Deprecated // 发送给群时无法使用批量接口, 也不想改动SendQueue了, 后续有时间再优化
@RequiredArgsConstructor @RequiredArgsConstructor
public class CommonSendBatchHandler extends SendBatchHandler { public class CommonSendBatchHandler extends SendBatchHandler {

View File

@ -1,6 +1,5 @@
package cn.axzo.im.send.handler; package cn.axzo.im.send.handler;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.center.common.enums.ChannelMsgTypeEnum; import cn.axzo.im.center.common.enums.ChannelMsgTypeEnum;
import cn.axzo.im.channel.IMChannelProvider; import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.dto.MessageDispatchRequest; import cn.axzo.im.channel.netease.dto.MessageDispatchRequest;
@ -10,7 +9,6 @@ import cn.axzo.im.send.MessageHistoryNimLogger;
import cn.axzo.im.send.SendExecutor; import cn.axzo.im.send.SendExecutor;
import cn.axzo.im.utils.ImProperties; import cn.axzo.im.utils.ImProperties;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
@ -31,9 +29,7 @@ public class CommonSendOneHandler extends SendOneHandler {
history.getBizId(), history.determineBatchNo().orElse(null)); history.getBizId(), history.determineBatchNo().orElse(null));
MessageDispatchRequest sendRequest = new MessageDispatchRequest(); MessageDispatchRequest sendRequest = new MessageDispatchRequest();
sendRequest.setFrom(history.getFromAccount()); sendRequest.setFrom(history.getFromAccount());
boolean isSendToGroup = AppTypeEnum.NONE.is(history.getAppType()) sendRequest.setOpe(history.isSendToGroup() ? 1 : 0);
&& NumberUtils.isDigits(history.getToAccount());
sendRequest.setOpe(isSendToGroup ? 1 : 0);
sendRequest.setTo(history.getToAccount()); sendRequest.setTo(history.getToAccount());
Integer nimMessageType = history.getOrCreateRecordExt().getNimMessageType(); Integer nimMessageType = history.getOrCreateRecordExt().getNimMessageType();
if (nimMessageType != null) if (nimMessageType != null)

View File

@ -3,6 +3,7 @@ package cn.axzo.im.send.job;
import cn.axzo.im.center.api.vo.ApiChannel; import cn.axzo.im.center.api.vo.ApiChannel;
import cn.axzo.im.send.ScanAndSendService; import cn.axzo.im.send.ScanAndSendService;
import cn.axzo.im.send.SendExec; import cn.axzo.im.send.SendExec;
import cn.axzo.im.send.handler.CommonSendBatchHandler;
import cn.axzo.im.send.handler.CommonSendOneHandler; import cn.axzo.im.send.handler.CommonSendOneHandler;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.handler.annotation.XxlJob;
@ -34,7 +35,8 @@ public class SendMessageJob extends SendMessageExecInstance {
@Override @Override
SendExec newExecution() { SendExec newExecution() {
return new SendExec(ApiChannel.COMMON_MESSAGE, CommonSendOneHandler.class); return new SendExec(ApiChannel.COMMON_MESSAGE,
CommonSendBatchHandler.class, CommonSendOneHandler.class);
} }
} }