
從本節開始,我介紹一下如何將相關資料持久化到資料庫,也就是上圖中藍色的部分,
目前的問題
我先運行 6 個傳感器和2 個協調器,這里我使用了批處理檔案:

運行后,看一下 RabbitMQ 的管理控制臺:

注意上面前面幾個 Queue,這些 Queue 就是我們讓傳感器和協調器監聽那兩個 Fanout Exchange 時創建的,因為這兩個 Exchange 不使用路由 Key 來決定接收者,我使用了空字串“”作為這些 Queue 的名稱,而RabbitMQ 就會為它們賦予一個唯一的名字,
因為目前創建的 Queue 都是臨時的,如果我重新啟動系統,RabbitMQ會創建另一套不同的 Queue 來完成作業,這樣的話系統資源就會被慢慢的耗盡,所以這個問題需要解決,
調整 autoDelete 引數
首先修改 tools 包下的 queuetools.go 里面的GetQueue 函式,添加一個 autoDelete 引數:

GetQueue 函式會確保創建一個Queue 從而能接收到訊息,剛創建它的時候,我的意圖是讓它作用于 Direct Exchange 和命名的 Queue,后來我對它進行了擴展使用,也可以應用于匿名的 Queue,
再說一下 autoDelete 引數的作用是:若值為 true,那么如果一個 Queue 它沒有被注冊任何的使用者,這個 Queue 就會被洗掉,針對上述問題中的臨時 Queue,這就是我想要的效果,但是針對傳感器的資料 Queue,我還是希望在系統重啟后,這些 Queue 能夠保留,
所以,我為該函式添加了一個 autoDelete 引數,在創建 Queue 的時候,可以對 autoDelete 進行設定,
有三個呼叫該函式的地方需要調整代碼,先打開 sensors.go:

-
針對傳感器傳送資料的 Queue,我要讓它能夠保留下來,所以 autoDelete 就是 false
-
而 discoveryQueue 是用來監聽協調器的“發現”請求的,我想讓每個傳感器每次上線都會得到一個新的 Queue,這里 autoDelete 就設定為 true,這樣的話 RabbitMQ 就會把舊的 Queue 自動清理掉,
調整 queuelistener.go 里面的呼叫:

這里得到的臨時 Queue 是用來監聽傳感器上線時或回應協調器發現請求時來發布資料 Queue 名稱的,
這里函式呼叫的 autoDelete 引數也設定為 true,從而讓它們可以自動被清除掉,
測驗運行
把之前的 Queue 都刪掉:

然后再運行 5 個傳感器和2 個協調器:

現在又是很多的 Queue,
然后我們再停掉所有的傳感器和協調器:

可以看到傳感器傳送資料的 Queue 被保留了,而其它的臨時 Queue 都自動洗掉掉了,這就是我們想要的效果,
泛化事件資料
到目前為止,系統中只發布了一種型別的事件(接收到傳感器資料時的事件),而且目前還沒有任何使用者監聽這個事件,接下來我們就要完善事件這部分功能了,但首先必須做出一些優化修改,以便能真正滿足需求,
目前 eventaggregator.go 里面包含了所有添加監聽者以及向監聽者發布事件的方法,
但現在的情況是事件的使用者也知道如何自行發布事件,這點不太好,因為它們不需要這樣做,代碼修改如下:

-
為了盡量少的暴露功能,我為事件的使用者創建了 EventRaiser 這個介面,它里面只有一個 AddListener 方法,與已經實作的 AddListener 方法相幾乎完全匹配,
-
但是我把介面里 AddListener 的第二個引數,也就是回呼函式里面的引數型別改為了 interface{},從而可以接收多種型別的資料,
-
相應的,后邊所有涉及事件資料引數的地方都改為 interface{}
現在 EventAggregator 被泛化了,我也可以發布其它型別的事件了,
來到 queuelistener.go,我想在協調器發現資料源之后,發布一個事件:

這個事件的名稱叫做 DataSourceDiscovered,事件資料就是 Queue 的名稱,由于這個引數的型別是 interface{},所以它可以正常的傳遞進去,
創建資料的使用者

目前,我們整個系統的設計一共有三層,而資料源和資料的使用者是通過協調器分開的,這樣做的好處是,關于如何處理訊息的業務邏輯都集中在協調器這一層上面了,而資料源和資料使用者層僅關注它們自身的任務即可,
為了達到這個目的,需要在 coordinator 目錄下創建一個 databaseconsumer.go 檔案:

這個檔案的作用是監聽整個系統發出的事件,并決定哪些事件可以轉發到資料管理包(我一會要建立的),
dataconsumer.go
首先看一下 dataconsumer.go 檔案的內容:

-
第 15 行建立 DatabaseConsumer struct,它有5個欄位:
-
第一個欄位型別是 EventRaiser 介面,該介面只能用于監聽,而不能發布事件,這就是該介面的目的,
-
接下來三個欄位都是與 RabbitMQ 相關的,
-
最后一個欄位是注冊的監聽器的 Queue 名稱的集合,
-
第 23 行,為 DatabaseConsumer 創建一個建構式,它接收 EventRaiser 作為引數,并創建 RabbitMQ 相關的連接、Channel、Queue 為 DatabaseConsumer 各欄位賦值,
-
第 29 行創建 Queue 時用到了一個 Queue 的名稱,這個 Queue 是用來做持久化的,它是眾所周知的,它的名稱存放在 queuetools.go 檔案里:

-
第 31 行就是監聽資料源被發現的事件,回呼函式的引數型別是空介面(其實就是事件的名稱),在回呼函式內,呼叫我隨后要建立的 SubscribeToDataEvent 方法,把 eventData 轉化為字串傳遞進去,
下面看一看 SubscribeToDataEvent 方法:

-
該方法的引數是事件的名稱,
-
第 39 行,對已注冊的監聽器進行遍歷,如果傳進來的事件名稱已注冊,return 即可,
-
否則的話,需要注冊這個資料源,這個事件的名稱是 MessageReceived_+Queue 的名稱,
-
第 45 行的回呼函式,我將傳入一個立即執行的匿名函式,它會回傳我們真正需要使用的回呼函式,也就是閉包,這種做法的好處就是回傳的函式可以捕獲其被定義的作用域內的變數,這樣的話真正的回呼函式就可以擁有一些可持續的“狀態”(也就是 prevTime 和 buf),這里我的需求是至少要間隔 5 秒鐘以上,才記錄一次(到資料庫),
-
回呼函式內其它的邏輯都很簡單,就不逐行介紹了,
-
第 67 行,發布訊息使用的是 Default Exchange,并路由到持久化的那個 Queue,
修改 queuelistener.go 里面的建構式

讓其傳入 EventAggregator 作為引數并賦值給 QueueListener 的 ea 欄位,
修改協調器的 main 函式

-
創建包級共享的 DatabaseConsumer 變數,在 main 里用建構式進行創建并賦值,
-
創建 EventAggregator,并傳遞給 DatabaseConsumer 和 QueueListener,讓他們共享同一個 EventAggregator,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/274357.html
標籤:其他
