使用方法
创建RateLimiter
1 RateLimiter ra = RateLimiter.create(2000 );
使用的时候,去acquire
// Call acquire() on the limiter before each unit of work, so every iteration
// first goes through the limiter.
while (true) {
  ra.acquire();
  // ... rate-limited work goes here ...
}
工作流程 采用令牌桶算法。在create的时候,根据传入的每秒令牌数,计算出每个令牌发放所需要的时间,然后设定初始的令牌数。
在使用acquire获取令牌的时候,会先校验申请的令牌数是否合法,然后判断剩余令牌数是否足够,如果不够的话,会根据差值和每个令牌所需要的时间,计算出需要补足的时间,并把它累加到下一次可发放令牌的时间点(nextFreeTicketMicros)上;本次调用只需要等待到此前已经预定的时间点,等待时会以不可中断的方式进行sleep,直到到达预定的时间,拿到令牌返回。令牌不需要被归还,在消耗的时候,是直接对当前可用令牌数进行扣减。
create 1 2 3 4 5 6 7 8 9 10 public static RateLimiter create (double permitsPerSecond) { return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond); } @VisibleForTesting static RateLimiter create (SleepingStopwatch stopwatch, double permitsPerSecond) { RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 ); rateLimiter.setRate(permitsPerSecond); return rateLimiter; }
acquire 1 2 3 4 5 public double acquire (int permits) { long microsToWait = reserve(permits); stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L ); }
reserve 1 2 3 4 5 6 final long reserve (int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } }
1 2 3 4 final long reserveAndGetWaitLength (int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0 ); }
SleepingStopwatch 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 abstract static class SleepingStopwatch { protected SleepingStopwatch () {} protected abstract long readMicros () ; protected abstract void sleepMicrosUninterruptibly (long micros) ; public static final SleepingStopwatch createFromSystemTimer () { return new SleepingStopwatch() { final Stopwatch stopwatch = Stopwatch.createStarted(); @Override protected long readMicros () { return stopwatch.elapsed(MICROSECONDS); } @Override protected void sleepMicrosUninterruptibly (long micros) { if (micros > 0 ) { Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS); } } }; }
stopwatch是一个能够计算时间消耗的类
SmoothRateLimiter 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 static final class SmoothBursty extends SmoothRateLimiter { final double maxBurstSeconds; SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { super (stopwatch); this .maxBurstSeconds = maxBurstSeconds; } @Override void doSetRate (double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this .maxPermits; maxPermits = maxBurstSeconds * permitsPerSecond; if (oldMaxPermits == Double.POSITIVE_INFINITY) { storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0 ) ? 0.0 : storedPermits * maxPermits / oldMaxPermits; } } @Override long storedPermitsToWaitTime (double storedPermits, double permitsToTake) { return 0L ; } @Override double coolDownIntervalMicros () { return stableIntervalMicros; } }
SmoothRateLimiter.reserveEarliestAvailable 1 2 3 4 5 6 7 8 9 10 11 12 13 final long reserveEarliestAvailable (int requiredPermits, long nowMicros) { resync(nowMicros); long returnValue = nextFreeTicketMicros; double storedPermitsToSpend = min(requiredPermits, this .storedPermits); double freshPermits = requiredPermits - storedPermitsToSpend; long waitMicros = storedPermitsToWaitTime(this .storedPermits, storedPermitsToSpend) + (long ) (freshPermits * stableIntervalMicros); this .nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); this .storedPermits -= storedPermitsToSpend; return returnValue; }
SmoothRateLimiter.resync 1 2 3 4 5 6 7 8 void resync (long nowMicros) { if (nowMicros > nextFreeTicketMicros) { double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } }
sleepUninterruptibly(以不会中断的方式sleep) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void sleepUninterruptibly (long sleepFor, TimeUnit unit) { boolean interrupted = false ; try { long remainingNanos = unit.toNanos(sleepFor); long end = System.nanoTime() + remainingNanos; while (true ) { try { NANOSECONDS.sleep(remainingNanos); return ; } catch (InterruptedException e) { interrupted = true ; remainingNanos = end - System.nanoTime(); } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } }