IOTXING

记录技术学习之路

0%

guava源码-RateLimiter

使用方法

  • 创建RateLimiter

    1
    RateLimiter ra = RateLimiter.create(2000);  //创建一个2000/s的限速
  • 使用的时候,去acquire

    1
    2
    3
    4
    while (true){
    ra.acquire();
    xxx
    }

工作流程

采用令牌桶算法。在create的时候,根据传入的每秒令牌数,计算出每个令牌发放锁需要的时间,然后设定初始的令牌数。

在使用acquire获取令牌的时候,会先校验申请的令牌数是否合法,然后判断剩余令牌数是否足够,如果不够的话,会根据差值和每个令牌所需要的时间,计算出来需要等待的时间,并且会以不中断的方式进行sleep等待,直到到达了预定了时间,拿到令牌返回。每个令牌不需要被返回,在消耗的时候,是直接对当前可用令牌数进行扣减。

create

1
2
3
4
5
6
7
8
9
10
public static RateLimiter create(double permitsPerSecond) {
return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
}

@VisibleForTesting
static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

acquire

1
2
3
4
5
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait); //如果需要等待,则以不中断的方式进行等待
return 1.0 * microsToWait / SECONDS.toMicros(1L); //返回需要等待的时间
}

reserve

1
2
3
4
5
6
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
1
2
3
4
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0); //返回需要等待的时间
}

SleepingStopwatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
abstract static class SleepingStopwatch {
/** Constructor for use by subclasses. */
protected SleepingStopwatch() {}

/*
* We always hold the mutex when calling this. TODO(cpovirk): Is that important? Perhaps we need
* to guarantee that each call to reserveEarliestAvailable, etc. sees a value >= the previous?
* Also, is it OK that we don't hold the mutex when sleeping?
*/
protected abstract long readMicros();

protected abstract void sleepMicrosUninterruptibly(long micros);

public static final SleepingStopwatch createFromSystemTimer() {
return new SleepingStopwatch() {
final Stopwatch stopwatch = Stopwatch.createStarted();

@Override
protected long readMicros() {
return stopwatch.elapsed(MICROSECONDS);
}

@Override
protected void sleepMicrosUninterruptibly(long micros) {
if (micros > 0) {
Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS);
}
}
};
}

stopwatch是一个能够计算时间消耗的类

SmoothRateLimiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static final class SmoothBursty extends SmoothRateLimiter {
/** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
final double maxBurstSeconds;

SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds;
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond; //根据时间和传入的许可数,计算出最大的许可书
if (oldMaxPermits == Double.POSITIVE_INFINITY) { //说明之前没有设置,直接修改为最大许可数
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits; //最大许可数变了,重新计算可用许可数,为之前可用的 * 变化的比例
}
}

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
return 0L;
}

@Override
double coolDownIntervalMicros() {
return stableIntervalMicros; //表示不同令牌间隔的时间,放回的是需要冷却的时间,也就是等待令牌放回的时间
}
}

SmoothRateLimiter.reserveEarliestAvailable

1
2
3
4
5
6
7
8
9
10
11
12
13
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros); //更新可用令牌数以及下一次刷新时间
long returnValue = nextFreeTicketMicros; //下一次刷新时间
double storedPermitsToSpend = min(requiredPermits, this.storedPermits); //可以提供出来的令牌数
double freshPermits = requiredPermits - storedPermitsToSpend; //剩余未提供的令牌数
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros); //计算需要等待的时间

this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); //计算下一次更新令牌的时间
this.storedPermits -= storedPermitsToSpend; //减去消耗的令牌数
return returnValue;
}

SmoothRateLimiter.resync

1
2
3
4
5
6
7
8
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); //计算新的令牌书
storedPermits = min(maxPermits, storedPermits + newPermits); //选出来新的令牌数和最大令牌数中小的哪个
nextFreeTicketMicros = nowMicros;
}
}

sleepUninterruptibly(以不会中断的方式sleep)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
boolean interrupted = false;
try {
long remainingNanos = unit.toNanos(sleepFor);
long end = System.nanoTime() + remainingNanos;
while (true) {
try {
// TimeUnit.sleep() treats negative timeouts just like zero.
NANOSECONDS.sleep(remainingNanos);
return;
} catch (InterruptedException e) {
interrupted = true;
remainingNanos = end - System.nanoTime();
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}