本文首發微信公眾號:碼上觀世界
Part 1 物聯網概述
1. 物聯網概念
物聯網是指通過各種資訊傳感器、射頻識別技術、全球定位系統、紅外感應器、激光掃描器等各種裝置與技術,實時采集任何需要監控、 連接、互動的物體或程序,采集其聲、光、熱、電、力學、化 學、生物、位置等各種需要的資訊,通過各類可能的網路接入,實作物與物、物與人的泛在連接,實作對物品和程序的智能化感知、識別和管理,
物聯網是一個基于互聯網、傳統電信網等的資訊承載體,它讓所有能夠被獨立尋址的普通物理物件形成互聯互通的網路,
2. 物聯網發展的問題與瓶頸
(1)技術標準的統一與協調,目前,傳統互聯網的標準并不適合物聯網,物聯網感知層的資料多源異構,不同的設備有不同的介面,不同的技術標準;網路層、應用層也由于使用的網路型別不同、行業的應用方向不同而存在不同的網路協議和體系結構,建立的統一的物聯網體系架構,統一的技術標準是物聯網現在正在面對的難題
(2)碎片化,由于終端設備的多樣化,物聯網的開發和應用存在較嚴重的碎片化問題,主要體現在以下幾個方面:
電氣介面的碎片化,傳統電氣介面分為模擬信號、數字信號的傳輸模式,數字信號又有無數種通訊協議,所以電器介面訪問問題給處理器編程帶來了繁重任務,集成化的通訊模塊的電氣介面也同樣存在問題;
終端傳感器的訪問協議碎片化,每個傳感器的配置、訪問方式和通訊協議均不同,也需要進行重復的編程和配置;
終端通訊接入方式碎片化,物聯網終端可能是有線網路接入或者總線接入,也可能無線網路接入,而無線網路接入協議眾多,如近距離的藍牙、超寬帶,中距離的zigbee WIFI,還有傳統廣域的2G、4G接入以及近年來興起的Lora、NB-IoT等;
處理器碎片化,處理器紛繁復雜,對應不同的處理器,需要進行不同的板級配置,沒有統一標準的板級硬體;
物聯網平臺的碎片化,由于物聯網到終端的通訊協議沒有統一,物聯網平臺傳輸協議各不相同,傳入不同的云服務器需要進行重復的編程作業,

3. 物聯網通訊協議
在物聯網應用中,通信技術包括Wi-Fi、RFID、NFC、ZigBee、Bluetooth、LoRa、NB-IoT、GSM、GPRS、3/4/5G網路、Ethernet、RS232、RS485、USB等,物聯網技術框架體系中所使用到的通訊協議主要有:AMQP、JMS、REST、HTTP/HTTPS、COAP、DDS 、MQTT等,
由于物聯網設備受限于作業環境和其特殊用途,使其應用有特別的要求,比如因為需要大量部署終端,對設備價格和維護的成本要求不能過高,終端設備本身資源占用極低,對網路帶寬、資料傳輸大小、電源消耗都要求苛刻,另外安全性也是物聯網設備的重要考慮方面,因此為了滿足物聯網設備的作業要求,除了在硬體上保證設備的低功耗、安全與輕量,在軟體上也同樣需要新的網路通信協議和安全保障,
根據Eclipse 基金會的調查,MQTT是物聯網解決方案中最常用的訊息傳遞協議,詳情見下圖:

其中MQTT協議是20世紀90年代中期,IBM為幫助石油和天然氣公司客戶實作數千英里長的石油和天然氣管道的無人值守監控,將管道上的傳感器資料通過衛星通信傳輸到監控中心而設計的資料傳輸協議,

