主頁 > 軟體設計 > Redis分布式鎖(圖解 - 秒懂 - 史上最全)

Redis分布式鎖(圖解 - 秒懂 - 史上最全)

2021-05-07 11:46:40 軟體設計

文章很長,建議收藏起來,慢慢讀! 高并發 發燒友社群:瘋狂創客圈 為小伙伴奉上以下珍貴的學習資源:

  • 瘋狂創客圈 經典升級 : 極致經典 《 Java 高并發 三部曲 》 面試必備 + 大廠必備 + 漲薪必備

  • 瘋狂創客圈 經典圖書 : 《Netty Zookeeper Redis 高并發實戰》 面試必備 + 大廠必備 +漲薪必備 免費領

  • 瘋狂創客圈 經典圖書 : 《SpringCloud、Nginx高并發核心編程》 面試必備 + 大廠必備 + 漲薪必備 免費領

  • 瘋狂創客圈 資源寶庫: Java 必備 百度網盤資源大合集 價值>1000元 【免費取 】


推薦: 瘋狂創客圈 必看/必收/高質量/博文

史上最全 分布式鎖 2 大篇圖解 + 史上最全 + 吐血推薦
1:Redis 分布式鎖 (圖解-秒懂-史上最全)2:Zookeeper 分布式鎖 (圖解-秒懂-史上最全)

史上最全 Java 面試題 21 個專題阿里、京東、美團、頭條… 隨意挑、橫著走!!!
1: JVM面試題(史上最強、持續更新、吐血推薦)2:Java基礎面試題(史上最全、持續更新、吐血推薦
3:架構設計面試題 (史上最全、持續更新、吐血推薦)4:設計模式面試題 (史上最全、持續更新、吐血推薦)
17、分布式事務面試題 (史上最全、持續更新、吐血推薦)一致性協議 (史上最全)
29、多執行緒面試題(史上最全)30、HR面經,過五關斬六將后,小心陰溝翻船!
9.網路協議面試題(史上最全、持續更新、吐血推薦)更多專題, 請參見【 瘋狂創客圈 高并發 總目錄 】

分布式 高并發 必讀 的精彩博文
nacos 實戰(史上最全) sentinel (史上最全+入門教程)
Zookeeper 分布式鎖 (圖解+秒懂+史上最全) Webflux(史上最全)
SpringCloud gateway (史上最全)TCP/IP(圖解+秒懂+史上最全)
10分鐘看懂, Java NIO 底層原理Feign原理 (圖解)
大廠必備之:Reactor模式更多精彩博文 … 請參見【 瘋狂創客圈 高并發 總目錄 】

跨JVM的執行緒安全問題

在單體的應用開發場景中,在多執行緒的環境下,涉及并發同步的時候,為了保證一個代碼塊在同一時間只能由一個執行緒訪問,我們一般可以使用synchronized語法和ReetrantLock去保證,這實際上是本地鎖的方式,

也就是說,在同一個JVM內部,大家往往采用synchronized或者Lock的方式來解決多執行緒間的安全問題,但在分布式集群作業的開發場景中,在JVM之間,那么就需要一種更加高級的鎖機制,來處理種跨JVM行程之間的執行緒安全問題.

解決方案是:使用分布式鎖

總之,對于分布式場景,我們可以使用分布式鎖,它是控制分布式系統之間互斥訪問共享資源的一種方式,

比如說在一個分布式系統中,多臺機器上部署了多個服務,當客戶端一個用戶發起一個資料插入請求時,如果沒有分布式鎖機制保證,那么那多臺機器上的多個服務可能進行并發插入操作,導致資料重復插入,對于某些不允許有多余資料的業務來說,這就會造成問題,而分布式鎖機制就是為了解決類似這類問題,保證多個服務之間互斥的訪問共享資源,如果一個服務搶占了分布式鎖,其他服務沒獲取到鎖,就不進行后續操作,

大致意思如下圖所示(不一定準確):

在這里插入圖片描述

何為分布式鎖?

何為分布式鎖?

  • 當在分布式模型下,資料只有一份(或有限制),此時需要利用鎖的技術控制某一時刻修改資料的行程數,
  • 用一個狀態值表示鎖,對鎖的占用和釋放通過狀態值來標識,

分布式鎖的條件:

  • 互斥性,在任意時刻,只有一個客戶端能持有鎖,
  • 不會發生死鎖,即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖,
  • 具有容錯性,只要大部分的 Redis 節點正常運行,客戶端就可以加鎖和解鎖,
  • 解鈴還須系鈴人,加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了,

分布式鎖的實作:

分布式鎖的實作由很多種,檔案鎖、資料庫、redis等等,比較多;分布式鎖常見的多種實作方式:

  1. 資料庫悲觀鎖、
  2. 資料庫樂觀鎖;
  3. 基于Redis的分布式鎖;
  4. 基于ZooKeeper的分布式鎖,

在實踐中,還是redis做分布式鎖性能會高一些


資料庫悲觀鎖

所謂悲觀鎖,悲觀鎖是對資料被的修改持悲觀態度(認為資料在被修改的時候一定會存在并發問題),因此在整個資料處理程序中將資料鎖定,

悲觀鎖的實作,往往依靠資料庫提供的鎖機制(也只有資料庫層提供的鎖機制才能真正保證資料訪問的排他性,否則,即使在應用層中實作了加鎖機制,也無法保證外部系統不會修改資料),

資料庫的行鎖、表鎖、排他鎖等都是悲觀鎖,這里以行鎖為例,進行介紹,以我們常用的MySQL為例,我們通過使用select…for update陳述句, 執行該陳述句后,會在表上加持行鎖,一直到事務提交,解除行鎖,

使用場景舉例:

在秒殺案例中,生成訂單和扣減庫存的操作,可以通過商品記錄的行鎖,進行保護,們通過使用select…for update陳述句,在查詢商品表庫存時將該條記錄加鎖,待下單減庫存完成后,再釋放鎖,

示例的SQL如下:

//0.開始事務
begin; 
	
//1.查詢出商品資訊

select stockCount from seckill_good where id=1 for update;

//2.根據商品資訊生成訂單

insert into seckill_order (id,good_id) values (null,1);

//3.修改商品stockCount減一

update seckill_good set stockCount=stockCount-1 where id=1;

//4.提交事務

commit;


以上,在對id = 1的記錄修改前,先通過for update的方式進行加鎖,然后再進行修改,這就是比較典型的悲觀鎖策略,

如果以上修改庫存的代碼發生并發,同一時間只有一個執行緒可以開啟事務并獲得id=1的鎖,其它的事務必須等本次事務提交之后才能執行,這樣我們可以保證當前的資料不會被其它事務修改,

我們使用select_for_update,另外一定要寫在事務中.

注意:要使用悲觀鎖,我們必須關閉mysql資料庫中自動提交的屬性,命令set autocommit=0;即可關閉,因為MySQL默認使用autocommit模式,也就是說,當你執行一個更新操作后,MySQL會立刻將結果進行提交,

悲觀鎖的實作,往往依靠資料庫提供的鎖機制,在資料庫中,悲觀鎖的流程如下:

  • 在對記錄進行修改前,先嘗試為該記錄加上排他鎖(exclusive locking),
  • 如果加鎖失敗,說明該記錄正在被修改,那么當前查詢可能要等待或者拋出例外,具體回應方式由開發者根據實際需要決定,
  • 如果成功加鎖,那么就可以對記錄做修改,事務完成后就會解鎖了,
  • 其間如果有其他事務對該記錄做加鎖的操作,都要等待當前事務解鎖或直接拋出例外,

資料庫樂觀鎖

使用樂觀鎖就不需要借助資料庫的鎖機制了,

樂觀鎖的概念中其實已經闡述了他的具體實作細節:主要就是兩個步驟:沖突檢測和資料更新,其實作方式有一種比較典型的就是Compare and Swap(CAS)技術

CAS是項樂觀鎖技術,當多個執行緒嘗試使用CAS同時更新同一個變數時,只有其中一個執行緒能更新變數的值,而其它執行緒都失敗,失敗的執行緒并不會被掛起,而是被告知這次競爭中失敗,并可以再次嘗試,

CAS的實作中,在表中增加一個version欄位,操作前先查詢version資訊,在資料提交時檢查version欄位是否被修改,如果沒有被修改則進行提交,否則認為是過期資料,

比如前面的扣減庫存問題,通過樂觀鎖可以實作如下:

//1.查詢出商品資訊
			
select stockCount, version from seckill_good where id=1;
			
//2.根據商品資訊生成訂單
insert into seckill_order (id,good_id) values (null,1);

//3.修改商品庫存
update seckill_good set stockCount=stockCount-1, version = version+1 where id=1, version=version;

以上,我們在更新之前,先查詢一下庫存表中當前版本(version),然后在做update的時候,以version 作為一個修改條件,

當我們提交更新的時候,判斷資料庫表對應記錄的當前version與第一次取出來的version進行比對,如果資料庫表當前version與第一次取出來的version相等,則予以更新,否則認為是過期資料,

CAS 樂觀鎖有兩個問題:

(1) CAS 存在一個比較重要的問題,即ABA問題. 解決的辦法是version欄位順序遞增,

(2) 樂觀鎖的方式,在高并發時,只有一個執行緒能執行成功,會造成大量的失敗,這給用戶的體驗顯然是很不好的,


Zookeeper分布式鎖

除了在資料庫層面加分布式鎖,通常還可以使用以下更高性能、更高可用的分布式鎖:

  • 分布式快取(如redis)鎖
  • 分布式協調(如zookeeper)鎖

有關zookeeper分布式鎖的原理和實作,具體請參見下面的博客:
Zookeeper 分布式鎖 (圖解+秒懂+史上最全)

或者閱讀筆者的《Java高并發核心編程(卷1)》

在這里插入圖片描述


Redis分布式鎖

本文重點介紹Redis分布式鎖,分為兩個維度進行介紹:

(1)基于Jedis手工造輪子分布式鎖

(2)介紹Redission 分布式鎖的使用和原理,

分布式鎖一般有如下的特點:

  • 互斥性: 同一時刻只能有一個執行緒持有鎖
  • 可重入性: 同一節點上的同一個執行緒如果獲取了鎖之后能夠再次獲取鎖
  • 鎖超時:和J.U.C中的鎖一樣支持鎖超時,防止死鎖
  • 高性能和高可用: 加鎖和解鎖需要高效,同時也需要保證高可用,防止分布式鎖失效
  • 具備阻塞和非阻塞性:能夠及時從阻塞狀態中被喚醒

手工造輪子:基于Jedis 的API實作分布式鎖

我們首先講解 Jedis 普通分布式鎖實作,并且是純手工的模式,從最為基礎的Redis命令開始,

只有充分了解與分布式鎖相關的普通Redis命令,才能更好的了解高級的Redis分布式鎖的實作,因為高級的分布式鎖的實作完全基于普通Redis命令,

Redis幾種架構

Redis發展到現在,幾種常見的部署架構有:

  • 單機模式;
  • 主從模式;
  • 哨兵模式;
  • 集群模式;

從分布式鎖的角度來說, 無論是單機模式、主從模式、哨兵模式、集群模式,其原理都是類同的, 只是主從模式、哨兵模式、集群模式的更加的高可用、或者更加高并發,

所以,接下來先基于單機模式,基于Jedis手工造輪子實作自己的分布式鎖,

首先看兩個命令:

Redis分布式鎖機制,主要借助setnx和expire兩個命令完成,

setnx命令:

SETNX 是SET if Not eXists的簡寫,將 key 的值設為 value,當且僅當 key 不存在; 若給定的 key 已經存在,則 SETNX 不做任何動作,

下面為客戶端使用示例:

127.0.0.1:6379> set lock "unlock"
OK
127.0.0.1:6379> setnx lock "unlock"
(integer) 0
127.0.0.1:6379> setnx lock "lock"
(integer) 0
127.0.0.1:6379> 

expire命令:

expire命令為 key 設定生存時間,當 key 過期時(生存時間為 0 ),它會被自動洗掉. 其格式為:

EXPIRE key seconds

下面為客戶端使用示例:

127.0.0.1:6379> expire lock 10
(integer) 1
127.0.0.1:6379> ttl lock
8

基于Jedis API的分布式鎖的總體流程:

通過Redis的setnx、expire命令可以實作簡單的鎖機制:

  • key不存在時創建,并設定value和過期時間,回傳值為1;成功獲取到鎖;
  • 如key存在時直接回傳0,搶鎖失敗;
  • 持有鎖的執行緒釋放鎖時,手動洗掉key; 或者過期時間到,key自動洗掉,鎖釋放,

執行緒呼叫setnx方法成功回傳1認為加鎖成功,其他執行緒要等到當前執行緒業務操作完成釋放鎖后,才能再次呼叫setnx加鎖成功,

在這里插入圖片描述

以上簡單redis分布式鎖的問題:

如果出現了這么一個問題:如果setnx是成功的,但是expire設定失敗,一旦出現了釋放鎖失敗,或者沒有手工釋放,那么這個鎖永遠被占用,其他執行緒永遠也搶不到鎖,

所以,需要保障setnx和expire兩個操作的原子性,要么全部執行,要么全部不執行,二者不能分開,

解決的辦法有兩種:

  • 使用set的命令時,同時設定過期時間,不再單獨使用 expire命令
  • 使用lua腳本,將加鎖的命令放在lua腳本中原子性的執行

簡單加鎖:使用set的命令時,同時設定過期時間

使用set的命令時,同時設定過期時間的示例如下:

127.0.0.1:6379> set unlock "234" EX 100 NX
(nil)
127.0.0.1:6379> 
127.0.0.1:6379> set test "111" EX 100 NX
OK

這樣就完美的解決了分布式鎖的原子性; set 命令的完整格式:

set key value [EX seconds] [PX milliseconds] [NX|XX]


EX seconds:設定失效時長,單位秒
PX milliseconds:設定失效時長,單位毫秒
NX:key不存在時設定value,成功回傳OK,失敗回傳(nil)
XX:key存在時設定value,成功回傳OK,失敗回傳(nil)

使用set命令實作加鎖操作,先展示加鎖的簡單代碼實習,再帶大家慢慢解釋為什么這樣實作,

加鎖的簡單代碼實作

package com.crazymaker.springcloud.standard.lock;


@Slf4j
@Data
@AllArgsConstructor
public class JedisCommandLock {

    private  RedisTemplate redisTemplate;

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /**
     * 嘗試獲取分布式鎖
     * @param jedis Redis客戶端
     * @param lockKey 鎖
     * @param requestId 請求標識
     * @param expireTime 超期時間
     * @return 是否獲取成功
     */
    public static   boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }

  

}

