RabbitMQ簡單模式和work模式
使用的語言是go語言,框架beego ,廢話不多說直接代碼
一、操作rabbitmq的工具包
使用的工具包是github.com/streadway/amqp
go get github.com/streadway/amqp
import "github.com/streadway/amqp"
二、創建連接
使用的函式時amqp.Dail(),直接將連接封裝起來,因為每個模式生產者和消費者都要進行連接
func Connect() (*amqp.Connection, error) {
conn, err := amqp.Dial("amqp://用戶名:密碼@rabbitmqIp:5672/")
return conn, err
}
引數說明:Dail(“amqp://用戶名:密碼@rabbitmqIp:5672/”)
三、創建發送端函式
步驟:
-
創建連接 并且使用defer 關閉連接
-
創建channel通道并使用defer 關閉連接
-
創建佇列,然后才能向佇列發送訊息 ,使用:
channel.QueueDecclare() -
發送訊息 .使用:
ch.Publish()
/**
* @Description: 簡單模式和work模式生產者函式
* @param exchange 交換機名稱 簡單和work模式下exchange為空
* @param queueName 佇列名稱
* @param body 訊息內容
* @return error 回傳值
*/
func Publish(exchange string, queueName string, body string) error {
//連接
conn, err := Connect()
if err != nil {
return err
}
defer conn.Close()
//創建channel
ch, err := conn.Channel()
if err != nil {
return err
}
defer ch.Close()
//宣告一個佇列供我們發送,然后才能向佇列發送訊息
//第一個引數 佇列名稱 第二個引數 是否持久化 第三個引數 是否洗掉 第四個引數 是否唯一 第五個引數 是否等待 第六個其它引數
q, err := ch.QueueDeclare(queueName, true, false, false, false, nil)
if err != nil {
return err
}
//發送訊息
//第一引數 交換機名稱 ,第二個引數 隊里名稱 第三個引數 是否強制 第四個引數 立即執行 ,第五個 訊息體
err = ch.Publish(exchange, q.Name, false, false,
amqp.Publishing{DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(body)})
return err
}
四、創建消費端函式
步驟:
-
創建連接 并且使用defer 關閉連接
-
創建channel通道并使用defer 關閉連接
-
宣告消費佇列,并與生產者函式保持一致 ,使用:
channel.QueueDeclare -
做佇列宣告,因為消費者可能在生產者之前啟動,所以要確保使用訊息之前佇列已經存在,使用:
ch.Consume -
接收訊息
/**
* @Description: 簡單模式和work模式消費者函式
* @param exchange
* @param queueName
* @param callback
*/
func ReceiveMsg(exchange string, queueName string, callback Callback) {
//先建立連接
conn, err := Connect()
if err != nil {
failOnError(err, "Failed to connect to RabbitMQ")
return
}
defer conn.Close()
//創建channel
ch, err := conn.Channel()
if err != nil {
failOnError(err, "Failed to open Channel")
return
}
defer ch.Close()
//宣告消費佇列,與發送端相匹配
//引數 第一個引數 佇列名稱,第二個引數 是否持久化 第三個引數 是否洗掉 第四個引數 是否獨一 第五個引數 是否等待 第六個其他引數
q, err := ch.QueueDeclare(queueName, true, false, false, false, nil)
if err != nil {
failOnError(err, "Failed to create QueueDeclare")
return
}
// 做佇列宣告,因為消費者可能在生產者之前啟動
//引數 第一個引數 佇列名稱,第二個引數 消費者 第三個引數 是否自動應答 第四個引數 是否獨一 第五個引數 是否區域 第六個引數是否等待 第七個其他引數 nil
msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil)
if err != nil {
failOnError(err, "Failed to register QueueDeclare")
return
}
//接收訊息
forever := make(chan bool)
go func() {
for d := range msgs {
s := BytesToString(&(d.Body))
callback(*s)
_ = d.Ack(false) //如果ch.Consume第三個引數否自動應答為false,需要我們手動應答,添加此行代碼,如果為true 不需要
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
五、運行生產端和消費端
生產端
這里使用的是beego框架,直接通過路由來訪問生產者
//簡單模式和work作業模式, push方法
func (m *MqController) GetMq() {
go func() {
count := 0
for {
err := mq.Publish("", "fyouku_demo", "hello+"+strconv.Itoa(count))
fmt.Println(err)
count++
time.Sleep(1 * time.Second)
}
}()
m.Ctx.WriteString("HELLO")
}
運行效果

消費端
新建個mq/work/main.go檔案
func main() {
mq.ReceiveMsg("","fyouku_demo",callback)
}
func callback(s string) {
fmt.Printf("Msg is :%s\n",s)
}
通過終端進入main.go;通過命令bee run運行檔案

進入rabittmq管理界面,就可以看到剛才運行的佇列了

轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/278916.html
標籤:區塊鏈
上一篇:282-Go語言的作業區
