1 前言
前幾天我們又遇到了一個Netty報從連接池獲取連接超時例外從而導致整個服務不可用的例外,報的具體例外資訊是Exception accurred when acquire channel channel pool:TimeoutException,當時自己看了這個例外資訊,有種似曾相識的感覺,印象中自己第一次接觸到該例外是不久前也遇到了Netty報超時錯誤導致整個服務不可用的問題,最終只能重啟服務器來解決,于是自己去翻看了之前的例外訊息,發現報的錯誤果真同樣是從連接池獲取連接超時的例外!印象中前段時間Netty報這個錯誤時是剛好相關網路部門做過網路調整,當時我們就認為可能是由于網路原因導致Netty獲取連接超時,但是至于為啥會因為網路原因導致獲取Netty連接超時后從而導致服務不可用就還是一無所知,因此,這個“幽靈”Bug暫時對我們來說成了一團謎,
2 “幽靈”Bug得以復現給了我們解決這個Bug的希望
萬幸的是,這次相關同事復現了這個Bug,然后對方說只要在并發量大一點且后臺業務邏輯處理時間久的話這個Bug就會復現,且這個Bug是伴隨前臺執行緒請求后臺超時(這個是請求超時例外,而非獲取連接超時例外,注意區分)后報出來的,于是自己提高并發量且在后臺模擬業務超時進行測驗,果真“幽靈”Bug得以復現了,且這個Bug導致后面整個服務都不可用了,報錯如下截圖:

這個“幽靈”Bug的復現給我們帶來了解決它的希望,那么是什么原因導致在并發量一上來且前臺請求后臺超時后就會導致從Netty連接池獲取連接超時了呢?
注意: 這里有兩個超時例外,請注意區分:一個是從連接池獲取連接超時例外;令一個是從連接池成功獲取連接后,前臺請求后臺,由于后臺業務邏輯執行時間過長導致拋出的請求超時例外
我們無從而知,只能去翻看拋例外的代碼,我們撰寫的Netty連接池實作大概如下:
// CustomChannelPool.java
public class CustomChannelPool {
private ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE;
acquireTimeoutAction = null;
acquireTimeoutMillis = -1;
maxConnect = 8;
maxPendingAcquires = Integer.MAX_VALUE
releaseHealthCheck=true
//...省略無關屬性
static ChannelPool fixpool =
new FixedChannelPool(b, handler, healthCheck, acquireTimeoutAction,
acquireTimeoutMillis, maxConnect, maxPendingAcquires, releaseHealthCheck, lastRecentUsed); // 【0】
// 獲取連接
public Channel acquire(int timeoutMillis) throws Exception {
try {
Future<Channel> fch = fixpool.acquire(); // 【1】
Channel ch = fch.get(timeoutMillis, TimeUnit.MILLISECONDS);// 【2】
return ch;
} catch (Exception e) {
logger.error("Exception accurred when acquire channel from channel pool.", e);//【3】
throw e; //【4】
}
}
// 釋放連接
public void release(Channel channel) {
try {
if (channel != null) {
fixpool.release(channel);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
然后業務獲取連接的代碼大概如下:
// BusineseService.java
public class BusineseService {
public Response rpcCall() throw Exception{
// 獲取連接
Channel channel = CustomChannelPool.fixpool.acquire(10000); // 【5】
try {
// ...省略相關業務邏輯
// 最終進行底層遠程呼叫
channel.writeAndFlush(data);
// ...省略相關業務邏輯
} finally {
// 釋放連接
// 若前臺請求后臺超時后,是有釋放連接的
CustomChannelPool.fixpool.release(channel); // 【6】
}
}
}
根據報的例外資訊可用判斷是在進行遠程呼叫前呼叫CustomChannelPool.acquire方法的Channel ch = fch.get(timeoutMillis, TimeUnit.MILLISECONDS);這句代碼從Netty連接池獲取連接超時(即10秒后),然后拋出TimeoutException,最后再在CustomChannelPool.acquire方法的catch代碼塊列印出Exception accurred when acquire channel from channel pool:TimeoutException例外資訊,然后再把該例外往外拋出去,即最后會在BusineseService的標號【5】處的代碼Channel channel = CustomChannelPool.fixpool.acquire(10000);拋出了一個TimeoutException例外,又因為標號【5】處的代碼沒有包含在try塊內,因此不會執行標號【6】處的finally塊釋放連接的邏輯,
分析到這里,我們松了一口氣,原來導致該“幽靈”Bug的原因就是因為獲取連接的這決代碼Channel channel = CustomChannelPool.fixpool.acquire(10000);沒有被try塊包圍住,才導致沒有執行finlly塊的釋放連接邏輯!!!
Please calm down here!
即使我們將獲取連接的這決代碼Channel channel = CustomChannelPool.fixpool.acquire(10000);用try塊包圍住,最終在執行finally塊釋放連接的邏輯時等待我們的將會是什么呢?顯然,等待我們的是一個空指標例外!為啥呢?因為執行Channel channel = CustomChannelPool.fixpool.acquire(10000);這句代碼拋出TimeoutException例外后,拿到的channel將為null,然后我們再用結果為null的channel去釋放連接,自然會拋出一個NPE.
之前燃起的一線希望又被NPE撲滅了,出現幽靈Bug的原因依然沒找到!此時我們又失去了方向!
既然選擇了遠方,便只顧風雨兼程,嘿嘿,這里我們自我勵志下,別灰心,努力了總能解決它,不就是一個小小的bug么,
于是我們又冷靜分析了下出問題的兩句問題代碼上來:
Future<Channel> fch = fixpool.acquire(); // 【1】
Channel ch = fch.get(timeoutMillis, TimeUnit.MILLISECONDS);// 【2】
【1】處代碼呼叫fixpool.acquire()方法去獲取一個連接然后馬上回傳一個Future<Channel>物件fch,緊接著我們再呼叫【2】處代碼fch.get(timeoutMillis, TimeUnit.MILLISECONDS);方法來等待連接池的可用連接回傳,一直阻塞直至超時,超時后就拋出了TimeoutException例外,
從這里初步分析可以看到Netty獲取連接是異步進行的,當獲取到一個連接后再喚醒呼叫fch.get(timeoutMillis, TimeUnit.MILLISECONDS);代碼后正在阻塞等待的執行緒,
我們再回想下,復現該bug的前提條件是拋出該例外的前提是并發量大且會伴隨著大量前臺請求后臺的執行緒請求超時后出現,這里請求后臺超時的執行緒是已經成功從連接池獲得連接的執行緒,且超時拋出請求超時例外后也有執行finally塊的釋放(歸還)連接回連接池的操作的!
那么是什么原因會導致拋出從連接池獲取連接超時例外呢?于是我們不禁有以下猜測:
猜測1: 瞬間高并發的請求導致連接池資源耗盡,從而導致大量獲取連接超時,這種情況是可能出現的,但是高并發過后,整個服務就不可用了(這里的服務不可用不是指應用宕掉,而是總是報獲取連接超時)!按理說高并發過后應該歸還連接到連接池了,因此肯定不會出現服務不可用的情況,因此這個猜測可以排除了,唯一的原因就是連接沒能正常歸還到連接池!!!
至于為啥連接沒能正常歸還到連接池,我們又有以下猜測:
猜測2: 請求后臺超時的channel連接不能正常歸還到連接池,channel連接請求后臺超時后,這個連接不能正常放回連接池,導致channel連接池可用連接耗盡,最終導致其他執行緒從連接池獲取連接超時?如果是這樣,那么為啥請求后臺超時的連接不能正常放回連接池呢?
猜測3: 請求后臺超時channel連接能正常歸還到連接池,此時又因為從連接池獲取channel連接是異步的,當獲取連接超時后,我們關心的是獲取連接的異步執行緒最終有無從連接池成功獲取到一個連接呢?這里有兩種可能:1)獲取連接超時后不能從連接池獲取到一個連接,即使前面實作的代碼中獲取連接超時的話沒有釋放連接也不影響,因為這種情況根本就沒有獲取到連接;1)獲取連接超時后仍能成功獲取到一個連接,但從前面實作代碼的分析程序中可以知道,獲取連接超時的話,這個獲取到的連接是沒有被釋放的,如果是這種情況,那么就會導致連接池資源耗盡從而導致服務不可用了!
顯然,我們要朝著猜測2和猜測3的方向去排查問題,至于哪種原因導致連接沒能正常歸還到連接池呢?我們依然百思不得其解!因為此時Netty連接池對于我們來說是一個黑盒,此時是時候去打開這個黑盒一探究竟了!
3 Netty連接池FixedChannelPool獲取和釋放連接原始碼分析
來到這里我們就要打開Netty的channel連接池原始碼看一下了,前面導致問題的代碼無非就是連接池的acquire和release兩個方法,相信我們能從連接池的這兩個方法的原始碼中找到導致Exception accurred when acquire channel from channel pool:TimeoutException例外即獲取連接超時例外的原因,
3.1 連接池整體類結構的理解
這里用到的是Netty的FixedChannelPool連接池,同時FixedChannelPool繼承了SimpleChannelPool,而SimpleChannelPool又實作了ChannelPool介面,如下圖:

我們先來看下ChannelPool介面的原始碼:
// ChannelPool.java
public interface ChannelPool extends Closeable {
Future<Channel> acquire();
Future<Channel> acquire(Promise<Channel> promise);
Future<Void> release(Channel channel);
Future<Void> release(Channel channel, Promise<Void> promise);
void close();
}
可見ChannelPool介面實作了Netty連接池獲取連接和釋放連接的基本介面,而相應的獲取連接和釋放連接的回傳結果類時都是Future型別,可見Netty連接池獲取連接和釋放連接的操作都是異步執行的,
原始碼這里先補貼了,我們來看下SimpleChannelPool的類結構:

首先SimpleChannelPool實作了Netty的channel連接池的基本功能如獲取連接,釋放連接以及對channel連接進行健康檢查等,此外,SimpleChannelPool是如何來存盤channel連接呢?此時從上圖的序號4可以看到定義了一個雙端佇列deque來存盤channel連接,
再來看下FixedChannelPool的類結構:

可以看到FixedChannelPool在SimpleChannelPool的基礎上實作了連接池數量控制,待獲取連接超時任務處理,待獲取連接超時任務處理策略以及釋放連接后喚醒待獲取連接的任務的一些邏輯,詳細決議如下:
- 成員變數
maxPendingAcquires表示連接池的最大連接數即連接池容量,pendingAcquireCount表示已經獲取的連接數量(包括從連接池建立的連接及額外新建的連接),這兩個變數用來判斷連接池有無可用連接; - 內部類
AcquireTask,待獲取連接任務,當連接池資源耗盡時,待獲取的連接會被封裝成一個AcquireTask任務; - 定義了一個
ArrayDeque型別的雙端佇列pendingAcquireQueue,當連接池可用channel連接耗盡時,待獲取的連接會被封裝成一個AcquireTask,然后pendingAcquireQueue佇列就是用來存盤AcquireTask的; - 成員變數
maxPendingAcquires表示pendingAcquireQueue佇列的大小,pendingAcquireCount表示等待獲取channel連接的數量,這兩個變數用來控制pendingAcquireQueue佇列容量滿還是不滿; - 成員變數
acquireTimeoutNanos表示從連接池獲取channel連接的超時時間,內部列舉類AcquireTimeoutAction封裝了待獲取連接的任務超時時該執行的策略,默認有新建NEW和失敗FAIL策略; - 內部抽象類
TimeoutTask實作了Runnable介面,當待獲取連接任務超時時,此時根據AcquireTimeoutAction策略來執行該任務;
3.2 從連接池獲取連接的原始碼分析
我們首先來分析下連接池獲取連接的原始碼,直接上原始碼:
// FixedChannelPool.java
@Override
public Future<Channel> acquire(final Promise<Channel> promise) {
try {
// 如果當前執行緒是executor的執行緒,那么就直接呼叫acquire0方法獲取連接,
// 【注意】這里是異步去獲取channel連接哈,如果呼叫future.get方法,只要連接沒獲取到,那么將一直阻塞,直到連接獲取完成,
if (executor.inEventLoop()) {
acquire0(promise);
// 如果當前執行緒不是executor的執行緒,那么就由executor這個執行緒呼叫acquire0方法獲取連接,這里是異步獲取連接哈
} else {
executor.execute(new Runnable() {
@Override
public void run() {
acquire0(promise);
}
});
}
} catch (Throwable cause) {
// 出現例外,設定失敗回呼
promise.setFailure(cause);
}
// 回傳保證,這里的保證是能拿到Channel,Promise繼承了Future
re
可以看到acquire方法又呼叫了acquire0方法:
// FixedChannelPool.java
private void acquire0(final Promise<Channel> promise) {
assert executor.inEventLoop();
// 判斷FixedChannelPool連接池是否已經關閉
if (closed) {
promise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
return;
}
// 【1】如果已經獲取的連接數量acquiredChannelCount小于Channel連接池的數量,說明連接池還有可用連接,因此這里直接從池子里取連接即可
// 注意:acquiredChannelCount是從0開始計數的哈
if (acquiredChannelCount.get() < maxConnections) {
assert acquiredChannelCount.get() >= 0;
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
// EventLoop
Promise<Channel> p = executor.newPromise();
// 新建一個AcquireListener,這個AcquireListener是FixedChannelPool的一個內部類,
// TODO 用來當獲取到連接回呼其內部的operationComplete方法?
AcquireListener l = new AcquireListener(promise);
// 呼叫AcquireListener的acquired方法,在獲取到連接前先給acquiredChannelCount加1,
// TODO [思考]大膽猜測,如果后續的獲取連接若失敗,肯定有acquiredChannelCount減1的代碼,但是在哪里呢
l.acquired();
// 給保證添加AcquireListener監聽器
p.addListener(l);
// 這里還是呼叫父類SimpleChannelPool來獲取連接,這里先提下父類SimpleChannelPool沒有實作連接池數量控制的相關功能,
// SimpleChannelPool只是實作了新建連接,健康檢查等邏輯
super.acquire(p);
// 【2】獲取連接時,能執行到這里,說明已經獲取的連接數量acquiredChannelCount大于或等于Channel連接池的數量,
// 即表明連接池無可用連接了,此時就需要根據有無設定AcquireTimeoutAction策略來執行相應的操作了
} else {
// 【2.1】如果等待獲取連接數量pendingAcquireCount超過佇列的最大容量maxPendingAcquires的話,此時直接拋例外
if (pendingAcquireCount >= maxPendingAcquires) {
tooManyOutstanding(promise);
// 【2.2】若等待獲取連接數量pendingAcquireCount還沒占滿pendingAcquireQueue佇列
} else {
// 這里把等待獲取連接的保證promise封裝成AcquireTask任務
AcquireTask task = new AcquireTask(promise);
// 將之前封裝的AcquireTask任務入pendingAcquireQueue佇列,放到最后面,這里pendingAcquireQueue是一個ArrayDeque佇列
if (pendingAcquireQueue.offer(task)) {
// 入隊成功,因此pendingAcquireCount自增1
++pendingAcquireCount;
// 【重要】這里如果timeoutTask不為null,則說明要么設定了獲取連接超時的處理策略,目前的netty連接池內置的策略中,要么為NEW,要么為FAIL
if (timeoutTask != null) {
// 設定了獲取連接超時處理策略的話,那么把timeoutTask扔到定時任務里去,一旦獲取連接超時,那么就執行timeoutTask
// 若策略為NEW,那么就會新建連接然后回傳;若策略為FAIL,那么直接拋例外
// 這里調度超時任務后,然后再給task.timeoutFuture賦值,也是為了做標記的意思,因為后面一個執行緒釋放連接后
// 會繼續“喚醒”pendingAcquireQueue的一個任務,那時候這個任務肯定是未超時的,所以需要取消這個定時任務
task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
}
// 執行到這里,說明前面入pendingAcquireQueue佇列時佇列已滿,然后也直接拋例外
} else {
tooManyOutstanding(promise);
}
}
assert pendingAcquireCount > 0;
}
}

這里再著重看下當帶獲取連接超時后,這個TimeoutTask的原始碼邏輯是怎樣的:
// FixedChannelPool.TimeoutTask.java
private abstract class TimeoutTask implements Runnable {
@Override
public final void run() {
assert executor.inEventLoop();
// 獲取系統當前時間
long nanoTime = System.nanoTime();
// 進入死回圈
for (;;) {
// 從pendingAcquireQueue佇列中獲取一個待獲取連接的任務,【注意】這里是peek哈,相當于查詢,而不會移除佇列中的元素
// 【思考】天哪,這里是死回圈+查詢佇列的操作,那當一個獲取連接超時定時任務到來時,豈不會將pendingAcquireQueue佇列中的
// 所有任務(包括未timeout的任務)都查出來?可以肯定的是這里確實是這樣子,答案見后面代碼注釋分析
AcquireTask task = pendingAcquireQueue.peek();
// Compare nanoTime as descripted in the javadocs of System.nanoTime()
//
// See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
// See https://github.com/netty/netty/issues/3705 這里估計出現過bug,后面修復了,嘿嘿,以后有空再去看看這個issue
// (1)如果從pendingAcquireQueue佇列獲取的任務為空,那么則說明沒有待獲取連接的任務了,此時直接break;
// (2) 如果從pendingAcquireQueue佇列獲取的任務不為空,此時肯定不能直接進行remove操作吧,想想,此時pendingAcquireQueue佇列里
// 是不是有可能還有未超時的任務,
// 2.1)因此下面需要執行nanoTime - task.expireNanoTime是不是小于0,如果小于0直接break,等這個任務超時時再來執行這里的代碼,
// 想想如果這里將非超時的任務也一起取出來去執行也不是不可以,想想這里不這樣做的原因如下:
// a)這里的職責是專門處理獲取連接超時任務的,如果這里也執行非超時任務,那么造成功能混亂;
// b)若本來超時任務就多,此時又加上處理非超時任務的話,那么系統壓力會更大
// c)這里非超時任務應該留給連接池的可用連接去處理哈,因為這里pendingAcquireQueue里的任務本來就是因為連接池資源耗盡的情況下,
// 其余獲取連接的任務才入pendingAcquireQueue佇列的,因此當一個執行緒用完從連接池獲取的連接后,這個執行緒把連接歸還給連接池后,
// 這個執行緒首先判斷連接池還有無可用連接,若連接池還有可用連接,那么其有義務有“喚醒”pendingAcquireQueue佇列中的一個未超時的任務,
// 這個任務被喚醒后,然后再去連接池獲取連接即可
// 2.2)如果大于等于0,那么就根據是NEW還是FAIL策略來執行這個獲取連接超時任務了
if (task == null || nanoTime - task.expireNanoTime < 0) {
break;
}
// 執行到這里,說明獲取連接任務確實超時了,因此可以將這個任務直接從pendingAcquireQueue佇列移除了哈
pendingAcquireQueue.remove();
// 自然,pendingAcquireCount也會減1
--pendingAcquireCount;
// 還記得FixedChannelPool的一個構造方法最侄訓根據AcquireTimeoutAction的NEW還是FAIL策略來新建一個TimeoutTask,
// 然后當獲取連接時連接池又無可用連接情況下,此時除了獲取連接任務會入pendingAcquireQueue佇列外,另外TimeoutTask也會交給
// 一個定時任務調度執行緒執行緒去執行,還記得么?
// 那么代碼執行到這里,說明已經在這個定時任務的調度方面里面了,此時再回呼TimeoutTask的onTimeout方法哈
onTimeout(task);
}
}
// 根據帶獲取連接任務超時時,該回呼的策略方法
public abstract void onTimeout(AcquireTask task);
}
同樣,詳情請見注釋即可,我們再來看下根據帶獲取連接超時后,最后會執行回呼onTimeout方法,那么我們再來看看onTimeout方法的相關邏輯:
// FixedChannelPool.java
public FixedChannelPool(Bootstrap bootstrap,
ChannelPoolHandler handler,
ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
final long acquireTimeoutMillis,
int maxConnections, int maxPendingAcquires,
boolean releaseHealthCheck, boolean lastRecentUsed) {
super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
if (maxConnections < 1) {
throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
}
if (maxPendingAcquires < 1) {
throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
}
// 這里表示初始化時獲取連接超時action策略為null且acquireTimeoutMillis == -1
if (action == null && acquireTimeoutMillis == -1) {
timeoutTask = null;
acquireTimeoutNanos = -1;
// 做一些不合理的引數校驗
} else if (action == null && acquireTimeoutMillis != -1) {
throw new NullPointerException("action");
// 做一些不合理的引數校驗
} else if (action != null && acquireTimeoutMillis < 0) {
throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
// 執行到這里,表示action != null且acquireTimeoutMillis >= -1,即設定了獲取連接超時的從處理策略
} else {
acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
// 判斷是NEW還是FAIL策略
switch (action) {
// (1)如果是獲取連接超時FAIL策略,當獲取連接超時的話,此時如果pendingAcquireQueue佇列中還有未能拿到連接的執行緒任務,此時直接失敗拋例外,簡單粗暴!!!
case FAIL:
timeoutTask = new TimeoutTask() {
@Override
public void onTimeout(AcquireTask task) {
// Fail the promise as we timed out.
task.promise.setFailure(new TimeoutException(
"Acquire operation took longer then configured maximum time") {
@Override
public Throwable fillInStackTrace() {
return this;
}
});
}
};
break;
// (2)如果是獲取連接超時NEW策略,當獲取連接超時的話,此時如果pendingAcquireQueue佇列中還有未能拿到連接的執行緒任務,
// 此時會為這些獲取連接的執行緒任務新建連接,這里理性一點,到時如果設定pendingAcquireCount過大,在高并發情況下會導致大量連接創建
// 有著耗盡資源的風險
case NEW:
timeoutTask = new TimeoutTask() {
@Override
public void onTimeout(AcquireTask task) {
// Increment the acquire count and delegate to super to actually acquire a Channel which will
// create a new connection.
// acquiredChannelCount獲取的連接數+1且給acquired賦值true
task.acquired();
// 呼叫父類SimpleChannelPool.acquire來創建一直新連接
FixedChannelPool.super.acquire(task.promise);
}
};
break;
default:
throw new Error();
}
}
// 這個executor是用來獲取連接的,總是同一個executor異步去獲取連接
executor = bootstrap.config().group().next();
this.maxConnections = maxConnections;
this.maxPendingAcquires = maxPendingAcquires;
}
可見,原來TimeoutTask.onTimeout方法是在FixedChannelPool的構造方法中初始化的即當我們新建一個Netty連接池FixedChannelPool時TimeoutTask.onTimeout方法就會根據超時任務策略初始化好,詳情見原始碼注釋即可,
3.3 釋放(歸還)連接回連接池的原始碼分析
前面分析了Netty連接池FixedChannelPool獲取連接的程序,下面我們同樣來分析下Netty連接池FixedChannelPool釋放連接的原始碼,因為釋放連接是直接呼叫了父類SimpleChannelPool的release方法:
// SimpleChannelPool.java
@Override
public final Future<Void> release(Channel channel) {
// 這里如果連接池是FixedChannelPool的話,這里實質呼叫的是FixedChannelPool的release(final Channel channel, final Promise<Void> promise)方法,
// 因為FixedChannelPool多載了SimpleChannelPool的release(final Channel channel, final Promise<Void> promise)方法
return release(channel, channel.eventLoop().<Void>newPromise());
}
此時在SimpleChannelPool的release方法中又呼叫了子類FixedChannelPool的多載的release(channel, channel.eventLoop().<Void>newPromise());方法,我們進該方法一看究竟:
// FixedChannelPool.java
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 新建一個Promise
final Promise<Void> p = executor.newPromise();
// 然后再呼叫父類SimpleChannelPool的release(final Channel channel, final Promise<Void> promise)方法,
// 【思考】這里為啥要這么繞呀???先是呼叫父類SimpleChannelPool的release(Channel channel),然后在父類SimpleChannelPool的release方法
// 中再呼叫本方法,而明明父類就有這個release(final Channel channel, final Promise<Void> promise)方法,為何不直接呼叫呢???
//【答案】答案就是SimpleChannelPool只實作了連接池獲取連接,釋放連接和健康檢查的相關基本方法,而連接釋放回連接池后,我們是不是要喚醒
// pendingAcquireQueue佇列中的一個任務呢?是吧,因此下面就給Promise又添加了一個FutureListener監聽器,這個監聽器的作用就是當SimpleChannelPool的
// release方法把連接放回連接池后,此時回呼該監聽器的operationComplete方法來喚醒pendingAcquireQueue里的一個任務,嘿嘿,是不是有點繞,哈哈
super.release(channel, p.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
assert executor.inEventLoop();
// 以為連接池已經關閉,我們沒得選擇只能關閉channel,然后回呼setFailure反彈廣發
if (closed) {
// Since the pool is closed, we have no choice but to close the channel
channel.close();
promise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
return;
}
// (1)如果釋放連接回連接池成功
// TODO【思考】這個future是只哪個future呢?你能找到這個future是從哪里傳進來的嗎?嘿嘿嘿
if (future.isSuccess()) {
// 那么此時就要就獲取的連接數量acquiredChannelCount減1且“喚醒”pendingAcquireQueue佇列的一個待獲取連接的一個任務
// 還記得之前分析acquire原始碼時當連接池無可用連接時,此時會將這個獲取連接的一個執行緒封裝成一個AcquireTask任務放進pendingAcquireQueue佇列嗎?
decrementAndRunTaskQueue();
// 回到setSuccess方法
promise.setSuccess(null);
// (2)如果連接沒有成功釋放回連接池,且沒有還錯池子的情況下發生了例外,那么這里同樣需要獲取的連接數量acquiredChannelCount減1
// 且“喚醒”pendingAcquireQueue佇列的一個待獲取連接的一個任務
// TODO 納尼??這里還可以多個池子?為啥沒還錯池子的情況發生了歸還連接的時候發生例外就不用 decrementAndRunTaskQueue呢?二十直接呼叫setFailure方法,
// 這個setFailure方法又因此著什么邏輯呢?
} else {
Throwable cause = future.cause();
// Check if the exception was not because of we passed the Channel to the wrong pool.
if (!(cause instanceof IllegalArgumentException)) {
decrementAndRunTaskQueue();
}
// 回呼setFailure方法
promise.setFailure(future.cause());
}
}
}));
return promise;
}
從原始碼可以看到呼叫了子類FixedChannelPool的多載的release(channel, channel.eventLoop().<Void>newPromise());方法是為了添加一個FutureListener監聽器,這個監聽器的作用詳見注釋,然后又調回父類SimpleChannelPool的release(final Channel channel, final Promise<Void> promise)方法,是不是有點繞,嘿嘿,繼續看該方法原始碼:
// SimpleChannelPool.java
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
checkNotNull(channel, "channel");
checkNotNull(promise, "promise");
try {
// TODO 【思考】這里每次釋放連接都是有多個NioEventLoop執行緒,而獲取連接卻用的是同一個NioEventLoop執行緒,為啥???
EventLoop loop = channel.eventLoop();
// TODO 【思考】這里何時會被執行到?經過除錯一般都是執行的是else分支
if (loop.inEventLoop()) {
doReleaseChannel(channel, promise);
} else {
// 此時將釋放連接的操作封裝成一個Runnable任務,然后將這個任務添加進SingleThreadEventExecutor的taskQueue中
// 反正最終是異步釋放連接
loop.execute(new Runnable() {
@Override
public void run() {
doReleaseChannel(channel, promise);
}
});
}
} catch (Throwable cause) {
closeAndFail(channel, cause, promise);
}
return promise;
}
結果父類SimpleChannelPool的release方法中又繼續呼叫doReleaseChannel方法來釋放連接,由于篇幅有限,這里更具體的原始碼就不再深究了,不過可以肯定的是呼叫完doReleaseChannel方法釋放連接后,必然會回呼之前添加的FutureListener的operationComplete方法,然后繼續呼叫decrementAndRunTaskQueue方法,那么我們繼續跟下decrementAndRunTaskQueue方法原始碼:
// FixedChannelPool.java
private void decrementAndRunTaskQueue() {
// We should never have a negative value.
// 因為前面已經把連接歸還回連接池了,自然這里會將已獲取的連接數量減1
int currentCount = acquiredChannelCount.decrementAndGet();
assert currentCount >= 0;
// Run the pending acquire tasks before notify the original promise so if the user would
// try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >=
// maxPendingAcquires we may be able to run some pending tasks first and so allow to add
// more.
// 然后“喚醒”pendingAcquireQueue佇列的一個待獲取連接的一個任務去連接池拿連接,
// 因為這里喚醒的是未超時的任務,因此連接必須從連接池拿
runTaskQueue();
}
繼續跟runTaskQueue方法原始碼:
// FixedChannelPool.java
private void runTaskQueue() {
// 這里非超時任務應該留給連接池的可用連接去處理哈,因為這里pendingAcquireQueue里的任務本來就是因為連接池資源耗盡的情況下,
// 其余獲取連接的任務才入pendingAcquireQueue佇列的,因此當一個執行緒用完從連接池獲取的連接后,這個執行緒把連接歸還給連接池后,
// 這個執行緒首先判斷連接池還有無可用連接,若連接池還有可用連接,那么其有義務有“喚醒”pendingAcquireQueue佇列中的一個未超時的任務,
// 這個任務被喚醒后,然后再去連接池獲取連接即可
// 如果acquiredChannelCount小于連接池數量,說明連接池還有可用連接
// TODO 【思考】這里while (acquiredChannelCount.get() < maxConnections)判斷的初衷感覺像是一定要從連接池獲取一個連接,
// 而不是新建一個連接,否則就不用這么判斷了,那么問題來了:
// while (acquiredChannelCount.get() < maxConnections)沒有執行緒安全問題么???如果不用鎖的話可能會出現“一票多賣”問題
// 除非這里是單執行緒執行就沒有執行緒安全問題,
// 如果存在執行緒安全問題,當并發量大的話出現“一票多賣問題”,即最侄訓會導致連接池可用連接耗盡,其他沒能拿到連接的執行緒還是會新建
// 一些連接出來,這么做可是可以,但卻又違反了“未超時任務的連接只能等待執行緒池的連接,超時任務再由定時任務額外新建連接”的初衷,
// 因為執行到這里從pendingAcquireQueue佇列取出的任務的一般都是未超時的,
// 答案這里應該是單執行緒執行?待確認?除錯的時候發現基本是同一個執行緒
//
while (acquiredChannelCount.get() < maxConnections) {
// 取出第一個待獲取連接的未超時的任務,因為如果是超時的獲取連接任務的話,已經被定時任務移除掉了哈
AcquireTask task = pendingAcquireQueue.poll();
// 若佇列里沒有待獲取連接的任務,直接跳出即可
if (task == null) {
break;
}
// 如果當初有設定定時任務清理超時的帶獲取連接任務,那么此時timeoutFuture不為Null,因此需要取消這個定時任務的執行
// Cancel the timeout if one was scheduled
ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
}
// pendingAcquireCount減1
--pendingAcquireCount;
// acquiredChannelCount加1
task.acquired();
// 呼叫父類SimpleChannelPool的acquire方法:
// 1)連接池有可用連接,從連接池取出即可;
// 2)連接池沒有可用連接,此時直接NEW一個連接出來
super.acquire(task.promise);
}
// We should never have a negative value.
assert pendingAcquireCount >= 0;
assert acquiredChannelCount.get() >= 0;
}
可見,當獲取到連接的執行緒將連接放回連接池后,會繼續喚醒一些pendingAcquireQueue佇列未超時的待獲取連接的任務來獲取連接,
下面繼續用一個流程圖來總結下釋放連接的程序:

