Lua層訊息處理機制在lualib/skynet.lua,提供大部分Lua層的api(最侄訓呼叫到c層的api),包括啟動一個snlua服務時Lua層的處理,創建新服務,注冊服務協議,如何發送訊息,如何處理對方發過來的訊息等,本篇主要介紹訊息處理機制,從而理解skynet如何實作高并發,
為了簡化,代碼里用到的coroutine_resume,coroutine_yield看成coroutine.resume,coroutine.yield即可,
local coroutine_resume = profile.resume
local coroutine_yield = profile.yield
1. 協程
coroutine.create,創建一個co,唯一的引數是co要執行的閉包f,此時是不會執行閉包f的
coroutine.resume,執行一個co,第一個引數是co的句柄,如果是第一次執行,其他引數是傳遞給閉包f的,co啟動后,一直執行直到它終止或讓出,正常終止,回傳true和閉包f的回傳值;發生錯誤例外終止,則回傳false和錯誤資訊
coroutine.yield,使co暫停,讓出執行權,對應最近的resume會立刻回傳,回傳true和yield的引數,下一次resume同一個co時,會從讓出點繼續執行,此時,yield的呼叫會立刻回傳,回傳值為resume除第一個引數之外的其他引數
參考Lua檔案介紹協程coroutine(簡稱co)的經典例子,可以看出,co可以被不斷的暫停和重啟,skynet廣泛使用co,當發送一個rpc請求時會暫停當前co,等對方回傳時又重啟co,

