文章目錄
- 前言
- 安裝開始
- 打好基礎
- 代碼實作
- 功能倉庫檔案
- 各個模式實作
前言
最近在跟慕課做一個秒殺商城的小專案,接觸了RabbitMQ
雖然平時是在Python中實作訊息佇列,但是不得不說RabbitMQ香呀
今天也是除夕,在這個祝大家新年快樂,發個小水文章吧QAQ
安裝開始
# 基礎安裝
$ brew install rabbitmq
$ vim ~/.zshrc # 將 export PATH=$PATH:/usr/local/sbin 寫入
$ rabbitmq-server # 重新打開終端,開啟服務
# 開啟插件
$ rabbitmq-plugins list # 查看插件
$ rabbitmq-plugins enable rabbitmq_management # 啟動管理插件
$ rabbitmq-plugins enable rabbitmq_tracing # 啟動日志
$ rabbitmq-plugins disable rabbitmq_tracing # 關閉日志
# 額外命令
$ rabbitmq-server -detached # 后臺啟動
$ rabbitmqctl status # 查看狀態
$ rabbitmqctl stop # 關閉
打好基礎
這些都是很基本的概念,你得明白什么是什么就好了
因為作為工具,首先要會用起來,會采用囫圇吞棗的模式學習
隨著后面的深入,慢慢了解特點吧
| 概念 | 描述 |
|---|---|
| Channel | 生產者publish或是消費者subscribe一個佇列都是通過信道來通信的 |
| Exchange | exchange的作用就是類似路由器,服務器會根據路由鍵將訊息從交換器路由到佇列上去 |
| Queue | 佇列收到的訊息將發送給消費者 |
| Binding | 建立鏈接交換的系結資訊 |
| VirtualHost | 不同的隔離區,防止污染 |
| Connection | 建立的鏈接 |
| 作業模式 | 描述 |
|---|---|
| simple | 最簡單的收發模式 |
| work | 資源的競爭 |
| publish/subscribe | 共享資源 |
| routing | 只能匹配上路由key對應的訊息佇列,對應的消費者才能消費訊息 |
| topic | routing的一種模糊匹配 |
代碼實作
功能倉庫檔案
// MQURL 連接資訊 amqp://賬號:密碼@ip:host/vhost
const MQURL = "amqp://guest:guest@127.0.0.1:5672/"
// RabbitMQ rabbitMQ結構體
type RabbitMQ struct {
conn *amqp.Connection // 鏈接
channel *amqp.Channel // 通道
QueueName string //佇列名稱
Exchange string //交換機名稱
Key string //bind Key 名稱
Mqurl string //連接資訊
}
// NewRabbitMQ 創建結構體實體
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
return &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}
}
// Destroy 斷開 channel 和 connection
func (r *RabbitMQ) Destroy() {
r.channel.Close() // 斷開 channel
r.conn.Close() // 斷開 conn
}
// 錯誤處理函式
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
log.Printf("%s:%s", message, err) // 列印錯誤
panic(fmt.Sprintf("%s:%s", message, err)) // 拋出錯誤
}
}
// NewRabbitMQSimple 創建簡單模式下RabbitMQ實體
// 在Simple模式下唯一不同的是 queueName
func NewRabbitMQSimple(queueName string) *RabbitMQ {
// todo 創建RabbitMQ實體
rabbitmq := NewRabbitMQ(queueName, "", "")
var err error
// todo 補上conn與channel
//獲取connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
//獲取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
// PublishSimple 簡單模式下佇列生產
func (r *RabbitMQ) PublishSimple(message string) {
// todo 申請佇列,如果佇列不存在會自動創建,存在則跳過創建
_, err := r.channel.QueueDeclare(
r.QueueName, // 首先放入名稱
false, //是否持久化
false, //是否自動洗掉
false, //是否具有排他性
false, //是否阻塞處理
nil, //額外的屬性
)
if err != nil {
fmt.Println(err)
}
//todo 呼叫channel 發送訊息到佇列中
r.channel.Publish(
r.Exchange, // 此處為空
r.QueueName,
false, //如果為true,根據自身exchange型別和routeKey規則;無法找到符合條件的佇列會把訊息返還給發送者
false, //如果為true,當exchange發送訊息到佇列后發現佇列上沒有消費者,則會把訊息返還給發送者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
// ConsumeSimple 簡單模式下消費者
func (r *RabbitMQ) ConsumeSimple() {
//todo 申請佇列,如果佇列不存在會自動創建,存在則跳過創建
q, err := r.channel.QueueDeclare(
r.QueueName,
false, //是否持久化
false, //是否自動洗掉
false, //是否具有排他性
false, //是否阻塞處理
nil, //額外的屬性
)
if err != nil {
fmt.Println(err)
}
//todo 接收訊息
msg, err := r.channel.Consume(
q.Name, // queue
"", //用來區分多個消費者 此處不區分
true, //是否自動應答
false, //是否獨有
false, //設定為true,表示不能將同一個Connection中生產者發送的訊息傳遞給這個Connection中的消費者
false, // 是否阻塞處理
nil, // 額外的屬性
)
if err != nil {
fmt.Println(err)
}
//todo 啟用協程處理訊息
// 此處使用forever的意思為因為協程會始終監聽訊息(除非手動結束)
// 手動結束才會進行 <-forever 有協程且一直嘗試讀取資料
forever := make(chan bool)
go func() {
for d := range msg {
// 訊息邏輯處理
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
// NewRabbitMQPubSub 訂閱模式創建RabbitMQ實體就要設定路由器了
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
//todo 創建RabbitMQ實體
rabbitmq := NewRabbitMQ("", exchangeName, "")
var err error
//todo 獲取connection和獲取channel
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
// PublishPub 訂閱模式生產
func (r *RabbitMQ) PublishPub(message string) {
//todo 嘗試創建交換機
err := r.channel.ExchangeDeclare(
r.Exchange,
"fanout",
true,
false,
false, //true表示這個exchange不可以被client用來推送訊息,僅用來進行exchange和exchange之間的系結
false,
nil,
)
r.failOnErr(err, "Failed to declare an exchange")
//todo 發送訊息
err = r.channel.Publish(
r.Exchange,
"",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
// ReceiveSub 訂閱模式消費端代碼
func (r *RabbitMQ) ReceiveSub() {
//todo 試探性創建交換機
err := r.channel.ExchangeDeclare(
r.Exchange,
"fanout", //交換機型別
true,
false,
false, //true表示這個exchange不可以被client用來推送訊息,僅用來進行exchange和exchange之間的系結
false,
nil,
)
r.failOnErr(err, "Failed to declare an exchange")
//todo 試探性創建佇列,這里注意佇列名稱不要寫
q, err := r.channel.QueueDeclare(
"", //隨機生產佇列名稱
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//todo 系結佇列到 exchange 中
err = r.channel.QueueBind(
q.Name,
"", //在pub/sub模式下,這里的key要為空
r.Exchange,
false,
nil)
//todo 消費訊息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message: %s", d.Body)
}
}()
fmt.Println("退出請按 CTRL+C\n")
<-forever
}
// NewRabbitMQRouting 路由模式
func NewRabbitMQRouting(exchangeName string, routingKey string) *RabbitMQ {
//todo 創建RabbitMQ實體
rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
var err error
//todo 獲取connection 獲取channel
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
// PublishRouting 路由模式發送訊息
func (r *RabbitMQ) PublishRouting(message string) {
//1.嘗試創建交換機
err := r.channel.ExchangeDeclare(
r.Exchange,
"direct",
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "Failed to declare an exchange")
//2.發送訊息
err = r.channel.Publish(
r.Exchange,
//要設定
r.Key,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
// ReceiveRouting 路由模式接受訊息
func (r *RabbitMQ) ReceiveRouting() {
// todo 試探性創建交換機
err := r.channel.ExchangeDeclare(
r.Exchange,
//交換機型別
"direct",
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "Failed to declare an exchange")
// todo 試探性創建佇列,這里注意佇列名稱不要寫
q, err := r.channel.QueueDeclare(
"", //隨機生產佇列名稱
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//系結佇列到 exchange 中
err = r.channel.QueueBind(
q.Name,
//需要系結key
r.Key,
r.Exchange,
false,
nil)
// todo 消費訊息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message: %s", d.Body)
}
}()
fmt.Println("退出請按 CTRL+C\n")
<-forever
}
// NewRabbitMQTopic 話題模式
func NewRabbitMQTopic(exchangeName string, routingKey string) *RabbitMQ {
// todo 創建RabbitMQ實體
rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
var err error
// todo 獲取connection與獲取channel
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
// PublishTopic 話題模式發送訊息
func (r *RabbitMQ) PublishTopic(message string) {
// todo 嘗試創建交換機
err := r.channel.ExchangeDeclare(
r.Exchange,
"topic",
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "Failed to declare an exchange")
// todo 發送訊息
err = r.channel.Publish(
r.Exchange,
//要設定
r.Key,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
// ReceiveTopic 話題模式接受訊息
//要注意key,規則
//其中“*”用于匹配一個單詞,“#”用于匹配多個單詞(可以是零個)
//匹配 xx.* 表示匹配 xx.hello, 但是 xx.hello.one需要用 xx.#才能匹配到
func (r *RabbitMQ) ReceiveTopic() {
//1.試探性創建交換機
err := r.channel.ExchangeDeclare(
r.Exchange,
//交換機型別
"topic",
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "Failed to declare an exch"+
"ange")
//2.試探性創建佇列,這里注意佇列名稱不要寫
q, err := r.channel.QueueDeclare(
"", //隨機生產佇列名稱
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//系結佇列到 exchange 中
err = r.channel.QueueBind(
q.Name,
//在pub/sub模式下,這里的key要為空
r.Key,
r.Exchange,
false,
nil)
//消費訊息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message: %s", d.Body)
}
}()
fmt.Println("退出請按 CTRL+C\n")
<-forever
}
各個模式實作
因為不確定包的位置
所以報紅簡單寫一下引入上面的倉庫檔案就好了
- 簡單模式
// 發布者
func main() {
// todo 創建實體并發送訊息
rabbitmq := RabbitMQ.NewRabbitMQSimple("Simple")
rabbitmq.PublishSimple("Hello world!")
fmt.Println("發送成功!")
}
// 接受者
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("Simple") // 名字要一樣
rabbitmq.ConsumeSimple()
}
- 作業模式
// 發布者
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("Simple")
for i := 0; i <= 100; i++ {
rabbitmq.PublishSimple("Hello world!" + strconv.Itoa(i))
time.Sleep(1 * time.Second)
fmt.Println(i)
}
}
// 接受者1
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("Simple")
rabbitmq.ConsumeSimple()
}
// 接受者2
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("Simple")
rabbitmq.ConsumeSimple()
}
- 發布模式
// 發布者
func main() {
rabbitmq := RabbitMQ.NewRabbitMQPubSub("newProduct")
for i := 0; i < 100; i++ {
rabbitmq.PublishPub("訂閱模式生產第" + strconv.Itoa(i) + "條" + "資料")
fmt.Println("訂閱模式生產第" + strconv.Itoa(i) + "條" + "資料")
time.Sleep(1 * time.Second)
}
}
// 訂閱者1
func main() {
rabbitmq := RabbitMQ.NewRabbitMQPubSub("newProduct")
rabbitmq.ReceiveSub()
}
// 訂閱者2
func main() {
rabbitmq := RabbitMQ.NewRabbitMQPubSub("newProduct")
rabbitmq.ReceiveSub()
}
- 路由模式
// 發布者
func main() {
One := RabbitMQ.NewRabbitMQRouting("ex", "one")
Two := RabbitMQ.NewRabbitMQRouting("ex", "two")
for i := 0; i <= 10; i++ {
One.PublishRouting("Hello one!" + strconv.Itoa(i))
Two.PublishRouting("Hello Two!" + strconv.Itoa(i))
time.Sleep(1 * time.Second)
fmt.Println(i)
}
}
// 接受者
func main() {
One := RabbitMQ.NewRabbitMQRouting("ex", "one")
One.ReceiveRouting()
}
// 接受者
func main() {
Two := RabbitMQ.NewRabbitMQRouting("ex", "two")
Two.ReceiveRouting()
}
- 話題模式
// 發布者
func main() {
One := RabbitMQ.NewRabbitMQTopic("exTopic", "topic.one")
Two := RabbitMQ.NewRabbitMQTopic("exTopic", "topic.two")
for i := 0; i <= 10; i++ {
One.PublishTopic("Hello topic one!" + strconv.Itoa(i))
Two.PublishTopic("Hello topic Two!" + strconv.Itoa(i))
time.Sleep(1 * time.Second)
fmt.Println(i)
}
}
// 接受者1
func main() {
One := RabbitMQ.NewRabbitMQTopic("exTopic", "#") // # 表示一個或者多個詞語
One.ReceiveTopic()
}
// 接受者2
func main() {
Two := RabbitMQ.NewRabbitMQTopic("exTopic", "*.two") // 表示多個詞語
Two.ReceiveTopic()
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423254.html
標籤:其他
上一篇:一天學完spark的Scala基礎語法教程十、類和物件(idea版本)
下一篇:阿里云基本概念與基礎架構(一)