當時設計資料傳輸協議面臨的場景和需要解決的問題是:
石油天然氣管道線路非常長,要接許多沿線的資料采集網關,服務器要能接成千上萬個通信客戶端;
石油管道傳感器的資料采集頻率不高,而且每次傳輸資料量不大,不需要傳輸大量資料;
現場采集網關由于量大,考慮到采購成本,CPU和存盤等計算資源都很有限,協議客戶端軟體要能在CPU和存盤等計算資源都很有限的單片機、單板機、RTU等上運行,并能方便的實作移植到不同的硬體上;
通信流量費用高昂,需要最大限度地減少帶寬和傳輸訊息大小,石油管道會穿越很多無人區,附近沒有網路設施,因此只能使用衛星通訊,但衛星鏈路帶寬低(當然也有高帶寬的),通信流量費用高昂,因此需要盡量節省傳輸的訊息的流量開銷;
高軌道的GEO衛星站得高看得遠,覆寫范圍廣,但軌道高延遲就大了,中低軌道的LEO/MEO衛星延遲小,但是覆寫區域有限,每天都會出現衛星切換時的網路中斷,因此需要客戶端和服務器端都能夠保留訊息收發狀態,在網路恢復正常后繼續發送;
有些資料發送失敗,不需要重發,但是有些訊息比如閥門泄露告警或控制石油管道閥門的命令,就必須要在網路有問題的情況下也要能確保發送成功,因此需要在環境允許的情況下,提供不同等級的“服務質量”,
經過10多年的發展,MQTT協議v3.1.1版已于2014年7月被OASIS組織接納為國際標準,除了MQTT協議外,還有其他常用協議,比如CoAP、WebSocket和HTTP,因為HTTP協議較重,本文不做介紹,這里只簡單比較下MQTT跟CoAP和WebSocket的區別,
MQTT基于TCP的長連接,Clients必須要知道訊息格式才能通信,是多個客戶端通過中央代理進行訊息傳遞的多對多協議,CoAP(受限制的應用協議,Constrained Application Protocol)基于UDP的無連接,采用與HTTP協議相同的請求回應作業模式,CoAP內置發現支持和內容協商,能允許設備相互窺測以找到資料交換的方式,是一個在Server和Client之間傳遞狀態資訊的單對單協議,
WebSocket最初是為瀏覽器和服務器之間的點對點連接進行全雙工通信而開發的,它可以用于嵌入式設備的連接,也可以說是物聯網設備,但不能很好地保持信號,MQTT是專門為IoT設備設計的,其設計原則是最大程度地減少網路帶寬并確保交付,MQTT在基本訊息發送機制的基礎上增加了額外的抽象,以便多個感興趣的機器可以訂閱他們感興趣的主題,因此,有時可以按主題路由訊息,以便多臺機器可以共享一個共同的興趣,服務器可以選擇按主題過濾訊息,但可以接收所有訊息,因此,在嵌入式系統的背景關系中,MQTT比WebSocket更適合,因為除了通信開銷小外,MQTT在協議級別提供pub / sub,而且提供不同等級的服務質量,
上面提到的協議是處于網路協議層次的應用層,而作業在物理層的NB-IoT協議是不得不提的另一種重要的物聯網通信協議,
在低功耗廣域網路(Low Power Wide Area Network,簡寫:LPWAN) 中,實作遠距離低功耗的無線通信網路技術主要包含NB-IoT、LoRa、Sigfox、eMTC四種,其中NB-IoT、LoRa近年來備受市場的關注和追捧,尤其是NB-IoT(Narrow Band-Internet of Things,窄帶物聯網),NB-IoT基于蜂窩網路,可直接部署于GSM網路、UMTS網路或LTE網路,部署成本低;它的傳輸距離可達十幾公里,每個網路單元可以支持接入50,000個設備終端,速度方面,NB-IoT的頻射網路帶寬為200kHz,傳輸速率一般在160kbps-250kbps之間;又由于NB-IoT通訊結構及設備本身的資料接收及上報模式的設定等因素,NB-IoT通訊常會具有10s內的傳輸延時,
NB-IoT的網路部署包含芯片、模組或終端,NB-IoT基站、NB-IoT核心網、IoT連接管理平臺等部分:

NB-IoT網路通訊比較適合于終端較為分散、環境網路信號不好、沒有穩定電源、傳輸速率要求相對較高、中&高頻上行資料傳輸等情況.盡管因為功耗、網路覆寫范圍有限以及商業模式等因素導致NB-IoT目前的市場份額相對較少,但是隨著NB-IoT被納入5G候選技術方案,NB-IoT將成為未來5G物聯網主流技術,NB-IoT也將迎來新的發展機遇,現階段,NB-IoT垂直應用行業主要集中交通行業、物流行業、衛生醫療、商品零售行業、智能抄表、公共設施、智能家居、智能農業、工業制造、企業能耗管理、企業安全防護,這些需求都成為了NB-IoT增長的市場,資料顯示:NB-IoT最早商用“三表”(電表、氣表、水表)領域已經在2018年實作了百萬量級的出貨,
本文接下來著重介紹應用層的MQTT協議及其應用開發,
Part 2 MQTT協議介紹
4 MQTT協議組成
MQTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸)是ISO 標準(ISO/IEC PRF 20922)下基于發布/訂閱范式的訊息協議,它作業在 TCP/IP協議族上,是為硬體性能低下的遠程設備以及網路狀況糟糕的情況下而設計的發布/訂閱型訊息協議,
MQTT協議實作需要客戶端和服務器端通訊完成,在通訊程序中,MQTT協議中有三種身份:發布者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe),其中,訊息的發布者和訂閱者都是客戶端,訊息代理是服務器,

