文章很長,建議收藏起來,慢慢讀! 高并發 發燒友社群:瘋狂創客圈 為小伙伴奉上以下珍貴的學習資源:
-
瘋狂創客圈 經典升級 : 極致經典 《 Java 高并發 三部曲 》 面試必備 + 大廠必備 + 漲薪必備
-
瘋狂創客圈 經典圖書 : 《Netty Zookeeper Redis 高并發實戰》 面試必備 + 大廠必備 +漲薪必備 免費領
-
瘋狂創客圈 經典圖書 : 《SpringCloud、Nginx高并發核心編程》 面試必備 + 大廠必備 + 漲薪必備 免費領
-
瘋狂創客圈 資源寶庫: Java 必備 百度網盤資源大合集 價值>1000元 【免費取 】
推薦: 瘋狂創客圈 必看/必收/高質量/博文
| 史上最全 分布式鎖 2 大篇 | 圖解 + 史上最全 + 吐血推薦 |
|---|---|
| 1:Redis 分布式鎖 (圖解-秒懂-史上最全) | 2:Zookeeper 分布式鎖 (圖解-秒懂-史上最全) |
| 史上最全 Java 面試題 21 個專題 | 阿里、京東、美團、頭條… 隨意挑、橫著走!!! |
|---|---|
| 1: JVM面試題(史上最強、持續更新、吐血推薦) | 2:Java基礎面試題(史上最全、持續更新、吐血推薦 |
| 3:架構設計面試題 (史上最全、持續更新、吐血推薦) | 4:設計模式面試題 (史上最全、持續更新、吐血推薦) |
| 17、分布式事務面試題 (史上最全、持續更新、吐血推薦) | 一致性協議 (史上最全) |
| 29、多執行緒面試題(史上最全) | 30、HR面經,過五關斬六將后,小心陰溝翻船! |
| 9.網路協議面試題(史上最全、持續更新、吐血推薦) | 更多專題, 請參見【 瘋狂創客圈 高并發 總目錄 】 |
| 分布式 高并發 必讀 的精彩博文 | |
|---|---|
| nacos 實戰(史上最全) | sentinel (史上最全+入門教程) |
| Zookeeper 分布式鎖 (圖解+秒懂+史上最全) | Webflux(史上最全) |
| SpringCloud gateway (史上最全) | TCP/IP(圖解+秒懂+史上最全) |
| 10分鐘看懂, Java NIO 底層原理 | Feign原理 (圖解) |
| 大廠必備之:Reactor模式 | 更多精彩博文 … 請參見【 瘋狂創客圈 高并發 總目錄 】 |
跨JVM的執行緒安全問題
在單體的應用開發場景中,在多執行緒的環境下,涉及并發同步的時候,為了保證一個代碼塊在同一時間只能由一個執行緒訪問,我們一般可以使用synchronized語法和ReetrantLock去保證,這實際上是本地鎖的方式,
也就是說,在同一個JVM內部,大家往往采用synchronized或者Lock的方式來解決多執行緒間的安全問題,但在分布式集群作業的開發場景中,在JVM之間,那么就需要一種更加高級的鎖機制,來處理種跨JVM行程之間的執行緒安全問題.
解決方案是:使用分布式鎖
總之,對于分布式場景,我們可以使用分布式鎖,它是控制分布式系統之間互斥訪問共享資源的一種方式,
比如說在一個分布式系統中,多臺機器上部署了多個服務,當客戶端一個用戶發起一個資料插入請求時,如果沒有分布式鎖機制保證,那么那多臺機器上的多個服務可能進行并發插入操作,導致資料重復插入,對于某些不允許有多余資料的業務來說,這就會造成問題,而分布式鎖機制就是為了解決類似這類問題,保證多個服務之間互斥的訪問共享資源,如果一個服務搶占了分布式鎖,其他服務沒獲取到鎖,就不進行后續操作,
大致意思如下圖所示(不一定準確):
何為分布式鎖?
何為分布式鎖?
- 當在分布式模型下,資料只有一份(或有限制),此時需要利用鎖的技術控制某一時刻修改資料的行程數,
- 用一個狀態值表示鎖,對鎖的占用和釋放通過狀態值來標識,
分布式鎖的條件:
- 互斥性,在任意時刻,只有一個客戶端能持有鎖,
- 不會發生死鎖,即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖,
- 具有容錯性,只要大部分的 Redis 節點正常運行,客戶端就可以加鎖和解鎖,
- 解鈴還須系鈴人,加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了,
分布式鎖的實作:
分布式鎖的實作由很多種,檔案鎖、資料庫、redis等等,比較多;分布式鎖常見的多種實作方式:
- 資料庫悲觀鎖、
- 資料庫樂觀鎖;
- 基于Redis的分布式鎖;
- 基于ZooKeeper的分布式鎖,
在實踐中,還是redis做分布式鎖性能會高一些
資料庫悲觀鎖
所謂悲觀鎖,悲觀鎖是對資料被的修改持悲觀態度(認為資料在被修改的時候一定會存在并發問題),因此在整個資料處理程序中將資料鎖定,
悲觀鎖的實作,往往依靠資料庫提供的鎖機制(也只有資料庫層提供的鎖機制才能真正保證資料訪問的排他性,否則,即使在應用層中實作了加鎖機制,也無法保證外部系統不會修改資料),
資料庫的行鎖、表鎖、排他鎖等都是悲觀鎖,這里以行鎖為例,進行介紹,以我們常用的MySQL為例,我們通過使用select…for update陳述句, 執行該陳述句后,會在表上加持行鎖,一直到事務提交,解除行鎖,
使用場景舉例:
在秒殺案例中,生成訂單和扣減庫存的操作,可以通過商品記錄的行鎖,進行保護,們通過使用select…for update陳述句,在查詢商品表庫存時將該條記錄加鎖,待下單減庫存完成后,再釋放鎖,
示例的SQL如下:
//0.開始事務
begin;
//1.查詢出商品資訊
select stockCount from seckill_good where id=1 for update;
//2.根據商品資訊生成訂單
insert into seckill_order (id,good_id) values (null,1);
//3.修改商品stockCount減一
update seckill_good set stockCount=stockCount-1 where id=1;
//4.提交事務
commit;
以上,在對id = 1的記錄修改前,先通過for update的方式進行加鎖,然后再進行修改,這就是比較典型的悲觀鎖策略,
如果以上修改庫存的代碼發生并發,同一時間只有一個執行緒可以開啟事務并獲得id=1的鎖,其它的事務必須等本次事務提交之后才能執行,這樣我們可以保證當前的資料不會被其它事務修改,
我們使用select_for_update,另外一定要寫在事務中.
注意:要使用悲觀鎖,我們必須關閉mysql資料庫中自動提交的屬性,命令set autocommit=0;即可關閉,因為MySQL默認使用autocommit模式,也就是說,當你執行一個更新操作后,MySQL會立刻將結果進行提交,
悲觀鎖的實作,往往依靠資料庫提供的鎖機制,在資料庫中,悲觀鎖的流程如下:
- 在對記錄進行修改前,先嘗試為該記錄加上排他鎖(exclusive locking),
- 如果加鎖失敗,說明該記錄正在被修改,那么當前查詢可能要等待或者拋出例外,具體回應方式由開發者根據實際需要決定,
- 如果成功加鎖,那么就可以對記錄做修改,事務完成后就會解鎖了,
- 其間如果有其他事務對該記錄做加鎖的操作,都要等待當前事務解鎖或直接拋出例外,
資料庫樂觀鎖
使用樂觀鎖就不需要借助資料庫的鎖機制了,
樂觀鎖的概念中其實已經闡述了他的具體實作細節:主要就是兩個步驟:沖突檢測和資料更新,其實作方式有一種比較典型的就是Compare and Swap(CAS)技術,
CAS是項樂觀鎖技術,當多個執行緒嘗試使用CAS同時更新同一個變數時,只有其中一個執行緒能更新變數的值,而其它執行緒都失敗,失敗的執行緒并不會被掛起,而是被告知這次競爭中失敗,并可以再次嘗試,
CAS的實作中,在表中增加一個version欄位,操作前先查詢version資訊,在資料提交時檢查version欄位是否被修改,如果沒有被修改則進行提交,否則認為是過期資料,
比如前面的扣減庫存問題,通過樂觀鎖可以實作如下:
//1.查詢出商品資訊
select stockCount, version from seckill_good where id=1;
//2.根據商品資訊生成訂單
insert into seckill_order (id,good_id) values (null,1);
//3.修改商品庫存
update seckill_good set stockCount=stockCount-1, version = version+1 where id=1, version=version;
以上,我們在更新之前,先查詢一下庫存表中當前版本(version),然后在做update的時候,以version 作為一個修改條件,
當我們提交更新的時候,判斷資料庫表對應記錄的當前version與第一次取出來的version進行比對,如果資料庫表當前version與第一次取出來的version相等,則予以更新,否則認為是過期資料,
CAS 樂觀鎖有兩個問題:
(1) CAS 存在一個比較重要的問題,即ABA問題. 解決的辦法是version欄位順序遞增,
(2) 樂觀鎖的方式,在高并發時,只有一個執行緒能執行成功,會造成大量的失敗,這給用戶的體驗顯然是很不好的,
Zookeeper分布式鎖
除了在資料庫層面加分布式鎖,通常還可以使用以下更高性能、更高可用的分布式鎖:
- 分布式快取(如redis)鎖
- 分布式協調(如zookeeper)鎖
有關zookeeper分布式鎖的原理和實作,具體請參見下面的博客:
Zookeeper 分布式鎖 (圖解+秒懂+史上最全)
或者閱讀筆者的《Java高并發核心編程(卷1)》

Redis分布式鎖
本文重點介紹Redis分布式鎖,分為兩個維度進行介紹:
(1)基于Jedis手工造輪子分布式鎖
(2)介紹Redission 分布式鎖的使用和原理,
分布式鎖一般有如下的特點:
- 互斥性: 同一時刻只能有一個執行緒持有鎖
- 可重入性: 同一節點上的同一個執行緒如果獲取了鎖之后能夠再次獲取鎖
- 鎖超時:和J.U.C中的鎖一樣支持鎖超時,防止死鎖
- 高性能和高可用: 加鎖和解鎖需要高效,同時也需要保證高可用,防止分布式鎖失效
- 具備阻塞和非阻塞性:能夠及時從阻塞狀態中被喚醒
手工造輪子:基于Jedis 的API實作分布式鎖
我們首先講解 Jedis 普通分布式鎖實作,并且是純手工的模式,從最為基礎的Redis命令開始,
只有充分了解與分布式鎖相關的普通Redis命令,才能更好的了解高級的Redis分布式鎖的實作,因為高級的分布式鎖的實作完全基于普通Redis命令,
Redis幾種架構
Redis發展到現在,幾種常見的部署架構有:
- 單機模式;
- 主從模式;
- 哨兵模式;
- 集群模式;
從分布式鎖的角度來說, 無論是單機模式、主從模式、哨兵模式、集群模式,其原理都是類同的, 只是主從模式、哨兵模式、集群模式的更加的高可用、或者更加高并發,
所以,接下來先基于單機模式,基于Jedis手工造輪子實作自己的分布式鎖,
首先看兩個命令:
Redis分布式鎖機制,主要借助setnx和expire兩個命令完成,
setnx命令:
SETNX 是SET if Not eXists的簡寫,將 key 的值設為 value,當且僅當 key 不存在; 若給定的 key 已經存在,則 SETNX 不做任何動作,
下面為客戶端使用示例:
127.0.0.1:6379> set lock "unlock"
OK
127.0.0.1:6379> setnx lock "unlock"
(integer) 0
127.0.0.1:6379> setnx lock "lock"
(integer) 0
127.0.0.1:6379>
expire命令:
expire命令為 key 設定生存時間,當 key 過期時(生存時間為 0 ),它會被自動洗掉. 其格式為:
EXPIRE key seconds
下面為客戶端使用示例:
127.0.0.1:6379> expire lock 10
(integer) 1
127.0.0.1:6379> ttl lock
8
基于Jedis API的分布式鎖的總體流程:
通過Redis的setnx、expire命令可以實作簡單的鎖機制:
- key不存在時創建,并設定value和過期時間,回傳值為1;成功獲取到鎖;
- 如key存在時直接回傳0,搶鎖失敗;
- 持有鎖的執行緒釋放鎖時,手動洗掉key; 或者過期時間到,key自動洗掉,鎖釋放,
執行緒呼叫setnx方法成功回傳1認為加鎖成功,其他執行緒要等到當前執行緒業務操作完成釋放鎖后,才能再次呼叫setnx加鎖成功,

以上簡單redis分布式鎖的問題:
如果出現了這么一個問題:如果setnx是成功的,但是expire設定失敗,一旦出現了釋放鎖失敗,或者沒有手工釋放,那么這個鎖永遠被占用,其他執行緒永遠也搶不到鎖,
所以,需要保障setnx和expire兩個操作的原子性,要么全部執行,要么全部不執行,二者不能分開,
解決的辦法有兩種:
- 使用set的命令時,同時設定過期時間,不再單獨使用 expire命令
- 使用lua腳本,將加鎖的命令放在lua腳本中原子性的執行
簡單加鎖:使用set的命令時,同時設定過期時間
使用set的命令時,同時設定過期時間的示例如下:
127.0.0.1:6379> set unlock "234" EX 100 NX
(nil)
127.0.0.1:6379>
127.0.0.1:6379> set test "111" EX 100 NX
OK
這樣就完美的解決了分布式鎖的原子性; set 命令的完整格式:
set key value [EX seconds] [PX milliseconds] [NX|XX]
EX seconds:設定失效時長,單位秒
PX milliseconds:設定失效時長,單位毫秒
NX:key不存在時設定value,成功回傳OK,失敗回傳(nil)
XX:key存在時設定value,成功回傳OK,失敗回傳(nil)
使用set命令實作加鎖操作,先展示加鎖的簡單代碼實習,再帶大家慢慢解釋為什么這樣實作,
加鎖的簡單代碼實作
package com.crazymaker.springcloud.standard.lock;
@Slf4j
@Data
@AllArgsConstructor
public class JedisCommandLock {
private RedisTemplate redisTemplate;
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
/**
* 嘗試獲取分布式鎖
* @param jedis Redis客戶端
* @param lockKey 鎖
* @param requestId 請求標識
* @param expireTime 超期時間
* @return 是否獲取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
可以看到,我們加鎖用到了Jedis的set Api:
jedis.set(String key, String value, String nxxx, String expx, int time)
這個set()方法一共有五個形參:
-
第一個為key,我們使用key來當鎖,因為key是唯一的,
-
第二個為value,我們傳的是requestId,很多童鞋可能不明白,有key作為鎖不就夠了嗎,為什么還要用到value?原因就是我們在上面講到可靠性時,分布式鎖要滿足第四個條件解鈴還須系鈴人,通過給value賦值為requestId,我們就知道這把鎖是哪個請求加的了,在解鎖的時候就可以有依據,
requestId可以使用
UUID.randomUUID().toString()方法生成, -
第三個為nxxx,這個引數我們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,我們進行set操作;若key已經存在,則不做任何操作;
-
第四個為expx,這個引數我們傳的是PX,意思是我們要給這個key加一個過期的設定,具體時間由第五個引數決定,
-
第五個為time,與第四個引數相呼應,代表key的過期時間,
總的來說,執行上面的set()方法就只會導致兩種結果:
- 當前沒有鎖(key不存在),那么就進行加鎖操作,并對鎖設定個有效期,同時value表示加鎖的客戶端,
- 已有鎖存在,不做任何操作,
心細的童鞋就會發現了,我們的加鎖代碼滿足前面描述的四個條件中的三個,
-
首先,set()加入了NX引數,可以保證如果已有key存在,則函式不會呼叫成功,也就是只有一個客戶端能持有鎖,滿足互斥性,
-
其次,由于我們對鎖設定了過期時間,即使鎖的持有者后續發生崩潰而沒有解鎖,鎖也會因為到了過期時間而自動解鎖(即key被洗掉),不會被永遠占用(而發生死鎖),
-
最后,因為我們將value賦值為requestId,代表加鎖的客戶端請求標識,那么在客戶端在解鎖的時候就可以進行校驗是否是同一個客戶端,
-
由于我們只考慮Redis單機部署的場景,所以容錯性我們暫不考慮,
基于Jedis 的API實作簡單解鎖代碼
還是先展示代碼,再帶大家慢慢解釋為什么這樣實作,
解鎖的簡單代碼實作:
package com.crazymaker.springcloud.standard.lock;
@Slf4j
@Data
@AllArgsConstructor
public class JedisCommandLock {
private static final Long RELEASE_SUCCESS = 1L;
/**
* 釋放分布式鎖
* @param jedis Redis客戶端
* @param lockKey 鎖
* @param requestId 請求標識
* @return 是否釋放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
那么這段Lua代碼的功能是什么呢?
其實很簡單,首先獲取鎖對應的value值,檢查是否與requestId相等,如果相等則洗掉鎖(解鎖),
第一行代碼,我們寫了一個簡單的Lua腳本代碼,
第二行代碼,我們將Lua代碼傳到jedis.eval()方法里,并使引數KEYS[1]賦值為lockKey,ARGV[1]賦值為requestId,eval()方法是將Lua代碼交給Redis服務端執行,
那么為什么要使用Lua語言來實作呢?
因為要確保上述操作是原子性的,那么為什么執行eval()方法可以確保原子性,源于Redis的特性.
簡單來說,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,并且直到eval命令執行完成,Redis才會執行其他命
錯誤示例1
最常見的解鎖代碼就是直接使用 jedis.del() 方法洗掉鎖,這種不先判斷鎖的擁有者而直接解鎖的方式,會導致任何客戶端都可以隨時進行解鎖,即使這把鎖不是它的,
public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
jedis.del(lockKey);
}
錯誤示例2
這種解鎖代碼乍一看也是沒問題,甚至我之前也差點這樣實作,與正確姿勢差不多,唯一區別的是分成兩條命令去執行,代碼如下:
public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) {
// 判斷加鎖與解鎖是不是同一個客戶端
if (requestId.equals(jedis.get(lockKey))) {
// 若在此時,這把鎖突然不是這個客戶端的,則會誤解鎖
jedis.del(lockKey);
}
}
再造輪子:基于Lua腳本實作分布式鎖
lua腳本的好處
前面提到,在redis中執行lua腳本,有如下的好處:
那么為什么要使用Lua語言來實作呢?
因為要確保上述操作是原子性的,那么為什么執行eval()方法可以確保原子性,源于Redis的特性.
簡單來說,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,并且直到eval命令執行完成,Redis才會執行其他命
所以:
大部分的開源框架(如 redission)中的分布式鎖組件,都是用純lua腳本實作的,
題外話: lua腳本是高并發、高性能的必備腳本語言
有關lua的詳細介紹,請參見以下書籍:

那么,我們也來模擬一下
基于純Lua腳本的分布式鎖的執行流程
加鎖和洗掉鎖的操作,使用純lua進行封裝,保障其執行時候的原子性,
基于純Lua腳本實作分布式鎖的執行流程,大致如下:

加鎖的Lua腳本: lock.lua
--- -1 failed
--- 1 success
---
local key = KEYS[1]
local requestId = KEYS[2]
local ttl = tonumber(KEYS[3])
local result = redis.call('setnx', key, requestId)
if result == 1 then
--PEXPIRE:以毫秒的形式指定過期時間
redis.call('pexpire', key, ttl)
else
result = -1;
-- 如果value相同,則認為是同一個執行緒的請求,則認為重入鎖
local value = redis.call('get', key)
if (value == requestId) then
result = 1;
redis.call('pexpire', key, ttl)
end
end
-- 如果獲取鎖成功,則回傳 1
return result
解鎖的Lua腳本: unlock.lua:
--- -1 failed
--- 1 success
-- unlock key
local key = KEYS[1]
local requestId = KEYS[2]
local value = redis.call('get', key)
if value == requestId then
redis.call('del', key);
return 1;
end
return -1
兩個檔案,放在資源檔案夾下備用:

在Java中呼叫lua腳本,完成加鎖操作
下一步,實作Lock介面, 完成JedisLock的分布式鎖,
其加鎖操作,通過呼叫 lock.lua腳本完成,代碼如下:
package com.crazymaker.springcloud.standard.lock;
import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@Slf4j
@Data
@AllArgsConstructor
public class JedisLock implements Lock {
private RedisTemplate redisTemplate;
RedisScript<Long> lockScript = null;
RedisScript<Long> unLockScript = null;
public static final int DEFAULT_TIMEOUT = 2000;
public static final Long LOCKED = Long.valueOf(1);
public static final Long UNLOCKED = Long.valueOf(1);
public static final Long WAIT_GAT = Long.valueOf(200);
public static final int EXPIRE = 2000;
String key;
String lockValue; // lockValue 鎖的value ,代表執行緒的uuid
/**
* 默認為2000ms
*/
long expire = 2000L;
public JedisLock(String lockKey, String lockValue) {
this.key = lockKey;
this.lockValue = lockValue;
}
private volatile boolean isLocked = false;
private Thread thread;
/**
* 獲取一個分布式鎖 , 超時則回傳失敗
*
* @return 獲鎖成功 - true | 獲鎖失敗 - false
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
//本地可重入
if (isLocked && thread == Thread.currentThread()) {
return true;
}
expire = unit != null ? unit.toMillis(time) : DEFAULT_TIMEOUT;
long startMillis = System.currentTimeMillis();
Long millisToWait = expire;
boolean localLocked = false;
int turn = 1;
while (!localLocked) {
localLocked = this.lockInner(expire);
if (!localLocked) {
millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait > 0L) {
/**
* 還沒有超時
*/
ThreadUtil.sleepMilliSeconds(WAIT_GAT);
log.info("睡眠一下,重新開始,turn:{},剩余時間:{}", turn++, millisToWait);
} else {
log.info("搶鎖超時");
return false;
}
} else {
isLocked = true;
localLocked = true;
}
}
return isLocked;
}
/**
* 有回傳值的搶奪鎖
*
* @param millisToWait
*/
public boolean lockInner(Long millisToWait) {
if (null == key) {
return false;
}
try {
List<String> redisKeys = new ArrayList<>();
redisKeys.add(key);
redisKeys.add(lockValue);
redisKeys.add(String.valueOf(millisToWait));
Long res = (Long) redisTemplate.execute(lockScript, redisKeys);
return res != null && res.equals(LOCKED);
} catch (Exception e) {
e.printStackTrace();
throw BusinessException.builder().errMsg("搶鎖失敗").build();
}
}
}
在Java中呼叫lua腳本,完成解鎖操作
其解鎖操作,通過呼叫unlock.lua腳本完成,代碼如下:
package com.crazymaker.springcloud.standard.lock;
import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@Slf4j
@Data
@AllArgsConstructor
public class JedisLock implements Lock {
private RedisTemplate redisTemplate;
RedisScript<Long> lockScript = null;
RedisScript<Long> unLockScript = null;
//釋放鎖
@Override
public void unlock() {
if (key == null || requestId == null) {
return;
}
try {
List<String> redisKeys = new ArrayList<>();
redisKeys.add(key);
redisKeys.add(requestId);
Long res = (Long) redisTemplate.execute(unLockScript, redisKeys);
} catch (Exception e) {
e.printStackTrace();
throw BusinessException.builder().errMsg("釋放鎖失敗").build();
}
}
}
撰寫RedisLockService用于管理JedisLock
撰寫個分布式鎖服務,用于加載lua腳本,創建 分布式鎖,代碼如下:
package com.crazymaker.springcloud.standard.lock;
import com.crazymaker.springcloud.common.util.IOUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@Slf4j
@Data
public class RedisLockService
{
private RedisTemplate redisTemplate;
static String lockLua = "script/lock.lua";
static String unLockLua = "script/unlock.lua";
static RedisScript<Long> lockScript = null;
static RedisScript<Long> unLockScript = null;
{
String script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),lockLua);
// String script = FileUtil.readString(lockLua, Charset.forName("UTF-8" ));
if(StringUtils.isEmpty(script))
{
log.error("lua load failed:"+lockLua);
}
lockScript = new DefaultRedisScript<>(script, Long.class);
// script = FileUtil.readString(unLockLua, Charset.forName("UTF-8" ));
script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),unLockLua);
if(StringUtils.isEmpty(script))
{
log.error("lua load failed:"+unLockLua);
}
unLockScript = new DefaultRedisScript<>(script, Long.class);
}
public RedisLockService(RedisTemplate redisTemplate)
{
this.redisTemplate = redisTemplate;
}
public Lock getLock(String lockKey, String lockValue) {
JedisLock lock=new JedisLock(lockKey,lockValue);
lock.setRedisTemplate(redisTemplate);
lock.setLockScript(lockScript);
lock.setUnLockScript(unLockScript);
return lock;
}
}
測驗用例
接下來,終于可以上測驗用例了
package com.crazymaker.springcloud.lock;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {DemoCloudApplication.class})
// 指定啟動類
public class RedisLockTest {
@Resource
RedisLockService redisLockService;
private ExecutorService pool = Executors.newFixedThreadPool(10);
@Test
public void testLock() {
int threads = 10;
final int[] count = {0};
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
String lockValue = UUID.randomUUID().toString();
try {
Lock lock = redisLockService.getLock("test:lock:1", lockValue);
boolean locked = lock.tryLock(10, TimeUnit.SECONDS);
if (locked) {
for (int j = 0; j < 1000; j++) {
count[0]++;
}
log.info("count = " + count[0]);
lock.unlock();
} else {
System.out.println("搶鎖失敗");
}
} catch (Exception e) {
e.printStackTrace();
}
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("10個執行緒每個累加1000為: = " + count[0]);
//輸出統計結果
float time = System.currentTimeMillis() - start;
System.out.println("運行的時長為(ms):" + time);
System.out.println("每一次執行的時長為(ms):" + time / count[0]);
}
}
執行用例,結果如下:
2021-05-04 23:02:11.900 INFO 22120 --- [pool-1-thread-7] c.c.springcloud.lock.RedisLockTest LN:50 count = 6000
2021-05-04 23:02:11.901 INFO 22120 --- [pool-1-thread-1] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新開始,turn:3,剩余時間:9585
2021-05-04 23:02:11.902 INFO 22120 --- [pool-1-thread-1] c.c.springcloud.lock.RedisLockTest LN:50 count = 7000
2021-05-04 23:02:12.100 INFO 22120 --- [pool-1-thread-4] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新開始,turn:3,剩余時間:9586
2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新開始,turn:3,剩余時間:9585
2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-8] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新開始,turn:3,剩余時間:9585
2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-4] c.c.springcloud.lock.RedisLockTest LN:50 count = 8000
2021-05-04 23:02:12.102 INFO 22120 --- [pool-1-thread-8] c.c.springcloud.lock.RedisLockTest LN:50 count = 9000
2021-05-04 23:02:12.304 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新開始,turn:4,剩余時間:9383
2021-05-04 23:02:12.307 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.lock.RedisLockTest LN:50 count = 10000
10個執行緒每個累加1000為: = 10000
運行的時長為(ms):827.0
每一次執行的時長為(ms):0.0827
STW導致的鎖過期問題
下面有一個簡單的使用鎖的例子,在10秒內占著鎖:
//寫資料到檔案
function writeData(filename, data) {
boolean locked = lock.tryLock(10, TimeUnit.SECONDS);
if (!locked) {
throw 'Failed to acquire lock';
}
try {
//將資料寫到檔案
var file = storage.readFile(filename);
var updated = updateContents(file, data);
storage.writeFile(filename, updated);
} finally {
lock.unlock();
}
}
問題是:如果在寫檔案程序中,發生了 fullGC,并且其時間跨度較長, 超過了10秒, 那么,分布式就自動釋放了,
在此程序中,client2 搶到鎖,寫了檔案,
client1 的fullGC完成后,也繼續寫檔案,注意,此時client1 的并沒有占用鎖,此時寫入會導致檔案資料錯亂,發生執行緒安全問題,
這就是STW導致的鎖過期問題,
STW導致的鎖過期問題,具體如下圖所示:

STW導致的鎖過期問題,大概的解決方案,有:
1: 模擬CAS樂觀鎖的方式,增加版本號
2:watch dog自動延期機制
1: 模擬CAS樂觀鎖的方式,增加版本號(如下圖中的token)

此方案如果要實作,需要調整業務邏輯,與之配合,所以會入侵代碼,
2:watch dog自動延期機制
客戶端1加鎖的鎖key默認生存時間才30秒,如果超過了30秒,客戶端1還想一直持有這把鎖,怎么辦呢?
簡單!只要客戶端1一旦加鎖成功,就會啟動一個watch dog看門狗,他是一個后臺執行緒,會每隔10秒檢查一下,如果客戶端1還持有鎖key,那么就會不斷的延長鎖key的生存時間,
redission,采用的就是這種方案, 此方案不會入侵業務代碼,
為啥推薦使用Redission
作為 Java 開發人員,我們若想在程式中集成 Redis,必須使用 Redis 的第三方庫,目前大家使用的最多的第三方庫是jedis,
和SpringCloud gateway一樣,Redisson也是基于Netty實作的,是更高性能的第三方庫, 所以,這里推薦大家使用Redission替代 jedis,
在使用Redission之前,建議大家先掌握Netty的知識,
推薦大家閱讀被很多小伙伴評價為史上最為易懂的NIO、Netty書籍:《Java高并發核心編程(卷1)》
Redisson簡介
Redisson是一個在Redis的基礎上實作的Java駐記憶體資料網格(In-Memory Data Grid),它不僅提供了一系列的分布式的Java常用物件,還實作了可重入鎖(Reentrant Lock)、公平鎖(Fair Lock、聯鎖(MultiLock)、 紅鎖(RedLock)、 讀寫鎖(ReadWriteLock)等,還提供了許多分布式服務,

Redisson提供了使用Redis的最簡單和最便捷的方法,Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上,

Redisson與Jedis的對比
1.概況對比
Jedis是Redis的java實作的客戶端,其API提供了比較全面的的Redis命令的支持,Redisson實作了分布式和可擴展的的java資料結構,和Jedis相比,功能較為簡單,不支持字串操作,不支持排序,事物,管道,磁區等Redis特性,Redisson的宗旨是促進使用者對Redis的關注分離,從而讓使用者能夠將精力更集中的放在處理業務邏輯上,
2.可伸縮性
Jedis使用阻塞的I/O,且其方法呼叫都是同步的,程式流程要等到sockets處理完I/O才能執行,不支持異步,Jedis客戶端實體不是執行緒安全的,所以需要通過連接池來使用Jedis,
Redisson使用非阻塞的I/O和基于Netty框架的事件驅動的通信層,其方法呼叫時異步的,Redisson的API是執行緒安全的,所以操作單個Redisson連接來完成各種操作,
3.第三方框架整合
Redisson在Redis的基礎上實作了java快取標準規范;Redisson還提供了Spring Session回話管理器的實作,
Redission 的原始碼地址:
官網: https://redisson.org/
github: https://github.com/redisson/redisson#quick-start

特性 & 功能:
-
支持 Redis 單節點(single)模式、哨兵(sentinel)模式、主從(Master/Slave)模式以及集群(Redis Cluster)模式
-
程式介面呼叫方式采用異步執行和異步流執行兩種方式
-
資料序列化,Redisson 的物件編碼類是用于將物件進行序列化和反序列化,以實作對該物件在 Redis 里的讀取和存盤
-
單個集合資料分片,在集群模式下,Redisson 為單個 Redis 集合型別提供了自動分片的功能
-
提供多種分布式物件,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等
-
提供豐富的分布式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等
-
分布式鎖和同步器的實作,可重入鎖(Reentrant Lock),公平鎖(Fair Lock),聯鎖(MultiLock),紅鎖(Red Lock),信號量(Semaphonre),可過期性信號鎖(PermitExpirableSemaphore)等
-
提供先進的分布式服務,如分布式遠程服務(Remote Service),分布式實時物件(Live Object)服務,分布式執行服務(Executor Service),分布式調度任務服務(Schedule Service)和分布式映射歸納服務(MapReduce)
Redisson的使用
如何安裝 Redisson
安裝 Redisson 最便捷的方法是使用 Maven 或者 Gradle:
?Maven
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.4</version>
</dependency>
?Gradle
compile group: 'org.redisson', name: 'redisson', version: '3.11.4'
目前 Redisson 最新版是 3.11.4,當然你也可以通過搜索 Maven 中央倉庫 mvnrepository[1] 來找到 Redisson 的各種版本,
獲取RedissonClient物件
RedissonClient有多種模式,主要的模式有:
-
單節點模式
-
哨兵模式
-
主從模式
-
集群模式
首先介紹單節點模式,
單節點模式的程式化配置方法,大致如下:
Config config = new Config();
config.useSingleServer().setAddress("redis://myredisserver:6379");
RedissonClient redisson = Redisson.create(config);xxxxxxxxxx Config config = new Config();config.useSingleServer().setAddress("redis://myredisserver:6379");RedissonClient redisson = Redisson.create(config);// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();
SingleServerConfig singleConfig = config.useSingleServer();
SingleServerConfig類的設定引數如下:
address(節點地址)
可以通過
host:port的格式來指定節點地址,subscriptionConnectionMinimumIdleSize(發布和訂閱連接的最小空閑連接數)
默認值:
1用于發布和訂閱連接的最小保持連接數(長連接),Redisson內部經常通過發布和訂閱來實作許多功能,長期保持一定數量的發布訂閱連接是必須的,
subscriptionConnectionPoolSize(發布和訂閱連接池大小)
默認值:
50用于發布和訂閱連接的連接池最大容量,連接池的連接數量自動彈性伸縮,
connectionMinimumIdleSize(最小空閑連接數)
默認值:
32最小保持連接數(長連接),長期保持一定數量的連接有利于提高瞬時寫入反應速度,
connectionPoolSize(連接池大小)
默認值:
64連接池最大容量,連接池的連接數量自動彈性伸縮,
dnsMonitoring(是否啟用DNS監測)
默認值:
false在啟用該功能以后,Redisson將會監測DNS的變化情況,
dnsMonitoringInterval(DNS監測時間間隔,單位:毫秒)
默認值:
5000監測DNS的變化情況的時間間隔,
idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:
10000如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,并從連接池里去掉,時間單位是毫秒,
connectTimeout(連接超時,單位:毫秒)
默認值:
10000同節點建立連接時的等待超時,時間單位是毫秒,
timeout(命令等待超時,單位:毫秒)
默認值:
3000等待節點回復命令的時間,該時間從命令發送成功時開始計時,
retryAttempts(命令失敗重試次數)
默認值:
3如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤,如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時,
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:
1500在一條命令發送失敗以后,等待重試發送的時間間隔,時間單位是毫秒,
reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:
3000當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔,時間單位是毫秒,
failedAttempts(執行失敗最大次數)
默認值:
3在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點串列里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試,
database(資料庫編號)
默認值:
0嘗試連接的資料庫編號,
password(密碼)
默認值:
null用于節點身份驗證的密碼,
subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:
5每個連接的最大訂閱數量,
clientName(客戶端名稱)
默認值:
null在Redis節點里顯示的客戶端名稱,
sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:
true開啟SSL終端識別能力,
sslProvider(SSL實作方式)
默認值:
JDK確定采用哪種方式(JDK或OPENSSL)來實作SSL連接,
sslTruststore(SSL信任證書庫路徑)
默認值:
null指定SSL信任證書庫的路徑,
sslTruststorePassword(SSL信任證書庫密碼)
默認值:
null指定SSL信任證書庫的密碼,
sslKeystore(SSL鑰匙庫路徑)
默認值:
null指定SSL鑰匙庫的路徑,
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:
null指定SSL鑰匙庫的密碼,
SpringBoot整合Redisson
Redisson有多種模式,首先介紹單機模式的整合,
一、匯入Maven依賴
<!-- redisson-springboot -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.4</version>
</dependency>
二、核心組態檔
spring:
redis:
host: 127.0.0.1
port: 6379
database: 0
timeout: 5000
三、添加配置類
RedissonConfig.java
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Autowired
private RedisProperties redisProperties;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + "");
config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());
config.useSingleServer().setDatabase(3);
return Redisson.create(config);
}
}
自定義starter
由于redission可以有多種模式,處于學習的目的,將多種模式封裝成一個start,可以學習一下starter的制作,

封裝一個RedissonManager,通過策略模式,根據不同的配置型別,創建 RedissionConfig實體,然后創建RedissionClient物件,

使用RBucket操作分布式物件
Redission模擬了Java的面向物件編程思想,可以簡單理解為一切皆為物件,
每一個 Redisson 物件 實作了RObject and RExpirable 兩個interfaces.
Usage example:
RObject object = redisson.get...()
object.sizeInMemory();
object.delete();
object.rename("newname");
object.isExists();
// catch expired event
object.addListener(new ExpiredObjectListener() {
...
});
// catch delete event
object.addListener(new DeletedObjectListener() {
...
});
每一個Redisson 物件的名字,就是 Redis中的 Key.
RMap map = redisson.getMap("mymap");
map.getName(); // = mymap
可以通過 RKeys 介面操作Redis中的keys.
Usage example:
RKeys keys = redisson.getKeys();
Iterable<String> allKeys = keys.getKeys();
Iterable<String> foundedKeys = keys.getKeysByPattern('key*');
long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");
long deletedKeysAmount = keys.deleteByPattern("test?");
String randomKey = keys.randomKey();
long keysAmount = keys.count();
keys.flushall();
keys.flushdb();
Redisson通過RBucket介面代表可以訪問任何型別的基礎物件,或者普通物件,
RBucket有一系列的工具方法,如compareAndSet(),get(),getAndDelete(),getAndSet(),set(),size(),trySet()等等,用于設值/取值/獲取尺寸,
RBucket普通物件的最大大小,為512兆位元組,
RBucket<AnyObject> bucket = redisson.getBucket("anyObject");
bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();
bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));
下面是一個完整的實體:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testRBucketExamples() {
// 默認連接上 127.0.0.1:6379
RedissonClient client = redissonManager.getRedisson();
// RList 繼承了 java.util.List 介面
RBucket<String> rstring = client.getBucket("redission:test:bucket:string");
rstring.set("this is a string");
RBucket<UserDTO> ruser = client.getBucket("redission:test:bucket:user");
UserDTO dto = new UserDTO();
dto.setToken(UUID.randomUUID().toString());
ruser.set(dto);
System.out.println("string is: " + rstring.get());
System.out.println("dto is: " + ruser.get());
client.shutdown();
}
}
運行上面的代碼時,可以獲得以下輸出:
string is: this is a string
dto is: UserDTO(id=null, userId=null, username=null, password=null, nickname=null, token=183b6eeb-65a8-4b2a-80c6-cf17c08332ce, createTime=null, updateTime=null, headImgUrl=null, mobile=null, sex=null, enabled=null, type=null, openId=null, isDel=false)

使用 RList 操作 Redis 串列
下面的代碼簡單演示了如何在 Redisson 中使用 RList 物件,RList 是 Java 的 List 集合的分布式并發實作,
考慮以下代碼:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testListExamples() {
// 默認連接上 127.0.0.1:6379
RedissonClient client = redissonManager.getRedisson();
// RList 繼承了 java.util.List 介面
RList<String> nameList = client.getList("redission:test:nameList");
nameList.clear();
nameList.add("張三");
nameList.add("李四");
nameList.add("王五");
nameList.remove(-1);
System.out.println("List size: " + nameList.size());
boolean contains = nameList.contains("李四");
System.out.println("Is list contains name '李四': " + contains);
nameList.forEach(System.out::println);
client.shutdown();
}
}
運行上面的代碼時,可以獲得以下輸出:
List size: 2
Is list contains name '李四': true
張三
李四

使用 RMap 操作 Redis 哈希
Redisson 還包括 RMap,它是 Java Map 集合的分布式并發實作,考慮以下代碼:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testListExamples() {
// 默認連接上 127.0.0.1:6379
RedissonClient client = redissonManager.getRedisson();
// RMap 繼承了 java.util.concurrent.ConcurrentMap 介面
RMap<String, Object> map = client.getMap("redission:test:personalMap");
map.put("name", "張三");
map.put("address", "北京");
map.put("age", new Integer(50));
System.out.println("Map size: " + map.size());
boolean contains = map.containsKey("age");
System.out.println("Is map contains key 'age': " + contains);
String value = String.valueOf(map.get("name"));
System.out.println("Value mapped by key 'name': " + value);
client.shutdown();
}
}
運行上面的代碼時,將會看到以下輸出:
Map size: 3
Is map contains key 'age': true
Value mapped by key 'name': 張三

執行 Lua腳本
Lua是一種開源、簡單易學、輕量小巧的腳本語言,用標準C語言撰寫,
其設計的目的就是為了嵌入應用程式中,從而為應用程式提供靈活的擴展和定制功能,
Redis從2.6版本開始支持Lua腳本,Redis使用Lua可以:
- 原子操作,Redis會將整個腳本作為一個整體執行,不會被中斷,可以用來批量更新、批量插入
- 減少網路開銷,多個Redis操作合并為一個腳本,減少網路時延
- 代碼復用,客戶端發送的腳本可以存盤在Redis中,其他客戶端可以根據腳本的id呼叫,
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testLuaExamples() {
// 默認連接上 127.0.0.1:6379
RedissonClient redisson = redissonManager.getRedisson();
redisson.getBucket("redission:test:foo").set("bar");
String r = redisson.getScript().eval(RScript.Mode.READ_ONLY,
"return redis.call('get', 'redission:test:foo')", RScript.ReturnType.VALUE);
System.out.println("foo: " + r);
// 通過預存的腳本進行同樣的操作
RScript s = redisson.getScript();
// 首先將腳本加載到Redis
String sha1 = s.scriptLoad("return redis.call('get', 'redission:test:foo')");
// 回傳值 res == 282297a0228f48cd3fc6a55de6316f31422f5d17
System.out.println("sha1: " + sha1);
// 再通過SHA值呼叫腳本
Future<Object> r1 = redisson.getScript().evalShaAsync(RScript.Mode.READ_ONLY,
sha1,
RScript.ReturnType.VALUE,
Collections.emptyList());
try {
System.out.println("res: " + r1.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
client.shutdown();
}
}
運行上面的代碼時,將會看到以下輸出:
foo: bar
sha1: 282297a0228f48cd3fc6a55de6316f31422f5d17
res: bar

使用 RLock 實作 Redis 分布式鎖
RLock 是 Java 中可重入鎖的分布式實作,下面的代碼演示了 RLock 的用法:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testLockExamples() {
// 默認連接上 127.0.0.1:6379
RedissonClient redisson = redissonManager.getRedisson();
// RLock 繼承了 java.util.concurrent.locks.Lock 介面
RLock lock = redisson.getLock("redission:test:lock:1");
final int[] count = {0};
int threads = 10;
ExecutorService pool = Executors.newFixedThreadPool(10);
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
for (int j = 0; j < 1000; j++) {
lock.lock();
count[0]++;
lock.unlock();
}
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("10個執行緒每個累加1000為: = " + count[0]);
//輸出統計結果
float time = System.currentTimeMillis() - start;
System.out.println("運行的時長為:" + time);
System.out.println("每一次執行的時長為:" + time/count[0]);
}
}
此代碼將產生以下輸出:
10個執行緒每個累加1000為: = 10000
運行的時長為:14172.0
每一次執行的時長為:1.4172
使用 RAtomicLong 實作 Redis 原子操作
RAtomicLong 是 Java 中 AtomicLong 類的分布式“替代品”,用于在并發環境中保存長值,以下示例代碼演示了 RAtomicLong 的用法:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testRAtomicLongExamples() {
// 默認連接上 127.0.0.1:6379
RedissonClient redisson = redissonManager.getRedisson();
RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
// 執行緒數
final int threads = 10;
// 每條執行緒的執行輪數
final int turns = 1000;
ExecutorService pool = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++)
{
pool.submit(() ->
{
try
{
for (int j = 0; j < turns; j++)
{
atomicLong.incrementAndGet();
}
} catch (Exception e)
{
e.printStackTrace();
}
});
}
ThreadUtil.sleepSeconds(5);
System.out.println("atomicLong: " + atomicLong.get());
redisson.shutdown();
}
}
此代碼的輸出將是:
atomicLong: 10000

