摘要:本文圍繞AMQP協議,為大家詳細決議AMQP協議、核心技術亮點、多協議之間的對比以及使用實踐,
本文分享自華為云社區《物聯網常見協議之Amqp協議及使用場景決議》,作者:張儉,
引言
本文圍繞AMQP協議,為大家詳細決議AMQP協議、核心技術亮點、多協議之間的對比以及使用實踐,并介紹華為云IoT通過Amqp協議如何為開發者和企業提供了更加靈活和高效的通信方式,使得物聯網應用得以在各個領域得到更廣泛的推廣和應用,
AMQP協議,全稱為Advanced Message Queuing Protocol,在2006年6月,由Cisco、Redhat、iMatrix等聯合制定了AMQP的訊息標準,
除了AMQP協議,還有一些其他協議如Mqtt(Message Queuing Telemetry Transport)、Http、Kafka,每個協議的發明/出現都是為了解決特定的問題,沒有最合適的協議,只有更合適的業務場景,在后面我們也會對這些協議進行簡單的對比,
Amqp歷史上大概有如下四個版本,
- Amqp 0-8: 發布于2006年
- Amqp 0-9-1:發布于2008年,是Amqp 0-8的改進版,被廣泛應用,如rabbitmq、qpid等
- Amqp 0-10:發布于2008年,是Amqp 0-9-1的改進,未被廣泛使用
- Amqp 1.0:發布于2011年,是Amqp協議的下一代標準,與之前的版本不兼容,但提供了更強大的特性和更好的性能,目前也在華為云IoT、Azure中有應用起來,包括rabbitmq、qpid等也提供了對Amqp1.0版本協議的支持
我們也會主要討論Amqp 0-9-1 和 Amqp 1.0這兩個版本
Amqp 0-9-1 協議簡述
核心概念
- Virtual Host:簡稱vhost,個人理解是Amqp協議上的多租,每個vhost具有自己的Exchanges、Message Queues等,互相不干擾,
- Exchange: 從生產者應用程式中接收訊息,并根據特定的情況(訊息屬性或內容),將這些訊息路由到“Message Queue”中
- Message Queue: 訊息佇列,存盤訊息,直到它被消費者應用程式安全地處理
- Binding:指Exchange將何種型別的訊息發送到Queue中,提供訊息路由機制
Amqp 0-9-1 協議是一個 多鏈路、協商的、異步、安全、可移植、高效的協議,Amqp協議通常分為兩層:
+------------------Functional Layer----------------+ | Basic Transactions Exchanges Message queues | +--------------------------------------------------+ +------------------Transport Layer-----------------+ | Framing Content Data representation | | Error handling Heart-beating Channels | +--------------------------------------------------+
此外,由于Amqp協議的message queue支持許多特性:私有或共享、持久化或臨時等等,根據不同的屬性設定,我們可將AMQP用于許多應用場景,例如
- 訊息中間件使用:共享的存盤轉發佇列,保存訊息并交給多個消費者消費
- RPC使用:通過將佇列設定為臨時的,帶有IP地址的,來模擬RPC介面
Amqp 0-9-1 生產時序圖
Amqp 0-9-1 消費時序圖
Amqp 0-9-1 協議幀及資料型別
Amqp 0-9-1的協議幀由 FrameHeader、Payload、FrameEnd組成
- Integers 整數(1到8個位元組):用于表示大小、數量、限制等,整數總是無符號的,
- Bits 位:用于表示開/關值,一個八位位元組,
- Short strings 短字串:用于存盤短文本屬性,短字串長度限制為255個八位位元組,
- Long strings 長字串:用于存盤二進制資料塊,
- Field tables 欄位表:存盤名稱-值對,欄位值可以是字串、整數等型別,
Amqp 1-0協議
與Amqp 0-9-1的差異
協議設計層面:
- AMQP 0-9-1:此版本的 AMQP 主要針對代理的設計,涵蓋了訊息傳遞模型、代理行為和互動模式,0-9-1 版本的協議與代理的實作緊密耦合,
- AMQP 1.0:此版本的 AMQP 更注重基于互操作性的通信協議,不依賴于特定的代理實作,AMQP 1.0 關注點在于在發送者和接收者之間傳輸訊息,而不是代理的內部行為,
- 比如像”Queue Declare”、“Queue Delete”、“Queue Query”這些在Amqp 0-9-1支持的命令,在Amqp1.0中都被移除,并假設這些功能會在更高層(broker)參加,
對稱層面:
- Amqp 0-9-1 是一個典型的客戶端/服務器通信協議,
- Amqp 1.0 則是一個對稱的協議,任何一端都可以注冊為sender或是receiver,并且從如下Amqp 0.9.1和1.0之間的時序圖對比也可以看出來,Amqp 1.0是完全雙工的協議,從某種程度或者說網路編程的角度來說,實作的難度更大,
Amqp 1-0 鑒權時序圖
Amqp 1-0 生產時序圖
Amqp 1-0 消費時序圖
Amqp 1.0 協議幀介紹
Amqp1.0 的協議幀由FrameHeader、ExtendedHeader、FrameBody組成,
- FrameHeader 8個位元組大小,包含長度、型別資訊等
- Extended header 可變寬度區域
- FrameBody 是一個可變寬度的位元組序列,其格式取決于幀型別
FrameHeader介紹
- Size: FrameHeader的第0~3個位元組包含幀大小,無符號的32位整數,為FrameHeader、ExtendedHeader、FrameBody的總和大小,如果大小小于8位元組,則格式錯誤
- DOFF: FrameHeader的第4個位元組,這表示幀內Body的位置,
- Type: FrameHeader的第5個位元組,型別代碼表示幀的格式和目的,根據幀的型別,幀頭中的后續位元組可能會被不同地解釋,型別代碼0x00表示該幀是AMQP幀,型別代碼0x01表示該幀是SASL幀等,
Amqp 幀介紹
Amqp幀型別代碼為0x00,對于Amqp幀來說,FrameHeader的第6位元組和第7位元組表示channel的編號,Frame Body 被定義為一個 performative 后跟一個不透明的 payload,表現形式必須是第open、begin、attach、flow、transfer、disposition、detach、end、close中定義的一個,并在AMQP型別系統中編碼為描述的型別,幀體中剩余的位元組構成了該幀的 payload,payload 的存在和格式由給定表現形式的語意定義,
SASL 幀介紹
Sasl幀型別代碼為0x01,FrameHeader中的第6和第7位元組應該被忽略,也不存在擴展頭,所以DOFF固定位0x02,
與其他訊息通信協議間的對比
Amqp與Mqtt的對比
Amqp和Mqtt都是應用層的訊息傳遞協議,mqtt更加輕量,相對來說概念不如amqp那么豐富,同時mqtt頭部訊息更加短小,更加適用于低帶寬、功耗較低的物聯網設備
Amqp與Kafka協議的對比
AMQP是一種非常靈活的協議,可以用于各種型別的訊息傳遞場景,包括點對點和發布-訂閱模型,Kafka則專注于高吞吐量的流式處理,適用于資料管道和流式處理等場景,
Kafka的設計旨在提供高吞吐量和低延遲,AMQP的性能因實作和使用情況而異,但在大多數情況下,它的性能不如Kafka,
Kafka擁有強大的生態系統,包括流處理、資料湖、訊息佇列等多個應用場景,AMQP也有相應的生態系統和工具,但相對來說要小得多,
總得來說,盡管kafka存在性能上的優勢,但kafka broker很難對外暴露,相較于kafka這種私有訊息中間件協議,Amqp足夠標準,更適合各種異構系統的對接,
AMQP協議相關的開源專案
rabbitmq
提到AMQP,就不得不提rabbitmq,RabbitMQ 是一個開源的訊息代理和佇列服務器,用于通過高級訊息佇列協議(AMQP)在分布式系統中實作訊息傳遞,RabbitMQ 提供了一個可靠、高性能、可擴展和易于使用的訊息傳遞平臺,支持多種編程語言和平臺,它最初是用 Erlang 語言撰寫的,因此具有良好的并發性能和容錯能力,
所謂成也erlang,敗也erlang,由于erlang語言生態的問題,有能力深入維護Rabbitmq的人員并不是很多,也是rabbitmq越來越不流行的一個原因,
Qpid
Apache Qpid(Quick Platform for Interactive Distributed Messaging)是一個開源的訊息傳遞系統,它實作了高級訊息佇列協議(Advanced Message Queuing Protocol,AMQP)的多種版本,AMQP 是一種開放標準的應用層協議,用于訊息傳遞的中間件,它可以實作跨平臺、跨語言的訊息通信,Qpid 專案的主要目標是提供一個可靠、可擴展和高性能的訊息傳遞平臺,幫助開發者更容易地構建分布式系統,主要的組件有
- Qpid Broker:一個高性能、可擴展的AMQP訊息代理,支持持久化、事務和安全認證等特性,Qpid Broker 提供了Java和C++兩種實作,
- Qpid Proton:一個輕量級的AMQP庫,旨在為各種編程語言提供高性能的AMQP實作,提供了C和java的默認實作,此外Proton 還提供了其他編程語言如python的系結,
但Qpid總得來說,比較重型,如果僅僅是想在原有的訊息組件,如kafka/pulsar外面疊加一層Amqp可訪問的能力,我相信proton是更好的選擇,
Vertx-proton
Vert.x Proton 的目標是結合 Vert.x 的回應式編程模型和 Qpid Proton 的 AMQP 支持,以簡化構建高性能、可擴展的、基于 AMQP 的分布式應用程式,Vert.x Proton 提供了一套簡潔、易用的 API,可以讓開發者在 Vert.x 應用程式中輕松地實作 AMQP 通信,
華為云IoT對AMQP的支持
在最初階段華為云IoTDA主要支持HTTP協議,盡管這種方式已經能滿足許多需求,但隨著物聯網技術的普及和發展,用戶對于更加靈活和高效的通信方式的需求逐漸增強,華為云IoTDA逐漸豐富協議庫,當前支持60+協議接入,為開發者和企業提供更加完善的解決方案,
在IoT應用對接場景中,華為云IoT現已新增了對AMQP的支持,與HTTP協議相比,AMQP協議具有以下優勢
- 無需HTTP服務器:AMQP協議無需開發者搭建HTTP服務器,降低了專案成本,簡化了系統架構,可以部署在各種型別的設備,包括手機、平板、智能家居設備等,進一步拓寬了物聯網應用的領域,
- 低延遲、高效率:AMQP協議采用二進制傳輸,降低了資料傳輸所需的帶寬,提高了傳輸速度,降低了延遲,
- 強大的訊息佇列功能:AMQP協議具有優秀的訊息佇列功能,支持點對點和發布訂閱模式,確保訊息的可靠傳輸和順序處理,
通過支持AMQP協議,華為云IoT為開發者和企業提供了更加靈活和高效的通信方式,使得物聯網應用得以在各個領域得到更廣泛的推廣和應用,
Amqp實戰:使用qpid-proton python 消費華為云IoTDA的Amqp訊息
首先通過pip 安裝依賴包
pip install python-qpid-proton
最簡單的消費者demo, consumer.py如下
import sys from proton.handlers import MessagingHandler from proton.reactor import Container class AMQPConsumer(MessagingHandler): def __init__(self, server_url, target_address): super(AMQPConsumer, self).__init__() self.server_url = server_url self.target_address = target_address def on_start(self, event): conn = event.container.connect(self.server_url) event.container.create_receiver(conn, self.target_address) def on_message(self, event): print(f"Received message: {event.message.body}") event.connection.close() if __name__ == "__main__": server_url = "amqp://localhost:5672" target_address = "example_queue" try: Container(AMQPConsumer(server_url, target_address)).run() except KeyboardInterrupt: sys.exit(0)
我們可以使用這個producer.py驗證 consumer.py可用
import sys from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container class AMQPProducer(MessagingHandler): def __init__(self, server_url, target_address, message_body): super(AMQPProducer, self).__init__() self.server_url = server_url self.target_address = target_address self.message_body = message_body def on_start(self, event): conn = event.container.connect(self.server_url) self.sender = event.container.create_sender(conn, self.target_address) def on_sendable(self, event): message = Message(body=self.message_body) event.sender.send(message) print(f"Sent message: {message.body}") event.connection.close() if __name__ == "__main__": server_url = "amqp://localhost:5672" target_address = "example_queue" message_body = "Hello, AMQP 1.0!" try: Container(AMQPProducer(server_url, target_address, message_body)).run() except KeyboardInterrupt: sys.exit(0)
為了能連接上華為云IoTDA的Amqp接入點,我們還需要給consumer.py配置用戶名、密碼,如下為樣例代碼,具體連接的資訊、憑據如何獲得可參考: https://support.huaweicloud.com/devg-iothub/iot_01_00100_2.html,注意,url也從amqp修改為了amqps
import sys from proton import Message, SSLDomain from proton.handlers import MessagingHandler from proton.reactor import Container class AMQPConsumer(MessagingHandler): def __init__(self, server_url, target_address, username, password, cert_file, key_file): super(AMQPConsumer, self).__init__() self.server_url = server_url self.target_address = target_address self.username = username self.password = password self.cert_file = cert_file self.key_file = key_file def on_start(self, event): ssl_domain = SSLDomain(mode=SSLDomain.MODE_CLIENT) ssl_domain.set_credentials(self.cert_file, self.key_file, None) conn = event.container.connect(self.server_url, user=self.username, password=self.password, ssl_domain=ssl_domain) event.container.create_receiver(conn, self.target_address) def on_message(self, event): print(f"Received message: {event.message.body}") event.connection.close() if __name__ == "__main__": server_url = "amqps://localhost:5671" # 注意 'amqps',它表示使用 SSL/TLS 連接 target_address = "example_queue" username = "your_username" password = "your_password" cert_file = "path/to/your/certificate.pem" key_file = "path/to/your/private_key.pem" try: Container(AMQPConsumer(server_url, target_address, username, password, cert_file, key_file)).run() except KeyboardInterrupt: sys.exit(0)
該樣例代碼已上傳到gitee
總結與展望
總體來說,AMQP作為一種應用層協議,在訊息傳遞和異構系統之間的通信方面提供了非常靈活和可靠的解決方案,與其他訊息傳遞協議相比,AMQP具有豐富的功能和靈活的設計,適用于各種型別的訊息傳遞場景,
在使用AMQP時,我們可以選擇現有的開源實作,如RabbitMQ、Qpid等,也可以自行實作AMQP的相關組件,通過這些實作,我們可以輕松地在不同的應用程式、語言和平臺之間進行訊息傳遞,并實作可靠、高效、安全的通信,
隨著物聯網、云計算和大資料等技術的發展,AMQP的應用場景越來越廣泛,比如在IoT設備、大資料流處理、分布式系統等方面都得到了廣泛應用,未來,AMQP將繼續發揮重要作用,推動各種異構系統之間的互聯互通,帶來更加便捷和高效的訊息傳遞體驗,
點擊關注,第一時間了解華為云新鮮技術~
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/550787.html
標籤:其他
上一篇:Win10的VIM配置
下一篇:返回列表