客戶端作為一個使用MQTT協議的應用程式或者設備,它總是建立到服務器的網路連接,它可以:
(1)發布其他客戶端可能會訂閱的資訊,
(2)訂閱其它客戶端發布的訊息,
(3)退訂或洗掉應用程式的訊息,
(4)斷開與服務器連接,
MQTT服務器也稱為“訊息代理”(Broker),位于訊息發布者和訂閱者之間,它可以:
(1)接受來自客戶的網路連接;
(2)接受客戶發布的應用資訊;
(3)處理來自客戶端的訂閱和退訂請求;
(4)向訂閱的客戶轉發應用程式訊息,
5 MQTT協議中核心概念
(1)會話(Session)與訂閱(Subscription)
每個客戶端與服務器建立連接后就是一個會話,客戶端和服務器之間有狀態互動,會話存在于一個網路之間,也可能在客戶端和服務器之間跨越多個連續的網路連接,訂閱包含主題篩選器(Topic Filter)和最大服務質量(QoS),訂閱會與一個會話(Session)關聯,一個會話可以包含多個訂閱,每一個會話中的每個訂閱都有一個不同的主題篩選器,
(2)主題名(Topic Name)和主題篩選器(Topic Filter)
主題名是訊息的標簽,主題篩選器是訂閱者指定的標簽通配符運算式,一個主題篩選器可以匹配到多個主題名稱,服務器會將訂閱所匹配到標簽下的所有訊息發送給訂閱者,MQTT 的 主題名有層級結構,主題篩選器支持通配符 + 和 #:

MQTT 的主題是不要預先創建的,發布者發送訊息到某個主題、或者訂閱者訂閱某個主題的時候,Broker 就會自動創建這個主題,
(3)訊息
MQTT協議訊息資料包由固定頭、可變頭、訊息體三部分組成:
固定頭(Fixed header),共2個位元組,第一個位元組的高4位定義了此訊息的型別,第一個位元組的低4位根據訊息的型別,含有不同的含義,第二個位元組宣告接下來的可變頭及負載的資料長度,固定頭結構如下:

其中第一個位元組的4-7bit為資料包訊息型別,有14種:

可變頭(Variable header),存在于部分MQTT資料包中,資料包型別決定了可變頭是否存在及其具體內容,可變頭位于固定頭和負載之間,其內容因資料包型別而不同,常用于包的標識:
訊息體(Payload),存在于部分MQTT資料包中,表示客戶端收到的具體內容,包括CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四種型別的訊息,PUBLISH訊息體內容可選,其他型別都沒有訊息體,
MQTT中有兩種特殊的訊息:Retained 訊息和LWT 遺囑訊息,分別介紹如下:
Retained 訊息是指在 PUBLISH 資料包中 Retain 標識設為 1 的訊息,Broker 收到這樣的 PUBLISH 包以后,將保存這個訊息,當有一個新的訂閱者訂閱相應主題的時候,Broker 會馬上將這個訊息發送給訂閱者,Retain 訊息用于解決新的訂閱開始時能夠接收訂閱之前的訊息,有以下一些特點:
一個 Topic 只能有 1 條 Retained 訊息,發布新的 Retained 訊息將覆寫老的 Retained 訊息;
如果訂閱者使用通配符訂閱主題,它會收到所有匹配的主題上的 Retained 訊息;
只有新的訂閱者才會收到 Retained 訊息,如果訂閱者重復訂閱一個主題,也會被當做新的訂閱者,然后收到 Retained 訊息;
Retained 訊息發送到訂閱者時,訊息的 Retain 標識仍然是 1,訂閱者可以判斷這個訊息是否是 Retained 訊息,以做相應的處理,
Retained 訊息和持久性會話沒有任何關系,Retained 訊息是 Broker 為每一個 Topic 單獨存盤的,而持久性會話是 Broker 為每一個 Client 單獨存盤的,
遺囑常用于獲取設備的連接狀態,當Client 非正常斷開連接,將發送遺囑訊息給訂閱者,Broker 在以下情況下認為 Client 是非正常斷開連接的:
Broker 檢測到底層的 I/O 例外;
Client 未能在 Keep Alive 的間隔內和 Broker 之間有訊息互動;
Client 在關閉底層 TCP 連接前沒有發送 DISCONNECT 資料包;
Broker 因為協議錯誤關閉和 Client 的連接,比如 Client 發送了一個格式錯誤的 MQTT 資料包,
如果 Client 通過發布 DISCONNECT 資料包斷開連接,這個屬于正常斷開連接,不會觸發 LWT 的機制,同時,Broker 還會丟棄掉這個 Client 在連接時指定的 LWT 引數,
(4) 服務質量 QoS
QoS在設定上由2 位 的二進制控制,且值不允許為 3(0x11):

要注意的是,QoS 是 Sender 和 Receiver 之間達成的協議,不是 Publisher 和 Subscriber 之間達成的協議,比如Publisher 發布一條 QoS1 的訊息,只能保證 Broker 能至少收到一次這個訊息;至于對應的 Subscriber 能否至少收到一次這個訊息,還要取決于 Subscriber 在 Subscribe 的時候和 Broker 協商的 QoS 等級,
另外,在 MQTT 協議中,從 Broker 到 Subscriber 這段訊息傳遞的實際 QoS 等于 "Publisher 發布訊息時指定的 QoS 等級和 Subscriber 在訂閱時與 Broker 協商的 QoS 等級,這兩個 QoS 等級中的最小那一個,"這就是所謂QoS 降級,
QoS 0 :ender 不關心 Receiver 是否收到訊息,它"盡力"發送訊息,

QoS 1:Sender 發送的一條訊息,Receiver 至少能收到一次,也就是說 Sender 向 Receiver 發送訊息,如果發送失敗,會繼續重試,直到 Receiver 收到訊息為止,但是因為重傳的原因,Receiver 有可能會收到重復的訊息,

1)Sender 向 Receiver 發送一個帶有訊息資料的 PUBLISH 包, 并在本地保存這個 PUBLISH 包,
2)Receiver 收到 PUBLISH 包以后,向 Sender 發送一個 PUBACK 資料包,PUBACK 資料包沒有訊息體(Payload),在可變頭中(Variable header)中有一個包標識(Packet Identifier),和它收到的 PUBLISH 包中的 Packet Identifier 一致,
3)Sender 收到 PUBACK 之后,根據 PUBACK 包中的 Packet Identifier 找到本地保存的 PUBLISH 包,然后丟棄掉,一次訊息的發送完成,
4)如果 Sender 在一段時間內沒有收到 PUBLISH 包對應的 PUBACK,它將該 PUBLISH 包的 DUP 標識設為 1(代表是重新發送的 PUBLISH 包),然后重新發送該 PUBLISH 包,重復這個流程,直到收到 PUBACK,然后執行第 3 步,
QoS 2:Sender 發送的一條訊息,Receiver 確保能收到而且只收到一次,也就是說 Sender 盡力向 Receiver 發送訊息,如果發送失敗,會繼續重試,直到 Receiver 收到訊息為止,同時保證 Receiver 不會因為訊息重傳而收到重復的訊息,

