php實戰RabbitMQ四(資料持久化與公平調度)
- 前言
- 分析
- 訊息持久化
- RabbitMQ退出或崩潰應對
- 佇列持久化
- 訊息持久化
- 消費者例外退出應對
- 消費者訊息確認
- 持久化注意
- 公平調度
- 原始碼
- 生產者
- 消費者
- 運行
前言
在開始之前先講個故事吧!
憧憬下未來,假如我有2個孩子,龍鳳胎,俊男靚女,emmm哈哈哈停不下來了,
有天,我給家里留了紙條,紙條內容:
1. 孩子們記得吃飯
2. 要洗碗
3. 要拖地
4. 出門時記得鎖門
哈哈哈[ 愛你們的爸爸 ]
分析
我:訊息生產者
紙條:rabbitMQ
孩子們: 消費者
通過將故事與我們實際開發相結合,在這里有兩個問題:
1. 誰洗碗?誰拖地?都是男孩嗎?那女孩子就沒事干了,反映到開發中,有的消費者很忙,有的消費者很閑,是否公平調度?
2. 紙條我放到地方安全顯眼嗎?會不會被小風吹走?會不會被無視?反映到開發中,資料能持久存在嗎?
訊息持久化
RabbitMQ退出或崩潰應對
RabbitMQ退出或崩潰會丟失佇列與訊息,所以在這里我們需要將訊息與佇列標記為持久,
佇列持久化
原始碼

根據原始碼將第三個引數設為true
$channel->queue_declare('hello', false, true, false, false);
訊息持久化
原始碼

原始碼

根據原始碼在訊息傳遞的程序中設定delivery_mode的值為2或者DELIVERY_MODE_PERSISTENT
$msg = new AMQPMessage('你的訊息', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] );
消費者例外退出應對
消費者可能接收到訊息后,執行一個耗時任務,結果執行在中途,例外退出,默認情況下,rabbitMQ一旦發送訊息給客戶端立即洗掉,這就很尷尬了,反映到現實~~張三剛下的訂單突然沒了,
我們當然希望有消費者例外退出后,趕緊來個接茬的,任務交給另一個消費者來干,
消費者訊息確認
為了保證訊息不丟失,我們希望消費者完成訊息處理后發送ack確認,rabbitMQ收到后才能對訊息洗掉,
原始碼

根據原始碼在消費者方法中,第四個引數設定為false,要求確認
緊接著在回呼函式中處理完訊息后呼叫ack()方法,發送確認
$callback = function($msg) {
//收到訊息了,列印
echo " [x] Received ", $msg->body, "\n";
//***訊息確認***
$msg->ack();
};
$channel->basic_consume('mq_sms_send_q', 'consumer1', false, false, false, false, $callback);
持久化注意
不能在已存在的佇列名上加持久化設定,會報錯,可以重新設定佇列名稱
公平調度
rabbitMQ在分發給消費者任務時,不會智能去監測消費者是否空閑,所以會出現部分消費者在處理一個重任務還未完成,另一個任務已經到來,而另外的消費者之前拿到輕任務很快處理完,閑了很久,
原始碼
根據原始碼在basic_qos()方法中,第二個引數設定為1,等待消費者處理完成后再接新訊息,不堆積
$channel->basic_qos(null, 1, null);
原始碼
生產者
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//獲取終端提示用戶輸入的資料
fwrite(STDOUT, "Please enter a message:\n");
$msg_str = fgets(STDIN);
//建立生產者與mq之間的連接
//引數:地址,埠,賬號,密碼
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
//在已連接基礎上建立生產者與mq之間的通道
$channel = $connection->channel();
//宣告初始化交換機
//引數:交換機名,路由型別,是否檢測同名佇列,是否開啟佇列持久化,通道關閉后是否洗掉佇列
$channel->exchange_declare('mq_sms_send_ex', 'direct', false, true, false);
//宣告初始化一條佇列
//引數:佇列名,是否檢測同名佇列,是否開啟佇列持久化,是否能被其他佇列訪問,通道關閉后是否洗掉佇列
$channel->queue_declare('mq_sms_send_q1', false, true, false, false);
//將佇列與某個交換機進行系結,并使用路由關鍵字
//引數:佇列名,交換機名,路由鍵名
$channel->queue_bind('mq_sms_send_q1', 'mq_sms_send_ex', 'sms_send');
//生成訊息
$msg = new AMQPMessage($msg_str, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
//推送訊息到某個交換機
//引數:訊息,交換機名,路由鍵名
$channel->basic_publish($msg, 'mq_sms_send_ex', 'sms_send');
echo " [x] Sent: $msg_str \n";
$channel->close();
$connection->close();
消費者
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
//宣告初始化交換機
//引數:交換機名,路由型別,是否檢測同名佇列,是否開啟佇列持久化,通道關閉后是否洗掉佇列
$channel->exchange_declare('mq_sms_send_ex', AMQPExchangeType::DIRECT, false, true, false);
//宣告初始化一條佇列
//引數:佇列名,是否檢測同名佇列,是否開啟佇列持久化,是否能被其他佇列訪問,通道關閉后是否洗掉佇列
$channel->queue_declare('mq_sms_send_q1', false, true, false, false);
//將佇列與某個交換機進行系結,并使用路由關鍵字
//引數:佇列名,交換機名,路由鍵名
$channel->queue_bind('mq_sms_send_q1', 'mq_sms_send_ex', 'sms_send');
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
//訊息確認
$msg->ack();
//判斷獲取到quit后
if (trim($msg->body) == 'quit') {
$msg->getChannel()->basic_cancel($msg->getConsumerTag());
}
};
$channel->basic_qos(null, 1, null);
//
//引數:佇列名,消費者識別符號,不接收此使用者發布的訊息,使用者是否使用自動確認模式,請求獨占使用者訪問,不等待,訊息回呼函式
$channel->basic_consume('mq_sms_send_q1', 'consumer1', false, false, false, false, $callback);
function shutdown($channel, $connection)
{
$channel->close();
$connection->close();
}
register_shutdown_function('shutdown', $channel, $connection);
while(count($channel->callbacks)) {
$channel->wait();
}
運行

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/259270.html
標籤:其他
