首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

Sentinel源码—9.限流算法的实现对比

  • 25-04-24 22:41
  • 4234
  • 7896
juejin.cn

大纲

1.漏桶算法的实现对比

(1)普通思路的漏桶算法实现

(2)节省线程的漏桶算法实现

(3)Sentinel中的漏桶算法实现

(4)Sentinel中的漏桶算法与普通漏桶算法的区别

(5)Sentinel中的漏桶算法存在的问题

2.令牌桶算法的实现对比

(1)普通思路的令牌桶算法实现

(2)节省线程的令牌桶算法实现

(3)Guava中的令牌桶算法实现

(4)Sentinel中的令牌桶算法实现

(5)Sentinel中的令牌桶算法总结


1.漏桶算法的实现对比

(1)普通思路的漏桶算法实现

(2)节省线程的漏桶算法实现

(3)Sentinel中的漏桶算法实现

(4)Sentinel中的漏桶算法与普通漏桶算法的区别

(5)Sentinel中的漏桶算法存在的问题

(1)普通思路的漏桶算法实现

一.漏桶算法的处理流程

二.漏桶算法的主要特点

三.漏桶算法的优缺点

一.漏桶算法的处理流程

漏桶算法的核心思想是以固定速率流出。

步骤一: 当新的请求到达时,会将新的请求放入缓冲区(请求队列)中,类似于往水桶里注水。

步骤二: 系统会以固定的速度处理缓冲区中的请求,类似于水从窟窿中以固定的速度流出,比如开启一个后台线程定时以固定的速度从缓冲区中取出请求然后进行分发处理。

步骤三: 如果缓冲区已满,则新的请求将被拒绝或丢弃,类似于水溢出。

二.漏桶算法的主要特点

特点一:固定速率

水从桶底的孔洞中以固定速率流出,类似于网络中以固定速率发送数据包。但写入速度不固定,也就是请求不是匀速产生的。相当于生产者生产消息不固定,消费者消费消息是匀速消费的。

特点二:有限容量

桶的容量有限,当桶满时,新到达的水会溢出,即拒绝超过容量的请求。

特点三:先进先出(FIFO)

水按照先进先出的顺序从桶中流出,类似于请求的处理顺序。

这种算法的一个重要特性是:无论请求的接收速率如何变化,请求的处理速率始终是稳定的,这就确保了系统的负载不会超过预设的阈值。但是由于请求的处理速率是固定的,所以无法处理突发流量。此外如果入口流量过大,漏桶可能会溢出,导致请求丢失。

三.漏桶算法的优缺点

优点一:平滑流量

由于以固定的速率处理请求,所以可以有效地平滑和整形流量,避免流量的突发和波动,类似于消息队列的削峰填谷的作用。

优点二:防止过载

当流入的请求超过桶的容量时,可以直接丢弃请求,防止系统过载。

缺点一:无法处理突发流量

由于漏桶的出口速度是固定的,无法处理突发流量。例如,即使在流量较小的时候,也无法以更快的速度处理请求。

缺点二:可能会丢失数据

如果入口流量过大,超过了桶的容量,那么就需要丢弃部分请求。在一些不能接受丢失请求的场景中,这可能是一个问题。

缺点三:不适合处理速率变化大的场景

如果处理速率变化大,或需要动态调整处理速率,则无法满足。

漏桶算法适用于需要以固定速率处理请求的场景。在多数业务场景中,其实并不需要按照严格的速率进行请求处理。而且多数业务场景都需要应对突发流量的能力,所以会使用令牌桶算法。

(2)节省线程的漏桶算法实现

漏桶算法可以通过延迟计算的方式来实现。延迟计算指的是不需要单独的线程来定时生成令牌或从漏桶中定时取请求,而是由调用限流器的线程自己计算是否有足够的令牌以及需要sleep的时间。延迟计算的方式可以节省一个线程资源。

