REQ-2481: 添加删除日志表的job

This commit is contained in:
yanglin 2024-05-31 15:11:43 +08:00
parent 9c0a99c18e
commit 5d8a0ffcfc

View File

@ -65,9 +65,11 @@ public class SendQueue {
return Collections.emptyList();
// 绝对的优先级优先
if (!cfg.isMemoryQueuedMaxBatchFirst()) {
List<MessageHistory> histories = getBatchFIFO(batchSize);
consumedCount += histories.size();
return histories;
List<Record> records = getBatchFIFO(batchSize);
consumedCount += records.size();
return records.stream()
.map(r -> r.history)
.collect(toList());
}
// 批量最大的优先, 不要浪费批量接口的流量
List<Record> perfectRecords = getQueuedMaxBatch();
@ -89,17 +91,17 @@ public class SendQueue {
.collect(toList());
}
private List<MessageHistory> getBatchFIFO(int batchSize) {
ArrayList<MessageHistory> histories = new ArrayList<>();
histories.add(records.removeFirst().history);
String currentBatchNo = histories.get(0).determineBatchNo().orElse(null);
private List<Record> getBatchFIFO(int batchSize) {
ArrayList<Record> histories = new ArrayList<>();
histories.add(records.removeFirst());
String currentBatchNo = histories.get(0).history.determineBatchNo().orElse(null);
while (currentBatchNo != null && histories.size() < batchSize && !records.isEmpty()) {
Record next = records.peekFirst();
if (next == null) break;
String nextBatchNo = next.history.determineBatchNo().orElse(null);
if (!Objects.equals(currentBatchNo, nextBatchNo))
break;
histories.add(records.removeFirst().history);
histories.add(records.removeFirst());
}
return histories;
}