
到目前為止,我一直專注于如何讓訊息進出訊息代理,也就是RabbitMQ,
實際上,我們可以繼續使用 RabbitMQ 和它的 Exchanges 來連接這個應用程式的其他部分,但是我想探索一個稍微不同的模型:我想使用協調器來跟蹤哪些型別的消費者得到訊息通知,
這樣的話,我斷開了傳感器資料生成器和資料使用者之間的連接,
同時為了處理這些資料通信,我決定使用事件(event)來通知用戶系統中正在發生的事情,并讓他們決定是否要處理資料,
其原理大致如下:
-
在協調器內部,我們有構建好的 QueueListener,
-
我還需要構建另外一個型別,我叫它 EventAggregator,
-
來自RabbitMQ 的訊息,它將通過一個異步的goroutine 進入QueueListener
-
goroutine 將把訊息傳輸到一個事件物件(event object)中,并通過事件聚合物件(event aggregation object)進行廣播,
-
該物件將維護任何對事件感興趣的使用者的注冊表,并向其發送事件物件的副本,
-
這使我們能夠通過將資料轉儲到下游的 Queue 來為這些事件注冊其他應用程式,但它也可以讓使用者能夠在協調器內部進行設定,例如日志系統,
-
最后,如果使用者最終要通過 Queue 將資料發送到另一個應用程式,則可以對其進行預處理,以添加有用的附加資料,而最終使用者不必知道這些附加資訊是如何到達那里的,
撰寫代碼
創建 EventAggregator
在 coordinator 目錄下添加 eventaggregator.go,代碼如下:

-
第 28 行,建立 EventData struct,目前它的欄位碰巧和 SensorMessage 是一樣的,但是兩個 struct 的職責不同,所以我們不復用 SensorMessage,而是單獨建立 EventData,以便它們以后可以獨立的進化;
-
第 5 行,建立了 EventAggregator struct,也就是事件聚合,它只有一個 listeners 欄位,是一個 map,它的 key 是事件的名稱,它的值是回呼函式的集合,當事件發生的時候,EventAggregator 就輪流呼叫為該事件注冊的回呼函式;
-
第 9 行,就是 EventAggregator 的建構式;
-
第 16 行,AddListener 方法,使用者通過該方法可以向 EventAggregator 注冊回呼函式;
-
第 20 行,PublishEvent 方法用來發布事件,它接收事件名稱和事件的資料作為引數,這里需要判斷 EventAggregator 里是否已經注冊了該事件,如果注冊了,那么遍歷其對應的回呼函式,并使用事件資料進行呼叫,
-
呼叫回呼函式時,使用的不是 EventData 的指標,而是 EventData 的副本,這可以保證使用者不會把事件資料搞亂,影響其它使用者
-
取消訂閱的功能我就不做了,
把 EventAggregator 連接到 QueueListener
打開 queuelistener.go,添加代碼:

-
第19 行,在QueueListener struct 里面添加欄位ea,型別是 *EventAggregator;
-
第 25 行,在 QueueListener 的建構式里為 ea 自讀賦初始值,

在 AddListener 方法里,原來只是把原始資料列印到控制臺,現在添加如下代碼:
-
創建一個 EventData,其欄位內容目前和傳感器的訊息內容一樣;
-
使用 QueueListener 上的 EventAggregator 發布事件:
-
事件的名稱是 MessageReceived_傳感器名稱
-
第二個引數就是事件資料
發現早已運行的傳感器
最后我們要做的就是如何讓協調器發現在協調器上線前就已經在運行的傳感器,