整長型累加器(LongAdder)
基于Redis的Redisson分布式整長型累加器(LongAdder)采用了與java.util.concurrent.atomic.LongAdder類似的介面,通過利用客戶端內置的LongAdder物件,為分布式環境下遞增和遞減操作提供了很高得性能,據統計其性能最高比分布式AtomicLong物件快 12000 倍,
完美適用于分布式統計計量場景,下面是RLongAdder的使用案例:
RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();
以下示例代碼演示了 RLongAdder 的用法:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testRAtomicLongExamples() {
// 默認連接上 127.0.0.1:6379
RedissonClient redisson = redissonManager.getRedisson();
RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
// 執行緒數
final int threads = 10;
// 每條執行緒的執行輪數
final int turns = 1000;
ExecutorService pool = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++)
{
pool.submit(() ->
{
try
{
for (int j = 0; j < turns; j++)
{
atomicLong.incrementAndGet();
}
} catch (Exception e)
{
e.printStackTrace();
}
});
}
ThreadUtil.sleepSeconds(5);
System.out.println("atomicLong: " + atomicLong.get());
redisson.shutdown();
}
}
此代碼將產生以下輸出:
longAdder: 10000
運行的時長為:5085.0
每一次執行的時長為:0.5085
當不再使用整長型累加器物件的時候應該自行手動銷毀,如果Redisson物件被關閉(shutdown)了,則不用手動銷毀,
RLongAdder atomicLong = ...
atomicLong.destroy();
序列化
Redisson的物件編碼類是用于將物件進行序列化和反序列化,以實作對該物件在Redis里的讀取和存盤,Redisson提供了以下幾種的物件編碼應用,以供大家選擇:
| 編碼類名稱 | 說明 |
|---|---|
org.redisson.codec.JsonJacksonCodec | Jackson JSON 編碼 默認編碼 |
org.redisson.codec.AvroJacksonCodec | Avro 一個二進制的JSON編碼 |
org.redisson.codec.SmileJacksonCodec | Smile 另一個二進制的JSON編碼 |
org.redisson.codec.CborJacksonCodec | CBOR 又一個二進制的JSON編碼 |
org.redisson.codec.MsgPackJacksonCodec | MsgPack 再來一個二進制的JSON編碼 |
org.redisson.codec.IonJacksonCodec | Amazon Ion 亞馬遜的Ion編碼,格式與JSON類似 |
org.redisson.codec.KryoCodec | Kryo 二進制物件序列化編碼 |
org.redisson.codec.SerializationCodec | JDK序列化編碼 |
org.redisson.codec.FstCodec | FST 10倍于JDK序列化性能而且100%兼容的編碼 |
org.redisson.codec.LZ4Codec | LZ4 壓縮型序列化物件編碼 |
org.redisson.codec.SnappyCodec | Snappy 另一個壓縮型序列化物件編碼 |
org.redisson.client.codec.JsonJacksonMapCodec | 基于Jackson的映射類使用的編碼,可用于避免序列化類的資訊,以及用于解決使用byte[]遇到的問題, |
org.redisson.client.codec.StringCodec | 純字串編碼(無轉換) |
org.redisson.client.codec.LongCodec | 純整長型數字編碼(無轉換) |
org.redisson.client.codec.ByteArrayCodec | 位元組陣列編碼 |
org.redisson.codec.CompositeCodec | 用來組合多種不同編碼在一起 |
由Redisson默認的編碼器為二進制編碼器,為了序列化后的內容可見,需要使用Json文本序列化編碼工具類,Redisson提供了編碼器 JsonJacksonCodec,作為Json文本序列化編碼工具類,
問題是:JsonJackson在序列化有雙向參考的物件時,會出現無限回圈例外,而fastjson在檢查出雙向參考后會自動用參考符$ref替換,終止回圈,
所以,一些特殊場景中:用fastjson能 正常序列化到redis,而JsonJackson則拋出無限回圈例外,
為了序列化后的內容可見,所以不用redission其他自帶的,自行實作fastjson編碼器:
package com.crayon.distributedredissionspringbootstarter.codec;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import java.io.IOException;
public class FastjsonCodec extends BaseCodec {
private final Encoder encoder = in -> {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try {
ByteBufOutputStream os = new ByteBufOutputStream(out);
JSON.writeJSONString(os, in, SerializerFeature.WriteClassName);
return os.buffer();
} catch (IOException e) {
out.release();
throw e;
} catch (Exception e) {
out.release();
throw new IOException(e);
}
};
private final Decoder<Object> decoder = (buf, state) ->
JSON.parseObject(new ByteBufInputStream(buf), Object.class);
@Override
public Decoder<Object> getValueDecoder() {
return decoder;
}
@Override
public Encoder getValueEncoder() {
return encoder;
}
}
替換的方法如下:
*/
@Slf4j
public class StandaloneConfigImpl implements RedissonConfigService {
@Override
public Config createRedissonConfig(RedissonConfig redissonConfig) {
Config config = new Config();
try {
String address = redissonConfig.getAddress();
String password = redissonConfig.getPassword();
int database = redissonConfig.getDatabase();
String redisAddr = GlobalConstant.REDIS_CONNECTION_PREFIX.getConstant_value() + address;
config.useSingleServer().setAddress(redisAddr);
config.useSingleServer().setDatabase(database);
//密碼可以為空
if (!StringUtils.isEmpty(password)) {
config.useSingleServer().setPassword(password);
}
log.info("初始化[單機部署]方式Config,redisAddress:" + address);
// config.setCodec( new FstCodec());
config.setCodec( new FastjsonCodec());
} catch (Exception e) {
log.error("單機部署 Redisson init error", e);
}
return config;
}
}
哨兵模式
哨兵模式即sentinel模式,配置Redis哨兵服務的官方檔案在這里,
哨兵模式實作代碼和單機模式幾乎一樣,唯一的不同就是Config的構造.
程式化配置哨兵模式的方法如下:
Config config = new Config();
config.useSentinelServers()
.setMasterName("mymaster")
// use "rediss://" for SSL connection
.addSentinelAddress("redis://127.0.0.1:26389", "redis://127.0.0.1:26379")
.addSentinelAddress("redis://127.0.0.1:26319");
RedissonClient redisson = Redisson.create(config);
Redisson的哨兵模式的使用方法如下:
SentinelServersConfig sentinelConfig = config.useSentinelServers();
SentinelServersConfig配置引數如下:
配置Redis哨兵服務的官方檔案在這里,Redisson的哨兵模式的使用方法如下:
SentinelServersConfig sentinelConfig = config.useSentinelServers();
SentinelServersConfig類的設定引數如下:dnsMonitoringInterval(DNS監控間隔,單位:毫秒)
默認值:
5000用來指定檢查節點DNS變化的時間間隔,使用的時候應該確保JVM里的DNS資料的快取時間保持在足夠低的范圍才有意義,用
-1來禁用該功能,masterName(主服務器的名稱)
主服務器的名稱是哨兵行程中用來監測主從服務切換情況的,
addSentinelAddress(添加哨兵節點地址)
可以通過
host:port的格式來指定哨兵節點的地址,多個節點可以一次性批量添加,readMode(讀取操作的負載均衡模式)
默認值:
SLAVE(只在從服務節點里讀取)注:在從服務節點里讀取的資料說明已經至少有兩個節點保存了該資料,確保了資料的高可用性,
設定讀取操作選擇節點的模式,可用值為:
SLAVE- 只在從服務節點里讀取,MASTER- 只在主服務節點里讀取,MASTER_SLAVE- 在主從服務節點里都可以讀取,subscriptionMode(訂閱操作的負載均衡模式)
默認值:
SLAVE(只在從服務節點里訂閱)設定訂閱操作選擇節點的模式,可用值為:
SLAVE- 只在從服務節點里訂閱,MASTER- 只在主服務節點里訂閱,loadBalancer(負載均衡演算法類的選擇)
默認值:
org.redisson.connection.balancer.RoundRobinLoadBalancer在使用多個Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:
org.redisson.connection.balancer.WeightedRoundRobinBalancer- 權重輪詢調度演算法org.redisson.connection.balancer.RoundRobinLoadBalancer- 輪詢調度演算法org.redisson.connection.balancer.RandomLoadBalancer- 隨機調度演算法subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)
默認值:
1多從節點的環境里,每個 從服務節點里用于發布和訂閱連接的最小保持連接數(長連接),Redisson內部經常通過發布和訂閱來實作許多功能,長期保持一定數量的發布訂閱連接是必須的,
subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)
默認值:
50多從節點的環境里,每個 從服務節點里用于發布和訂閱連接的連接池最大容量,連接池的連接數量自動彈性伸縮,
slaveConnectionMinimumIdleSize(從節點最小空閑連接數)
默認值:
32多從節點的環境里,每個 從服務節點里用于普通操作(非 發布和訂閱)的最小保持連接數(長連接),長期保持一定數量的連接有利于提高瞬時讀取反映速度,
slaveConnectionPoolSize(從節點連接池大小)
默認值:
64多從節點的環境里,每個 從服務節點里用于普通操作(非 發布和訂閱)連接的連接池最大容量,連接池的連接數量自動彈性伸縮,
masterConnectionMinimumIdleSize(主節點最小空閑連接數)
默認值:
32多從節點的環境里,每個 主節點的最小保持連接數(長連接),長期保持一定數量的連接有利于提高瞬時寫入反應速度,
masterConnectionPoolSize(主節點連接池大小)
默認值:
64主節點的連接池最大容量,連接池的連接數量自動彈性伸縮,
idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:
10000如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,并從連接池里去掉,時間單位是毫秒,
connectTimeout(連接超時,單位:毫秒)
默認值:
10000同任何節點建立連接時的等待超時,時間單位是毫秒,
timeout(命令等待超時,單位:毫秒)
默認值:
3000等待節點回復命令的時間,該時間從命令發送成功時開始計時,
retryAttempts(命令失敗重試次數)
默認值:
3如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤,如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時,
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:
1500在一條命令發送失敗以后,等待重試發送的時間間隔,時間單位是毫秒,
reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:
3000當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔,時間單位是毫秒,
failedAttempts(執行失敗最大次數)
默認值:
3在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點串列里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試,
database(資料庫編號)
默認值:
0嘗試連接的資料庫編號,
password(密碼)
默認值:
null用于節點身份驗證的密碼,
subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:
5每個連接的最大訂閱數量,
clientName(客戶端名稱)
默認值:
null在Redis節點里顯示的客戶端名稱,
sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:
true開啟SSL終端識別能力,
sslProvider(SSL實作方式)
默認值:
JDK確定采用哪種方式(JDK或OPENSSL)來實作SSL連接,
sslTruststore(SSL信任證書庫路徑)
默認值:
null指定SSL信任證書庫的路徑,
sslTruststorePassword(SSL信任證書庫密碼)
默認值:
null指定SSL信任證書庫的密碼,
sslKeystore(SSL鑰匙庫路徑)
默認值:
null指定SSL鑰匙庫的路徑,
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:
null指定SSL鑰匙庫的密碼,
通過屬性檔案,配置的示例如下:
--- sentinelServersConfig: idleConnectionTimeout: 10000 connectTimeout: 10000 timeout: 3000 retryAttempts: 3 retryInterval: 1500 failedSlaveReconnectionInterval: 3000 failedSlaveCheckInterval: 60000 password: null subscriptionsPerConnection: 5 clientName: null loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {} subscriptionConnectionMinimumIdleSize: 1 subscriptionConnectionPoolSize: 50 slaveConnectionMinimumIdleSize: 24 slaveConnectionPoolSize: 64 masterConnectionMinimumIdleSize: 24 masterConnectionPoolSize: 64 readMode: "SLAVE" subscriptionMode: "SLAVE" sentinelAddresses: - "redis://127.0.0.1:26379" - "redis://127.0.0.1:26389" masterName: "mymaster" database: 0 threads: 16 nettyThreads: 32 codec: !<org.redisson.codec.MarshallingCodec> {} transportMode: "NIO"
主從模式
介紹配置Redis主從服務組態的檔案在這里.
程式化配置主從模式的方法如下:
Config config = new Config();
config.useMasterSlaveServers()
// use "rediss://" for SSL connection
.setMasterAddress("redis://127.0.0.1:6379")
.addSlaveAddress("redis://127.0.0.1:6389", "redis://127.0.0.1:6332", "redis://127.0.0.1:6419")
.addSlaveAddress("redis://127.0.0.1:6399");
RedissonClient redisson = Redisson.create(config);
主從模式使用到MasterSlaveServersConfig :
MasterSlaveServersConfig masterSlaveConfig = config.useMasterSlaveServers();
MasterSlaveServersConfig 類的設定引數如下:
dnsMonitoringInterval(DNS監控間隔,單位:毫秒)
默認值:
5000用來指定檢查節點DNS變化的時間間隔,使用的時候應該確保JVM里的DNS資料的快取時間保持在足夠低的范圍才有意義,用
-1來禁用該功能,masterAddress(主節點地址)
可以通過
host:port的格式來指定主節點地址,addSlaveAddress(添加從主節點地址)
可以通過
host:port的格式來指定從節點的地址,多個節點可以一次性批量添加,readMode(讀取操作的負載均衡模式)
默認值:
SLAVE(只在從服務節點里讀取)注:在從服務節點里讀取的資料說明已經至少有兩個節點保存了該資料,確保了資料的高可用性,
設定讀取操作選擇節點的模式,可用值為:
SLAVE- 只在從服務節點里讀取,MASTER- 只在主服務節點里讀取,MASTER_SLAVE- 在主從服務節點里都可以讀取,subscriptionMode(訂閱操作的負載均衡模式)
默認值:
SLAVE(只在從服務節點里訂閱)設定訂閱操作選擇節點的模式,可用值為:
SLAVE- 只在從服務節點里訂閱,MASTER- 只在主服務節點里訂閱,loadBalancer(負載均衡演算法類的選擇)
默認值:
org.redisson.connection.balancer.RoundRobinLoadBalancer在使用多個Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:
org.redisson.connection.balancer.WeightedRoundRobinBalancer- 權重輪詢調度演算法org.redisson.connection.balancer.RoundRobinLoadBalancer- 輪詢調度演算法org.redisson.connection.balancer.RandomLoadBalancer- 隨機調度演算法subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)
默認值:
1多從節點的環境里,每個 從服務節點里用于發布和訂閱連接的最小保持連接數(長連接),Redisson內部經常通過發布和訂閱來實作許多功能,長期保持一定數量的發布訂閱連接是必須的,
subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)
默認值:
50多從節點的環境里,每個 從服務節點里用于發布和訂閱連接的連接池最大容量,連接池的連接數量自動彈性伸縮,
slaveConnectionMinimumIdleSize(從節點最小空閑連接數)
默認值:
32多從節點的環境里,每個 從服務節點里用于普通操作(非 發布和訂閱)的最小保持連接數(長連接),長期保持一定數量的連接有利于提高瞬時讀取反映速度,
slaveConnectionPoolSize(從節點連接池大小)
默認值:
64多從節點的環境里,每個 從服務節點里用于普通操作(非 發布和訂閱)連接的連接池最大容量,連接池的連接數量自動彈性伸縮,
masterConnectionMinimumIdleSize(主節點最小空閑連接數)
默認值:
32多從節點的環境里,每個 主節點的最小保持連接數(長連接),長期保持一定數量的連接有利于提高瞬時寫入反應速度,
masterConnectionPoolSize(主節點連接池大小)
默認值:
64主節點的連接池最大容量,連接池的連接數量自動彈性伸縮,
idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:
10000如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,并從連接池里去掉,時間單位是毫秒,
connectTimeout(連接超時,單位:毫秒)
默認值:
10000同任何節點建立連接時的等待超時,時間單位是毫秒,
timeout(命令等待超時,單位:毫秒)
默認值:
3000等待節點回復命令的時間,該時間從命令發送成功時開始計時,
retryAttempts(命令失敗重試次數)
默認值:
3如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤,如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時,
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:
1500在一條命令發送失敗以后,等待重試發送的時間間隔,時間單位是毫秒,
reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:
3000當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔,時間單位是毫秒,
failedAttempts(執行失敗最大次數)
默認值:
3在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點串列里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試,
database(資料庫編號)
默認值:
0嘗試連接的資料庫編號,
password(密碼)
默認值:
null用于節點身份驗證的密碼,
subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:
5每個連接的最大訂閱數量,
clientName(客戶端名稱)
默認值:
null在Redis節點里顯示的客戶端名稱,
sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:
true開啟SSL終端識別能力,
sslProvider(SSL實作方式)
默認值:
JDK確定采用哪種方式(JDK或OPENSSL)來實作SSL連接,
sslTruststore(SSL信任證書庫路徑)
默認值:
null指定SSL信任證書庫的路徑,
sslTruststorePassword(SSL信任證書庫密碼)
默認值:
null指定SSL信任證書庫的密碼,
sslKeystore(SSL鑰匙庫路徑)
默認值:
null指定SSL鑰匙庫的路徑,
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:
null指定SSL鑰匙庫的密碼,
集群模式
集群模式除了適用于Redis集群環境,也適用于任何云計算服務商提供的集群模式,例如AWS ElastiCache集群版、Azure Redis Cache和阿里云(Aliyun)的云資料庫Redis版,
介紹配置Redis集群組態的檔案在這里, Redis集群組態的最低要求是必須有三個主節點,
集群模式構造Config如下:
Config config = new Config();
config.useClusterServers()
.setScanInterval(2000) // 集群狀態掃描間隔時間,單位是毫秒
//可以用"rediss://"來啟用SSL連接
.addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
.addNodeAddress("redis://127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);
集群模式使用到ClusterServersConfig :
ClusterServersConfig clusterConfig = config.useClusterServers();
ClusterServersConfig 配置引數如下:
nodeAddresses(添加節點地址)
可以通過
host:port的格式來添加Redis集群節點的地址,多個節點可以一次性批量添加,scanInterval(集群掃描間隔時間)
默認值:
1000對Redis集群節點狀態掃描的時間間隔,單位是毫秒,
slots(分片數量)
默認值:
231用于指定資料分片程序中的分片數量,支持資料分片/框架結構有:集(Set)、映射(Map)、BitSet、Bloom filter, Spring Cache和Hibernate Cache等.readMode(讀取操作的負載均衡模式)
默認值:
SLAVE(只在從服務節點里讀取)注:在從服務節點里讀取的資料說明已經至少有兩個節點保存了該資料,確保了資料的高可用性,
設定讀取操作選擇節點的模式,可用值為:
SLAVE- 只在從服務節點里讀取,MASTER- 只在主服務節點里讀取,MASTER_SLAVE- 在主從服務節點里都可以讀取,subscriptionMode(訂閱操作的負載均衡模式)
默認值:
SLAVE(只在從服務節點里訂閱)設定訂閱操作選擇節點的模式,可用值為:
SLAVE- 只在從服務節點里訂閱,MASTER- 只在主服務節點里訂閱,loadBalancer(負載均衡演算法類的選擇)
默認值:
org.redisson.connection.balancer.RoundRobinLoadBalancer在多Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:
org.redisson.connection.balancer.WeightedRoundRobinBalancer- 權重輪詢調度演算法org.redisson.connection.balancer.RoundRobinLoadBalancer- 輪詢調度演算法org.redisson.connection.balancer.RandomLoadBalancer- 隨機調度演算法subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)
默認值:
1多從節點的環境里,每個 從服務節點里用于發布和訂閱連接的最小保持連接數(長連接),Redisson內部經常通過發布和訂閱來實作許多功能,長期保持一定數量的發布訂閱連接是必須的,
subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)
默認值:
50多從節點的環境里,每個 從服務節點里用于發布和訂閱連接的連接池最大容量,連接池的連接數量自動彈性伸縮,
slaveConnectionMinimumIdleSize(從節點最小空閑連接數)
默認值:
32多從節點的環境里,每個 從服務節點里用于普通操作(非 發布和訂閱)的最小保持連接數(長連接),長期保持一定數量的連接有利于提高瞬時讀取反映速度,
slaveConnectionPoolSize(從節點連接池大小)
默認值:
64多從節點的環境里,每個 從服務節點里用于普通操作(非 發布和訂閱)連接的連接池最大容量,連接池的連接數量自動彈性伸縮,
masterConnectionMinimumIdleSize(主節點最小空閑連接數)
默認值:
32多節點的環境里,每個 主節點的最小保持連接數(長連接),長期保持一定數量的連接有利于提高瞬時寫入反應速度,
masterConnectionPoolSize(主節點連接池大小)
默認值:
64多主節點的環境里,每個 主節點的連接池最大容量,連接池的連接數量自動彈性伸縮,
idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:
10000如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,并從連接池里去掉,時間單位是毫秒,
connectTimeout(連接超時,單位:毫秒)
默認值:
10000同任何節點建立連接時的等待超時,時間單位是毫秒,
timeout(命令等待超時,單位:毫秒)
默認值:
3000等待節點回復命令的時間,該時間從命令發送成功時開始計時,
retryAttempts(命令失敗重試次數)
默認值:
3如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤,如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時,
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:
1500在一條命令發送失敗以后,等待重試發送的時間間隔,時間單位是毫秒,
reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:
3000當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔,時間單位是毫秒,
failedAttempts(執行失敗最大次數)
默認值:
3在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點串列里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試,
password(密碼)
默認值:
null用于節點身份驗證的密碼,
subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:
5每個連接的最大訂閱數量,
clientName(客戶端名稱)
默認值:
null在Redis節點里顯示的客戶端名稱,
sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:
true開啟SSL終端識別能力,
sslProvider(SSL實作方式)
默認值:
JDK確定采用哪種方式(JDK或OPENSSL)來實作SSL連接,
sslTruststore(SSL信任證書庫路徑)
默認值:
null指定SSL信任證書庫的路徑,
sslTruststorePassword(SSL信任證書庫密碼)
默認值:
null指定SSL信任證書庫的密碼,
sslKeystore(SSL鑰匙庫路徑)
默認值:
null指定SSL鑰匙庫的路徑,
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:
null指定SSL鑰匙庫的密碼,
簡單Redision鎖的原理
Redis發展到現在,幾種常見的部署架構有:
- 單機模式;
- 哨兵模式;
- 集群模式;
先介紹,基于單機模式的簡單Redision鎖的使用,
簡單Redision鎖的使用
單機模式下,簡單Redision鎖的使用如下:
// 構造redisson實作分布式鎖必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://172.29.1.180:5379").setPassword("a123456").setDatabase(0);
// 構造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 設定鎖定資源名稱
RLock disLock = redissonClient.getLock("DISLOCK");
//嘗試獲取分布式鎖
boolean isLock= disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS);
if (isLock) {
try {
//TODO if get lock success, do something;
Thread.sleep(15000);
} catch (Exception e) {
} finally {
// 無論如何, 最后都要解鎖
disLock.unlock();
}
}
通過代碼可知,經過Redisson的封裝,實作Redis分布式鎖非常方便,和顯式鎖的使用方法是一樣的,RLock介面繼承了 Lock介面,
我們再看一下Redis中的value是啥,和前文分析一樣,hash結構, redis 的key就是資源名稱,
hash結構的key就是UUID+threadId,hash結構的value就是重入值,在分布式鎖時,這個值為1(Redisson還可以實作重入鎖,那么這個值就取決于重入次數了):
172.29.1.180:5379> hgetall DISLOCK
1) "01a6d806-d282-4715-9bec-f51b9aa98110:1"
2) "1"
使用客戶端工具看到的效果如下:

getLock()方法
可以看到,呼叫getLock()方法后實際回傳一個RedissonLock物件
tryLock方法
下面來看下tryLock方法,原始碼如下:
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
以上代碼使用了異步回呼模式,RFuture 繼承了 java.util.concurrent.Future, CompletionStage兩大介面,異步回呼模式的基礎知識,請參見 《Java高并發核心編程 卷2 》
tryAcquire()方法
在RedissonLock物件的lock()方法主要呼叫tryAcquire()方法

tryLockInnerAsync
由于leaseTime == -1,于是走tryLockInnerAsync()方法,這個方法才是關鍵
首先,看一下evalWriteAsync方法的定義
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
這和前面的jedis呼叫lua腳本類似,最后兩個引數分別是keys和params,
單獨將呼叫的那一段摘出來看,實際呼叫是這樣的:
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
結合上面的引數宣告,我們可以知道,這里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)
假設:
- 前面獲取鎖時傳的name是“DISLOCK”,
- 假設呼叫的執行緒ID是1,
- 假設成員變數UUID型別的id是01a6d806-d282-4715-9bec-f51b9aa98110
那么KEYS[1]=DISLOCK,ARGV[2]=01a6d806-d282-4715-9bec-f51b9aa98110:1
因此,這段腳本的意思是
1、判斷有沒有一個叫“DISLOCK”的key
2、如果沒有,則在其下設定一個欄位為“01a6d806-d282-4715-9bec-f51b9aa98110:1”,值為“1”的鍵值對 ,并設定它的過期時間
3、如果存在,則進一步判斷“01a6d806-d282-4715-9bec-f51b9aa98110:1”是否存在,若存在,則其值加1,并重新設定過期時間
4、回傳“DISLOCK”的生存時間(毫秒)
原理:加鎖機制
這里用的資料結構是hash,hash的結構是: key 欄位1 值1 欄位2 值2 ,,,
用在鎖這個場景下,key就表示鎖的名稱,也可以理解為臨界資源,欄位就表示當前獲得鎖的執行緒
所有競爭這把鎖的執行緒都要判斷在這個key下有沒有自己執行緒的欄位,如果沒有則不能獲得鎖,如果有,則相當于重入,欄位值加1(次數)

