Kafka Server Source Code: The Network Layer
Key classes and flow diagram for network transport:

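The diagram above shows the following key threads and classes:
- Acceptor thread: accepts and configures new connections. There is one per endpoint.
- **Processor thread:** handles all requests from a given connection. N of them run in parallel, each with its own selector.
- **RequestChannel class:** buffers incoming requests in its requestQueue and passes responses back for sending.
- KafkaRequestHandlerPool class: the handler thread pool, containing multiple handlers that process requests.
- **KafkaRequestHandler class:** the class that actually processes requests.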
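To make the hand-off between these components concrete, here is a minimal sketch with no real sockets. `MiniRequestChannel`, `MiniRequest`, `MiniResponse`, and `PipelineSketch` are hypothetical names invented for illustration. Keeping one response queue per processor inside the channel is a simplifying assumption, not a claim about where Kafka stores them.

```scala
import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue, TimeUnit}

// Hypothetical, simplified stand-ins for a request and its response.
final case class MiniRequest(processorId: Int, payload: String)
final case class MiniResponse(processorId: Int, payload: String)

// Toy request channel: one shared bounded request queue,
// plus one response queue per processor (a simplifying assumption).
final class MiniRequestChannel(queueSize: Int, processorIds: Seq[Int]) {
  private val requestQueue = new ArrayBlockingQueue[MiniRequest](queueSize)
  private val responseQueues: Map[Int, LinkedBlockingQueue[MiniResponse]] =
    processorIds.map(id => id -> new LinkedBlockingQueue[MiniResponse]()).toMap

  // Processor side: blocks while the request queue is full.
  def sendRequest(req: MiniRequest): Unit = requestQueue.put(req)

  // Handler side: returns null if nothing arrives within the timeout.
  def receiveRequest(timeoutMs: Long): MiniRequest =
    requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)

  // Handler side: route the response back to the processor that owns the connection.
  def sendResponse(resp: MiniResponse): Unit =
    responseQueues(resp.processorId).put(resp)

  // Processor side: pick up a response that must be written to the socket.
  def receiveResponse(processorId: Int, timeoutMs: Long): MiniResponse =
    responseQueues(processorId).poll(timeoutMs, TimeUnit.MILLISECONDS)
}

object PipelineSketch {
  // Sends one request through the channel, with a single handler thread answering it.
  def roundTrip(payload: String): String = {
    val channel = new MiniRequestChannel(queueSize = 10, processorIds = Seq(0))
    val handler = new Thread(new Runnable {
      override def run(): Unit = {
        val req = channel.receiveRequest(1000)
        if (req != null) channel.sendResponse(MiniResponse(req.processorId, "ack:" + req.payload))
      }
    })
    handler.start()
    channel.sendRequest(MiniRequest(0, payload)) // what a processor does after reading a request
    val resp = channel.receiveResponse(0, 1000)  // what a processor does before writing a response
    handler.join()
    if (resp == null) "" else resp.payload
  }

  def main(args: Array[String]): Unit = println(roundTrip("ping"))
}
```

The bounded request queue is the design point worth noticing: when handlers fall behind, `sendRequest` blocks, which slows the thread that reads from the sockets instead of letting the queue grow without limit.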
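Kafka Network: Initialization
Kafka startup is mostly a matter of getting each thread ready:
- The Acceptor thread starts and listens for client connections to the server (producers, for example).
- Each Processor starts polling its SocketChannels for data.
- The KafkaRequestHandlerPool is initialized, and each KafkaRequestHandler starts polling the requestQueue in RequestChannel for requests to consume.
The entry class for starting the Acceptor and Processor threads is SocketServer, and the method is startup(). The main code is pasted here: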
/**
 * Starts the socket server and the acceptors for all listeners.
 * If startupProcessors is true, the processors are started as well. Otherwise they are started only when
 * kafka.network.SocketServer.startDataPlaneProcessors() or
 * kafka.network.SocketServer.startControlPlaneProcessor() is called.
 * Delayed processor startup postpones the handling of client connections until the server is fully initialized,
 * for example to make sure all credentials are loaded before any authentication is performed.
 * Acceptors are always started during startup, so the bound port is known once this method completes,
 * even when ephemeral ports are used.
 * Incoming connections on this server are processed once the processors have started and call
 * org.apache.kafka.common.network.Selector.poll.
 */
def startup(startupProcessors: Boolean = true): Unit = {
  this.synchronized {
    // Connection quota information
    connectionQuotas = new ConnectionQuotas(config, time)
    /**
     * Create the control-plane Acceptor and Processor.
     * control-plane:
     * - Handles requests from the controller. It is optional and is configured with
     *   "control.plane.listener.name". If not configured, controller requests are handled by the data plane.
     * - Threading model: 1 Acceptor thread handles new connections. The Acceptor has 1 Processor thread,
     *   which has its own selector and reads requests from the socket, and 1 handler thread processes
     *   requests and returns responses to the Processor thread for writing.
     */
    createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
    /**
     * data-plane:
     * - Handles requests from clients and from other brokers in the cluster.
     * - Threading model: 1 Acceptor thread per listener handles new connections. Multiple data planes can be
     *   configured by giving "listeners" in KafkaConfig several comma-separated endpoints. Each Acceptor has
     *   N Processor threads, each with its own selector, that read requests from the sockets, and M handler
     *   threads process requests and return responses to the Processor threads for writing.
     */
    // The Acceptors are started inside this method
    createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
    if (startupProcessors) {
      // Start the Processors
      startControlPlaneProcessor(Map.empty)
      startDataPlaneProcessors(Map.empty)
    }
  }
}

// Called above as createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                  endpoints: Seq[EndPoint]): Unit = synchronized {
  endpoints.foreach { endpoint =>
    connectionQuotas.addListener(config, endpoint.listenerName)
    // Create the Acceptor
    val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
    // Add Processors to the Acceptor
    addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
    // Start the Acceptor thread
    KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
    dataPlaneAcceptor.awaitStartup()
    dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
    info(s"Created data-plane acceptor and processors for endpoint : $endpoint")
  }
}

// addDataPlaneProcessors, called from the method above
private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
  val listenerName: ListenerName = endpoint.listenerName
  val securityProtocol: SecurityProtocol = endpoint.securityProtocol
  val listenerProcessors = new ArrayBuffer[Processor]()
  for (_ <- 0 until newProcessorsPerListener) {
    // Create a Processor
    val processor: Processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool)
    listenerProcessors += processor
    // Register the Processor with the requestChannel
    dataPlaneRequestChannel.addProcessor(processor)
    nextProcessorId += 1
  }
  listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
  acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)
}
The KafkaRequestHandlerPool is initialized from the KafkaServer class, in its startup() method. The key code is as follows:
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
  config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

// While KafkaRequestHandlerPool is being initialized, it starts as many KafkaRequestHandler threads
// as config.numIoThreads specifies
for (i <- 0 until numThreads) {
  createHandler(i)
}

def createHandler(id: Int): Unit = synchronized {
  // apis is the object that actually processes the requests
  runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
  KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
}
Kafka Network: Accepting Requests in the Acceptor
A simplified class diagram of Acceptor:

The main method in the diagram above is run():
/**
 * Loops, checking for new connections. When one arrives, it is assigned to a processor.
 */
def run(): Unit = {
  // Register serverChannel with the selector for OP_ACCEPT events
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  // Record that the thread started successfully
  startupComplete()
  try {
    // Index of the current processor. There are several processors, so one is picked round-robin
    var currentProcessorIndex = 0
    while (isRunning) {
      try {
        // Ask the Selector whether any events have arrived
        val ready: Int = nioSelector.select(500)
        if (ready > 0) {
          val keys: util.Set[SelectionKey] = nioSelector.selectedKeys()
          val iter: util.Iterator[SelectionKey] = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key: SelectionKey = iter.next
              iter.remove()
              if (key.isAcceptable) {
                accept(key).foreach { socketChannel =>
                  // Assign the channel to the next processor (round-robin) that can take it without blocking.
                  // If the newConnections queues of all processors are full,
                  // block until the last processor is able to accept the connection.
                  var retriesLeft: Int = synchronized(processors.length)
                  var processor: Processor = null
                  do {
                    retriesLeft -= 1
                    processor = synchronized {
                      // Adjust the index if necessary and fetch the processor atomically,
                      // so that behavior stays correct if the number of processors is reduced dynamically
                      currentProcessorIndex = currentProcessorIndex % processors.length
                      processors(currentProcessorIndex)
                    }
                    // Advance the processor index by 1
                    currentProcessorIndex += 1
                    // Assign the connection to the Processor
                  } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                }
              } else {
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              }
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
        // to a select operation on a specific channel or a bad request. We don't want
        // the broker to stop responding to requests from other clients in these scenarios.
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
    CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
    shutdownComplete()
  }
}
In summary, the Acceptor only distributes SocketChannels to the Processors. It does no actual data processing.
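The round-robin assignment in run() can be isolated into a small sketch. `MiniProcessor` and `RoundRobinSketch` are hypothetical names, and connections are plain strings here. The only behavior carried over from the code above is the loop shape: try each processor once without blocking, and allow blocking only on the last attempt. A `while` loop replaces the original `do`/`while` so the sketch also compiles on Scala 3.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical stand-in for a Processor: only its bounded newConnections queue.
final class MiniProcessor(val id: Int, capacity: Int) {
  private val newConnections = new ArrayBlockingQueue[String](capacity)

  // Non-blocking offer first; fall back to a blocking put only when mayBlock is true.
  def accept(conn: String, mayBlock: Boolean): Boolean =
    if (newConnections.offer(conn)) true
    else if (mayBlock) { newConnections.put(conn); true }
    else false
}

object RoundRobinSketch {
  // Returns, for each connection, the id of the processor that took it.
  def assign(conns: Seq[String], processors: IndexedSeq[MiniProcessor]): Seq[Int] = {
    var currentProcessorIndex = 0
    conns.map { conn =>
      var retriesLeft = processors.length
      var processor: MiniProcessor = null
      var assigned = false
      while (!assigned) {
        retriesLeft -= 1
        // Wrap the index so it always points at an existing processor.
        currentProcessorIndex = currentProcessorIndex % processors.length
        processor = processors(currentProcessorIndex)
        currentProcessorIndex += 1
        // Only the final attempt is allowed to block.
        assigned = processor.accept(conn, mayBlock = retriesLeft == 0)
      }
      processor.id
    }
  }

  def main(args: Array[String]): Unit = {
    // Processor 0 can queue one connection, the other two can queue two each.
    val processors = Vector(new MiniProcessor(0, 1), new MiniProcessor(1, 2), new MiniProcessor(2, 2))
    println(assign(Seq("c1", "c2", "c3", "c4"), processors))
  }
}
```

In this run the fourth connection skips processor 0, whose queue is already full, and lands on processor 1. Nothing drains the queues in the sketch, so feeding it more connections than the total capacity would block forever on the last attempt. In the broker, the Processor threads drain their queues concurrently.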
Kafka Network: Accepting Requests in the Processor
The Processor class diagram:

The diagram above shows the public methods of Processor. The focus here is on the run() method:
/** The Processor's run method. Its structure is simple and clear, and worth borrowing. */
override def run(): Unit = {
  startupComplete()
  try {
    while (isRunning) {
      try {
        // Set up any new connections that have been queued up. The number of connections handled per
        // iteration is capped (default 20) so that traffic and connection-close notifications on existing
        // channels are still processed promptly. Every connection is registered for OP_READ.
        configureNewConnections()
        // Process the queued responses
        processNewResponses()
        // Perform whatever I/O is possible on each connection. This is where data is actually transferred.
        // After this call, completedSends(), completedReceives(), connected() and disconnected()
        // report the sends, receives, connections and disconnections that completed.
        poll()
        // Process the requests that have been fully received
        processCompletedReceives()
        // Process the responses that have been fully sent
        processCompletedSends()
        // Process the connections that were lost
        processDisconnected()
        // Close excess connections
        closeExcessConnections()
      } catch {
        case e: Throwable => processException("Processor got uncaught exception.", e)
      }
    }
  } finally {
    debug(s"Closing selector - processor $id")
    CoreUtils.swallow(closeAll(), this, Level.ERROR)
    shutdownComplete()
  }
}

private def configureNewConnections(): Unit = {
  var connectionsProcessed = 0
  // Process the connections waiting in newConnections
  while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
    val channel = newConnections.poll()
    try {
      debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
      selector.register(connectionId(channel.socket), channel)
      connectionsProcessed += 1
    } catch {
      // We explicitly catch all exceptions and close the socket to avoid a socket leak.
      case e: Throwable =>
        val remoteAddress = channel.socket.getRemoteSocketAddress
        // need to close the channel here to avoid a socket leak.
        close(listenerName, channel)
        processException(s"Processor $id closed connection from $remoteAddress", e)
    }
  }
}

private def processCompletedReceives(): Unit = {
  selector.completedReceives.asScala.foreach { receive =>
    try {
      openOrClosingChannel(receive.source) match {
        case Some(channel) => {
          val header = RequestHeader.parse(receive.payload)
          if (header.apiKey() == ApiKeys.SASL_HANDSHAKE &&
              channel.maybeBeginServerReauthentication(receive, nowNanosSupplier)) {
            trace(s"Begin re-authentication: $channel")
          } else {
            val nowNanos = time.nanoseconds()
            if (channel.serverAuthenticationSessionExpired(nowNanos)) {
              // be sure to decrease connection count and drop any in-flight responses
              debug(s"Disconnecting expired channel: $channel : $header")
              close(channel.id)
              expiredConnectionsKilledCount.record(null, 1, 0)
            } else {
              val connectionId = receive.source
              val context = new RequestContext(header, connectionId, channel.socketAddress,
                channel.principal, listenerName, securityProtocol)
              // Build the Request from the parsed header and payload
              val req = new RequestChannel.Request(processor = id, context = context,
                startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
              // Add the request to the requestQueue in RequestChannel
              requestChannel.sendRequest(req)
              // Mute the channel: stop listening for OP_READ on this connection
              selector.mute(connectionId)
              handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
            }
          }
        }
        case None =>
          // This should never happen since completed receives are processed immediately after `poll()`
          {
            throw new IllegalStateException(
              s"Channel ${receive.source} removed from selector before processing completed receive")
          }
      }
    } catch {
      // note that even though we got an exception, we can assume that receive.source is valid.
      // Issues with constructing a valid receive object were handled earlier
      case e: Throwable =>
        processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
    }
  }
}
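The step where the channel stops listening for OP_READ after a request is queued can be illustrated with plain java.nio. This sketch does not reproduce Kafka's own selector wrapper. `MuteSketch`, `mute`, `unmute`, and `demo` are hypothetical helpers, and a `Pipe` stands in for a client socket so that no network is needed.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, SelectionKey, Selector}
import java.nio.charset.StandardCharsets

object MuteSketch {
  // Remove OP_READ from the key's interest set: the selector stops reporting readable data.
  def mute(key: SelectionKey): Unit = {
    key.interestOps(key.interestOps() & ~SelectionKey.OP_READ)
    ()
  }

  // Add OP_READ back: pending data is reported again on the next select.
  def unmute(key: SelectionKey): Unit = {
    key.interestOps(key.interestOps() | SelectionKey.OP_READ)
    ()
  }

  // Returns the number of ready keys (before muting, while muted, after unmuting).
  def demo(): (Int, Int, Int) = {
    val selector = Selector.open()
    val pipe = Pipe.open()
    try {
      pipe.source().configureBlocking(false)
      val key = pipe.source().register(selector, SelectionKey.OP_READ)
      // Simulate a client sending bytes. They are never read, so the channel stays readable.
      pipe.sink().write(ByteBuffer.wrap("request".getBytes(StandardCharsets.UTF_8)))

      val before = selector.select(1000)
      selector.selectedKeys().clear()

      mute(key)
      val whileMuted = selector.selectNow()
      selector.selectedKeys().clear()

      unmute(key)
      val after = selector.select(1000)
      (before, whileMuted, after)
    } finally {
      pipe.sink().close()
      pipe.source().close()
      selector.close()
    }
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

While the key is muted, the unread bytes stay in the channel but the selector no longer reports them. Muting in this way stops further requests from being read off a connection until it is unmuted again.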
Kafka Network: Accepting Requests in the KafkaRequestHandler
The KafkaRequestHandler class diagram:

Its run method is as follows:
def run(): Unit = {
  while (!stopped) {
    // We use a single meter for aggregate idle percentage for the thread pool.
    // Since meter is calculated as total_recorded_value / time_window and
    // time_window is independent of the number of threads, each recorded idle
    // time should be discounted by # threads.
    val startSelectTime = time.nanoseconds
    val req = requestChannel.receiveRequest(300)
    val endTime = time.nanoseconds
    val idleTime = endTime - startSelectTime
    aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
    req match {
      case RequestChannel.ShutdownRequest =>
        debug(s"Kafka request handler $id on broker $brokerId received shut down command")
        shutdownComplete.countDown()
        return
      // A request was received, so handle it here
      case request: RequestChannel.Request =>
        try {
          request.requestDequeueTimeNanos = endTime
          trace(s"Kafka request handler $id on broker $brokerId handling request $request")
          // Hand the request to KafkaApis for processing
          apis.handle(request)
        } catch {
          case e: FatalExitError =>
            shutdownComplete.countDown()
            Exit.exit(e.statusCode)
          case e: Throwable => error("Exception when handling request", e)
        } finally {
          request.releaseBuffer()
        }
      case null => // continue
    }
  }
  shutdownComplete.countDown()
}
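The handler loop above follows a common worker-pool pattern: poll a shared queue with a timeout, treat a sentinel value as the shutdown signal, and treat `null` as "nothing arrived, poll again". Below is a minimal sketch of that pattern with hypothetical names (`MiniHandler`, `HandlerPoolSketch`, `WorkRequest`). Sending one shutdown sentinel per thread is an assumption made for this sketch, not something shown in the excerpt.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical request types: a sentinel for shutdown and a normal unit of work.
sealed trait BaseRequest
case object ShutdownRequest extends BaseRequest
final case class WorkRequest(payload: String) extends BaseRequest

final class MiniHandler(queue: LinkedBlockingQueue[BaseRequest],
                        handled: AtomicInteger,
                        shutdownComplete: CountDownLatch) extends Runnable {
  override def run(): Unit = {
    var running = true
    while (running) {
      // Wait up to 300 ms for a request; poll returns null on timeout.
      queue.poll(300, TimeUnit.MILLISECONDS) match {
        case ShutdownRequest =>
          shutdownComplete.countDown()
          running = false
        case WorkRequest(_) =>
          handled.incrementAndGet() // stands in for apis.handle(request)
          ()
        case null => () // timed out, loop again
      }
    }
  }
}

object HandlerPoolSketch {
  // Starts numThreads handlers, feeds them the payloads, shuts them down,
  // and returns how many requests were handled.
  def runPool(numThreads: Int, payloads: Seq[String]): Int = {
    val queue = new LinkedBlockingQueue[BaseRequest]()
    val handled = new AtomicInteger(0)
    val shutdownComplete = new CountDownLatch(numThreads)
    val threads = (0 until numThreads).map { i =>
      val t = new Thread(new MiniHandler(queue, handled, shutdownComplete), s"mini-request-handler-$i")
      t.setDaemon(true)
      t.start()
      t
    }
    payloads.foreach(p => queue.put(WorkRequest(p)))
    // One sentinel per thread, queued behind all the work (assumption of this sketch).
    (0 until numThreads).foreach(_ => queue.put(ShutdownRequest))
    shutdownComplete.await(5, TimeUnit.SECONDS)
    threads.foreach(_.join(1000))
    handled.get
  }

  def main(args: Array[String]): Unit =
    println(runPool(numThreads = 3, payloads = Seq("a", "b", "c", "d", "e")))
}
```

Because the queue is FIFO and the sentinels are enqueued last, every work item is dequeued before any handler sees a shutdown request, so no work is dropped on shutdown.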
Please credit the source when reposting. Link to this article: https://www.uj5u.com/qita/271285.html