可以看到,我們加鎖用到了Jedis的set Api

jedis.set(String key, String value, String nxxx, String expx, int time)

這個set()方法一共有五個形參:

  • 第一個為key,我們使用key來當鎖,因為key是唯一的,

  • 第二個為value,我們傳的是requestId,很多童鞋可能不明白,有key作為鎖不就夠了嗎,為什么還要用到value?原因就是我們在上面講到可靠性時,分布式鎖要滿足第四個條件解鈴還須系鈴人,通過給value賦值為requestId,我們就知道這把鎖是哪個請求加的了,在解鎖的時候就可以有依據,

    requestId可以使用UUID.randomUUID().toString()方法生成,

  • 第三個為nxxx,這個引數我們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,我們進行set操作;若key已經存在,則不做任何操作;

  • 第四個為expx,這個引數我們傳的是PX,意思是我們要給這個key加一個過期的設定,具體時間由第五個引數決定,

  • 第五個為time,與第四個引數相呼應,代表key的過期時間,

總的來說,執行上面的set()方法就只會導致兩種結果:

  1. 當前沒有鎖(key不存在),那么就進行加鎖操作,并對鎖設定個有效期,同時value表示加鎖的客戶端,
  2. 已有鎖存在,不做任何操作,

心細的童鞋就會發現了,我們的加鎖代碼滿足前面描述的四個條件中的三個,

  • 首先,set()加入了NX引數,可以保證如果已有key存在,則函式不會呼叫成功,也就是只有一個客戶端能持有鎖,滿足互斥性,

  • 其次,由于我們對鎖設定了過期時間,即使鎖的持有者后續發生崩潰而沒有解鎖,鎖也會因為到了過期時間而自動解鎖(即key被洗掉),不會被永遠占用(而發生死鎖),

  • 最后,因為我們將value賦值為requestId,代表加鎖的客戶端請求標識,那么在客戶端在解鎖的時候就可以進行校驗是否是同一個客戶端,

  • 由于我們只考慮Redis單機部署的場景,所以容錯性我們暫不考慮,

基于Jedis 的API實作簡單解鎖代碼

還是先展示代碼,再帶大家慢慢解釋為什么這樣實作,

解鎖的簡單代碼實作

package com.crazymaker.springcloud.standard.lock;


@Slf4j
@Data
@AllArgsConstructor
public class JedisCommandLock {

  

    private static final Long RELEASE_SUCCESS = 1L;

    /**
     * 釋放分布式鎖
     * @param jedis Redis客戶端
     * @param lockKey 鎖
     * @param requestId 請求標識
     * @return 是否釋放成功
     */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }

}

那么這段Lua代碼的功能是什么呢?

其實很簡單,首先獲取鎖對應的value值,檢查是否與requestId相等,如果相等則洗掉鎖(解鎖),

第一行代碼,我們寫了一個簡單的Lua腳本代碼,

第二行代碼,我們將Lua代碼傳到jedis.eval()方法里,并使引數KEYS[1]賦值為lockKey,ARGV[1]賦值為requestId,eval()方法是將Lua代碼交給Redis服務端執行,

那么為什么要使用Lua語言來實作呢?

因為要確保上述操作是原子性的,那么為什么執行eval()方法可以確保原子性,源于Redis的特性.

簡單來說,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,并且直到eval命令執行完成,Redis才會執行其他命

錯誤示例1

最常見的解鎖代碼就是直接使用 jedis.del() 方法洗掉鎖,這種不先判斷鎖的擁有者而直接解鎖的方式,會導致任何客戶端都可以隨時進行解鎖,即使這把鎖不是它的,

public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
    jedis.del(lockKey);
}

錯誤示例2

這種解鎖代碼乍一看也是沒問題,甚至我之前也差點這樣實作,與正確姿勢差不多,唯一區別的是分成兩條命令去執行,代碼如下:

public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) {
        
    // 判斷加鎖與解鎖是不是同一個客戶端
    if (requestId.equals(jedis.get(lockKey))) {
        // 若在此時,這把鎖突然不是這個客戶端的,則會誤解鎖
        jedis.del(lockKey);
    }

}

再造輪子:基于Lua腳本實作分布式鎖

lua腳本的好處

前面提到,在redis中執行lua腳本,有如下的好處:

那么為什么要使用Lua語言來實作呢?

因為要確保上述操作是原子性的,那么為什么執行eval()方法可以確保原子性,源于Redis的特性.

簡單來說,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,并且直到eval命令執行完成,Redis才會執行其他命

所以:

大部分的開源框架(如 redission)中的分布式鎖組件,都是用純lua腳本實作的,

題外話: lua腳本是高并發、高性能的必備腳本語言

有關lua的詳細介紹,請參見以下書籍:

在這里插入圖片描述

那么,我們也來模擬一下

基于純Lua腳本的分布式鎖的執行流程

加鎖和洗掉鎖的操作,使用純lua進行封裝,保障其執行時候的原子性,

基于純Lua腳本實作分布式鎖的執行流程,大致如下:

在這里插入圖片描述

加鎖的Lua腳本: lock.lua

--- -1 failed
--- 1 success
---
local key = KEYS[1]
local requestId = KEYS[2]
local ttl = tonumber(KEYS[3])
local result = redis.call('setnx', key, requestId)
if result == 1 then
    --PEXPIRE:以毫秒的形式指定過期時間
    redis.call('pexpire', key, ttl)
else
    result = -1;
    -- 如果value相同,則認為是同一個執行緒的請求,則認為重入鎖
    local value = redis.call('get', key)
    if (value == requestId) then
        result = 1;
        redis.call('pexpire', key, ttl)
    end
end
--  如果獲取鎖成功,則回傳 1
return result

解鎖的Lua腳本: unlock.lua:

--- -1 failed
--- 1 success

-- unlock key
local key = KEYS[1]
local requestId = KEYS[2]
local value = redis.call('get', key)
if value == requestId then
    redis.call('del', key);
    return 1;
end
return -1

兩個檔案,放在資源檔案夾下備用:

在這里插入圖片描述

在Java中呼叫lua腳本,完成加鎖操作

下一步,實作Lock介面, 完成JedisLock的分布式鎖,

其加鎖操作,通過呼叫 lock.lua腳本完成,代碼如下:

package com.crazymaker.springcloud.standard.lock;

import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

@Slf4j
@Data
@AllArgsConstructor
public class JedisLock implements Lock {

    private RedisTemplate redisTemplate;

    RedisScript<Long> lockScript = null;
    RedisScript<Long> unLockScript = null;

    public static final int DEFAULT_TIMEOUT = 2000;
    public static final Long LOCKED = Long.valueOf(1);
    public static final Long UNLOCKED = Long.valueOf(1);
    public static final Long WAIT_GAT = Long.valueOf(200);
    public static final int EXPIRE = 2000;


    String key;
    String lockValue;  // lockValue 鎖的value ,代表執行緒的uuid

    /**
     * 默認為2000ms
     */
    long expire = 2000L;

    public JedisLock(String lockKey, String lockValue) {
        this.key = lockKey;
        this.lockValue = lockValue;
    }

    private volatile boolean isLocked = false;

    private Thread thread;

    /**
     * 獲取一個分布式鎖 , 超時則回傳失敗
     *
     * @return 獲鎖成功 - true | 獲鎖失敗 - false
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

        //本地可重入
        if (isLocked && thread == Thread.currentThread()) {
            return true;
        }
        expire = unit != null ? unit.toMillis(time) : DEFAULT_TIMEOUT;
        long startMillis = System.currentTimeMillis();
        Long millisToWait = expire;

        boolean localLocked = false;

        int turn = 1;
        while (!localLocked) {

            localLocked = this.lockInner(expire);
            if (!localLocked) {
                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                startMillis = System.currentTimeMillis();
                if (millisToWait > 0L) {
                    /**
                     * 還沒有超時
                     */
                    ThreadUtil.sleepMilliSeconds(WAIT_GAT);
                    log.info("睡眠一下,重新開始,turn:{},剩余時間:{}", turn++, millisToWait);
                } else {
                    log.info("搶鎖超時");
                    return false;
                }
            } else {
                isLocked = true;
                localLocked = true;
            }
        }
        return isLocked;
    }


  
    /**
     * 有回傳值的搶奪鎖
     *
     * @param millisToWait
     */
    public boolean lockInner(Long millisToWait) {
        if (null == key) {
            return false;
        }
        try {
            List<String> redisKeys = new ArrayList<>();
            redisKeys.add(key);
            redisKeys.add(lockValue);
            redisKeys.add(String.valueOf(millisToWait));
            Long res = (Long) redisTemplate.execute(lockScript, redisKeys);

            return res != null && res.equals(LOCKED);
        } catch (Exception e) {
            e.printStackTrace();
            throw BusinessException.builder().errMsg("搶鎖失敗").build();
        }

    }

   
}

在Java中呼叫lua腳本,完成解鎖操作

其解鎖操作,通過呼叫unlock.lua腳本完成,代碼如下:

package com.crazymaker.springcloud.standard.lock;

import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

@Slf4j
@Data
@AllArgsConstructor
public class JedisLock implements Lock {

    private RedisTemplate redisTemplate;

    RedisScript<Long> lockScript = null;
    RedisScript<Long> unLockScript = null;

    //釋放鎖
    @Override
    public void unlock() {
        if (key == null || requestId == null) {
            return;
        }
        try {
            List<String> redisKeys = new ArrayList<>();
            redisKeys.add(key);
            redisKeys.add(requestId);
            Long res = (Long) redisTemplate.execute(unLockScript, redisKeys);

        } catch (Exception e) {
            e.printStackTrace();
            throw BusinessException.builder().errMsg("釋放鎖失敗").build();

        }
    }
  
   
}

撰寫RedisLockService用于管理JedisLock

撰寫個分布式鎖服務,用于加載lua腳本,創建 分布式鎖,代碼如下:

package com.crazymaker.springcloud.standard.lock;

import com.crazymaker.springcloud.common.util.IOUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

@Slf4j
@Data
public class RedisLockService
{

    private RedisTemplate redisTemplate;

    static String lockLua = "script/lock.lua";
    static String unLockLua = "script/unlock.lua";
    static RedisScript<Long> lockScript = null;
    static RedisScript<Long> unLockScript = null;
    {
        String script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),lockLua);
//        String script = FileUtil.readString(lockLua, Charset.forName("UTF-8" ));
        if(StringUtils.isEmpty(script))
        {
            log.error("lua load failed:"+lockLua);
        }

        lockScript = new DefaultRedisScript<>(script, Long.class);

//        script = FileUtil.readString(unLockLua, Charset.forName("UTF-8" ));
        script =  IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),unLockLua);
        if(StringUtils.isEmpty(script))
        {
            log.error("lua load failed:"+unLockLua);
        }
        unLockScript = new DefaultRedisScript<>(script, Long.class);

    }

    public RedisLockService(RedisTemplate redisTemplate)
    {
        this.redisTemplate = redisTemplate;
    }


    public Lock getLock(String lockKey, String lockValue) {
        JedisLock lock=new JedisLock(lockKey,lockValue);
        lock.setRedisTemplate(redisTemplate);
        lock.setLockScript(lockScript);
        lock.setUnLockScript(unLockScript);
        return lock;
    }
}

測驗用例

接下來,終于可以上測驗用例了