3.4 Netty連接池獲取和釋放連接流程總結
同樣,以一個流程圖來總結Netty連接池獲取和釋放連接流程:

這里不再文字累贅總結,更詳細的Netty原始碼注釋可參見我的github網址:
https://github.com/yuanmabiji/netty
分析完 Netty連接池獲取和釋放連接流程,前面的猜測2和猜測3全部可以得到答案了:
猜測2: 請求后臺超時的channel連接不能正常歸還到連接池,channel連接請求后臺超時后,這個連接不能正常放回連接池,導致channel連接池可用連接耗盡,最終導致其他執行緒從連接池獲取連接超時?如果是這樣,那么為啥請求后臺超時的連接不能正常放回連接池呢?
猜測2答案: 請求后臺超時的channel連接可以正常釋放回連接池,且放回的連接是健康可用的,
猜測3: 請求后臺超時channel連接能正常歸還到連接池,此時又因為從連接池獲取channel連接是異步的,當獲取連接超時后,我們關心的是獲取連接的異步執行緒最終有無從連接池成功獲取到一個連接呢?這里有兩種可能:1)獲取連接超時后不能從連接池獲取到一個連接,即使前面實作的代碼中獲取連接超時的話沒有釋放連接也不影響,因為這種情況根本就沒有獲取到連接;1)獲取連接超時后仍能成功獲取到一個連接,但從前面實作代碼的分析程序中可以知道,獲取連接超時的話,這個獲取到的連接是沒有被釋放的,如果是這種情況,那么就會導致連接池資源耗盡從而導致服務不可用了!
猜測3答案: 獲取連接超時后仍能成功獲取到一個連接,但從前面實作代碼的分析程序中可以知道,獲取連接超時的話,這個獲取到的連接是沒有被釋放的,如果是這種情況,那么就會導致連接池資源耗盡從而導致服務不可用了!
4 獲取連接超時例外導致連接池資源耗盡的Bug
前面詳細分析了Netty連接池獲取連接和釋放連接的流程,相信一直困擾著我們的“幽靈”Bug的原因已經付出水面了吧,
這里直接開門見山說答案吧,現在還是先來回復下我們的問題代碼:
// CustomChannelPool.java
public class CustomChannelPool {
private ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE;
// 【重要,問題代碼】acquireTimeoutAction策略居然是null!
acquireTimeoutAction = null;
// 【重要,問題代碼】acquireTimeoutAction策略是null,且這里acquireTimeoutMillis = -1
acquireTimeoutMillis = -1;
// 連接池容量為8
maxConnect = 8;
// axPendingAcquires容量為Integer.MAX_VALUE
maxPendingAcquires = Integer.MAX_VALUE
releaseHealthCheck=true
//...省略無關屬性
// 【重要,問題代碼】
static ChannelPool fixpool =
new FixedChannelPool(b, handler, healthCheck, acquireTimeoutAction,
acquireTimeoutMillis, maxConnect, maxPendingAcquires, releaseHealthCheck, lastRecentUsed); // 【0】
// 獲取連接
public Channel acquire(int timeoutMillis) throws Exception {
try {
Future<Channel> fch = fixpool.acquire(); // 【1】
// 【重要,問題代碼】
Channel ch = fch.get(timeoutMillis, TimeUnit.MILLISECONDS);// 【2】
return ch;
} catch (Exception e) {
logger.error("Exception accurred when acquire channel from channel pool.", e);//【3】
throw e; //【4】
}
}
// 釋放連接
public void release(Channel channel) {
try {
if (channel != null) {
fixpool.release(channel);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
分析到這里,導致“幽靈”Bug出現的原因就是獲取連接任務超時后,此時還有一個異步執行緒在執行著從連接池獲取連接的操作,這個連接取出后由于不能再正常回傳給業務執行緒了,因為此時業務執行緒因為獲取連接超時例外了;又因為正常情況下,釋放連接的操作由業務執行緒來觸發完成,當獲取連接超時的任務從連接池取完所有可用連接后,此時服務就不可用了,而偏偏此時我們沒有實作待獲取連接的超時任務策略AcquireTimeoutAction,因為我們在構造一個FixedChannelPool連接池時執行的建構式代碼static ChannelPool fixpool = new FixedChannelPool(b, handler, healthCheck, acquireTimeoutAction, acquireTimeoutMillis, maxConnect, maxPendingAcquires, releaseHealthCheck, lastRecentUsed);中
傳的cquireTimeoutAction引數為null且acquireTimeoutMillis為-1,也就意味著在獲取連接任務超時后,沒有一個定時任務會從pendingAcquireQueue佇列中取出超時的獲取連接任務,然后回傳給業務執行緒!且在連接池資源耗盡的情況下,隨著請求的積壓,pendingAcquireQueue佇列的待獲取連接任務會越積越多,當超擠壓的任務超出pendingAcquireQueue佇列的容量后,此時就會報Too many outstanding acquire operations例外,這里pendingAcquireQueue佇列容量是Integer.MAX_VALUE,因此存在OOM的風險,不過風險應該很小,
上面分析了業務執行緒執行Channel ch = fch.get(timeoutMillis, TimeUnit.MILLISECONDS);這句代碼會因為超時而提前回傳,因此即使我們實作了待獲取連接超時任務策略acquireTimeoutAction也不行的,如果fch.get的超時時間timeoutMillis小于帶獲取連接的任務超時時間acquireTimeoutMillis同樣也會導致處理待獲取連接超時任務的定時任務最侄訓取連接后回傳不了給業務執行緒,此時同樣解決不了這個“幽靈”Bug,
5 修復獲取連接超時例外導致連接池資源耗盡的Bug
相信經過前面的分析,那么如何修復這個獲取連接超時例外導致連接池資源耗盡的Bug呢?相信你心中已經有了答案,下面直接上修復后的代碼:
// CustomChannelPool.java
public class CustomChannelPool {
private ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE;
// 【修復】新建策略:當檢測到獲取連接超時時,此時新建一個連接
acquireTimeoutAction = AcquireTimeoutAction.NEW;
// 【修復】超時時間設定為10秒
acquireTimeoutMillis = -1;
// 【修復】連接池容量調整為100,原來的8未免太小
maxConnect = 100
// 【修復】mxPendingAcquires容量由原來的Integer.MAX_VALUE調整為100000,避免oom風險
maxPendingAcquires = 100000
releaseHealthCheck=true
//...省略無關屬性
static ChannelPool fixpool =
new FixedChannelPool(b, handler, healthCheck, acquireTimeoutAction,
acquireTimeoutMillis, maxConnect, maxPendingAcquires, releaseHealthCheck, lastRecentUsed); // 【0】
// 獲取連接
public Channel acquire(int timeoutMillis) throws Exception {
try {
Future<Channel> fch = fixpool.acquire(); // 【1】
// 這里連續n個獲取連接超時后,因為沒有歸還連接,會造成連接池可用連接耗盡,最終導致服務不可用,注:n為連接池的數量
// 解決方案:需要實作AcquireTimeoutAction的NEW或FAIL策略,因為您實作的代碼創建FixedChannelPool連接池時AcquireTimeoutAction引數傳的是null
// Channel ch = fch.get(timeoutMillis, TimeUnit.MILLISECONDS);
// 【修復】因為需要用到AcquireTimeoutAction策略,因此這里不需要超時了
Channel ch = fch.get();
return ch;
} catch (Exception e) {
logger.error("Exception accurred when acquire channel from channel pool.", e);//【3】
throw e; //【4】
}
}
// 釋放連接
public void release(Channel channel) {
try {
if (channel != null) {
fixpool.release(channel);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
6 總結
解決一個Bug真心不容易,繼續加油吧,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/232642.html
標籤:AI
上一篇:【飛槳PaddlePaddle】四天搞懂生成對抗網路(二)——風格遷移的“精神始祖”Conditional GAN
