異步訊息佇列
說道訊息佇列,你肯定會想到Kafka、Rabbitmq等訊息中間件,這些專業的訊息中間件提供了很多功能特性,當然他的部署使用維護都是比較麻煩的,如果你對訊息佇列沒那么高要求,想要輕量級的,使用Redis就沒錯啦,
Redis通過list資料結構來實作訊息佇列.主要使用到如下命令:
-
lpush和rpush入佇列
-
lpop和rpop出佇列
-
blpop和brpop阻塞式出佇列

廢話補不多說上代碼:
<?php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
//消費訊息
while (true) {
try {
$msg = $redis->rPop($list);
if (!$msg) {
sleep(1);
}
//業務處理
} catch (Exception $e) {
echo $e->getMessage();
}
}
以上代碼會有個問題,如果佇列長時間是空的,pop就不會不斷的回圈,會導致redis的QPS升高,影響性能,所以我們使用sleep來解決,當沒有訊息的時候阻塞一段時間,但其實這樣還會帶來另一個問題,就是sleep會導致訊息的處理延遲增加的機率,這個問題我們可以通過blpop/brpop來阻塞讀取佇列,blpop/brpop在佇列沒有資料的時候,會立即進入休眠狀態,一旦資料到來,則立刻醒過來,訊息的延遲幾乎為零,用blpop/brpop替代前面的lpop/rpop,就完美解決了上面的問題,還有一個需要注意的點是我們需要是用try/catch來進行例外捕獲,如果一直阻塞在那里,Redis服務器一般會主動斷開掉空鏈接,來減少閑置資源的占用 可以使用 ping檢查redis心跳
延時佇列
定義:
延遲佇列就是個帶延遲功能的訊息佇列,相對于普通佇列,它可以在指定時間消費掉訊息,
應用場景:
- 1、新用戶注冊,10分鐘后發送郵件或站內信,
- 2、用戶下單后,30分鐘未支付,訂單自動作廢,
- 3、用戶下單后,在搶單大廳訂單進行補貼 10s 30s 90s 不同下單時長的訂單進行不同的補貼策略,[我們公司目前遇到的場景]
實作方案
方式一:資料庫實作
最簡單的方式,定時掃表,例如對于訂單支付失效要求比較高的,每2S掃表一次檢查過期的訂單進行主動關單操作,優點是簡單,缺點是每分鐘全域掃表,浪費資源,如果遇到表資料訂單量即將過期的訂單量很大,會造成關單延遲,,
方式二:redis的有序集合sort set
步驟:
- 1.產生訊息
- 用時間戳作為score,使用zadd key score1 value1 命令生產訊息
- 2.讀取訊息
- withscores limit 0 1消費訊息最早的一條訊息,
- 3.消費訊息并洗掉
- 實作簡單的延遲佇列,將訊息資料序列化,作為zset的value,把訊息處理時間作為score,每次通過zRangeByScore獲取一條訊息進行處理,
redis 客戶端命令
ZADD key score1 member1 [score2 member2]
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT]
ZREM key member [member ...]
rabbitMQ的死信佇列
訊息在以下情況下會變成死資訊,會被DXL(Dead-Letter-Exchane)死信交換機投遞到死信佇列:
- 1.訊息被拒絕,
- 2.訊息未被及時消費或者消費了不ack,直接過期,
- 3.佇列達到最大長度,
- 死信佇列的實作:
- 訊息(設定ttl)--->交換機-->佇列(訊息過期)-->死信交換機-->死信佇列-->消費
我們使用redis做延遲佇列
整體思路:通過redis的有序集合zset來實作簡單的延遲佇列,將訊息資料序列化,作為zset的value,把訊息處理時間作為score,每次通過zRangeByScore獲取一條訊息進行處理,
<?php
class DelayQueue
{
protected $prefix = 'delay_queue:';
protected $redis = null;
protected $key = '';
private static $_instance = null;
/**
* 建構式
* DelayQueue constructor.
* @param $queue
* @param array $config
*/
public function __construct($queue,$config = [])
{
$this->key = $this->prefix . $queue;
$this->redis = new Redis();
$this->redis->connect($config['host'], $config['port'], $config['timeout']);
$this->redis->auth($config['auth']);
}
/**
* Notes: 獲取資料庫句柄方法
* User: jackin.chen
* Date: 2020/7/20 下午3:55
* function: getRedis
* @return null|Redis
*/
public function getRedis()
{
return $this->redis;
}
/**
* Notes:這是獲取當前類物件的唯一方式
* User: jackin.chen
* Date: 2020/7/20 下午3:55
* function: getInstance
* @param string $queue
* @param array $config
* @return DelayQueue|null
* @static
*/
public static function getInstance($queue, $config = [])
{
// 檢查物件是否已經存在,不存在則實體化后保存到$instance屬性
if(!(self::$_instance instanceof self)){
//內部實體化物件
self::$_instance = new self($queue,$config);
}
return self::$_instance;
}
/**
* Notes: 宣告成私有方法,禁止克隆物件
* User: jackin.chen
* Date: 2020/7/20 下午3:56
* function: __clone
*/
private function __clone(){}
/**
* Notes: 宣告成私有方法,禁止重建物件
* User: jackin.chen
* Date: 2020/7/20 下午3:56
* function: __wakeup
*/
private function __wakeup(){}
/**
* Notes: 洗掉任務串列
* User: jackin.chen
* Date: 2020/7/20 下午4:00
* function: delTask
* @param $value
* @return int
*/
public function delTask($value)
{
return $this->redis->zRem($this->key, $value);
}
/**
* Notes: 獲取一條任務
* User: jackin.chen
* Date: 2020/7/20 下午4:00
* function: getTask
* @return array
*/
public function getTask()
{
//獲取任務,以0和當前時間為區間,回傳一條記錄
return $this->redis->zRangeByScore($this->key, 0, time(), ['limit' => [0, 1]]);
}
/**
* Notes: 添加任務
* User: jackin.chen
* Date: 2020/7/20 下午4:00
* function: addTask
* @param $name
* @param $time
* @param $data
* @return int
*/
public function addTask($name, $time, $data)
{
//添加任務,以時間作為score,對任務佇列按時間從小到大排序
return $this->redis->zAdd(
$this->key,
$time,
json_encode([
'task_name' => $name,
'task_time' => $time,
'task_params' => $data,
], JSON_UNESCAPED_UNICODE)
);
}
/**
* Notes: 執行任務
* User: jackin.chen
* Date: 2020/7/20 下午4:14
* function: run
* @return bool
*/
public function run()
{
//每次只取一條任務
$task = $this->getTask();
if (empty($task)) {
return false;
}
$task = isset($task[0]) ? $task[0] : [];
//有并發的可能,這里通過zrem回傳值判斷誰搶到該任務
if ($task && $this->delTask($task)) {
$task = json_decode($task, true);
//處理任務
echo '任務:' . $task['task_name'] . ' 運行時間:' . date('Y-m-d H:i:s') . PHP_EOL;
return true;
}
return false;
}
}
//生產使用
$Queue = DelayQueue::getInstance('payment_order',[
'host' => '127.0.0.1',
'port' => 6379,
'auth' => '',
'timeout' => 60,
]);
$Queue->addTask('payment_order_1', time() + 30, ['order_id' => '1']);
$Queue->addTask('payment_order_2', time() + 60, ['order_id' => '2']);
$Queue->addTask('payment_order_3', time() + 90, ['order_id' => '3']);
寫一個php腳本,用來處理佇列中的任務,
<?php
set_time_limit(0);
$Queue1 = DelayQueue::getInstance('payment_order',[
'host' => '127.0.0.1',
'port' => 6379,
'auth' => '',
'timeout' => 60,
]);
//處理任務
while (true) {
$Queue1->run();
usleep(100000);
}
這里又產生了一個問題,同一個任務可能會被多個行程取到之后再使用 zrem 進行爭搶,那些沒搶到的行程都是白取了一次任務,這是浪費,解決辦法:將 zrangebyscore 和 zrem 使用 lua 腳本進行原子化操作,這樣多個行程之間爭搶任務時就不會出現這種浪費了,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/333632.html
標籤:其他
上一篇:2021-10-23
下一篇:2021精選 Java后端面試題資料大全 SpringBoot,Kafka,Mysql,Redis等PDF資料,實戰專案,阿里巴巴,騰訊,位元組,京東,美團,滴滴,Bilibili面試經歷,實用干貨