package com.crazymaker.springcloud.lock;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {DemoCloudApplication.class})
// 指定啟動類
public class RedisLockTest {

    @Resource
    RedisLockService redisLockService;

    private ExecutorService pool = Executors.newFixedThreadPool(10);

    @Test
    public void testLock() {
        int threads = 10;
        final int[] count = {0};
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                String lockValue = UUID.randomUUID().toString();

                try {
                    Lock lock = redisLockService.getLock("test:lock:1", lockValue);
                    boolean locked = lock.tryLock(10, TimeUnit.SECONDS);

                    if (locked) {
                        for (int j = 0; j < 1000; j++) {
                            count[0]++;
                        }

                        log.info("count = " + count[0]);
                        lock.unlock();
                    } else {
                        System.out.println("搶鎖失敗");
                    }


                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("10個執行緒每個累加1000為: = " + count[0]);
        //輸出統計結果
        float time = System.currentTimeMillis() - start;

        System.out.println("運行的時長為(ms):" + time);
        System.out.println("每一次執行的時長為(ms):" + time / count[0]);

    }

}

執行用例,結果如下:

2021-05-04 23:02:11.900  INFO 22120 --- [pool-1-thread-7] c.c.springcloud.lock.RedisLockTest       LN:50 count = 6000
2021-05-04 23:02:11.901  INFO 22120 --- [pool-1-thread-1] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新開始,turn:3,剩余時間:9585
2021-05-04 23:02:11.902  INFO 22120 --- [pool-1-thread-1] c.c.springcloud.lock.RedisLockTest       LN:50 count = 7000
2021-05-04 23:02:12.100  INFO 22120 --- [pool-1-thread-4] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新開始,turn:3,剩余時間:9586
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新開始,turn:3,剩余時間:9585
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-8] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新開始,turn:3,剩余時間:9585
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-4] c.c.springcloud.lock.RedisLockTest       LN:50 count = 8000
2021-05-04 23:02:12.102  INFO 22120 --- [pool-1-thread-8] c.c.springcloud.lock.RedisLockTest       LN:50 count = 9000
2021-05-04 23:02:12.304  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新開始,turn:4,剩余時間:9383
2021-05-04 23:02:12.307  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.lock.RedisLockTest       LN:50 count = 10000
10個執行緒每個累加1000為: = 10000
運行的時長為(ms):827.0
每一次執行的時長為(ms):0.0827

STW導致的鎖過期問題

下面有一個簡單的使用鎖的例子,在10秒內占著鎖:

  //寫資料到檔案
function writeData(filename, data) {
    boolean locked = lock.tryLock(10, TimeUnit.SECONDS);
    if (!locked) {
        throw 'Failed to acquire lock';
    }

    try {
        //將資料寫到檔案
        var file = storage.readFile(filename);
        var updated = updateContents(file, data);
        storage.writeFile(filename, updated);
    } finally {
        lock.unlock();
    }
}

問題是:如果在寫檔案程序中,發生了 fullGC,并且其時間跨度較長, 超過了10秒, 那么,分布式就自動釋放了,

在此程序中,client2 搶到鎖,寫了檔案,

client1 的fullGC完成后,也繼續寫檔案,注意,此時client1 的并沒有占用鎖,此時寫入會導致檔案資料錯亂,發生執行緒安全問題,

這就是STW導致的鎖過期問題,

STW導致的鎖過期問題,具體如下圖所示:

在這里插入圖片描述

STW導致的鎖過期問題,大概的解決方案,有:

1: 模擬CAS樂觀鎖的方式,增加版本號

2:watch dog自動延期機制

1: 模擬CAS樂觀鎖的方式,增加版本號(如下圖中的token)

c

此方案如果要實作,需要調整業務邏輯,與之配合,所以會入侵代碼,

2:watch dog自動延期機制

客戶端1加鎖的鎖key默認生存時間才30秒,如果超過了30秒,客戶端1還想一直持有這把鎖,怎么辦呢?

簡單!只要客戶端1一旦加鎖成功,就會啟動一個watch dog看門狗,他是一個后臺執行緒,會每隔10秒檢查一下,如果客戶端1還持有鎖key,那么就會不斷的延長鎖key的生存時間,

redission,采用的就是這種方案, 此方案不會入侵業務代碼,

為啥推薦使用Redission

作為 Java 開發人員,我們若想在程式中集成 Redis,必須使用 Redis 的第三方庫,目前大家使用的最多的第三方庫是jedis,

和SpringCloud gateway一樣,Redisson也是基于Netty實作的,是更高性能的第三方庫, 所以,這里推薦大家使用Redission替代 jedis,

在使用Redission之前,建議大家先掌握Netty的知識,

推薦大家閱讀被很多小伙伴評價為史上最為易懂的NIO、Netty書籍:《Java高并發核心編程(卷1)》

在這里插入圖片描述


Redisson簡介

Redisson是一個在Redis的基礎上實作的Java駐記憶體資料網格(In-Memory Data Grid),它不僅提供了一系列的分布式的Java常用物件,還實作了可重入鎖(Reentrant Lock)、公平鎖(Fair Lock、聯鎖(MultiLock)、 紅鎖(RedLock)、 讀寫鎖(ReadWriteLock)等,還提供了許多分布式服務,

img

Redisson提供了使用Redis的最簡單和最便捷的方法,Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上,

img

Redisson與Jedis的對比

1.概況對比

Jedis是Redis的java實作的客戶端,其API提供了比較全面的的Redis命令的支持,Redisson實作了分布式和可擴展的的java資料結構,和Jedis相比,功能較為簡單,不支持字串操作,不支持排序,事物,管道,磁區等Redis特性,Redisson的宗旨是促進使用者對Redis的關注分離,從而讓使用者能夠將精力更集中的放在處理業務邏輯上,

2.可伸縮性

Jedis使用阻塞的I/O,且其方法呼叫都是同步的,程式流程要等到sockets處理完I/O才能執行,不支持異步,Jedis客戶端實體不是執行緒安全的,所以需要通過連接池來使用Jedis,

Redisson使用非阻塞的I/O和基于Netty框架的事件驅動的通信層,其方法呼叫時異步的,Redisson的API是執行緒安全的,所以操作單個Redisson連接來完成各種操作,

3.第三方框架整合

Redisson在Redis的基礎上實作了java快取標準規范;Redisson還提供了Spring Session回話管理器的實作,

Redission 的原始碼地址:

官網: https://redisson.org/

github: https://github.com/redisson/redisson#quick-start

特性 & 功能:

  • 支持 Redis 單節點(single)模式、哨兵(sentinel)模式、主從(Master/Slave)模式以及集群(Redis Cluster)模式

  • 程式介面呼叫方式采用異步執行和異步流執行兩種方式

  • 資料序列化,Redisson 的物件編碼類是用于將物件進行序列化和反序列化,以實作對該物件在 Redis 里的讀取和存盤

  • 單個集合資料分片,在集群模式下,Redisson 為單個 Redis 集合型別提供了自動分片的功能

  • 提供多種分布式物件,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等

  • 提供豐富的分布式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等

  • 分布式鎖和同步器的實作,可重入鎖(Reentrant Lock),公平鎖(Fair Lock),聯鎖(MultiLock),紅鎖(Red Lock),信號量(Semaphonre),可過期性信號鎖(PermitExpirableSemaphore)等

  • 提供先進的分布式服務,如分布式遠程服務(Remote Service),分布式實時物件(Live Object)服務,分布式執行服務(Executor Service),分布式調度任務服務(Schedule Service)和分布式映射歸納服務(MapReduce)


Redisson的使用

如何安裝 Redisson

安裝 Redisson 最便捷的方法是使用 Maven 或者 Gradle:

?Maven

<dependency>	
    <groupId>org.redisson</groupId>	
    <artifactId>redisson</artifactId>	
    <version>3.11.4</version>	
</dependency>

?Gradle

compile group: 'org.redisson', name: 'redisson', version: '3.11.4'

目前 Redisson 最新版是 3.11.4,當然你也可以通過搜索 Maven 中央倉庫 mvnrepository[1] 來找到 Redisson 的各種版本,

獲取RedissonClient物件

RedissonClient有多種模式,主要的模式有:

  • 單節點模式

  • 哨兵模式

  • 主從模式

  • 集群模式

首先介紹單節點模式,

單節點模式的程式化配置方法,大致如下:

Config config = new Config();
config.useSingleServer().setAddress("redis://myredisserver:6379");
RedissonClient redisson = Redisson.create(config);xxxxxxxxxx Config config = new Config();config.useSingleServer().setAddress("redis://myredisserver:6379");RedissonClient redisson = Redisson.create(config);// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();
SingleServerConfig singleConfig = config.useSingleServer();

SingleServerConfig類的設定引數如下

address(節點地址)

可以通過host:port的格式來指定節點地址,

subscriptionConnectionMinimumIdleSize(發布和訂閱連接的最小空閑連接數)

默認值:1

用于發布和訂閱連接的最小保持連接數(長連接),Redisson內部經常通過發布和訂閱來實作許多功能,長期保持一定數量的發布訂閱連接是必須的,

subscriptionConnectionPoolSize(發布和訂閱連接池大小)

默認值:50

用于發布和訂閱連接的連接池最大容量,連接池的連接數量自動彈性伸縮,

connectionMinimumIdleSize(最小空閑連接數)

默認值:32

最小保持連接數(長連接),長期保持一定數量的連接有利于提高瞬時寫入反應速度,

connectionPoolSize(連接池大小)

默認值:64

連接池最大容量,連接池的連接數量自動彈性伸縮,

dnsMonitoring(是否啟用DNS監測)

默認值:false

在啟用該功能以后,Redisson將會監測DNS的變化情況,

dnsMonitoringInterval(DNS監測時間間隔,單位:毫秒)

默認值:5000

監測DNS的變化情況的時間間隔,

idleConnectionTimeout(連接空閑超時,單位:毫秒)

默認值:10000

如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,并從連接池里去掉,時間單位是毫秒,

connectTimeout(連接超時,單位:毫秒)

默認值:10000

同節點建立連接時的等待超時,時間單位是毫秒,

timeout(命令等待超時,單位:毫秒)

默認值:3000

等待節點回復命令的時間,該時間從命令發送成功時開始計時,

retryAttempts(命令失敗重試次數)

默認值:3

如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤,如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時,

retryInterval(命令重試發送時間間隔,單位:毫秒)

默認值:1500

在一條命令發送失敗以后,等待重試發送的時間間隔,時間單位是毫秒,

reconnectionTimeout(重新連接時間間隔,單位:毫秒)

默認值:3000

當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔,時間單位是毫秒,

failedAttempts(執行失敗最大次數)

默認值:3

在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點串列里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試,

database(資料庫編號)

默認值:0

嘗試連接的資料庫編號,

password(密碼)

默認值:null

用于節點身份驗證的密碼,

subscriptionsPerConnection(單個連接最大訂閱數量)

默認值:5

每個連接的最大訂閱數量,

clientName(客戶端名稱)

默認值:null

在Redis節點里顯示的客戶端名稱,

sslEnableEndpointIdentification(啟用SSL終端識別)

默認值:true

開啟SSL終端識別能力,

sslProvider(SSL實作方式)

默認值:JDK

確定采用哪種方式(JDK或OPENSSL)來實作SSL連接,

sslTruststore(SSL信任證書庫路徑)

默認值:null

指定SSL信任證書庫的路徑,

sslTruststorePassword(SSL信任證書庫密碼)

默認值:null

指定SSL信任證書庫的密碼,

sslKeystore(SSL鑰匙庫路徑)

默認值:null

指定SSL鑰匙庫的路徑,

sslKeystorePassword(SSL鑰匙庫密碼)

默認值:null

指定SSL鑰匙庫的密碼,

SpringBoot整合Redisson

Redisson有多種模式,首先介紹單機模式的整合,

一、匯入Maven依賴

<!-- redisson-springboot -->
   <dependency>
       <groupId>org.redisson</groupId>
       <artifactId>redisson-spring-boot-starter</artifactId>
       <version>3.11.4</version>
   </dependency>

二、核心組態檔

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    timeout: 5000

三、添加配置類

RedissonConfig.java

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RedissonConfig {

    @Autowired
    private RedisProperties redisProperties;

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + "");
        config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());
        config.useSingleServer().setDatabase(3);
        return Redisson.create(config);
    }

}

自定義starter

由于redission可以有多種模式,處于學習的目的,將多種模式封裝成一個start,可以學習一下starter的制作,

在這里插入圖片描述

封裝一個RedissonManager,通過策略模式,根據不同的配置型別,創建 RedissionConfig實體,然后創建RedissionClient物件,

在這里插入圖片描述

使用RBucket操作分布式物件

Redission模擬了Java的面向物件編程思想,可以簡單理解為一切皆為物件,

每一個 Redisson 物件 實作了RObject and RExpirable 兩個interfaces.

Usage example:

RObject object = redisson.get...()

object.sizeInMemory();

object.delete();

object.rename("newname");

object.isExists();