2. skynet創建協程的方式
先闡述下skynet創建協程(co)的方式,通過co_create(f)這個api創建一個協程,這段代碼非常有意思,為了性能,skynet會把創建的co放到快取里(第9行),當協程執行完流程(閉包f)后不會終止,而是暫停(第10行),當呼叫者呼叫co_create這個api時,如果快取里沒有,通過coroutine.create創建一個co,此時是不會執行閉包f,然后在某個時刻(通常是收到訊息呼叫訊息分發skynet.dispatch_message)會重啟(附帶需要的引數)這個co,co接著執行閉包f(第6行),最后暫停以等待下一次使用,對應最近的resume回傳true和“EXIT”(第10行);如果是一個復用的co,重啟co(第15行,引數是將要執行的閉包f),yield會立刻回傳把閉包賦值給f(第10行),在11行又暫停,同樣在某個時刻會重啟(附帶需要的引數)這個co,co接著執行閉包f(第11行),最后又在第10行暫停等待下一次使用,
1 -- lualib/skynet.lua
2 local function co_create(f)
3 local co = table.remove(coroutine_pool)
4 if co == nil then
5 co = coroutine.create(function(...)
6 f(...)
7 while true do
8 f = nil
9 coroutine_pool[#coroutine_pool+1] = co
10 f = coroutine_yield "EXIT"
11 f(coroutine_yield())
12 end
13 end)
14 else
15 coroutine_resume(co, f)
16 end
17 return co
18 end
推薦一個Skynet視頻講解:https://ke.qq.com/course/2806743?flowToken=1030833,講解詳細還有檔案資料配套學習,新手老牛都可以看看的哦,

3. 如何處理Lua層訊息
了解了co_create的原理后,接下來以服務A向服務B發一條訊息為例說明skynet是如何處理Lua層訊息:
-- A.lua
local skynet = require "skynet"
skynet.start(function()
print(skynet.call("B", "lua", "aaa"))
end)
-- B.lua
local skynet = require "skynet"
require "skynet.manager"
skynet.start(function()
skynet.dispatch("lua", function(session, source, ...)
skynet.ret(skynet.pack("OK"))
end)
skynet.register "B"
end)
在服務啟動最后會呼叫skynet.start,skynet.start呼叫skynet.timeout,在timeout里會創建一個co(12行),稱之為服務的主協程co1,此時co1不會執行
1 -- lualib/skynet.lua
2 function skynet.start(start_func)
3 c.callback(skynet.dispatch_message)
4 skynet.timeout(0, function()
5 skynet.init_service(start_func)
6 end)
7 end
8
9 function skynet.timeout(ti, func)
10 local session = c.intcommand("TIMEOUT",ti)
11 assert(session)
12 local co = co_create(func)
13 assert(session_id_coroutine[session] == nil)
14 session_id_coroutine[session] = co
15 end
定時器被觸發(因為定時器設定是0,所以下一幀就觸發)會向服務發送一條“RESPONSE”型別(PTYPE_RESPONSE=1)的訊息
// skynet-src/skynet_timer.c
static inline void
dispatch_list(struct timer_node *current) {
...
message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;
...
}
服務收到訊息后,呼叫訊息分發api,由于訊息型別是RESPONSE,最侄訓執行到第7行,重啟主協程co1,執行co1的閉包f(這里是skynet.init_service(start_func)),如果閉包f里沒有暫停的操作,待閉包f成功運行完,co1暫停,resume會回傳true和"EXIT",接下來,第7行就變成,suspend(co, true, "EXIT")
1 -- luablib/skynet.lua
2 local function raw_dispatch_message(prototype, msg, sz, session, source)
3 -- skynet.PTYPE_RESPONSE = 1, read skynet.h
4 if prototype == 1 then
5 local co = session_id_coroutine[session]
6 ...
7 suspend(co, coroutine_resume(co, true, msg, sz))
8 ...
9 end
然后,呼叫suspend,由于型別是"EXIT",做一些清理作業即可,
-- lualib/skynet.lua
function suspend(co, result, command, param, size)
...
elseif command == "EXIT" then
-- coroutine exit
local address = session_coroutine_address[co]
if address then
release_watching(address)
session_coroutine_id[co] = nil
session_coroutine_address[co] = nil
session_response[co] = nil
end
...
end
當閉包f里有暫停操作,比如A服務向B服務發送訊息skynet.call("B", "lua", "aaa"),這里分別講解A服務和B服務是如何處理的:
對于A服務:
首先在c層把訊息發送出去(第14行,把訊息push到目的服務的次級訊息佇列),然后暫停co1,resume回傳true,"CALL"和session值
1 -- lualib/skynet.lua
2 local function yield_call(service, session)
3 watching_session[session] = service
4 local succ, msg, sz = coroutine_yield("CALL", session)
5 watching_session[session] = nil
6 if not succ then
7 error "call failed"
8 end
9 return msg,sz
10 end
11
12 function skynet.call(addr, typename, ...)
13 local p = proto[typename]
14 local session = c.send(addr, p.id , nil , p.pack(...))
15 if session == nil then
16 error("call to invalid address " .. skynet.address(addr))
17 end
18 return p.unpack(yield_call(addr, session))
19 end
然后呼叫suspend(co, true, "CALL", session),型別是"CALL",以session為key,co為value保存在session_id_coroutine里,以便當B服務對A的請求回傳后,根據session找到對應的co,從而可以重啟co
1 -- lualib/skynet.lua
2 function suspend(co, result, command, param, size)
3 ...
4 if command == "CALL" then
5 session_id_coroutine[param] = co
6 ...
7 end
當A收到B的回傳訊息時,呼叫訊息分發api,根據session找到對應的co(即主協程co1),從上一次暫停點重啟它,下面這一行代碼yield會立刻回傳,列印出B回傳的結果print(...)(A.lua),此時執行完co1整個流程,回傳true和“EXIT”給suspend,對co1做一些清理作業,
local succ, msg, sz = coroutine_yield("CALL", session)
稍微改一下A.lua,co1執行閉包f流程中通過fork創建一個協程(稱為co2),由于co1沒有暫停,會一直執行完整個流程,此時co2并沒有執行,
1 -- A.lua
2 local skynet = require "skynet"
3
4 skynet.start(function()
5 skynet.fork(function()
6 print(skynet.call("B", "lua", "aaa"))
7 end)
8 end)
1 -- lualib/skynet.lua
2 function skynet.fork(func,...)
3 local args = table.pack(...)
4 local co = co_create(function()
5 func(table.unpack(args,1,args.n))
6 end)
7 table.insert(fork_queue, co)
8 return co
9 end
訊息分發api做的第二件事是處理fork_queue里的co,所以收到定時器發送回來的訊息后做的第二件事是重啟co2,向B服務發送訊息后暫停co2,直到B回傳時再重啟co2,
1 -- lualib/skynet.lua
2 function skynet.dispatch_message(...)
3 ...
4 local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
5 ...
6 end
對于B服務:
收到A服務的訊息后呼叫訊息分發api,創建一個co(第12行),co要執行的閉包f是已注冊的訊息回呼函式p.dispatch(第4行),然后通過resume重啟它(第15行)
1 -- lualib/skynet.lua
2 local function raw_dispatch_message(prototype, msg, sz, session, source)
3 ...
4 local f = p.dispatch
5 if f then
6 local ref = watching_service[source]
7 if ref then
8 watching_service[source] = ref + 1
9 else
10 watching_service[source] = 1
11 end
12 local co = co_create(f)
13 session_coroutine_id[co] = session
14 session_coroutine_address[co] = source
15 suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
16 ...
17 end
執行skynet.ret(skynet.pack("OK")),呼叫yield暫停它(第4行),最近的resume回傳,上面第15行變成suspend(co, true, "RETURN", msg, sz)
1 -- lualib/skynet.lua
2 function skynet.ret(msg, sz)
3 msg = msg or ""
4 return coroutine_yield("RETURN", msg, sz)
5 end
當command=="RETURN"時,做兩件事:1. 向源地址(即A服務)發送回傳訊息(第5行);2. 重啟co(第7行),co從skynet.ret回傳,然后B服務的訊息回呼函式(p.dispatch)執行完,co的閉包f全部執行完放入快取中,回傳true和“EXIT“給suspend
1 -- lualib/skynet.lua
2 function suspend(co, result, command, param, size)
3 ...
4 elseif command == "RETURN" then
5 ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) ~= nil
6 ...
7 return suspend(co, coroutine_resume(co, ret))
8 ...
9 end
至此,就是Lua層訊息處理的整個流程,
4. 例外處理
在一些情況下需要做例外處理,比如沒有注冊對應訊息型別的協議,沒有提供訊息回呼函式,執行co程序中發生錯誤等,當一個服務處理一條訊息的程序發生例外,必須要做兩件事:1. 例外終止當前co;2. 通知訊息發送方,而不是讓對方一直忙等待,
當執行co程序中發生錯誤時,resume第一個回傳值是false,呼叫suspend,向對方發送一條PTYPE_ERROR型別訊息(第9行),然后拋出例外,終止當前co(第14行),
1 -- lualib/skynet.lua
2 function suspend(co, result, command, param, size)
3 if not result then
4 local session = session_coroutine_id[co]
5 if session then -- coroutine may fork by others (session is nil)
6 local addr = session_coroutine_address[co]
7 if session ~= 0 then
8 -- only call response error
9 c.send(addr, skynet.PTYPE_ERROR, session, "")
10 end
11 session_coroutine_id[co] = nil
12 session_coroutine_address[co] = nil
13 end
14 error(debug.traceback(co,tostring(command)))
15 end
16 ...
17 end
大部分例外情況下,都會向對方發送一條PTYPE_ERROR型別訊息通知對方,當收到PYTPE_ERROR型別訊息,會呼叫_error_dispatch,把error_source記錄在dead_service里,把error_session記錄在error_queue里
1 -- lualib/skynet.lua
2 local function _error_dispatch(error_session, error_source)
3 if error_session == 0 then
4 -- service is down
5 -- Don't remove from watching_service , because user may call dead service
6 if watching_service[error_source] then
7 dead_service[error_source] = true
8 end
9 for session, srv in pairs(watching_session) do
10 if srv == error_source then
11 table.insert(error_queue, session)
12 end
13 end
14 else
15 -- capture an error for error_session
16 if watching_session[error_session] then
17 table.insert(error_queue, error_session)
18 end
19 end
20 end
在suspend最后會呼叫dispatch_error_queue處理error_queue,通過session查找到正在等待的co,然后強制終止它,保證co不會一直忙等待,
1 -- lualib/skynet.lua
2 local function dispatch_error_queue()
3 local session = table.remove(error_queue,1)
4 if session then
5 local co = session_id_coroutine[session]
6 session_id_coroutine[session] = nil
7 return suspend(co, coroutine_resume(co, false))
8 end
9 end
5. 總結
一次同步的rpc請求的流程如下圖,當一個服務當前co暫停時,可以去執行服務里其他co的流程,N個co之間可以交叉執行,一個co暫停并不會影響其他co的執行,最大化提供計算能力,實作高并發,

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/240569.html
標籤:其他
