一、RabbitMQ 簡介
訊息佇列是一種應用程式對應用程式的通信方法,應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通信,而無需專用連接來鏈接它們,訊息傳遞指的是程式之間通過在訊息中發送資料進行通信,而不是通過直接呼叫彼此來通信,直接呼叫通常是/用于諸如遠程程序呼叫的技術,排隊指的是應用程式通過 佇列來通信,佇列的使用除去了接收和發送應用程式同時執行的要求,
二、使用場景
1.解耦
場景說明:用戶下單后,訂單系統需要通知庫存系統,傳統的做法是,訂單系統呼叫庫存系統的介面,

傳統模式的缺點:
- 如果庫存系統出現故障,訂單系統當收到訂單時也會出現問題
- 訂單與庫存系統之間高耦合
采用訊息佇列后

- 訂單系統:用戶下單后,訂單系統完成持久化處理,將訊息寫入訊息佇列,回傳用戶訂單下單成功
- 庫存系統:訂閱下單的訊息,采用拉/推的方式,獲取下單資訊,庫存系統根據下單資訊,進行庫存操作
- 假如:在下單時庫存系統不能正常使用,也不影響正常下單,因為下單后,訂單系統寫入訊息佇列就不再關心其他的后續操作了,實作訂單系統與庫存系統的應用解耦
- 為了保證庫存肯定有,可以將佇列大小設定成庫存數量,或者采用其他方式解決,
2.異步
場景說明:用戶注冊后,需要發注冊郵件和注冊短信,傳統的做法有兩種 1.串行的方式;2.并行方式
(1)串行方式:將注冊資訊寫入資料庫成功后,發送注冊郵件,再發送注冊短信,以上三個任務全部完成后,回傳給客戶端

(2)并行方式:將注冊資訊寫入資料庫成功后,發送注冊郵件的同時,發送注冊短信,以上三個任務完成后,回傳給客戶端,與串行的差別是,并行的方式可以提高處理的時間

(3)引入訊息佇列,將不是必須的業務邏輯,異步處理,

3.流量削峰
一般應用于秒殺活動中,因為秒殺活動一般會因為流量過大,導致應用掛掉,為了解決這個問題,一般在應用前端加入訊息佇列,
作用: 1.可以控制活動人數,超過此一定閥值的訂單直接丟棄 2.可以緩解短時間的高流量壓垮應用
4.缺點
-
系統的可用性降低
例如系統只需要呼叫abc三個介面,現在引入了mq之后雖然之前呼叫都沒有出錯,系統可以正常運行,但是mq出現問題的話,整個系統也同樣會受影響
-
系統的復雜性提高
需要考慮訊息是否丟失,還需要考慮訊息傳遞的順序
-
一致性問題
系統發送完訊息回傳成功,但是abc中若有系統寫入失敗,就會產生資料不一致的問題
三、docker安裝rabbitmq
1.先將rabbitmq和erlang的軟體包上傳至阿里云服務器,然后解壓安裝
rpm -Uvh esl-erlang_23.0-1_centos_7_amd64.rpm
yum install erlang
yum install -y socat
rpm -Uvh rabbitmq-server-3.8.14-1.el7.noarch.rpm
yum install rabbit-server -y
2.啟動rabbitmq-server
systemctl start rabbitmq-server
systemctl status rabbitmq-server # 查看狀態,
3.添加超級用戶(web管理界面)
rabbitmqctl add_user admin admin # 添加用戶 用戶名 密碼
rabbitmqctl set_user_tags admin administrator # 修改用戶權限 用戶名 權限
rabbitmqctl change_password admin admin # 修改密碼
4.運行rabbitmq
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 8030:15672 -p 8031:5672 -p 8032:25672 -p 8033:61613 -p 8034:1883 rabbitmq:management
四、作業模式
(一).simple模式(即最簡單的收發模式)