// catch expired event
object.addListener(new ExpiredObjectListener() {
   ...
});

// catch delete event
object.addListener(new DeletedObjectListener() {
   ...
});

每一個Redisson 物件的名字,就是 Redis中的 Key.

RMap map = redisson.getMap("mymap");
map.getName(); // = mymap

可以通過 RKeys 介面操作Redis中的keys.

Usage example:

RKeys keys = redisson.getKeys();

Iterable<String> allKeys = keys.getKeys();

Iterable<String> foundedKeys = keys.getKeysByPattern('key*');

long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");

long deletedKeysAmount = keys.deleteByPattern("test?");

String randomKey = keys.randomKey();

long keysAmount = keys.count();

keys.flushall();

keys.flushdb();

Redisson通過RBucket介面代表可以訪問任何型別的基礎物件,或者普通物件

RBucket有一系列的工具方法,如compareAndSet(),get(),getAndDelete(),getAndSet(),set(),size(),trySet()等等,用于設值/取值/獲取尺寸,

RBucket普通物件的最大大小,為512兆位元組

RBucket<AnyObject> bucket = redisson.getBucket("anyObject");

bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();

bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));

下面是一個完整的實體:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

   @Test
    public void testRBucketExamples() {
        // 默認連接上 127.0.0.1:6379
        RedissonClient client = redissonManager.getRedisson();
        // RList 繼承了 java.util.List 介面
        RBucket<String> rstring  = client.getBucket("redission:test:bucket:string");
        rstring.set("this is a string");

        RBucket<UserDTO> ruser  = client.getBucket("redission:test:bucket:user");
        UserDTO dto = new UserDTO();
        dto.setToken(UUID.randomUUID().toString());
        ruser.set(dto);
        System.out.println("string is: " + rstring.get());
        System.out.println("dto is: " + ruser.get());

        client.shutdown();
    }


}

運行上面的代碼時,可以獲得以下輸出:

string is: this is a string
dto is: UserDTO(id=null, userId=null, username=null, password=null, nickname=null, token=183b6eeb-65a8-4b2a-80c6-cf17c08332ce, createTime=null, updateTime=null, headImgUrl=null, mobile=null, sex=null, enabled=null, type=null, openId=null, isDel=false)

在這里插入圖片描述

使用 RList 操作 Redis 串列

下面的代碼簡單演示了如何在 Redisson 中使用 RList 物件,RList 是 Java 的 List 集合的分布式并發實作,

考慮以下代碼:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testListExamples() {
        // 默認連接上 127.0.0.1:6379
        RedissonClient client = redissonManager.getRedisson();
        // RList 繼承了 java.util.List 介面
        RList<String> nameList = client.getList("redission:test:nameList");
        nameList.clear();
        nameList.add("張三");
        nameList.add("李四");
        nameList.add("王五");
        nameList.remove(-1);

        System.out.println("List size: " + nameList.size());


        boolean contains = nameList.contains("李四");
        System.out.println("Is list contains name '李四': " + contains);
        nameList.forEach(System.out::println);

        client.shutdown();
    }


}

運行上面的代碼時,可以獲得以下輸出:

List size: 2
Is list contains name '李四': true
張三
李四

在這里插入圖片描述

使用 RMap 操作 Redis 哈希

Redisson 還包括 RMap,它是 Java Map 集合的分布式并發實作,考慮以下代碼:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testListExamples() {
         // 默認連接上 127.0.0.1:6379
        RedissonClient client = redissonManager.getRedisson();
        // RMap 繼承了 java.util.concurrent.ConcurrentMap 介面
        RMap<String, Object> map = client.getMap("redission:test:personalMap");
        map.put("name", "張三");
        map.put("address", "北京");
        map.put("age", new Integer(50));

        System.out.println("Map size: " + map.size());

        boolean contains = map.containsKey("age");
        System.out.println("Is map contains key 'age': " + contains);
        String value = String.valueOf(map.get("name"));
        System.out.println("Value mapped by key 'name': " + value);

        client.shutdown();
    }


}

運行上面的代碼時,將會看到以下輸出:

Map size: 3
Is map contains key 'age': true
Value mapped by key 'name': 張三

在這里插入圖片描述

執行 Lua腳本

Lua是一種開源、簡單易學、輕量小巧的腳本語言,用標準C語言撰寫,

其設計的目的就是為了嵌入應用程式中,從而為應用程式提供靈活的擴展和定制功能,

Redis從2.6版本開始支持Lua腳本,Redis使用Lua可以:

  1. 原子操作,Redis會將整個腳本作為一個整體執行,不會被中斷,可以用來批量更新、批量插入
  2. 減少網路開銷,多個Redis操作合并為一個腳本,減少網路時延
  3. 代碼復用,客戶端發送的腳本可以存盤在Redis中,其他客戶端可以根據腳本的id呼叫,
public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testLuaExamples() {
        // 默認連接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();

        redisson.getBucket("redission:test:foo").set("bar");
        String r = redisson.getScript().eval(RScript.Mode.READ_ONLY,
                "return redis.call('get', 'redission:test:foo')", RScript.ReturnType.VALUE);
        System.out.println("foo: " + r);

        // 通過預存的腳本進行同樣的操作
        RScript s = redisson.getScript();
        // 首先將腳本加載到Redis
        String sha1 = s.scriptLoad("return redis.call('get', 'redission:test:foo')");
        // 回傳值 res == 282297a0228f48cd3fc6a55de6316f31422f5d17
        System.out.println("sha1: " + sha1);
        // 再通過SHA值呼叫腳本
        Future<Object> r1 = redisson.getScript().evalShaAsync(RScript.Mode.READ_ONLY,
                sha1,
                RScript.ReturnType.VALUE,
                Collections.emptyList());
        try {
            System.out.println("res: " + r1.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        client.shutdown();
    }
}

運行上面的代碼時,將會看到以下輸出:

foo: bar
sha1: 282297a0228f48cd3fc6a55de6316f31422f5d17
res: bar

在這里插入圖片描述

使用 RLock 實作 Redis 分布式鎖

RLock 是 Java 中可重入鎖的分布式實作,下面的代碼演示了 RLock 的用法:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

 @Test
    public void testLockExamples() {
        // 默認連接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();
        // RLock 繼承了 java.util.concurrent.locks.Lock 介面
        RLock lock = redisson.getLock("redission:test:lock:1");

        final int[] count = {0};
        int threads = 10;
        ExecutorService pool = Executors.newFixedThreadPool(10);
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                for (int j = 0; j < 1000; j++) {
                    lock.lock();

                    count[0]++;
                    lock.unlock();
                }
                countDownLatch.countDown();
            });
        }

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("10個執行緒每個累加1000為: = " + count[0]);
        //輸出統計結果
        float time = System.currentTimeMillis() - start;

        System.out.println("運行的時長為:" + time);
        System.out.println("每一次執行的時長為:" + time/count[0]);
    }

}

此代碼將產生以下輸出:

10個執行緒每個累加1000為: = 10000
運行的時長為:14172.0
每一次執行的時長為:1.4172

使用 RAtomicLong 實作 Redis 原子操作

RAtomicLong 是 Java 中 AtomicLong 類的分布式“替代品”,用于在并發環境中保存長值,以下示例代碼演示了 RAtomicLong 的用法:


public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testRAtomicLongExamples() {
        // 默認連接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();
        RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
          // 執行緒數
        final int threads = 10;
        // 每條執行緒的執行輪數
        final int turns = 1000;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++)
        {
            pool.submit(() ->
            {
                try
                {
                    for (int j = 0; j < turns; j++)
                    {
                        atomicLong.incrementAndGet();
                    }

                } catch (Exception e)
                {
                    e.printStackTrace();
                }
            });
        }

        ThreadUtil.sleepSeconds(5);
        System.out.println("atomicLong: " + atomicLong.get());
        redisson.shutdown();
    }

}

此代碼的輸出將是:


atomicLong: 10000

在這里插入圖片描述

整長型累加器(LongAdder)

基于Redis的Redisson分布式整長型累加器(LongAdder)采用了與java.util.concurrent.atomic.LongAdder類似的介面,通過利用客戶端內置的LongAdder物件,為分布式環境下遞增和遞減操作提供了很高得性能,據統計其性能最高比分布式AtomicLong物件快 12000 倍,

完美適用于分布式統計計量場景,下面是RLongAdder的使用案例:

RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();

以下示例代碼演示了 RLongAdder 的用法:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testRAtomicLongExamples() {
        // 默認連接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();
        RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
          // 執行緒數
        final int threads = 10;
        // 每條執行緒的執行輪數
        final int turns = 1000;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++)
        {
            pool.submit(() ->
            {
                try
                {
                    for (int j = 0; j < turns; j++)
                    {
                        atomicLong.incrementAndGet();
                    }

                } catch (Exception e)
                {
                    e.printStackTrace();
                }
            });
        }

        ThreadUtil.sleepSeconds(5);
        System.out.println("atomicLong: " + atomicLong.get());
        redisson.shutdown();
    }

}

此代碼將產生以下輸出:

longAdder: 10000
運行的時長為:5085.0
每一次執行的時長為:0.5085

當不再使用整長型累加器物件的時候應該自行手動銷毀,如果Redisson物件被關閉(shutdown)了,則不用手動銷毀,

RLongAdder atomicLong = ...
atomicLong.destroy();

序列化

Redisson的物件編碼類是用于將物件進行序列化和反序列化,以實作對該物件在Redis里的讀取和存盤,Redisson提供了以下幾種的物件編碼應用,以供大家選擇:

編碼類名稱說明
org.redisson.codec.JsonJacksonCodecJackson JSON 編碼 默認編碼
org.redisson.codec.AvroJacksonCodecAvro 一個二進制的JSON編碼
org.redisson.codec.SmileJacksonCodecSmile 另一個二進制的JSON編碼
org.redisson.codec.CborJacksonCodecCBOR 又一個二進制的JSON編碼
org.redisson.codec.MsgPackJacksonCodecMsgPack 再來一個二進制的JSON編碼
org.redisson.codec.IonJacksonCodecAmazon Ion 亞馬遜的Ion編碼,格式與JSON類似
org.redisson.codec.KryoCodecKryo 二進制物件序列化編碼
org.redisson.codec.SerializationCodecJDK序列化編碼
org.redisson.codec.FstCodecFST 10倍于JDK序列化性能而且100%兼容的編碼
org.redisson.codec.LZ4CodecLZ4 壓縮型序列化物件編碼
org.redisson.codec.SnappyCodecSnappy 另一個壓縮型序列化物件編碼
org.redisson.client.codec.JsonJacksonMapCodec基于Jackson的映射類使用的編碼,可用于避免序列化類的資訊,以及用于解決使用byte[]遇到的問題,
org.redisson.client.codec.StringCodec純字串編碼(無轉換)
org.redisson.client.codec.LongCodec純整長型數字編碼(無轉換)
org.redisson.client.codec.ByteArrayCodec位元組陣列編碼
org.redisson.codec.CompositeCodec用來組合多種不同編碼在一起

由Redisson默認的編碼器為二進制編碼器,為了序列化后的內容可見,需要使用Json文本序列化編碼工具類,Redisson提供了編碼器 JsonJacksonCodec,作為Json文本序列化編碼工具類,

問題是:JsonJackson在序列化有雙向參考的物件時,會出現無限回圈例外,而fastjson在檢查出雙向參考后會自動用參考符$ref替換,終止回圈,

所以,一些特殊場景中:用fastjson能 正常序列化到redis,而JsonJackson則拋出無限回圈例外,

為了序列化后的內容可見,所以不用redission其他自帶的,自行實作fastjson編碼器:

package com.crayon.distributedredissionspringbootstarter.codec;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;

import java.io.IOException;

public class FastjsonCodec extends BaseCodec {

    private final Encoder encoder = in -> {
        ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
        try {
            ByteBufOutputStream os = new ByteBufOutputStream(out);
            JSON.writeJSONString(os, in, SerializerFeature.WriteClassName);
            return os.buffer();
        } catch (IOException e) {
            out.release();
            throw e;
        } catch (Exception e) {
            out.release();
            throw new IOException(e);
        }
    };

    private final Decoder<Object> decoder = (buf, state) ->
            JSON.parseObject(new ByteBufInputStream(buf), Object.class);

    @Override
    public Decoder<Object> getValueDecoder() {
        return decoder;
    }

    @Override
    public Encoder getValueEncoder() {
        return encoder;
    }
}

替換的方法如下:


 */
@Slf4j
public class StandaloneConfigImpl implements RedissonConfigService {

    @Override
    public Config createRedissonConfig(RedissonConfig redissonConfig) {
        Config config = new Config();
        try {
            String address = redissonConfig.getAddress();
            String password = redissonConfig.getPassword();
            int database = redissonConfig.getDatabase();
            String redisAddr = GlobalConstant.REDIS_CONNECTION_PREFIX.getConstant_value() + address;
            config.useSingleServer().setAddress(redisAddr);
            config.useSingleServer().setDatabase(database);
            //密碼可以為空
            if (!StringUtils.isEmpty(password)) {
                config.useSingleServer().setPassword(password);
            }
            log.info("初始化[單機部署]方式Config,redisAddress:" + address);

//            config.setCodec( new FstCodec());
            config.setCodec( new FastjsonCodec());
        } catch (Exception e) {
            log.error("單機部署 Redisson init error", e);
        }
        return config;
    }
}