csharp
代码解读
复制代码
public class LeakyBucketLimiter { //桶的最大容量 public static long threshold = 10; //当前桶内的水量 public static long count = 0; //漏水速率(每秒5次) public static long leakRate = 5; //上次漏水时间 public static long lastLeakTime = System.currentTimeMillis(); //限流方法,返回true表示通过 public boolean canPass() { //调用漏水方法 this.leak(); //判断是否超过最大请求数量 if (count < threshold) { count++; return true; } return false; } //漏水方法,计算并更新这段时间内漏水量 private void leak() { //获取系统当前时间 long currentTime = System.currentTimeMillis(); //计算这段时间内,需要流出的水量 long leakWater = (currentTime - lastLeakTime) * leakRate / 1000; count = Math.max(count - leakWater, 0); //更新最近一次的漏水时间 lastLeakTime = currentTime; } }

(3)Sentinel中的漏桶算法实现

在RateLimiterController的canPass()方法中,为了判断是否超出QPS阈值,通过原子类变量latestPassedTime简化成单线程让请求先后通过的处理模型。为了尽量让业务不受Sentinel影响,采用预估请求的被处理时间点的方式。也就是无需等前面的请求完全被处理完,才确定后面的请求被处理的时间。因为在普通的漏桶算法中,是处理完一个请求,才从漏桶取出水滴。而RateLimiterController的漏桶算法,则是假设请求已经被通过了。

具体的判断逻辑如下:首先获取系统的当前时间currentTime。然后计算在满足流控规则中限制的QPS阈值count的情况下,先后的两个请求被允许通过时的最小时间间隔costTime。接着计算当前请求最早的预期通过时间expectedTime,也就是此次请求预计会在几时几分几秒内通过。最后比较expectedTime和currentTime就可知当前请求是否允许通过了。

一.如果expectedTime小于等于currentTime

也就是当前请求最早的预期通过时间比系统当前时间小。如果在此时(currentTime)通过当前请求,则当前请求的通过时间就比它最早的预期通过时间(expectedTime)要晚,即当前请求和最近通过的请求的时间间隔变大了,所以此时不会超QPS阈值。于是返回true允许通过,同时更新最近允许请求通过的时间戳为当前时间。

二.如果expectedTime大于currentTime

也就是当前请求最早的预期通过时间比系统当前时间大。如果在此时(currentTime)通过当前请求,则当前请求的通过时间就比它最早的预期通过时间(expectedTime)要早,即当前请求和最近通过的请求的时间间隔变小了,比最小间隔时间costTime还小,所以此时必然会超QPS阈值。因此返回进行等待或者返回false不允许通过,等待的最小时间就是:最近通过请求的时间 + 先后两个请求允许通过时的最小间隔时间 - 当前时间。

需要注意:Sentinel流量控制的漏桶算法,只能限制在costTime内的流量激增,限制不了costTime外的流量激增。比如系统启动完一瞬间就涌入大量并发请求,此时的流量激增限制不了。又比如系统处理完正常流量的最后一个请求,隔了costTime+的时间后,突然涌入超QPS阈值的并发请求,此时也限制不了这种情况的流量激增。但如果系统处理完正常流量的最后一个请求,隔了costTime-的时间后,突然涌入超QPS阈值的并发请求,此时则可以限制这种情况的流量激增。

同时,为了避免等待的各个并发线程被同时唤醒,可以利用原子变量的addAndGet()方法 + 假设等待请求已被通过的方式,实现需要等待的并发请求进行睡眠等待的时间都不一样,从而实现并发请求排队等待的效果。

实现排队等待效果的核心逻辑:由于latestPassedTime的原子性,每个线程都会获得不一样的oldTime。接着根据oldTime - 当前时间,就可以得到每个线程需要睡眠等待的时间waitTime。此时的waitTime都将会不一样,从而避免并发线程同时被唤醒的情况。将latestPassedTime按costTime进行自增,其实相当于假设当前请求在不超过QPS阈值的情况下,被允许通过了。

java
代码解读
复制代码
public class RateLimiterController implements TrafficShapingController { //排队等待的意思是超出阈值后等待一段时间,maxQueueingTimeMs就是请求在队列中的最大等待时间 private final int maxQueueingTimeMs; //流控规则中限制QPS的阈值,也就是QPS超出多少后会进行限制 private final double count; //最近允许一个请求通过的时间,每次请求通过后就会更新此时间,可以根据该时间计算出当前请求最早的预期通过时间 //注意:Sentinel是在业务前面的,尽量不要让业务受到Sentinel的影响,所以不需要等请求完全被处理完,才确定请求被通过的时间 private final AtomicLong latestPassedTime = new AtomicLong(-1); public RateLimiterController(int timeOut, double count) { this.maxQueueingTimeMs = timeOut; this.count = count; } @Override public boolean canPass(Node node, int acquireCount) { return canPass(node, acquireCount, false); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { //acquireCount代表每次从桶底流出多少个请求 //如果acquireCount小于等于0,则表示无需限流直接通过,不过acquireCount一般默认是1 if (acquireCount <= 0) { return true; } //如果限流规则的count(即限制QPS的阈值)小于等于0,则直接拒绝,相当于一个请求也不能放行 if (count <= 0) { return false; } //1.首先获取系统的当前时间 long currentTime = TimeUtil.currentTimeMillis(); //2.然后计算,在满足流控规则中限制的QPS阈值count的情况下,先后的两个请求被允许通过时的最小间隔时间(假设请求是单线程处理的) long costTime = Math.round(1.0 * (acquireCount) / count * 1000); //3.接着计算当前请求最早的预期通过时间 = 满足QPS阈值下的两个请求的最小时间间隔 + 上次请求的通过时间 long expectedTime = costTime + latestPassedTime.get(); //4.最后判断当前请求最早的预期通过时间是否比系统当前时间小 if (expectedTime <= currentTime) {//等价于没有超出QPS阈值 //当前请求最早的预期通过时间比系统当前时间小 //如果在此时(currentTime)通过当前请求,那么当前请求的实际通过时间就比它最早的预期通过时间(expectedTime)要晚 //也就是当前请求和最近通过的请求的时间间隔变大了,所以此时不会超QPS阈值,返回true允许通过 //由这里可知,latestPassedTime并不会影响costTime,也就是说,多个线程可以并发执行到这里而不受阈值的影响 //这意味着,Sentinel流量控制的漏桶算法,只能限制在costTime时间内的流量激增,限制不了costTime时间外的流量激增 //比如系统启动完的那一瞬间就涌入超出QPS阈值的并发请求,此时的这种流量激增是限制不了的; //又比如系统正常运行时处理完了正常流量的最后一个请求,隔了costTime+的时间后,突然涌入超出QPS阈值的并发请求,此时也限制不了; //只能限制住这样的一种情况:系统正常运行处理完正常流量的最后一个请求,隔了costTime-的时间,突然涌入超出QPS阈值的并发请求 latestPassedTime.set(currentTime); return true; } else { //如果不是,即当前请求最早的预期通过时间比系统当前时间大 //则说明latestPassedTime.get()大了,也就是上一个可能由于QPS超出阈值的原因导致请求处理慢了,所以需要进行等待 //计算当前请求的等待时间,用于判断是否超出流控规则设置的最大等待时间 long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); if (waitTime > maxQueueingTimeMs) { //如果超出最大等待时间,则直接返回false return false; } else {//等价于超出了QPS阈值 //当前请求最早的预期通过时间比系统当前时间大 //如果在此时(currentTime)通过当前请求,那么当前请求的实际通过时间就比它最早的预期通过时间(expectedTime)要早 //也就是当前请求和最近通过的请求的时间间隔变小了,比最小间隔时间costTime还小 //所以此时必然会超QPS阈值,因此返回进行等待或者返回false不允许通过 //而等待的最小时间,就是最近通过请求的时间 + 先后两个请求允许通过时的最小间隔时间 - 当前时间 //首先通过latestPassedTime这个原子变量的addAndGet()方法 //将最近通过请求的时间latestPassedTime,加上先后两次请求需要的最小间隔时间costTime,得到当前请求本来预期的通过时间 //注意: //当多个并发线程执行到此处时,由于latestPassedTime的原子性,每个线程都会获得不一样的oldTime //接着根据oldTime - 当前时间,就可以得到每个线程需要睡眠等待的时间waitTime //此时的waitTime都将会不一样,从而避免并发线程同时被唤醒的情况 //将latestPassedTime进行自增,其实相当于假设当前请求在不超过QPS阈值的情况下,被允许通过了 long oldTime = latestPassedTime.addAndGet(costTime); try { //然后计算当前请求需要等待多久 = 当前请求最早的预期通过时间 - 当前系统时间 waitTime = oldTime - TimeUtil.currentTimeMillis(); //如果等待时间大于流控规则设置的最大等待时间,则需要回滚刚才更新的最近通过请求的时间 //也就是将latestPassedTime减去costTime,然后返回false表示请求无法通过 if (waitTime > maxQueueingTimeMs) { //如果发现新计算的等待时间 大于 最大等待时间,则需要回滚latestPassedTime latestPassedTime.addAndGet(-costTime); return false; } //in race condition waitTime may <= 0 if (waitTime > 0) { //当前请求需要进行等待 Thread.sleep(waitTime); } return true; } catch (InterruptedException e) { } } } return false; } }

(4)Sentinel中的漏桶算法与普通漏桶算法的区别

区别一:普通漏桶算法使用的是真实队列

节省线程的漏桶算法会有一个单独的字段去记录当前桶内的水量,也就是请求量。每通过一个请求,则该字段值-1。反之,每新进一个请求,此字段值+1。

区别二:Sentinel漏桶算法使用的是虚拟队列

它没有单独的字段去记录当前桶内的请求量,而是根据最近通过请求的时间得出当前请求最早的预期通过时间来实现。其本质就是先假设当前请求可以通过,然后再按照先后请求在QPS阈值下可以允许通过时的最大时间间隔,来计算出当前请求最早的预期通过时间,再对比是否和当前发生冲突。

区别三:普通漏桶算法使用的策略是直接拒绝

如果流入速度大于流出速度,则直接拒绝。

区别四:Sentinel漏桶算法使用的策略是排队等待

如果超出了阈值,则不会直接拒绝请求,而是会等待一段时间,只要在这段时间内能处理到这个请求就不会拒绝掉。

(5)Sentinel中的漏桶算法存在的问题

问题一:在costTime时间内出现流量激增才能限流

如果在costTime时间外,即最后一次请求通过的时间已经好久了,突然流量激增以及并发进入系统,那么此时是无法限制住的。

问题二:Sentinel排队等待流控效果支持的QPS阈值不能超过1000

如果超过1000,且costTime计大于等于0.5,则会认为间隔时间都是1ms。如果costTime小于0.5,则认为配置失效,相当于没有配置此条流控规则。

ini
代码解读
复制代码
long costTime = Math.round(1.0 * (acquireCount) / count * 1000); long costTime = Math.round(1.0 * (1) / 1100 * 1000)约等于0.9ms;

默认情况下,acquireCount的值是1,那么:如果QPS阈值count在1000~2000,此时costTime = 1,限流不受阈值影响。如果QPS阈值count大于2000,此时costTime = 0,限流配置失效。

2.令牌桶算法的实现对比

(1)普通思路的令牌桶算法实现

(2)节省线程的令牌桶算法实现

(3)Guava中的令牌桶算法实现

(4)Sentinel中的令牌桶算法实现

(5)Sentinel中的令牌桶算法总结

(1)普通思路的令牌桶算法实现

一.令牌桶算法的处理流程

二.令牌桶算法的特点

三.令牌桶算法的优缺点

四.漏桶算法和令牌桶算法的对比

五.令牌桶算法和漏桶算法的核心区别

一.令牌桶算法的处理流程

令牌桶算法的核心思想是以固定速率流入。

步骤一: 初始化令牌桶,设置其容量和生成速率。

步骤二: 当有新请求到来时,检查令牌桶中是否有足够的令牌。如果有足够的令牌,则允许请求通过,并从令牌桶中扣除相应数量令牌。如果没有足够的令牌,则拒绝请求。

步骤三: 系统会以固定的速度添加令牌,直到达到令牌桶容量,比如开启一个后台线程以固定的速度向令牌桶中添加令牌。

二.令牌桶算法的特点

特点一:支持突发流量

令牌桶算法允许在限流内应对突发流量,有助于提高系统的响应能力。

特点二:平滑处理速率

和漏桶算法一样,令牌桶算法也可以平滑处理流量,避免处理速率突变。

令牌桶算法的一个重要特性是,它能够处理突发流量。当桶中有足够的令牌时,可以一次性处理多个请求。这对于需要处理突发流量的应用场景非常有用,但是又不会无限制的增加处理速率导致压垮服务器,因为桶内令牌数量是有限制的。

三.令牌桶算法的优缺点

优点一:可以处理突发流量

令牌桶算法可以处理突发流量。当桶满时,能够以最大速度处理请求。这对于需要处理突发流量的应用场景非常有用。

优点二:限制请求处理的平均速率

在长期运行中,请求的处理速率会被限制在预定义的平均速率下,也就是生成令牌的速率。

优点三:灵活性

与漏桶算法相比,令牌桶算法提供了更大的灵活性。例如,可以动态地调整生成令牌的速率。

缺点一:可能导致过载

如果令牌产生速度过快,可能会导致大量突发流量,使网络或服务过载。

缺点二:需要存储空间

令牌桶需要一定的存储空间来保存令牌,可能会导致内存资源的浪费。

四.漏桶算法和令牌桶算法的对比

漏桶算法是突然往桶里注水,但是漏水的窟窿是固定大小的,因此流出水的速度是固定的,也就是"生产不限速,消费限速"。

令牌桶算法是突然从桶中抽水,也就是固定大小的窟窿变成了入水口,而没桶盖的桶口变成了出水口。相当于入水速度变得固定了,而出水速度不做限制了,也就是"生产限速,消费不限速"。

五.令牌桶算法和漏桶算法的核心区别

令牌桶算法允许用户在短时间内发起更多请求,从而支持突发流量。漏桶算法只能支持每秒固定处理一定数量的请求,从而不支持突发流量。这就是令牌桶和漏桶的核心区别。

(2)节省线程的令牌桶算法实现

令牌桶算法可以通过延迟计算的方式来实现。延迟计算指的是不需要单独的线程来定时生成令牌或从漏桶中定时取请求,而是由调用限流器的线程自己计算是否有足够的令牌以及需要sleep的时间。延迟计算的方式可以节省一个线程资源。

csharp
代码解读
复制代码
public class TokenBucketLimiter { //桶的最大容量 public static long threshold = 10; //当前桶内的令牌数 public static long count = 0; //令牌生成速率(每秒5次) public static long tokenRate = 5; //上次生成令牌的时间 public static long lastRefillTime = System.currentTimeMillis(); //限流方法,返回true表示通过 public boolean canPass() { //调用生成令牌方法 this.refillTokens(); //判断桶内是否还有令牌 if (count > 0) { count--; return true; } return false; } //生成令牌方法,计算并更新这段时间内生成的令牌数量 private void refillTokens() { long currentTime = System.currentTimeMillis(); //计算这段时间内,需要生成的令牌数量 long refillTokens = (currentTime - lastRefillTime) * tokenRate / 1000; //更新桶内的令牌数 count = Math.min(count + refillTokens, threshold); //更新令牌生成时间 lastRefillTime = currentTime; } }

(3)Guava中的令牌桶算法实现

一.SmoothBursty的初始化

二.SmoothBursty的acquire()方法

三.SmoothWarmingUp的初始化

四.SmoothWarmingUp的acquire()方法

SmoothBursty和SmoothWarmingUp这两种限流器都使用了预支令牌的思路来实现,就是当前线程获取令牌的代价(阻塞时间)需要由下一个线程来支付。这样可以减少线程阻塞的概率,因为下一个请求不确定什么时候才来。如果下一个请求很久才来,那么这段时间产生的新令牌已满足下一个线程的需求,这样就不用阻塞了。

一.SmoothBursty的初始化

RateLimiter不保存上一次请求的时间,但是它保存下一次请求期望到达的时间。如果下一个请求的预期到达时间实际上已经过去了,并且假设下次请求期望到达的时间点是past,现在的时间点是now。那么now - past的这段时间表示RateLimiter没有被使用,所以在这段空闲的时间内RateLimiter就会增加storedPermits的数量。

scss
代码解读
复制代码
@Beta @GwtIncompatible @SuppressWarnings("GoodTime") public abstract class RateLimiter { ... //Creates a RateLimiter with the specified stable throughput, //given as "permits per second" (commonly referred to as QPS, queries per second). //The returned RateLimiter ensures that on average no more than permitsPerSecond are issued during any given second, //with sustained requests being smoothly spread over each second. //When the incoming request rate exceeds permitsPerSecond the rate limiter will release one permit every (1.0 / permitsPerSecond) seconds. //When the rate limiter is unused, bursts of up to permitsPerSecond permits will be allowed, //with subsequent requests being smoothly limited at the stable rate of permitsPerSecond. //创建一个具有指定稳定吞吐量的RateLimiter,传入的"permits per second"通常称为QPS、每秒查询量; //返回的RateLimiter确保在任何给定的秒期间平均不超过permitsPerSecond的令牌被发出,持续的请求将在每一秒内被平稳地通过; //当传入请求的速率超过permitsPerSecond时,速率限制器将每隔(1.0/permitsPerSecond)秒释放一个令牌; //当速率限制器未被使用时,将允许突发式的高达permitsPerSecond的令牌,而随后的请求将以permitsPerSecond的稳定速率被平滑地限制; //对外暴露的创建方法 //@param permitsPerSecond the rate of the returned RateLimiter, measured in how many permits become available per second. public static RateLimiter create(double permitsPerSecond) { //The default RateLimiter configuration can save the unused permits of up to one second. //This is to avoid unnecessary stalls in situations like this: //A RateLimiter of 1qps, and 4 threads, all calling acquire() at these moments: //T0 at 0 seconds、T1 at 1.05 seconds、T2 at 2 seconds、T3 at 3 seconds //Due to the slight delay of T1, T2 would have to sleep till 2.05 seconds, and T3 would also have to sleep till 3.05 seconds. //默认的RateLimiter配置可以保存长达一秒钟的未被使用的令牌; //这是为了避免在这种情况下出现不必要的停顿: //一个由1qps和4个线程组成的RateLimiter,所有线程都在如下这些时刻调用acquired(): //Thread0在0秒、Thread1在1.05秒、Thread2在2秒、Thread3在3秒 //由于Thread1的轻微延迟,Thread2必须睡眠到2.05秒,Thread3也必须睡眠到3.05秒 //内部调用一个qps设定 + 起始时间StopWatch的构建函数. //这里传入的SleepingStopwatch是一个以系统启动时间的一个相对时间的计量. //后面的读时间偏移是以这个开始的时间偏移为起始的. return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer()); } @VisibleForTesting static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) { //指定了令牌桶中最多存储1秒的令牌数 RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); //调用RateLimiter.setRate()方法 rateLimiter.setRate(permitsPerSecond); return rateLimiter; } //Updates the stable rate of this RateLimiter, //that is, the permitsPerSecond argument provided in the factory method that constructed the RateLimiter. //Currently throttled threads will not be awakened as a result of this invocation, //thus they do not observe the new rate; only subsequent requests will. //Note though that, since each request repays (by waiting, if necessary) the cost of the previous request, //this means that the very next request after an invocation to setRate() will not be affected by the new rate; //it will pay the cost of the previous request, which is in terms of the previous rate. //The behavior of the RateLimiter is not modified in any other way, //e.g. if the RateLimiter was configured with a warmup period of 20 seconds, //it still has a warmup period of 20 seconds after this method invocation. //更新该RateLimiter的稳定速率,即在构造RateLimiter的工厂方法中提供permitsPerSecond参数; //当前被限流的线程将不会由于这个调用而被唤醒,因此它们没有观察到新的速率;只有随后的请求才会; //但是要注意的是,由于每个请求(如果需要,通过等待)会偿还先前请求的成本, //这意味着调用setRate()方法后的下一个请求将不会受到新速率的影响, //它将按照先前的速率处理先前请求的成本; //RateLimiter的行为不会以任何其他方式修改, //例如:如果RateLimiter被配置为具有20秒的预热周期,在该方法调用之后,它仍然有20秒的预热期; //@param permitsPerSecond the new stable rate of this {@code RateLimiter} public final void setRate(double permitsPerSecond) { checkArgument(permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive"); //在同步代码块中设定速率 synchronized (mutex()) { //调用SmoothRateLimiter.doSetRate()方法 doSetRate(permitsPerSecond, stopwatch.readMicros()); } } ... } @GwtIncompatible abstract class SmoothRateLimiter extends RateLimiter { //The currently stored permits. //令牌桶中当前缓存的未消耗的令牌数 double storedPermits; //The maximum number of stored permits. //令牌桶中允许存放的最大令牌数 double maxPermits; //The interval between two unit requests, at our stable rate. //E.g., a stable rate of 5 permits per second has a stable interval of 200ms. //按照我们稳定的速率,两个单位请求之间的时间间隔;例如,每秒5个令牌的稳定速率具有200ms的稳定间隔 double stableIntervalMicros; //The time when the next request (no matter its size) will be granted. //After granting a request, this is pushed further in the future. Large requests push this further than small requests. //下一个请求(无论大小)将被批准的时间. //在批准请求后,这将在未来进一步推进,大请求比小请求更能推动这一进程。 private long nextFreeTicketMicros = 0L;//could be either in the past or future ... //这是一个可以重复调用的函数. //第一次调用和非第一次调用的过程有些不一样,目的是设定设定最大令牌数maxPermits和已存储的令牌数storedPermits @Override final void doSetRate(double permitsPerSecond, long nowMicros) { //调用SmoothRateLimiter.resync()方法,重试计算和同步存储的预分配的令牌. resync(nowMicros); //计算稳定的发放令牌的时间间隔. 单位us, 比如qps为5, 则为200ms即20万us的间隔进行令牌发放. double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; //调用SmoothBursty.doSetRate()设定最大令牌数maxPermits和已存储的令牌数storedPermits doSetRate(permitsPerSecond, stableIntervalMicros); } //Updates storedPermits and nextFreeTicketMicros based on the current time. //根据当前时间,更新storedPermits和nextFreeTicketMicros变量 //注意: 在初始化SmoothBursty时会第一次调用resync()方法,此时各值的情况如下: //coolDownIntervalMicros = 0、nextFreeTicketMicros = 0、newPermits = 无穷大. //maxPermits = 0(初始值,还没有重新计算)、最后得到的: storedPermits = 0; //同时,nextFreeTicketMicros = "起始时间" void resync(long nowMicros) { //if nextFreeTicket is in the past, resync to now if (nowMicros > nextFreeTicketMicros) { double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } } abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros); ... //This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling. //The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in terms of time, //in this sense: if a RateLimiter is 2qps, and this time is specified as 10 seconds, we can save up to 2 * 10 = 20 permits. //SmoothBursty实现了一个"突发式"的速率限制器RateLimiter,其中的storedPermits会被转换为0; //它可以保存的最大令牌数量(当RateLimiter未使用时)是根据时间定义的, //从这个意义上说:如果RateLimiter是2qps,并且这个时间被指定为10秒,那么我们最多可以保存2*10=20个令牌; static final class SmoothBursty extends SmoothRateLimiter { //The work (permits) of how many seconds can be saved up if this RateLimiter is unused? //如果这个速率限制器RateLimiter没有被使用,那么可以节省多少秒的工作(令牌)? final double maxBurstSeconds; SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { super(stopwatch); this.maxBurstSeconds = maxBurstSeconds; } @Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { //初次设定的时候,oldMaxPermits = 0.0 double oldMaxPermits = this.maxPermits; //新的(当前的)maxPermits为burst的时间周期(1秒) * 每周期的令牌数. maxPermits = maxBurstSeconds * permitsPerSecond; if (oldMaxPermits == Double.POSITIVE_INFINITY) { //if we don't special-case this, we would get storedPermits == NaN, below storedPermits = maxPermits; } else { //初始化SmoothBursty,执行到此处时,storedPermits为0 storedPermits = (oldMaxPermits == 0.0) ? 0.0 : storedPermits * maxPermits / oldMaxPermits; } } @Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { return 0L; } @Override double coolDownIntervalMicros() { return stableIntervalMicros; } } ... }

二.SmoothBursty的acquire()方法

java
代码解读
复制代码
@Beta @GwtIncompatible @SuppressWarnings("GoodTime") public abstract class RateLimiter { ... //无限等待的获取 //Acquires the given number of permits from this RateLimiter, //blocking until the request can be granted. //Tells the amount of time slept, if any. //@param permits the number of permits to acquire,获取的令牌数量 //@return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited @CanIgnoreReturnValue public double acquire(int permits) { //调用RateLimiter.reserve()方法 //预支令牌并获取需要阻塞的时间:即预定数量为permits的令牌数,并返回需要等待的时间 long microsToWait = reserve(permits); //将需要等待的时间补齐, 从而满足限流的需求,即根据microsToWait来让线程sleep(共性) stopwatch.sleepMicrosUninterruptibly(microsToWait); //返回这次调用使用了多少时间给调用者 return 1.0 * microsToWait / SECONDS.toMicros(1L); } //Reserves the given number of permits from this RateLimiter for future use, //returning the number of microseconds until the reservation can be consumed. //从这个RateLimiter限速器中保留给定数量的令牌,以备将来使用,返回可以使用保留前的微秒数 //@return time in microseconds to wait until the resource can be acquired, never negative final long reserve(int permits) { checkPermits(permits); //由于涉及并发操作,所以必须使用synchronized进行互斥处理 synchronized (mutex()) { //调用RateLimiter.reserveAndGetWaitLength()方法 return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } //Reserves next ticket and returns the wait time that the caller must wait for. //预定下一个ticket,并且返回需要等待的时间 final long reserveAndGetWaitLength(int permits, long nowMicros) { //调用SmoothRateLimiter.reserveEarliestAvailable()方法 long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0); } //Reserves the requested number of permits and returns the time that those permits can be used (with one caveat). //保留请求数量的令牌,并返回可以使用这些令牌的时间(有一个警告) //生产令牌、获取令牌、计算阻塞时间的具体细节由子类来实现 //@return the time that the permits may be used, or, if the permits may be used immediately, an arbitrary past or present time abstract long reserveEarliestAvailable(int permits, long nowMicros); ... } @GwtIncompatible abstract class SmoothRateLimiter extends RateLimiter { //The currently stored permits. //令牌桶中当前缓存的未消耗的令牌数 double storedPermits; //The maximum number of stored permits. //令牌桶中允许存放的最大令牌数 double maxPermits; //The interval between two unit requests, at our stable rate. //E.g., a stable rate of 5 permits per second has a stable interval of 200ms. //按照我们稳定的速率,两个单位请求之间的时间间隔;例如,每秒5个令牌的稳定速率具有200ms的稳定间隔 double stableIntervalMicros; //The time when the next request (no matter its size) will be granted. //After granting a request, this is pushed further in the future. Large requests push this further than small requests. //下一个请求(无论大小)将被批准的时间. 在批准请求后,这将在未来进一步推进,大请求比小请求更能推动这一进程. private long nextFreeTicketMicros = 0L;//could be either in the past or future ... @Override final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { //1.根据nextFreeTicketMicros计算新产生的令牌数,更新当前未使用的令牌数storedPermits //获取令牌时调用SmoothRateLimiter.resync()方法与初始化时的调用不一样. //此时会把"还没有使用"的令牌存储起来. //但是如果计数时间nextFreeTicketMicros是在未来. 那就不做任何处理. resync(nowMicros); //下一个请求(无论大小)将被批准的时间 long returnValue = nextFreeTicketMicros; //2.计算需要阻塞等待的时间 //2.1.先从桶中取未消耗的令牌,如果桶中令牌数不足,看最多能取多少个 //存储的令牌可供消费的数量 double storedPermitsToSpend = min(requiredPermits, this.storedPermits); //2.2.计算是否需要等待新鲜的令牌(当桶中现有的令牌数不足时就需要等待新鲜的令牌),如果需要,则计算需要等待的令牌数 //需要等待的令牌:新鲜的令牌 double freshPermits = requiredPermits - storedPermitsToSpend; //计算需要等待的时间 //分两部分计算:waitMicros = 从桶中获取storedPermitsToSpend个现有令牌的代价 + 等待生成freshPermits个新鲜令牌的代价 //从桶中取storedPermitsToSpend个现有令牌也是有代价的,storedPermitsToWaitTime()方法是个抽象方法,会由SmoothBursty和SmoothWarmingUp实现 //对于SmoothBursty来说,storedPermitsToWaitTime()会返回0,表示已经存储的令牌不需要等待. //而生成新鲜令牌需要等待的代价是:新鲜令牌的个数freshPermits * 每个令牌的耗时stableIntervalMicros long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); //3.更新nextFreeTicketMicros //由于新鲜的令牌可能已被预消费,所以nextFreeTicketMicros就得往后移,以表示这段时间被预消费了 this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); //4.扣减令牌数,更新桶内剩余令牌 //最后把上面计算的可扣减的令牌数量从存储的令牌里减掉 this.storedPermits -= storedPermitsToSpend; //返回请求需要等待的时间 //需要注意returnValue被赋值的是上次的nextFreeTicketMicros,说明当前这次请求获取令牌的代价由下一个请求去支付 return returnValue; } //Updates storedPermits and nextFreeTicketMicros based on the current time. //根据当前时间,更新storedPermits和nextFreeTicketMicros变量 //计算nextFreeTicketMicros到当前时间内新产生的令牌数,这个就是延迟计算 void resync(long nowMicros) { //if nextFreeTicket is in the past, resync to now //一般当前的时间是大于下个请求被批准的时间 //此时:会把过去的时间换成令牌数存储起来,注意存储的令牌数不能大于最大的令牌数 //当RateLimiter初始化好后,可能刚开始没有流量,或者是一段时间没有流量后突然来了流量 //此时可以往"后"预存储一秒时间的令牌数. 也就是这里所说的burst能力 //如果nextFreeTicketMicros在未来的一个时间点,那这个if判断便不满足 //此时,不需要进行更新storedPermits和nextFreeTicketMicros变量 //此种情况发生在:"预借"了令牌的时候 if (nowMicros > nextFreeTicketMicros) { //时间差除以生成一个新鲜令牌的耗时,coolDownIntervalMicros()是抽象方法,由子类实现 double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); //更新令牌桶内已存储的令牌个数,注意不超过最大限制 storedPermits = min(maxPermits, storedPermits + newPermits); //更新nextFreeTicketMicros为当前时间 nextFreeTicketMicros = nowMicros; } } //Translates a specified portion of our currently stored permits which we want to spend/acquire, into a throttling time. //Conceptually, this evaluates the integral of the underlying function we use, for the range of [(storedPermits - permitsToTake), storedPermits]. //This always holds: 0 <= permitsToTake <= storedPermits //从桶中取出已存储的令牌的代价,由子类实现 //这是一个抽象函数,SmoothBursty中的实现会直接返回0,可以认为已经预分配的令牌,在获取时不需要待待时间 abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake); //Returns the number of microseconds during cool down that we have to wait to get a new permit. //每生成一个新鲜令牌的耗时,由子类实现 abstract double coolDownIntervalMicros(); ... static final class SmoothBursty extends SmoothRateLimiter { ... @Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { return 0L; } @Override double coolDownIntervalMicros() { return stableIntervalMicros; } } ... }

三.SmoothWarmingUp的初始化

scss
代码解读
复制代码
@Beta @GwtIncompatible @SuppressWarnings("GoodTime") public abstract class RateLimiter { ... //Creates a RateLimiter with the specified stable throughput, //given as "permits per second" (commonly referred to as QPS, queries per second), //and a warmup period, during which the RateLimiter smoothly ramps up its rate, //until it reaches its maximum rate at the end of the period (as long as there are enough requests to saturate it). //Similarly, if the RateLimiter is left unused for a duration of warmupPeriod, //it will gradually return to its "cold" state, //i.e. it will go through the same warming up process as when it was first created. //The returned RateLimiter is intended for cases where the resource that actually fulfills the requests (e.g., a remote server) needs "warmup" time, //rather than being immediately accessed at the stable (maximum) rate. //The returned RateLimiter starts in a "cold" state (i.e. the warmup period will follow), //and if it is left unused for long enough, it will return to that state. //创建一个具有指定稳定吞吐量的RateLimiter, //入参为:"每秒多少令牌"(通常称为QPS,每秒的查询量),以及平稳增加RateLimiter速率的预热期, //直到RateLimiter在该预热周期结束时达到最大速率(只要有足够的请求使其饱和); //类似地,如果RateLimiter在预热时段的持续时间内未被使用,它将逐渐返回到它的"冷"状态, //也就是说,它将经历与最初创建时相同的预热过程; //返回的RateLimiter适用于实际满足请求的资源(例如远程服务器)需要"预热"时间的情况,而不是以稳定(最大)速率立即访问; //返回的RateLimiter在"冷"状态下启动(也就是说,接下来将是预热期),如果它被闲置足够长的时间,它就会回到那个"冷"状态; //@param permitsPerSecond the rate of the returned RateLimiter, measured in how many permits become available per second //@param warmupPeriod the duration of the period where the RateLimiter ramps up its rate, before reaching its stable (maximum) rate //@param unit the time unit of the warmupPeriod argument public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod); return create(permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer()); } @VisibleForTesting static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit, double coldFactor, SleepingStopwatch stopwatch) { RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor); //调用RateLimiter.setRate()方法 rateLimiter.setRate(permitsPerSecond); return rateLimiter; } //Updates the stable rate of this RateLimiter, //that is, the permitsPerSecond argument provided in the factory method that constructed the RateLimiter. //Currently throttled threads will not be awakened as a result of this invocation, //thus they do not observe the new rate; only subsequent requests will. //Note though that, since each request repays (by waiting, if necessary) the cost of the previous request, //this means that the very next request after an invocation to setRate() will not be affected by the new rate; //it will pay the cost of the previous request, which is in terms of the previous rate. //The behavior of the RateLimiter is not modified in any other way, //e.g. if the RateLimiter was configured with a warmup period of 20 seconds, //it still has a warmup period of 20 seconds after this method invocation. //更新该RateLimiter的稳定速率,即在构造RateLimiter的工厂方法中提供permitsPerSecond参数; //当前被限流的线程将不会由于这个调用而被唤醒,因此它们没有观察到新的速率;只有随后的请求才会; //但是要注意的是,由于每个请求(如果需要,通过等待)会偿还先前请求的成本, //这意味着调用setRate()方法后的下一个请求将不会受到新速率的影响, //它将按照先前的速率处理先前请求的成本; //RateLimiter的行为不会以任何其他方式修改, //例如:如果RateLimiter被配置为具有20秒的预热周期,在该方法调用之后,它仍然有20秒的预热期; //@param permitsPerSecond the new stable rate of this {@code RateLimiter} public final void setRate(double permitsPerSecond) { checkArgument(permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive"); //在同步代码块中设定速率 synchronized (mutex()) { //调用SmoothRateLimiter.doSetRate()方法 doSetRate(permitsPerSecond, stopwatch.readMicros()); } } ... } @GwtIncompatible abstract class SmoothRateLimiter extends RateLimiter { //The currently stored permits. //令牌桶中当前缓存的未消耗的令牌数 double storedPermits; //The maximum number of stored permits. //令牌桶中允许存放的最大令牌数 double maxPermits; //The interval between two unit requests, at our stable rate. //E.g., a stable rate of 5 permits per second has a stable interval of 200ms. //按照我们稳定的速率,两个单位请求之间的时间间隔;例如,每秒5个令牌的稳定速率具有200ms的稳定间隔 double stableIntervalMicros; //The time when the next request (no matter its size) will be granted. //After granting a request, this is pushed further in the future. Large requests push this further than small requests. //下一个请求(无论大小)将被批准的时间. //在批准请求后,这将在未来进一步推进,大请求比小请求更能推动这一进程。 private long nextFreeTicketMicros = 0L;//could be either in the past or future ... //这是一个可以重复调用的函数. //第一次调用和非第一次调用的过程有些不一样,目的是设定一个新的速率Rate. @Override final void doSetRate(double permitsPerSecond, long nowMicros) { //调用SmoothRateLimiter.resync()方法,重试计算和同步存储的预分配的令牌. resync(nowMicros); //计算稳定的发放令牌的时间间隔. 单位us, 比如qps为5, 则为200ms即20万us的间隔进行令牌发放. double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; //调用SmoothWarmingUp.doSetRate()设定其内部的比率. doSetRate(permitsPerSecond, stableIntervalMicros); } //Updates storedPermits and nextFreeTicketMicros based on the current time. //根据当前时间,更新storedPermits和nextFreeTicketMicros变量 //注意: 在初始化SmoothBursty时会第一次调用resync()方法,此时各值的情况如下: //coolDownIntervalMicros = 0、nextFreeTicketMicros = 0、newPermits = 无穷大. //maxPermits = 0(初始值,还没有重新计算)、最后得到的: storedPermits = 0; //同时,nextFreeTicketMicros = "起始时间" void resync(long nowMicros) { //if nextFreeTicket is in the past, resync to now if (nowMicros > nextFreeTicketMicros) { double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } } abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros); ... static final class SmoothWarmingUp extends SmoothRateLimiter { private final long warmupPeriodMicros; //The slope of the line from the stable interval (when permits == 0), to the cold interval (when permits == maxPermits) private double slope;//斜率 private double thresholdPermits; private double coldFactor; SmoothWarmingUp(SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) { super(stopwatch); //将warmupPeriod转换成微妙并赋值给warmupPeriodMicros this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod); this.coldFactor = coldFactor; } @Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = maxPermits; //stableIntervalMicros此时已由前面的SmoothRateLimiter.doSetRate()方法设为:1/qps //coldFactor的值默认会初始化为3 //因此系统最冷时的令牌生成间隔:coldIntervalMicros等于3倍的普通间隔stableIntervalMicros double coldIntervalMicros = stableIntervalMicros * coldFactor; //warmupPeriodMicros是用户传入的预热时间 //stableIntervalMicros是稳定期间令牌发放的间隔 //进入预热阶段的临界令牌数thresholdPermits,默认就是:整个预热时间除以正常速率的一半 //该值太小会过早进入预热阶段,影响性能;该值太大会对系统产生压力,没达到预热效果 thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros; //最大令牌数 maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros); //斜率 slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits); //设置当前桶内的存储令牌数 //突发型的RateLimiter——SmoothBursty: //初始化时不会预生成令牌,因为storedPermits初始为0; //随着时间推移,则会产生新的令牌,这些令牌如果没有被消费,则会存储在storedPermits里; //预热型的RateLimiter——SmoothWarmingUp: //初始化时会预生成令牌,并且初始化时肯定是系统最冷的时候,所以桶内默认就是maxPermits if (oldMaxPermits == Double.POSITIVE_INFINITY) { //if we don't special-case this, we would get storedPermits == NaN, below storedPermits = 0.0; } else { //对于SmoothWarmingUp的RateLimiter来说,其初始存储值storedPermits是满的,也就是存储了最大限流的令牌数 //而对于突发型的限流器SmoothBursty来说,其初始存储值storedPermits是0 storedPermits = (oldMaxPermits == 0.0) ? maxPermits : storedPermits * maxPermits / oldMaxPermits; } } ... } ... }

四.SmoothWarmingUp的acquire()方法

java
代码解读
复制代码
@Beta @GwtIncompatible @SuppressWarnings("GoodTime") public abstract class RateLimiter { ... //无限等待的获取 //Acquires the given number of permits from this RateLimiter, //blocking until the request can be granted. //Tells the amount of time slept, if any. //@param permits the number of permits to acquire,获取的令牌数量 //@return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited @CanIgnoreReturnValue public double acquire(int permits) { //调用RateLimiter.reserve()方法 //预支令牌并获取需要阻塞的时间:即预定数量为permits的令牌数,并返回需要等待的时间 long microsToWait = reserve(permits); //将需要等待的时间补齐, 从而满足限流的需求,即根据microsToWait来让线程sleep(共性) stopwatch.sleepMicrosUninterruptibly(microsToWait); //返回这次调用使用了多少时间给调用者 return 1.0 * microsToWait / SECONDS.toMicros(1L); } //Reserves the given number of permits from this RateLimiter for future use, //returning the number of microseconds until the reservation can be consumed. //从这个RateLimiter限速器中保留给定数量的令牌,以备将来使用,返回可以使用保留前的微秒数 //@return time in microseconds to wait until the resource can be acquired, never negative final long reserve(int permits) { checkPermits(permits); //由于涉及并发操作,所以必须使用synchronized进行互斥处理 synchronized (mutex()) { //调用RateLimiter.reserveAndGetWaitLength()方法 return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } //Reserves next ticket and returns the wait time that the caller must wait for. //预定下一个ticket,并且返回需要等待的时间 final long reserveAndGetWaitLength(int permits, long nowMicros) { //调用SmoothRateLimiter.reserveEarliestAvailable()方法 long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0); } //Reserves the requested number of permits and returns the time that those permits can be used (with one caveat). //保留请求数量的令牌,并返回可以使用这些令牌的时间(有一个警告) //生产令牌、获取令牌、计算阻塞时间的具体细节由子类来实现 //@return the time that the permits may be used, or, if the permits may be used immediately, an arbitrary past or present time abstract long reserveEarliestAvailable(int permits, long nowMicros); ... } @GwtIncompatible abstract class SmoothRateLimiter extends RateLimiter { //The currently stored permits. //令牌桶中当前缓存的未消耗的令牌数 double storedPermits; //The maximum number of stored permits. //令牌桶中允许存放的最大令牌数 double maxPermits; //The interval between two unit requests, at our stable rate. //E.g., a stable rate of 5 permits per second has a stable interval of 200ms. //按照我们稳定的速率,两个单位请求之间的时间间隔;例如,每秒5个令牌的稳定速率具有200ms的稳定间隔 double stableIntervalMicros; //The time when the next request (no matter its size) will be granted. //After granting a request, this is pushed further in the future. Large requests push this further than small requests. //下一个请求(无论大小)将被批准的时间. 在批准请求后,这将在未来进一步推进,大请求比小请求更能推动这一进程. private long nextFreeTicketMicros = 0L;//could be either in the past or future ... @Override final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { //1.根据nextFreeTicketMicros计算新产生的令牌数,更新当前未使用的令牌数storedPermits //获取令牌时调用SmoothRateLimiter.resync()方法与初始化时的调用不一样. //此时会把"没有过期"的令牌存储起来. //但是如果计数时间nextFreeTicketMicros是在未来. 那就不做任何处理. resync(nowMicros); //下一个请求(无论大小)将被批准的时间 long returnValue = nextFreeTicketMicros; //2.计算需要阻塞等待的时间 //2.1.先从桶中取未消耗的令牌,如果桶中令牌数不足,看最多能取多少个 //存储的令牌可供消费的数量 double storedPermitsToSpend = min(requiredPermits, this.storedPermits); //2.2.计算是否需要等待新鲜的令牌(当桶中现有的令牌数不足时就需要等待新鲜的令牌),如果需要,则计算需要等待的令牌数 //需要等待的令牌:新鲜的令牌 double freshPermits = requiredPermits - storedPermitsToSpend; //计算需要等待的时间 //分两部分计算:waitMicros = 从桶中获取storedPermitsToSpend个现有令牌的代价 + 等待生成freshPermits个新鲜令牌的代价 //从桶中取storedPermitsToSpend个现有令牌也是有代价的,storedPermitsToWaitTime()方法是个抽象方法,会由SmoothBursty和SmoothWarmingUp实现 //对于SmoothBursty来说,storedPermitsToWaitTime()会返回0,表示已经存储的令牌不需要等待. //而生成新鲜令牌需要等待的代价是:新鲜令牌的个数freshPermits * 每个令牌的耗时stableIntervalMicros long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); //3.更新nextFreeTicketMicros //由于新鲜的令牌可能已被预消费,所以nextFreeTicketMicros就得往后移,以表示这段时间被预消费了 this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); //4.扣减令牌数,更新桶内剩余令牌 //最后把上面计算的可扣减的令牌数量从存储的令牌里减掉 this.storedPermits -= storedPermitsToSpend; //返回请求需要等待的时间 //需要注意returnValue被赋值的是上次的nextFreeTicketMicros,说明当前这次请求获取令牌的代价由下一个请求去支付 return returnValue; } //Updates storedPermits and nextFreeTicketMicros based on the current time. //根据当前时间,更新storedPermits和nextFreeTicketMicros变量 //计算nextFreeTicketMicros到当前时间内新产生的令牌数,这个就是延迟计算 void resync(long nowMicros) { //if nextFreeTicket is in the past, resync to now //一般当前的时间是大于下个请求被批准的时间 //此时:会把过去的时间换成令牌数存储起来,注意存储的令牌数不能大于最大的令牌数 //当RateLimiter初始化好后,可能刚开始没有流量,或者是一段时间没有流量后突然来了流量 //此时可以往"后"预存储一秒时间的令牌数. 也就是这里所说的burst能力 //如果nextFreeTicketMicros在未来的一个时间点,那这个if判断便不满足 //此时,不需要进行更新storedPermits和nextFreeTicketMicros变量 //此种情况发生在:"预借"了令牌的时候 if (nowMicros > nextFreeTicketMicros) { //时间差除以生成一个新鲜令牌的耗时,coolDownIntervalMicros()是抽象方法,由子类实现 double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); //更新令牌桶内已存储的令牌个数,注意不超过最大限制 storedPermits = min(maxPermits, storedPermits + newPermits); //更新nextFreeTicketMicros为当前时间 nextFreeTicketMicros = nowMicros; } } //Translates a specified portion of our currently stored permits which we want to spend/acquire, into a throttling time. //Conceptually, this evaluates the integral of the underlying function we use, for the range of [(storedPermits - permitsToTake), storedPermits]. //This always holds: 0 <= permitsToTake <= storedPermits //从桶中取出已存储的令牌的代价,由子类实现 //这是一个抽象函数,SmoothBursty中的实现会直接返回0,可以认为已经预分配的令牌,在获取时不需要待待时间 abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake); //Returns the number of microseconds during cool down that we have to wait to get a new permit. //每生成一个新鲜令牌的耗时,由子类实现 abstract double coolDownIntervalMicros(); ... static final class SmoothWarmingUp extends SmoothRateLimiter { private final long warmupPeriodMicros; private double slope;//斜率 private double thresholdPermits; private double coldFactor; ... @Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { //检查当前桶内存储的令牌数是否大于进入预热阶段的临界令牌数thresholdPermits double availablePermitsAboveThreshold = storedPermits - thresholdPermits; long micros = 0; //如果当前桶内存储的令牌数大于进入预热阶段的临界令牌数thresholdPermits //则说明系统当前已经冷下来了,需要进入预热期,于是需要计算在预热期生成令牌的耗时 if (availablePermitsAboveThreshold > 0.0) { //计算在超出临界值的令牌中需要取出多少个令牌,并计算耗时 double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake); //计算预热阶段的耗时,前半部分的permitsToTime()计算的是生成令牌的初始速率,后半部分的permitsToTime()计算的是生成令牌的结束速率 double length = permitsToTime(availablePermitsAboveThreshold) + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake); //总耗时 = ((初始速率 + 结束速率) * 令牌数) / 2 micros = (long) (permitsAboveThresholdToTake * length / 2.0); permitsToTake -= permitsAboveThresholdToTake; } //加上稳定阶段的令牌耗时就是总耗时 micros += (long) (stableIntervalMicros * permitsToTake); return micros; } //已知每生成一个令牌,下一个令牌的耗时就会固定增加slope微秒 //那么在知道初始耗时stableIntervalMicros的情况下,就可以按如下公式求出生成第permits个令牌的耗时 private double permitsToTime(double permits) { return stableIntervalMicros + permits * slope; } @Override double coolDownIntervalMicros() { //预热时长 / 最大令牌数 return warmupPeriodMicros / maxPermits; } } ... }

(4)Sentinel中的令牌桶算法实现

一.WarmUpController的初始化

二.WarmUpController.canPass()方法

三.WarmUpController.syncToken()方法

四.WarmUpController.coolDownTokens()方法

Guava中的预热是通过控制令牌的生成时间来实现的,Sentinel中的预热则是通过控制每秒通过的请求数来实现的。在Guava中,冷却因子coldFactor固定为3,已被写死。在Sentinel中,冷却因子coldFactor默认为3,可通过参数修改。

一.WarmUpController的初始化

java
代码解读
复制代码
public class WarmUpController implements TrafficShapingController { //count是QPS阈值,即FlowRule中设定的阈值,表示系统在稳定阶段下允许的最大QPS //在预热阶段,系统允许的QPS不会直接到达count值,而是会逐渐增加(对应预热模型图从右向左),直到达到这个count值为止 //这样就能实现让系统接收到的流量是一个平滑上升的状态,而不是让系统瞬间被打满 protected double count; //coldFactor是冷却因子,表示系统在最冷时(预热阶段刚开始时)允许的QPS阈值与稳定阶段下允许的QPS阈值之比 //此参数直接影响预热阶段允许的QPS递增值,冷却因子越大,预热阶段允许的QPS递增值越低,默认为3 private int coldFactor; //告警值,大于告警值系统就进入预热阶段,小于告警值系统进入稳定阶段 protected int warningToken = 0; //令牌桶可以存储的最大令牌数 private int maxToken; //斜率,预热阶段令牌生成速率的增速 protected double slope; //令牌桶中已存储的令牌数 protected AtomicLong storedTokens = new AtomicLong(0); //最后一次添加令牌的时间戳 protected AtomicLong lastFilledTime = new AtomicLong(0); public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) { construct(count, warmUpPeriodInSec, coldFactor); } public WarmUpController(double count, int warmUpPeriodInSec) { //warmUpPeriodInSec是预热时长,表示系统需要多长时间从预热阶段到稳定阶段 //比如限制QPS为100,设置预热时长为10s,那么在预热阶段,令牌生成的速率会越来越快 //可能第1s只允许10个请求通过,第2s可能允许15个请求通过,这样逐步递增,直至递增到100为止 construct(count, warmUpPeriodInSec, 3); } private void construct(double count, int warmUpPeriodInSec, int coldFactor) { if (coldFactor <= 1) { throw new IllegalArgumentException("Cold factor should be larger than 1"); } this.count = count; this.coldFactor = coldFactor; //thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros; //1.告警值,大于告警值系统就进入预热阶段;例如预热时长为5s,QPS为100,那么warningToken就为250 warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1); //maxPermits = thresholdPermits + 2 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros); //2.系统最冷时桶内存储的令牌数,例如预热时长为5s,QPS为100,那么maxToken为500 maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor)); //slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits); //3.slope斜率,例如预热时长为5s,QPS为100,那么slope为0.00008 slope = (coldFactor - 1.0) / count / (maxToken - warningToken); } ... }

二.WarmUpController.canPass()方法

步骤一: 调用WarmUpController的syncToken()方法生成令牌并同步到令牌桶内

步骤二: 判断令牌桶内剩余令牌数是否大于告警值

情况一: 如果剩余令牌数大于警戒值,说明系统处于预热阶段,此时需要进一步比较令牌的生产速率与令牌的消耗速率。若消耗速率大,则限流,否则请求正常通行。

情况二: 如果剩余令牌数小于警戒值,说明系统处于稳定阶段。此时就直接判断当前请求的QPS与阈值大小,超过阈值则限流。

三.WarmUpController.syncToken()方法

该方法会生成令牌并同步到令牌桶内。其中入参passQps是前一个时间窗口的QPS,即上一秒通过的QPS数。首先验证当前时间与最后更新时间,避免在同一时间窗口重复添加令牌。其次通过WarmUpController的coolDownTokens()方法获取最新的令牌数,接着利用CAS来保证更新令牌桶的线程安全性,最后通过减去上一秒通过的QPS数得到目前令牌桶剩余的令牌数来更新。

四.WarmUpController.coolDownTokens()方法

该方法会根据当前时间和上一个时间窗口通过的QPS计算更新后的令牌数。具体来说就是,首先获取当前令牌桶已存储的令牌数,然后判断桶内令牌数和告警值的大小。

情况一:如果令牌桶中已存储的令牌数小于告警值

说明系统已结束冷启动,即退出预热阶段进入了稳定阶段。也就是桶内已存储的令牌数没有达到进入预热阶段的阈值,此时需要较快地向令牌桶中添加令牌。

情况二:如果令牌桶中已存储的令牌数大于告警值

说明系统处于预热阶段,还在进行冷启动。此时如果上一个时间窗口通过的QPS,小于系统最冷时允许通过的QPS。那么就说明当前系统的负载比较低,可以向令牌桶中添加令牌。系统最冷时允许通过的QPS = (1 / (1 / count * coldFactor))。

其中,向令牌桶中添加令牌的处理,就是在当前令牌数量的基础上,加上从上次添加令牌到现在经过的时间乘以QPS阈值。

注意:Guava中的预热是通过控制令牌的生成时间来实现的,Sentinel中的预热是通过控制每秒通过的请求数来实现的。

Guava的实现侧重于调整请求间隔,这类似于漏桶算法。而Sentinel更注重控制每秒传入请求的数量,而不计算其间隔,这类似于令牌桶算法。

java
代码解读
复制代码
//The principle idea comes from Guava. //However, the calculation of Guava is rate-based, which means that we need to translate rate to QPS. //这个原理来自于Guava; //然而,Guava的计算是基于速率的,这意味着我们需要将速率转换为QPS; //Requests arriving at the pulse may drag down long idle systems even though it has a much larger handling capability in stable period. //It usually happens in scenarios that require extra time for initialization, //e.g. DB establishes a connection, connects to a remote service, and so on. //That's why we need "warm up". //突发式的流量可能会拖累一个长期空闲的系统,即使这个系统在稳定阶段具有更大的流量处理能力; //这通常发生在需要额外时间进行初始化的场景中,比如DB建立连接、连接到远程服务等; //这就是为什么我们需要对系统进行"预热"; //Sentinel's "warm-up" implementation is based on the Guava's algorithm. //However, Guava’s implementation focuses on adjusting the request interval, which is similar to leaky bucket. //Sentinel pays more attention to controlling the count of incoming requests per second without calculating its interval, //which resembles token bucket algorithm. //Sentinel的"预热"实现是基于Guava的算法的; //然而,Guava的实现侧重于调整请求间隔,这类似于漏桶; //而Sentinel更注重控制每秒传入请求的数量,而不计算其间隔,这类似于令牌桶算法; //The remaining tokens in the bucket is used to measure the system utility. //Suppose a system can handle b requests per second. //Every second b tokens will be added into the bucket until the bucket is full. //And when system processes a request, it takes a token from the bucket. //The more tokens left in the bucket, the lower the utilization of the system; //when the token in the token bucket is above a certain threshold, //we call it in a "saturation" state. //桶中存储的令牌是用来测量系统的实用程序的; //假设一个系统每秒可以处理b个请求; //那么每秒就有b个令牌被添加到桶中,直到桶满为止; //当系统处理一个请求时,就会从桶中获取一个令牌; //桶中存储的令牌剩余得越多,那么就说明系统的利用率就越低; //当令牌桶中的令牌数高于某个阈值时,我们称之为"饱和"状态; //Base on Guava’s theory, there is a linear equation we can write this in the form //y = m * x + b where y (a.k.a y(x)), or qps(q)), //is our expected QPS given a saturated period (e.g. 3 minutes in), //m is the rate of change from our cold (minimum) rate to our stable (maximum) rate, //x (or q) is the occupied token. //根据Guava的理论,有一个线性方程,我们可以把它写成y = m * x + b; //这是在给定饱和周期(例如3分钟)的情况下预期的QPS; //m是从我们的冷(最小)速率到我们的稳定(最大)速率的变化率; //x(或q)就是需要被占用的令牌数; public class WarmUpController implements TrafficShapingController { ... @Override public boolean canPass(Node node, int acquireCount) { return canPass(node, acquireCount, false); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { //获取当前1s的QPS long passQps = (long) node.passQps(); //获取上一窗口通过的QPS long previousQps = (long) node.previousPassQps(); //1.生成令牌并同步到令牌桶内 syncToken(previousQps); //获取令牌桶内剩余的令牌数 long restToken = storedTokens.get(); //2.如果令牌桶中的令牌数量大于告警值,说明还处于预热阶段,此时需要判断令牌的生成速度和消费速度 if (restToken >= warningToken) { //获取桶内剩余令牌数超过告警值的令牌个数 long aboveToken = restToken - warningToken; //当前令牌的生成间隔 = 稳定阶段的生成间隔 + 桶内超出告警值部分的已存储令牌数 * slope //其中,稳定阶段的生成间隔是1/count,桶内超出告警值部分的已存储令牌数是aboveToken //注意:预热阶段生成令牌的速率会越来越慢,也就是生成令牌的间隔越来越大; //当桶内已存储的令牌超过告警值后,令牌越多,那1秒可允许的QPS越小; //下面代码计算的是: //当前1s内的时间窗口能够生成的令牌数量,即当前时间窗口生成的令牌可满足的QPS = 1 / 当前令牌的生成间隔 double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); //如果当前消费令牌的速度(passQps + acquireCount) <= 当前生成令牌的速度(warningQps),则允许通过 //如果当前时间窗口通过的QPS + 客户端申请的令牌数 小于等于 当前预热阶段的告警QPS,则代表允许通过 if (passQps + acquireCount <= warningQps) { return true; } } //3.如果令牌桶中的令牌数量小于告警值,说明预热结束,进入稳定阶段 else { //如果当前消费令牌的速度(passQps + acquireCount) <= 当前生成令牌的速度(count),则允许通过 if (passQps + acquireCount <= count) { return true; } } return false; } //生成令牌并同步到令牌桶内 //入参passQps是前一个时间窗口的QPS,也就是上一秒通过的QPS数 //syncToken()方法的逻辑是: //1.首先验证当前时间与最后更新令牌桶的时间,避免在同一个时间窗口重复添加令牌; //2.其次通过WarmUpController.coolDownTokens()方法获取最新的令牌数; //3.接着利用CAS来保证更新令牌桶的线程安全性; //4.最后将桶内已存储的令牌数,减去上一秒通过的QPS数,得到目前令牌桶剩余的令牌数; protected void syncToken(long passQps) { //获取当前时间ms long currentTime = TimeUtil.currentTimeMillis(); //将当前时间ms转换为s currentTime = currentTime - currentTime % 1000; //获取上一次更新令牌桶已存储的令牌数量的时间 long oldLastFillTime = lastFilledTime.get(); //如果上一次更新令牌桶已存储的令牌数量的时间和当前时间一样,或发生了时钟回拨等情况导致比当前时间还小 //那么就无需更新,直接return即可 if (currentTime <= oldLastFillTime) { return; } //先获取目前令牌桶已存储的令牌数 long oldValue = storedTokens.get(); //调用WarmUpController.coolDownTokens()方法得到最新的令牌数 long newValue = coolDownTokens(currentTime, passQps); //通过CAS更新令牌桶已存储的令牌数 //注意:系统初始化完毕,第一个请求进来调用WarmUpController.canPass()方法时,storedTokens = maxToken if (storedTokens.compareAndSet(oldValue, newValue)) { //设置令牌桶内已存储的最新令牌数 = 当前令牌数 - 上一个时间窗口通过的请求数 long currentValue = storedTokens.addAndGet(0 - passQps); if (currentValue < 0) { storedTokens.set(0L); } //更新最后一次添加令牌的时间戳 lastFilledTime.set(currentTime); } } //根据当前时间和上一个时间窗口通过的QPS计算更新后的令牌数 private long coolDownTokens(long currentTime, long passQps) { //获取当前令牌桶已存储的令牌数 long oldValue = storedTokens.get(); long newValue = oldValue; //如果令牌桶中已存储的令牌数小于告警值,说明系统已结束冷启动,即退出预热阶段进入稳定阶段 //也就是桶内已存储的令牌数没有达到进入预热阶段的阈值,此时需要较快地向令牌桶中添加令牌 if (oldValue < warningToken) { //在当前令牌数量的基础上,加上从上次添加令牌到现在经过的时间(以秒为单位)乘以令牌生成速率(QPS阈值count) newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } //如果令牌桶中已存储的令牌数大于告警值,说明系统处于预热阶段,还在进行冷启动 else if (oldValue > warningToken) { //如果上一个时间窗口通过的QPS,小于系统最冷时允许通过的QPS(1 / (1 / count * coldFactor)) //那么就说明当前系统的负载比较低,可以向令牌桶中添加令牌 if (passQps < (int)count / coldFactor) { //在当前令牌数量的基础上,加上从上次添加令牌到现在经过的时间(以秒为单位)乘以令牌生成速率(QPS阈值count) newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } } //确保令牌桶更新后的令牌数不超过最大令牌数(maxToken) //系统初始化完毕,第一个请求进来调用WarmUpController.canPass()方法时, //oldValue = 0,lastFilledTime = 0,此时返回maxToken return Math.min(newValue, maxToken); } }

(5)Sentinel中的令牌桶算法总结

WarmUpController的核心原理是:首先根据当前时间和上一个时间窗口通过的QPS同步令牌桶内的令牌数。然后比较桶内令牌数和告警值,计算当前时间窗口允许通过的告警QPS。最后比较当前请求下的QPS是否大于允许通过的告警QPS来决定限流。

注意:系统在预热阶段会逐渐提高令牌的生成速度,从而平滑过渡到稳定阶段。当系统启动时,桶内令牌数最大,令牌生成速率最低,允许的QPS最低。随着桶内令牌数减少,令牌生成速度逐渐提高,允许的QPS也逐渐提高。最后到达稳定阶段,此时允许的QPS便是FlowRule中设置的QPS阈值。

所以根据稳定阶段令牌的生成速率是1/count,默认冷却因子为3,得出系统最冷时令牌的生成速率是3/count。因此预热阶段一开始允许的QPS为count/3,预热完毕的QPS就是count。

注:本文转载自juejin.cn的东阳马生架构的文章"https://juejin.cn/post/7496375493329223743"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

103
后端
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2024 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top