主頁 > 後端開發 > 一個工業級、跨平臺、輕量級的 tcp 網路服務框架:gevent

一個工業級、跨平臺、輕量級的 tcp 網路服務框架:gevent

2020-09-13 05:47:13 後端開發

作為公司的公共產品,經常有這樣的需求:就是新建一個本地服務,產品線作為客戶端通過 tcp 接入本地服務,來獲取想要的業務能力,

與印象中動輒處理成千上萬連接的 tcp 網路服務不同,這個本地服務是跑在客戶機器上的,Win32 上作為開機自啟動的 windows 服務運行;

Linux 上作為 daemon 在后臺運行,總的說來就是用于接收幾個產品行程的連接,因此輕量化是其最重要的要求,在這個基礎上要能兼顧跨平臺就可以了,

其實主要就是 windows,再兼顧一點兒 linux,

 

考察了幾個現有的開源網路框架,從 ACE 、boost::asio 到 libevent,都有不盡于人意的地方:

a) ACE:太重,只是想要一個網路框架,結果它扒拉扒拉一堆全提供了,不用還不行;

b) boost::asio:太復雜,牽扯到 boost 庫,并且引入了一堆 c++ 模板,需要高版本 c++ 編譯器支持;

c) libevent:這個看著不錯,當時確實用這個做底層封裝了一版,結果發版后發現一個比較致命的問題,導致在防火墻設定比較嚴格的機器上初始化失敗,這個后面我會詳細提到,

 

其它的就更不用說了,之前也粗略看過陳碩的 muddo,總的感覺吧,它是基于其它開源框架不足地方改進的一個庫,有相當可取的地方,但是這個改進的方向也主要是解決更大并發、更多連接,不是我的痛點,所以沒有繼續深入研究,

 

好了,與其在不同開源框架之間糾結,不如自己動手寫一個,

反正我的場景比較固定,不用像它們那樣面面俱,我給自己羅列了一些這個框架需要支持基本的功能:

1)同步寫、異步讀;

2)可同時監聽多路事件,基于 1)這里只針對異步 READ 事件(包含連接進入、連接斷開),寫資料是同步的,因而不需要處理異步 WRITE 事件;

3)要有設定一次性和周期性定時器的能力 (業務決定的);

4)不需要處理信號 (windows 上也沒信號這一說,linux 自己搞搞 sigaction 就好啦);

……

 

雖然這個框架未來只會運行在用戶的單機上,但是我不希望它一出生就帶有性能缺陷,所以性能平平的 select 沒能進入我的法眼,我決定給它裝上最強大的心臟:

Windows 平臺: iocp

Linux 平臺:epoll

 

ok,從需求到底層技術路線,貌似都講清楚了,依照 libevent 我給它取名為 gevent,下面我們從代碼級別看下這個框架是怎么簡化 tcp 服務搭建這類作業的,

首先看一下這個 tcp 服務框架的 sample:

svc_handler.h

 1 #include "EventBase.h"
 2 #include "EventHandler.h"
 3 
 4 class GMyEventBase : public GEventBase
 5 {
 6 public:
 7     GEventHandler* create_handler (); 
 8 }; 
 9 
10 
11 class svc_handler : public GJsonEventHandler
12 {
13 public:
14     virtual ~svc_handler () {}
15     virtual void on_read_msg (Json::Value const& val); 
16 };

epoll_svc.cpp

 1 #include <stdio.h>
 2 #include "svc_handler.h"
 3 #include <signal.h>
 4 
 5 GMyEventBase g_base; 
 6 GEventHandler* GMyEventBase::create_handler () 
 7 {
 8     return new svc_handler; 
 9 }
10 
11 void sig_int (int signo)
12 {
13     printf ("%d caught\n", signo); 
14     g_base.exit (1); 
15     printf ("exit ok\n"); 
16 }
17 
18 int main (int argc, char *argv[])
19 {
20     if (argc < 2)
21     {
22         printf ("usage: epoll_svc port\n"); 
23         return -1; 
24     }
25 
26     unsigned short port = atoi (argv[1]);
27 
28 #ifndef WIN32
29     struct sigaction act; 
30     act.sa_handler = sig_int; 
31     sigemptyset(&act.sa_mask);   
32     act.sa_flags = SA_RESTART; 
33     if (sigaction (SIGINT, &act, NULL) < 0)
34     {
35         printf ("install SIGINT failed, errno %d\n", errno); 
36         return -1; 
37     }
38     else
39         printf ("install SIGINT ok\n"); 
40 #endif
41 
42     // to test small message block
43     if (g_base.init (/*8, 10*/) < 0)
44         return -1; 
45 
46     printf ("init ok\n"); 
47     do
48     {
49         if (!g_base.listen (port))
50         {
51             g_base.exit (0); 
52             printf ("exit ok\n"); 
53             break; 
54         }
55 
56         printf ("listen ok\n"); 
57         g_base.run (); 
58         printf ("run  over\n"); 
59     } while (0); 
60 
61     g_base.fini (); 
62     printf ("fini ok\n"); 
63 
64     g_base.cleanup (); 
65     printf ("cleanup ok\n"); 
66     return 0; 
67 }

 

這個服務的核心是 GMyEventBase 類,它使用了框架中的 GEventBase 類,從后者派生而來,

只改寫了一個 create_handler 介面來提供我們的事件處理物件 svc_handler,它是從框架中的 GEventHandler 派生而來,

svc_handler 只改寫了一個 on_read_msg 來處理 Json 格式的訊息輸入,

 

程式的運行就是分別呼叫 GMyEventBase(實際上是GEventBase)  的 init / listen / run / fini / cleaup 方法,

而與業務相關的代碼,都在 svc_handler 中處理:

svc_handler.cpp

 1 #include "svc_handler.h"
 2 
 3 void svc_handler::on_read_msg (Json::Value const& val)
 4 {
 5     int key = val["key"].asInt (); 
 6     std::string data = https://www.cnblogs.com/goodcitizen/p/val["data"].asString (); 
 7     printf ("got %d:%s\n", key, data.c_str ()); 
 8 
 9     Json::Value root; 
10     Json::FastWriter writer; 
11     root["key"] = key + 1; 
12     root["data"] = data; 
13 
14     int ret = 0;
15     std::string resp = writer.write(root); 
16     resp = resp.substr (0, resp.length () - 1); // trim tailing \n
17     if ((ret = send (resp)) <= 0)
18         printf ("send response failed, errno %d\n", errno); 
19     else 
20         printf ("response %d\n", ret); 
21 }

 

它期待 Json 格式的資料,并且有兩個欄位 key(int) 與 data (string),接收資料后將 key 增 1 后回傳給客戶端,

再來看下客戶端 sample:

clt_handler.h

 1 #include "EventBaseAR.h"
 2 #include "EventHandler.h"
 3 
 4 class GMyEventBase : public GEventBaseWithAutoReconnect
 5 {
 6 public:
 7     GEventHandler* create_handler (); 
 8 }; 
 9 
10 
11 class clt_handler : public GJsonEventHandler
12 {
13 public:
14     virtual ~clt_handler () {}
15 #ifdef TEST_TIMER
16     virtual bool on_timeout (GEV_PER_TIMER_DATA *gptd); 
17 #endif
18     virtual void on_read_msg (Json::Value const& val); 
19 };

 