QoS 2 使用 2 套請求/應答流程(一個 4 段的握手)來確保 Receiver 收到來自 Sender 的訊息,且不重復:
1)Sender 發送 QoS 為 2 的 PUBLISH 資料包,資料包 Packet Identifier 為 P,并在本地保存該 PUBLISH 包;
2)Receiver 收到 PUBLISH 資料包以后,在本地保存 PUBLISH 包的 Packet Identifier P,并回復 Sender 一個 PUBREC 資料包,PUBREC 資料包可變頭中的 Packet Identifier 為 P,沒有訊息體(Payload);
3)當 Sender 收到 PUBREC,它就可以安全地丟棄掉初始的 Packet Identifier 為 P 的 PUBLISH 資料包,同時保存該 PUBREC 資料包,同時回復 Receiver 一個 PUBREL 資料包,PUBREL 資料包可變頭中的 Packet Identifier 為 P,沒有訊息體;如果 Sender 在一定時間內沒有收到 PUBREC,它會把 PUBLISH 包的 DUP 標識設為 1,重新發送該 PUBLISH 資料包(Payload);
4)當 Receiver 收到 PUBREL 資料包,它可以丟棄掉保存的 PUBLISH 包的 Packet Identifier P,并回復 Sender 一個 PUBCOMP 資料包,PUBCOMP 資料包可變頭中的 Packet Identifier 為 P,沒有訊息體(Payload);
5)當 Sender 收到 PUBCOMP 包,那么它認為資料包傳輸已完成,它會丟棄掉對應的 PUBREC 包,如果 Sender 在一定時間內沒有收到 PUBCOMP 包,它會重新發送 PUBREL 資料包,
我們可以看到在 QoS2 中,完成一次訊息的傳遞,Sender 和 Reciever 之間至少要發送四個資料包,QoS2 是最安全也是最慢的一種 QoS 等級了,
6 MQTT協議格式舉例
(1)CONNECT - 連接請求報文
客戶端到服務端的網路連接建立后,客戶端發送給服務端的第一個報文必須是CONNECT, 連接服務端報文,其格式如下圖:

在一個網路連接上,客戶端只能發送一次CONNECT報文,服務端必須將客戶端發送的第二個CONNECT報文當作協議違規處理并斷開客戶端的連接,CONNECT報文頭第一個位元組固定為0x01,代表CONNECT報文,第二個位元組代表余下的資料包位元組長度,可變報頭按下列次序包含四個欄位:
協議名(Protocol Name),值固定為字符 “MQTT”的UTF-8編碼的字串,MQTT規范的后續版本不會改變這個字串的偏移和長度,占用6個位元組,
協議級別(Protocol Level),對 MQTT 3.1.1 來說,值為 4,占用1個位元組,
對于3.1.1版協議,協議級別欄位的值是4(0x04),如果發現不支持的協議級別,服務端必須給發送一個回傳碼為0x01(不支持的協議級別)的CONNACK報文回應CONNECT報文,然后斷開客戶端的連接,
連接標志(Connect Flags):連接標志位元組包含一些用于指定MQTT連接行為的引數,它還指出有效載荷中的欄位是否存在,byte8[0]必須為0,否則斷開連接,

用戶名標識(User Name Flag):訊息體中是1否0有用戶名欄位,
密碼標識(Password Flag):訊息體中是1否0有密碼欄位,
遺囑訊息 Retain 標識(Will Retain):標識遺囑訊息是1否0是 Retain 訊息,如果 遺囑標識 被設定為0,遺囑保留(Will Retain)標志也必須設定為0,如果遺囑標志被設定為1:
如果遺囑保留被設定為0,服務端必須將遺囑訊息當作非保留訊息發布,
如果遺囑保留被設定為1,服務端必須將遺囑訊息當作保留訊息發布,
遺囑訊息 QOS 標識(Will Qos):標識遺囑訊息的 Qos,2bit,如果遺囑標志被設定為0,遺囑QoS也必須設定為0(0x00),
遺囑標識(Will Flag):標識是1否0使用遺囑訊息,
會話清除標識(Clean Session):標識 Client 是0否1建立一個持久化的會話,當 Clean Session 的標識設為 0 時,代表 Client 希望建立一個持久會話的連接,Broker 將存盤該 Client 訂閱的主題和未接受的訊息,否則(設定為1) Broker 不會存盤這些資料,同時在建立連接時清除這個 Client 之前存在的持久化會話所保存的資料,持久會話只在 QoS 等級 大于等于 1 的訊息中有效,
連接保活(Keep Alive): 設定一個單位為秒的時間間隔,指在 client 傳輸完成一個控制報文的時刻到發送下一個報文的時刻,client 與 broker 兩者之間允許空閑的最大時間間隔(單位:秒),MQTT 協議中約定:在 1.5*Keep Alive 的時間間隔內,如果 Broker 沒有收到來自 Client 的任何資料包,那么 Broker 認為它和 Client 之間的連接已經斷開;同樣地, 如果 Client 沒有收到來自 Broker 的任何資料包,那么 Client 認為它和 Broker 之間的連接已經斷開,MQTT 還有一對 PINGREQ/PINGRESP 資料包,當 Broker 和 Client 之間沒有任何資料包傳輸的時候,可以通過 PINGREQ/PINGRESP 來滿足 Keep Alive 的約定和偵測連接狀態,
CONNECT的payload欄位是根據可變報頭的連接標志決定是否存在,如果存在,必須按照客戶端識別符號(client ID)、遺囑主題(will topic)、用戶名(user name)和密碼(password)的順序出現,其中客戶端識別符號(client ID)必須存在而且必須是CONNECT報文有效載荷的第1個欄位,必須用UTF-8進行編碼,
服務端使用客戶端識別符號 (Client Id) 識別客戶端,連接服務端的每個客戶端都有唯一的客戶端識別符號(Client Id),遺囑主題(will topic)、用戶名(user name)和密碼(password)這三個欄位分別由一個兩位元組的長度和訊息的有效載荷組成,表示為零位元組或多個位元組序列,長度給出了跟在后面的資料的位元組數,不包含長度欄位本身占用的兩個位元組,
(2)CONNACK – 確認連接請求報文
服務端發送CONNACK報文回應從客戶端收到的CONNECT報文,服務端發送給客戶端的第一個報文必須是CONNACK,如果客戶端在合理的時間內沒有收到服務端的CONNACK報文,客戶端應該關閉網路連接,報文格式如下圖:

