主頁 > 軟體設計 > skynet原始碼分析之lua層訊息處理

skynet原始碼分析之lua層訊息處理

2020-12-26 12:43:51 軟體設計

Lua層訊息處理機制在lualib/skynet.lua,提供大部分Lua層的api(最侄訓呼叫到c層的api),包括啟動一個snlua服務時Lua層的處理,創建新服務,注冊服務協議,如何發送訊息,如何處理對方發過來的訊息等,本篇主要介紹訊息處理機制,從而理解skynet如何實作高并發,

為了簡化,代碼里用到的coroutine_resume,coroutine_yield看成coroutine.resume,coroutine.yield即可,

local coroutine_resume = profile.resume
local coroutine_yield = profile.yield

1. 協程

coroutine.create,創建一個co,唯一的引數是co要執行的閉包f,此時是不會執行閉包f的

coroutine.resume,執行一個co,第一個引數是co的句柄,如果是第一次執行,其他引數是傳遞給閉包f的,co啟動后,一直執行直到它終止或讓出,正常終止,回傳true和閉包f的回傳值;發生錯誤例外終止,則回傳false和錯誤資訊

coroutine.yield,使co暫停,讓出執行權,對應最近的resume會立刻回傳,回傳true和yield的引數,下一次resume同一個co時,會從讓出點繼續執行,此時,yield的呼叫會立刻回傳,回傳值為resume除第一個引數之外的其他引數

參考Lua檔案介紹協程coroutine(簡稱co)的經典例子,可以看出,co可以被不斷的暫停和重啟,skynet廣泛使用co,當發送一個rpc請求時會暫停當前co,等對方回傳時又重啟co,

2. skynet創建協程的方式

