
之前我們分析了Producer的配置決議、組件分析、拉取元資料、訊息的初步序列化方式、訊息的路由策略,如下圖:

這一節我們繼續分析發送訊息的記憶體緩沖器原理—RecordAccumulator.append(),
如何將訊息放入記憶體緩沖器的?
在doSend中的,拉取元資料、訊息的初步序列化方式、訊息的路由策略之后就是accumulator.append(),
如下代碼所示:(去除了多余的日志和例外處理,截取了核心代碼)
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
//拉取元資料、訊息的初步序列化方式、訊息的路由策略
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
byte[] serializedKey = keySerializer.serialize(record.topic(), record.key());
byte[] serializedValue = https://www.cnblogs.com/fanmao/archive/2021/10/07/valueSerializer.serialize(record.topic(), record.value());
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
Callback interceptCallback = this.interceptors == null ?
callback : new InterceptorCallback<>(callback, this.interceptors, tp);
// 將路由結果、初步序列化的訊息放入到訊息記憶體緩沖器中
RecordAccumulator.RecordAppendResult result =
accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
this.sender.wakeup();
}
return result.future;
} catch (Exception e) {
throw e;
}
//省略其他各種例外捕獲
}
accumulator.append() 它主要是將路由結果、初步序列化的訊息放入到訊息記憶體緩沖器中,
分析如何將訊息放入記憶體緩沖器之前,需要回顧下它內部的基本結構,之前組件分析的時候,我們初步分析過RecordAccumulator的大體結構,如下圖:

1)設定了一些引數 batchSize、totalSize、retryBackoffMs、lingerMs、compression等
2)初始化了一些資料結構,比如batches是一個 new CopyOnWriteMap<>()
3)初始化了BufferPool和IncompleteRecordBatches
回顧了RecordAccumulator這個組件之后,我們就來看看到底如何將訊息放入記憶體緩沖器的資料結構中的,
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
try {
// check if we have an in-progress batch
Deque<RecordBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null)
return appendResult;
}
// we don't have an in-progress record batch try to allocate a new batch
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
free.deallocate(buffer);
return appendResult;
}
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}
} finally {
appendsInProgress.decrementAndGet();
}
}
整個方法的脈絡,看著邏輯比較多,涉及了很多資料結構,我們一步一步來分析下,第一次看的話,大體你可以梳理如下脈絡:
1)getOrCreateDeque 這個方法應該是才創建一個雙端佇列,佇列放的每一個元素不是單條訊息Record,而是訊息的集合RecordBatch,
2)free.allocate 應該是在分配記憶體緩沖器中的記憶體
3)tryAppend 應該是將訊息放入記憶體中

創建存放訊息集合的佇列
在將訊息放入記憶體緩沖器之前,首先通過getOrCreateDeque 創建的是一個存放訊息集合的佇列,代碼如下:
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
public RecordAccumulator(int batchSize,
long totalSize,
CompressionType compression,
long lingerMs,
long retryBackoffMs,
Metrics metrics,
Time time) {
//省略...
this.batches = new CopyOnWriteMap<>();
//省略...
}
/**
* Get the deque for the given topic-partition, creating it if necessary.
*/
private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
Deque<RecordBatch> d = this.batches.get(tp);
if (d != null)
return d;
d = new ArrayDeque<>();
Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
if (previous == null)
return d;
else
return previous;
}
這個創建的記憶體結構可以看到,是一個變數 batches,它是一個CopyOnWriteMap,這個資料結構之前我們組件圖初步分析過,再結合這段代碼,不難理解它的脈絡:
這個map主要根據Topic磁區資訊作為key,value是一個佇列核心資料結構是RecordBatch,由于是第一次給某個topic磁區發送的訊息,value為空,需要初始化佇列,否則說明曾經給這個topic的磁區發送給資料,value非空,直接回傳之前的佇列,
由于我們這里是第一次向test-topic發送訊息,所以可以得到下圖的資料結構:

之后執行了一段加鎖邏輯,之前提到,tryAppend應該是將訊息放入記憶體中,但是由于佇列是剛創建的,deque.peekLast();肯定是空,所以這段加鎖的代碼不會執行,
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null)
return appendResult;
}
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque){
RecordBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future == null)
last.records.close();
else
return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
}
return null;
}
但是到這里你會發現代碼一個明顯的特點,使用了synchronized加鎖和執行緒安全的記憶體結構CopyOnWriteMap,這些都是明顯執行緒安全的控制,
為什么呢?因為同一個Producer可以使用多執行緒進行發送訊息,必然要考慮執行緒安全的很多東西,
為什么選用CopyOnWriteMap,而不用ConcurrentHashMap呢?你可以思考下,(這里給個提示,JDK成長記提到過,CopyOnWriteMap它的底層是寫時復制,適合讀多寫少的場景)
synchronized加鎖代碼塊使用了,分段加鎖,并沒有暴力的在方法上加synchronized,這也是一個使用亮點,
寫在結尾的話
到這里,你會發現在中間件會大量的見到并發包下的組件的使用,作業中你用到可能都是鳳毛麟角,這些組件的使用是我們研究中間件原始碼值得學習的一點,
你一定要多思考為什么,不要停留在是什么,怎么用上,這個思想需要刻意訓練,希望你可以慢慢養成,
好了,今天的內容就到這里,之前有同學反饋,每一節的只是太過于干了,實實在在的干貨!看起來有時候會比較費勁,所以之后的章節盡量會避免上萬字的大章節,會控制在6000字左右,
另外,除了成長記外,我偶爾也會分享我自己的故事和行業中遇見的事情,希望大家從我的經歷中可以有另一番成長和識訓,比如我是如何學習和提升技術的?我是如何畫圖的?我如何做技術分享的等等,
本文由博客群發一文多發等運營工具平臺 OpenWrite 發布
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/306132.html
標籤:其他
上一篇:Go語言之回圈與條件判斷
下一篇:Go語言之陣列與切片基礎