除了固定頭,可變報文圖中有兩個欄位:連接確認標志 (Connect Acknowledge Flags)和連接回傳碼 (Connect Return code),
連接確認標志 Connect Acknowledge Flags 只用了第0位,第0 (也叫 SP)位是當前會話(Session Present)標志,如果服務端收到清理會話(Clean Session)標志為1的連接,除了將CONNACK報文中的回傳碼設定為0之外,還必須將CONNACK報文中的當前會話設定(Session Present)標志為0 ,
如果服務端收到一個Clean Session為0的連接,當前會話標志的值取決于服務端是否已經保存了ClientId對應客戶端的會話狀態,
如果服務端已經保存了會話狀態,它必須將CONNACK報文中的SP設定為1,
如果服務端沒有已保存的會話狀態,它必須將CONNACK報文中的SP設定為0;還需要將CONNACK報文中的回傳碼設定為0,(此時代表連接成功)
SP使服務端和客戶端在是否有已存盤的會話狀態上保持一致,
一旦完成了會話的初始化設定,已經保存會話狀態的客戶端將期望服務端維持它存盤的會話狀態,如果客戶端從服務端收到的當前的值與預期的不同,客戶端可以選擇繼續這個會話或者斷開連接,客戶端可以丟棄客戶端和服務端之間的會話狀態,方法是,斷開連接,將清理會話標志設定為1,再次連接,然后再次斷開連接,
如果服務端發送了一個包含非零回傳碼的CONNACK報文,它必須將當前會話標志設定為0,
連接回傳碼 Connect Return code 使用一個位元組的無符號值,在下表中列出:

(3)PUBLISH – 發布訊息報文
PUBLISH控制報文是指從客戶端向服務端或者服務端向客戶端傳輸一個應用訊息,客戶端使用PUBLISH報文發送應用訊息給服務端,目的是分發到其它訂閱匹配的客戶端,服務端使用PUBLISH報文發送應用訊息給每一個訂閱匹配的客戶端,其格式如下圖:

第一個位元組為固定頭,其中高4位為訊息型別(0x03),低4位有3個重要標志位:
重發標志 DUP:如果DUP標志被設定為0:表示這是客戶端或服務端第一次請求發送這個PUBLISH報文,對于QoS 0的訊息,DUP標志必須設定為0,如果DUP標志被設定為1:表示這可能是一個早前報文請求的重發,客戶端或服務端請求重發一個PUBLISH報文時,必須將DUP標志設定為1,
服務質量等級 QoS:PUBLISH報文不能將QoS所有的位設定為1,如果服務端或客戶端收到QoS所有位都為1的PUBLISH報文,它必須關閉網路連接,當QoS設定為1時,客戶端或服務器發布訊息時,需要得到對方的確認(PUBACK),如果一段時間后沒收到PUBACK,那么會再次發送當前訊息,并將DUP欄位標記為1,
保留標志 RETAIN:如果服務端收到一條保留(RETAIN)標志為1的QoS 0訊息,它必須丟棄之前為那個主題保留的任何訊息,它應該將這個新的QoS 0訊息當作那個主題的新保留訊息,但是任何時候都可以選擇丟棄它 — 如果這種情況發生了,那個主題將沒有保留訊息,
服務端發送PUBLISH報文給客戶端時,如果訊息是作為客戶端一個新訂閱的結果發送,它必須將報文的保留標志設為1 [MQTT-3.3.1-8],當一個PUBLISH報文發送給客戶端是因為匹配一個已建立的訂閱時,服務端必須將保留標志設為0,不管它收到的這個訊息中保留標志的值是多少,
保留標志為1且有效載荷為零位元組的PUBLISH報文會被服務端當作正常訊息處理,它會被發送給訂閱主題匹配的客戶端,此外,同一個主題下任何現存的保留訊息必須被移除,因此這個主題之后的任何訂閱者都不會收到一個保留訊息,
PUBLISH 的可變頭
可變報頭按順序包含主題名(Topic Name)和報文識別符號(Packet Identifier),
主題名(Topic Name)用于識別有效載荷資料應該被發布到哪一個資訊通道,服務端發送給訂閱客戶端的PUBLISH報文的主題名必須匹配該訂閱的主題過濾器,
報文識別符號 Packet Identifier:只有當QoS等級是1或2時,報文識別符號(Packet Identifier)欄位才能出現在PUBLISH報文中,QoS等于0的PUBLISH報文不能包含報文識別符號,
報文識別符號用來區分報文,特別是在重發的報文中用來標識是否是同一個報文,并在需要應答的場景中用于確定是對哪個發送報文的應答,可變報頭的報文識別符號(Packet Identifier)欄位存在于在多個型別的報文里(占用2個位元組),
PUBLISH 報文的接收者必須按照根據PUBLISH報文中的QoS等級發送回應,回應報文參見QoS流程,
(4)SUBSCRIBE 訂閱報文
客戶端向服務端發送SUBSCRIBE報文用于創建一個或多個訂閱,每個訂閱注冊客戶端關心的一個或多個主題,為了將應用訊息轉發給與那些訂閱匹配的主題,服務端發送PUBLISH報文給客戶端,SUBSCRIBE報文也(為每個訂閱)指定了最大的QoS等級,服務端根據這個發送應用訊息給客戶端,SUBSCRIBE報文格式如下圖:

SUBSCRIBE報文的有效載荷必須包含至少一對主題過濾器 和 QoS等級欄位組合,每一個過濾器后面跟著一個位元組,這個位元組被叫做 服務質量要求(Requested QoS),它給出了服務端向客戶端發送應用訊息所允許的最大QoS等級,請求的最大服務質量等級QoS:欄位編碼為一個位元組,主題過濾器 和 QoS等級組合是連續地打包,
如果服務端收到一個SUBSCRIBE報文,報文的主題過濾器與一個現存訂閱的主題過濾器相同,那么必須使用新的訂閱徹底替換現存的訂閱,新訂閱的主題過濾器和之前訂閱的相同,但是它的最大QoS值可以不同,與這個主題過濾器匹配的任何現存的保留訊息必須被重發,但是發布流程不能中斷,
如果主題過濾器不同于任何現存訂閱的過濾器,服務端會創建一個新的訂閱并發送所有匹配的保留訊息,
如果服務端收到包含多個主題過濾器的SUBSCRIBE報文,它必須如同收到了一系列的多個SUBSCRIBE報文一樣處理那個,除了需要將它們的回應合并到一個單獨的SUBACK報文發送 [MQTT-3.8.4-4],
客戶端使用帶通配符的主題過濾器請求訂閱時,客戶端的訂閱可能會重復,因此發布的訊息可能會匹配多個過濾器,對于這種情況,服務端必須將訊息分發給所有訂閱匹配的QoS等級最高的客戶端,服務端之后可以按照訂閱的QoS等級,分發訊息的副本給每一個匹配的訂閱者,
(5)SUBACK 訂閱確認報文
服務端收到客戶端發送的一個SUBSCRIBE報文時,必須使用SUBACK報文回應,報文格式如下圖:

固定報文頭為MQTT控制報文型別 (0x9),剩余長度欄位:等于可變報頭的長度加上有效載荷的長度,可變報頭包含等待確認的SUBSCRIBE報文的報文識別符號,占用2個位元組,SUBACK報文必須和等待確認的SUBSCRIBE報文有相同的報文識別符號,SUBACK報文包含一個回傳碼清單,它們指定了SUBSCRIBE請求的每個訂閱被授予的最大QoS等級,回傳碼的順序必須和SUBSCRIBE報文中主題過濾器的順序相同,
Part 3 MQTT應用開發
7 MQTT 應用開發詳解
MQTT 應用開發涉及到客戶端和服務器兩部分,客戶端主要關注訊息的發送和消費,服務器主要關注訊息的持久化存盤和系統的性能和可靠性,特別是服務器有不少開源系統可用,比如RabbitMQ、ActiveMQ、Mosquitto和moquette等,都可以支持多種不同的協議,另外IOTDB基于moquette也支持MQTT 應用,
本文介紹基于moquette和RabbitMQ如何開發MQTT應用程式,
(1)基于moquette開發服務器
moquette基于Netty實作了MQTT相關協議,提供了獨立部署方式和嵌入式方式,便于測驗和研究,這里介紹嵌入式開發方式,
嵌入式開發只需要引入以下包即可:
<dependency>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.13</version>
</dependency>
開發服務器需要實體化io.moquette.broker.Server,并設定對應的Hander和組態檔,通過startServer方法啟動服務器,

實作Handler只需要繼承AbstractInterceptHandler,并重寫onPublish方法,這樣就可以處理客戶端發送的訊息,比如這里簡單的列印訊息結構的頭資訊:

(2)基于RabbitMQ開發客戶端應用程式
基于RabbitMQ省去了服務器的開發作業,將重心放在訊息的生產和消費,這里使用到Eclipse Paho Java Client庫,其提供了MQTT的同步和異步呼叫的API,使用客戶端庫需要引入依賴包:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
生產端和客戶端開發都需要首先實體化MqttClient和連接配置資訊,代碼如下:
mqttClient = new MqttClient(host, clientId, new MemoryPersistence())
//配置連接的選項,MqttConnectOptions包含控制客戶端連接到服務器的方式的選項,
MqttConnectOptions options = new MqttConnectOptions();
//設定連接用戶名和密碼
options.setUserName(userName);
options.setPassword(password.toCharArray());
//設定超時時間
options.setConnectionTimeout(3);
//設定心跳時間間隔
options.setKeepAliveInterval(3);
//設定服務器是否應該記住重新連接時客戶端的狀態
options.setCleanSession(true);
mqttClient.connect(options);
//設定訊息發送后的回呼方法
mqttClient.setCallback(new MqttCallbackImpl());
//通過字串獲取MqttTopic型別的主題
topic = mqttClient.getTopic(t);
其中host連接地址采用tcp://host:port格式,如果是生產端,就可以通過如下方式發送訊息:
MqttMessage message = new MqttMessage();
message.setQos(0);
message.setPayload("hello MQTT from MQTTPublisher!".getBytes());
opic.publish(message);
如果是消費端,首先通過
mqttClient.connect(options);
建立連接,再訂閱topic:
String[] topic = {ConfigInfo.topic};
int qos[] = {2};
mqttClient.subscribe(topic, qos);
其中topic和qos都是陣列形式,分別表示訂閱的topic過濾器和服務質量等級,
最后實作回呼介面MqttCallback,該介面提供了3個介面:
connectionLost:當連接丟失觸發該方法呼叫
messageArrived:當訊息達到訂閱客戶端觸發該方法呼叫,該方法被客戶端同步呼叫,并且正常執行完會發送訊息確認,如果方法實作拋出例外會導致客戶端例外退出,
deliveryComplete:當訊息投遞完成,并且收到所有訊息確認,觸發該方法呼叫,具體來說,如果服務質量是QoS0,訊息發送到網路即觸發方法,如果是QoS1,當收到PUBACK報文即觸發方法,如果是QoS2,當收到PUBCOMP報文即觸發方法,
這里實作簡單的列印資訊:

8 參考資料
https://www.cnblogs.com/schips/p/12267372.html
http://www.lpwap.com/lora-university-case/201710142310/
https://www.rabbitmq.com/web-mqtt.html
https://github.com/moquette-io/moquette
https://www.eclipse.org/paho/index.php?page=clients/java/index.php
本文提供pdf版本,用于后續更新,回復關鍵字 “MQTT介紹與應用開發” 獲取后續更新,更多文章,歡迎掃碼關注:

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/287804.html
標籤:其他
上一篇:工具 | 虛擬串口軟體的使用分享