哨兵模式

哨兵模式即sentinel模式,配置Redis哨兵服務的官方檔案在這里,

哨兵模式實作代碼和單機模式幾乎一樣,唯一的不同就是Config的構造.

程式化配置哨兵模式的方法如下

Config config = new Config();
config.useSentinelServers()
    .setMasterName("mymaster")
    // use "rediss://" for SSL connection
    .addSentinelAddress("redis://127.0.0.1:26389", "redis://127.0.0.1:26379")
    .addSentinelAddress("redis://127.0.0.1:26319");

RedissonClient redisson = Redisson.create(config);

Redisson的哨兵模式的使用方法如下:

SentinelServersConfig sentinelConfig = config.useSentinelServers();

SentinelServersConfig配置引數如下

配置Redis哨兵服務的官方檔案在這里,Redisson的哨兵模式的使用方法如下:SentinelServersConfig sentinelConfig = config.useSentinelServers();

SentinelServersConfig 類的設定引數如下:

dnsMonitoringInterval(DNS監控間隔,單位:毫秒)

默認值:5000

用來指定檢查節點DNS變化的時間間隔,使用的時候應該確保JVM里的DNS資料的快取時間保持在足夠低的范圍才有意義,用-1來禁用該功能,

masterName(主服務器的名稱)

主服務器的名稱是哨兵行程中用來監測主從服務切換情況的,

addSentinelAddress(添加哨兵節點地址)

可以通過host:port的格式來指定哨兵節點的地址,多個節點可以一次性批量添加,

readMode(讀取操作的負載均衡模式)

默認值: SLAVE(只在從服務節點里讀取)

注:在從服務節點里讀取的資料說明已經至少有兩個節點保存了該資料,確保了資料的高可用性,

設定讀取操作選擇節點的模式,可用值為:SLAVE - 只在從服務節點里讀取,MASTER - 只在主服務節點里讀取,MASTER_SLAVE - 在主從服務節點里都可以讀取,

subscriptionMode(訂閱操作的負載均衡模式)

默認值:SLAVE(只在從服務節點里訂閱)

設定訂閱操作選擇節點的模式,可用值為:SLAVE - 只在從服務節點里訂閱,MASTER - 只在主服務節點里訂閱,

loadBalancer(負載均衡演算法類的選擇)

默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer

在使用多個Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度演算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度演算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度演算法

subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)

默認值:1

多從節點的環境里,每個 從服務節點里用于發布和訂閱連接的最小保持連接數(長連接),Redisson內部經常通過發布和訂閱來實作許多功能,長期保持一定數量的發布訂閱連接是必須的,

subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)

默認值:50

多從節點的環境里,每個 從服務節點里用于發布和訂閱連接的連接池最大容量,連接池的連接數量自動彈性伸縮,

slaveConnectionMinimumIdleSize(從節點最小空閑連接數)

默認值:32

多從節點的環境里,每個 從服務節點里用于普通操作( 發布和訂閱)的最小保持連接數(長連接),長期保持一定數量的連接有利于提高瞬時讀取反映速度,

slaveConnectionPoolSize(從節點連接池大小)

默認值:64

多從節點的環境里,每個 從服務節點里用于普通操作( 發布和訂閱)連接的連接池最大容量,連接池的連接數量自動彈性伸縮,

masterConnectionMinimumIdleSize(主節點最小空閑連接數)

默認值:32

多從節點的環境里,每個 主節點的最小保持連接數(長連接),長期保持一定數量的連接有利于提高瞬時寫入反應速度,

masterConnectionPoolSize(主節點連接池大小)

默認值:64

主節點的連接池最大容量,連接池的連接數量自動彈性伸縮,

idleConnectionTimeout(連接空閑超時,單位:毫秒)

默認值:10000

如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,并從連接池里去掉,時間單位是毫秒,

connectTimeout(連接超時,單位:毫秒)

默認值:10000

同任何節點建立連接時的等待超時,時間單位是毫秒,

timeout(命令等待超時,單位:毫秒)

默認值:3000

等待節點回復命令的時間,該時間從命令發送成功時開始計時,

retryAttempts(命令失敗重試次數)

默認值:3

如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤,如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時,

retryInterval(命令重試發送時間間隔,單位:毫秒)

默認值:1500

在一條命令發送失敗以后,等待重試發送的時間間隔,時間單位是毫秒,

reconnectionTimeout(重新連接時間間隔,單位:毫秒)

默認值:3000

當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔,時間單位是毫秒,

failedAttempts(執行失敗最大次數)

默認值:3

在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點串列里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試,

database(資料庫編號)

默認值:0

嘗試連接的資料庫編號,

password(密碼)

默認值:null

用于節點身份驗證的密碼,

subscriptionsPerConnection(單個連接最大訂閱數量)

默認值:5

每個連接的最大訂閱數量,

clientName(客戶端名稱)

默認值:null

在Redis節點里顯示的客戶端名稱,

sslEnableEndpointIdentification(啟用SSL終端識別)

默認值:true

開啟SSL終端識別能力,

sslProvider(SSL實作方式)

默認值:JDK

確定采用哪種方式(JDK或OPENSSL)來實作SSL連接,

sslTruststore(SSL信任證書庫路徑)

默認值:null

指定SSL信任證書庫的路徑,

sslTruststorePassword(SSL信任證書庫密碼)

默認值:null

指定SSL信任證書庫的密碼,

sslKeystore(SSL鑰匙庫路徑)

默認值:null

指定SSL鑰匙庫的路徑,

sslKeystorePassword(SSL鑰匙庫密碼)

默認值:null

指定SSL鑰匙庫的密碼,

通過屬性檔案,配置的示例如下:

---
sentinelServersConfig:
  idleConnectionTimeout: 10000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  failedSlaveReconnectionInterval: 3000
  failedSlaveCheckInterval: 60000
  password: null
  subscriptionsPerConnection: 5
  clientName: null
  loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
  subscriptionConnectionMinimumIdleSize: 1
  subscriptionConnectionPoolSize: 50
  slaveConnectionMinimumIdleSize: 24
  slaveConnectionPoolSize: 64
  masterConnectionMinimumIdleSize: 24
  masterConnectionPoolSize: 64
  readMode: "SLAVE"
  subscriptionMode: "SLAVE"
  sentinelAddresses:
  - "redis://127.0.0.1:26379"
  - "redis://127.0.0.1:26389"
  masterName: "mymaster"
  database: 0
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.MarshallingCodec> {}
transportMode: "NIO"

主從模式

介紹配置Redis主從服務組態的檔案在這里.

程式化配置主從模式的方法如下

Config config = new Config();
config.useMasterSlaveServers()
    // use "rediss://" for SSL connection
    .setMasterAddress("redis://127.0.0.1:6379")
    .addSlaveAddress("redis://127.0.0.1:6389", "redis://127.0.0.1:6332", "redis://127.0.0.1:6419")
    .addSlaveAddress("redis://127.0.0.1:6399");

RedissonClient redisson = Redisson.create(config);

主從模式使用到MasterSlaveServersConfig :

MasterSlaveServersConfig masterSlaveConfig = config.useMasterSlaveServers();

MasterSlaveServersConfig 類的設定引數如下:

dnsMonitoringInterval(DNS監控間隔,單位:毫秒)

默認值:5000

用來指定檢查節點DNS變化的時間間隔,使用的時候應該確保JVM里的DNS資料的快取時間保持在足夠低的范圍才有意義,用-1來禁用該功能,

masterAddress(主節點地址)

可以通過host:port的格式來指定主節點地址,

addSlaveAddress(添加從主節點地址)

可以通過host:port的格式來指定從節點的地址,多個節點可以一次性批量添加,

readMode(讀取操作的負載均衡模式)

默認值: SLAVE(只在從服務節點里讀取)

注:在從服務節點里讀取的資料說明已經至少有兩個節點保存了該資料,確保了資料的高可用性,

設定讀取操作選擇節點的模式,可用值為:SLAVE - 只在從服務節點里讀取,MASTER - 只在主服務節點里讀取,MASTER_SLAVE - 在主從服務節點里都可以讀取,

subscriptionMode(訂閱操作的負載均衡模式)

默認值:SLAVE(只在從服務節點里訂閱)

設定訂閱操作選擇節點的模式,可用值為:SLAVE - 只在從服務節點里訂閱,MASTER - 只在主服務節點里訂閱,

loadBalancer(負載均衡演算法類的選擇)

默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer

在使用多個Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度演算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度演算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度演算法

subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)

默認值:1

多從節點的環境里,每個 從服務節點里用于發布和訂閱連接的最小保持連接數(長連接),Redisson內部經常通過發布和訂閱來實作許多功能,長期保持一定數量的發布訂閱連接是必須的,

subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)

默認值:50

多從節點的環境里,每個 從服務節點里用于發布和訂閱連接的連接池最大容量,連接池的連接數量自動彈性伸縮,

slaveConnectionMinimumIdleSize(從節點最小空閑連接數)

默認值:32

多從節點的環境里,每個 從服務節點里用于普通操作( 發布和訂閱)的最小保持連接數(長連接),長期保持一定數量的連接有利于提高瞬時讀取反映速度,

slaveConnectionPoolSize(從節點連接池大小)

默認值:64

多從節點的環境里,每個 從服務節點里用于普通操作( 發布和訂閱)連接的連接池最大容量,連接池的連接數量自動彈性伸縮,

masterConnectionMinimumIdleSize(主節點最小空閑連接數)

默認值:32

多從節點的環境里,每個 主節點的最小保持連接數(長連接),長期保持一定數量的連接有利于提高瞬時寫入反應速度,

masterConnectionPoolSize(主節點連接池大小)

默認值:64

主節點的連接池最大容量,連接池的連接數量自動彈性伸縮,

idleConnectionTimeout(連接空閑超時,單位:毫秒)

默認值:10000

如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,并從連接池里去掉,時間單位是毫秒,

connectTimeout(連接超時,單位:毫秒)

默認值:10000

同任何節點建立連接時的等待超時,時間單位是毫秒,

timeout(命令等待超時,單位:毫秒)

默認值:3000

等待節點回復命令的時間,該時間從命令發送成功時開始計時,

retryAttempts(命令失敗重試次數)

默認值:3

如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤,如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時,

retryInterval(命令重試發送時間間隔,單位:毫秒)

默認值:1500

在一條命令發送失敗以后,等待重試發送的時間間隔,時間單位是毫秒,

reconnectionTimeout(重新連接時間間隔,單位:毫秒)

默認值:3000

當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔,時間單位是毫秒,

failedAttempts(執行失敗最大次數)

默認值:3

在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點串列里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試,

database(資料庫編號)

默認值:0

嘗試連接的資料庫編號,

password(密碼)

默認值:null

用于節點身份驗證的密碼,

subscriptionsPerConnection(單個連接最大訂閱數量)

默認值:5

每個連接的最大訂閱數量,

clientName(客戶端名稱)

默認值:null

在Redis節點里顯示的客戶端名稱,

sslEnableEndpointIdentification(啟用SSL終端識別)

默認值:true

開啟SSL終端識別能力,

sslProvider(SSL實作方式)

默認值:JDK

確定采用哪種方式(JDK或OPENSSL)來實作SSL連接,

sslTruststore(SSL信任證書庫路徑)

默認值:null

指定SSL信任證書庫的路徑,

sslTruststorePassword(SSL信任證書庫密碼)

默認值:null

指定SSL信任證書庫的密碼,

sslKeystore(SSL鑰匙庫路徑)

默認值:null

指定SSL鑰匙庫的路徑,

sslKeystorePassword(SSL鑰匙庫密碼)

默認值:null

指定SSL鑰匙庫的密碼,

集群模式

集群模式除了適用于Redis集群環境,也適用于任何云計算服務商提供的集群模式,例如AWS ElastiCache集群版、Azure Redis Cache和阿里云(Aliyun)的云資料庫Redis版,

介紹配置Redis集群組態的檔案在這里, Redis集群組態的最低要求是必須有三個主節點,

集群模式構造Config如下:

Config config = new Config();
config.useClusterServers()
    .setScanInterval(2000) // 集群狀態掃描間隔時間,單位是毫秒
    //可以用"rediss://"來啟用SSL連接
    .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
    .addNodeAddress("redis://127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);

集群模式使用到ClusterServersConfig :

ClusterServersConfig clusterConfig = config.useClusterServers();

ClusterServersConfig 配置引數如下

nodeAddresses(添加節點地址)

可以通過host:port的格式來添加Redis集群節點的地址,多個節點可以一次性批量添加,

scanInterval(集群掃描間隔時間)

默認值: 1000

對Redis集群節點狀態掃描的時間間隔,單位是毫秒,

slots(分片數量)

默認值: 231用于指定資料分片程序中的分片數量,支持資料分片/框架結構有:集(Set)、映射(Map)、BitSet、Bloom filter, Spring Cache和Hibernate Cache等.

readMode(讀取操作的負載均衡模式)

默認值: SLAVE(只在從服務節點里讀取)

注:在從服務節點里讀取的資料說明已經至少有兩個節點保存了該資料,確保了資料的高可用性,

設定讀取操作選擇節點的模式,可用值為:SLAVE - 只在從服務節點里讀取,MASTER - 只在主服務節點里讀取,MASTER_SLAVE - 在主從服務節點里都可以讀取,

subscriptionMode(訂閱操作的負載均衡模式)

默認值:SLAVE(只在從服務節點里訂閱)

設定訂閱操作選擇節點的模式,可用值為:SLAVE - 只在從服務節點里訂閱,MASTER - 只在主服務節點里訂閱,

loadBalancer(負載均衡演算法類的選擇)

默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer

在多Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度演算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度演算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度演算法

subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)

默認值:1

多從節點的環境里,每個 從服務節點里用于發布和訂閱連接的最小保持連接數(長連接),Redisson內部經常通過發布和訂閱來實作許多功能,長期保持一定數量的發布訂閱連接是必須的,

subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)

默認值:50

多從節點的環境里,每個 從服務節點里用于發布和訂閱連接的連接池最大容量,連接池的連接數量自動彈性伸縮,

slaveConnectionMinimumIdleSize(從節點最小空閑連接數)

默認值:32

多從節點的環境里,每個 從服務節點里用于普通操作( 發布和訂閱)的最小保持連接數(長連接),長期保持一定數量的連接有利于提高瞬時讀取反映速度,

slaveConnectionPoolSize(從節點連接池大小)

默認值:64

多從節點的環境里,每個 從服務節點里用于普通操作( 發布和訂閱)連接的連接池最大容量,連接池的連接數量自動彈性伸縮,

masterConnectionMinimumIdleSize(主節點最小空閑連接數)

默認值:32

多節點的環境里,每個 主節點的最小保持連接數(長連接),長期保持一定數量的連接有利于提高瞬時寫入反應速度,

masterConnectionPoolSize(主節點連接池大小)

默認值:64

多主節點的環境里,每個 主節點的連接池最大容量,連接池的連接數量自動彈性伸縮,

idleConnectionTimeout(連接空閑超時,單位:毫秒)

默認值:10000

如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,并從連接池里去掉,時間單位是毫秒,

connectTimeout(連接超時,單位:毫秒)

默認值:10000

同任何節點建立連接時的等待超時,時間單位是毫秒,

timeout(命令等待超時,單位:毫秒)

默認值:3000

等待節點回復命令的時間,該時間從命令發送成功時開始計時,

retryAttempts(命令失敗重試次數)

默認值:3

如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤,如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時,

retryInterval(命令重試發送時間間隔,單位:毫秒)

默認值:1500

在一條命令發送失敗以后,等待重試發送的時間間隔,時間單位是毫秒,

reconnectionTimeout(重新連接時間間隔,單位:毫秒)

默認值:3000

當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔,時間單位是毫秒,

failedAttempts(執行失敗最大次數)

默認值:3

在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點串列里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試,

password(密碼)

默認值:null

用于節點身份驗證的密碼,

subscriptionsPerConnection(單個連接最大訂閱數量)

默認值:5

每個連接的最大訂閱數量,

clientName(客戶端名稱)

默認值:null

在Redis節點里顯示的客戶端名稱,

sslEnableEndpointIdentification(啟用SSL終端識別)

默認值:true

開啟SSL終端識別能力,

sslProvider(SSL實作方式)

默認值:JDK

確定采用哪種方式(JDK或OPENSSL)來實作SSL連接,

sslTruststore(SSL信任證書庫路徑)

默認值:null

指定SSL信任證書庫的路徑,

sslTruststorePassword(SSL信任證書庫密碼)

默認值:null

指定SSL信任證書庫的密碼,

sslKeystore(SSL鑰匙庫路徑)

默認值:null

指定SSL鑰匙庫的路徑,

sslKeystorePassword(SSL鑰匙庫密碼)

默認值:null

指定SSL鑰匙庫的密碼,

簡單Redision鎖的原理

Redis發展到現在,幾種常見的部署架構有:

  1. 單機模式;
  2. 哨兵模式;
  3. 集群模式;

先介紹,基于單機模式的簡單Redision鎖的使用,

簡單Redision鎖的使用

單機模式下,簡單Redision鎖的使用如下:

// 構造redisson實作分布式鎖必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://172.29.1.180:5379").setPassword("a123456").setDatabase(0);
// 構造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 設定鎖定資源名稱
RLock disLock = redissonClient.getLock("DISLOCK");
//嘗試獲取分布式鎖
boolean isLock= disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS);
if (isLock) {
   try {
        //TODO if get lock success, do something;
        Thread.sleep(15000);

   } catch (Exception e) {
   } finally {
    // 無論如何, 最后都要解鎖
    disLock.unlock();
   }
}

通過代碼可知,經過Redisson的封裝,實作Redis分布式鎖非常方便,和顯式鎖的使用方法是一樣的,RLock介面繼承了 Lock介面,

我們再看一下Redis中的value是啥,和前文分析一樣,hash結構, redis 的key就是資源名稱,

hash結構的key就是UUID+threadId,hash結構的value就是重入值,在分布式鎖時,這個值為1(Redisson還可以實作重入鎖,那么這個值就取決于重入次數了):

172.29.1.180:5379> hgetall DISLOCK
1) "01a6d806-d282-4715-9bec-f51b9aa98110:1"
2) "1"

使用客戶端工具看到的效果如下:

在這里插入圖片描述

getLock()方法

img

可以看到,呼叫getLock()方法后實際回傳一個RedissonLock物件

tryLock方法

下面來看下tryLock方法,原始碼如下:

    @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

以上代碼使用了異步回呼模式,RFuture 繼承了 java.util.concurrent.Future, CompletionStage兩大介面,異步回呼模式的基礎知識,請參見 《Java高并發核心編程 卷2 》

在這里插入圖片描述

tryAcquire()方法

在RedissonLock物件的lock()方法主要呼叫tryAcquire()方法

img

tryLockInnerAsync

img

由于leaseTime == -1,于是走tryLockInnerAsync()方法,這個方法才是關鍵

img

首先,看一下evalWriteAsync方法的定義

<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

這和前面的jedis呼叫lua腳本類似,最后兩個引數分別是keys和params,

單獨將呼叫的那一段摘出來看,實際呼叫是這樣的:

commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

結合上面的引數宣告,我們可以知道,這里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)

假設:

  • 前面獲取鎖時傳的name是“DISLOCK”,
  • 假設呼叫的執行緒ID是1,
  • 假設成員變數UUID型別的id是01a6d806-d282-4715-9bec-f51b9aa98110

那么KEYS[1]=DISLOCK,ARGV[2]=01a6d806-d282-4715-9bec-f51b9aa98110:1

因此,這段腳本的意思是

1、判斷有沒有一個叫“DISLOCK”的key

2、如果沒有,則在其下設定一個欄位為“01a6d806-d282-4715-9bec-f51b9aa98110:1”,值為“1”的鍵值對 ,并設定它的過期時間

3、如果存在,則進一步判斷“01a6d806-d282-4715-9bec-f51b9aa98110:1”是否存在,若存在,則其值加1,并重新設定過期時間

4、回傳“DISLOCK”的生存時間(毫秒)

原理:加鎖機制

這里用的資料結構是hash,hash的結構是: key 欄位1 值1 欄位2 值2 ,,,

用在鎖這個場景下,key就表示鎖的名稱,也可以理解為臨界資源,欄位就表示當前獲得鎖的執行緒

所有競爭這把鎖的執行緒都要判斷在這個key下有沒有自己執行緒的欄位,如果沒有則不能獲得鎖,如果有,則相當于重入,欄位值加1(次數)

在這里插入圖片描述

Lua腳本的詳解

為何要使用lua語言?

因為一大堆復雜的業務邏輯,可以通過封裝在lua腳本中發送給redis,保證這段復雜業務邏輯執行的原子性

在這里插入圖片描述

回顧一下evalWriteAsync方法的定義

<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

注意,其最后兩個引數分別是keys和params,

關于 lua腳本的引數解釋:

**KEYS[1]**代表的是你加鎖的那個key,比如說:

RLock lock = redisson.getLock(“DISLOCK”);

這里你自己設定了加鎖的那個鎖key就是“DISLOCK”,

**ARGV[1]**代表的就是鎖key的默認生存時間

呼叫的時候,傳遞的引數為 internalLockLeaseTime ,該值默認30秒,

**ARGV[2]**代表的是加鎖的客戶端的ID,類似于下面這樣:

01a6d806-d282-4715-9bec-f51b9aa98110:1

lua腳本的第一段if判斷陳述句,就是用“exists DISLOCK”命令判斷一下,如果你要加鎖的那個鎖key不存在的話,你就進行加鎖,

如何加鎖呢?很簡單,用下面的redis命令:

hset DISLOCK 01a6d806-d282-4715-9bec-f51b9aa98110:1 1

通過這個命令設定一個hash資料結構,這行命令執行后,會出現一個類似下面的資料結構:

DISLOCK:
    {
        8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
    }

接著會執行“pexpire DISLOCK 30000”命令,設定DISLOCK這個鎖key的生存時間是30秒(默認)

鎖互斥機制

那么在這個時候,如果客戶端2來嘗試加鎖,執行了同樣的一段lua腳本,會咋樣呢?

很簡單,第一個if判斷會執行“exists DISLOCK”,發現DISLOCK 這個鎖key已經存在了,

接著第二個if判斷,判斷一下,DISLOCK鎖key的hash資料結構中,是否包含客戶端2的ID,但是明顯不是的,因為那里包含的是客戶端1的ID,

所以,客戶端2會獲取到pttl DISLOCK回傳的一個數字,這個數字代表了DISLOCK 這個鎖key的**剩余生存時間,**比如還剩15000毫秒的生存時間,

此時客戶端2會進入一個while回圈,不停的嘗試加鎖,

可重入加鎖機制

如果客戶端1都已經持有了這把鎖了,結果可重入的加鎖會怎么樣呢?

RLock lock = redisson.getLock("DISLOCK")
lock.lock();
//業務代碼
lock.lock();
//業務代碼
lock.unlock();
lock.unlock();

分析上面那段lua腳本,

第一個if判斷肯定不成立,“exists DISLOCK”會顯示鎖key已經存在了,

第二個if判斷會成立,因為DISLOCK的hash資料結構中包含的那個ID,就是客戶端1的那個ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”

此時就會執行可重入加鎖的邏輯,他會用:

incrby DISLOCK

8743c9c0-0795-4907-87fd-6c719a6b4586:1 1

通過這個命令,對客戶端1的加鎖次數,累加1,

此時DISLOCK資料結構變為下面這樣:

DISLOCK:
    {
        8743c9c0-0795-4907-87fd-6c719a6b4586:1 2
    }

釋放鎖機制

如果執行lock.unlock(),就可以釋放分布式鎖,此時的業務邏輯也是非常簡單的,

其實說白了,就是每次都對DISLOCK資料結構中的那個加鎖次數減1,

如果發現加鎖次數是0了,說明這個客戶端已經不再持有鎖了,此時就會用:

“del DISLOCK”命令,從redis里洗掉這個key,

然后呢,另外的客戶端2就可以嘗試完成加鎖了,

unlock 原始碼

  @Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
        
//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

再深入一下,實際呼叫的是unlockInnerAsync方法

unlockInnerAsync方法

在這里插入圖片描述

原理:Redision 解鎖機制

上圖沒有截取完整,完整的原始碼如下:

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }

我們還是假設name=DISLOCK,假設執行緒ID是1

同理,我們可以知道

KEYS[1]是getName(),即KEYS[1]=DISLOCK

KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{DISLOCK}

ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0

ARGV[2]是生存時間

ARGV[3]是getLockName(threadId),即ARGV[3]=8743c9c0-0795-4907-87fd-6c719a6b4586:1

因此,上面腳本的意思是:

1、判斷是否存在一個叫“DISLOCK”的key

2、如果不存在,回傳nil

3、如果存在,使用Redis Hincrby 命令用于為哈希表中的欄位值加上指定增量值 -1 ,代表減去1

4、若counter >,回傳空,若欄位存在,則欄位值減1

5、若減完以后,counter > 0 值仍大于0,則回傳0

6、減完后,若欄位值小于或等于0,則用 publish 命令廣播一條訊息,廣播內容是0,并回傳1;

可以猜測,廣播0表示資源可用,即通知那些等待獲取鎖的執行緒現在可以獲得鎖了

在這里插入圖片描述

通過redis Channel 解鎖訂閱

