<style>.tutorial blockquote { padding: 2px 12px; margin: 30px 6px; border: 1px solid rgba(170, 170, 170, 1); background-image: initial; background-size: initial; background-attachment: initial; background-origin: initial; background-clip: initial; background-color: rgba(187, 187, 187, 1); border-radius: 5px } div.tutorial pre { border-radius: 5px } .tutorial pre.sourcecode { font-size: medium; white-space: pre-wrap } .hljs { display: block; padding: 0.5em; background-image: initial; background-size: initial; background-attachment: initial; background-origin: initial; background-clip: initial; background-color: rgba(35, 35, 35, 1); color: rgba(230, 225, 220, 1) } .hljs-string { color: rgba(165, 194, 97, 1) } .hljs-keyword, .hljs-selector-tag { color: rgba(194, 98, 48, 1) } .hljs-title, .hljs-section { color: rgba(255, 198, 109, 1) } .hljs-params { color: rgba(208, 208, 255, 1) } .hljs-comment, .hljs-quote { color: rgba(188, 148, 88, 1); font-style: italic }</style>
(使用 php-amqplib)
在第二節教程里,我們學習了怎樣在多個作業者之間使用作業佇列來分發耗時任務,
但如果我們需要在一個遠程電腦上運行一個函式并且等待運行結果要怎樣做呢?好吧,那是另一個問題了,這種模式通常被稱為遠程程序呼叫,或者RPC(Remote Procedure Call),
在這篇教程里我們要使用RabbitMQ去建立一個RPC系統:一個客戶端和一個可擴展的RPB服務器,因為我們還沒有任何一個值得分發的耗時任務,我們就先建立一個仿真RPC服務用來回傳“斐波納契數列(Fibonacci numbers)”,
一、客戶端介面(Client interface)
為了演示一個RPC服務怎樣能被使用,我們先建立一個簡單的客戶端類,這個類將暴露一個命名為“call”的方法用于發送一個RPC請求并且阻塞程式執行直到應答被接收,
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo ' [.] Got ', $response, "\n";
關于RPC的注解
盡管RPC在計算機領域方面是一個很普通的方式,它卻經常被批評,當一個程式員不清楚一個函式呼叫是否在本地或者是否這是一個緩慢的RPC的話,問題就來了,像這樣的混亂會導致一個不可預知結果的系統里會給系統除錯添加不必要的復雜性,而不是簡化軟體,濫用RPC會導致代碼像意大利式細面條似的不可維護,
牢記這一點,并考慮下面的建議:
- 搞清楚哪個功能呼叫在本地哪個在遠程,
- 為你的系統做好檔案,使得組件間的依賴清晰可見,
- 處理好錯誤場景,當RPC服務器出現長時間的問題時,客戶端應該如何反應?
當你對這些問題還困惑時,就先不要使用RPC,如果你可以的話,你應該使用一個異步管道來代替類似于阻塞的RPC,使用異步pipeline,結果被異步推送到下一個計算階段,
回呼佇列(Callback queue)
通常來說,在RabbitMQ上做RPC很容易,一個客戶端發送一條請求訊息,然后一個服務器回應一條回應訊息,為了接收一個回應,我們需要發送一個帶有請求的“回呼”佇列地址,我們可以使用默認的佇列,讓我們試試看,
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$msg = new AMQPMessage(
$payload,
array('reply_to' => $queue_name)
);
$channel->basic_publish($msg, '', 'rpc_queue');
# ... then code to read a response message from the callback_queue ...
訊息屬性
AMQP 0-9-1協議預定了一組與訊息相關的14個屬性,多數屬性很少使用,下面情況例外:
- 配送模式(delivery_mode):標記一條訊息為持久的(值2)或短暫的(值1),你可能記著在第二篇教程中有這個屬性,
- 內容型別(content_type):用于描述編碼的“mime-type”,比如對于經常使用的JSON編碼的例子,設定這個屬性為“application/json”就是一個很好的應用,
- 應答(reply_to):一般用于命名一個回呼佇列,
- 關聯Id(correlation_id): 用于RPC和有關的回應佇列的關聯,
二、關聯Id(Correlation Id)
在上面列出的方法內,我建議為每個RPC請求建立一個回呼佇列,這雖然很低效,但是,幸運的是有一個更好的辦法——讓我們為每個客戶端建一個單獨的回呼佇列,
這就產生了一個新的問題,在那個佇列收到回應后,它不清楚這個回應是屬于哪個請求的,這就是“correlation_id”屬性被用到的時候了,我們將設定它一個唯一的值給每一個請求,稍后,當我們再回呼佇列里收到一條訊息后,我們將查看這個屬性,基于此,我們就可以將一個回應匹配一個請求了,如果我們看到一個未知的“correlation_id”值,我們可以放心地丟棄它——它不屬于我們的請求,
或許你會問,在回呼佇列里,為什么我們忽略未知訊息,而不是給出帶有錯誤資訊的失敗?這是由于在服務端可能會存在競爭情況,盡管不太可能,但RPC服務器可能會在剛剛發送給我們回應后就死掉也是可能的,而不是在為這個請求發送一條確認訊息之前,如果發生了這樣的情況,重啟的RPC服務會再次處理這個請求,這就是為什么在客戶端我們必須要優雅地處理重復的回應,并且理想情況下RPC應該是冪等的,
三、總結(Summary)

