從本文開始,由sample入手,逐漸理解libhv的原始碼,
如有理解錯誤,歡迎批評指正,
main()函式
int main() {
// memcheck atexit
HV_MEMCHECK;
hloop_t* loop = hloop_new(0);
// test idle and priority
for (int i = HEVENT_LOWEST_PRIORITY; i <= HEVENT_HIGHEST_PRIORITY; ++i) {
hidle_t* idle = hidle_add(loop, on_idle, 10);
hevent_set_priority(idle, i);
}
// // test timer timeout
for (int i = 1; i <= 10; ++i) {
htimer_t* timer = htimer_add(loop, on_timer, i*1000, 3);
hevent_set_userdata(timer, (void*)(intptr_t)i);
}
// // test timer period
int minute = time(NULL)%3600/60;
htimer_add_period(loop, cron_hourly, minute+1, -1, -1, -1, -1, INFINITE);
// test network_logger
htimer_add(loop, timer_write_log, 1000, INFINITE);
logger_set_handler(hlog, mylogger);
hlog_set_file("loop.log");
#ifndef _MSC_VER
logger_enable_color(hlog, 1);
#endif
nlog_listen(loop, DEFAULT_LOG_PORT);
// test nonblock stdin
printf("input 'quit' to quit loop\n");
char buf[64];
hread(loop, 0, buf, sizeof(buf), on_stdin);
// test custom_events
for (int i = 0; i < 10; ++i) {
hevent_t ev;
memset(&ev, 0, sizeof(ev));
ev.event_type = (hevent_type_e)(HEVENT_TYPE_CUSTOM + i);
ev.cb = on_custom_events;
ev.userdata = (void*)(long)i;
hloop_post_event(loop, &ev);
}
hloop_run(loop);
hloop_free(&loop);
return 0;
}
代碼的結構和注釋非常的清晰,其中包含了多個事件的型別:
1.空閑事件
2.定時器超時事件
3.周期性的超時事件
4.監聽listen事件
5.鍵盤自定義事件
6.用戶自定義事件
先看一下代碼的開頭,
hloop_t* loop = hloop_new(0);
這里創建并初始化了libhv的事件回圈,看一下這里面具體做了什么,
hloop_t* hloop_new(int flags) {
hloop_t* loop;
HV_ALLOC_SIZEOF(loop);
hloop_init(loop);
loop->flags |= flags;
return loop;
}
首先看一下這個flags的取值范圍
#define HLOOP_FLAG_RUN_ONCE 0x00000001 //事件回圈體只執行一次
#define HLOOP_FLAG_AUTO_FREE 0x00000002 //執行完自動釋放資源
#define HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS 0x00000004 //當沒有事件時,退出事件回圈
具體使用,我們后面再看,
接下來看一下hloop_init具體做了什么
static void hloop_init(hloop_t* loop) {
#ifdef OS_WIN
static int s_wsa_initialized = 0;
if (s_wsa_initialized == 0) {
s_wsa_initialized = 1;
WSADATA wsadata;
WSAStartup(MAKEWORD(2,2), &wsadata);
}
#endif
#ifdef SIGPIPE
// NOTE: if not ignore SIGPIPE, write twice when peer close will lead to exit process by SIGPIPE.
signal(SIGPIPE, SIG_IGN);
#endif
loop->status = HLOOP_STATUS_STOP; //初始化狀態為 停止狀態
loop->pid = hv_getpid(); //獲取當前行程號
loop->tid = hv_gettid(); //獲取當前執行緒號
// idles
list_init(&loop->idles); //初始化空閑事件的鏈表
// timers
heap_init(&loop->timers, timers_compare); //初始化超時事件的堆
// ios
io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE); //初始化IO事件的陣列
// readbuf
loop->readbuf.len = HLOOP_READ_BUFSIZE; //分配讀緩沖區記憶體
HV_ALLOC(loop->readbuf.base, loop->readbuf.len);
// iowatcher
iowatcher_init(loop); //初始化IO監控
// custom_events
hmutex_init(&loop->custom_events_mutex);
event_queue_init(&loop->custom_events, CUSTOM_EVENT_QUEUE_INIT_SIZE);
loop->sockpair[0] = loop->sockpair[1] = -1; //創建一對連接的socket
if (Socketpair(AF_INET, SOCK_STREAM, 0, loop->sockpair) != 0) {
hloge("socketpair create failed!");
}
// NOTE: init start_time here, because htimer_add use it.
loop->start_ms = gettimeofday_ms(); //獲取當前事件ms
loop->start_hrtime = loop->cur_hrtime = gethrtime_us(); //獲取當前事件us
}
從代碼上看,hloop_init針對loop做了一系列初始化作業,所以不得不去看一下hloop_t的定義,
typedef struct hloop_s hloop_t;
struct hloop_s {
uint32_t flags; //標記
hloop_status_e status; //狀態
uint64_t start_ms; // ms
uint64_t start_hrtime; // us
uint64_t end_hrtime; //結束的時間
uint64_t cur_hrtime; //當前時間
uint64_t loop_cnt; //回圈的次數
long pid; //行程號
long tid; //執行緒號
void* userdata; //用戶資料
//private:
// events
uint32_t nactives; //活動的事件的數量
uint32_t npendings; //將要被處理的事件的數量
// pendings: with priority as array.index
hevent_t* pendings[HEVENT_PRIORITY_SIZE]; //將要被處理的事件的陣列
// idles //陣列的長度為HEVENT_PRIORITY_SIZE(10)個
//這里按事件的優先級,放入到不同鏈表中,執行時,由高到低執行
struct list_head idles; //空閑事件鏈表
uint32_t nidles; //空閑事件的數量
// timers
struct heap timers; //超時事件堆
uint32_t ntimers; //超時事件的數量
// ios: with fd as array.index
struct io_array ios; //IO事件陣列
uint32_t nios; //IO事件數量
// one loop per thread, so one readbuf per loop is OK.
hbuf_t readbuf; //讀緩沖區,這里作者有解釋:因為一個執行緒只有一個事件回圈,所有只要分配一個讀緩沖區就可以了
void* iowatcher; //io監視器
// custom_events
int sockpair[2]; //本地成對連接的socket
event_queue custom_events; //用戶自定義事件佇列
hmutex_t custom_events_mutex; //用戶事件鎖
};
從上面代碼里可以看出來,建立本地連接的socket是為了執行用戶自定義事件,這里比較有意思,我們后面具體再看如何使用的,
下面繼續看sample,hloop_test.c代碼
// test idle and priority
for (int i = HEVENT_LOWEST_PRIORITY; i <= HEVENT_HIGHEST_PRIORITY; ++i) {
hidle_t* idle = hidle_add(loop, on_idle, 10);
hevent_set_priority(idle, i);
}
// // test timer timeout
for (int i = 1; i <= 10; ++i) {
htimer_t* timer = htimer_add(loop, on_timer, i*1000, 3);
hevent_set_userdata(timer, (void*)(intptr_t)i);
}
// // test timer period
int minute = time(NULL)%3600/60;
htimer_add_period(loop, cron_hourly, minute+1, -1, -1, -1, -1, INFINITE);
// test nonblock stdin
printf("input 'quit' to quit loop\n");
char buf[64];
hread(loop, 0, buf, sizeof(buf), on_stdin);
// test custom_events
for (int i = 0; i < 10; ++i) {
hevent_t ev;
memset(&ev, 0, sizeof(ev));
ev.event_type = (hevent_type_e)(HEVENT_TYPE_CUSTOM + i);
ev.cb = on_custom_events;
ev.userdata = (void*)(long)i;
hloop_post_event(loop, &ev);
}
上面是注冊各種事件,先看最簡單的空閑事件
hidle_t* hidle_add(hloop_t* loop, hidle_cb cb, uint32_t repeat) {
hidle_t* idle;
HV_ALLOC_SIZEOF(idle);
idle->event_type = HEVENT_TYPE_IDLE;
idle->priority = HEVENT_LOWEST_PRIORITY;
idle->repeat = repeat;
list_add(&idle->node, &loop->idles); //將空間事件加入到空閑事件鏈表中
EVENT_ADD(loop, idle, cb);
loop->nidles++;
return idle;
}
我們可以看到,上面代碼將空閑事件加入到了loop的空閑串列中,(其他內容先不深究)
超時事件
htimer_t* htimer_add(hloop_t* loop, htimer_cb cb, uint32_t timeout, uint32_t repeat) {
if (timeout == 0) return NULL;
htimeout_t* timer;
HV_ALLOC_SIZEOF(timer);
timer->event_type = HEVENT_TYPE_TIMEOUT;
timer->priority = HEVENT_HIGHEST_PRIORITY;
timer->repeat = repeat;
timer->timeout = timeout;
hloop_update_time(loop);
timer->next_timeout = hloop_now_hrtime(loop) + timeout*1000;
heap_insert(&loop->timers, &timer->node); //超時事件加入堆
EVENT_ADD(loop, timer, cb);
loop->ntimers++;
return (htimer_t*)timer;
}
同樣,將超時事件加入loop的堆中
htimer_t* htimer_add_period(hloop_t* loop, htimer_cb cb,
int8_t minute, int8_t hour, int8_t day,
int8_t week, int8_t month, uint32_t repeat) {
if (minute > 59 || hour > 23 || day > 31 || week > 6 || month > 12) {
return NULL;
}
hperiod_t* timer;
HV_ALLOC_SIZEOF(timer);
timer->event_type = HEVENT_TYPE_PERIOD;
timer->priority = HEVENT_HIGH_PRIORITY;
timer->repeat = repeat;
timer->minute = minute;
timer->hour = hour;
timer->day = day;
timer->month = month;
timer->week = week;
timer->next_timeout = (uint64_t)cron_next_timeout(minute, hour, day, week, month) * 1000000;//將周期時間轉為下一次的超時時間
heap_insert(&loop->timers, &timer->node);
EVENT_ADD(loop, timer, cb);
loop->ntimers++;
return (htimer_t*)timer;
}
listen監聽事件
hio_t* nlog_listen(hloop_t* loop, int port) {
s_logger.loop = loop;
//這里實體化了一個servet的socket,并把其listen事件加入到IO事件陣列中,當有客戶端連接時,呼叫on_accept回呼函式
s_logger.listenio = hloop_create_tcp_server(loop, "0.0.0.0", port, on_accept);
list_init(&s_logger.clients);
hmutex_init(&s_mutex);
return s_logger.listenio;
}
鍵盤事件
hio_t* hread(hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
hio_t* io = hio_get(loop, fd);
assert(io != NULL);
io->readbuf.base = (char*)buf;
io->readbuf.len = len;
if (read_cb) {
io->read_cb = read_cb;
}
hio_read(io);
return io;
}
其實就是一個io事件,后面再具體分析
下面是用戶自定義事件
for (int i = 0; i < 10; ++i) {
hevent_t ev;
memset(&ev, 0, sizeof(ev));
//定義用戶型別,從HEVENT_TYPE_CUSTOM 開始往上定義
ev.event_type = (hevent_type_e)(HEVENT_TYPE_CUSTOM + i);
ev.cb = on_custom_events;
ev.userdata = (void*)(long)i;
//發送用戶事件
hloop_post_event(loop, &ev);
}
添加了這么多的事件,libhv是怎樣執行的呢?
下面就是真正開始libhv事件回圈的核心
int hloop_run(hloop_t* loop) {
loop->pid = hv_getpid();
loop->tid = hv_gettid();
// intern events
//這里,如果定義了本地相互連接的socket,那么就為讀socket注冊一個讀事件,緩沖區為loop初始化時定義的緩沖區
int intern_events = 0;
if (loop->sockpair[0] != -1 && loop->sockpair[1] != -1) {
hread(loop, loop->sockpair[SOCKPAIR_READ_INDEX], loop->readbuf.base, loop->readbuf.len, sockpair_read_cb);
++intern_events;
}
#ifdef DEBUG
htimer_add(loop, hloop_stat_timer_cb, HLOOP_STAT_TIMEOUT, INFINITE);
++intern_events;
#endif
loop->status = HLOOP_STATUS_RUNNING;
while (loop->status != HLOOP_STATUS_STOP) { //檢查loop的狀態
if (loop->status == HLOOP_STATUS_PAUSE) {
msleep(HLOOP_PAUSE_TIME); //如果是暫停狀態,就休眠一段時間
hloop_update_time(loop);
continue;
}
++loop->loop_cnt;
//如果沒有激活的事件,且loop的標志為HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS 就退出事件回圈
if (loop->nactives <= intern_events && loop->flags & HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS) {
break;
}
//核心:處理事件
hloop_process_events(loop);
//如果只執行一次的話,就退出事件回圈
if (loop->flags & HLOOP_FLAG_RUN_ONCE) {
break;
}
}
loop->status = HLOOP_STATUS_STOP;
loop->end_hrtime = gethrtime_us();
//退出事件回圈時,清理釋放loop的資源
if (loop->flags & HLOOP_FLAG_AUTO_FREE) {
hloop_cleanup(loop);
HV_FREE(loop);
}
return 0;
}
在這里終于看到了loop的flags的作用
if (loop->nactives <= intern_events && loop->flags & HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS) {
break;
}
if (loop->flags & HLOOP_FLAG_RUN_ONCE) {
break;
}
if (loop->flags & HLOOP_FLAG_AUTO_FREE) {
hloop_cleanup(loop);
HV_FREE(loop);
}
最后,我們看一下libhv是如何處理事件的
// hloop_process_ios -> hloop_process_timers -> hloop_process_idles -> hloop_process_pendings
//作者注釋,libhv處理事件的順序是IO事件->超時事件->空閑事件 => 要執行的事件
static int hloop_process_events(hloop_t* loop) {
// ios -> timers -> idles
int nios, ntimers, nidles;
nios = ntimers = nidles = 0;
// calc blocktime
int32_t blocktime = HLOOP_MAX_BLOCK_TIME;
if (loop->timers.root) {
hloop_update_time(loop);
//從超時事件堆中,找到最近要執行的超時事件
uint64_t next_min_timeout = TIMER_ENTRY(loop->timers.root)->next_timeout;
//計算與當前時間的時間差
int64_t blocktime_us = next_min_timeout - hloop_now_hrtime(loop);
//如果已經超時了,立馬執行
if (blocktime_us <= 0) goto process_timers;
//計算可以阻塞的時間(ms)
blocktime = blocktime_us / 1000;
++blocktime;
blocktime = MIN(blocktime, HLOOP_MAX_BLOCK_TIME);
}
//檢查是否有IO事件
if (loop->nios) {
nios = hloop_process_ios(loop, blocktime);
}
else {
msleep(blocktime);
}
hloop_update_time(loop);
// wakeup by hloop_stop
if (loop->status == HLOOP_STATUS_STOP) {
return 0;
}
process_timers:
//檢查是否有超時事件
if (loop->ntimers) {
ntimers = hloop_process_timers(loop);
}
//查看有沒有需要處理的事件,如果沒有則查看有沒有空間事件
int npendings = loop->npendings;
if (npendings == 0) {
if (loop->nidles) {
nidles= hloop_process_idles(loop);
}
}
//執行需要處理的事件(需要處理的事件其實就是當前需要處理的IO事件,超時事件,空間事件)
int ncbs = hloop_process_pendings(loop);
// printd("blocktime=%d nios=%d/%u ntimers=%d/%u nidles=%d/%u nactives=%d npendings=%d ncbs=%d\n",
// blocktime, nios, loop->nios, ntimers, loop->ntimers, nidles, loop->nidles,
// loop->nactives, npendings, ncbs);
return ncbs;
}
通過上面的原始碼理解分析,大致明白了libhv的事件回圈的程序,總結一下流程就是:

下一次詳細分析hloop_process_events函式中的內容
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/279840.html
標籤:其他
