
前言
Redis 5 新特性中,Streams 資料結構的引入,可以說它是在本次迭代中最大特性,它使本次 5.x 版本迭代中,Redis 作為訊息佇列使用時,得到更完善,更強大的原生支持,其中尤為明顯的是持久化訊息佇列,同時,stream 借鑒了 kafka 的消費組模型概念和設計,使消費訊息處理上更加高效快速,本文就 Streams 資料結構中常用 API 進行分析,
準備
本文所使用 Redis 版本為 5.0.5 ,如果使用更早的 5.x 版本,有些 API 使用效果,與本文中描述略有不同,
添加訊息
Streams 添加資料使用 XADD 指令進行添加,訊息中的資料以 K-V 鍵值對的形式進行操作,一條訊息可以存在多個鍵值對,添加命令格式:
XADD key ID field string [field string ...]
其中 key 為 Streams 的名稱,ID 為訊息的唯一標志,不可重復,field string 就為鍵值對,下面我們就添加以 person 為名稱的流,進行操作,
XADD person * name ytao des https://ytao.top
上面添加案例中,ID 使用 * 號復制,這里代表著服務端自動生成 Id,添加后回傳資料 "1578238486193-0"
這里自動生成的 Id 格式為 <millisecondsTime>-<sequenceNumber>
Id 是由兩部分組成:
- millisecondsTime 為當前服務器時間毫秒時間戳,
- sequenceNumber 當前序列號,取值來源于當前毫秒內,生成訊息的順序,默認從 0 開始加 1 遞增,
比如:1578238486193-3 表示在 1578238486193 毫秒的時間戳時,添加的第 4 條訊息,
除了服務端自動生成 Id 方式外,也支持指定 Id 的生成,但是指定 Id 有以下條件限制:
- Id 中的前后部分必須為數字,
- 最小 Id 為 0-1,不能為 0-0,但是 2-0,3-0 .... 是被允許的,
- 添加的訊息,Id 的前半部分不能比存在 Id 最大的值小,Id 后半部分不能比存在前半部分相同的最大后半部分小,
否則,當不滿足上述條件時,添加后會拋出例外:
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
實際上,當添加一條訊息時,會進行兩部操作,第一步,先判斷如果不存在 Streams,則創建 Streams 的名稱,再添加訊息到 Streams 中,即使添加訊息時,由于 Id 例外,也可以在 Redis 中存在以當前 Streams 的名稱,
Streams 中 Id 也可作為指標使用,因為它是一個有序的標記,
生產中,如果這樣使用添加訊息,會存在一個問題,那就是訊息數量太大時,會使服務宕機,這里 Streams 的設計初期也有考慮到這個問題,那就是可以指定 Streams 的容量,如果容量操作這個設定的值,就會對調舊的訊息,在添加訊息時,設定 MAXLEN 引數,
XADD person MAXLEN 5 * name ytao des https://ytao.top
這樣就指定該了 Streams 中的容量為 5 條訊息,也可使用 XTRIM 截取訊息,從小到大剔除多余的訊息:
XTRIM person MAXLEN 8
訊息數量
查看訊息數量使用 XLEN 指令進行操作,
XLEN key
例:查看 person 流中的訊息數量:
> XLEN person
(integer) 5
查詢訊息
查詢 Streams 中的訊息使用 XRANGE 和 XREVRANGE 指令,
XRANGE
查詢資料時,可以按照指定 Id 范圍進行查詢,XRANGE 查詢指令格式:
XRANGE key start end [COUNT count]
引數說明:
- key 為 Streams 的名稱
- start 為范圍查詢開始 Id,包含本 Id,
- start 為范圍查詢結束 Id,包含本 Id,
- Count 為查詢回傳最大的訊息數量,非必填,
這里 start 和 end 有-和+兩個非指定值,他們分別表示無窮小和無窮大,所以當使用這個兩個值時,會查詢出全部的訊息,
> XRANGE person - +
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
3) 1) "2-0"
2) 1) "name"
2) "gaga"
3) "des"
4) "fishion!"
上面查詢的訊息資料,可以看到是按照先進先出的順序查詢出來的,
使用 COUNT 指定查詢回傳的數量:
# 查詢所有的訊息,并且回傳一條資料
> XRANGE person - + COUNT 1
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
在范圍查詢中,Id 的后半部分可省略,后半部分中的資料會全部查詢到,
XREVRANGE
XREVRANGE 的查詢和 XRANGE 指令中的使用類似,但查詢的 start 和 end 引數順序進行了調換:
XREVRANGE key end start [COUNT count]
使用案例:
> XREVRANGE person + -
1) 1) "2-0"
2) 1) "name"
2) "gaga"
3) "des"
4) "fishion!"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
3) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
查詢后的結果與 XRANGE 的結果順序剛好相反,其他都一樣,這兩個指令可進行訊息的升序和降序的回傳,
洗掉訊息
洗掉訊息使用 XDEL 指令操作,只需指定將要洗掉的 Streams 名稱和 Id 即可,支持一次洗掉多個訊息 ,
XDEL key ID [ID ...]
洗掉案例:
# 查詢所有訊息
> XRANGE person - +
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
3) 1) "2-0"
2) 1) "name"
2) "gaga"
3) "des"
4) "fishion!"
# 洗掉訊息
> XDEL person 2-0
(integer) 1
# 再次查詢洗掉后的所有訊息
> XRANGE person - +
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
# 查詢洗掉后的長度
> XLEN person
(integer) 2
從上面可以看到,洗掉訊息后,長度也會減少相應的數量,
消費訊息
在 Redis 的 PUB/SUB 中,我們是通過訂閱來消費訊息,在 Streams 資料結構中,同樣也能實作同等功能,當沒有新的訊息時,可進行阻塞等待,不僅支持單獨消費,而且還可以支持群組消費,
單獨消費
單獨消費使用 XREAD 指令,可以看到,下面命令中,STREAMS,key, 以及 ID 為必填項,ID 表示將要讀取大于該 ID 的訊息,當 ID 值使用 $ 賦予時,表示已存在訊息的最大 Id 值,
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
上面的 COUNT 引數用來指定讀取的最大數量,與 XRANGE 的用法一樣,
> XREAD COUNT 1 STREAMS person 0
1) 1) "person"
2) 1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
> XREAD COUNT 2 STREAMS person 0
1) 1) "person"
2) 1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
在 XREAD 里面還有個 BLOCK 引數,這個是用來阻塞訂閱訊息的,BLOCK 攜帶的引數為阻塞時間,單位為毫秒,如果在這個時間內沒有新的訊息消費,那么就會釋放該阻塞,當這里的時間指定為 0 時,會一直阻塞,直到有新的訊息來消費到,
# 視窗 1 開啟阻塞,等待新訊息的到來
> XREAD BLOCK 0 STREAMS person $
# 另開一個連接視窗 2,添加一條新的訊息
> XADD person 2-2 name tao des coder
"2-2"
# 視窗 1,獲取到有新的訊息來消費,并且帶有阻塞的時間
> XREAD BLOCK 0 STREAMS person $
1) 1) "person"
2) 1) 1) "2-2"
2) 1) "name"
2) "tao"
3) "des"
4) "coder"
(60.81s)
當使用 XREAD 進行順序消費時,需要額外記錄下讀取到位置的 Id,方便下次繼續消費,
群組消費
群組消費的主要目的也就是為了分流訊息給不同的客戶端處理,以更高效的速率處理訊息,為達到這一肝功能需求,我們需要做三件事:創建群組,群組讀取訊息,向服務端確認訊息以處理,
群組操作
操作群組使用 XGROUP 指令:
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
上面命令中,包含操作有:
- CREATE 創建消費組,
- SETID 修改下一個處理訊息的 Id,
- DESTROY 銷毀消費組,
- DELCONSUMER 洗掉消費組中指定的消費者,
我們當前需要使用的是創建消費組:
# 以當前存在的最大 Id 作為消費起始
> XGROUP CREATE person group1 $
OK
群組讀取訊息
群組讀取使用 XREADGROUP 指令,COUNT和BLOCK的使用類似 XREAD 的操作,只是多了個群組和消費者的指定:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
由于群組消費和單獨消費類似,這里只進行個阻塞分析,這里 Id 也有個特殊值>,表示還未進行消費的訊息:
# 視窗 1,消費群組中,taotao 消費者建立阻塞監聽
XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
# 視窗 2,消費群組中,yangyang 消費者建立阻塞監聽
XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
# 視窗 3,添加消費訊息
> XADD person 3-1 name tony des 666
"3-1"
# 視窗 1,讀取到新訊息,此時 視窗 2 沒有任何反應
> XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
1) 1) "person"
2) 1) 1) "3-1"
2) 1) "name"
2) "tony"
3) "des"
4) "666"
(77.54s)
# 視窗 3,再次添加消費訊息
> XADD person 3-2 name james des abc!
"3-2"
# 視窗 2,讀取到新訊息,此時 視窗 1 沒有任何反應
> XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
1) 1) "person"
2) 1) 1) "3-2"
2) 1) "name"
2) "james"
3) "des"
4) "abc!"
(76.36s)
以上執行流程中,group1 群組中有兩個消費者,當添加兩條訊息后,這兩個消費者輪流消費,
訊息ACK
訊息消費后,為避免再次重復消費,這是需要向服務端發送 ACK,確保訊息被消費后的標記,
例如下列情況,我們上面我們將最新兩條訊息已進行了消費,但是當我們再次讀取訊息時,還是被讀到:
> XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
2) 1) 1) "3-2"
2) 1) "name"
2) "james"
3) "des"
4) "abc!"
這時,我們使用 XACK 指令告訴服務器,我們已處理的訊息:
XACK key group ID [ID ...]0
讓服務器標記 3-2 已處理:
> XACK person group1 3-2
(integer) 1
再次獲取群組讀取訊息:
> XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
2) (empty list or set)
佇列中沒有了可讀訊息,
除了上面以講解到的 API 外,查看消費群組資訊可使用 XINFO 指令查看,本文不做分析,
總結
上面對 Streams 常用 API 進行了分析,我們可以感受到 Redis 在訊息佇列支持的道路上,也越來越強大,如果使用過它的 PUB/SUB 功能的話,就會感受到 5.x 迭代正是將你的一些痛點進行了優化,
個人博客: https://ytao.top
關注公眾號 【ytao】,更多原創好文

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/16450.html
標籤:NoSQL