我們的RPC將像這樣作業:
- 當客戶端啟動時,它建立一個匿名排外的回呼佇列
- 對于一個RPC請求,客戶端發送一條帶有兩個屬性的訊息:“reply_to”,這被設定給回呼佇列;“correlation_id”,這被設定一個唯一的值給每個請求,
- 請求被發送給一個“rpc_queue”佇列,
- RPC作業者(又稱:服務端)在那個佇列里等候請求,當一個請求出現,它就執行作業,使用來自于“reply_to”的佇列,發送一條帶有結果的訊息給客戶端,
- 客戶端等候回呼佇列里的資料,當一條訊息出現后,它檢查“correlation_id”屬性,如果它和來自于請求的值匹配,它就回傳回應給應用,
四、合在一起(Putting it all together)
斐波那契(Fibonacci)的任務:
function fib($n)
{
if ($n == 0) {
return 0;
}
if ($n == 1) {
return 1;
}
return fib($n-1) + fib($n-2);
}
我們定義我們的斐波那契(fibonacci )函式,假設僅輸入有效的正整數,(不要想用很大的數字來做,這可能是最慢的遞回實作)
我們的 RPC 服務端代碼 rpc_server.php 看起來是這樣的:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('rpc_queue', false, false, false, false);
function fib($n)
{
if ($n == 0) {
return 0;
}
if ($n == 1) {
return 1;
}
return fib($n-1) + fib($n-2);
}
echo " [x] Awaiting RPC requests\n";
$callback = function ($req) {
$n = intval($req->body);
echo ' [.] fib(', $n, ")\n";
$msg = new AMQPMessage(
(string) fib($n),
array('correlation_id' => $req->get('correlation_id'))
);
$req->delivery_info['channel']->basic_publish(
$msg,
'',
$req->get('reply_to')
);
$req->delivery_info['channel']->basic_ack(
$req->delivery_info['delivery_tag']
);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
服務端代碼是很簡單的:
- 像往常一樣,我們首先建立連接,通道并且宣告佇列,
- 我們可能想運行不只一個服務端行程,為了平均地分擔負載給多臺服務器,我們需要在“$channel.basic_qos”中設定 “prefetchCount” 的值,
- 我們使用“basic_consume”來訪問佇列,然后進入我們等候的請求訊息里的while回圈中,執行作業并回傳回應,
我們的RPC客戶端代碼 rpc_client.php 如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class FibonacciRpcClient
{
private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
list($this->callback_queue, ,) = $this->channel->queue_declare(
"",
false,
false,
true,
false
);
$this->channel->basic_consume(
$this->callback_queue,
'',
false,
true,
false,
false,
array(
$this,
'onResponse'
)
);
}
public function onResponse($rep)
{
if ($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
}
public function call($n)
{
$this->response = null;
$this->corr_id = uniqid();
$msg = new AMQPMessage(
(string) $n,
array(
'correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue
)
);
$this->channel->basic_publish($msg, '', 'rpc_queue');
while (!$this->response) {
$this->channel->wait();
}
return intval($this->response);
}
}
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo ' [.] Got ', $response, "\n";
現在是時候看一下我們完整例子的源代碼了rpc_client.php and rpc_server.php.
我們的 RPC服務現在已經就緒,我們能啟動服務端:
php rpc_server.php
# => [x] Awaiting RPC requests
運行客戶端來請求一個斐波那契數字:
php rpc_client.php
# => [x] Requesting fib(30)
這里展示出的設計不是唯一一種實作RPC服務的可能,但是它有一些重要的優勢:
- 如果RPC服務太慢,你可以通過運行另一個來同比例擴充它,試著在一個新的控制臺上運行另一個“rpc_server.php”,
- 在客戶端,RPC只需要發送和接收一條訊息,不需要像queue_declare要求的一樣同步呼叫,因此,對一個單獨的RPC請求,RPC客戶端僅需要一個網路往返,
我們的代碼依然很簡單,并且沒有嘗試解決更復雜(但很重要的)的問題,就像:
- 如果服務端沒有運行,客戶端將如何反應?
- 一個客戶端對于RPC是否應該設定某種超時?
- 如果服務端故障并且發生例外,是否應該傳給客戶端?
- 在處理之前,防止無效的傳入資訊(例如邊界檢測,型別),
原文:https://www.rabbitmq.com/tutorials/tutorial-six-php.html如果你想要嘗試,你或許會發現 management UI 對于查看佇列是很有用的,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/17439.html
標籤:PHP
上一篇:PHP代碼實作AC自動機
下一篇:PHP8 新特征
