在分布式系統中,應對高并發訪問時,快取、限流、降級是保護系統正常運行的常用方法,當請求量突發暴漲時,如果不加以限制訪問,則可能導致整個系統崩潰,服務不可用,同時有一些業務場景,比如短信驗證碼,或者其它第三方API呼叫,也需要提供必要的訪問限制支持,還有一些資源消耗過大的請求,比如資料匯出等(參考 記一次線上Java服務CPU 100%處理程序 ),也有限制訪問頻率的需求,
常見的限流演算法有令牌桶演算法,漏桶演算法,與計數器演算法,本文主要對三個演算法的基本原理及Google Guava包中令牌桶演算法的實作RateLimiter進行介紹,下一篇文章介紹最近寫的一個以RateLimiter為參考的分布式限流實作及計數器限流實作,
令牌桶演算法
令牌桶演算法的原理就是以一個恒定的速度往桶里放入令牌,每一個請求的處理都需要從桶里先獲取一個令牌,當桶里沒有令牌時,則請求不會被處理,要么排隊等待,要么降級處理,要么直接拒絕服務,當桶里令牌滿時,新添加的令牌會被丟棄或拒絕,
令牌桶演算法的處理示意圖如下(圖片來自網路)

令牌桶演算法主要是可以控制請求的平均處理速率,它允許預消費,即可以提前消費令牌,以應對突發請求,但是后面的請求需要為預消費買單(等待更長的時間),以滿足請求處理的平均速率是一定的,
漏桶演算法
漏桶演算法的原理是水(請求)先進入漏桶中,漏桶以一定的速度出水(處理請求),當水流入速度大于流出速度導致水在桶內逐漸堆積直到桶滿時,水會溢位(請求被拒絕),
漏桶演算法的處理示意圖如下(圖片來自網路)

漏桶演算法主要是控制請求的處理速率,平滑網路上的突發流量,請求可以以任意速度進入漏桶中,但請求的處理則以恒定的速度進行,
計數器演算法
計數器演算法是限流演算法中最簡單的一種演算法,限制在一個時間視窗內,至多處理多少個請求,比如每分鐘最多處理10個請求,則從第一個請求進來的時間為起點,60s的時間視窗內只允許最多處理10個請求,下一個時間視窗又以前一時間視窗過后第一個請求進來的時間為起點,常見的比如一分鐘內只能獲取一次短信驗證碼的功能可以通過計數器演算法來實作,
Guava RateLimiter決議
Guava是Google開源的一個工具包,其中的RateLimiter是實作了令牌桶演算法的一個限流工具類,在pom.xml中添加guava依賴,即可使用RateLimiter
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
如下測驗代碼示例了RateLimiter的用法,
public static void main(String[] args) {
RateLimiter rateLimiter = RateLimiter.create(1); //創建一個每秒產生一個令牌的令牌桶
for(int i=1;i<=5;i++) {
double waitTime = rateLimiter.acquire(i); //一次獲取i個令牌
System.out.println("acquire:" + i + " waitTime:" + waitTime);
}
}
運行后,輸出如下,
acquire:1 waitTime:0.0
acquire:2 waitTime:0.997729
acquire:3 waitTime:1.998076
acquire:4 waitTime:3.000303
acquire:5 waitTime:4.000223
第一次獲取一個令牌時,等待0s立即可獲取到(這里之所以不需要等待是因為令牌桶的預消費特性),第二次獲取兩個令牌,等待時間1s,這個1s就是前面獲取一個令牌時因為預消費沒有等待延到這次來等待的時間,這次獲取兩個又是預消費,所以下一次獲取(取3個時)就要等待這次預消費需要的2s了,依此類推,可見預消費不需要等待的時間都由下一次來買單,以保障一定的平均處理速率(上例為1s一次),
RateLimiter有兩種實作:
- SmoothBursty: 令牌的生成速度恒定,使用
RateLimiter.create(double permitsPerSecond)創建的是 SmoothBursty 實體, - SmoothWarmingUp:令牌的生成速度持續提升,直到達到一個穩定的值,WarmingUp,顧名思義就是有一個熱身的程序,使用
RateLimiter.create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)時創建就是 SmoothWarmingUp 實體,其中 warmupPeriod 就是熱身達到穩定速度的時間,
類結構如下

關鍵屬性及方法決議(以 SmoothBursty 為例)
1.關鍵屬性
/** 桶中當前擁有的令牌數. */
double storedPermits;
/** 桶中最多可以保存多少秒存入的令牌數 */
double maxBurstSeconds;
/** 桶中能存盤的最大令牌數,等于storedPermits*maxBurstSeconds. */
double maxPermits;
/** 放入令牌的時間間隔*/
double stableIntervalMicros;
/** 下次可獲取令牌的時間點,可以是過去也可以是將來的時間點*/
private long nextFreeTicketMicros = 0L;
2.關鍵方法
呼叫 RateLimiter.create(double permitsPerSecond) 方法時,創建的是 SmoothBursty 實體,默認設定 maxBurstSeconds 為1s,SleepingStopwatch 是guava中的一個時鐘類實作,
@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds;
}
并通過呼叫 SmoothBursty.doSetRate(double, long) 方法進行初始化,該方法中:
- 呼叫
resync(nowMicros)對 storedPermits 與 nextFreeTicketMicros 進行了調整——如果當前時間晚于 nextFreeTicketMicros,則計算這段時間內產生的令牌數,累加到 storedPermits 上,并更新下次可獲取令牌時間 nextFreeTicketMicros 為當前時間, - 計算 stableIntervalMicros 的值,1/permitsPerSecond,
- 呼叫
doSetRate(double, double)方法計算 maxPermits 值(maxBurstSeconds*permitsPerSecond),并根據舊的 maxPermits 值對 storedPermits 進行調整,
原始碼如下所示
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
/** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
呼叫 acquire(int) 方法獲取指定數量的令牌時,
- 呼叫
reserve(int)方法,該方法最終呼叫reserveEarliestAvailable(int, long)來更新下次可取令牌時間點與當前存盤的令牌數,并回傳本次可取令牌的時間點,根據該時間點計算需要等待的時間 - 阻塞等待1中回傳的等待時間
- 回傳等待的時間(秒)
原始碼如下所示
/** 獲取指定數量(permits)的令牌,阻塞直到獲取到令牌,回傳等待的時間*/
@CanIgnoreReturnValue
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
/** 回傳需要等待的時間*/
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
/** 針對此次需要獲取的令牌數更新下次可取令牌時間點與存盤的令牌數,回傳本次可取令牌的時間點*/
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros); // 更新當前資料
long returnValue = https://www.cnblogs.com/spec-dog/p/nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 本次可消費的令牌數
double freshPermits = requiredPermits - storedPermitsToSpend; // 需要新增的令牌數
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros); // 需要等待的時間
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 更新下次可取令牌的時間點
this.storedPermits -= storedPermitsToSpend; // 更新當前存盤的令牌數
return returnValue;
}
acquire(int) 方法是獲取不到令牌時一直阻塞,直到獲取到令牌,tryAcquire(int,long,TimeUnit) 方法則是在指定超時時間內嘗試獲取令牌,如果獲取到或超時時間到則回傳是否獲取成功
- 先判斷是否能在指定超時時間內獲取到令牌,通過
nextFreeTicketMicros <= timeoutMicros + nowMicros是否為true來判斷,即可取令牌時間早于當前時間加超時時間則可取(預消費的特性),否則不可獲取, - 如果不可獲取,立即回傳false,
- 如果可獲取,則呼叫
reserveAndGetWaitLength(permits, nowMicros)來更新下次可取令牌時間點與當前存盤的令牌數,回傳等待時間(邏輯與前面相同),并阻塞等待相應的時間,回傳true,
原始碼如下所示
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
long timeoutMicros = max(unit.toMicros(timeout), 0);
checkPermits(permits);
long microsToWait;
synchronized (mutex()) {
long nowMicros = stopwatch.readMicros();
if (!canAcquire(nowMicros, timeoutMicros)) { //判斷是否能在超時時間內獲取指定數量的令牌
return false;
} else {
microsToWait = reserveAndGetWaitLength(permits, nowMicros);
}
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}
private boolean canAcquire(long nowMicros, long timeoutMicros) {
return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; //只要可取時間小于當前時間+超時時間,則可獲取(可預消費的特性!)
}
@Override
final long queryEarliestAvailable(long nowMicros) {
return nextFreeTicketMicros;
}
以上就是 SmoothBursty 實作的基本處理流程,注意兩點:
- RateLimiter 通過限制后面請求的等待時間,來支持一定程度的突發請求——預消費的特性,
- RateLimiter 令牌桶的實作并不是起一個執行緒不斷往桶里放令牌,而是以一種延遲計算的方式(參考
resync函式),在每次獲取令牌之前計算該段時間內可以產生多少令牌,將產生的令牌加入令牌桶中并更新資料來實作,比起一個執行緒來不斷往桶里放令牌高效得多,(想想如果需要針對每個用戶限制某個介面的訪問,則針對每個用戶都得創建一個RateLimiter,并起一個執行緒來控制令牌存放的話,如果在線用戶數有幾十上百萬,起執行緒來控制是一件多么恐怖的事情)
總結
本文介紹了限流的三種基本演算法,其中令牌桶演算法與漏桶演算法主要用來限制請求處理的速度,可將其歸為限速,計數器演算法則是用來限制一個時間視窗內請求處理的數量,可將其歸為限量(對速度不限制),Guava 的 RateLimiter 是令牌桶演算法的一種實作,但 RateLimiter 只適用于單機應用,在分布式環境下就不適用了,雖然已有一些開源專案可用于分布式環境下的限流管理,如阿里的Sentinel,但對于小型專案來說,引入Sentinel可能顯得有點過重,但限流的需求在小型專案中也是存在的,下一篇文章就介紹下基于 RateLimiter 的分布式下的限流實作,
[轉載請注明出處]
作者:雨歌
歡迎關注作者公眾號:半路雨歌,查看更多技術干貨文章

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/138073.html
標籤:Java
下一篇:Python基礎-20裝飾器
