RabbitMQ 是一個由 erlang 開發的 AMQP(Advanced Message Queuing Protocol)的開源實作,
AMQP:高級訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中間件設計,訊息中間件主要用于組件之間的解耦,訊息的發送者無需知道訊息使用者的存在,反之亦然, AMQP的主要特征是面向訊息、佇列、路由(包括點對點和發布/訂閱)、可靠性、安全, RabbitMQ是一個開源的AMQP實作,服務器端用Erlang語言撰寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,用于在分布式系統中存盤轉發訊息,在易用性、擴展性、高可用性等方面表現不俗,
一、應用場景
- 異步處理
- 應用解耦
- 流量削峰
二、RabbitMQ 特性
RabbitMQ 最初起源于金融系統,用于在分布式系統中存盤轉發訊息,在易用性、擴展性、高可用性等方面表現不俗,具體特點包括:
# 可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認, # 靈活的路由(Flexible Routing) 在訊息進入佇列之前,通過 Exchange 來路由訊息的,對于典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實作,針對更復雜的路由功能,可以將多個 Exchange 系結在一起,也通過插件機制實作自己的 Exchange , # 訊息集群(Clustering) 多個 RabbitMQ 服務器可以組成一個集群,形成一個邏輯 Broker , # 高可用(Highly Available Queues) 佇列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下佇列仍然可用, # 多種協議(Multi-protocol) RabbitMQ 支持多種訊息佇列協議,比如 STOMP、MQTT 等等, # 多語言客戶端(Many Clients) RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby 等等, # 管理界面(Management UI) RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監控和管理訊息 Broker 的許多方面, # 跟蹤機制(Tracing) 如果訊息例外,RabbitMQ 提供了訊息跟蹤機制,使用者可以找出發生了什么, # 插件機制(Plugin System) RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以撰寫自己的插件,
三、RabbitMQ 基本概念
# Message 訊息,訊息是不具名的,它由訊息頭和訊息體組成,訊息體是不透明的,而訊息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他訊息的優先權)、delivery-mode(指出該訊息可能需要持久性存盤)等, # Publisher 訊息的生產者,也是一個向交換器發布訊息的客戶端應用程式, # Exchange 交換器,用來接收生產者發送的訊息并將這些訊息路由給服務器中的佇列, # Routing Key 路由關鍵字,exchange根據這個關鍵字進行訊息投遞, # Binding 系結,用于訊息佇列和交換器之間的關聯,一個系結就是基于路由鍵將交換器和訊息佇列連接起來的路由規則,所以可以將交換器理解成一個由系結構成的路由表, # Queue 訊息佇列,用來保存訊息直到發送給消費者,它是訊息的容器,也是訊息的終點,一個訊息可投入一個或多個佇列,訊息一直在佇列里面,等待消費者連接到這個佇列將其取走, # Connection 網路連接,比如一個TCP連接, # Channel 信道,多路復用連接中的一條獨立的雙向資料流通道,信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布訊息、訂閱佇列還是接收訊息,這些動作都是通過信道完成,因為對于作業系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接, # Consumer 訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式, # Virtual Host 虛擬主機,表示一批交換器、訊息佇列和相關物件,虛擬主機是共享相同的身份認證和加密環境的獨立服務器域,每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的佇列、交換器、系結和權限機制,vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / , # Broker
四、Exchange 型別
Exchange分發訊息時根據型別的不同分發策略有區別,目前共四種型別:direct、fanout、topic、headers ,headers 匹配 AMQP 訊息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種型別:
- direct
訊息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將訊息發到對應的佇列中,路由鍵與佇列名完全匹配,如果一個佇列系結到交換機要求路由鍵為“dog”,則只轉發 routing key 標記為“dog”的訊息,不會轉發“dog.puppy”,也不會轉發“dog.guard”等等,它是完全匹配、單播的模式,
- fanout
每個發到 fanout 型別交換器的訊息都會分到所有系結的佇列上去,fanout 交換器不處理路由鍵,只是簡單的將佇列系結到交換器上,每個發送到交換器的訊息都會被轉發到與該交換器系結的所有佇列上,很像子網廣播,每臺子網內的主機都獲得了一份復制的訊息,fanout 型別轉發訊息是最快的,
- topic
topic 交換器通過模式匹配分配訊息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要系結到一個模式上,它將路由鍵和系結鍵的字串切分成單詞,這些單詞之間用點隔開,它同樣也會識別兩個通配符:符號“#”和符號“”,#匹配0個或多個單詞,匹配不多不少一個單詞,
五、ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的物件,
- Connection 是 RabbitMQ 的 socket 鏈接,它封裝了socket 協議相關部分邏輯,
- ConnectionFactory 為 Connection 的制造工廠,
- Channel 是我們與 RabbitMQ 打交道的最重要的一個介面,我們大部分的業務操作是在 Channel 這個介面中完成的,包括定義Queue、定義Exchange、系結 Queue 與 Exchange、發布訊息等,
六、任務分發機制
1、Round-robin dispathching回圈分發
RabbbitMQ的分發機制非常適合擴展,而且它是專門為并發程式設計的,如果現在load加重,那么只需要創建更多的Consumer來進行任務處理
2、Message acknowledgment 訊息 確認
在實際應用中,可能會發生消費者收到 Queue 中的訊息,但沒有處理完成就宕機(或出現其他意外)的情況,這種情況下就可能會導致訊息丟失,為了避免這種情況發生,我們可以要求消費者在消費完訊息后發送一個回執給 RabbitMQ,RabbitMQ 收到訊息回執(Message acknowledgment)后才將該訊息從Queue中移除;如果 RabbitMQ 沒有收到回執并檢測到消費者的 RabbitMQ連接斷開,則RabbitMQ會將該訊息發送給其他消費者(如果存在多個消費者)進行處理,這里不存在timeout概念,一個消費者處理訊息時間再長也不會導致該訊息被發送給其他消費者,除非它的RabbitMQ連接斷開, 這里會產生另外一個問題,如果我們的開發人員在處理完業務邏輯后,忘記發送回執給RabbitMQ,這將會導致嚴重的bug——Queue中堆積的訊息會越來越多;消費者重啟后會重復消費這些訊息并重復執行業務邏輯…
另外pub message是沒有ack的,
3、Message durability 訊息持久化
如果我們希望即使在RabbitMQ服務重啟的情況下,也不會丟失訊息,我們可以將Queue與Message都設定為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ訊息不會丟失,但依然解決不了小概率丟失事件的發生(比如RabbitMQ服務器已經接收到生產者的訊息,但還沒來得及持久化該訊息時RabbitMQ服務器就斷電了),如果我們需要對這種小概率事件也要管理起來,那么我們要用到事務,由于這里僅為RabbitMQ的簡單介紹,所以這里將不講解RabbitMQ相關的事務,
要持久化佇列queue的持久化需要在宣告時指定durable=True;
這里要注意,佇列的名字一定要是Broker中不存在的,不然不能改變此佇列的任何屬性.
佇列和交換機有一個創建時候指定的標志durable,durable的唯一含義就是具有這個標志的佇列和交換機會在重啟之后重新建立,它不表示說在佇列中的訊息會在重啟后恢復
訊息持久化包括3部分
# 1.exchange持久化,在宣告時指定durable => true hannel.ExchangeDeclare(ExchangeName,"direct",durable:true,autoDelete:false,arguments:null);//宣告訊息佇列,且為可持久化的 # 2.queue持久化,在宣告時指定durable => true channel.QueueDeclare(QueueName,durable:true,exclusive:false,autoDelete:false,arguments:null);//宣告訊息佇列,且為可持久化的 # 3.訊息持久化,在投遞時指定delivery_mode => 2(1是非持久化). channel.basic_publish(exchange='',routing_key="task_queue",body=message,properties=pika.BasicProperties(delivery_mode = 2, # make message persistent))
如果 exchange 和 queue 都是持久化的,那么它們之間的binding 也是持久化的,如果 exchange 和 queue 兩者之間有一個持久化,一個非持久化,則不允許建立系結.
注意:一旦創建了佇列和交換機,就不能修改其標志了,例如,創建了一個non-durable的佇列,然后想把它改變成durable的,唯一的辦法就是洗掉這個佇列然后重現創建,
關于持久化的進一步討論:
為了資料不丟失,我們采用了:
在資料處理結束后發送ack,這樣RabbitMQ Server會認為Message Deliver 成功,
持久化queue,可以防止RabbitMQ Server 重啟或者crash引起的資料丟失,
持久化Message,理由同上,
但是這樣能保證資料100%不丟失嗎?答案是否定的,問題就在與RabbitMQ 需要時間去把這些資訊存到磁盤上,這個time window 雖然短,但是它的確還是有,在這個時間視窗內如果資料沒有保存,資料還會丟失,還有另一個原因就是 RabbitMQ 并不是為每個 Message 都做 fsync:它可能僅僅是把它保存到Cache 里,還沒來得及保存到物理磁盤上,因此這個持久化還是有問題,但是對于大多數應用來說,這已經足夠了,當然為了保持一致性,你可以把每次的publish放到一個transaction中,這個transaction的實作需要user defined codes,那么商業系統會做什么呢?一種可能的方案是在系統例外重啟時或者斷電時,應該給各個應用留出時間去flash cache,保證每個應用都能 exit gracefully,
4、Fair dispath 公平分發
你可能也注意到了,分發機制不是那么優雅,默認狀態下,RabbitMQ將第n個Message分發給第n個Consumer,n是取余后的,它不管Consumer是否還有unacked Message,只是按照這個默認的機制進行分發.
那么如果有個Consumer作業比較重,那么就會導致有的Consumer基本沒事可做,有的Consumer卻毫無休息的機會,那么,Rabbit是如何處理這種問題呢?
- 4.1 Prefetch count
前面我們講到如果有多個消費者同時訂閱同一個Queue中的訊息,Queue中的訊息會被平攤給多個消費者,這時如果每個訊息的處理時間不同,就有可能會導致某些消費者一直在忙,而另外一些消費者很快就處理完手頭作業并一直空閑的情況,我們可以通過設定prefetchCount來限制Queue每次發送給每個消費者的訊息數,比如我們設定prefetchCount=1,則Queue每次給每個消費者發送一條訊息;消費者處理完這條訊息后Queue會再給該消費者發送一條訊息,
通過basic.qos方法設定prefetch_count=1,這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message,換句話說,在接收到該Consumer的ack前,它不會將新的Message分發給它
channel.basic_qos(prefetch_count=1)
注意,這種方法可能會導致queue滿,當然,這種情況下你可能需要添加更多的Consumer,或者創建更多的virtual Host來細化你的設計,
七、訊息序列化
RabbitMQ使用ProtoBuf序列化訊息,它可作為RabbitMQ的Message的資料格式進行傳輸,由于是結構化的資料,這樣就極大的方便了Consumer的資料高效處理,當然也可以使用XML,與XML相比, ProtoBuf有以下優勢:
1.簡單
2.size小了3-10倍
3.速度快了20-100倍
4.易于編程
6.減少了語意的歧義.
,ProtoBuf具有速度和空間的優勢,使得它現在應用非常廣泛
八、RPC
MQ 本身是基于異步的訊息處理,前面的示例中所有的生產者(P)將訊息發送到 RabbitMQ 后不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條訊息都不知道),
但實際的應用場景中,我們很可能需要一些同步處理,需要同步等待服務端將我的訊息處理完成后再進行下一步處理,這相當于RPC(Remote Procedure Call,遠程程序呼叫),
RabbitMQ 中也支持 RPC,RabbitMQ 中實作 RPC 的機制是:
客戶端發送請求(訊息)時,在訊息的屬性(MessageProperties ,在 AMQP 協議中定義了14種 properties ,這些屬性會隨著訊息一起發送)中設定兩個值 replyTo (一個 Queue 名稱,用于告訴服務器處理完成后將通知我的訊息發送到這個 Queue 中)和 correlationId (此次請求的標識號,服務器處理完成后需要將此屬性返還,客戶端將根據這個id了解哪條請求被成功執行了或執行失敗)
服務器端收到訊息并處理,處理完訊息后,將生成一條應答訊息到replyTo 指定的 Queue ,同時帶上 correlationId 屬性
客戶端之前已訂閱 replyTo 指定的 Queue ,從中收到服務器的應答訊息后,根據其中的correlationId 屬性分析哪條請求被執行了,根據執行結果進行后續業務處理
九、RabbitMQ 選型和對比
1.從社區活躍度
按照目前網路上的資料,RabbitMQ 、activeM 、ZeroMQ 三者中,綜合來看,RabbitMQ 是首選,
2.持久化訊息比較
ZeroMq 不支持,ActiveMq 和RabbitMq 都支持,持久化訊息主要是指我們機器在不可抗力因素等情況下掛掉了,訊息不會丟失的機制,
3.綜合技術實作
可靠性、靈活的路由、集群、事務、高可用的佇列、訊息排序、問題追蹤、可視化管理工具、插件系統等等,
RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差,當然ZeroMq 也可以做到,不過自己必須手動寫代碼實作,代碼量不小,尤其是可靠性中的:持久性、投遞確認、發布者證實和高可用性,
4.高并發
毋庸置疑,RabbitMQ 最高,原因是它的實作語言是天生具備高并發高可用的erlang 語言,
5.比較關注的比較, RabbitMQ 和 Kafka
RabbitMq 比 Kafka 成熟,在可用性上,穩定性上,可靠性上,RabbitMq 勝于 Kafka(理論上),
另外,Kafka 的定位主要在日志等方面, 因為Kafka 設計的初衷就是處理日志的,可以看做是一個日志(訊息)系統一個重要組件,針對性很強,所以 如果業務方面還是建議選擇 RabbitMq ,
還有就是,Kafka 的性能(吞吐量、TPS )比RabbitMq 要高出來很多,
選型最后總結:
如果我們系統中已經有選擇 Kafka ,或者 RabbitMq ,并且完全可以滿足現在的業務,建議就不用重復去增加和造輪子,
可以在 Kafka 和 RabbitMq 中選擇一個適合自己團隊和業務的,這個才是最重要的,但是毋庸置疑現階段,綜合考慮沒有第三選擇,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/13956.html
標籤:PHP