Lua腳本的詳解
為何要使用lua語言?
因為一大堆復雜的業務邏輯,可以通過封裝在lua腳本中發送給redis,保證這段復雜業務邏輯執行的原子性

回顧一下evalWriteAsync方法的定義
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
注意,其最后兩個引數分別是keys和params,
關于 lua腳本的引數解釋:
**KEYS[1]**代表的是你加鎖的那個key,比如說:
RLock lock = redisson.getLock(“DISLOCK”);
這里你自己設定了加鎖的那個鎖key就是“DISLOCK”,
**ARGV[1]**代表的就是鎖key的默認生存時間
呼叫的時候,傳遞的引數為 internalLockLeaseTime ,該值默認30秒,
**ARGV[2]**代表的是加鎖的客戶端的ID,類似于下面這樣:
01a6d806-d282-4715-9bec-f51b9aa98110:1
lua腳本的第一段if判斷陳述句,就是用“exists DISLOCK”命令判斷一下,如果你要加鎖的那個鎖key不存在的話,你就進行加鎖,
如何加鎖呢?很簡單,用下面的redis命令:
hset DISLOCK 01a6d806-d282-4715-9bec-f51b9aa98110:1 1
通過這個命令設定一個hash資料結構,這行命令執行后,會出現一個類似下面的資料結構:
DISLOCK:
{
8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
}
接著會執行“pexpire DISLOCK 30000”命令,設定DISLOCK這個鎖key的生存時間是30秒(默認)
鎖互斥機制
那么在這個時候,如果客戶端2來嘗試加鎖,執行了同樣的一段lua腳本,會咋樣呢?
很簡單,第一個if判斷會執行“exists DISLOCK”,發現DISLOCK 這個鎖key已經存在了,
接著第二個if判斷,判斷一下,DISLOCK鎖key的hash資料結構中,是否包含客戶端2的ID,但是明顯不是的,因為那里包含的是客戶端1的ID,
所以,客戶端2會獲取到pttl DISLOCK回傳的一個數字,這個數字代表了DISLOCK 這個鎖key的**剩余生存時間,**比如還剩15000毫秒的生存時間,
此時客戶端2會進入一個while回圈,不停的嘗試加鎖,
可重入加鎖機制
如果客戶端1都已經持有了這把鎖了,結果可重入的加鎖會怎么樣呢?
RLock lock = redisson.getLock("DISLOCK")
lock.lock();
//業務代碼
lock.lock();
//業務代碼
lock.unlock();
lock.unlock();
分析上面那段lua腳本,
第一個if判斷肯定不成立,“exists DISLOCK”會顯示鎖key已經存在了,
第二個if判斷會成立,因為DISLOCK的hash資料結構中包含的那個ID,就是客戶端1的那個ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”
此時就會執行可重入加鎖的邏輯,他會用:
incrby DISLOCK
8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
通過這個命令,對客戶端1的加鎖次數,累加1,
此時DISLOCK資料結構變為下面這樣:
DISLOCK:
{
8743c9c0-0795-4907-87fd-6c719a6b4586:1 2
}
釋放鎖機制
如果執行lock.unlock(),就可以釋放分布式鎖,此時的業務邏輯也是非常簡單的,
其實說白了,就是每次都對DISLOCK資料結構中的那個加鎖次數減1,
如果發現加鎖次數是0了,說明這個客戶端已經不再持有鎖了,此時就會用:
“del DISLOCK”命令,從redis里洗掉這個key,
然后呢,另外的客戶端2就可以嘗試完成加鎖了,
unlock 原始碼
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
// Future<Void> future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);
}
再深入一下,實際呼叫的是unlockInnerAsync方法
unlockInnerAsync方法

原理:Redision 解鎖機制
上圖沒有截取完整,完整的原始碼如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
我們還是假設name=DISLOCK,假設執行緒ID是1
同理,我們可以知道
KEYS[1]是getName(),即KEYS[1]=DISLOCK
KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{DISLOCK}
ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0
ARGV[2]是生存時間
ARGV[3]是getLockName(threadId),即ARGV[3]=8743c9c0-0795-4907-87fd-6c719a6b4586:1
因此,上面腳本的意思是:
1、判斷是否存在一個叫“DISLOCK”的key
2、如果不存在,回傳nil
3、如果存在,使用Redis Hincrby 命令用于為哈希表中的欄位值加上指定增量值 -1 ,代表減去1
4、若counter >,回傳空,若欄位存在,則欄位值減1
5、若減完以后,counter > 0 值仍大于0,則回傳0
6、減完后,若欄位值小于或等于0,則用 publish 命令廣播一條訊息,廣播內容是0,并回傳1;
可以猜測,廣播0表示資源可用,即通知那些等待獲取鎖的執行緒現在可以獲得鎖了

通過redis Channel 解鎖訂閱
以上是正常情況下獲取到鎖的情況,那么當無法立即獲取到鎖的時候怎么辦呢?
再回到前面獲取鎖的位置
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 訂閱
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
protected static final LockPubSub PUBSUB = new LockPubSub();
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
這里會訂閱Channel,當資源可用時可以及時知道,并搶占,防止無效的輪詢而浪費資源
這里的channel為:
redisson_lock__channel:{DISLOCK}


當資源可用用的時候,回圈去嘗試獲取鎖,由于多個執行緒同時去競爭資源,所以這里用了信號量,對于同一個資源只允許一個執行緒獲得鎖,其它的執行緒阻塞
這點,有點兒類似 Zookeeper分布式鎖:
有關zookeeper分布式鎖的原理和實作,具體請參見下面的博客:
Zookeeper 分布式鎖 (圖解+秒懂+史上最全)
watch dog自動延期機制
客戶端1加鎖的鎖key默認生存時間才30秒,如果超過了30秒,客戶端1還想一直持有這把鎖,怎么辦呢?
簡單!只要客戶端1一旦加鎖成功,就會啟動一個watch dog看門狗,他是一個后臺執行緒,會每隔10秒檢查一下,如果客戶端1還持有鎖key,那么就會不斷的延長鎖key的生存時間,
使用watchDog機制實作鎖的續期
但是聰明的同學肯定會問:
有效時間設定多長,假如我的業務操作比有效時間長,我的業務代碼還沒執行完,就自動給我解鎖了,不就完蛋了嗎,
這個問題就有點棘手了,在網上也有很多討論:
第一種解決方法就是靠程式員自己去把握,預估一下業務代碼需要執行的時間,然后設定有效期時間比執行時間長一些,保證不會因為自動解鎖影響到客戶端業務代碼的執行,
但是這并不是萬全之策,比如網路抖動這種情況是無法預測的,也有可能導致業務代碼執行的時間變長,所以并不安全,
第二種方法,使用監事狗watchDog機制實作鎖的續期,
第二種方法比較靠譜一點,而且無業務入侵,
在Redisson框架實作分布式鎖的思路,就使用watchDog機制實作鎖的續期,
當加鎖成功后,同時開啟守護執行緒,默認有效期是30秒,每隔10秒就會給鎖續期到30秒,只要持有鎖的客戶端沒有宕機,就能保證一直持有鎖,直到業務代碼執行完畢由客戶端自己解鎖,如果宕機了自然就在有效期失效后自動解鎖,
這里,和前面解決 JVM STW的鎖過期問題有點類似,只不過,watchDog自動續期,也沒有完全解決JVM STW的鎖過期問題,
如何徹底解決 JVM STW的鎖過期問題,可以來瘋狂創客圈的社群討論,
redisson watchdog 使用和原理
實際上,redisson加鎖的基本流程圖如下:

這里專注于介紹watchdog,
首先watchdog的具體思路是 加鎖時,默認加鎖 30秒,每10秒鐘檢查一次,如果存在就重新設定 過期時間為30秒,
然后設定默認加鎖時間的引數是 lockWatchdogTimeout(監控鎖的看門狗超時,單位:毫秒)
官方檔案描述如下
lockWatchdogTimeout(監控鎖的看門狗超時,單位:毫秒)
默認值:
30000監控鎖的看門狗超時時間單位為毫秒,該引數只適用于分布式鎖的加鎖請求中未明確使用
leaseTimeout引數的情況,如果該看門狗未使用lockWatchdogTimeout去重新調整一個分布式鎖的lockWatchdogTimeout超時,那么這個鎖將變為失效狀態,這個引數可以用來避免由Redisson客戶端節點宕機或其他原因造成死鎖的情況,
需要注意的是
1.watchDog 只有在未顯示指定加鎖時間時才會生效,(這點很重要)
2.lockWatchdogTimeout設定的時間不要太小 ,比如我之前設定的是 100毫秒,由于網路直接導致加鎖完后,watchdog去延期時,這個key在redis中已經被洗掉了,
tryAcquireAsync原理
在呼叫lock方法時,會最終呼叫到tryAcquireAsync,詳細解釋如下:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//如果指定了加鎖時間,會直接去加鎖
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//沒有指定加鎖時間 會先進行加鎖,并且默認時間就是 LockWatchdogTimeout的時間
//這個是異步操作 回傳RFuture 類似netty中的future
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//這里也是類似netty Future 的addListener,在future內容執行完成后執行
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
//這里是定時執行 當前鎖自動延期的動作
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
scheduleExpirationRenewal 中會呼叫renewExpiration,
renewExpiration執行延期動作
這里我們可以看到是 啟用了一個timeout定時,去執行延期動作
private void renewExpiration() {
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
//如果 沒有報錯,就再次定時延期
// reschedule itself
renewExpiration();
}
});
}
// 這里我們可以看到定時任務 是 lockWatchdogTimeout 的1/3時間去執行 renewExpirationAsync
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
最終 scheduleExpirationRenewal會呼叫到 renewExpirationAsync,
renewExpirationAsync
執行下面這段 lua腳本,他主要判斷就是 這個鎖是否在redis中存在,如果存在就進行 pexpire 延期,
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
watchLog總結
1.要使 watchLog機制生效 ,lock時 不要設定 過期時間
2.watchlog的延時時間 可以由 lockWatchdogTimeout指定默認延時時間,但是不要設定太小,如100
3.watchdog 會每 lockWatchdogTimeout/3時間,去延時,
4.watchdog 通過 類似netty的 Future功能來實作異步延時
5.watchdog 最侄訓是通過 lua腳本來進行延時
Redisson框架的分布式鎖
Redisson框架十分強大,除了前面介紹的 getLock方法獲取的分布式鎖(輸入可重入鎖的型別),還有很多其他的分布式鎖型別,
總體的Redisson框架的分布式鎖型別,大致如下:
- 可重入鎖
- 公平鎖
- 聯鎖
- 紅鎖
- 讀寫鎖
- 信號量
- 可過期信號量
- 閉鎖(/倒數閂)
1.可重入鎖(Reentrant Lock)
Redisson的分布式可重入鎖RLock Java物件實作了java.util.concurrent.locks.Lock介面,同時還支持自動過期解鎖,
public void testReentrantLock(RedissonClient redisson){
RLock lock = redisson.getLock("anyLock");
try{
// 1. 最常見的使用方法
//lock.lock();
// 2. 支持過期解鎖功能,10秒鐘以后自動解鎖, 無需呼叫unlock方法手動解鎖
//lock.lock(10, TimeUnit.SECONDS);
// 3. 嘗試加鎖,最多等待3秒,上鎖以后10秒自動解鎖
boolean res = lock.tryLock(3, 10, TimeUnit.SECONDS);
if(res){ //成功
// do your business
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
Redisson同時還為分布式鎖提供了異步執行的相關方法:
public void testAsyncReentrantLock(RedissonClient redisson){
RLock lock = redisson.getLock("anyLock");
try{
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(3, 10, TimeUnit.SECONDS);
if(res.get()){
// do your business
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
2.公平鎖(Fair Lock)
Redisson分布式可重入公平鎖也是實作了java.util.concurrent.locks.Lock介面的一種RLock物件,在提供了自動過期解鎖功能的同時,保證了當多個Redisson客戶端執行緒同時請求加鎖時,優先分配給先發出請求的執行緒,
public void testFairLock(RedissonClient redisson){
RLock fairLock = redisson.getFairLock("anyLock");
try{
// 最常見的使用方法
fairLock.lock();
// 支持過期解鎖功能, 10秒鐘以后自動解鎖,無需呼叫unlock方法手動解鎖
fairLock.lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
fairLock.unlock();
}
}
Redisson同時還為分布式可重入公平鎖提供了異步執行的相關方法:
RLock fairLock = redisson.getFairLock("anyLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
3.聯鎖(MultiLock)
Redisson的RedissonMultiLock物件可以將多個RLock物件關聯為一個聯鎖,每個RLock物件實體可以來自于不同的Redisson實體,
public void testMultiLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
try {
// 同時加鎖:lock1 lock2 lock3, 所有的鎖都上鎖成功才算成功,
lock.lock();
// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
4.紅鎖(RedLock)
Redisson的RedissonRedLock物件實作了Redlock介紹的加鎖演算法,該物件也可以用來將多個RLock物件關聯為一個紅鎖,每個RLock物件實體可以來自于不同的Redisson實體,
public void testRedLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
try {
// 同時加鎖:lock1 lock2 lock3, 紅鎖在大部分節點上加鎖成功就算成功,
lock.lock();
// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
5.讀寫鎖(ReadWriteLock)
Redisson的分布式可重入讀寫鎖RReadWriteLock,Java物件實作了java.util.concurrent.locks.ReadWriteLock介面,同時還支持自動過期解鎖,該物件允許同時有多個讀取鎖,但是最多只能有一個寫入鎖,
RReadWriteLock rwlock = redisson.getLock("anyRWLock");
// 最常見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
// 支持過期解鎖功能
// 10秒鐘以后自動解鎖
// 無需呼叫unlock方法手動解鎖
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
6.信號量(Semaphore)
Redisson的分布式信號量(Semaphore)Java物件RSemaphore采用了與java.util.concurrent.Semaphore相似的介面和用法,
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();
7.可過期性信號量(PermitExpirableSemaphore)
Redisson的可過期性信號量(PermitExpirableSemaphore)實在RSemaphore物件的基礎上,為每個信號增加了一個過期時間,每個信號可以通過獨立的ID來辨識,釋放時只能通過提交這個ID才能釋放,
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 獲取一個信號,有效期只有2秒鐘,
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);
8.閉鎖/倒數閂(CountDownLatch)
Redisson的分布式閉鎖(CountDownLatch)Java物件RCountDownLatch采用了與java.util.concurrent.CountDownLatch相似的介面和用法,
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他執行緒或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
redis分布式鎖的高可用
關于Redis分布式鎖的高可用問題,大致如下:
在master- slave的集群架構中,就是如果你對某個redis master實體,寫入了DISLOCK這種鎖key的value,此時會異步復制給對應的master slave實體,
但是,這個程序中一旦發生redis master宕機,主備切換,redis slave變為了redis master,而此時的主從復制沒有徹底完成…
接著就會導致,客戶端2來嘗試加鎖的時候,在新的redis master上完成了加鎖,而客戶端1也以為自己成功加了鎖,
此時就會導致多個客戶端對一個分布式鎖完成了加鎖,
這時系統在業務語意上一定會出現問題,導致臟資料的產生,
所以這個是是redis master-slave架構的主從異步復制導致的redis分布式鎖的最大缺陷:
在redis master實體宕機的時候,可能導致多個客戶端同時完成加鎖,
高可用的RedLock(紅鎖)原理
RedLock演算法思想:
不能只在一個redis實體上創建鎖,應該是在多個redis實體上創建鎖,n / 2 + 1,必須在大多數redis節點上都成功創建鎖,才能算這個整體的RedLock加鎖成功,避免說僅僅在一個redis實體上加鎖而帶來的問題,
這個場景是假設有一個 redis cluster,有 5 個 redis master 實體,然后執行如下步驟獲取一把紅鎖:
- 獲取當前時間戳,單位是毫秒;
- 跟上面類似,輪流嘗試在每個 master 節點上創建鎖,過期時間較短,一般就幾十毫秒;
- 嘗試在大多數節點上建立一個鎖,比如 5 個節點就要求是 3 個節點 n / 2 + 1;
- 客戶端計算建立好鎖的時間,如果建立鎖的時間小于超時時間,就算建立成功了;
- 要是鎖建立失敗了,那么就依次之前建立過的鎖洗掉;
- 只要別人建立了一把分布式鎖,你就得不斷輪詢去嘗試獲取鎖,

RedLock是基于redis實作的分布式鎖,它能夠保證以下特性:
-
互斥性:在任何時候,只能有一個客戶端能夠持有鎖;避免死鎖:
-
當客戶端拿到鎖后,即使發生了網路磁區或者客戶端宕機,也不會發生死鎖;(利用key的存活時間)
-
容錯性:只要多數節點的redis實體正常運行,就能夠對外提供服務,加鎖或者釋放鎖;
以sentinel模式架構為例,如下圖所示,有sentinel-1,sentinel-2,sentinel-3總計3個sentinel模式集群,如果要獲取分布式鎖,那么需要向這3個sentinel集群通過EVAL命令執行LUA腳本,需要3/2+1=2,即至少2個sentinel集群回應成功,才算成功的以Redlock演算法獲取到分布式鎖:

高可用的紅鎖會導致性能降低
提前說明,使用redis分布式鎖,是追求高性能, 在cap理論中,追求的是 ap 而不是cp,
所以,如果追求高可用,建議使用 zookeeper分布式鎖,
redis分布式鎖可能導致的資料不一致性,建議使用業務補償的方式去彌補,所以,不太建議使用紅鎖,但是從學習的層面來說,大家還是一定要掌握的,
實作原理
Redisson中有一個MultiLock的概念,可以將多個鎖合并為一個大鎖,對一個大鎖進行統一的申請加鎖以及釋放鎖
而Redisson中實作RedLock就是基于MultiLock 去做的,接下來就具體看看對應的實作吧
RedLock使用案例
先看下官方的代碼使用:
(https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#84-redlock)
RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");
RLock redLock = anyRedisson.getRedLock(lock1, lock2, lock3);
// traditional lock method
redLock.lock();
// or acquire lock and automatically unlock it after 10 seconds
redLock.lock(10, TimeUnit.SECONDS);
// or wait for lock aquisition up to 100 seconds
// and automatically unlock it after 10 seconds
boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
redLock.unlock();
}
}
這里是分別對3個redis實體加鎖,然后獲取一個最后的加鎖結果,
RedissonRedLock實作原理
上面示例中使用redLock.lock()或者tryLock()最終都是執行RedissonRedLock中方法,
RedissonRedLock 繼承自RedissonMultiLock, 實作了其中的一些方法:
public class RedissonRedLock extends RedissonMultiLock {
public RedissonRedLock(RLock... locks) {
super(locks);
}
/**
* 鎖可以失敗的次數,鎖的數量-鎖成功客戶端最小的數量
*/
@Override
protected int failedLocksLimit() {
return locks.size() - minLocksAmount(locks);
}
/**
* 鎖的數量 / 2 + 1,例如有3個客戶端加鎖,那么最少需要2個客戶端加鎖成功
*/
protected int minLocksAmount(final List<RLock> locks) {
return locks.size()/2 + 1;
}
/**
* 計算多個客戶端一起加鎖的超時時間,每個客戶端的等待時間
* remainTime默認為4.5s
*/
@Override
protected long calcLockWaitTime(long remainTime) {
return Math.max(remainTime / locks.size(), 1);
}
@Override
public void unlock() {
unlockInner(locks);
}
}
看到locks.size()/2 + 1 ,例如我們有3個客戶端實體,那么最少2個實體加鎖成功才算分布式鎖加鎖成功,
接著我們看下lock()的具體實作
RedissonMultiLock實作原理
public class RedissonMultiLock implements Lock {
final List<RLock> locks = new ArrayList<RLock>();
public RedissonMultiLock(RLock... locks) {
if (locks.length == 0) {
throw new IllegalArgumentException("Lock objects are not defined");
}
this.locks.addAll(Arrays.asList(locks));
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
// 如果等待時間設定了,那么將等待時間 * 2
newLeaseTime = unit.toMillis(waitTime)*2;
}
// time為當前時間戳
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
// 計算鎖的等待時間,RedLock中:如果remainTime=-1,那么lockWaitTime為1
long lockWaitTime = calcLockWaitTime(remainTime);
// RedLock中failedLocksLimit即為n/2 + 1
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
// 回圈每個redis客戶端,去獲取鎖
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// 呼叫tryLock方法去獲取鎖,如果獲取鎖成功,則lockAcquired=true
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
lockAcquired = false;
}
// 如果獲取鎖成功,將鎖加入到list集合中
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
// 如果獲取鎖失敗,判斷失敗次數是否等于失敗的限制次數
// 比如,3個redis客戶端,最多只能失敗1次
// 這里locks.size = 3, 3-x=1,說明只要成功了2次就可以直接break掉回圈
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
// 如果最大失敗次數等于0
if (failedLocksLimit == 0) {
// 釋放所有的鎖,RedLock加鎖失敗
unlockInner(acquiredLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// 重置迭代器 重試再次獲取鎖
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
// 失敗的限制次數減一
// 比如3個redis實體,最大的限制次數是1,如果遍歷第一個redis實體,失敗了,那么failedLocksLimit會減成0
// 如果failedLocksLimit就會走上面的if邏輯,釋放所有的鎖,然后回傳false
failedLocksLimit--;
}
}
if (remainTime != -1) {
remainTime -= (System.currentTimeMillis() - time);
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}
}
核心代碼都已經加了注釋,實作原理其實很簡單,基于RedLock思想,遍歷所有的Redis客戶端,然后依次加鎖,最后統計成功的次數來判斷是否加鎖成功,
文章核心內容和原始碼來源
圖書:《Netty Zookeeper Redis 高并發實戰》 圖書簡介 - 瘋狂創…
參考檔案:
圖書:《Netty Zookeeper Redis 高并發實戰》 圖書簡介 - 瘋狂創…
Distributed locks with Redis
how-to-do-distributed-locking
redisson watchdog 使用和原理
zookeeper實作分布式鎖_java_腳本之家
基于Zookeeper 的分布式鎖實作 - SegmentFault 思否
分布式鎖用 Redis 還是 Zookeeper - 知乎
ZooKeeper分布式鎖的實作原理 - 菜鳥奮斗史 - 博客園
https://blog.csdn.net/men_wen/article/details/72853078
本文的問題交流:瘋狂創客圈社群

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/283211.html
標籤:其他





