brpc的使用比較容易上手,以官方demo為例,因為brpc的資料序列化依賴protobuf,所以首先需要定義個proto

然后繼承EchoService并實作Echo方法

然后是整體流程

啟動還是比較簡單的,定義server,AddService,然后Start即可
首先放一張官網的圖,陷入細節前先有個大概印象

然后看下AddService

然后是AddServiceInternal

首先判斷注冊的service中是否有method,如果沒有則直接回傳;然后呼叫InitializeOnce()只進行一次初始化,該函式實際呼叫的是GlobalInitializeOrDieImpl
在GlobalInitializeOrDieImpl內首先忽略掉SIGPIPE,然后初始化SSL,注冊NamingService,LoadBalancer,CompressHandler;然后開始注冊協議,brpc server一個埠支持多種協議,這里的協議都是指的是應用層協議,如baidu_std協議是基于tcp,以注冊baidu_std協議為例

協議Protocol類是一個包含了多個函式指標的結構體,例如:

Parse負責將訊息從source上切割下來,client和server均會使用,
![]()
ProcessRequest處理server端parse回傳的訊息,只有server端會呼叫,
注冊完協議之后,遍歷協議,將含有process_response的協議加入到client端InputMessenger的handler中,
然后回來到AddServiceInternal,接著判斷當前注冊的service是否注冊過,沒注冊的話則注冊service的所有method,
然后在demo中使用默認的ServerOption呼叫server.Start(),首先還是呼叫InitializeOnce,這里會直接回傳,然后中間的邏輯在默認的option不會呼叫,直接到初始化bthread,

然后從min_port開始遍歷到max_port直到找到一個可用的port,然后創建Acceptor,

Acceptor是InputMessenger的子類,主要的作業就是處理指定的port來的訊息,然后看下BuildAcceptor,遍歷所有的協議,如果協議指定了process_request,那么就加到acceptor的handler中

然后呼叫acceptor的StartAccept,核心代碼如下,初始化SocketOptions,設定fd為監聽的埠,user為當前acceptor,on_edge_triggered_events為OnNewConnections,OnNewConnections是一個函式指標,表示當前fd發生事件后的回呼函式

然后呼叫Socket的Create,摘一段官網介紹:
和fd相關的資料均在Socket中,是rpc最復雜的結構之一,這個結構的獨特之處在于用64位的SocketId指代Socket物件以方便在多執行緒環境下使用fd,

這里主要邏輯就是根據SocketOption去初始化Socket,然后ResetFileDescriptor

這里其實還是對socket的初始化作業

因為brpc使用的是epoll的邊緣觸發,所以將fd設定為非阻塞,然后設定socket的send,recv buffer大小,然后將當前fd加入到event_dispatcher,

在GetGlobalEventDispatcher中,會只進行一次初始化dispatcher的作業,會創建FLAGS_event_dispatcher_num個dispatcher,默認為1,建構式中會創建epoll fd,然后呼叫Start

在Start中會啟動一個bthread,執行RunThis,RunThis會執行dispatcher的Run

Run中就是回圈進行epoll_wait,

然后對判斷事件,如果是EPOLLIN事件,那么執行StartInputEvent,如果是EPOLLOUT,則執行HandleEpollOut

到現在為止,dispatcher的Start就執行結束了,然后回到Socket的ResetFileDescriptor,會執行AddConsumer,將socket_id存到epoll_event的data中,然后注冊EPOLLIN事件,到這里,server端初始化程序就完成了,接下來就坐等請求到來,

然后回到epoll_wait,當有新請求到來時,epoll_wait回傳,遍歷每一個事件,執行Socket::StartInputEvent;注冊epoll事件時將socketid注冊在了epoll_data的u64中,所以首先通過SocketId address到Socket,為了保證一個fd上只有一個bthread在處理,這里引入了一個atomic變數_nevent,通過_nevent判斷當前socket是否有bthread正在處理,如果有的話就什么都不做,因為正在處理的執行緒執行完后會執行新事件,如果沒有的話就使用bthread_start_urgent啟動一個bthread執行ProcessEvent來處理新訊息,此時epoll bthread讓出當前worker的處理,worker執行新建的ProcessEvent bthread,而epoll bthread則被steal到另一個worker執行緒執行,這里就是官網中所說,brpc不區分io執行緒和worker執行緒,epoll bthread不負責資料的讀取,IO執行緒的問題在于一個執行緒同時只能讀一個fd,當多個繁忙的fd聚集在一個IO執行緒中時,一些讀取就被延遲了;另外epoll bthread讓出當前worker給ProcessEvent bthread,這樣使其有更好的cache locality,可以盡快地讀取fd上的資料,