epoll_clt.cpp

  1 #include <stdio.h>
  2 #include "clt_handler.h"
  3 #include <signal.h>
  4 
  5 //#define TEST_READ
  6 //#define TEST_CONN
  7 //#define TEST_TIMER
  8 
  9 GMyEventBase g_base; 
 10 GEventHandler* GMyEventBase::create_handler () 
 11 {
 12     return new clt_handler; 
 13 }
 14 
 15 
 16 int sig_caught = 0; 
 17 void sig_int (int signo)
 18 {
 19     sig_caught = 1; 
 20     printf ("%d caught\n", signo); 
 21     g_base.exit (0); 
 22     printf ("exit ok\n"); 
 23 }
 24 
 25 void do_read (GEventHandler *eh, int total)
 26 {
 27     char buf[1024] = { 0 }; 
 28     int ret = 0, n = 0, key = 0, err = 0;
 29     char *ptr = nullptr; 
 30     while ((total == 0 ||  n++ < total) && fgets (buf, sizeof(buf), stdin) != NULL)
 31     {
 32         // skip \n
 33         buf[strlen(buf) - 1] = 0; 
 34         //n = sscanf (buf, "%d", &key); 
 35         key = strtol (buf, &ptr, 10); 
 36         if (ptr == nullptr)
 37         {
 38             printf ("format: int string\n"); 
 39             continue; 
 40         }
 41 
 42         Json::Value root; 
 43         Json::FastWriter writer; 
 44         root["key"] = key; 
 45         // skip space internal
 46         root["data"] = *ptr == ' ' ? ptr + 1 : ptr;  
 47 
 48         std::string req = writer.write (root); 
 49         req = req.substr (0, req.length () - 1); // trim tailing \n
 50         if ((ret = eh->send (req)) <= 0)
 51         {
 52             err = 1; 
 53             printf ("send %d failed, errno %d\n", req.length (), errno); 
 54             break; 
 55         }
 56         else 
 57             printf ("send %d\n", ret); 
 58     }
 59 
 60     if (total == 0)
 61         printf ("reach end\n"); 
 62 
 63     if (!err)
 64     {
 65         eh->disconnect (); 
 66         printf ("call disconnect to notify server\n"); 
 67     }
 68 
 69     // wait receiving thread 
 70     //sleep (3); 
 71     // if use press Ctrl+D, need to notify peer our break
 72 }
 73 
 74 #ifdef TEST_TIMER
 75 void test_timer (unsigned short port, int period_msec, int times)
 76 {
 77     int n = 0; 
 78     GEventHandler *eh = nullptr; 
 79 
 80     do
 81     {
 82         eh = g_base.connect (port); 
 83         if (eh == nullptr)
 84             break;
 85 
 86         printf ("connect ok\n"); 
 87         void* t = g_base.timeout (1000, period_msec, eh, NULL); 
 88         if (t == NULL)
 89         {
 90             printf ("timeout failed\n"); 
 91             break; 
 92         }
 93         else 
 94             printf ("set timer %p ok\n", t); 
 95 
 96         // to wait timer
 97         do
 98         {
 99             sleep (400); 
100             printf ("wake up from sleep\n"); 
101         } while (!sig_caught && n++ < times);
102 
103         g_base.cancel_timer (t); 
104     } while (0); 
105 }
106 #endif
107 
108 #ifdef TEST_CONN
109 void test_conn (unsigned short port, int per_read, int times)
110 {
111 #  ifdef WIN32
112     srand (GetCurrentProcessId()); 
113 #  else
114     srand (getpid ()); 
115 #  endif
116     int n = 0, elapse = 0; 
117     clt_handler *eh = nullptr; 
118 
119     do
120     {
121         eh = (clt_handler *)g_base.connect (port); 
122         if (eh == nullptr)
123             break;
124 
125         printf ("connect ok\n"); 
126 
127         do_read (eh, per_read); 
128 #  ifdef WIN32
129         elapse = rand() % 1000; 
130         Sleep(elapse); 
131         printf ("running  %d ms\n", elapse); 
132 #  else
133         elapse = rand () % 1000000; 
134         usleep (elapse); 
135         printf ("running  %.3f ms\n", elapse/1000.0); 
136 #  endif
137 
138     } while (!sig_caught && n++ < times);
139 }
140 #endif
141 
142 #ifdef TEST_READ
143 void test_read (unsigned short port, int total)
144 {
145     int n = 0; 
146     GEventHandler *eh = nullptr; 
147 
148     do
149     {
150         eh = g_base.connect (port); 
151         if (eh == nullptr)
152             break;
153 
154         printf ("connect ok\n"); 
155         do_read (eh, total); 
156     } while (0); 
157 }
158 #endif
159 
160 int main (int argc, char *argv[])
161 {
162     if (argc < 2)
163     {
164         printf ("usage: epoll_clt port\n"); 
165         return -1; 
166     }
167 
168     unsigned short port = atoi (argv[1]); 
169 
170 #ifndef WIN32
171     struct sigaction act; 
172     act.sa_handler = sig_int; 
173     sigemptyset(&act.sa_mask);   
174     // to ensure read be breaked by SIGINT
175     act.sa_flags = 0; //SA_RESTART;  
176     if (sigaction (SIGINT, &act, NULL) < 0)
177     {
178         printf ("install SIGINT failed, errno %d\n", errno); 
179         return -1; 
180     }
181 #endif
182 
183     if (g_base.init (2) < 0)
184         return -1; 
185 
186     printf ("init ok\n"); 
187 
188 #if defined(TEST_READ)
189     test_read (port, 0); // 0 means infinite loop until user break
190 #elif defined(TEST_CONN)
191     test_conn (port, 10, 100); 
192 #elif defined (TEST_TIMER)
193     test_timer (port, 10, 1000); 
194 #else
195 #  error please define TEST_XXX macro to do something!
196 #endif
197 
198     if (!sig_caught)
199     {
200         // Ctrl + D ?
201         g_base.exit (0); 
202         printf ("exit ok\n"); 
203     }
204     else 
205         printf ("has caught Ctrl+C\n"); 
206 
207     g_base.fini (); 
208     printf ("fini ok\n"); 
209 
210     g_base.cleanup (); 
211     printf ("cleanup ok\n"); 
212     return 0; 
213 }

 

客戶端同樣使用了 GEventBase 的派生類 GMyEventBase 來作為事件回圈的核心,所不同的是(注意并非之前例子里的那個類,雖然同名),它提供了 clt_handler 來處理自己的業務代碼,

另外為了提供連接中斷后自動向服務重連的功能,這里 GMyEventBase 派生自 GEventBase 類的子類 GEventBaseWithAutoReconnect (位于 EventBaseAR.h/cpp 中),

程式的運行是分別呼叫 GEventBase 的 init / connect / fini / cleaup 方法以及 GEventHandler 的 send / disconnect 來測驗讀寫與連接,

定義宏 TEST_READ 用來測驗讀寫;定義宏 TEST_CONN 可以測驗連接的通斷及讀寫;定義宏 TEST_TIMER 來測驗周期性定時器及讀寫,它們是互斥的,

clt_handler 主要用來異步接收服務端的回送資料并列印:

clt_handler.cpp

 1 #include "clt_handler.h"
 2 
 3 #ifdef TEST_TIMER
 4 extern void do_read (clt_handler *, int); 
 5 bool clt_handler::on_timeout (GEV_PER_TIMER_DATA *gptd)
 6 {
 7     printf ("time out ! id %p, due %d, period %d\n", gptd, gptd->due_msec, gptd->period_msec); 
 8     do_read ((clt_handler *)gptd->user_arg, 1); 
 9     return true; 
10 }
11 #endif
12 
13 void clt_handler::on_read_msg (Json::Value const& val)
14 {
15     int key = val["key"].asInt (); 
16     std::string data = https://www.cnblogs.com/goodcitizen/p/val["data"].asString (); 
17     printf ("got %d:%s\n", key, data.c_str ()); 
18 }

 

這個測驗程式可以通過在控制臺手工輸入資料來驅動,也可以通過測驗資料檔案來驅動,下面的 awk 腳本用來制造符合格式的測驗資料:

epoll_gen.awk

 1 #! /bin/awk -f
 2 BEGIN {
 3         WORDNUM = 1000
 4         for (i = 1; i <= WORDNUM; i++) {
 5                 printf("%d %s\n", randint(WORDNUM), randword(20))
 6         }
 7 }
 8 
 9 # randint(n): return a random integer number which is >= 1 and <= n
10 function randint(n) {
11         return int(n *rand()) + 1
12 }
13 
14 # randlet(): return a random letter, which maybe upper, lower or number. 
15 function randlet() {
16         return substr("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", randint(62), 1)
17 }
18 
19 # randword(LEN): return a rand word with a length of LEN
20 function randword(LEN) {
21         randw=""
22         for( j = 1; j <= LEN; j++) {
23                 randw=randw randlet()
24         }
25         return randw
26 }

 

生成的測驗檔案格式如下:

238 s0jKlYkEjwE4q3nNJugF
568 0cgNaSgDpP3VS45x3Wum
996 kRF6SgmIReFmrNBcCecj
398 QHQqCrB5fC61hao1BV2x
945 XZ6KLtA4jZTEnhcAugAM
619 WE95NU7FnsYar4wz279j
549 oVCTmD516yvmtuJB2NG3
840 NDAaL5vpzp8DQX0rLRiV
378 jONIm64AN6UVc7uTLIIR
251 EqSBOhc40pKXhCbCu8Ey

 

整個工程編譯的話就是一個 CMakeLists 檔案,可以通過 cmake 生成對應的 Makefile 或 VS solution 來編譯代碼:

CMakeLists.txt

 1 cmake_minimum_required(VERSION 3.0)
 2 project(epoll_svc)
 3 include_directories(../core ../include)
 4 set(CMAKE_CXX_FLAGS "-std=c++11 -pthread -g -Wall ${CMAKE_CXX_FLAGS}")
 5 link_directories(${PROJECT_SOURCE_DIR}/../lib)
 6 set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/../bin)
 7 
 8 add_executable (epoll_svc epoll_svc.cpp svc_handler.cpp ../core/EventBase.cpp ../core/EventHandler.cpp ../core/log.cpp)
 9 IF (WIN32)
10 target_link_libraries(epoll_svc jsoncpp ws2_32)
11 ELSE ()
12 target_link_libraries(epoll_svc jsoncpp rt)
13 ENDIF ()
14 
15 add_executable (epoll_clt epoll_clt.cpp clt_handler.cpp ../core/EventBase.cpp ../core/EventBaseAR.cpp ../core/EventHandler.cpp ../core/log.cpp)
16 target_compile_definitions(epoll_clt PUBLIC -D TEST_READ)
17 IF (WIN32)
18 target_link_libraries(epoll_clt jsoncpp ws2_32)
19 ELSE ()
20 target_link_libraries(epoll_clt jsoncpp rt)
21 ENDIF ()
22 
23 add_executable (epoll_local epoll_local.cpp)
24 IF (WIN32)
25 target_link_libraries(epoll_local jsoncpp ws2_32)
26 ELSE ()
27 target_link_libraries(epoll_local jsoncpp rt)
28 ENDIF ()

 

 這個專案包含三個編譯目標,分別是 epoll_svc 、epoll_clt 與 epoll_local,其中前兩個可以跨平臺編譯,后一個只能在 Linux 平臺編譯,用來驗證 epoll 的一些特性,

編譯完成后,首先運行服務端:

>./epoll_svc 1025 

 然后運行客戶端:

>./epoll_clt 1025 < demo

測驗多個客戶端同時連接,可以使用下面的腳本:

epoll_start.sh

1 #! /bin/bash
2 # /bin/sh -> /bin/dash, do not recognize our for loop
3 
4 for((i=0;i<10;i=i+1))
5 do
6     ./epoll_clt 1025 < demo &
7     echo "start $i"
8 done

 

可以同時啟動 10 個客戶端,

通過 Ctrl+C 退出服務端;通過 Ctrl+C 或 Ctrl+D 退出單個客戶端;

通過下面的腳本來停止多個客戶端與服務端:

epoll_stop.sh

1 #! /bin/sh
2 pkill -INT epoll_clt
3 sleep 1
4 pkill -INT epoll_svc

 


 

框架的用法介紹完之后,再簡單游覽一下這個庫的各層級對外介面,

EventBase.h

  1 #pragma once
  2 
  3 
  4 #include "EventHandler.h" 
  5 #include <string>
  6 #include <map>
  7 #include <mutex> 
  8 #include <condition_variable>
  9 #include "thread_group.hpp"
 10 
 11 #define GEV_MAX_BUF_SIZE 65536
 12 
 13 class GEventBase : public IEventBase
 14 {
 15 public:
 16     GEventBase();
 17     ~GEventBase();
 18 
 19 #ifdef WIN32
 20     virtual HANDLE iocp () const; 
 21 #else
 22     virtual int epfd () const; 
 23 #endif
 24     virtual bool post_timer(GEV_PER_TIMER_DATA *gptd); 
 25     virtual GEventHandler* create_handler() = 0; 
 26 
 27     // thr_num : 
 28     //  =0 - no default thread pool, user provide thread and call run
 29     //  <0 - use max(|thr_num|, processer_num)
 30     //  >0 - use thr_num
 31     bool init(int thr_num = -8, int blksize = GEV_MAX_BUF_SIZE
 32 #ifndef WIN32
 33               , int timer_sig = SIGUSR1
 34 #endif
 35               ); 
 36 
 37     bool listen(unsigned short port, unsigned short backup = 10);
 38     GEventHandler* connect(unsigned short port, GEventHandler* exist_handler = NULL); 
 39     // PARAM
 40     // due_msec: first timeout milliseconds
 41     // period_msec: later periodically milliseconds
 42     // arg: user provied argument
 43     // exist_handler: reuse the timer handler
 44     //
 45     // RETURN
 46     //   NULL: failed
 47     void* timeout(int due_msec, int period_msec, void *arg, GEventHandler *exist_handler);
 48     bool cancel_timer(void* tid); 
 49     void fini();  
 50     void run(); 
 51     void exit(int extra_notify = 0); 
 52     void cleanup(); 
 53 
 54 protected:
 55 #ifdef WIN32
 56     bool do_accept(GEV_PER_IO_DATA *gpid); 
 57     bool do_recv(GEV_PER_HANDLE_DATA *gphd, GEV_PER_IO_DATA *gpid); 
 58     void do_error(GEV_PER_HANDLE_DATA *gphd); 
 59 
 60     int init_socket();
 61     bool issue_accept(); 
 62     bool issue_read(GEV_PER_HANDLE_DATA *gphd);
 63     bool post_completion(DWORD bytes, ULONG_PTR key, LPOVERLAPPED ol); 
 64 
 65 #else
 66     bool do_accept(int fd); 
 67     bool do_recv(conn_key_t key); 
 68     void do_error(conn_key_t key); 
 69 
 70     bool init_pipe(); 
 71     void close_pipe(); 
 72     bool post_notify (char ch, void* ptr = nullptr); 
 73     void promote_leader (std::unique_lock<std::mutex> &guard); 
 74 
 75     GEventHandler* find_by_key (conn_key_t key, bool erase); 
 76     GEventHandler* find_by_fd (int fd, conn_key_t &key, bool erase); 
 77 
 78 #  ifdef HAS_SIGTHR
 79     void sig_proc (); 
 80 #  endif
 81 #endif
 82 
 83     bool do_timeout(GEV_PER_TIMER_DATA *gptd); 
 84 
 85     virtual bool on_accept(GEV_PER_HANDLE_DATA *gphd);
 86     virtual bool on_read(GEventHandler *h, GEV_PER_IO_DATA *gpid); 
 87     virtual void on_error(GEventHandler *h);
 88     virtual bool on_timeout (GEV_PER_TIMER_DATA *gptd); 
 89     
 90 
 91 protected:
 92     volatile bool m_running = false;
 93     int m_thrnum = 0; 
 94     int m_blksize = GEV_MAX_BUF_SIZE; 
 95     std::thread_group m_grp; 
 96     SOCKET m_listener = INVALID_SOCKET;
 97 
 98     std::mutex m_mutex;  // protect m_map
 99     std::mutex m_tlock; // protect m_tmap
100     // timer_t may conflict when new timer created after old timer closed
101     //std::map <timer_t, GEventHandler *> m_tmap; 
102     std::map <GEV_PER_TIMER_DATA*, GEventHandler *> m_tmap; 
103 
104 #ifdef WIN32
105     LPFN_ACCEPTEX m_acceptex = nullptr; 
106     LPFN_GETACCEPTEXSOCKADDRS m_getacceptexsockaddrs = nullptr; 
107     HANDLE m_iocp = NULL; 
108     HANDLE m_timerque = NULL; 
109 
110     std::map<GEV_PER_HANDLE_DATA*, GEventHandler*> m_map; 
111 #else
112     int m_ep = -1; 
113     int m_pp[2]; 
114     int m_tsig = 0; // signal number for timer
115 
116     std::mutex m_lock;   // protect epoll
117     pthread_t m_leader = -1; 
118     std::map<conn_key_t, GEventHandler*> m_map; 
119 #  ifdef HAS_SIGTHR
120     // special thread only cares about signal
121     std::thread *m_sigthr = nullptr; 
122 #  endif
123 #endif
124 };

 

  • init,它在底層啟動 thr_num 個執行緒來跑 run 方法;每次 IO 的塊緩沖區大小由 blksize 指定;它內部還創建了對應的 iocp 或 epoll 物件,便于之后加入 socket 句柄進行處理,
  • exit,它通知執行緒池中的所有執行緒退出等待,windows 上是通過 PostQueuedCompletionStatus,Linux 上是通過在自建的一個 pipe 上寫資料以觸發 epoll 退出(這個 pipe 在 init 中創建并加入 epoll);
  • fini,它在所有作業執行緒退出后,關閉之前創建的物件,清理事件回圈用到的資源;
  • cleanup,它清理之前建立的 fd-handler 映射,清理遺留的處理器并釋放資源;
  • run,它是執行緒池運行函式,windows 上是通過 GetQueuedCompletionStatus 在 iocp 上等待;在 linux 上是通過 epoll_wait 在 epoll 上等待事件,當有事件產生后,根據事件型別,分別呼叫 do_accept / on_accept、do_recv / on_read、do_error / on_error 回呼來分派事件;
  • listen,創建偵聽 socket 并加入到 iocp 或 epoll 中;
  • connect,連接到遠程服務并將成功連接的 socket 加入到 iocp 或  epoll 中;
  • timeout,設定定時器事件,windows 上是通過 CreateTimerQueueTimer 實作定時器超時;linux 則是通過 timer_create 實作的,都是系統現成的東西,只不過在系統定時器到期后,給對應的 iocp 或 epoll 物件發送了一個通知而已,在 linux 上這個通知機制是上面提到過的 pipe 來實作的,因而有一定延遲,不能指定精度太小的定時器;
  • cancel_timer,取消之前設定的定時器,

 

然后看下 GEventHandler 提供的回呼介面,應用可以從它派生并完成業務相關代碼:

EventHandler.h

  1 #pragma once
  2 #include "platform.h"
  3 
  4 #ifdef WIN32
  5 // must ensure <winsock2.h> precedes <widnows.h> included, to prevent winsock2.h conflict with winsock.h
  6 #  include <WinSock2.h>
  7 #  include <Windows.h>
  8 #  include <mswsock.h>  // for LPFN_ACCEPTEX & LPFN_GETACCEPTEXSOCKADDRS later in EventBase.h
  9 #else
 10 #  include <unistd.h> // for close
 11 #  include <sys/socket.h>
 12 #  include <sys/epoll.h>
 13 #  include <sys/time.h>
 14 #  include <netinet/in.h> // for struct sockaddr_in
 15 #  include <arpa/inet.h> // for inet_addr/inet_ntoa
 16 #  include <string.h> // for memset/memcpy
 17 #  include <signal.h>
 18 #endif
 19 
 20 #include <mutex>
 21 #include "jsoncpp/json.h"
 22 
 23 
 24 class GEventHandler; 
 25 struct GEV_PER_TIMER_DATA; 
 26 class IEventBase
 27 {
 28 public:
 29 #ifdef WIN32
 30     virtual HANDLE iocp () const = 0; 
 31 #else
 32     virtual int epfd () const = 0; 
 33 #endif
 34 
 35     virtual void* timeout(int due_msec, int period_msec, void *arg, GEventHandler *exist_handler) = 0; 
 36     virtual bool cancel_timer(void* tid) = 0; 
 37     virtual bool post_timer(GEV_PER_TIMER_DATA *gptd) = 0; 
 38 };
 39 
 40 
 41 #ifdef WIN32
 42 enum GEV_IOCP_OP
 43 {
 44     OP_TIMEOUT = 1, 
 45     OP_ACCEPT,
 46     OP_RECV,
 47 };
 48 #else 
 49 // the purpose of this key is to distinguish different connections with same fd !
 50 // (when connection break and re-established soon, fd may not change but port will change)
 51 struct conn_key_t
 52 {
 53     int fd; 
 54     unsigned short lport; 
 55     unsigned short rport; 
 56 
 57     conn_key_t (int f, unsigned short l, unsigned short r); 
 58     bool operator< (struct conn_key_t const& rhs) const; 
 59 }; 
 60 #endif
 61 
 62 
 63 struct GEV_PER_HANDLE_DATA
 64 {
 65     SOCKET so;
 66     SOCKADDR_IN laddr;
 67     SOCKADDR_IN raddr;
 68 
 69 #ifndef WIN32
 70     conn_key_t key () const; 
 71 #endif
 72 
 73     GEV_PER_HANDLE_DATA(SOCKET s, SOCKADDR_IN *l, SOCKADDR_IN *r); 
 74     virtual ~GEV_PER_HANDLE_DATA(); 
 75 };
 76 
 77 struct GEV_PER_IO_DATA
 78 {
 79     SOCKET so;
 80 #ifdef WIN32
 81     GEV_IOCP_OP op;
 82     OVERLAPPED ol;
 83     WSABUF wsa;         // wsa.len is buffer length
 84     DWORD bytes;        // after compeleted, bytes trasnfered
 85 #else
 86     char *buf; 
 87     int len; 
 88 #endif
 89 
 90     GEV_PER_IO_DATA(
 91 #ifdef WIN32
 92             GEV_IOCP_OP o, 
 93 #endif
 94             SOCKET s, int l); 
 95     virtual ~GEV_PER_IO_DATA(); 
 96 };
 97 
 98 struct GEV_PER_TIMER_DATA
 99 #ifdef WIN32
100        : public GEV_PER_IO_DATA
101 #endif
102 {
103     IEventBase *base; 
104     int due_msec; 
105     int period_msec; 
106     void *user_arg;
107     bool cancelled;
108 #ifdef WIN32
109     HANDLE timerque; 
110     HANDLE timer; 
111 #else
112     timer_t timer; 
113 #endif
114 
115     GEV_PER_TIMER_DATA(IEventBase *base, int due, int period, void *arg
116 #ifdef WIN32
117             , HANDLE tq);
118 #else
119             , timer_t tid); 
120 #endif
121 
122     virtual ~GEV_PER_TIMER_DATA(); 
123     void cancel (); 
124 };
125 
126 class GEventHandler
127 {
128 public:
129     GEventHandler();
130     virtual ~GEventHandler();
131 
132     GEV_PER_HANDLE_DATA* gphd(); 
133     GEV_PER_TIMER_DATA* gptd(); 
134     bool connected();
135     void disconnect(); 
136     void clear(); 
137     SOCKET fd(); 
138 
139     int send(char const* buf, int len);
140     int send(std::string const& str);
141     
142     virtual bool reuse();
143     virtual bool auto_reconnect();
144     virtual void arg(void *param) = 0;
145     virtual void reset(GEV_PER_HANDLE_DATA *gphd, GEV_PER_TIMER_DATA *gptd, IEventBase *base);
146     virtual bool on_read(GEV_PER_IO_DATA *gpid) = 0;
147     virtual void on_error(GEV_PER_HANDLE_DATA *gphd); 
148     // note when on_timeout called, handler's base may cleared by cancel_timer, use gptd->base instead if it is not null.
149     virtual bool on_timeout(GEV_PER_TIMER_DATA *gptd) = 0; 
150     virtual void cleanup(bool terminal);
151     void close(bool terminal);
152 
153 protected:
154     GEV_PER_HANDLE_DATA *m_gphd = nullptr; 
155     GEV_PER_TIMER_DATA *m_gptd = nullptr; 
156     IEventBase *m_base = nullptr;
157     // us so instead of m_gphd, 
158     // as the later one may destroyed during using..
159     SOCKET m_so;
160 };
161 
162 // a common handler to process json protocol.
163 class GJsonEventHandler : public GEventHandler
164 {
165 public:
166     //virtual void on_read();
167     virtual void arg(void *param);
168     virtual void reset(GEV_PER_HANDLE_DATA *gphd, GEV_PER_TIMER_DATA *gptd, IEventBase *base);
169     virtual bool on_read(GEV_PER_IO_DATA *gpid);
170     virtual void on_read_msg(Json::Value const& root) = 0;
171     virtual bool on_timeout(GEV_PER_TIMER_DATA *gptd);
172     virtual void cleanup(bool terminal);
173 
174 protected:
175     // protect m_stub to prevent multi-entry
176 #ifdef HAS_ET
177     std::mutex m_mutex; 
178 #endif
179 
180     std::string m_stub;
181 };

 

這里主要有兩個類,GEventHandler 處理通用的基于流的資料;GJsonEventHandler 處理基于 json 格式的資料,

前者需要重寫 on_read 方法來處理塊資料;后者需要重寫 on_read_msg 方法來處理 json 資料,

目前 json 的決議是通過 jsoncpp 庫完成的,這個庫本身是跨平臺的(本 git 庫僅提供 64 位 Linux 靜態鏈接庫及 VS2013 的 32 位 Release 版本 Windows 靜態庫),

svc_handler 與 clt_handler  均從 GJsonEventHandler 派生,

如果有新的流格式需要處理 ,只需要從 GEventHandler 類派生新的處理類即可,

 

除了讀取連接上的資料,還有其它一些重要的回呼介面,列明如下:

  • on_read,連接上有資料到達;
  • on_error,連接斷開;
  • on_tmeout,定時器事件;
  • ……

如果有新的事件需要處理 ,也可以在這里擴展,

最后看下 GEventBaseWithAutoReconnect 提供的與自動重連相關的介面:

EventBaseAR.h

 1 #pragma once
 2 
 3 
 4 #include "EventBase.h"
 5 #include <thread>
 6 
 7 #define GEV_RECONNECT_TIMEOUT 2 // seconds
 8 #define GEV_MAX_RECONNECT_TIMEOUT 256 // seconds
 9 
