REQ-3345: 添加限流

This commit is contained in:
yanglin 2025-02-06 16:45:05 +08:00
parent 4bc6ec2f8a
commit 4d09ce2ee3
8 changed files with 88 additions and 3 deletions

View File

@ -27,6 +27,7 @@ import cn.axzo.im.channel.netease.dto.RegisterResponse;
import cn.axzo.im.channel.netease.dto.RegisterUpdateRequest;
import cn.axzo.im.channel.netease.dto.UserAddChatGroupRequest;
import cn.axzo.im.channel.netease.dto.UserAddChatGroupResponse;
import cn.axzo.im.group.support.GroupManipulateRateLimiter;
import cn.axzo.im.utils.ImProperties;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.http.HttpRequest;
@ -133,6 +134,9 @@ public class NimChannelService implements IMChannelProvider {
@Resource
private ImProperties props;
@Resource
private GroupManipulateRateLimiter rateLimiter;
@Override
public String getProviderAppKey() {
return appKeyUtil.getAppKey();
@ -395,6 +399,7 @@ public class NimChannelService implements IMChannelProvider {
Map<String, String> authHeaderMap = buildAuthHeader(getProviderAppKey(), getProviderAppSecret());
log.warn("chatGroupCreate-请求网易云信,URL:{},Header:{},请求参数:{}", CHAT_GROUP_CREATE,
JSONUtil.toJsonStr(authHeaderMap), JSONUtil.toJsonStr(paramMap));
rateLimiter.requireCreateGroup();
HttpResponse response = HttpRequest.post(CHAT_GROUP_CREATE).addHeaders(authHeaderMap)
.form(paramMap).timeout(5000).execute();
String result = response.body();
@ -452,6 +457,7 @@ public class NimChannelService implements IMChannelProvider {
Map<String, String> authHeaderMap = buildAuthHeader(getProviderAppKey(), getProviderAppSecret());
log.warn("userAddChatGroup-请求网易云信,URL:{},Header:{},请求参数:{}", USER_ADD_CHAT_GROUP,
JSONUtil.toJsonStr(authHeaderMap), JSONUtil.toJsonStr(paramMap));
rateLimiter.requireAddMember();
HttpResponse response = HttpRequest.post(USER_ADD_CHAT_GROUP).addHeaders(authHeaderMap)
.form(paramMap).timeout(5000).execute();
String result = response.body();
@ -484,6 +490,7 @@ public class NimChannelService implements IMChannelProvider {
Map<String, String> authHeaderMap = buildAuthHeader(getProviderAppKey(), getProviderAppSecret());
log.warn("kickChatGroup-请求网易云信,URL:{},Header:{},请求参数:{}", KICK_CHAT_GROUP,
JSONUtil.toJsonStr(authHeaderMap), JSONUtil.toJsonStr(paramMap));
rateLimiter.requireRemoveMember();
HttpResponse response = HttpRequest.post(KICK_CHAT_GROUP).addHeaders(authHeaderMap)
.form(paramMap).timeout(5000).execute();
String result = response.body();
@ -540,6 +547,7 @@ public class NimChannelService implements IMChannelProvider {
Map<String, String> authHeaderMap = buildAuthHeader(getProviderAppKey(), getProviderAppSecret());
log.warn("chatGroupQuery-请求网易云信,URL:{},Header:{},请求参数:{}", CHAT_GROUP_QUERY,
JSONUtil.toJsonStr(authHeaderMap), JSONUtil.toJsonStr(paramMap));
rateLimiter.requireGetGroupInfo();
HttpResponse response = HttpRequest.post(CHAT_GROUP_QUERY).addHeaders(authHeaderMap)
.form(paramMap).timeout(5000).execute();
String result = response.body();
@ -580,6 +588,7 @@ public class NimChannelService implements IMChannelProvider {
Map<String, String> authHeaderMap = buildAuthHeader(getProviderAppKey(), getProviderAppSecret());
log.warn("changeOwner-请求网易云信,URL:{},Header:{},请求参数:{}", CHANGE_OWNER,
JSONUtil.toJsonStr(authHeaderMap), JSONUtil.toJsonStr(paramMap));
rateLimiter.requireChangeOwner();
HttpResponse response = HttpRequest.post(CHANGE_OWNER).addHeaders(authHeaderMap)
.form(paramMap).timeout(5000).execute();
String result = response.body();

View File

@ -23,6 +23,7 @@ import cn.axzo.im.dao.repository.GroupMemberDao;
import cn.axzo.im.entity.Group;
import cn.axzo.im.entity.GroupMember;
import cn.axzo.im.group.member.GroupMemberSyncer;
import cn.axzo.im.group.support.GroupManipulateRateLimiter;
import cn.axzo.im.service.AccountService;
import cn.axzo.im.service.domain.ImAccounts;
import cn.axzo.im.utils.BizAssertions;
@ -55,6 +56,7 @@ public class GroupManager {
private final AccountService accountService;
private final GroupBroadcaster groupBroadcaster;
private final GroupMemberSyncer groupMemberSyncer;
private final GroupManipulateRateLimiter rateLimiter;
@Transactional
public GroupCreateResponse createGroup(GroupCreateRequest request) {
@ -73,6 +75,7 @@ public class GroupManager {
}
NimGroupCreateRequest nimRequest = groupSupport
.buildNimCreateGroupRequest(request, imAccounts);
rateLimiter.requireCreateGroup();
NimGroupCreateResponse nimResponse = nimClient.createGroup(nimRequest);
log.info("创建群, request={}, response={}", nimRequest, nimResponse);
BizAssertions.assertTrue(nimResponse.isSuccess(), "创建群失败: {}", nimResponse.getDesc());
@ -96,6 +99,7 @@ public class GroupManager {
if (group.isDismissed()) return;
NimGroupDismissRequest nimRequest = groupSupport
.buildNimDismissGroupRequest(group.getOwnerAccount(), group);
rateLimiter.requireDismissGroup();
NimGroupDismissResponse nimResponse = nimClient.dismissGroup(nimRequest);
log.info("解散群, request={}, response={}", nimRequest, nimResponse);
if (!nimResponse.isGroupNotFoundError())
@ -129,6 +133,7 @@ public class GroupManager {
NimGroupAddMembersRequest nimRequest = groupSupport
.buildAddMembersRequest(group, group.getOwnerAccount(), imAccounts);
// add members
rateLimiter.requireAddMember();
NimGroupAddMembersResponse nimResponse = nimClient.addGroupMembers(nimRequest);
log.info("添加群成员, request={}, response={}", nimRequest, nimResponse);
BizAssertions.assertTrue(nimResponse.isSuccess(), "添加群成员失败: {}", nimResponse.getDesc());
@ -160,6 +165,7 @@ public class GroupManager {
return;
NimGroupRemoveMembersRequest nimRequest = groupSupport
.buildRemoveMembersRequest(group, group.getOwnerAccount(), imAccounts);
rateLimiter.requireRemoveMember();
// 不判断NIM响应状态, 因为前端可能已经调用app sdk移除过成员了(支持重复移除)
NimGroupRemoveMembersResponse nimResponse = nimClient.removeGroupMembers(nimRequest);
log.info("移除群成员, request={}, response={}", nimRequest, nimResponse);

View File

@ -7,6 +7,7 @@ import cn.axzo.im.channel.netease.dto.NimGroupCreateRequest;
import cn.axzo.im.channel.netease.dto.NimGroupDismissRequest;
import cn.axzo.im.channel.netease.dto.NimGroupRemoveMembersRequest;
import cn.axzo.im.entity.Group;
import cn.axzo.im.group.support.GroupProps;
import cn.axzo.im.service.ChatGroupService;
import cn.axzo.im.service.domain.ImAccounts;
import cn.axzo.im.utils.BizAssertions;

View File

@ -13,6 +13,7 @@ import cn.axzo.im.dao.repository.GroupDao;
import cn.axzo.im.dao.repository.GroupMemberDao;
import cn.axzo.im.entity.Group;
import cn.axzo.im.entity.GroupMember;
import cn.axzo.im.group.support.GroupManipulateRateLimiter;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.im.utils.ImAccountParser;
import lombok.RequiredArgsConstructor;
@ -35,6 +36,7 @@ public class GroupMemberSyncer {
private final GroupMemberDao groupMemberDao;
private final GroupDao groupDao;
private final NimClient nimClient;
private final GroupManipulateRateLimiter rateLimiter;
public void syncMembers(Group group) {
NimGroupInfo groupInfo = fetchGroupInfo(group).orElse(null);
@ -50,6 +52,7 @@ public class GroupMemberSyncer {
NimGroupGetInfoRequest nimRequest = new NimGroupGetInfoRequest();
nimRequest.setTid(group.getTid());
long start = System.currentTimeMillis();
rateLimiter.requireGetGroupInfo();
NimGroupGetInfoResponse nimResponse = nimClient.getGroupInfo(nimRequest);
log.info("获取群信息, request={}, response={}, timeUsed={}",
nimRequest, nimResponse, System.currentTimeMillis() - start);

View File

@ -9,7 +9,7 @@ import cn.axzo.im.dao.repository.GroupDao;
import cn.axzo.im.dao.repository.GroupMessageDao;
import cn.axzo.im.entity.Group;
import cn.axzo.im.entity.GroupMessage;
import cn.axzo.im.group.GroupProps;
import cn.axzo.im.group.support.GroupProps;
import cn.axzo.im.utils.RecordCursor;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.taobao.api.internal.util.NamedThreadFactory;

View File

@ -0,0 +1,58 @@
package cn.axzo.im.group.support;
import com.google.common.util.concurrent.RateLimiter;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
public class GroupManipulateRateLimiter implements InitializingBean {
private final GroupProps groupProps;
private RateLimiter createGroupLimiter;
private RateLimiter dismissGroupLimiter;
private RateLimiter addMemberLimiter;
private RateLimiter removeMemberLimiter;
private RateLimiter getGroupInfoLimiter;
private RateLimiter changeOwnerLimiter;
public void requireCreateGroup() {
createGroupLimiter.acquire();
}
public void requireDismissGroup() {
dismissGroupLimiter.acquire();
}
public void requireAddMember() {
addMemberLimiter.acquire();
}
public void requireRemoveMember() {
removeMemberLimiter.acquire();
}
public void requireGetGroupInfo() {
getGroupInfoLimiter.acquire();
}
public void requireChangeOwner() {
changeOwnerLimiter.acquire();
}
@Override
public void afterPropertiesSet() {
createGroupLimiter = RateLimiter.create(groupProps.getCreateGroupTps());
dismissGroupLimiter = RateLimiter.create(groupProps.getDismissGroupTps());
addMemberLimiter = RateLimiter.create(groupProps.getAddMemberTps());
removeMemberLimiter = RateLimiter.create(groupProps.getRemoveMemberTps());
getGroupInfoLimiter = RateLimiter.create(groupProps.getGetGroupInfoTps());
changeOwnerLimiter = RateLimiter.create(groupProps.getChangeOwnerTps());
}
}

View File

@ -1,4 +1,4 @@
package cn.axzo.im.group;
package cn.axzo.im.group.support;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
@ -18,10 +18,18 @@ import org.springframework.context.annotation.Configuration;
public class GroupProps {
private int defaultMemberLimit = 499;
private int syncMessageTps = 20;
private int createGroupTps = 10;
private int dismissGroupTps = 10;
private int addMemberTps = 10;
private int removeMemberTps = 10;
private int getGroupInfoTps = 10;
private int changeOwnerTps = 10;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -48,6 +48,7 @@ import cn.axzo.im.config.MqProducer;
import cn.axzo.im.dao.mapper.ChatGroupMapper;
import cn.axzo.im.entity.ChatGroup;
import cn.axzo.im.entity.Group;
import cn.axzo.im.entity.dto.OrganizationalNodeUserDTO;
import cn.axzo.im.event.payload.ChatGroupCreatePayload;
import cn.axzo.im.gateway.OrganizationalNodeUserApiGateway;
import cn.axzo.im.gateway.ProfilesApiGateway;
@ -62,7 +63,6 @@ import cn.axzo.im.service.OperateLogService;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.im.utils.DateFormatUtil;
import cn.axzo.im.utils.JobCodeUtils;
import cn.axzo.im.entity.dto.OrganizationalNodeUserDTO;
import cn.axzo.tyr.client.common.enums.RoleTypeEnum;
import cn.axzo.tyr.client.model.roleuser.dto.SaasRoleUserV2DTO;
import cn.hutool.core.bean.BeanUtil;