feat: 新增限流器

This commit is contained in:
zengxiaobo 2024-07-05 15:22:48 +08:00
parent a949372904
commit c5e235872b
6 changed files with 518 additions and 0 deletions

View File

@ -1,5 +1,6 @@
package cn.axzo.foundation.result;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.base.Strings;
import lombok.AllArgsConstructor;
@ -26,6 +27,8 @@ public class ApiResult<T> {
protected T data;
protected JSONObject stacks;
public static <T> ApiResult<T> success() {
return success(null);
}
@ -71,6 +74,12 @@ public class ApiResult<T> {
return SUCCESS_CODE.equals(getCode());
}
public ApiResult<T> setStacks(JSONObject stacks) {
this.stacks = stacks;
return this;
}
/**
* 根据appId 获取标准的code
* 如果code > 100000 则认为可能已经带了appId

View File

@ -0,0 +1,116 @@
package cn.axzo.foundation.redis.support;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import lombok.*;
import java.util.*;
import java.util.stream.Collectors;
public interface RateLimiter {
/**
* 尝试获得锁, 获取失败则返回Optional.empty()
* 如果获取锁成功. 则返回Optional<Permit>. 同时计数器增加
* Permit支持取消
*
* @param value 业务标识
* @param step 指定步长
* @return
*/
Optional<Permit> tryAcquire(Object value, long step);
default Optional<Permit> tryAcquire(Object value) {
return tryAcquire(value, 1);
}
/**
* 重置value对应的锁, 便于特殊场景下重新获取锁
*/
void reset(Object value);
/**
* 获取窗口类型
*
* @return
*/
WindowType getWindowType();
class Permit {
private List<Runnable> cancelRunners;
@Builder
public Permit(List<Runnable> cancelRunners) {
Objects.requireNonNull(cancelRunners);
this.cancelRunners = cancelRunners;
}
public void cancel() {
if (!cancelRunners.isEmpty()) {
cancelRunners.stream().forEach(e -> e.run());
}
}
}
@Getter
@AllArgsConstructor
enum WindowType {
/**
* 固定窗口, 窗口范围: start = visitTim, end = start + WindowDuration
*/
FIXED("f"),
/**
* 固定窗口, 窗口范围: start = currentMillis/WindowDuration, end = currentMillis/WindowDuration
*/
FIXED_BUCKET("fb"),
/**
* 滑动窗口, 窗口范围: start = currentMillis - WindowDuration, end = currentMillis
*/
SLIDING("s");
//减少redisKey长度
private final String shortName;
}
/**
* 限流规则
* <pre>
* seconds: 窗口时长
* permits: 允许发放的令牌数量
* </pre>
*/
@Data
@Builder
@AllArgsConstructor
@ToString
class LimitRule {
long seconds;
int permits;
public boolean isValid() {
return seconds > 0 && permits > 0;
}
/**
* 根据约定规则创建Rules
* eg: express 10/1,20/1... 代表10秒1次 & 20秒1次...
* note: seconds不可重复
*/
public static List<LimitRule> fromExpression(String expression) {
if (Strings.isNullOrEmpty(expression)) {
return Collections.emptyList();
}
Map<String, String> rulesMap = Splitter.on(",")
.omitEmptyStrings()
.trimResults()
.withKeyValueSeparator("/")
.split(expression);
return rulesMap.entrySet().stream()
.map(e -> LimitRule.builder()
.seconds(Long.parseLong(e.getKey()))
.permits(Integer.parseInt(e.getValue()))
.build())
.collect(Collectors.toList());
}
}
}

View File

@ -0,0 +1,32 @@
package cn.axzo.foundation.redis.support;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
public interface RateLimiterFactory {
/**
* 构建一个基于RateLimiter
*/
RateLimiter build(RateLimiterReq rateLimiterReq);
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
class RateLimiterReq {
RateLimiter.WindowType windowType;
List<RateLimiter.LimitRule> rules;
String limiterKey;
RateType rateType;
}
enum RateType {
LOCAL,
REDIS
}
}

