文章目錄
- 認識RPC
- RPC框架的設計
- protobuf 作用
- zookeeper
- 從框架的使用來認識
- 其他服務器是怎樣呼叫的呢
- 樁類是干嘛的
- 總結一下RPC流程
- 參考文獻
郵箱:2107810343@qq.com
時間:2021/4/18 22:55
開發環境:Ubuntu VS Code
編譯器:g++
編程語言:C++
框架原始碼下載: GitHub
認識RPC
RPC的全稱是遠程程序呼叫(Remote Procedure Call),
什么是遠程程序呼叫呢?
那么對于一個聊天系統有int send_information(int friend_id,string msg)這個方法,我們的一個處理邏輯是不是這樣:
- 呼叫
bool is_exist(int friend_id)判斷用戶是否在線 - 根據結果在決議是發送在線訊息還是離線訊息,
那么對于一個繼承了登錄和聊天功能的系統,我們在本地呼叫一個函式,就直接回傳值=函式名(引數1,引數2),就直接執行了這個程序并且得到了回傳值,
但是如果考慮高并發、高可用以及系統擴展性的話,那我們不得不引入分布式的設計,這意味這,登錄和聊天會部署在不同的機器上!那么要完成上面的邏輯,就不得不依靠網路,將要呼叫的函式名以及引數通過網路打包發送給另一個機器,然后由另一臺機器將結果發過來,
而我們要做的RPC框架就是為使這個程序更好用而設計的,

RPC框架的設計
博文的標題是基于 protobuf 和 zookeeper 的RPC框架,那么protobuf 和 zookeeper又在整個框架啊中扮演怎樣的角色呢?
protobuf 作用
protobuf 主要是作為整個框架的傳輸協議,我們可以看一下整個框架對于傳輸資訊的格式定義:
message RpcHeader
{
bytes service_name = 1; //類名
bytes method_name = 2; //方法名
uint32 args_size = 3; //引數大小
}
可以看到,它定義了要呼叫方法是屬于哪個類的哪個方法以及這個方法所需要的的引數大小,
那么引數呢?是怎樣定義的?
首先我們來看一下我們框架內傳輸的資料是什么:
4位元組標識頭部長度+RpcHeader+args
/*
* 16表示整個類名+方法名+引數長度的大小
有個這個長度,我們就可以從整個長度中截取UserServiceLogin15這一段
再根據RpcHeader來反序列話得出類名和方法名以及引數長度三個重要資料
* 15表示后面的引數長度
由于我們找到了類名和方法名,我們就可以在整個框架存盤這些資訊的地方得到一個對于這個方法的描述,
然后借用protobuf的service物件提供的一個介面GetResponsePrototype,并且根據這個方法的描述來反序列化出引數,這個都是根據我們注冊的方法的描述來做的,
*/
18UserServiceLogin15zhang san123456
注:
如果還是有點迷糊,可以保留引數解釋資訊,看到后面就大致懂了
zookeeper
zookeeper 呢在這里其實主要就是起到了一個配置中心的目的,
什么意思呢?
zookeeper上面我們標識了每個類的方法所對應的分布式節點地址,當我們其他服務器想要RPC的時候,就去 zookeeper 上面查詢對應要呼叫的服務在哪個節點上,

