注意:建議各位看如下代碼,一定要匯入一份原始碼到自己本地電腦中哦,可以匯入進行除錯,流程代碼我洗掉許多,保留重點,
本章主要介紹的是,worker 開啟,并與master通信流程,以及inbox收件箱outbox發件箱的創建,認真學習你可以識訓許多哦,舉例使用該inbox,outbox處理模式,可以嵌入到聊天程式當中哦,處理高并發請求,
廢話不多說,進入主題,
spark業務操作和資料傳輸相隔里,資料傳輸就交給netty,netty不涉及業務,提高netty服務器的回應時間,提高代碼的復用性,降低耦合性,work類中有許多業務處理邏輯,scale撰寫,資料訊息傳輸netty Java撰寫,
worker 服務開啟通信,需要一個開啟遠程通信環境RpcEnv,創建inbox收件箱outbox發件箱對訊息進行收取轉發,
具體worker創建,與master通信程序如下:

worker開啟流程圖
**開啟worker步驟:**按順序讀
①Worker類: val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)//開啟worker服務入口類
②RpcEnv類:
def create(,,,){
new NettyRpcEnvFactory().create(config)
}
③NettyRpcEnvFactory 類:
def create(config: RpcEnvConfig): RpcEnv = {
val nettyEnv = new NettyRpcEnv(,,,,)
nettyEnv.startServer(config.bindAddress, actualPort)
}
④NettyRpcEnv類:
{
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
private val transportContext = new TransportContext(transportConf,new NettyRpcHandler(dispatcher, this, streamManager))
private val clientFactory = transportContext.createClientFactory(createClientBootstraps())
def startServer(bindAddress: String, port: Int): Unit = {
server = transportContext.createServer(bindAddress, port, bootstraps)//創建netty server 服務,即worker server服務行程
dispatcher.registerRpcEndpoint( RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))//注冊驗證
}
⑤dispatcher類:
registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
endpoint match {
case e: IsolatedRpcEndpoint =>
new DedicatedMessageLoop(name, e, this)
case _ =>
sharedLoop.register(name, endpoint)//sharedLoop 延遲裝載,裝載時開啟⑥,不斷輪詢 inbox ,worker開啟呼叫兩次registerRpcEndpoint()方法,如下圖
}

Worker類中
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))//該方法直接向master發送onstart 訊息,

⑥ SharedMessageLoop類:先初始化
override protected val threadpool: ThreadPoolExecutor = {
val numThreads = getNumOfThreads(conf)
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, “dispatcher-event-loop”)
for (i <- 0 until numThreads) {
pool.execute(receiveLoopRunnable)
}
pool
}
⑦ MessageLoop類:SharedMessageLoop父類
{
protected val receiveLoopRunnable = new Runnable() {
override def run(): Unit = receiveLoop()
}
private def receiveLoop(): Unit = {
try {
while (true) {
val inbox = active.take()//該方法空阻塞 await
inbox.process(dispatcher)
}
⑧Inbox類:
inbox.synchronized {
messages.add(OnStart)//類初始化時添加OnStart 開啟訊息
}
def process(dispatcher: Dispatcher): Unit = {
message = messages.poll()
message match {
case RpcMessage(_sender, content, context) => endpoint.receiveAndReply(context)
case OneWayMessage(sender, content) => endpoint.receive
case OnStart => endpoint.onStart() //endpoint分為驗證和worker端點,驗證處理空訊息,worker處理onStart
case OnStop => endpoint.onStop()
case RemoteProcessConnected(remoteAddress) =>endpoint.onConnected(remoteAddress)
case RemoteProcessDisconnected(remoteAddress) => endpoint.onDisconnected(remoteAddress)
case RemoteProcessConnectionError(cause, remoteAddress) => endpoint.onNetworkError(cause, remoteAddress)
}
}
⑨Worker類:
def onStart(): Unit = {
registerWithMaster()
}
def registerWithMaster(): Unit = {
registerMasterFutures = tryRegisterAllMasters()
}
private def tryRegisterAllMasters(): Array[JFuture[]] = {
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)//創建master端點,注意該步還未創建netty遠程客戶端
sendRegisterMessageToMaster(masterEndpoint)
}
private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.send(RegisterWorker( ))
}
⑩NettyRpcEndpointRef類:
def send(message: Any): Unit = {
require(message != null, “Message is null”)
nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
}
def send(message: RequestMessage): Unit = {
postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this)))
}
private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
val newOutbox = new Outbox(this, receiver.address)
val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
newOutbox =targetOutbox.send(message)
}
?OutBox類:
def send(message: OutboxMessage): Unit = {
messages.add(message)
drainOutbox()
}
def drainOutbox(): Unit = {
launchConnectTask()
message = messages.poll()
while (true) {
val _client = synchronized { client }
message.sendWith(_client)
}
private def launchConnectTask(): Unit = {
val _client = nettyEnv.createClient(address) //在這一步進行創建netty client,一步一步到TransportClientFactory類中創建
outbox.synchronized {
client = _client
outbox.synchronized { connectFuture = null }
}
?TransportClientFactory類:
public TransportClient createClient(String remoteHost, int remotePort){
clientPool.clients[clientIndex] = createClient(resolvedAddress);//創建netty client
return clientPool.clients[clientIndex];
}
認真閱讀本文至此,如本文對你有用處,希望點個贊哦,
作者:小亦
好好學習,天天向上
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/251723.html
標籤:其他
上一篇:spring boot整合spark,集群模式或local模式運行,http請求呼叫spark API,啟動job任務配置、優化spark配置等
下一篇:【資料庫】MySQL新手安裝教程
