一,前言
单机版限流仅能保护自身节点,但无法保护应用依赖的各种服务,并且在进行节点扩容、缩容时也无法准确控制整个服务的请求限制。而分布式限流,以集群为维度,可以方便的控制这个集群的请求限制,从而保护下游依赖的各种服务资源。
分布式限流最关键的是要将限流服务做成原子化 ,我们可以借助 Redis 的计数器,Lua 执行的原子性,进行分布式限流。具体实现上存储了两个 key,一个用于计时,一个用于计数。请求每调用一次,计数器加1,若在计时器时间内计数器未超过阈值,则放行。
Redis + Lua 限流基于的是令牌桶算法,系统以恒定速率向桶里放入令牌,当请求来的时候,从桶里拿走一个令牌,如果桶里没有令牌则阻塞或者拒绝新的请求,它允许突发的流量。令牌桶的另外一个好处是可以方便的改变放入桶中的令牌的速率,可以方便实现动态限流。
为什么使用 Lua 脚本?
Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能,Redis 支持 Lua 脚本,所以通过 Lua 实现限流的算法。Lua本身就是一种编程语言(脚本语言),Redis 脚本使用 Lua 解释器来执行脚本。虽然Redis 官方没有直接提供限流相应的API,但却支持了 Lua 脚本的功能,可以使用它实现复杂的令牌桶或漏桶算法,也是分布式系统中实现限流的主要方式之一。
Lua 脚本实现算法对比操作 Redis 实现算法的优点:
-
减少网络开销:使用Lua脚本,无需向Redis 发送多次请求,执行一次即可,减少网络传输;
-
原子操作:Redis 将整个 Lua 脚本作为一个命令执行,原子,无需担心并发;
-
复用:Lua脚本一旦执行,会永久保存 Redis 中,其他客户端可复用;
二,实现步骤
2.1,项目中引入 Redis(已配置即跳过)
maven:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
Redis配置类:
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfigurtion {
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
2.2,创建自定义限流注解
通过切面拦截带有自定义限流注解的接口或方法,通过注解方式就可轻松给接口加上限流操作。
import java.lang.annotation.*;
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface MyRedisLimiter {
//限流key,缓存到redis的key=prefix()+key()
String key();
//Key的前缀
String prefix() default "limiter:";
//给定的时间范围 单位(秒) 默认1秒 即1秒内超过count次的请求将会被限流
int period() default 1;
// period()时间内最多访问的次数
int count();
}
2.3,编写 limit.lua 脚本
通过 limit.lua 脚本实现限流逻辑,将 limit.lua 文件放到项目 resources 目录下。
local count
count = redis.call('get',KEYS[1])
-- 不超过阈值,则直接返回
if count and tonumber(count) > tonumber(ARGV[1]) then
return count;
end
-- 自加
count = redis.call('incr',KEYS[1])
if tonumber(count) == 1 then
-- 从第一次调用开始限流,设置对应key的过期时间
redis.call('expire',KEYS[1],ARGV[2])
end
return count;
2.3,切面实现限流业务逻辑
通过 redisTemplate.execute() 执行 lua 脚本返回周期时间内的请求次数,根据次数判断是否触发限流。
import com.google.common.collect.ImmutableList;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.lang.reflect.Method;
@Slf4j
@Aspect
@Component
public class RedisLimitAspect {
@Autowired
private RedisTemplate redisTemplate;
private static final String LIMIT_LUA_PATH = "limit.lua";
private DefaultRedisScript<Number> redisScript;
/**
* 初始化加载Lua脚本
*/
@PostConstruct
public void init() {
redisScript = new DefaultRedisScript<>();
redisScript.setResultType(Number.class);
ClassPathResource classPathResource = new ClassPathResource(LIMIT_LUA_PATH);
try {
//探测资源是否存在
classPathResource.getInputStream();
redisScript.setScriptSource(new ResourceScriptSource(classPathResource));
} catch (IOException e) {
log.error("未找到文件:{}", LIMIT_LUA_PATH);
}
}
/**
* 对@MyRedisLimiter注解拦截
*/
@Pointcut("@annotation(com.ym.framework.limit.MyRedisLimiter)")
public void pointcut(){}
/**
* 对切点进行继续处理
*/
@Around("pointcut()")
public Object limit(ProceedingJoinPoint pjp) {
log.info("[限流切面]进入限流逻辑");
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
MyRedisLimiter limitAnnotation = method.getAnnotation(MyRedisLimiter.class);
int limitPeriod = limitAnnotation.period();
int limitCount = limitAnnotation.count();
String limitKey = limitAnnotation.key();
String key = limitAnnotation.prefix() + limitKey;
ImmutableList<String> keys = ImmutableList.of(key);
try {
// 执行lua脚本,返回count:limitPeriod内的请求次数
Number count = (Number)redisTemplate.execute(redisScript, keys, limitCount, limitPeriod);
log.info("[限流切面]try to access, this time count is {} for key: {}", count, key);
if (count != null && count.intValue() <= limitCount) {
//放行请求
return pjp.proceed();
} else {
//触发限流,降级或抛异常
throw new Exception("服务器繁忙,请稍后再试");
}
} catch (Throwable e) {
if (e instanceof RuntimeException) {
throw new RuntimeException(e.getLocalizedMessage());
}
throw new RuntimeException("[限流切面]服务器繁忙,请稍后再试");
}
}
}
2.4,接口测试
@Api(tags = "限流测试接口")
@Validated
@RestController
@CrossOrigin
@RequestMapping("/api/limit")
public class TestLimiterController {
// 1秒内限制10个请求,key=limiter:limitTest
@MyRedisLimiter(key = "limitTest", count = 10)
@GetMapping(value = "/test1")
public String test1() {
System.out.println("test1");
return "test1";
}
// 10秒内限制5个请求,key=limiter:REDPACKET
@MyRedisLimiter(key = "REDPACKET", period = 10, count = 5)
@GetMapping("/test2")
public String test2() {
System.out.println("test2");
return "test2";
}
// 10秒内限制5个请求,key=limiter:SECKILL
@MyRedisLimiter(key = "SECKILL", period = 10, count = 5)
@GetMapping("/test3")
public String test3() {
System.out.println("test3");
return "test3";
}
}
评论区