RPC(Remote Procedure Call) 是 Hadoop 服務通信的關鍵庫,支撐上層分布式環境下復雜的行程間(Inter-Process Communication, IPC)通信邏輯,是分布式系統的基礎,允許運行于一臺計算機上的程式像呼叫本地方法一樣,呼叫另一臺計算機的子程式,
由于 RPC 服務整體知識較多,本節僅針對對 Yarn RPC 進行簡略介紹,詳細內容會后續開專欄介紹,
一、RPC 通信模型介紹
為什么會有 RPC 框架?
在分布式或微服務情境下,會有大量的服務間互動,如果用傳統的 HTTP 協議埠來通信,需要耗費大量時間處理網路資料交換上,還要考慮編解碼等問題,如下圖所示,
- 客戶端通過 RPC 框架的動態代理得到一個代理類實體,稱為 Stub(樁)
- 客戶端呼叫介面方法(實際是 Stub 對應的方法),Stub 會構造一個請求,包括函式名和引數
- 服務端收到這個請求后,先將服務名(函式)決議出來,查找是否有對應的服務提供者
- 服務端找到對應的實作類后,會傳入引數呼叫
- 服務端 RPC 框架得到回傳結果后,再進行封裝回傳給客戶端
- 客戶端的 Stub 收到回傳值后,進行決議,回傳給呼叫者,完成 RPC 呼叫,
二、Hadoop RPC 介紹
一)簡介
Hadoop RPC 是 Hadoop 自己實作的一個 RPC 框架,主要有以下幾個特點:
- 透明性:像呼叫本地方法一樣呼叫遠程方法,
- 高性能:Hadoop 各個系統均采用 Master/Slave 結構,Master 是一個 RPC Server 用于處理各個 Slave 節點發送的請求,需要有高性能,
- 可控性:由于 JDK 中的 RPC 框架 RMI 重量級過大,且封裝度太高,不方便控制和修改,因此實作了自己的 RPC 框架,以保證輕量級、高性能、可控性,
框架原理和整體執行流程與第一節介紹的 RPC 框架一致,感興趣可深入原始碼進行了解,
二)總體架構
Hadoop RPC 架構底層依靠 Java 的 nio、反射、動態代理等功能實作「客戶端 - 服務器(C/S)」通信模型,
上層封裝供程式呼叫的 RPC 介面,
三、案例 demo
下面兩個案例的 demo 已上傳至 github,有幫助的話點個??,
https://github.com/Simon-Ace/hadoop_rpc_demo
一)RPC Writable 案例實作
1、新建一個 maven 工程,添加依賴
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.5</version>
</dependency>
2、定義 RPC 協議
public interface BusinessProtocol {
void mkdir(String path);
String getName(String name);
long versionID = 345043000L;
}
3、定義協議實作
public class BusinessIMPL implements BusinessProtocol {
@Override
public void mkdir(String path) {
System.out.println("成功創建了檔案夾 :" + path);
}
@Override
public String getName(String name) {
System.out.println("成功打了招呼: hello :" + name);
return "bigdata";
}
}
4、通過 Hadoop RPC 構建一個 RPC 服務端
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import java.io.IOException;
public class MyServer {
public static void main(String[] args) {
try {
// 構建一個 RPC server 端,提供了一個 BussinessProtocol 協議的 BusinessIMPL 服務實作
RPC.Server server = new RPC.Builder(new Configuration())
.setProtocol(BusinessProtocol.class)
.setInstance(new BusinessIMPL())
.setBindAddress("localhost")
.setPort(6789)
.build();
server.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
5、構建一個 RPC 客戶端
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.net.InetSocketAddress;
public class MyClient {
public static void main(String[] args) {
try {
// 獲取代理類實體,也就是 Stub
BusinessProtocol proxy = RPC.getProxy(BusinessProtocol.class, BusinessProtocol.versionID,
new InetSocketAddress("localhost", 6789), new Configuration());
// 通過 Stub 發送請求,實際使用就像呼叫本地方法一樣
proxy.mkdir("/tmp/ABC");
String res = proxy.getName("Simon");
System.out.println("從 RPC 服務端接收到的回傳值:" + res);
} catch (IOException e) {
e.printStackTrace();
}
}
}
6、測驗,先啟動服務端,再啟動客戶端
服務端輸出
成功創建了檔案夾 :/tmp/ABC
成功打了招呼: hello :Simon
客戶端輸出
從 RPC 服務端接收到的回傳值:bigdata
二)RPC Protobuf 案例實作
專案結構如下
對 proto 檔案格式不熟悉的同學,參考上一篇文章《2-1 Yarn 基礎庫概述》
MyResourceTrackerMessage.proto 定義資料格式
syntax = "proto3";
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerMessageProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
message MyRegisterNodeManagerRequestProto {
string hostname = 1;
int32 cpu = 2;
int32 memory = 3;
}
message MyRegisterNodeManagerResponseProto {
string flag = 1;
}
MyResourceTracker.proto 定義 rpc 介面
syntax = "proto3";
import "com/shuofxz/protobuf_rpc/proto/MyResourceTrackerMessage.proto";
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
service MyResourceTrackerService {
rpc registerNodeManager(MyRegisterNodeManagerRequestProto) returns (MyRegisterNodeManagerResponseProto);
}
2、對 proto 檔案編譯,生成 java 類
# 在專案根目錄執行,路徑按照自己的進行修改
protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResource.proto
protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResourceTracker.proto
3、定義呼叫方法介面 MyResourceTracker
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto;
public interface MyResourceTracker {
MyRegisterNodeManagerResponseProto registerNodeManager(MyRegisterNodeManagerRequestProto request) throws Exception;
}
4、對呼叫方法介面的實作(服務端)
import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
public class MyResourceTrackerImpl implements MyResourceTracker {
@Override
public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(
MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) {
// 輸出注冊的訊息
String hostname = request.getHostname();
int cpu = request.getCpu();
int memory = request.getMemory();
System.out.println("NodeManager 的注冊訊息: hostname = " + hostname + ", cpu = " + cpu + ", memory = " + memory);
// 省略處理邏輯
// 構建一個回應物件,用于回傳
MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.Builder builder =
MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.newBuilder();
// 直接回傳 True
builder.setFlag("true");
MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = builder.build();
return response;
}
}
5、撰寫 proto 的協議介面
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;
import org.apache.hadoop.ipc.ProtocolInfo;
@ProtocolInfo(protocolName = "com.shuofxz.blablabla", protocolVersion = 1)
public interface MyResourceTrackerPB extends MyResourceTrackerProto.MyResourceTrackerService.BlockingInterface {
}
6、撰寫 proto 的協議介面實作(服務端)
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;
public class MyResourceTrackerServerSidePB implements MyResourceTrackerPB {
final private MyResourceTracker server;
public MyResourceTrackerServerSidePB(MyResourceTracker server) {
this.server = server;
}
@Override
public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(
RpcController controller, MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) throws ServiceException {
try {
return server.registerNodeManager(request);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
7、RPC Server 的實作
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;
import java.io.IOException;
public class ProtobufRpcServer {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);
// 構建 Rpc Server
RPC.Server server = new RPC.Builder(conf)
.setProtocol(MyResourceTrackerPB.class)
.setInstance(MyResourceTrackerProto.MyResourceTrackerService
.newReflectiveBlockingService(new MyResourceTrackerServerSidePB(new MyResourceTrackerImpl())))
.setBindAddress("localhost")
.setPort(9998)
.setNumHandlers(1)
.setVerbose(true)
.build();
// Rpc Server 啟動
server.start();
}
}
8、RPC Client 的實作
import com.google.protobuf.ServiceException;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;
import java.io.IOException;
import java.net.InetSocketAddress;
public class ProtobufRpcClient {
public static void main(String[] args) throws IOException {
// 設定 RPC 引擎為 ProtobufRpcEngine
Configuration conf = new Configuration();
String hostname = "localhost";
int port = 9998;
RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);
// 獲取代理
MyResourceTrackerPB protocolProxy = RPC
.getProxy(MyResourceTrackerPB.class, 1, new InetSocketAddress(hostname, port), conf);
// 構建請求物件
MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.Builder builder =
MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.newBuilder();
MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto bigdata02 =
builder.setHostname("bigdata02").setCpu(64).setMemory(128).build();
// 發送 RPC 請求,獲取回應
MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = null;
try {
response = protocolProxy.registerNodeManager(null, bigdata02);
} catch (ServiceException e) {
e.printStackTrace();
}
// 處理回應
String flag = response.getFlag();
System.out.println("最終注冊結果: flag = " + flag);
}
}
9、測驗
先啟動服務端,在啟動客戶端,
四、總結
本節介紹了 Hadoop 底層通信庫 RPC,首先介紹了 RPC 的框架和原理,之后對 Hadoop 自己實作的 RPC 進行了介紹,并給出了兩個 demo 實踐,
強烈建議了解基礎知識后,跟著 demo 實作一個案例出來,可以更好的幫助你理解,
文中 Demo:https://github.com/Simon-Ace/hadoop_rpc_demo
參考文章:
YARN-RPC網路通信架構設計
YARN-高并發RPC原始碼實作
Hadoop3.2.1 【 HDFS 】原始碼分析 : RPC原理 [八] Client端實作&原始碼
Hadoop RPC機制詳解
Hadoop2原始碼分析-RPC探索實戰
《Hadoop 技術內幕 - 深入決議 Yarn 結構設計與實作原理》3.3 節
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/530533.html
標籤:其他
上一篇:手寫本地快取實戰2—— 打造正規軍,構建通用本地快取框架
下一篇:Java SE