目前我們的做法是這樣的:首先協調器先運行,然后傳感器在上線的時候立即把它們的資料Queue 發送過去,使用的是 Fanout Exchange,這樣多個協調器都可以被通知到,
但是,如果傳感器先運行,協調器后運行,那么協調器就無法知道傳感器的存在,為了解決這個問題,我這樣做:
-
我在訊息代理中也就是 RabbitMQ 里,建立一個新的 Exchange,它是一個 Fanout Exchange,它和其它資訊流的方向正好相反,
-
在這里,協調器將會向這個 Fanout Exchange 發出一個“發現”請求,這個資訊將會發送給所有的傳感器,
-
傳感器接收到這個“發現”請求資訊后,將會回應,將它們的資料 Queue 的名稱發送給我們以前建立的那個 Fanout Exchange(中間黃色的),
-
這里會出現一些冗余的資訊,但協調器里有過濾機制,所以就這樣吧,
我們首先測驗一下先運行傳感器專案,再運行協調器專案的效果:

可以看到,協調器運行起來以后,沒有接收到該傳感器的資料,
修改 queuetools
我們要解決的就是這個問題,下面看代碼,首先看 queuetools.go:

這里改動不多,就是把要新建立的 Fanout Exchange 的名稱作為常量存在這里,
注意之前在這里定義的 SensorListQueue 已經不需要了,可以刪掉,
修改 queuelistener
然后看 queuelistener.go,在這里為 QueueListener 添加一個DiscoverSensors 方法:

該方法中首先我使用了 ExchangeDeclare 方法來宣告這個新的 Exchange,并進行設定,
雖然專案中還沒用過這個方法,但是里面大多數引數的作用你應該能夠猜得出來:
-
name:Exchange 的名稱
-
kind:Exchange 的型別,可以是 direct、topic、header 或者 fanout,這里使用 fanout
-
durable:表示這個 Exchange 是否可持久
-
autoDelete:表示在沒有系結的情況下是否洗掉 Exchange
-
internal:這個引數我們還沒見過,如果想拒絕外部的發布請求,就把這個設為 true,這可以在高級場景中使用,在高級場景中,Exchange 系結在一起,在訊息代理中形成更復雜的拓撲,
-
noWait 和 args 就不介紹了,
現在,協調器可以向這個 Exchange 發布訊息了,而我們只需要向它發送一個訊息即可,并沒有什么具體的內容要發送,所以我發布了一個空的 Publishing,這就可以告訴瀏覽器我在尋找它們了,
修改傳感器
下面我們讓傳感器(sensor.go)對上面發布的“發現”請求進行回應,不過首先,需要重構一下,
把 main 函式里面當傳感器上面時,發布資料 Queue 名稱那部分代碼提取出來放在單獨的一個函式里面:

然后在 main 函式相應的位置進行呼叫:

-
第 39 行,對重構的函式進行呼叫,
-
第 41 行,創建一個 Queue
-
第 42 行,使用 QueueBind 方法將這個 Queue 和 SensorDiscovery Exchange
-
第 48 行,創建goroutine 運行一個將要新建的函式 listenForDiscoveryRequests,通過使用 goroutine,無論當請求什么時候進來,這部分邏輯都將可用,而且不會阻塞系統的其余部分,這里需要傳入 Queue 的名稱和 Channel,
然后看一下 listenForDiscoveryRequests 函式:

這里使用 Channel 的 Consume 方法對 Channel進行設定以便能接收“發現”請求,
然后用 for range 來接收“發現”請求,這里忽略訊息本身即可,因為該訊息就是一個觸發而已,當訊息進來時,呼叫剛剛重構出來的 publishQueueName 函式即可,
在 queuelistener 里呼叫發現方法
在 queuelistener.go 的 ListenForNewSource 方法里,在如下位置呼叫 DiscoverSensors 方法:

為什么在這里呼叫?因為這是可以保證協調器正在監聽傳感器路由的訊息的第一個地方,
運行測驗
先運行一個傳感器,然后在運行協調器:

傳感器這里我使用了 freq 引數,讓其每兩秒鐘生成一個資料,
可以看到,在這種情況下協調器也可以發現已經運行的傳感器并接收資料了,
你可以運行多個傳感器和多個協調器,應該也會好用的,
這也是一種非常簡單的分布式應用吧,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/272437.html
標籤:其他
下一篇:面向物件編程