以上是正常情況下獲取到鎖的情況,那么當無法立即獲取到鎖的時候怎么辦呢?

再回到前面獲取鎖的位置

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    //    訂閱
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

    try {
        while (true) {
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}


protected static final LockPubSub PUBSUB = new LockPubSub();

protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
    PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

這里會訂閱Channel,當資源可用時可以及時知道,并搶占,防止無效的輪詢而浪費資源

這里的channel為:

redisson_lock__channel:{DISLOCK}

在這里插入圖片描述

在這里插入圖片描述

當資源可用用的時候,回圈去嘗試獲取鎖,由于多個執行緒同時去競爭資源,所以這里用了信號量,對于同一個資源只允許一個執行緒獲得鎖,其它的執行緒阻塞

這點,有點兒類似 Zookeeper分布式鎖:

有關zookeeper分布式鎖的原理和實作,具體請參見下面的博客:
Zookeeper 分布式鎖 (圖解+秒懂+史上最全)

watch dog自動延期機制

客戶端1加鎖的鎖key默認生存時間才30秒,如果超過了30秒,客戶端1還想一直持有這把鎖,怎么辦呢?

簡單!只要客戶端1一旦加鎖成功,就會啟動一個watch dog看門狗,他是一個后臺執行緒,會每隔10秒檢查一下,如果客戶端1還持有鎖key,那么就會不斷的延長鎖key的生存時間,


使用watchDog機制實作鎖的續期

但是聰明的同學肯定會問:

有效時間設定多長,假如我的業務操作比有效時間長,我的業務代碼還沒執行完,就自動給我解鎖了,不就完蛋了嗎,

這個問題就有點棘手了,在網上也有很多討論:

第一種解決方法就是靠程式員自己去把握,預估一下業務代碼需要執行的時間,然后設定有效期時間比執行時間長一些,保證不會因為自動解鎖影響到客戶端業務代碼的執行,

但是這并不是萬全之策,比如網路抖動這種情況是無法預測的,也有可能導致業務代碼執行的時間變長,所以并不安全,

第二種方法,使用監事狗watchDog機制實作鎖的續期,

第二種方法比較靠譜一點,而且無業務入侵,

在Redisson框架實作分布式鎖的思路,就使用watchDog機制實作鎖的續期,

當加鎖成功后,同時開啟守護執行緒,默認有效期是30秒,每隔10秒就會給鎖續期到30秒,只要持有鎖的客戶端沒有宕機,就能保證一直持有鎖,直到業務代碼執行完畢由客戶端自己解鎖,如果宕機了自然就在有效期失效后自動解鎖,

這里,和前面解決 JVM STW的鎖過期問題有點類似,只不過,watchDog自動續期,也沒有完全解決JVM STW的鎖過期問題,

如何徹底解決 JVM STW的鎖過期問題,可以來瘋狂創客圈的社群討論,

redisson watchdog 使用和原理

實際上,redisson加鎖的基本流程圖如下:

在這里插入圖片描述

這里專注于介紹watchdog,

首先watchdog的具體思路是 加鎖時,默認加鎖 30秒,每10秒鐘檢查一次,如果存在就重新設定 過期時間為30秒,

然后設定默認加鎖時間的引數是 lockWatchdogTimeout(監控鎖的看門狗超時,單位:毫秒)

官方檔案描述如下

lockWatchdogTimeout(監控鎖的看門狗超時,單位:毫秒)

默認值:30000

監控鎖的看門狗超時時間單位為毫秒,該引數只適用于分布式鎖的加鎖請求中未明確使用leaseTimeout引數的情況,如果該看門狗未使用lockWatchdogTimeout去重新調整一個分布式鎖的lockWatchdogTimeout超時,那么這個鎖將變為失效狀態,這個引數可以用來避免由Redisson客戶端節點宕機或其他原因造成死鎖的情況,

需要注意的是

1.watchDog 只有在未顯示指定加鎖時間時才會生效,(這點很重要)

2.lockWatchdogTimeout設定的時間不要太小 ,比如我之前設定的是 100毫秒,由于網路直接導致加鎖完后,watchdog去延期時,這個key在redis中已經被洗掉了,

tryAcquireAsync原理

在呼叫lock方法時,會最終呼叫到tryAcquireAsync,詳細解釋如下:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    //如果指定了加鎖時間,會直接去加鎖
        if (leaseTime != -1) {
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
    //沒有指定加鎖時間 會先進行加鎖,并且默認時間就是 LockWatchdogTimeout的時間
    //這個是異步操作 回傳RFuture 類似netty中的future
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
       //這里也是類似netty Future 的addListener,在future內容執行完成后執行
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
            //這里是定時執行 當前鎖自動延期的動作
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

scheduleExpirationRenewal 中會呼叫renewExpiration,

renewExpiration執行延期動作

這里我們可以看到是 啟用了一個timeout定時,去執行延期動作

    private void renewExpiration() {
   
      
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                     	//如果 沒有報錯,就再次定時延期
                     // reschedule itself
                     
                        renewExpiration();
                    }
                });
            }
            // 這里我們可以看到定時任務 是 lockWatchdogTimeout 的1/3時間去執行 renewExpirationAsync
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

最終 scheduleExpirationRenewal會呼叫到 renewExpirationAsync,

renewExpirationAsync

執行下面這段 lua腳本,他主要判斷就是 這個鎖是否在redis中存在,如果存在就進行 pexpire 延期,

   protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getName()),
                internalLockLeaseTime, getLockName(threadId));
    }

watchLog總結

1.要使 watchLog機制生效 ,lock時 不要設定 過期時間

2.watchlog的延時時間 可以由 lockWatchdogTimeout指定默認延時時間,但是不要設定太小,如100

3.watchdog 會每 lockWatchdogTimeout/3時間,去延時,

4.watchdog 通過 類似netty的 Future功能來實作異步延時

5.watchdog 最侄訓是通過 lua腳本來進行延時

Redisson框架的分布式鎖

Redisson框架十分強大,除了前面介紹的 getLock方法獲取的分布式鎖(輸入可重入鎖的型別),還有很多其他的分布式鎖型別,

總體的Redisson框架的分布式鎖型別,大致如下:

  • 可重入鎖
  • 公平鎖
  • 聯鎖
  • 紅鎖
  • 讀寫鎖
  • 信號量
  • 可過期信號量
  • 閉鎖(/倒數閂)

1.可重入鎖(Reentrant Lock)

Redisson的分布式可重入鎖RLock Java物件實作了java.util.concurrent.locks.Lock介面,同時還支持自動過期解鎖,