ProcessEvent則是呼叫on_edge_triggered_events,在Socket Create的時候將該回呼函式設定為了OnNewConnections,

這里會通過MoreReadEvents回圈判斷當前fd上是否還有新的事件,然后執行OnNewConnectionsUntilEAGAIN,這里判斷是否有新事件的方法就是上文提到的_nevent,


在OnNewConnectionsUntilEAGAIN中,首先通過accept拿到一個已完成的連接,然后從監聽socket中獲取user,即之前的acceptor,然后創建Socket,回呼函式設定為OnNewMessages,接下來的程序和之前創建監聽Socket程序一樣,執行AddConsumer,將socket_id存到epoll_event的data中,注冊EPOLLIN事件,然后這個新連接有資料來了之后會在epoll_wait回傳,執行ProcessEvent,呼叫到OnNewMessages,

再粘一個官網的介紹:
InputMessenger負責從fd上切割和處理訊息,它通過用戶回呼函式理解不同的格式,Parse一般是把訊息從二進制流上切割下來,運行時間較固定;Process則是進一步決議訊息(比如反序列化為protobuf)后呼叫用戶回呼,時間不確定,若一次從某個fd讀取出n個訊息(n > 1),InputMessenger會啟動n-1個bthread分別處理前n-1個訊息,最后一個訊息則會在原地被Process,InputMessenger會逐一嘗試多種協議,由于一個連接上往往只有一種訊息格式,InputMessenger會記錄下上次的選擇,而避免每次都重復嘗試,
可以看到,fd間和fd內的訊息都會在brpc中獲得并發,這使brpc非常擅長大訊息的讀取,在高負載時仍能及時處理不同來源的訊息,減少長尾的存在,
這里的handler就是一開始初始化注冊的所有server端協議,進入while回圈,計算一次讀取資料的長度,DoRead會執行_read_buf.append_from_file_descriptor,_read_buf是IOPortal型別,如上篇博客所講,這個方法會呼叫readv將fd中的資料讀入到iobuf的block中,

因為這一次資料讀取可能會包含多個訊息,因此下面會有另一個while回圈,每次呼叫CutInputMessage嘗試從iobuf中切割一條訊息,如上文,server端是支持多協議的,所以這里第一次會嘗試使用所有的協議進行一次parse,因為大多數情況下一個連接上只有一種協議,因此嘗試一遍之后會記錄下來執行成功的協議,之后將首先嘗試記錄的協議,
這里的parse就是上文提到的注冊協議中的parse方法,這里以協議baidu_std為例簡單介紹一下,baidu_std官方介紹在這里https://github.com/apache/incubator-brpc/blob/master/docs/cn/baidu_std.md,可以看到判斷是否是baidu_std的方法就是判斷前4 個位元組是否為”prpc”,這里的copy_to為顯式拷貝,因為在判斷是否為baidu_std協議的程序中不能消費資料,否則可能會影響其他協議決議;如果iobuf中的資料不夠4個位元組且是”prpc”的前綴,那么回傳PARSE_ERROR_NOT_ENOUGH_DATA錯誤,這個表示到目前為止不違反當前協議,但是資料不足一個訊息,因此會觸發重新DoRead的程序;如果前4個位元組就是”prpc”,那么滿足baidu_std協議,接下來將決議包體長度和包體中的元資料包長度存入到body_size和meta_size中,如果iobuf中資料長度不足body_size + 12(4 + 4 + 4),那么同樣會觸發重讀,否則將iobuf中的meta和其他資料切割到msg中并回傳,這里是零拷貝,

如果一次DoRead讀入了n條訊息,那么前n-1條訊息會通過QueueMessage后臺啟動了n-1個bthread進行處理,而最后一個訊息會被解構式RunLastMessage執行,原地執行process函式,即協議中的process,

![]()

還是以baidu_std為例,首先是從iobuf中決議pb格式的meta,

然后創建req,res,Controller,然后進行流控,決議req,如果有attachment也決議出來,最后呼叫CallMethod,CallMethod方法在編譯protobuf時生成,會呼叫到用戶定義的Echo方法


最后在Echo中done_guard析構時會呼叫done的Run方法,發送response到client,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/245633.html
標籤:其他
上一篇:CentOS7 開啟SSH坑(哪有什么坑,還不是因為自己菜)
下一篇:Api介面:手機空號狀態過濾檢測