先闡述下skynet創建協程(co)的方式,通過co_create(f)這個api創建一個協程,這段代碼非常有意思,為了性能,skynet會把創建的co放到快取里(第9行),當協程執行完流程(閉包f)后不會終止,而是暫停(第10行),當呼叫者呼叫co_create這個api時,如果快取里沒有,通過coroutine.create創建一個co,此時是不會執行閉包f,然后在某個時刻(通常是收到訊息呼叫訊息分發skynet.dispatch_message)會重啟(附帶需要的引數)這個co,co接著執行閉包f(第6行),最后暫停以等待下一次使用,對應最近的resume回傳true和“EXIT”(第10行);如果是一個復用的co,重啟co(第15行,引數是將要執行的閉包f),yield會立刻回傳把閉包賦值給f(第10行),在11行又暫停,同樣在某個時刻會重啟(附帶需要的引數)這個co,co接著執行閉包f(第11行),最后又在第10行暫停等待下一次使用,

 1 -- lualib/skynet.lua
 2 local function co_create(f)
 3     local co = table.remove(coroutine_pool)
 4     if co == nil then
 5         co = coroutine.create(function(...)
 6             f(...)
 7             while true do
 8                 f = nil
 9                 coroutine_pool[#coroutine_pool+1] = co
10                 f = coroutine_yield "EXIT"
11                 f(coroutine_yield())
12             end
13         end)
14     else
15         coroutine_resume(co, f)
16     end
17     return co
18 end

推薦一個Skynet視頻講解:https://ke.qq.com/course/2806743?flowToken=1030833,講解詳細還有檔案資料配套學習,新手老牛都可以看看的哦,

3. 如何處理Lua層訊息

了解了co_create的原理后,接下來以服務A向服務B發一條訊息為例說明skynet是如何處理Lua層訊息:

-- A.lua
local skynet = require "skynet"

skynet.start(function()
    print(skynet.call("B", "lua", "aaa"))
end)
-- B.lua
local skynet = require "skynet"
require "skynet.manager"

skynet.start(function()
    skynet.dispatch("lua", function(session, source, ...)
        skynet.ret(skynet.pack("OK"))
    end)
    skynet.register "B"
end)

在服務啟動最后會呼叫skynet.start,skynet.start呼叫skynet.timeout,在timeout里會創建一個co(12行),稱之為服務的主協程co1,此時co1不會執行

 1  -- lualib/skynet.lua
 2  function skynet.start(start_func)
 3      c.callback(skynet.dispatch_message)
 4      skynet.timeout(0, function()
 5          skynet.init_service(start_func)
 6      end)
 7  end
 8  
 9  function skynet.timeout(ti, func)
10      local session = c.intcommand("TIMEOUT",ti)
11      assert(session)
12      local co = co_create(func)
13      assert(session_id_coroutine[session] == nil)
14      session_id_coroutine[session] = co
15  end

定時器被觸發(因為定時器設定是0,所以下一幀就觸發)會向服務發送一條“RESPONSE”型別(PTYPE_RESPONSE=1)的訊息

// skynet-src/skynet_timer.c
static inline void
dispatch_list(struct timer_node *current) {
    ...
    message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;
    ...
}

服務收到訊息后,呼叫訊息分發api,由于訊息型別是RESPONSE,最侄訓執行到第7行,重啟主協程co1,執行co1的閉包f(這里是skynet.init_service(start_func)),如果閉包f里沒有暫停的操作,待閉包f成功運行完,co1暫停,resume會回傳true和"EXIT",接下來,第7行就變成,suspend(co, true, "EXIT")

1 -- luablib/skynet.lua
2 local function raw_dispatch_message(prototype, msg, sz, session, source)
3     -- skynet.PTYPE_RESPONSE = 1, read skynet.h
4     if prototype == 1 then
5         local co = session_id_coroutine[session]
6         ...
7         suspend(co, coroutine_resume(co, true, msg, sz))
8     ...
9 end

然后,呼叫suspend,由于型別是"EXIT",做一些清理作業即可,

-- lualib/skynet.lua
function suspend(co, result, command, param, size)
    ...
    elseif command == "EXIT" then
        -- coroutine exit
        local address = session_coroutine_address[co]
        if address then
            release_watching(address)
            session_coroutine_id[co] = nil
            session_coroutine_address[co] = nil
            session_response[co] = nil
        end
    ...
end

當閉包f里有暫停操作,比如A服務向B服務發送訊息skynet.call("B", "lua", "aaa"),這里分別講解A服務和B服務是如何處理的:

對于A服務:

首先在c層把訊息發送出去(第14行,把訊息push到目的服務的次級訊息佇列),然后暫停co1,resume回傳true,"CALL"和session值

 1 -- lualib/skynet.lua
 2 local function yield_call(service, session)
 3     watching_session[session] = service
 4     local succ, msg, sz = coroutine_yield("CALL", session)
 5     watching_session[session] = nil
 6     if not succ then
 7         error "call failed"
 8     end
 9     return msg,sz
10 end
11 
12 function skynet.call(addr, typename, ...)
13     local p = proto[typename]
14     local session = c.send(addr, p.id , nil , p.pack(...))
15     if session == nil then
16         error("call to invalid address " .. skynet.address(addr))
17     end
18     return p.unpack(yield_call(addr, session))
19 end

然后呼叫suspend(co, true, "CALL", session),型別是"CALL",以session為key,co為value保存在session_id_coroutine里,以便當B服務對A的請求回傳后,根據session找到對應的co,從而可以重啟co

1 -- lualib/skynet.lua
2 function suspend(co, result, command, param, size)
3     ...
4     if command == "CALL" then
5         session_id_coroutine[param] = co
6     ...
7 end

當A收到B的回傳訊息時,呼叫訊息分發api,根據session找到對應的co(即主協程co1),從上一次暫停點重啟它,下面這一行代碼yield會立刻回傳,列印出B回傳的結果print(...)(A.lua),此時執行完co1整個流程,回傳true和“EXIT”給suspend,對co1做一些清理作業,

local succ, msg, sz = coroutine_yield("CALL", session)

稍微改一下A.lua,co1執行閉包f流程中通過fork創建一個協程(稱為co2),由于co1沒有暫停,會一直執行完整個流程,此時co2并沒有執行,

1 -- A.lua
2 local skynet = require "skynet"
3 
4 skynet.start(function()
5     skynet.fork(function()
6         print(skynet.call("B", "lua", "aaa"))
7     end)
8 end)
1 -- lualib/skynet.lua
2 function skynet.fork(func,...)
3     local args = table.pack(...)
4     local co = co_create(function()
5         func(table.unpack(args,1,args.n))
6     end)
7     table.insert(fork_queue, co)
8     return co
9 end

訊息分發api做的第二件事是處理fork_queue里的co,所以收到定時器發送回來的訊息后做的第二件事是重啟co2,向B服務發送訊息后暫停co2,直到B回傳時再重啟co2,

1 -- lualib/skynet.lua
2 function skynet.dispatch_message(...)
3     ...    
4     local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
5     ...
6 end

對于B服務:

收到A服務的訊息后呼叫訊息分發api,創建一個co(第12行),co要執行的閉包f是已注冊的訊息回呼函式p.dispatch(第4行),然后通過resume重啟它(第15行)

 1 -- lualib/skynet.lua
 2 local function raw_dispatch_message(prototype, msg, sz, session, source)
 3     ...    
 4     local f = p.dispatch
 5     if f then
 6         local ref = watching_service[source]
 7         if ref then
 8             watching_service[source] = ref + 1
 9         else
10             watching_service[source] = 1
11         end
12             local co = co_create(f)
13        session_coroutine_id[co] = session
14             session_coroutine_address[co] = source
15             suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
16     ...
17 end

執行skynet.ret(skynet.pack("OK")),呼叫yield暫停它(第4行),最近的resume回傳,上面第15行變成suspend(co, true, "RETURN", msg, sz)

1 -- lualib/skynet.lua
2 function skynet.ret(msg, sz)
3     msg = msg or ""
4     return coroutine_yield("RETURN", msg, sz)
5 end

當command=="RETURN"時,做兩件事:1. 向源地址(即A服務)發送回傳訊息(第5行);2. 重啟co(第7行),co從skynet.ret回傳,然后B服務的訊息回呼函式(p.dispatch)執行完,co的閉包f全部執行完放入快取中,回傳true和“EXIT“給suspend

1 -- lualib/skynet.lua
2 function suspend(co, result, command, param, size) 
3     ...     
4     elseif command == "RETURN" then
5         ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) ~= nil
6         ...
7         return suspend(co, coroutine_resume(co, ret))
8     ...
9 end

至此,就是Lua層訊息處理的整個流程,

4. 例外處理

在一些情況下需要做例外處理,比如沒有注冊對應訊息型別的協議,沒有提供訊息回呼函式,執行co程序中發生錯誤等,當一個服務處理一條訊息的程序發生例外,必須要做兩件事:1. 例外終止當前co;2. 通知訊息發送方,而不是讓對方一直忙等待,

當執行co程序中發生錯誤時,resume第一個回傳值是false,呼叫suspend,向對方發送一條PTYPE_ERROR型別訊息(第9行),然后拋出例外,終止當前co(第14行),

 1 -- lualib/skynet.lua
 2 function suspend(co, result, command, param, size)
 3     if not result then
 4         local session = session_coroutine_id[co]
 5         if session then -- coroutine may fork by others (session is nil)
 6             local addr = session_coroutine_address[co]
 7             if session ~= 0 then
 8                 -- only call response error
 9                 c.send(addr, skynet.PTYPE_ERROR, session, "")
10             end
11             session_coroutine_id[co] = nil
12             session_coroutine_address[co] = nil
13         end
14         error(debug.traceback(co,tostring(command)))
15     end
16     ...
17 end

大部分例外情況下,都會向對方發送一條PTYPE_ERROR型別訊息通知對方,當收到PYTPE_ERROR型別訊息,會呼叫_error_dispatch,把error_source記錄在dead_service里,把error_session記錄在error_queue里

 1 -- lualib/skynet.lua
 2 local function _error_dispatch(error_session, error_source)
 3     if error_session == 0 then
 4         -- service is down
 5         --  Don't remove from watching_service , because user may call dead service
 6         if watching_service[error_source] then
 7              dead_service[error_source] = true
 8         end
 9         for session, srv in pairs(watching_session) do
10             if srv == error_source then
11                 table.insert(error_queue, session)
12             end
13         end
14     else
15         -- capture an error for error_session
16         if watching_session[error_session] then
17             table.insert(error_queue, error_session)
18         end
19     end
20 end

在suspend最后會呼叫dispatch_error_queue處理error_queue,通過session查找到正在等待的co,然后強制終止它,保證co不會一直忙等待,

1 -- lualib/skynet.lua
2 local function dispatch_error_queue()
3     local session = table.remove(error_queue,1)
4     if session then
5         local co = session_id_coroutine[session]
6         session_id_coroutine[session] = nil
7         return suspend(co, coroutine_resume(co, false))
8     end
9 end

5. 總結

一次同步的rpc請求的流程如下圖,當一個服務當前co暫停時,可以去執行服務里其他co的流程,N個co之間可以交叉執行,一個co暫停并不會影響其他co的執行,最大化提供計算能力,實作高并發,

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/240569.html

標籤:其他

上一篇:linux imx8mm 漏中斷問題 & linux遙控器驅動除錯流程

