
In the previous section we saw that putting a message into the memory buffer takes three main steps, as shown in the figure below:

We focused on the getOrCreateDeque() method, which mainly creates the following data structure:

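In this section we continue and look at how memory is requested through BufferPool, that is, how the multiple NIO ByteBuffer memory blocks are obtained.
Creating the BufferPool
In the memory buffer, the code that allocates memory is essentially the following: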
private final BufferPool free;

public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // getOrCreateDeque() logic omitted...
    // free.allocate() logic:
    // we don't have an in-progress record batch try to allocate a new batch
    int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
    ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
    // tryAppend logic omitted...
}
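As you can see, this logic is very simple: it just computes a size and then calls free.allocate() to obtain a ByteBuffer of that size.
Anyone familiar with NIO knows ByteBuffer, one of NIO's three core components; it represents a block of memory. Here a memory pool maintains multiple ByteBuffers. The benefit is that memory blocks are not repeatedly created and then garbage-collected, and they can be reused, which is a thoughtful design. In addition, since Kafka uses NIO for network communication underneath, data stored in ByteBuffers can be sent out more simply and efficiently.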
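To make the reuse idea concrete, here is a minimal sketch of a free list of fixed-size ByteBuffers. It is not Kafka's code: the class SimpleByteBufferPool and its methods are hypothetical, and it has neither locking nor a total memory limit, both of which BufferPool adds.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration (not Kafka's BufferPool): fixed-size ByteBuffers
// are recycled through a free list instead of being left for the GC.
// Not thread-safe: there is no lock around the ArrayDeque.
class SimpleByteBufferPool {
    private final int bufferSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    SimpleByteBufferPool(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Reuse a pooled buffer if one exists, otherwise allocate a new one.
    ByteBuffer acquire() {
        ByteBuffer pooled = free.pollFirst();
        return pooled != null ? pooled : ByteBuffer.allocate(bufferSize);
    }

    // Reset position/limit and keep the buffer for the next acquire().
    void release(ByteBuffer buffer) {
        buffer.clear();
        free.addLast(buffer);
    }

    int pooledCount() {
        return free.size();
    }
}
```

Because release() calls clear() and keeps the buffer, the next acquire() hands back the same instance instead of allocating a fresh one, which is exactly the allocation and GC churn a pool avoids.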
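Back to the main topic. The ByteBuffer is clearly created by BufferPool's allocate method, but before studying allocate, let's first look at how the BufferPool itself is created.
In the component overview of Section 2 we took a first look at the structure of BufferPool: it is created when RecordAccumulator is initialized, and at its core are a ReentrantLock and a Deque.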

With that initial understanding, let's now look at the details of its creation more closely:
public final class BufferPool {

    private final long totalMemory;
    private final int poolableSize;
    private final ReentrantLock lock;
    private final Deque<ByteBuffer> free;
    private final Deque<Condition> waiters;
    private long availableMemory;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;

    /**
     * Create a new buffer pool
     *
     * @param memory The maximum amount of memory that this buffer pool can allocate
     * @param poolableSize The buffer size to cache in the free list rather than deallocating
     * @param metrics instance of Metrics
     * @param time time instance
     * @param metricGrpName logical group name for metrics
     */
    public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
        this.poolableSize = poolableSize;
        this.lock = new ReentrantLock();
        this.free = new ArrayDeque<ByteBuffer>();
        this.waiters = new ArrayDeque<Condition>();
        this.totalMemory = memory;
        this.availableMemory = memory;
        this.metrics = metrics;
        this.time = time;
        this.waitTime = this.metrics.sensor("bufferpool-wait-time");
        MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                                   metricGrpName,
                                                   "The fraction of time an appender waits for space allocation.");
        this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
    }
}
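The main flow of this constructor is as follows:
1) Set the core fields from the constructor arguments. There are two key ones, long memory and int poolableSize; the other arguments relate to time or metrics and can be ignored for now. If you trace the constructor call upward, you will eventually find the defaults initialized in ConfigDef:
memory corresponds to the config buffer.memory=33554432, the total buffer size, which defaults to 32MB. poolableSize corresponds to batch.size=16384, which defaults to 16KB; that is, messages are packed into batches of 16KB by default. Note that if your messages are large, both parameters should be adjusted accordingly.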
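As a rough illustration of how the two settings relate, the sketch below puts them into a java.util.Properties object using the real producer config keys buffer.memory and batch.size, and computes how many full-size batches fit into the pool. The class BufferConfigExample and its methods are hypothetical; in a real application these entries would go into the Properties passed to the KafkaProducer constructor, alongside the usual bootstrap and serializer settings.

```java
import java.util.Properties;

// Hypothetical helper: the two producer settings discussed above, set as plain strings.
class BufferConfigExample {
    static Properties producerProps(long bufferMemory, int batchSize) {
        Properties props = new Properties();
        props.put("buffer.memory", Long.toString(bufferMemory)); // becomes BufferPool's total memory
        props.put("batch.size", Integer.toString(batchSize));     // becomes BufferPool's poolableSize
        return props;
    }

    // How many full-size batches fit into the pool at the same time.
    static long maxFullBatches(Properties props) {
        long memory = Long.parseLong(props.getProperty("buffer.memory"));
        int batch = Integer.parseInt(props.getProperty("batch.size"));
        return memory / batch;
    }
}
```

With the defaults, 33554432 / 16384 = 2048, so at most 2048 full 16KB batches can be held in memory before appenders have to wait. Raising batch.size without also raising buffer.memory lowers that number.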
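2) Initialize the core memory structures and a lock: a ReentrantLock plus two ArrayDeques, free (holding ByteBuffers) and waiters (holding Conditions).
The constructor's overall logic is shown in the figure below: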

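You can make an educated guess: the free queue presumably stores ByteBuffer memory blocks, and because ArrayDeque is not thread-safe, a ReentrantLock is needed for concurrency control. What the waiters queue of Conditions is for is not yet clear; perhaps threads queue on it while waiting to obtain a memory block.
How BufferPool allocates memory
Once the BufferPool exists, how does it allocate memory through allocate()?
First, before allocating, the size of the request has to be determined: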
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);

public interface Records extends Iterable<LogEntry> {
    int SIZE_LENGTH = 4;
    int OFFSET_LENGTH = 8;
    int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
}
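Computing size takes the larger of two values:
batchSize, the same parameter the BufferPool uses as poolableSize, 16KB by default;
LOG_OVERHEAD plus the record size: 12 bytes (a 4-byte size field plus an 8-byte offset) + Record.recordSize(key, value), which is the key and value lengths plus a small fixed per-record header.
Simply put: if the message is larger than the default batchSize, the allocation is sized to the message; otherwise it is the default batchSize of 16KB.
PS: batchSize is usually tuned to the messages you actually send. Batches are packed into ByteBuffer blocks of batchSize; if your messages exceed 16KB, each batch ends up holding only a single message, so every message costs its own network transfer and batching loses most of its benefit.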
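The size rule can be sketched as follows. LOG_OVERHEAD mirrors the Records interface above; the 22-byte per-record header (RECORD_OVERHEAD) is an assumption based on the 0.10.x message format and differs between Kafka versions, and the class BatchSizeCalculator is hypothetical.

```java
// Hypothetical sketch of: size = max(batchSize, LOG_OVERHEAD + recordSize(key, value)).
class BatchSizeCalculator {
    static final int SIZE_LENGTH = 4;                             // as in Records.SIZE_LENGTH
    static final int OFFSET_LENGTH = 8;                           // as in Records.OFFSET_LENGTH
    static final int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;  // 12 bytes
    // ASSUMPTION: 0.10.x record header = crc 4 + magic 1 + attributes 1
    // + timestamp 8 + key length 4 + value length 4 = 22 bytes.
    static final int RECORD_OVERHEAD = 22;

    static int recordSize(byte[] key, byte[] value) {
        int keySize = key == null ? 0 : key.length;
        int valueSize = value == null ? 0 : value.length;
        return RECORD_OVERHEAD + keySize + valueSize;
    }

    // Small records get a full batchSize buffer; oversized records get an exact-fit buffer.
    static int allocationSize(int batchSize, byte[] key, byte[] value) {
        return Math.max(batchSize, LOG_OVERHEAD + recordSize(key, value));
    }
}
```

Under these assumptions, a 10-byte key with a 100-byte value still gets a 16384-byte buffer, while a 20000-byte value with no key gets a 20034-byte buffer. Note also that allocate() only serves requests of exactly poolableSize from the free list, so such oversized buffers never come from the pool.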
The logic above is shown in the figure below:

Once the size is determined, the following code performs the allocation:
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    this.lock.lock();
    try {
        // check if we have a free buffer of the right size pooled
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // now check if the request is immediately satisfiable with the
        // memory on hand or if we need to block
        int freeListSize = this.free.size() * this.poolableSize;
        if (this.availableMemory + freeListSize >= size) {
            // we have enough unallocated or pooled memory to immediately
            // satisfy the request
            freeUp(size);
            this.availableMemory -= size;
            lock.unlock();
            return ByteBuffer.allocate(size);
        } else {
            // we are out of memory and will have to block
            int accumulated = 0;
            ByteBuffer buffer = null;
            Condition moreMemory = this.lock.newCondition();
            long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            this.waiters.addLast(moreMemory);
            // loop over and over until we have a buffer or have reserved
            // enough memory to allocate one
            while (accumulated < size) {
                long startWaitNs = time.nanoseconds();
                long timeNs;
                boolean waitingTimeElapsed;
                try {
                    waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    this.waiters.remove(moreMemory);
                    throw e;
                } finally {
                    long endWaitNs = time.nanoseconds();
                    timeNs = Math.max(0L, endWaitNs - startWaitNs);
                    this.waitTime.record(timeNs, time.milliseconds());
                }

                if (waitingTimeElapsed) {
                    this.waiters.remove(moreMemory);
                    throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                }

                remainingTimeToBlockNs -= timeNs;
                // check if we can satisfy this request from the free list,
                // otherwise allocate memory
                if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                    // just grab a buffer from the free list
                    buffer = this.free.pollFirst();
                    accumulated = size;
                } else {
                    // we'll need to allocate memory, but we may only get
                    // part of what we need on this iteration
                    freeUp(size - accumulated);
                    int got = (int) Math.min(size - accumulated, this.availableMemory);
                    this.availableMemory -= got;
                    accumulated += got;
                }
            }

            // remove the condition for this thread to let the next thread
            // in line start getting memory
            Condition removed = this.waiters.removeFirst();
            if (removed != moreMemory)
                throw new IllegalStateException("Wrong condition: this shouldn't happen.");

            // signal any additional waiters if there is more memory left
            // over for them
            if (this.availableMemory > 0 || !this.free.isEmpty()) {
                if (!this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            }

            // unlock and return the buffer
            lock.unlock();
            if (buffer == null)
                return ByteBuffer.allocate(size);
            else
                return buffer;
        }
    } finally {
        if (lock.isHeldByCurrentThread())
            lock.unlock();
    }
}
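This method is long, but its logic is clear. Its main structure is one big if-else:
1) The if branch: if the request is for exactly poolableSize and the free queue is not empty, a pooled buffer is reused directly; otherwise, if unallocated memory plus pooled memory covers the request, a new ByteBuffer of the requested size is created and availableMemory is reduced accordingly.
2) The else branch: if there is not enough memory (the 32MB total buffer is used up), the thread queues on a Condition in waiters and waits until it can obtain the memory.
The overall flow is shown in the figure below: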

Let's look at the details of each part, starting with the first:
// If the free queue holds a pooled buffer of the right size, use it directly
if (size == poolableSize && !this.free.isEmpty())
    return this.free.pollFirst();

// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = this.free.size() * this.poolableSize;
if (this.availableMemory + freeListSize >= size) {
    // Create a ByteBuffer of the requested size and deduct it from the available memory
    // we have enough unallocated or pooled memory to immediately
    // satisfy the request
    freeUp(size);
    this.availableMemory -= size;
    lock.unlock();
    return ByteBuffer.allocate(size);
}
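This part is simple: the ByteBuffer either comes from the free queue or is newly created. One detail is freeUp(size): when availableMemory alone is too small, it releases pooled buffers from free back into availableMemory until the request is covered, which is why pooled memory counts toward the check.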
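The fast path, including freeUp(), can be sketched in a single-threaded form as below. This is a simplification for illustration: FastPathPool and tryAllocate are hypothetical names, the lock is omitted, and returning null stands in for the point where the real allocate() enters the waiting branch. The freeUp() here (drain pooled buffers from the tail of free into availableMemory until the request fits) is our reading of Kafka's helper; treat it as an assumption.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded sketch of allocate()'s non-blocking branch.
class FastPathPool {
    final int poolableSize;
    final Deque<ByteBuffer> free = new ArrayDeque<>();
    long availableMemory;

    FastPathPool(long totalMemory, int poolableSize) {
        this.availableMemory = totalMemory;
        this.poolableSize = poolableSize;
    }

    /** Returns a buffer, or null where the real allocate() would block. */
    ByteBuffer tryAllocate(int size) {
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();                      // reuse a pooled buffer as-is
        long freeListSize = (long) free.size() * poolableSize;
        if (availableMemory + freeListSize >= size) {
            freeUp(size);                                 // turn pooled buffers back into plain memory if needed
            availableMemory -= size;
            return ByteBuffer.allocate(size);
        }
        return null;                                      // not enough memory: caller would wait
    }

    // Release pooled buffers until availableMemory covers the request.
    private void freeUp(int size) {
        while (!free.isEmpty() && availableMemory < size)
            availableMemory += free.pollLast().capacity();
    }
}
```

For example, with 64 bytes in total and a poolableSize of 16, handing out four 16-byte buffers and then returning two of them to free leaves availableMemory at 0; a 32-byte request then still succeeds, because freeUp() drains both pooled buffers back into availableMemory first.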
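But this raises a question: when does the free queue get any entries?
You can probably guess: once the messages in the buffer have been sent, their ByteBuffer is cleared and that memory becomes free again, so it is naturally added to the free queue. You can search for references to free to get a rough picture yourself; we will come back to it when we analyze how the messages in the buffer are sent.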
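A single-threaded sketch of that return path is below. It is hypothetical and simplified: Kafka's BufferPool has its own deallocate method for this, which also takes the lock and signals the first waiter, and we will only look at it properly when analyzing sending. The point shown here is that only buffers of exactly poolableSize are kept in free; a buffer of any other size simply gives its bytes back to availableMemory.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded sketch of returning a buffer after its batch was sent.
class ReleasingPool {
    final int poolableSize;
    final Deque<ByteBuffer> free = new ArrayDeque<>();
    long availableMemory;

    ReleasingPool(long availableMemory, int poolableSize) {
        this.availableMemory = availableMemory;
        this.poolableSize = poolableSize;
    }

    void release(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();                        // reset position/limit; old bytes get overwritten on reuse
            free.addLast(buffer);                  // standard-size buffer: keep it for reuse
        } else {
            availableMemory += buffer.capacity();  // oversized buffer: only its bytes are returned
        }
    }
}
```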

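The remaining second part handles threads queuing and waiting when total memory runs out, and being woken up afterwards. It deals with many special cases and looks fairly complex: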
// we are out of memory and will have to block
int accumulated = 0;
ByteBuffer buffer = null;
Condition moreMemory = this.lock.newCondition();
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);
// loop over and over until we have a buffer or have reserved
// enough memory to allocate one
while (accumulated < size) {
    long startWaitNs = time.nanoseconds();
    long timeNs;
    boolean waitingTimeElapsed;
    try {
        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        this.waiters.remove(moreMemory);
        throw e;
    } finally {
        long endWaitNs = time.nanoseconds();
        timeNs = Math.max(0L, endWaitNs - startWaitNs);
        this.waitTime.record(timeNs, time.milliseconds());
    }

    if (waitingTimeElapsed) {
        this.waiters.remove(moreMemory);
        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
    }

    remainingTimeToBlockNs -= timeNs;
    // check if we can satisfy this request from the free list,
    // otherwise allocate memory
    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
        // just grab a buffer from the free list
        buffer = this.free.pollFirst();
        accumulated = size;
    } else {
        // we'll need to allocate memory, but we may only get
        // part of what we need on this iteration
        freeUp(size - accumulated);
        int got = (int) Math.min(size - accumulated, this.availableMemory);
        this.availableMemory -= got;
        accumulated += got;
    }
}

// remove the condition for this thread to let the next thread
// in line start getting memory
Condition removed = this.waiters.removeFirst();
if (removed != moreMemory)
    throw new IllegalStateException("Wrong condition: this shouldn't happen.");

// signal any additional waiters if there is more memory left
// over for them
if (this.availableMemory > 0 || !this.free.isEmpty()) {
    if (!this.waiters.isEmpty())
        this.waiters.peekFirst().signal();
}

// unlock and return the buffer
lock.unlock();
if (buffer == null)
    return ByteBuffer.allocate(size);
else
    return buffer;
}
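Once you untangle it, though, it is essentially just Condition's await and signal. A thread may gather the memory it needs over several wake-ups (accumulated); when it is done, it removes its own Condition from the head of waiters and, if memory is still left over, signals the next waiter. There is also a maximum blocking time (maxTimeToBlockMs, which the producer derives from its max.block.ms setting); once it elapses, a TimeoutException is thrown. I won't walk through it step by step, since in practice we want to avoid ending up here at all. The overall logic is summarized in the figure below: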

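How are the Conditions in the waiters queue woken up? The same way memory gets back into free: after messages are sent and their memory is released, available memory increases and the waiting threads are woken. We will see this when we analyze how the messages in the buffer are sent, as shown below: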
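The await/signal handshake can be reduced to the sketch below. It is a hypothetical simplification, not Kafka's code: memory is just a byte counter (no ByteBuffers, no free list), the remaining wait time is tracked with Condition.awaitNanos, and it throws the JDK's checked java.util.concurrent.TimeoutException rather than Kafka's own TimeoutException. Like the excerpt above, it gathers memory across several wake-ups and wakes the next waiter when memory is left over. Unlike the excerpt, which throws on timeout without adding accumulated back, it returns a partial reservation before giving up so that a failed request does not shrink the pool.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical pool tracking memory as a byte count; waiters queue FIFO on Conditions.
class BlockingMemoryPool {
    private final long totalMemory;
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long availableMemory;

    BlockingMemoryPool(long totalMemory) {
        this.totalMemory = totalMemory;
        this.availableMemory = totalMemory;
    }

    void reserve(long size, long maxTimeToBlockMs) throws InterruptedException, TimeoutException {
        if (size > totalMemory)
            throw new IllegalArgumentException("size exceeds total memory");
        lock.lock();
        try {
            if (availableMemory >= size) {               // enough memory on hand: no waiting
                availableMemory -= size;
                return;
            }
            Condition moreMemory = lock.newCondition();
            waiters.addLast(moreMemory);
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            long accumulated = 0;
            try {
                while (accumulated < size) {
                    if (remainingNs <= 0)
                        throw new TimeoutException("could not reserve " + size + " bytes in " + maxTimeToBlockMs + " ms");
                    remainingNs = moreMemory.awaitNanos(remainingNs);         // releases the lock while waiting
                    long got = Math.min(size - accumulated, availableMemory); // possibly only part of the request
                    availableMemory -= got;
                    accumulated += got;
                }
            } catch (InterruptedException | TimeoutException e) {
                availableMemory += accumulated;          // hand back a partial reservation
                throw e;
            } finally {
                waiters.remove(moreMemory);
                if (availableMemory > 0 && !waiters.isEmpty())
                    waiters.peekFirst().signal();        // pass leftover memory on to the next waiter
            }
        } finally {
            lock.unlock();
        }
    }

    // Plays the role of memory being freed after a send.
    void release(long size) {
        lock.lock();
        try {
            availableMemory += size;
            Condition first = waiters.peekFirst();
            if (first != null)
                first.signal();                          // wake the longest-waiting thread
        } finally {
            lock.unlock();
        }
    }

    long availableMemory() {
        lock.lock();
        try {
            return availableMemory;
        } finally {
            lock.unlock();
        }
    }
}
```

release() adds the bytes back and signals the Condition at the head of waiters, so the longest-waiting thread is woken first; that thread then takes what it needs and, if anything is left, wakes the next one in line.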

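Summary
That's it: we have now covered, at the source level, how the RecordAccumulator memory buffer allocates memory through BufferPool. The main takeaways are:
Why BufferPool manages multiple ByteBuffer memory blocks
The two core parameters: batchSize = 16KB (batch.size) and bufferMemory = 32MB (buffer.memory)
The core data structures: the Deque of free ByteBuffers and the Deque of waiting Conditions
How the size of each ByteBuffer is calculated
How ByteBuffers are allocated and reused
In the next section we will continue with the memory buffer and analyze the tryAppend logic, then how messages are batched and how the batches are sent, the final serialized message format, and the packet splitting and sticking (message framing) problem with NIO. Stay tuned!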
This article was published with OpenWrite, a multi-platform blog publishing tool.
Please credit the source when reposting. Article link: https://www.uj5u.com/houduan/308228.html
