REQ-3345: 添加限流

This commit is contained in:
yanglin 2025-02-08 09:30:52 +08:00
parent 1d140f63d6
commit 30975d114b
6 changed files with 46 additions and 29 deletions

View File

@ -27,6 +27,7 @@ import cn.axzo.im.group.support.GroupRateLimiter;
import cn.axzo.im.service.AccountService;
import cn.axzo.im.service.domain.ImAccounts;
import cn.axzo.im.utils.BizAssertions;
import cn.axzo.im.utils.Notification;
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -57,6 +58,7 @@ public class GroupManager {
private final GroupBroadcaster groupBroadcaster;
private final GroupMemberSyncer groupMemberSyncer;
private final GroupRateLimiter rateLimiter;
private final Notification notification;
@Transactional
public GroupCreateResponse createGroup(GroupCreateRequest request) {
@ -92,7 +94,7 @@ public class GroupManager {
GroupCreateResponse response = new GroupCreateResponse();
response.setTid(nimResponse.getTid());
response.setAccountsNotFound(imAccounts.getAccountNotFoundPersons(
groupSupport, "创建群", group, request.getMembers()));
notification, "创建群", group, request.getMembers()));
return response;
}
@ -131,7 +133,7 @@ public class GroupManager {
throw new ServiceException("群聊人数上限" + group.getMemberLimit() + "人, 请删除部分已选人员");
ImAccounts imAccounts = accountService.getOrCreateImAccounts(toAddMembers);
if (imAccounts.isAccountEmpty()) {
groupSupport.send("添加群成员[{},{}], 有效群成员IM账号列表为空. 请求成员信息: {}",
notification.send("添加群成员[{},{}], 有效群成员IM账号列表为空. 请求成员信息: {}",
group.getTid(), group.getName(), JSON.toJSONString(request.getMembers()));
return new GroupAddMembersResponse();
}
@ -149,7 +151,7 @@ public class GroupManager {
MqEventType.GROUP_ADD_MEMBERS);
GroupAddMembersResponse response = new GroupAddMembersResponse();
response.setAccountsNotFound(imAccounts.getAccountNotFoundPersons(
groupSupport, "添加群成员", group, request.getMembers()));
notification, "添加群成员", group, request.getMembers()));
return response;
}
@ -160,7 +162,7 @@ public class GroupManager {
groupSupport.log(group.getTid(), "remove-members", request);
ImAccounts imAccounts = accountService.getAccountsByPersons(request.getMembers());
if (imAccounts.isAccountEmpty()) {
groupSupport.send("移除群成员[{},{}], 有效群成员IM账号列表为空. 请求成员信息: {}",
notification.send("移除群成员[{},{}], 有效群成员IM账号列表为空. 请求成员信息: {}",
group.getTid(), group.getName(), JSON.toJSONString(request.getMembers()));
return;
}

View File

@ -10,7 +10,6 @@ import cn.axzo.im.dao.repository.GroupLogDao;
import cn.axzo.im.entity.Group;
import cn.axzo.im.entity.GroupLog;
import cn.axzo.im.group.support.GroupProps;
import cn.axzo.im.service.ChatGroupService;
import cn.axzo.im.service.domain.ImAccounts;
import cn.axzo.im.utils.BizAssertions;
import lombok.RequiredArgsConstructor;
@ -23,11 +22,10 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class GroupSupport implements GroupNotification {
public class GroupSupport {
private static final String INTRODUCE_MESSAGE = "邀请您加入群聊";
private final ChatGroupService chatGroupService;
private final GroupProps groupProps;
private final GroupLogDao groupLogDao;
@ -106,8 +104,4 @@ public class GroupSupport implements GroupNotification {
groupLogDao.save(log);
}
public void send(String message, Object... args) {
chatGroupService.send(message, args);
}
}

View File

@ -1,5 +1,6 @@
package cn.axzo.im.group.support;
import cn.axzo.im.utils.Notification;
import com.google.common.util.concurrent.RateLimiter;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.InitializingBean;
@ -13,13 +14,14 @@ import org.springframework.stereotype.Component;
public class GroupRateLimiter implements InitializingBean {
private final GroupProps groupProps;
private final Notification notification;
private RateLimiter createGroupLimiter;
private RateLimiter dismissGroupLimiter;
private RateLimiter addMemberLimiter;
private RateLimiter removeMemberLimiter;
private RateLimiter getGroupInfoLimiter;
private RateLimiter changeOwnerLimiter;
private Limiter createGroupLimiter;
private Limiter dismissGroupLimiter;
private Limiter addMemberLimiter;
private Limiter removeMemberLimiter;
private Limiter getGroupInfoLimiter;
private Limiter changeOwnerLimiter;
public void requireCreateGroup() {
createGroupLimiter.acquire();
@ -47,12 +49,31 @@ public class GroupRateLimiter implements InitializingBean {
@Override
public void afterPropertiesSet() {
createGroupLimiter = RateLimiter.create(groupProps.getCreateGroupTps());
dismissGroupLimiter = RateLimiter.create(groupProps.getDismissGroupTps());
addMemberLimiter = RateLimiter.create(groupProps.getAddMemberTps());
removeMemberLimiter = RateLimiter.create(groupProps.getRemoveMemberTps());
getGroupInfoLimiter = RateLimiter.create(groupProps.getGetGroupInfoTps());
changeOwnerLimiter = RateLimiter.create(groupProps.getChangeOwnerTps());
createGroupLimiter = new Limiter("create-group", groupProps.getCreateGroupTps());
dismissGroupLimiter = new Limiter("dismiss-group", groupProps.getDismissGroupTps());
addMemberLimiter = new Limiter("group-add-member", groupProps.getAddMemberTps());
removeMemberLimiter = new Limiter("group-remove-member", groupProps.getRemoveMemberTps());
getGroupInfoLimiter = new Limiter("group-get-group-info", groupProps.getGetGroupInfoTps());
changeOwnerLimiter = new Limiter("group-change-owner", groupProps.getChangeOwnerTps());
}
private class Limiter {
final RateLimiter rateLimiter;
final String name;
Limiter(String name, int tps) {
this.rateLimiter = RateLimiter.create(tps);
this.name = name;
}
public void acquire() {
long beginMs = System.currentTimeMillis();
rateLimiter.acquire();
long deltaMs = System.currentTimeMillis() - beginMs;
if (deltaMs > 1000)
notification.send("{} wait too long to acquire rate limiter: {}ms", name, deltaMs);
}
}
}

View File

@ -18,7 +18,7 @@ import cn.axzo.im.center.api.vo.resp.HistoryMsgQueryResp;
import cn.axzo.im.center.common.enums.ChatGroupUserDataSourceEnum;
import cn.axzo.im.entity.ChatGroup;
import cn.axzo.im.entity.dto.OrganizationalNodeUserDTO;
import cn.axzo.im.group.GroupNotification;
import cn.axzo.im.utils.Notification;
import cn.hutool.core.lang.Pair;
import com.baomidou.mybatisplus.extension.service.IService;
@ -30,7 +30,7 @@ import java.util.Set;
* @date 2024/11/05
* @desc 群聊
*/
public interface ChatGroupService extends IService<ChatGroup>, GroupNotification {
public interface ChatGroupService extends IService<ChatGroup>, Notification {
/**
* 创建群聊

View File

@ -4,7 +4,7 @@ import cn.axzo.framework.jackson.utility.JSON;
import cn.axzo.im.center.api.vo.PersonAccountAttribute;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.entity.Group;
import cn.axzo.im.group.GroupNotification;
import cn.axzo.im.utils.Notification;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.commons.collections.CollectionUtils;
@ -32,7 +32,7 @@ public class ImAccounts {
}
public Set<PersonAccountAttribute> getAccountNotFoundPersons(
GroupNotification notification, String operation,
Notification notification, String operation,
Group group, Set<PersonAccountAttribute> persons) {
if (CollectionUtils.isEmpty(persons))
return Collections.emptySet();

View File

@ -1,8 +1,8 @@
package cn.axzo.im.group;
package cn.axzo.im.utils;
/**
* @author yanglin
*/
public interface GroupNotification {
public interface Notification {
void send(String message, Object... args);
}