View File

@ -0,0 +1,66 @@
package cn.axzo.foundation.redis.support.impl;
import cn.axzo.foundation.redis.support.RateLimiter;
import cn.axzo.foundation.redis.support.RateLimiterFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.hash.Hashing;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
public class LocalRateLimiter implements RateLimiter {
private RateLimiterFactory.RateLimiterReq rateLimiterReq;
/**
* 使用 static 保持访问的状态
*/
static private Multimap<String, Long> accessLogs = ArrayListMultimap.create();
public LocalRateLimiter(RateLimiterFactory.RateLimiterReq rateLimiterReq) {
this.rateLimiterReq = rateLimiterReq;
}
public void cleanAccessLogs() {
accessLogs = ArrayListMultimap.create();
}
@Override
public Optional<Permit> tryAcquire(Object value, long step) {
Preconditions.checkArgument(value != null);
String hash = Hashing.murmur3_32().hashString(rateLimiterReq.getLimiterKey() + String.valueOf(value), Charset.defaultCharset()).toString();
Long nowSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
accessLogs.put(hash, nowSec);
List<Long> accessTimeSecs = ImmutableList.copyOf(accessLogs.get(hash));
List<LimitRule> rules = rateLimiterReq.getRules();
boolean failed = rules.stream().anyMatch(rule -> {
if (rateLimiterReq.getWindowType() == WindowType.SLIDING) {
return accessTimeSecs.stream().filter(t -> t > (nowSec - rule.getSeconds())).count() > rule.getPermits();
} else {
return accessTimeSecs.stream().filter(t -> ((nowSec - t) / rule.getSeconds()) == 0).count() > rule.getPermits();
}
});
if (failed) {
return Optional.empty();
}
return Optional.of(Permit.builder().cancelRunners(ImmutableList.of()).build());
}
@Override
public void reset(Object value) {
String hash = Hashing.murmur3_32().hashString(rateLimiterReq.getLimiterKey() + String.valueOf(value), Charset.defaultCharset()).toString();
accessLogs.removeAll(hash);
}
@Override
public WindowType getWindowType() {
return rateLimiterReq.getWindowType();
}
}

View File

@ -0,0 +1,36 @@
package cn.axzo.foundation.redis.support.impl;
import cn.axzo.foundation.redis.support.RateLimiter;
import cn.axzo.foundation.redis.support.RateLimiterFactory;
import cn.axzo.foundation.web.support.AppRuntime;
import lombok.Builder;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.Objects;
public class RateLimiterFactoryImpl implements RateLimiterFactory {
private RedisTemplate redisTemplate;
private AppRuntime appRuntime;
@Builder
public RateLimiterFactoryImpl(RedisTemplate redisTemplate, AppRuntime appRuntime) {
this.redisTemplate = redisTemplate;
this.appRuntime = appRuntime;
}
@Override
public RateLimiter build(RateLimiterFactory.RateLimiterReq rateLimiterReq) {
if (rateLimiterReq.getRateType() == RateType.LOCAL) {
return new LocalRateLimiter(rateLimiterReq);
}
Objects.requireNonNull(redisTemplate);
Objects.requireNonNull(appRuntime);
return RedisRateLimiter.builder()
.windowType(rateLimiterReq.getWindowType())
.limitRules(rateLimiterReq.getRules())
.limiterKey(rateLimiterReq.getLimiterKey())
.redisTemplate(redisTemplate)
.appRuntime(appRuntime)
.build();
}
}

View File

