REQ-3345: 同步群信息的job使用分片执行

This commit is contained in:
yanglin 2025-02-08 09:50:39 +08:00
parent 97e99dcb3f
commit bbe15e2019
2 changed files with 7 additions and 3 deletions

View File

@ -15,6 +15,8 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.taobao.api.internal.util.NamedThreadFactory;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.util.ShardingUtil;
import com.xxl.job.core.util.ShardingUtil.ShardingVO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
@ -59,7 +61,7 @@ public class GroupMessageSyncJob implements InitializingBean {
}
try {
log.info("start sync group messages...");
syncImpl();
syncImpl(ShardingUtil.getShardingVo());
log.info("sync group messages finished");
} finally {
isRunning = false;
@ -67,7 +69,7 @@ public class GroupMessageSyncJob implements InitializingBean {
return ReturnT.SUCCESS;
}
private void syncImpl() throws Exception {
private void syncImpl(ShardingVO sharding) throws Exception {
CountDownLatch completed = new CountDownLatch(1);
MessageSyncController controller = new MessageSyncController(tps, completed::countDown);
Date twoDayAgo = new DateTime(new Date()).minusDays(2).toDate();
@ -80,6 +82,8 @@ public class GroupMessageSyncJob implements InitializingBean {
.gt(Group::getDismissedAt, twoDayAgo))));
for (List<Group> groups : cursor) {
for (Group group : groups) {
if (group.getTid() % sharding.getTotal() != sharding.getIndex())
continue;
controller.submitGroup(group);
executor.execute(new GroupMessageSyncHandler(this, controller, group) {
@Override

View File

@ -19,7 +19,7 @@ public class GroupProps {
private int defaultMemberLimit = 499;
private int syncMessageTps = 20;
private int syncMessageTps = 10;
private int createGroupTps = 10;
private int dismissGroupTps = 10;
private int addMemberTps = 10;