10 class GEventBaseWithAutoReconnect : public GEventBase
11 {
12 public:
13     GEventBaseWithAutoReconnect(int reconn_min = GEV_RECONNECT_TIMEOUT, int reconn_max = GEV_MAX_RECONNECT_TIMEOUT);
14     ~GEventBaseWithAutoReconnect();
15 
16     bool do_connect(unsigned short port, void *arg);
17     GEventHandler* connector(); 
18 
19 protected:
20     virtual void on_error(GEventHandler *h);
21     virtual bool on_timeout(GEV_PER_TIMER_DATA *gptd);
22 
23     virtual void on_connect_break(); 
24     virtual bool on_connected(GEventHandler *app);
25 
26 protected:
27     void do_reconnect(void *arg);
28 
29 protected:
30     unsigned short m_port; 
31     GEventHandler* m_app;
32     GEventHandler* m_htimer; 
33     void* m_timer;
34     int m_reconn_min; 
35     int m_reconn_max; 
36     int m_reconn_curr;
37 };

 

其實比較簡單,只比 GEventBase 類多了一個  do_connect 方法,來擴展 connect 不能自動重連的問題,

底層的話,是通過定時器來實作指數后退重連演算法的,

 

最后,如果你還是感到云里霧里的,可以參考一下下面的類結構圖:

 

 

黑色標注的是框架提供的類,紅色是服務端派生的類,藍色是客戶端派生的類,其實 GMyEventBase 的唯一作用就是將 svc_handler 與 clt_handler 分別引入各自的框架中,

所以用戶的關注點主要還是在派生自己的 GEventHandler 類,并在其中的回呼介面中處理資料就可以了,

 


 

后記

這個框架已經應用到我司的公共產品中,并為數個 tcp 服務提供底層支撐,經過百萬級別用戶機器驗證,運行穩定性還是可以的,所以當得起“工業級”這三個字,

 

前面在說到開源庫的選型時還留了一個口子沒有交待,這里一并說下,

其實最早的重構版本是使用 libevent 來實作的,但是發現它在 windows 上使用的是低效的 select,

而且為了增加、洗掉句柄,它又使用了一種 self-pipe-trick 的技巧,簡單說來的就是下面的代碼序列:

listen (listen_fd, 1); 
……
connect (connect_fd, &addr, size); 
……
accept_fd = accept (listen_fd, &addr, &size); 

 

在缺乏 pipe 呼叫的 win32 環境制造了一個 socket 自連接,從而進行一些通知,

這一步是必要的,如果不能成功連接就會導致整個 libevent 初始化失敗,從而運行不起來,

不巧的是,在一些 windows 機器上(約占用戶總量 10%),由于防火墻設定嚴格,上述 listen 與 connect 呼叫可以成功,

但是 accept 會失敗回傳,從而導致整個服務退出 (防火墻會嚴格禁止不在白名單上偵聽的埠的連接),

對于已知埠,可以通過在防火墻上設定白名單來避免,但是對于這種隨機 listen 的埠,真的是太難了,基本無解,

 

回頭考察了一下 asio,windows 上使用的是 iocp,自然沒有這個自連接;

ACE 有多種實作可供選擇,如果使用  ACE_Select_Reactor / ACE_TP_Reactor 是會有這個自連接,

但是你可以選擇其它實作,如基于 WaitForMultipleEvents 的 ACE_WFMO_Reactor(最大只支持 62 個句柄,放棄),

或基于 iocp 的 ACE_Proactor (前攝式,與反應式在編程上稍有不同,更接近于 asio)就沒有這個自連接,

 

再說的深一點,其實公司最早的網路庫使用的就是基于 boost 的 asio,大量的使用了 c++ 模板,

有時候產生了一些崩潰,但是根據 dump 完全無法定位崩潰點(各種冗長的模板展開名稱),

導致了一些頑固的已知 bug 一直找不到崩潰點而無法解決(雖然量不大),所以才有了要去重新選型網路庫以及后來這一系列的東西,

 

本來一開始我是想用 ACE 的,因為我讀過這個庫的原始碼,對里面所有的東西都非常熟悉,

但是看看 ACE 小 5 MB 的 dll 尺寸,還是放棄了(產品本身安裝包也就這么大吧),

對于一個公司底層的公共組件,被各種產品攜帶,需要嚴格控制“體重”

(后來聽說 ACE 按功能拆分了代碼模塊,你只需要選自己依賴的部分即可,不過我還沒有試過),

 

使用這個庫代替之前的 boost::asio 后,我還有一個意外識訓,就是編譯出來的 dll 尺寸明顯小了很多,700 K -> 500 K 的樣子,看來所謂模板膨脹是真有其事……

 

最后奉上 gevent 的 github 鏈接,歡迎有相同需求的小伙伴前來“復刻” :

https://github.com/goodpaperman/gevent

 

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/20779.html

標籤:C++

上一篇:關于c#根據點坐標畫線段的問題

下一篇:一個關于FastReport的問題。

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more