現在,我們需要模擬傳感器,生成資料,并發布到 RabbitMQ,
建立傳感器專案
在 GOPATH src 下建立檔案夾 sensors,使用 go mod init 初始化,并創建 main.go,
同時別忘了安裝 amqp 的包:go get -u github.com/streadway/amqp
我們要生成一些模擬資料,生成資料有一定的范圍(位于一個最大值和最小值之間),如下圖:

因此,我們需要這樣幾個配置引數:
-
傳感器的名稱
-
傳感器資料的更新頻率
-
模擬生成資料的最大值
-
模擬生成資料的最小值
-
與前一次生成資料的差值的最大值(變化幅度的最大值)
設定命令列引數并讀取
在這個專案里,我們需要通過命令列引數來傳遞配置,并在 Go 程式里面進行決議和讀取,我們可以使用 os.Args 來搞這些命令列引數,但是更好的辦法是使用 flag 這個包(其內部實作使用的也是 os.Args),
我們先看代碼:

-
第 5-9 行,我們宣告了 5 個命令列引數,都是使用 flag 包下相應的函式實作的,
-
這幾個命令列引數分別表示傳感器名稱、模擬資料的更新頻率、模擬資料的最大值、最小值以及變化幅度的最大值,
-
這些命令列引數的型別分別是 string,uint,float64,float64,float64,
-
這些函式的引數都類似:
-
第一個引數是命令列引數的名稱
-
第二個引數是命令列引數的默認值
-
第三個引數是引數的描述/幫助
-
在 main 函式里,我們呼叫 flat.Parse() 函式,就可以將命令列的引數值決議到 5-9 行宣告變數里面,
我們測驗一下,命令列輸入 go run . --help,其結果如下:

生成模擬資料
要生成模擬傳感器的資料,需要使用到 math/rand 和 time 這兩個包,
先看代碼:

-
第 17 行,我們需要一個 *rand.Rand 型別來生成亂數,它又需要一個源,這里使用 time.Now().UnixNano() 生成源,這樣做的好處是因為這個時間納秒數永遠不會重復,
-
第 19 行,宣告 value,它表示傳感器的數值,在這先生成一個初始值,
-
第 20 行,是額定值,在這里也就是最大值最小值的中間平均值,
-
第 25 行,把更新頻率(每秒更新的次數)轉化為了兩次更新之間的時間間隔(毫秒),并決議成 time.Duration 這個型別,
-
第 26 行,time.Tick 函式會回傳一個 time 的 Channel,該函式會按照提供的時間間隔不斷觸發,并向這個 Channel 發送當前時間,
-
第 28 行,使用 for range 來處理 signal 這個 Channel,每次 Channel 中有資料傳遞過來,我們就使用 calcValue 這個函式來生成新的模擬資料,
-
第 29 行,把生成的最新資料列印一下即可,
calcValue 函式
生成模擬資料的邏輯是如果資料偏離額定值,那么盡量讓下次生成的值向額定值靠攏,
這部分可根據自己的特定需求來實作,不必和我的相同,
先看代碼:

-
第 35 行,宣告了 maxStep 和 minStep 兩個變數,表示本次更新相比上次所能夠發生的最大變化和最小變化幅度,
-
第 36 - 42 行,區分當前值大于額定值或小于額定值兩種情況,按不同的邏輯得出 maxStep 和 minStep
-
第 44 行,使用 maxStep 和 minStep 以及亂數生成新的 value 資料,
運行 sensors 專案
使用 go run . 運行,命令列引數使用默認值即可:

一切正常的話,它就會每秒鐘生成 5 次資料,
如何運行多個傳感器
生產環境中,通常會接收來自多個傳感器的資料,
這里,我們讓每個傳感器都設定自己的路由 Key,所以 RabbitMQ 將會為每個 Key 創建一個 Queue:

但是這也會引起問題,就是之前章節里面的那個協調程式如何發現這些傳感器呢?
首先,我們可以讓每個傳感器使用路由 Key 向一個所有傳感器和協調程式都知曉的路徑中發送一個訊息,但這只能解決問題的一半,另一半我們以后再說,
將傳感器資料發布到 RabbitMQ
創建傳感器的訊息型別
這里會使用到 encoding/gob 包,
看代碼:

-
在 sensors 包中創建 model 包,并建立 models.go 檔案,
-
在 models.go 的第 12 行,建立 SensorMessage 作為傳感器傳遞訊息的型別,里面包含三個欄位分別是傳感器名稱、數值和時間戳,
-
很顯然我們不能把 Go 的 struct 型別直接扔到 RabbitMQ 里面,但我們專案中的各種客戶端只涉及到 Go 語言,所以在這里我使用 Go 語言的 gob 來對訊息進行編碼,這樣會更高效一些,如果這個專案是跨語言的我可能會使用 JSON 或 Protocol Buffers,
-
在 model 包的 init 函式里面,需要使用 gob 包的 Register 函式把將要編碼的型別進行注冊,這樣依賴于這個包的其它 Go 程式就可以把 SensorMessage 這個型別的訊息物件發送過去了
建立 Queue 相關的工具包
建立 tools 包,并建立 queuetools.go 檔案,其內容如下:

代碼內容與之前的專案類似,就不解釋了,
發布傳感器資料到 RabbitMQ
這里還會使用到 bytes 包,
回到 main.go,修改代碼:

-
前面添加了獲取 Channel 和 Queue 的代碼,其中第 37 行比較重要,因為我們不能保證在程式運行時,使用 Queue 名稱作為路由 Key 的 Queue 存在,而使用 GetQueue 函式,就可以保證這個 Queue 會被正確的設定,并準備好被我們使用了,
-
第 42 行,使用 bytes 包創建了一個 *bytes.Buffer,它用來來承載編碼后的資料,這個 Buffer 可以重復利用,所以實在 for range 的外部宣告的,
-
但是每次使用 Buffer 都需要進行重置,也就是第 53 行的作用,這樣以前的資料就會被移除,Buffer 的指標會回到初始位置,
-
第 43 行,使用 gob 和 Buffer 來創建編碼器 ,
-
第 54 行,使用 編碼器的 Encode 方法對訊息進行編碼,
-
第 56 行,創建要發送給 RabbitMQ 的訊息(amqp.Publishing 型別),這里只需要填寫 Body 欄位即可,其它的欄位根據自己的需求選填即可,
-
第 60 行,使用 Channel 來發布訊息,這里使用的是默認的 Exchange,路由 key 就是 Queue 的名字,最后一個引數就是發布的訊息,
運行程式
運行 sensors 包:

打開控制臺:

可以看到發送頻率確實是每秒 5 次,
打開 sensor Queue:

目前已經有 384 條訊息了,都沒有被發送,
隨便點開一個訊息查看其內容:

可以看到 Body 應該是 Base64 編碼的,因為 gob 編碼器使用的是二進制訊息格式,盡可能的高效,所以在控制臺里面它沒有一個有意義的表述展示,
然后,先停止運行程式,
傳感器上線時通知協調程式
最后我們就來處理上面那個問題:當傳感器上線的時候,得讓協調程式知道,并發送資料,
因為每個傳感器都創建了一個自己的 Queue,所以在沒有幫助的情況下,協調程式將無法有效知道這些傳感器,
這個問題實際上具體需要做兩件事,我們先來做第一件事:
多個傳感器他們 Queue 的名稱是不一樣的,是動態的,所以我們需要一個大家都知道的 Queue,它用來將每個新創建的傳感器的 Queue 名稱發送給協調程式,
首先,在 queuetools.go 里面添加這個 Queue 的名稱,使用一個常量保存:

然后,在 main.go 里,使用這個名稱創建一個 Queue,并將傳感器的 Queue 的名稱發布上去:

再次運行 sensor 包
打開控制臺:

可以看到 SensorList Queue 出現了,
進入到 SensorList Queue,看它的 Message:

可以看到當前這一個傳感器的名字 sensor 就在里面,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/270591.html
標籤:Go
上一篇:Golang開發必須了解的細節!
下一篇:NetCore的快取使用詳例
