我正在撰寫一個應用程式,我遇到了這個問題,一遍又一遍地查看代碼,似乎沒有任何問題,使用下面的基本片段進行測驗,問題是可重現的....RabbitMQ 說佇列總是空的不是。
下面的 Golang 片段顯示了生產者發送訊息的頻率高于消費者消費訊息的頻率。消費者始終處于活動狀態,但睡眠時間更長,以使佇列在其積壓中擁有訊息。結果?消費者每次嘗試時都會獲取訊息,但是 API 總是說沒有訊息 -> 訊息計數為 0。
package main
import (
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"io/ioutil"
"net/http"
"testing"
"time"
)
func main() {
username := "guest"
password := "guest"
scheme := "amqp"
rabbitMqHost := "localhost"
port := "5672"
connectionString := fmt.Sprintf("%s://%s:%s@%s:%s/", scheme, username, password, rabbitMqHost, port)
conn, err := amqp.Dial(connectionString)
if err != nil {
panic(err)
}
ch, err := conn.Channel()
if err != nil {
panic(err)
}
exchangeName := "my-exchange"
// Declare exchange
err = ch.ExchangeDeclare(
exchangeName, // name
"fanout", // type
true, // durable
true, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
panic(err)
}
// Create first Queue
queueName := "my-queue"
q, err := ch.QueueDeclare(
queueName, // name
true, // durable
true, // delete when unsused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
panic(err)
}
// Bind Exchange to Queue
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
exchangeName, // exchange
false,
nil,
)
// Listen
eventQueue, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
panic(err)
}
go func() {
for a := range eventQueue {
fmt.Printf("Received Event %s\n", string(a.Body))
time.Sleep(time.Second * 4)
}
}()
go func() {
count := 0
for {
err = ch.Publish(exchangeName, "", false, false, amqp.Publishing{
ContentType: "application/json",
Body: []byte(fmt.Sprintf("Message %d", count)),
})
fmt.Printf("Sent Message %d\n", count)
count
if err != nil {
panic(err)
}
time.Sleep(time.Second * 2)
}
}()
for {
httpRes, err := http.Get("http://guest:guest@localhost:15672/api/queues///my-queue")
if err != nil {
panic(err)
}
var resJson map[string]interface{}
content, err := ioutil.ReadAll(httpRes.Body)
if err != nil {
panic(err)
}
httpRes.Body.Close()
err = json.Unmarshal(content, &resJson)
if err != nil {
panic(err)
}
q2, err := ch.QueueDeclarePassive(
queueName, // name
true, // durable
true, // delete when unsused
false, // exclusive
false, // no-wait
nil,
)
fmt.Printf("Queue Len: %f - %d\n", resJson["messages"], q2.Messages)
time.Sleep(time.Second)
}
}
您可以使用以下 RabbitMQ 服務器進行測驗:
docker run --rm --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
輸出:
Sent Message 0
Received Event Message 0
Queue Len: %!f(<nil>) - 0
Queue Len: %!f(<nil>) - 0
Sent Message 1
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 1
Sent Message 2
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 3
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 2
Sent Message 4
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 5
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 3
Sent Message 6
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 7
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 4
Sent Message 8
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 9
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 5
Sent Message 10
....
....
Que Len 沒有一次說它不是 0。在我的應用程式中,當我放一堆訊息時,我可能會看到它說 X,但很快它變成了 0,我以為我在應用程式中有一個隱藏的消費者,但沒有,API 給出了一些不準確的結果,或者我應該尋找其他地方來獲得長度。
更新
以上僅在有消費者時發生,如果佇列沒有消費者,則按預期作業,只需注釋.Consume代碼:
/*
eventQueue, err := ch.Consume(
...
go func(){
for a := range eventQueue {
..
}()
*/
現在它“改進”了,但首先,它不是我想要的,其次,它仍然是奇怪的輸出 =S:
Sent Message 0
Queue Len: 0.000000 - 1
Queue Len: 0.000000 - 1
Sent Message 1
Queue Len: 0.000000 - 2
Queue Len: 0.000000 - 2
Sent Message 2
Queue Len: 0.000000 - 3
Queue Len: 1.000000 - 3
Sent Message 3
Queue Len: 1.000000 - 4
Queue Len: 1.000000 - 4
Sent Message 4
Queue Len: 1.000000 - 5
Queue Len: 1.000000 - 5
Sent Message 5
Queue Len: 4.000000 - 6
Queue Len: 4.000000 - 6
Sent Message 6
Queue Len: 4.000000 - 7
Queue Len: 4.000000 - 7
Sent Message 7
Queue Len: 4.000000 - 8
Queue Len: 6.000000 - 8
Sent Message 8
Queue Len: 6.000000 - 9
Queue Len: 6.000000 - 9
Sent Message 9
Queue Len: 6.000000 - 10
Queue Len: 6.000000 - 10
Sent Message 10
Queue Len: 9.000000 - 11
Queue Len: 9.000000 - 11
uj5u.com熱心網友回復:
該欄位q2.Messages是不可靠的,它是未等待確認的訊息計數,即已經確認的訊息。
您的使用者宣告為autoAck = true- 即noAck-,這意味著不需要確認,這意味著已經確認的訊息為零。
當您注釋掉消費者時,已確認訊息的數量可能取決于發布者緩沖區。
使用 AMQP 0.9.1 在給定佇列上以編程方式獲取精確數量的訊息基本上是不可能的。您可以改用message_stats管理 API 中的欄位:
http://localhost:15672/api/queues/vhost/queue_name
uj5u.com熱心網友回復:
接受的解決方案將是 blackgreen 的。證明是下面的替換,只需將問題部分中的消費者和發布者代碼替換為:
// Listen
eventQueue, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack <-- Difference
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
panic(err)
}
go func() {
for a := range eventQueue {
err = ch.Ack(a.DeliveryTag, false) // <-- Difference
if err != nil {
panic(err)
}
fmt.Printf("Received Event %s\n", string(a.Body))
time.Sleep(time.Second * 4)
}
}()
go func() {
count := 0
for {
err = ch.Publish(exchangeName, "", false, false, amqp.Publishing{
ContentType: "application/json",
Body: []byte(fmt.Sprintf("Message %d", count)),
})
fmt.Printf("Sent Message %d\n", count)
count
if err != nil {
panic(err)
}
if count >= 20 { // <-- Difference
break
}
time.Sleep(time.Second * 2)
}
}()
輸出:
.... The increase in the queue length
Sent Message 13
Queue Len: 8.000000 - 0
Queue Len: 8.000000 - 0
Received Event Message 4
Sent Message 14
Queue Len: 8.000000 - 0
Queue Len: 9.000000 - 0
Sent Message 15
Queue Len: 9.000000 - 0
Queue Len: 9.000000 - 0
Received Event Message 5
Sent Message 16
Queue Len: 9.000000 - 0
Queue Len: 9.000000 - 0
Sent Message 17
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Received Event Message 6
Sent Message 18
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Sent Message 19
Queue Len: 11.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 7
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 8
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 9
Queue Len: 12.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Received Event Message 10
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Received Event Message 11
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 9.000000 - 0
Received Event Message 12
....
As publisher exits it decreases, the consumer catches up and message len decreases:
Received Event Message 16
Queue Len: 5.000000 - 0
Queue Len: 5.000000 - 0
Queue Len: 5.000000 - 0
Queue Len: 4.000000 - 0
Received Event Message 17
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Received Event Message 18
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Received Event Message 19
Queue Len: 2.000000 - 0
Queue Len: 1.000000 - 0
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/371087.html
