文章目錄
- 一、訊息佇列
- 1.1 作用
- 1.2 主流訊息佇列比較
- 二、RabbitMQ的安裝
- 2.1 安裝
- 2.1.1 Docker 方式
- 2.1.2 原生方式(Ubuntu 20.04)
- 2.2 管理插件的用法
- 三、RabbitMQ快速入門
- 3.1 名詞介紹
- 3.2 Hello World!
- 3.3 任務佇列(work queue)
- 3.3.1 回圈調度
- 3.3.2 訊息確認
- 3.3.3 訊息持久化
- 3.3.4 公平調度
- 3.4 發布/訂閱(Publish/Subscribe)
- 3.4.1 交換器(Exchanges)
- 3.4.2 臨時佇列
- 3.4.3 系結
- 3.5 路由(Routing)
- 3.5.1 direct 交換器
- 3.5.2 topic 交換器
一、訊息佇列
在計算機中,訊息是一個程式發送給另一個程式的資料,而佇列(一種資料結構)就是一個容器,存放在里面的東西,都是先放進來的先被拿出去,所以,訊息佇列就是在訊息的傳輸程序中暫時保存訊息的容器,使用佇列,能夠避免在收、發訊息需要同步進行的弊端,可以讓發送方直接將訊息放如佇列,避免因為等待接收方而阻塞,
1.1 作用
-
應用解耦:
把一個大系統拆分為多個小系統,互相之間用訊息佇列來做資料的互動或函式的呼叫,
-
流量削峰:
在流量特別大的應用場景內,比如秒殺活動,這種情況下,服務端很容易崩潰,但是我直接拒絕用戶,對用戶來說是一種極為不好的用戶體驗,所以,我們可以控制處理請求的速度,將暫時處理不了的請求放入訊息佇列,讓用戶稍等一會兒,這比直接拒絕好很多,
-
訊息分發:
就是將資料發送給需要接收資料的其他主機、程式等
-
異步處理:
比如,在用戶成功購買某一件商品后,同時向他發送訂單的手機短信和電子郵件,然后跳轉到成功頁面:
- 沒有訊息佇列:先將訂單資料交給短信模塊發送手機短信,然后交給郵件模塊發送電子郵件,完成后跳轉到成功頁面,
- 使用訊息佇列:直接跳轉到成功頁面,至于手機短信和電子郵件,可以把訂單資料放入訊息佇列中,短信模塊和郵件模塊在后臺異步讀取資料并發送,
1.2 主流訊息佇列比較
| Kafka | RocketMQ | RabbitMQ | |
|---|---|---|---|
| 單機吞吐量 | 十萬級 | 十萬級 | 萬級 |
| 訊息延遲 | 毫秒級 | 毫秒級 | 微秒級 |
| 可用性 | 非常高(分布式) | 非常高(分布式) | 高(主從) |
| 訊息丟失 | 理論上不會丟失 | 理論上不會丟失 | 低 |
| 社區活躍度 | 高 | 中 | 高 |
二、RabbitMQ的安裝
2.1 安裝
2.1.1 Docker 方式
# for RabbitMQ 3.9, the latest series
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
# for RabbitMQ 3.8,
# 3.8.x support timeline: https://www.rabbitmq.com/versions.html
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8-management
2.1.2 原生方式(Ubuntu 20.04)
-
將下面的代碼,保存到
rabbitMQ_install.sh檔案中:#!/usr/bin/sh sudo apt-get install curl gnupg apt-transport-https -y ## Team RabbitMQ's main signing key curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null ## Cloudsmith: modern Erlang repository curl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/gpg.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null ## Cloudsmith: RabbitMQ repository curl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/gpg.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg > /dev/null ## Add apt repositories maintained by Team RabbitMQ sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF ## Provides modern Erlang/OTP releases ## deb [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu bionic main deb-src [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu bionic main ## Provides RabbitMQ ## deb [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu bionic main deb-src [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu bionic main EOF ## Update package indices sudo apt-get update -y ## Install Erlang packages sudo apt-get install -y erlang-base \ erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \ erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \ erlang-runtime-tools erlang-snmp erlang-ssl \ erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl ## Install rabbitmq-server and its dependencies sudo apt-get install rabbitmq-server -y --fix-missing -
給檔案添加執行權限:
sudo chmod +x ./rabbitMQ_install.sh -
執行腳本:
sudo bash ./rabbitMQ_install.sh
2.2 管理插件的用法
-
啟動 RabbitMQ服務:
sudo systemctl start rabbitmq-server -
創建 RabbitMQ用戶:
sudo rabbitmqctl add_user '用戶名' '密碼'其實,RabbitMQ自帶了一個來賓用戶,用戶名和密碼都是
guest, -
授予管理員權限:
sudo rabbitmqctl set_user_tags 用戶名 administrator -
向虛擬主機中的用戶授予所有權限:
sudo rabbitmqctl set_permissions -p / 用戶名 '.*' '.*' '.*' -
啟用管理插件:
sudo rabbitmq-plugins enable rabbitmq_management -
訪問管理頁面:http://localhost:15672/,輸入前面創建的用戶名和密碼即可,
三、RabbitMQ快速入門
3.1 名詞介紹
-
生產(Producing):即發送訊息,發送訊息的程式被稱為“生產者( producer)”,
-
消費(Consuming):即接收訊息,接收訊息的程式被稱為“消費者(consumer)”
-
佇列(Queue):就是存放訊息的地方,佇列只受主機的記憶體和磁盤限制,它本質上是一個大的訊息緩沖區,
可以有多個生產者將訊息發送到一個佇列,也可以有多個消費者嘗試從一個佇列接收資料,
注意:生產者、消費者、訊息佇列可以放在不同的機器上;生產者和消費者也可以是同一個程式,
3.2 Hello World!
接下來,我們用 python 代碼撰寫兩個小程式,生產者發送訊息(將訊息放入佇列),消費者接收訊息(從佇列取出訊息),訊息內容為“Hello World!”,這是一個最精簡的 RabbitMQ 的使用程序,
-
安裝 RabbitMQ 官方推薦的 python 客戶端:
pip install pika -
發送訊息,新建 send.py 檔案:
#!/usr/bin/env python import pika # 連接到本地的訊息佇列 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 創建佇列,佇列名稱為 hello channel.queue_declare(queue='hello') # 發送訊息 channel.basic_publish(exchange='', routing_key='hello', # 佇列名稱 body='Hello World!') # 訊息內容 print(" [x] 發送 'Hello World!'") # 關閉連接 connection.close() # 會自動重繪網路快取區,確保訊息被發送 -
接收訊息,新建 receive.py 檔案:
#!/usr/bin/env python import pika, sys, os def main(): # 連接到本地的訊息佇列 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 創建佇列,佇列名稱為 hello channel.queue_declare(queue='hello') # 回呼函式,每當接收到訊息時被呼叫 def callback(ch, method, properties, body): print(" [x] 接收到 %r" % body) channel.basic_consume(queue='hello', # 指定接受訊息的佇列 auto_ack=True, # 是否自動呼叫回呼函式 on_message_callback=callback) # 指定回呼函式 print(' [*] 等待訊息. 按下 CTRL+C 退出') # 進入一個回圈,等待訊息 channel.start_consuming() if __name__ == '__main__': try: main() except KeyboardInterrupt: print('Interrupted') try: sys.exit(0) except SystemExit: os._exit(0)注意:消費者和生產者都進行了同一個佇列的創建,目的是無論哪一方先被啟動,都能確保佇列存在,
-
啟動消費者:
python receive.py # [*] 等待訊息. 按下 CTRL+C 退出 # [x] 接收到 b'Hello World!' -
啟動生產者:
python send.py # [x] 發送 'Hello World!'
3.3 任務佇列(work queue)
任務佇列(也叫作業佇列)是存盤了一些資源密集型任務的佇列,這些任務被封裝成一個個的訊息,目的是為了避免資源密集型任務對當前行程的阻塞,在后臺中讓專門的任務處理行程(worker)從任務佇列中取出任務(就是訊息,這里叫任務更加貼切)去執行,而且,當有多個任務處理行程一起作業時,便于任務在它們之間共享,
-
修改 send.py 中的代碼,保存為 new_task.py ,要修改的部分如下:
import sys # 要處理的任務 message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) # 改為任務 -
修改 receive.py 中的代碼,保存為 worker.py:
import time def callback(ch, method, properties, body): print(" [x] 接收到 %r" % body.decode()) # 訊息體中有幾個“.”就睡幾秒鐘,假裝任務處理很耗時 time.sleep(body.count(b'.')) print(" [x] 完成")
3.3.1 回圈調度
使用任務佇列的優點之一是能夠輕松地擴大規模,如果我們的任務被大量積壓,來不及處理,我們可以很輕松地增加更多的任務處理行程(以下都將簡稱為 worker),
-
啟動兩個終端,運行2個 worker.py:
python worker.py # [*] 等待訊息. 按下 CTRL+C 退出 -
再啟動一個終端,運行 new_task.py:
python new_task.py First message. python new_task.py Second message.. python new_task.py Third message... python new_task.py Fourth message.... python new_task.py Fifth message..... -
看看 worker 收到了什么:
# 第一個 worker: [x] 接收到 'First message.' [x] 完成 [x] 接收到 'Third message...' [x] 完成 [x] 接收到 'Fifth message.....' [x] 完成 # 第二個 worker: [x] 接收到 'Second message..' [x] 完成 [x] 接收到 'Fourth message....' [x] 完成默認情況下,RabbitMQ 會按順序將每條訊息發送給下一個 worker,一般來說,每個 worker 都會收到相同數量的任務,這種分發任務的方式稱為輪詢,
3.3.2 訊息確認
在上面的代碼中,如果一個 worker 在執行任務的程序中被終止了,那么任務會丟失,為了避免任務丟失,RabbitMQ 提供了訊息確認機制:
- 如果 worker 回復一個 ack(acknowledgement),那么 RabbitMQ 就認為該任務被成功處理,然后從任務佇列洗掉該任務,
- 如果一個 worker 沒有回復 ack 就終止運行了,那么 RabbitMQ 就認為該任務執行失敗,會將該任務重新排隊,分配給其他運行中的 worker,
訊息確認有一個超時時間,默認為 30分鐘,它能幫助我們檢測例外的 worker,
默認情況下,手動訊息確認是打開的,在前面的例子中,我們通過auto_ack=True關閉了它們,
-
繼續修改代碼,移除關閉訊息確認的引數:
def callback(ch, method, properties, body): print(" [x] 接收到 %r" % body.decode()) time.sleep(body.count(b'.')) print(" [x] 完成") ch.basic_ack(delivery_tag = method.delivery_tag) # 發送 ack channel.basic_consume(queue='hello', on_message_callback=callback)使用這段代碼后,即使你用 CTRL+C 終止一個正在處理任務的 worker,也不會有任何損失,
3.3.3 訊息持久化
我們已經學會了如何確保即使 worker 意外終止,任務也不會丟失,但是如果RabbitMQ 服務器停止,我們的任務仍然會丟失,要確保任務不會丟失,需要做兩件事:將佇列和任務都標記為持久,
-
標記佇列為持久,以便 RabbitMQ 重啟后佇列能夠存活:
# 因為 hello 佇列已經存在,所以我們換了個新佇列 task_queue channel.queue_declare(queue='task_queue', durable=True)注意!持久化標記要在生產者和消費者上都標記上,
-
標記任務為持久:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE ))
將訊息標記為持久化并不能完全保證訊息不會丟失,雖然它告訴 RabbitMQ 將訊息保存到磁盤,但是當 RabbitMQ 接收到訊息并且還沒有保存訊息時,仍然會有一個很短的空檔,
此外,RabbitMQ 不會對每條訊息做
fsync(2)處理——它可能只是被保存在快取中,而不是真正寫入磁盤,持久性保證不是強的,但對于我們的簡單任務佇列來說已經足夠了,如果你需要一個更強的保證,那么你可以使用 publisher confirm,
3.3.4 公平調度
RabbitMQ 默認均勻地分配任務,即使有的任務特別耗時,使得分配到該任務地 worker 任務已經堆積如山,RabbitMQ 還是在均勻分配,
這是因為 RabbitMQ 在任務進入佇列時只進行分配,不查看 worker 未確認訊息的數量,盲目地將第 n 個任務發送給第 n 個 worker,不過,我們可以通過設定,改變這種行為,
-
在 worker 中配置公平調度:
channel.basic_qos(prefetch_count=1)上面地數字是1,也就是說在 worker 處理并確認前一個任務完成之前,不要向它分配新任務,而是將任務發送給下一個沒有作業中的 worker,
如果所有的 worker 都很忙,任務佇列就會排滿,你需要注意這一點,并盡可能添加更多的作業人員,或者使用訊息 TTL,
3.4 發布/訂閱(Publish/Subscribe)
發布者(即生產者)發送一條訊息,每一個訂閱者(即消費者)都能接收到,這種模式稱為“發布/訂閱”,
3.4.1 交換器(Exchanges)
之前我們說:生產者發送的訊息被保存到佇列中,這只是為了好理解而這么說的,實際上,生產者從不直接向佇列發送任何訊息,而是發送給交換器(exchange),由交換器決定訊息下一步去往何處,具體的規則由交換器型別定義,
交換型別有 direct、topic、header 和 fanout,創建一個 fanout 型別的交換器,取名為logs:
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
fanout 交換器十分簡單,它只是單純地把訊息發送給它所知道的佇列,
將訊息發送到創建好的交換器:
channel.basic_publish(exchange='logs', # 通過名字指定交換器
routing_key='',
body=message)
3.4.2 臨時佇列
之前我們使用的都是命名的佇列,像hello、task_queue,如果我們創建佇列時不傳入名稱會怎樣呢?答案是 RabbitMQ 會創建一個臨時佇列,該佇列的名稱會像這樣:“mq.gen-JzTY20BRgKO-HjmUJj0wLg”,
我們還可以傳入一個exclusive=True實作獨占效果:當消費者關閉連接后,該佇列也會隨之消失,
創建一個獨占的臨時佇列:
result = channel.queue_declare(queue='', exclusive=True)
3.4.3 系結
在接收者中系結交換器和佇列,也就是訂閱:
channel.queue_bind(exchange='logs',
queue=result.method.queue)
之后,logs 交換器將向我們的臨時佇列添加訊息,只要接收者都系結同一個交換器和佇列,就都能收到訊息,
我們還可以通過以下命令查看系結資訊(前提是接收者運行中):
sudo rabbitmqctl list_bindings
3.5 路由(Routing)
目前,我們的發布/訂閱模型中,訂閱者會毫無選擇地接收發布者的所有訊息,不過我們可以在系結時,通過routing_key引數進行過濾,讓訂閱者選擇想要的訊息接收,
routing_key的含義取決于交換器的型別,之前使用的 fanout 型別會直接忽視這個引數,所以,需要使用其他型別的交換器,比如,Direct 交換器,
3.5.1 direct 交換器
它背后的演算法很簡單:
- 先看
basic_publish()的routing_key是否和queue_bind()的routing_key匹配; - 匹配就放入佇列,不匹配就去匹配其他佇列;
- 沒有匹配的就丟棄訊息,
# 發布訊息
channel.basic_publish(exchange='direct_logs',
routing_key='black',
body=message)
# 系結
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
RabbitMQ 允許使用同一個routing_key系結多個佇列,交換器會將訊息發送給多個匹配的佇列,
3.5.2 topic 交換器
topic 交換器的routing_key必須是用.分割的多個詞語,如orange.rabbit.lazy,最長為255個位元組,發布時的routing_key為可以按照.分割,如果二者之間匹配,則將訊息發送給佇列,如果不匹配則丟棄,這和 direct 交換器是一樣的,但topic 交換器真正的用處在于兩個特殊符號:
*:代表匹配任意一個單詞,比如,*.orange.*可以匹配到中間是orange的單詞數量為3個的任意routing_key,#:代表匹配任意多個單詞,比如,rabbit.#可以匹配到rabbit開頭的任意routing_key,且單詞數量在合法范圍內沒有限制,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423324.html
標籤:其他
下一篇:基于大資料(Hadoop+Java+MySQL)的數碼商城購物推薦系統設計與實作 檔案+任務書+開題報告+文獻綜述+答辯PPT+專案原始碼及資料庫檔案