1.訊息產生訊息,將訊息放入佇列
2.訊息的消費者(consumer) 監聽 訊息佇列,如果佇列中有訊息,就消費掉,訊息被拿走后,自動從佇列中洗掉(隱患 訊息可能沒有被消費者正確處理,已經從佇列中消失了,造成訊息的丟失,這里可以設定成手動的ack,但如果設定成手動ack,處理完后要及時發送ack訊息給佇列,否則會造成記憶體溢位),
//send.go
package main
import (
"log"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
//鏈接rabbitmq server
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
//創建一個通道,用于完成任務
ch,err := conn.Channel()
FailOnError(err,"Failed to open a channel")
defer ch.Close()
//宣告一個佇列,用于訊息發送
q, err := ch.QueueDeclare(
"hello world", // 佇列名
false, // 持久化
false, // 是否自動洗掉
false, // 排他性
false, // no-wait
nil, // 附屬引數
)
FailOnError(err, "Failed to declare a queue")
body := "Hello World!"
err = ch.Publish( // 發送訊息(生產者)
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
FailOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
//receive.go
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
//連接rabbitmq server
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
//創建一個通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
//宣告佇列
q, err := ch.QueueDeclare(
"hello world", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume( // 注冊一個消費者(接收訊息)
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
(二).work作業模式(資源的競爭)

1.訊息產生者將訊息放入佇列消費者可以有多個,消費者1,消費者2同時監聽同一個佇列,訊息被消費,C1 C2共同爭搶當前的訊息佇列內容,誰先拿到誰負責消費訊息(隱患:高并發情況下,默認會產生某一個訊息被多個消費者共同使用,可以設定一個開關(syncronize) 保證一條訊息只能被一個消費者使用),
2.訊息確認
完成一個任務需要消耗時間,如果一個消費者開始了一個任務并在完成期間關倍訓者死亡,我們就會丟失他剛剛處理的訊息,同時還將丟失所有已經派發給特定worker但未處理的資訊,我們可以使用訊息確認來避免這樣的事情發生,消費者發回一個 ack(nowledgement) 來告訴 RabbitMQ 一個特定的訊息已經被接收、處理并且 RabbitMQ 可以自由地洗掉它,如果消費者在沒有發送 ack 的情況下死亡(其通道關閉、連接關倍訓 TCP 連接丟失),RabbitMQ 將理解訊息未完全處理并將重新排隊,如果有其他消費者同時在線,它會迅速將其重新交付給另一個消費者,這樣您就可以確保不會丟失任何訊息,即使作業人員偶爾會死亡,
3.訊息持久性
RabbitMQ服務器停止,如果沒有持久化,任務就會丟失,我們只需要在宣告佇列時將durable引數設定為true,同時將amqp.Publishing中加入DeliveryMode: amqp.Persistent,即可實作持久化,這樣即使我們rabbit重啟,我們的佇列也不會丟失
//new_task.go
package main
import (
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"work queues", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := bodyFrom(os.Args)
err = ch.Publish(
"", // exchange 這里使用默認交換機,訊息是通過交換機傳給佇列
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
//獲取運行時的引數
//例如:go run main.go 1 3 -X ? 輸出如下:
//引數0: /tmp/go-build116558042/command-line-arguments/_obj/exe/main
//引數1: 1
//引數2: 3
//引數3: -X
//引數4: ?
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" { //args長度小于2 或者引數為空的時候
s = "hello"
} else {
s = strings.Join(args[1:], " ") //Join 連接其第一個引數的元素以創建單個字串,分隔符字串 sep 放置在結果字串中的元素之間
}
return s
}
//work.go
package main
import (
"bytes"
"log"
"time"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
FailOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"work queues", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
FailOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count 服務器將在收到確認之前將那么多訊息傳遞給消費者,
0, // prefetch size 服務器將嘗試在收到消費者的確認之前至少將那么多位元組的交付保持重繪到網路
false, // 當 global 為 true 時,這些 Qos 設定適用于同一連接上所有通道上的所有現有和未來消費者,當為 false 時,Channel.Qos 設定將應用于此頻道上的所有現有和未來消費者
)
FailOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
FailOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// func Count(s, sep [] byte ) int 計算s中sep的非重疊實體數,如果sep是空切片,則Count回傳1+s中UTF-8編碼的代碼點數
dotCount := bytes.Count(d.Body, []byte(".")) // 回傳 . 的個數
t := time.Duration(dotCount) // 表示為 int64 納秒計數
time.Sleep(t * time.Second) //將當前 goroutine 暫停至少持續時間d
log.Printf("Done")
d.Ack(false) // 必須在成功處理此交付后呼叫 Delivery.Ack,如果autoAck為true則不需要. 引數 true表示回復當前信道所有未回復的ack,用于批量確認,false表示回復當前條目
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
啟動兩個終端用于執行work.go,一個終端用來執行new_task,相當于newtask是生產者,work是兩個消費者,兩個消費者共同消費訊息,(輪詢訪問,每個消費者消費的次數相同,公平分發,處理速度快的消費者消費的多,能者多勞)
(三).publish/subscribe發布訂閱(共享資源)

1、每個消費者監聽自己的佇列;
2、生產者將訊息發給broker,由交換機將訊息轉發到系結此交換機的每個佇列,每個系結交換機的佇列都將接收到訊息,
//emit_log.go
package main
import (
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs", // name 交換機名字
"fanout", // type 交換機模式 廣播到每個佇列
true, // durable 持久化
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs", // exchange 生產者將訊息發往logs交換機
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
//receive_log.go
package main
import (
"log"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
FailOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
FailOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
FailOnError(err, "Failed to declare a queue")
err = ch.QueueBind( //宣告的佇列要和交換機系結
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil)
FailOnError(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
FailOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
//測驗
//啟動兩個消費者,打開兩個終端
go run receive_log.go
//啟動一個生產者
go run emit_log.go // 看到兩個消費者的終端輸出hello
go run emit_log.go 1 // 兩個消費者的終端輸出1
(四).routing路由模式

1.訊息生產者將訊息發送給交換機按照路由判斷,路由是字串(info) 當前產生的訊息攜帶路由字符(物件的方法),交換機根據路由的key,只能匹配上路由key對應的訊息佇列,對應的消費者才能消費訊息;
2.根據業務功能定義路由字串
3.從系統的代碼邏輯中獲取對應的功能字串,將訊息任務扔到對應的佇列中,
4.業務場景:error 通知;EXCEPTION;錯誤通知的功能;傳統意義的錯誤通知;客戶通知;利用key路由,可以將程式中的錯誤封裝成訊息傳入到訊息佇列中,開發者可以自定義消費者,實時接收錯誤;
//emit_log_direct.go
package main
import (
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_direct", // name
"direct", // direct交換機將會對binding key和routing key進行精確匹配,從而確定訊息該分發到哪個佇列
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs_direct", // exchange
severityFrom(os.Args), // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 3) || os.Args[2] == "" {
s = "hello"
} else {
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "info"
} else {
s = os.Args[1]
}
return s
}
//receive_logs_direct.go
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
FailOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_direct", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
FailOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
FailOnError(err, "Failed to declare a queue")
// 命令輸入的時候如果引數小于2 證明沒有傳引數info,warning,error
if len(os.Args) < 2 {
log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
os.Exit(0)
}
//給每一個引數創建一個系結
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s)
err = ch.QueueBind(
q.Name, // 佇列名稱
s, // 將引數作為 routing key
"logs_direct", // exchange
false,
nil)
FailOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
FailOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
測驗:
go run receive_logs_direct.go info warning error
go run emit_log_direct.go error haha
可以在receive終端中看到輸出haha,route key為error
(五).topic 主題模式(路由模式的一種)

1.星號井號代表通配符
2.星號代表多個單詞,井號代表一個單詞
3.路由功能添加模糊匹配
4.訊息產生者產生消息,把訊息交給交換機
5.交換機根據key的規則模糊匹配到對應的佇列,由佇列的監聽消費者接收訊息消費
(在我的理解看來就是routing查詢的一種模糊匹配,就類似sql的模糊查詢方式)
//emit_log_topic.go
package main
import (
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs_topic", // exchange
severityFrom(os.Args), // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 3) || os.Args[2] == "" {
s = "hello"
} else {
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "anonymous.info"
} else {
s = os.Args[1]
}
return s
}
//receive_logs_topic.go
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
FailOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
FailOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
FailOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [binding_key]...", os.Args[0])
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_topic", s)
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"logs_topic", // exchange
false,
nil)
FailOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
FailOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
測驗:
打開兩個終端,一個是從kern中接收日志,一個是從critical中接收日志
go run receive_logs_topic.go "kern.*"
go run receive_logs_topic.go "*.critical"
打開一個終端,使用kern.critical作為route key
go run emit_log_topic.go "kern.critical" lalalalalala
可以在上面兩個終端中輸出日志資訊lalalalalala
五、RabbitMQ高級特性-TTL佇列/訊息TTL-死信佇列
1、TTL
-
time to live 訊息存活時間
-
RabbitMQ支持訊息的過期時間, 在訊息發送時可以進行指定
-
如果訊息在存活時間內未被消費,則會被清除
-
RabbitMQ支持兩種ttl設定
-
- 單獨訊息進行配置ttl
- 整個佇列進行配置ttl(居多)
-
//整個佇列進行配置ttl package main import ( "log" "github.com/streadway/amqp" ) func FailOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { //鏈接rabbitmq server conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") FailOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() //創建一個通道,用于完成任務 ch,err := conn.Channel() FailOnError(err,"Failed to open a channel") defer ch.Close() //使用x-message-ttl引數設定當前佇列中所有訊息的過期時間 args := make(map[string]interface{}) args["x-message-ttl"] = 5000 //宣告一個佇列,用于訊息發送 q, err := ch.QueueDeclare( "direct_ttl", // 佇列名 false, // 持久化 false, // 是否自動洗掉 false, // 排他性 false, // no-wait args, // 設定ttl為5s ) FailOnError(err, "Failed to declare a queue") body := "Hello World!" err = ch.Publish( // 發送訊息(生產者) "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) FailOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } -
打開web管理界面可以看到features上顯示TTL,發送一條訊息,ready中會顯示1,在5s后自動清除,(放入死信佇列中,而對單條訊息設定ttl洗掉了就消失了),如圖:
-
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-W2jECIh4-1623400450231)(/Users/apple/Desktop/截圖/截屏2021-06-08 下午4.18.51.png)]
2、什么是rabbitmq的死信佇列/死信交換機
-
沒有被及時消費的訊息存放的佇列
-
Dead Letter Exchange(死信交換機,縮寫: DLX)當訊息成為死信后,會被重新發送到另?個交換機,這個交換機就是DLX死信交換機,
-
導致死信佇列的原因有:
- 訊息被拒絕
- 訊息過期
- 佇列達到最大長度
- 結果:訊息成為死信后,如果該佇列系結了死信交換機,則訊息會被死信交換機重新路由到死信佇列


轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/287123.html
標籤:其他
上一篇:Vim入門教程