下一篇:騰訊T1~T9工程師技術剖析以及評定標準和薪資待遇(各大廠同)

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 面試突擊第一季,第二季,第三季

    第一季必考 https://www.bilibili.com/video/BV1FE411y79Y?from=search&seid=15921726601957489746 第二季分布式 https://www.bilibili.com/video/BV13f4y127ee/?spm_id_fro ......

    uj5u.com 2020-09-10 05:35:24 more
  • 第三單元作業總結

    1.前言 這應該是本學期最后一次寫作業總結了吧。總體來說,對作業的節奏也差不多掌握了,作業做起來的效率也更高了。雖然和之前的作業一樣,作業中都要用到新的知識,但是相比之前,更加懂得了如何利用工具以及資料。雖然之間卡過殼,但總體而言,這幾次作業還算完成的比較好。 2.作業程序總結 相比前兩個單元,此單 ......

    uj5u.com 2020-09-10 05:35:41 more
  • 北航OO(2020)第四單元博客作業暨課程總結博客

    北航OO(2020)第四單元博客作業暨課程總結博客 本單元作業的架構設計 在本單元中,由于UML圖具有比較清晰的樹形結構,因此我對其中需要進行查詢操作的元素進行了包裝,在樹的父節點中存盤所有孩子的參考。考慮到性能問題,我采用了快取機制,一次查詢后盡可能快取已經遍歷過的資訊,以減少遍歷次數。 本單元我 ......

    uj5u.com 2020-09-10 05:35:48 more
  • BUAA_OO_第四單元

    一、UML決議器設計 ? 先看下題目:第四單元實作一個基于JDK 8帶有效性檢查的UML(Unified Modeling Language)類圖,順序圖,狀態圖分析器 MyUmlInteraction,實際上我們要建立一個有向圖模型,UML中的物件(元素)可能與同級元素連接,也可與低級元素相連形成 ......

    uj5u.com 2020-09-10 05:35:54 more
  • 6.1邏輯運算子

    邏輯運算子 1. && 短路與 運算式1 && 運算式2 01.運算式1為true并且運算式2也為true 整體回傳為true 02.運算式1為false,將不會執行運算式2 整體回傳為false 03.只要有一個運算式為false 整體回傳為false 2. || 短路或 運算式1 || 運算式2 ......

    uj5u.com 2020-09-10 05:35:56 more
  • BUAAOO 第四單元 & 課程總結

    1. 第四單元:StarUml檔案決議 本單元采用了圖模型決議UML。 UML檔案可以抽象為圖、子圖、邊的邏輯結構。 在實作中,圖的節點包括類、介面、屬性,子圖包括狀態圖、順序圖等。 采用了三次遍歷UML元素的方法建圖,第一遍遍歷建點,第二、三次遍歷設定屬性、連邊,實作圖物件的初始化。這里借鑒了一些 ......

    uj5u.com 2020-09-10 05:36:06 more
  • 談談我對C# 多型的理解

    面向物件三要素:封裝、繼承、多型。 封裝和繼承,這兩個比較好理解,但要理解多型的話,可就稍微有點難度了。今天,我們就來講講多型的理解。 我們應該經常會看到面試題目:請談談對多型的理解。 其實呢,多型非常簡單,就一句話:呼叫同一種方法產生了不同的結果。 具體實作方式有三種。 一、多載 多載很簡單。 p ......

    uj5u.com 2020-09-10 05:36:09 more
  • Python 資料驅動工具:DDT

    背景 python 的unittest 沒有自帶資料驅動功能。 所以如果使用unittest,同時又想使用資料驅動,那么就可以使用DDT來完成。 DDT是 “Data-Driven Tests”的縮寫。 資料:http://ddt.readthedocs.io/en/latest/ 使用方法 dd. ......

    uj5u.com 2020-09-10 05:36:13 more
  • Python里面的xlrd模塊詳解

    那我就一下面積個問題對xlrd模塊進行學習一下: 1.什么是xlrd模塊? 2.為什么使用xlrd模塊? 3.怎樣使用xlrd模塊? 1.什么是xlrd模塊? ?python操作excel主要用到xlrd和xlwt這兩個庫,即xlrd是讀excel,xlwt是寫excel的庫。 今天就先來說一下xl ......

    uj5u.com 2020-09-10 05:36:28 more
  • 當我們創建HashMap時,底層到底做了什么?

    jdk1.7中的底層實作程序(底層基于陣列+鏈表) 在我們new HashMap()時,底層創建了默認長度為16的一維陣列Entry[ ] table。當我們呼叫map.put(key1,value1)方法向HashMap里添加資料的時候: 首先,呼叫key1所在類的hashCode()計算key1 ......

    uj5u.com 2020-09-10 05:36:38 more
最新发布
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:20:47 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:20:25 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:20:17 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:20:10 more
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:19:44 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:19:07 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:18:57 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:18:49 more
  • 05單件模式

    #經典的單件模式 public class Singleton { private static Singleton uniqueInstance; //一個靜態變數持有Singleton類的唯一實體。 // 其他有用的實體變數寫在這里 //構造器宣告為私有,只有Singleton可以實體化這個類! ......

    uj5u.com 2023-04-19 08:42:51 more
  • 【架構與設計】常見微服務分層架構的區別和落地實踐

    軟體工程的方方面面都遵循一個最基本的道理:沒有銀彈,架構分層模型更是如此,每一種都有各自優缺點,所以請根據不同的業務場景,并遵循簡單、可演進這兩個重要的架構原則選擇合適的架構分層模型即可。 ......

    uj5u.com 2023-04-19 08:42:41 more