@ -0,0 +1,259 @@
package cn.axzo.foundation.redis.support.impl;
import cn.axzo.foundation.redis.support.RateLimiter;
import cn.axzo.foundation.web.support.AppRuntime;
import cn.axzo.foundation.web.support.utils.KeyBuilder;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.BoundValueOperations;
import org.springframework.data.redis.core.BoundZSetOperations;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public class RedisRateLimiter implements RateLimiter {
private AppRuntime appRuntime;
private RedisTemplate redisTemplate;
private RateLimiterWorker rateLimiterWorker;
/**
* 自定义的key, 避免redisKey冲突. 必填
*/
private String limiterKey;
private List<LimitRule> limitRules;
/**
* 窗口保存最大时长. 主要针对Sliding方式窗口的zSet过期
*/
private Integer maxWindowDurationHour;
private WindowType windowType;
@Builder
RedisRateLimiter(AppRuntime appRuntime,
RedisTemplate redisTemplate,
WindowType windowType,
String limiterKey,
List<LimitRule> limitRules,
Integer maxWindowDurationHour) {
Objects.requireNonNull(appRuntime);
Objects.requireNonNull(redisTemplate);
Objects.requireNonNull(windowType);
Objects.requireNonNull(limitRules);
Objects.requireNonNull(limiterKey);
Preconditions.checkArgument(!limitRules.isEmpty());
if (limitRules.stream().anyMatch(p -> !p.isValid())) {
throw new RuntimeException(String.format("invalid rate expression, expression = %s", JSONObject.toJSONString(limitRules)));
}
this.windowType = windowType;
this.appRuntime = appRuntime;
this.redisTemplate = redisTemplate;
this.limitRules = limitRules;
this.limiterKey = limiterKey;
this.maxWindowDurationHour = Optional.ofNullable(maxWindowDurationHour).orElse(24);
this.rateLimiterWorker = buildWorker(windowType);
}
@Override
public Optional<Permit> tryAcquire(Object value, long step) {
if (!rateLimiterWorker.tryAcquire(value)) {
return Optional.empty();
}
List<Runnable> cancelRunners = rateLimiterWorker.visit(value, step);
return Optional.of(Permit.builder()
.cancelRunners(cancelRunners)
.build());
}
@Override
public void reset(Object value) {
rateLimiterWorker.reset(value);
}
@Override
public WindowType getWindowType() {
return windowType;
}
private RateLimiterWorker buildWorker(WindowType windowType) {
if (windowType == WindowType.FIXED) {
return new FixedWindowRateLimiter();
}
if (windowType == WindowType.FIXED_BUCKET) {
return new FixedBucketWindowRateLimiter();
}
if (windowType == WindowType.SLIDING) {
return new SlidingWindowRateLimiter();
}
throw new RuntimeException(String.format("unsupported window type, window type = %s", windowType));
}
private String buildRedisKey(Object value) {
String hash = Hashing.murmur3_128().newHasher()
.putString(limiterKey, Charsets.UTF_8)
.putString(String.valueOf(value), Charsets.UTF_8)
.hash()
.toString();
return KeyBuilder.build(appRuntime, "rl", getWindowType().getShortName(), limiterKey, hash);
}
/**
* 固定窗口限流, 窗口起始时间第一次tryAcquire时时间. 窗口大小为WindowDuration
* <pre>
* key = value + WindowDuration.
* 在该窗口被访问时, 计数器+1. 窗口持续时长为WindowDuration. 并依赖redis ttl销毁
* 窗口被销毁后, 重置计数器
* </pre>
*/
class FixedWindowRateLimiter implements RateLimiterWorker {
public List<Runnable> visit(Object value, long step) {
List<Runnable> cancels = new ArrayList<>(limitRules.size());
//根据时间构建key, 避免hash无法删除
limitRules.stream()
.forEach(e -> {
String key = buildLimiterKey(value, e);
BoundValueOperations op = redisTemplate.boundValueOps(key);
Long result = op.increment(step);
//第一次访问时设置过期时间, 以确定窗口
if (result == step) {
redisTemplate.expire(key, e.getSeconds(), TimeUnit.SECONDS);
}
cancels.add(() -> op.decrement(step));
});
return cancels;
}
@Override
public void reset(Object value) {
limitRules.stream().forEach(e -> {
String key = buildLimiterKey(value, e);
redisTemplate.delete(key);
});
}
public boolean tryAcquire(Object value) {
boolean anyMatch = limitRules.stream()
.anyMatch(limitRule -> {
String key = buildLimiterKey(value, limitRule);
return limitRule.getPermits() <= Optional.ofNullable(redisTemplate.opsForValue().get(key))
.map(e -> Long.parseLong(e.toString()))
.orElse(0L);
});
return !anyMatch;
}
protected String buildLimiterKey(Object value, LimitRule limitRule) {
return buildRedisKey(value + ":" + limitRule.getSeconds());
}
}
/**
* 滑动窗口限流, 每次获取令牌成功时间加入到zset. 后续获取令牌时每次检查zset中WindowDuration中已获取令牌数. 并判断是否可以继续获取令牌
* <pre>
* key = value
* zset value = currentMillis. score = currentMillis
* 获取令牌时, 在计算zset中 score = [currentMillis-WindowDuration, currentMillis} 的element数量
* </pre>
*/
class SlidingWindowRateLimiter implements RateLimiterWorker {
//当zset的element达到一定数量时, 清理该zet. 避免redis内存泄露
private static final int CLEAN_KEY_THRESHOLD = 1000;
private AtomicLong visitCounter = new AtomicLong();
public List<Runnable> visit(Object value, long step) {
Preconditions.checkArgument(step == 1, "滑动窗口只支持 step=1");
String key = buildRedisKey(value);
long now = System.currentTimeMillis();
String member = String.valueOf(now);
final BoundZSetOperations op = redisTemplate.boundZSetOps(key);
op.add(member, now);
redisTemplate.expire(key, maxWindowDurationHour, TimeUnit.HOURS);
if (visitCounter.incrementAndGet() > CLEAN_KEY_THRESHOLD) {
//删除过期的访问记录
op.removeRangeByScore(0, now - TimeUnit.HOURS.toMillis(maxWindowDurationHour));
visitCounter.set(0);
}
return ImmutableList.of(() -> op.remove(member));
}
@Override
public void reset(Object value) {
String key = buildRedisKey(value);
redisTemplate.delete(key);
}
public boolean tryAcquire(Object value) {
String key = buildRedisKey(value);
long now = System.currentTimeMillis();
//检查所有的rule, 如果有其中一个失败则跳出
return !limitRules.stream()
.anyMatch(p -> p.getPermits() <= redisTemplate.opsForZSet().count(key, now - TimeUnit.SECONDS.toMillis(p.getSeconds()), now));
}
}
/**
* 固定窗口限流, 窗口根据自然时间向前滚动
* 继承自FixedWindowRateLimiter, 区别在于构建key的方式不一样
* <pre>
* key = value + currentMillis/WindowDuration.
* currentMillis/WindowDuration会把自然时间分割为长为WindowDuration的片段. 片段有效期为WindowDuration
* 获取令牌时在该片段上检查是否有剩余令牌
* </pre>
*/
class FixedBucketWindowRateLimiter extends FixedWindowRateLimiter implements RateLimiterWorker {
@Override
protected String buildLimiterKey(Object value, LimitRule limitRule) {
// System.currentTimeMillis()是当前距离1970-01-01 08:00:00的毫秒数假设系统为+8时区
// 这里希望是距离1970-01-01 00:00:00GMT标准时间的毫秒数所以需要+8个小时的毫秒
long currentGMTMillis = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(8);
return buildRedisKey(value + ":" + currentGMTMillis / TimeUnit.SECONDS.toMillis(limitRule.getSeconds()));
}
}
interface RateLimiterWorker {
/**
* 尝试获取令牌
*
* @param value
* @return 如果获取成功则返回true, 失败则为false
*/
boolean tryAcquire(Object value);
/**
* 获取令牌完成后增加步长为 1的计数器
*
* @param value
* @return limit key
*/
default List<Runnable> visit(Object value) {
return visit(value, 1);
}
/**
* 获取令牌完成后增加指定步长的计数器
*
* @param value
* @param step
* @return limit key
*/
List<Runnable> visit(Object value, long step);
/**
* 重置value对应的锁
*/
void reset(Object value);
}
}