注意:
這里就相當于,我其他服務器來 zookeeper 查詢User::is_exist,然后會得到127.0.0.1:8001 這個值,這個值就是你布置這個功能的一個RPC節點的網路識別符號,然后向這個節點去發送引數并且等待這個節點的相應,
從框架的使用來認識
框架的使用一般都是在example目錄下的 callee/UserService.cpp 里面
RpcApplication::init(argc, argv);
//框架服務提供provider
RpcProvider provide;
provide.notify_service(new UserService());
provide.run();
可以看到,主要去做了三個事情:
- 首先 RPC 框架肯定是部署到一臺服務器上的,所以我們需要對這個服務器的 ip 和 port 進行初始化
- 然后創建一個 porvider(也就是server)物件,將當前 UserService 這個物件傳遞給他,也就是其實這個 RPC 框架和我們執行具體業務的節點是在同一個服務器上的,RPC框架負責決議其他服務器傳遞過來的請求,然后將這些引數傳遞給本地的方法,并將回傳值回傳給其他服務器,
- 最后是去讓這個 provider 去 run 起來,具體我們其實可以看一下源代碼:
//啟動rpc服務節點,開始提供rpc遠程網路呼叫服務
void RpcProvider::run()
{
//獲取ip和port
string ip = RpcApplication::get_instance().get_configure().find_load("rpcserver_ip");
uint16_t port = atoi(RpcApplication::get_instance().get_configure().find_load("rpcserver_port").c_str());
//cout << ip << ":" << port << endl;
InetAddress address(ip, port);
//創建tcpserver物件
TcpServer server(&eventloop_, address, "RpcProvider");
//系結鏈接回呼和訊息讀寫回呼方法
server.setConnectionCallback(bind(&RpcProvider::on_connection, this, _1));
server.setMessageCallback(bind(&RpcProvider::on_message, this, _1, _2, _3));
//設定muduo庫的執行緒數量
server.setThreadNum(4);
//把當前rpc節點上要發布的服務全部注冊到zk上面,讓rpc client可以從zk上發現服務
ZookeeperClient zk_client;
zk_client.start();
//在配置中心中創建節點
for (auto &sp : service_map_)
{
string service_path = "/" + sp.first;
zk_client.create(service_path.c_str(), nullptr, 0);
for (auto &mp : sp.second.method_map_)
{
string method_path = service_path + "/" + mp.first;
char method_path_data[128] = {0};
sprintf(method_path_data, "%s:%d", ip.c_str(), port);
//ZOO_EPHEMERAL 表示znode時候臨時性節點
zk_client.create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
}
}
RPC_LOG_INFO("server RpcProvider [ip: %s][port: %d]", ip.c_str(), port);
//啟動網路服務
server.start();
eventloop_.loop();
}
可以看到,整個 run 其實就是干了這么幾件事情:
- 因為底層呼叫的是muduo網路庫,所以這里會獲取ip地址和埠號,然后初始化網路層
- 然后去設定一個連接回呼以及發生讀寫事件時候的回呼函式(稍后介紹)
- 然后設定整個 muduo 網路庫作業的執行緒數量
- 然后創建zookeeper配置中心,將這些方法的資訊以及本機的IP地址注冊到zookeeper
- 然后開啟本機服務器的事件回圈,等待其他服務器的連接
其他服務器是怎樣呼叫的呢
我們看一下 example 目錄下的 caller/CallUserService.cpp 里面是怎樣呼叫的,
//初始化ip地址和埠號
RpcApplication::init(argc, argv);
//演示呼叫遠程發布的rpc方法login
ik::UserServiceRpc_Stub stub(new RpcChannel());
//請求引數和回應
ik::LoginRequest request;
request.set_name("zhang san");
request.set_password("123456");
ik::LoginResponse response;
RpcControl control;
//發起rpc呼叫,等待回應回傳
stub.Login(&control, &request, &response, nullptr);
//rpc呼叫完成,讀呼叫的結果
if (response.errmsg().error() == 0)
{
//沒錯誤
cout << "rpc login response: " << response.success() << endl;
}
else
{
//有錯誤
cout << "rpc login response errer: " << response.errmsg().error_msg() << endl;
}
同樣,也是有以下幾個步驟:
- 初始化 RPC 遠程呼叫要連接的服務器
- 定義一個 UserSeervice 的 stub 樁類,由這個裝類去呼叫Login方法,這個login方法可以去看一下原始碼的定義:
//重寫基類UserServiceRpc的虛函式,供框架呼叫
void Login(::google::protobuf::RpcController *controller,
const ::ik::LoginRequest *request,
::ik::LoginResponse *response,
::google::protobuf::Closure *done)
{
//框架給業務上報了請求引數 request,業務獲取相應資料做本地業務
string name = request->name();
string password = request->password();
//本地業務
bool login_result = Login(name, password);
//把回應給呼叫方回傳
ik::ErrorMsg *errmsg = response->mutable_errmsg();
errmsg->set_error(0);
errmsg->set_error_msg("");
response->set_success(login_result);
//執行回呼操作
done->Run();
}
可以看到,Login的 RPC 多載函式有四個引數:controller(表示函式是否出錯)、request(引數)、response(回傳值)、done(回呼函式)
其主要做的也是去圍繞著決議引數,將引數放入本地呼叫的方法,將結果回傳并執行回呼函式,至于這個回呼函式則是在服務端執行讀寫事件回呼函式系結的,
系結的是如下方法:
void RpcProvider::send_rpc_response(const TcpConnectionPtr &conn, google::protobuf::Message *response)
{
string response_str;
//序列化
if (response->SerializeToString(&response_str))
{
//發送序列化的資料
conn->send(response_str);
}
else
{
//序列化失敗
RPC_LOG_ERROR("serialize reponse error");
}
//短鏈接
conn->shutdown();
}
它會將由bind函式系結的引數:response_str 發送回去,然后呼叫方的服務器就會收到這個回傳值并決議它,
樁類是干嘛的
那么其實作在來說,我們并沒有看到呼叫方是何時發送了要呼叫方法以及相應引數?
我們還是需要去回傳樁類,這個是由 protobuf 自動去幫你生成的,
class UserServiceRpc_Stub : public UserServiceRpc {
public:
UserServiceRpc_Stub(::google::protobuf::RpcChannel* channel);
UserServiceRpc_Stub(::google::protobuf::RpcChannel* channel,
::google::protobuf::Service::ChannelOwnership ownership);
~UserServiceRpc_Stub();
inline ::google::protobuf::RpcChannel* channel() { return channel_; }
// implements UserServiceRpc ------------------------------------------
void Login(::google::protobuf::RpcController* controller,
const ::ik::LoginRequest* request,
::ik::LoginResponse* response,
::google::protobuf::Closure* done);
void Register(::google::protobuf::RpcController* controller,
const ::ik::RegisterRequest* request,
::ik::RegisterResponse* response,
::google::protobuf::Closure* done);
private:
::google::protobuf::RpcChannel* channel_;
bool owns_channel_;
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(UserServiceRpc_Stub);
};
我們在定義樁類的時候,會傳入一個RpcCannel的指標,這個系結到這個樁類的channel_指標,
當我們去呼叫這個樁類的Login方法的時候,會去呼叫傳遞進來的channel的CallMethod方法:
void UserServiceRpc_Stub::Login(::google::protobuf::RpcController* controller,
const ::ik::LoginRequest* request,
::ik::LoginResponse* response,
::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
}
所以,顯而易見,我去發送這些方法啊,引數啊,都是在CallMethod這個方法中執行的,
那么CallMethod里面執行的內容對我們理解RPC呼叫體系至關重要(代碼比較長,可以直接跳過聽結論):
void RpcChannel::CallMethod(const google::protobuf::MethodDescriptor *method,
google::protobuf::RpcController *controller,
const google::protobuf::Message *request,
google::protobuf::Message *response,
google::protobuf::Closure *done)
{
const google::protobuf::ServiceDescriptor *service_des = method->service();
string service_name = service_des->name();
string method_name = method->name();
//獲取引數的序列化字串長度 args_size
int args_size = 0;
string args_str;
if (request->SerializeToString(&args_str))
{
args_size = args_str.size();
}
else
{
controller->SetFailed("serialize request error!");
return;
}
//定義rpc的請求header
ikrpc::RpcHeader rpc_header;
rpc_header.set_service_name(service_name);
rpc_header.set_method_name(method_name);
rpc_header.set_args_size(args_size);
uint32_t header_size = 0;
string rpc_header_str;
if (rpc_header.SerializeToString(&rpc_header_str))
{
//序列化成功
header_size = rpc_header_str.size();
}
else
{
//序列化失敗
controller->SetFailed("serialize rpc_header error!");
return;
}
//組織待發送的rpc請求的字串
string send_rpc_str;
send_rpc_str.insert(0, string((char *)&header_size, 4)); //header_size
send_rpc_str += rpc_header_str; //rpc_header
send_rpc_str += args_str; //args_str
//使用tcp編程,完成rpc方法的遠程呼叫
int client_fd = socket(AF_INET, SOCK_STREAM, 0);
if (client_fd == -1)
{
close(client_fd);
RPC_LOG_FATAL("create socket error! errno:%d", errno);
}
//獲取ip和port
ZookeeperClient zk_client;
zk_client.start();
string method_path = "/" + service_name + "/" + method_name;
string host_data = zk_client.get_data(method_path.c_str());
if(host_data == "")
{
controller->SetFailed(method_path+" is not exist");
return;
}
int host_index = host_data.find(":");
if(host_index == -1)
{
controller->SetFailed(method_path+" address is invalid!");
return;
}
string ip = host_data.substr(0,host_index);
uint16_t port = atoi(host_data.substr(host_index+1,host_data.size()-host_index).c_str());
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (connect(client_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1)
{
close(client_fd);
RPC_LOG_FATAL("connet error! errno: %d", errno);
}
// 發送rpc請求
if (send(client_fd, send_rpc_str.c_str(), send_rpc_str.size(), 0) == -1)
{
controller->SetFailed("send error! errno: " + errno);
close(client_fd);
return;
}
//接受rpc請求
char recv_buffer[BUFF_SIZE] = {0};
ssize_t recv_size = recv(client_fd, recv_buffer, BUFF_SIZE, 0);
if (recv_size == -1)
{
controller->SetFailed("recv error! errno: " + errno);
close(client_fd);
return;
}
//反序列化回應資料
//String 因為遇到\0會認為是字串結束,所以用Array
if (!response->ParseFromArray(recv_buffer, recv_size))
{
string recv = recv_buffer;
controller->SetFailed("parse error! response_str: " + recv);
close(client_fd);
return;
}
close(client_fd);
}
- 組織要發送的 request_str 字串
- 從zookeeper中拿到服務端的 ip 和 port,連接服務端
- 發送 request_str
- 接受服務端回傳過來的 response 字串并反序列化出結果
總結一下RPC流程

參考文獻
[1] 施磊.分布式網路通信RPC框架.騰訊課堂
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/278051.html
標籤:其他
上一篇:分布式全文搜索引擎ES詳解《Java-2021面試談資系列》
下一篇:搭建簡單的加密的web網站