public void testReentrantLock(RedissonClient redisson){
	RLock lock = redisson.getLock("anyLock");
	try{
		// 1. 最常見的使用方法
		//lock.lock();
		// 2. 支持過期解鎖功能,10秒鐘以后自動解鎖, 無需呼叫unlock方法手動解鎖
		//lock.lock(10, TimeUnit.SECONDS);
		// 3. 嘗試加鎖,最多等待3秒,上鎖以后10秒自動解鎖
		boolean res = lock.tryLock(3, 10, TimeUnit.SECONDS);
		if(res){ //成功
		// do your business
		}
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

Redisson同時還為分布式鎖提供了異步執行的相關方法:

public void testAsyncReentrantLock(RedissonClient redisson){
	RLock lock = redisson.getLock("anyLock");
	try{
		lock.lockAsync();
		lock.lockAsync(10, TimeUnit.SECONDS);
		Future<Boolean> res = lock.tryLockAsync(3, 10, TimeUnit.SECONDS);
		if(res.get()){
		// do your business
		}
	} catch (InterruptedException e) {
		e.printStackTrace();
	} catch (ExecutionException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

2.公平鎖(Fair Lock)

Redisson分布式可重入公平鎖也是實作了java.util.concurrent.locks.Lock介面的一種RLock物件,在提供了自動過期解鎖功能的同時,保證了當多個Redisson客戶端執行緒同時請求加鎖時,優先分配給先發出請求的執行緒,

public void testFairLock(RedissonClient redisson){
	RLock fairLock = redisson.getFairLock("anyLock");
	try{
		// 最常見的使用方法
		fairLock.lock();
		// 支持過期解鎖功能, 10秒鐘以后自動解鎖,無需呼叫unlock方法手動解鎖
		fairLock.lock(10, TimeUnit.SECONDS);
		// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
		boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		fairLock.unlock();
	}
}

Redisson同時還為分布式可重入公平鎖提供了異步執行的相關方法:

RLock fairLock = redisson.getFairLock("anyLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);

3.聯鎖(MultiLock)

Redisson的RedissonMultiLock物件可以將多個RLock物件關聯為一個聯鎖,每個RLock物件實體可以來自于不同的Redisson實體,

public void testMultiLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
	RLock lock1 = redisson1.getLock("lock1");
	RLock lock2 = redisson2.getLock("lock2");
	RLock lock3 = redisson3.getLock("lock3");
	RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
	try {
		// 同時加鎖:lock1 lock2 lock3, 所有的鎖都上鎖成功才算成功,
		lock.lock();
		// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
		boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

4.紅鎖(RedLock)

Redisson的RedissonRedLock物件實作了Redlock介紹的加鎖演算法,該物件也可以用來將多個RLock物件關聯為一個紅鎖,每個RLock物件實體可以來自于不同的Redisson實體,

public void testRedLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
	RLock lock1 = redisson1.getLock("lock1");
	RLock lock2 = redisson2.getLock("lock2");
	RLock lock3 = redisson3.getLock("lock3");
	RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
	try {
		// 同時加鎖:lock1 lock2 lock3, 紅鎖在大部分節點上加鎖成功就算成功,
		lock.lock();
		// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
		boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

5.讀寫鎖(ReadWriteLock)

Redisson的分布式可重入讀寫鎖RReadWriteLock,Java物件實作了java.util.concurrent.locks.ReadWriteLock介面,同時還支持自動過期解鎖,該物件允許同時有多個讀取鎖,但是最多只能有一個寫入鎖,

RReadWriteLock rwlock = redisson.getLock("anyRWLock");
// 最常見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
// 支持過期解鎖功能
// 10秒鐘以后自動解鎖
// 無需呼叫unlock方法手動解鎖
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

6.信號量(Semaphore)

Redisson的分布式信號量(Semaphore)Java物件RSemaphore采用了與java.util.concurrent.Semaphore相似的介面和用法,

RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();

7.可過期性信號量(PermitExpirableSemaphore)

Redisson的可過期性信號量(PermitExpirableSemaphore)實在RSemaphore物件的基礎上,為每個信號增加了一個過期時間,每個信號可以通過獨立的ID來辨識,釋放時只能通過提交這個ID才能釋放,

RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 獲取一個信號,有效期只有2秒鐘,
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);

8.閉鎖/倒數閂(CountDownLatch)

Redisson的分布式閉鎖(CountDownLatch)Java物件RCountDownLatch采用了與java.util.concurrent.CountDownLatch相似的介面和用法,

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他執行緒或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

redis分布式鎖的高可用

關于Redis分布式鎖的高可用問題,大致如下:

在master- slave的集群架構中,就是如果你對某個redis master實體,寫入了DISLOCK這種鎖key的value,此時會異步復制給對應的master slave實體,

但是,這個程序中一旦發生redis master宕機,主備切換,redis slave變為了redis master,而此時的主從復制沒有徹底完成…

接著就會導致,客戶端2來嘗試加鎖的時候,在新的redis master上完成了加鎖,而客戶端1也以為自己成功加了鎖,

此時就會導致多個客戶端對一個分布式鎖完成了加鎖,

這時系統在業務語意上一定會出現問題,導致臟資料的產生,

所以這個是是redis master-slave架構的主從異步復制導致的redis分布式鎖的最大缺陷:

在redis master實體宕機的時候,可能導致多個客戶端同時完成加鎖,

高可用的RedLock(紅鎖)原理

RedLock演算法思想:

不能只在一個redis實體上創建鎖,應該是在多個redis實體上創建鎖,n / 2 + 1,必須在大多數redis節點上都成功創建鎖,才能算這個整體的RedLock加鎖成功,避免說僅僅在一個redis實體上加鎖而帶來的問題,

這個場景是假設有一個 redis cluster,有 5 個 redis master 實體,然后執行如下步驟獲取一把紅鎖:

  1. 獲取當前時間戳,單位是毫秒;
  2. 跟上面類似,輪流嘗試在每個 master 節點上創建鎖,過期時間較短,一般就幾十毫秒;
  3. 嘗試在大多數節點上建立一個鎖,比如 5 個節點就要求是 3 個節點 n / 2 + 1;
  4. 客戶端計算建立好鎖的時間,如果建立鎖的時間小于超時時間,就算建立成功了;
  5. 要是鎖建立失敗了,那么就依次之前建立過的鎖洗掉;
  6. 只要別人建立了一把分布式鎖,你就得不斷輪詢去嘗試獲取鎖,

img

RedLock是基于redis實作的分布式鎖,它能夠保證以下特性:

  • 互斥性:在任何時候,只能有一個客戶端能夠持有鎖;避免死鎖:

  • 當客戶端拿到鎖后,即使發生了網路磁區或者客戶端宕機,也不會發生死鎖;(利用key的存活時間)

  • 容錯性:只要多數節點的redis實體正常運行,就能夠對外提供服務,加鎖或者釋放鎖;

以sentinel模式架構為例,如下圖所示,有sentinel-1,sentinel-2,sentinel-3總計3個sentinel模式集群,如果要獲取分布式鎖,那么需要向這3個sentinel集群通過EVAL命令執行LUA腳本,需要3/2+1=2,即至少2個sentinel集群回應成功,才算成功的以Redlock演算法獲取到分布式鎖:

Redisson

高可用的紅鎖會導致性能降低

提前說明,使用redis分布式鎖,是追求高性能, 在cap理論中,追求的是 ap 而不是cp,

所以,如果追求高可用,建議使用 zookeeper分布式鎖,

redis分布式鎖可能導致的資料不一致性,建議使用業務補償的方式去彌補,所以,不太建議使用紅鎖,但是從學習的層面來說,大家還是一定要掌握的,

實作原理

Redisson中有一個MultiLock的概念,可以將多個鎖合并為一個大鎖,對一個大鎖進行統一的申請加鎖以及釋放鎖

而Redisson中實作RedLock就是基于MultiLock 去做的,接下來就具體看看對應的實作吧

RedLock使用案例

先看下官方的代碼使用:
(https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#84-redlock)

RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");

RLock redLock = anyRedisson.getRedLock(lock1, lock2, lock3);

// traditional lock method
redLock.lock();

// or acquire lock and automatically unlock it after 10 seconds
redLock.lock(10, TimeUnit.SECONDS);

// or wait for lock aquisition up to 100 seconds 
// and automatically unlock it after 10 seconds
boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       redLock.unlock();
   }
}

這里是分別對3個redis實體加鎖,然后獲取一個最后的加鎖結果,

RedissonRedLock實作原理

上面示例中使用redLock.lock()或者tryLock()最終都是執行RedissonRedLock中方法,

RedissonRedLock 繼承自RedissonMultiLock, 實作了其中的一些方法:

public class RedissonRedLock extends RedissonMultiLock {
    public RedissonRedLock(RLock... locks) {
        super(locks);
    }

    /**
     * 鎖可以失敗的次數,鎖的數量-鎖成功客戶端最小的數量
     */
    @Override
    protected int failedLocksLimit() {
        return locks.size() - minLocksAmount(locks);
    }
    
    /**
     * 鎖的數量 / 2 + 1,例如有3個客戶端加鎖,那么最少需要2個客戶端加鎖成功
     */
    protected int minLocksAmount(final List<RLock> locks) {
        return locks.size()/2 + 1;
    }

    /** 
     * 計算多個客戶端一起加鎖的超時時間,每個客戶端的等待時間
     * remainTime默認為4.5s
     */
    @Override
    protected long calcLockWaitTime(long remainTime) {
        return Math.max(remainTime / locks.size(), 1);
    }
    
    @Override
    public void unlock() {
        unlockInner(locks);
    }

}

看到locks.size()/2 + 1 ,例如我們有3個客戶端實體,那么最少2個實體加鎖成功才算分布式鎖加鎖成功,

接著我們看下lock()的具體實作

RedissonMultiLock實作原理


public class RedissonMultiLock implements Lock {

    final List<RLock> locks = new ArrayList<RLock>();

    public RedissonMultiLock(RLock... locks) {
        if (locks.length == 0) {
            throw new IllegalArgumentException("Lock objects are not defined");
        }
        this.locks.addAll(Arrays.asList(locks));
    }

    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long newLeaseTime = -1;
        if (leaseTime != -1) {
            // 如果等待時間設定了,那么將等待時間 * 2
            newLeaseTime = unit.toMillis(waitTime)*2;
        }
        
        // time為當前時間戳
        long time = System.currentTimeMillis();
        long remainTime = -1;
        if (waitTime != -1) {
            remainTime = unit.toMillis(waitTime);
        }
        // 計算鎖的等待時間,RedLock中:如果remainTime=-1,那么lockWaitTime為1
        long lockWaitTime = calcLockWaitTime(remainTime);
        
        // RedLock中failedLocksLimit即為n/2 + 1
        int failedLocksLimit = failedLocksLimit();
        List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
        // 回圈每個redis客戶端,去獲取鎖
        for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
            RLock lock = iterator.next();
            boolean lockAcquired;
            try {
                // 呼叫tryLock方法去獲取鎖,如果獲取鎖成功,則lockAcquired=true
                if (waitTime == -1 && leaseTime == -1) {
                    lockAcquired = lock.tryLock();
                } else {
                    long awaitTime = Math.min(lockWaitTime, remainTime);
                    lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                lockAcquired = false;
            }
            
            // 如果獲取鎖成功,將鎖加入到list集合中
            if (lockAcquired) {
                acquiredLocks.add(lock);
            } else {
                // 如果獲取鎖失敗,判斷失敗次數是否等于失敗的限制次數
                // 比如,3個redis客戶端,最多只能失敗1次
                // 這里locks.size = 3, 3-x=1,說明只要成功了2次就可以直接break掉回圈
                if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                    break;
                }

                // 如果最大失敗次數等于0
                if (failedLocksLimit == 0) {
                    // 釋放所有的鎖,RedLock加鎖失敗
                    unlockInner(acquiredLocks);
                    if (waitTime == -1 && leaseTime == -1) {
                        return false;
                    }
                    failedLocksLimit = failedLocksLimit();
                    acquiredLocks.clear();
                    // 重置迭代器 重試再次獲取鎖
                    while (iterator.hasPrevious()) {
                        iterator.previous();
                    }
                } else {
                    // 失敗的限制次數減一
                    // 比如3個redis實體,最大的限制次數是1,如果遍歷第一個redis實體,失敗了,那么failedLocksLimit會減成0
                    // 如果failedLocksLimit就會走上面的if邏輯,釋放所有的鎖,然后回傳false
                    failedLocksLimit--;
                }
            }
            
            if (remainTime != -1) {
                remainTime -= (System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                if (remainTime <= 0) {
                    unlockInner(acquiredLocks);
                    return false;
                }
            }
        }

        if (leaseTime != -1) {
            List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
            for (RLock rLock : acquiredLocks) {
                RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
                futures.add(future);
            }
            
            for (RFuture<Boolean> rFuture : futures) {
                rFuture.syncUninterruptibly();
            }
        }
        
        return true;
    }
}

核心代碼都已經加了注釋,實作原理其實很簡單,基于RedLock思想,遍歷所有的Redis客戶端,然后依次加鎖,最后統計成功的次數來判斷是否加鎖成功,

文章核心內容和原始碼來源

圖書:《Netty Zookeeper Redis 高并發實戰》 圖書簡介 - 瘋狂創…

參考檔案:

圖書:《Netty Zookeeper Redis 高并發實戰》 圖書簡介 - 瘋狂創…

Distributed locks with Redis

how-to-do-distributed-locking

redisson watchdog 使用和原理

zookeeper實作分布式鎖_java_腳本之家

基于Zookeeper分布式鎖實作 - SegmentFault 思否

分布式鎖用 Redis 還是 Zookeeper - 知乎

ZooKeeper分布式鎖的實作原理 - 菜鳥奮斗史 - 博客園

https://blog.csdn.net/men_wen/article/details/72853078

本文的問題交流:瘋狂創客圈社群

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/283211.html

標籤:其他

上一篇:php 有哪些殺手級超厲害框架或庫或應用?

下一篇:架構師成長記_第八周_10_ES-分詞與五種內置分詞器

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 面試突擊第一季,第二季,第三季

    第一季必考 https://www.bilibili.com/video/BV1FE411y79Y?from=search&seid=15921726601957489746 第二季分布式 https://www.bilibili.com/video/BV13f4y127ee/?spm_id_fro ......

    uj5u.com 2020-09-10 05:35:24 more
  • 第三單元作業總結

    1.前言 這應該是本學期最后一次寫作業總結了吧。總體來說,對作業的節奏也差不多掌握了,作業做起來的效率也更高了。雖然和之前的作業一樣,作業中都要用到新的知識,但是相比之前,更加懂得了如何利用工具以及資料。雖然之間卡過殼,但總體而言,這幾次作業還算完成的比較好。 2.作業程序總結 相比前兩個單元,此單 ......

    uj5u.com 2020-09-10 05:35:41 more
  • 北航OO(2020)第四單元博客作業暨課程總結博客

    北航OO(2020)第四單元博客作業暨課程總結博客 本單元作業的架構設計 在本單元中,由于UML圖具有比較清晰的樹形結構,因此我對其中需要進行查詢操作的元素進行了包裝,在樹的父節點中存盤所有孩子的參考。考慮到性能問題,我采用了快取機制,一次查詢后盡可能快取已經遍歷過的資訊,以減少遍歷次數。 本單元我 ......

    uj5u.com 2020-09-10 05:35:48 more
  • BUAA_OO_第四單元

    一、UML決議器設計 ? 先看下題目:第四單元實作一個基于JDK 8帶有效性檢查的UML(Unified Modeling Language)類圖,順序圖,狀態圖分析器 MyUmlInteraction,實際上我們要建立一個有向圖模型,UML中的物件(元素)可能與同級元素連接,也可與低級元素相連形成 ......

    uj5u.com 2020-09-10 05:35:54 more
  • 6.1邏輯運算子

    邏輯運算子 1. && 短路與 運算式1 && 運算式2 01.運算式1為true并且運算式2也為true 整體回傳為true 02.運算式1為false,將不會執行運算式2 整體回傳為false 03.只要有一個運算式為false 整體回傳為false 2. || 短路或 運算式1 || 運算式2 ......

    uj5u.com 2020-09-10 05:35:56 more
  • BUAAOO 第四單元 & 課程總結

    1. 第四單元:StarUml檔案決議 本單元采用了圖模型決議UML。 UML檔案可以抽象為圖、子圖、邊的邏輯結構。 在實作中,圖的節點包括類、介面、屬性,子圖包括狀態圖、順序圖等。 采用了三次遍歷UML元素的方法建圖,第一遍遍歷建點,第二、三次遍歷設定屬性、連邊,實作圖物件的初始化。這里借鑒了一些 ......

    uj5u.com 2020-09-10 05:36:06 more
  • 談談我對C# 多型的理解

    面向物件三要素:封裝、繼承、多型。 封裝和繼承,這兩個比較好理解,但要理解多型的話,可就稍微有點難度了。今天,我們就來講講多型的理解。 我們應該經常會看到面試題目:請談談對多型的理解。 其實呢,多型非常簡單,就一句話:呼叫同一種方法產生了不同的結果。 具體實作方式有三種。 一、多載 多載很簡單。 p ......

    uj5u.com 2020-09-10 05:36:09 more
  • Python 資料驅動工具:DDT

    背景 python 的unittest 沒有自帶資料驅動功能。 所以如果使用unittest,同時又想使用資料驅動,那么就可以使用DDT來完成。 DDT是 “Data-Driven Tests”的縮寫。 資料:http://ddt.readthedocs.io/en/latest/ 使用方法 dd. ......

    uj5u.com 2020-09-10 05:36:13 more
  • Python里面的xlrd模塊詳解

    那我就一下面積個問題對xlrd模塊進行學習一下: 1.什么是xlrd模塊? 2.為什么使用xlrd模塊? 3.怎樣使用xlrd模塊? 1.什么是xlrd模塊? ?python操作excel主要用到xlrd和xlwt這兩個庫,即xlrd是讀excel,xlwt是寫excel的庫。 今天就先來說一下xl ......

    uj5u.com 2020-09-10 05:36:28 more
  • 當我們創建HashMap時,底層到底做了什么?

    jdk1.7中的底層實作程序(底層基于陣列+鏈表) 在我們new HashMap()時,底層創建了默認長度為16的一維陣列Entry[ ] table。當我們呼叫map.put(key1,value1)方法向HashMap里添加資料的時候: 首先,呼叫key1所在類的hashCode()計算key1 ......

    uj5u.com 2020-09-10 05:36:38 more
最新发布
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:20:47 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:20:25 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:20:17 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:20:10 more
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:19:44 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:19:07 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:18:57 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:18:49 more
  • 05單件模式

    #經典的單件模式 public class Singleton { private static Singleton uniqueInstance; //一個靜態變數持有Singleton類的唯一實體。 // 其他有用的實體變數寫在這里 //構造器宣告為私有,只有Singleton可以實體化這個類! ......

    uj5u.com 2023-04-19 08:42:51 more
  • 【架構與設計】常見微服務分層架構的區別和落地實踐

    軟體工程的方方面面都遵循一個最基本的道理:沒有銀彈,架構分層模型更是如此,每一種都有各自優缺點,所以請根據不同的業務場景,并遵循簡單、可演進這兩個重要的架構原則選擇合適的架構分層模型即可。 ......

    uj5u.com 2023-04-19 08:42:41 more