Redis Stream 是 Redis 5.0 版本新增加的數(shù)據(jù)結(jié)構(gòu)。
Redis Stream 主要用于消息隊(duì)列(MQ,Message Queue),Redis 本身是有一個(gè) Redis 發(fā)布訂閱 (pub/sub) 來實(shí)現(xiàn)消息隊(duì)列的功能,但它有個(gè)缺點(diǎn)就是消息無法持久化,如果出現(xiàn)網(wǎng)絡(luò)斷開、Redis 宕機(jī)等,消息就會(huì)被丟棄。
簡單來說發(fā)布訂閱 (pub/sub) 可以分發(fā)消息,但無法記錄歷史消息。
而 Redis Stream 提供了消息的持久化和主備復(fù)制功能,可以讓任何客戶端訪問任何時(shí)刻的數(shù)據(jù),并且能記住每一個(gè)客戶端的訪問位置,還能保證消息不丟失。
Redis Stream 的結(jié)構(gòu)如下所示,它有一個(gè)消息鏈表,將所有加入的消息都串起來,每個(gè)消息都有一個(gè)唯一的 ID 和對(duì)應(yīng)的內(nèi)容:
每個(gè) Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時(shí)自動(dòng)創(chuàng)建。
上圖解析:
Consumer Group :消費(fèi)組,使用 XGROUP CREATE 命令創(chuàng)建,一個(gè)消費(fèi)組有多個(gè)消費(fèi)者(Consumer)。
last_delivered_id :游標(biāo),每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo) last_delivered_id,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng)。
pending_ids :消費(fèi)者(Consumer)的狀態(tài)變量,作用是維護(hù)消費(fèi)者的未確認(rèn)的 id。 pending_ids 記錄了當(dāng)前已經(jīng)被客戶端讀取的消息,但是還沒有 ack (Acknowledge character:確認(rèn)字符)。
消息隊(duì)列相關(guān)命令:
XADD - 添加消息到末尾
XTRIM - 對(duì)流進(jìn)行修剪,限制長度
XDEL - 刪除消息
XLEN - 獲取流包含的元素?cái)?shù)量,即消息長度
XRANGE - 獲取消息列表,會(huì)自動(dòng)過濾已經(jīng)刪除的消息
XREVRANGE - 反向獲取消息列表,ID 從大到小
XREAD - 以阻塞或非阻塞方式獲取消息列表
消費(fèi)者組相關(guān)命令:
XGROUP CREATE - 創(chuàng)建消費(fèi)者組
XREADGROUP GROUP - 讀取消費(fèi)者組中的消息
XACK - 將消息標(biāo)記為"已處理"
XGROUP SETID - 為消費(fèi)者組設(shè)置新的最后遞送消息ID
XGROUP DELCONSUMER - 刪除消費(fèi)者
XGROUP DESTROY - 刪除消費(fèi)者組
XPENDING - 顯示待處理消息的相關(guān)信息
XCLAIM - 轉(zhuǎn)移消息的歸屬權(quán)
XINFO - 查看流和消費(fèi)者組的相關(guān)信息;
XINFO GROUPS - 打印消費(fèi)者組的信息;
XINFO STREAM - 打印流信息
使用 XADD 向隊(duì)列添加消息,如果指定的隊(duì)列不存在,則創(chuàng)建一個(gè)隊(duì)列,XADD 語法格式:
XADD key ID field value [field value ...]
key :隊(duì)列名稱,如果不存在就創(chuàng)建
ID :消息 id,我們使用 * 表示由 redis 生成,可以自定義,但是要自己保證遞增性。
field value : 記錄。
redis> XADD mystream * name Sara surname OConnor "1601372323627-0" redis> XADD mystream * field1 value1 field2 value2 field3 value3 "1601372323627-1" redis> XLEN mystream (integer) 2 redis> XRANGE mystream - + 1) 1) "1601372323627-0" 2) 1) "name" 2) "Sara" 3) "surname" 4) "OConnor" 2) 1) "1601372323627-1" 2) 1) "field1" 2) "value1" 3) "field2" 4) "value2" 5) "field3" 6) "value3" redis>
使用 XTRIM 對(duì)流進(jìn)行修剪,限制長度, 語法格式:
XTRIM key MAXLEN [~] count
key :隊(duì)列名稱
MAXLEN :長度
count :數(shù)量
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D "1601372434568-0" 127.0.0.1:6379> XTRIM mystream MAXLEN 2 (integer) 0 127.0.0.1:6379> XRANGE mystream - + 1) 1) "1601372434568-0" 2) 1) "field1" 2) "A" 3) "field2" 4) "B" 5) "field3" 6) "C" 7) "field4" 8) "D" 127.0.0.1:6379> redis>
使用 XDEL 刪除消息,語法格式:
XDEL key ID [ID ...]
key:隊(duì)列名稱
ID :消息 ID
> XADD mystream * a 1 1538561698944-0 > XADD mystream * b 2 1538561700640-0 > XADD mystream * c 3 1538561701744-0 > XDEL mystream 1538561700640-0 (integer) 1 127.0.0.1:6379> XRANGE mystream - + 1) 1) 1538561698944-0 2) 1) "a" 2) "1" 2) 1) 1538561701744-0 2) 1) "c" 2) "3"
使用 XLEN 獲取流包含的元素?cái)?shù)量,即消息長度,語法格式:
XLEN key
key:隊(duì)列名稱
redis> XADD mystream * item 1 "1601372563177-0" redis> XADD mystream * item 2 "1601372563178-0" redis> XADD mystream * item 3 "1601372563178-1" redis> XLEN mystream (integer) 3 redis>
使用 XRANGE 獲取消息列表,會(huì)自動(dòng)過濾已經(jīng)刪除的消息 ,語法格式:
XRANGE key start end [COUNT count]
key :隊(duì)列名
start :開始值, - 表示最小值
end :結(jié)束值, + 表示最大值
count :數(shù)量
redis> XADD writers * name Virginia surname Woolf "1601372577811-0" redis> XADD writers * name Jane surname Austen "1601372577811-1" redis> XADD writers * name Toni surname Morrison "1601372577811-2" redis> XADD writers * name Agatha surname Christie "1601372577812-0" redis> XADD writers * name Ngozi surname Adichie "1601372577812-1" redis> XLEN writers (integer) 5 redis> XRANGE writers - + COUNT 2 1) 1) "1601372577811-0" 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) "1601372577811-1" 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen" redis>
使用 XREVRANGE 獲取消息列表,會(huì)自動(dòng)過濾已經(jīng)刪除的消息 ,語法格式:
XREVRANGE key end start [COUNT count]
key :隊(duì)列名
end :結(jié)束值, + 表示最大值
start :開始值, - 表示最小值
count :數(shù)量
redis> XADD writers * name Virginia surname Woolf "1601372731458-0" redis> XADD writers * name Jane surname Austen "1601372731459-0" redis> XADD writers * name Toni surname Morrison "1601372731459-1" redis> XADD writers * name Agatha surname Christie "1601372731459-2" redis> XADD writers * name Ngozi surname Adichie "1601372731459-3" redis> XLEN writers (integer) 5 redis> XREVRANGE writers + - COUNT 1 1) 1) "1601372731459-3" 2) 1) "name" 2) "Ngozi" 3) "surname" 4) "Adichie" redis>
使用 XREAD 以阻塞或非阻塞方式獲取消息列表 ,語法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
count :數(shù)量
milliseconds :可選,阻塞毫秒數(shù),沒有設(shè)置就是非阻塞模式
key :隊(duì)列名
id :消息 ID
# 從 Stream 頭部讀取兩條消息 > XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 1) 1) "mystream" 2) 1) 1) 1526984818136-0 2) 1) "duration" 2) "1532" 3) "event-id" 4) "5" 5) "user-id" 6) "7782813" 2) 1) 1526999352406-0 2) 1) "duration" 2) "812" 3) "event-id" 4) "9" 5) "user-id" 6) "388234" 2) 1) "writers" 2) 1) 1) 1526985676425-0 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) 1526985685298-0 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen"
使用 XGROUP CREATE 創(chuàng)建消費(fèi)者組,語法格式:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
key :隊(duì)列名稱,如果不存在就創(chuàng)建
groupname :組名。
$ : 表示從尾部開始消費(fèi),只接受新消息,當(dāng)前 Stream 消息會(huì)全部忽略。
從頭開始消費(fèi):
XGROUP CREATE mystream consumer-group-name 0-0
從尾部開始消費(fèi):
XGROUP CREATE mystream consumer-group-name $
使用 XREADGROUP GROUP 讀取消費(fèi)組中的消息,語法格式:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group :消費(fèi)組名
consumer :消費(fèi)者名。
count : 讀取數(shù)量。
milliseconds : 阻塞毫秒數(shù)。
key : 隊(duì)列名。
ID : 消息 ID。
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >