使用Redis的List(串列)命令實作訊息佇列,生產者使用lPush命令發布訊息,消費者使用rpoplpush命令獲取訊息,同時將訊息放入監聽佇列,如果處理超時,監聽者將把訊息彈回訊息佇列
1.用到的List(串列)命令
| 命令 | 作用 |
|---|---|
| lPush | 將一個或多個值插入到串列頭部 |
| rpoplpush | 彈出串列最后一個值,同時插入到另一個串列頭部,并回傳該值 |
| lRem | 洗掉串列內的給定值 |
| lIndex | 按索引獲取串列內的值 |
2.佇列的組成
| 名稱 | 職責 |
|---|---|
| 生產者 | 發布訊息 |
| 消費者 | 獲取并處理訊息 |
| 監聽者 | 監聽超時的訊息,彈回原訊息佇列,確保消費者掛掉后或處理失敗后訊息能被其他消費者處理 |
3.php實作代碼
生產者Producter.php
<?php
try {
//宣告訊息佇列-list的鍵名
$queueKey = 'testQueueKey';
$redis = new Redis();
$redis->connect('ip', 6379);
//向串列中push10條訊息
for ($i = 0;$i < 10;$i++){
//為訊息生成唯一標識
$uniqid = uniqid(mt_rand(10000, 99999).getmypid().memory_get_usage(), true);
$ret = $redis->lPush($queueKey, json_encode(array('uniqid' => $uniqid, 'key' => 'key-'.$i, 'value' => 'data')));
var_dump($ret);
}
} catch (Exception $e){
echo $e->getMessage();
}
消費者Consumer.php
<?php
try {
//宣告訊息佇列-list的鍵名
$queueKey = 'testQueueKey';
//宣告監聽者佇列-list的鍵名
$watchQueueKey = 'watchQueueKey';
$redis = new Redis();
$redis->connect('ip', 6379);
//佇列先進先出,彈出最先加入的訊息,同時放入監聽佇列
while (true){
$ret = $redis->rpoplpush($queueKey, $watchQueueKey);
if ($ret === false){
sleep(1);
} else {
$retArray = json_decode($ret, true);
//將唯一id寫入快取設定有效期
$redis->setex($retArray['uniqid'], 60, 0);
//模擬失敗
$rand = mt_rand(0,9);
if ($rand < 3){
echo "failure:".$ret."\n";
} else {
//todo
//處理成功移除訊息
$redis->lRem($watchQueueKey, $ret, 0);
echo "success:".$ret."\n";
}
}
}
} catch (Exception $e){
echo $e->getMessage();
}
監聽者Watcher.php
<?php
try {
//宣告訊息佇列-list的鍵名
$queueKey = 'testQueueKey';
//宣告監聽者佇列-list的鍵名
$watchQueueKey = 'watchQueueKey';
$redis = new Redis();
$redis->connect('ip', 6379);
while (true){
//取出串列尾部的一個值
$ret = $redis->lIndex($watchQueueKey, -1);
//如果不存在則休眠1秒
if ($ret === false){
sleep(1);
} else {
$retArray = json_decode($ret, true);
$idCache = $redis->get($retArray['uniqid']);
if ($idCache === false){
//如果已過期,表示任務超時,彈回原佇列
$redis->rpoplpush($watchQueueKey, $queueKey);
echo "rpoplpush:".$ret."\n";
} else {
//處理中,繼續等待
sleep(1);
}
}
}
} catch (Exception $e){
echo $e->getMessage();
}
4.執行佇列
開啟監聽者php Watcher.php
開啟消費者php Consumer.php
執行生產者php Producter.php
生產者輸出
int(1)
int(2)
int(3)
int(4)
int(5)
int(6)
int(7)
int(8)
int(9)
int(10)
監聽者輸出
rpoplpush:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
rpoplpush:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
rpoplpush:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
rpoplpush:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
消費者輸出
success:{"uniqid":"47280267323557445c4bde640dbfb4.78962728","key":"key-0","value":"data"}
failure:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
success:{"uniqid":"39394267323642245c4bde640de992.34641654","key":"key-2","value":"data"}
success:{"uniqid":"41335267323642245c4bde640df980.38466514","key":"key-3","value":"data"}
failure:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
failure:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
success:{"uniqid":"43817267323642245c4bde640ec189.44008738","key":"key-7","value":"data"}
success:{"uniqid":"69276267323642245c4bde640ecb91.04877522","key":"key-8","value":"data"}
failure:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
success:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
success:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
success:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
success:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
success:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
我們看到消費者第一次執行時失敗的訊息,超時后又被彈回了訊息佇列,消費者有了再次執行的機會,監聽者的職責就是確保消費者執行失敗或掛掉后訊息還能再彈回原佇列得到再次執行
轉自:https://www.jmsite.cn/blog-615.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/53132.html
標籤:PHP
下一篇:PHP實作定時任務的幾種方式
