前言
Redisson是一個在Redis的基礎上實作的Java駐記憶體資料網格(In-Memory Data Grid),Redisson有一樣功能是可重入的分布式鎖,本文來討論一下這個功能的特點以及原始碼分析,
前置知識
在講Redisson,咱們先來聊聊分布式鎖的特點以及Redis的發布/訂閱機制,磨刀不誤砍柴工,
分布式鎖的思考
首先思考下,如果我們自己去實作一個分布式鎖,這個鎖需要具備哪些功能?
- 互斥(這是一個鎖最基本的功能)
- 鎖失效機制(也就是可以設定鎖定時長,防止死鎖)
- 高性能、高可用
- 阻塞、非阻塞
- 可重入、公平鎖
- ,,,
可見,實作一個分布式鎖,需要考慮的東西有很多,那么,如果用Redis來實作分布式鎖呢?如果只需要具備上面說的1、2點功能,要怎么寫?(ps:我就不寫了,自己想去)
Redis訂閱/發布機制
Redisson中用到了Redis的訂閱/發布機制,下面簡單介紹下,
簡單來說就是如果client2 、 client5 和 client1 訂閱了 channel1,當有訊息發布到 channel1 的時候,client2 、 client5 和 client1 都會收到這個訊息,

圖片來自 菜鳥教程-Redis發布訂閱
Redisson
原始碼版本:3.17.7
下面以Redisson官方的可重入同步鎖例子為入口,解讀下原始碼,
RLock lock = redisson.getLock("anyLock");
// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
加鎖
我用時序圖來表示加鎖和訂閱的程序,時序圖中括號后面的c1、c2代表client1,client2

當執行緒2獲取了鎖但還沒釋放鎖時,如果執行緒1去獲取鎖,會阻塞等待,直到執行緒2解鎖,通過Redis的發布訂閱機制喚醒執行緒1,再次去獲取鎖,
加鎖方法是 lock.tryLock(100, 10, TimeUnit.SECONDS),對應著就是RedissonLock#tryLock
/**
* 獲取鎖
* @param waitTime 嘗試獲取鎖的最大等待時間,超過這個值,則認為獲取鎖失敗
* @param leaseTime 鎖的持有時間,超過這個時間鎖會自動失效(值應設定為大于業務處理的時間,確保在鎖有效期內業務能處理完)
* @param unit 時間單位
* @return 獲取鎖成功回傳true,失敗回傳false
*/
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();// 當前時間
long threadId = Thread.currentThread().getId();// 當前執行緒id
// 嘗試加鎖,加鎖成功回傳null,失敗回傳鎖的剩余超時時間
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 獲取鎖成功
if (ttl == null) {
return true;
}
// time小于0代表此時已經超過獲取鎖的等待時間,直接回傳false
time -= System.currentTimeMillis() - current;
if (time <= 0) {
// 沒看懂這個方法,里面里面空運行,有知道的大神還請不吝賜教
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.whenComplete((res, ex) -> {
// 出現例外,取消訂閱
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException e) {
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
// 判斷是否超時(超過了waitTime)
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
// 再次獲取鎖,成功則回傳
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 阻塞等待信號量喚醒或者超時,接收到訂閱時喚醒
// 使用的是Semaphore#tryAcquire()
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 因為是同步操作,所以無論加鎖成功或失敗,都取消訂閱
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
先看一下整體邏輯:
- 嘗試加鎖,成功直接回傳true
- 判斷超時
- 訂閱
- 判斷超時
- 回圈 ( 嘗試獲取鎖 → 判斷超時 → 阻塞等待 )
tryLock方法看著很長,但是有很多代碼都是重復的,本小節重點說一下嘗試加鎖的方法tryAcquire
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
// 呼叫lua腳本,嘗試加鎖
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 這里的if、else的區別就在于,如果沒有設定leaseTime,就使用默認的internalLockLeaseTime(默認30秒)
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
// 如果ttlRemaining為空,也就是tryLockInnerAsync方法中的lua執行結果回傳空,證明獲取鎖成功
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 如果沒有設定鎖的持有時間(leaseTime),則啟動看門狗,定時給鎖續期,防止業務邏輯未執行完成鎖就過期了
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
在tryAcquireAsync方法中,主要分為兩段邏輯:
- 呼叫lua腳本加鎖:tryLockInnerAsync
- 看門狗:scheduleExpirationRenewal
看門狗在后面講,本小節重點還是在加鎖
// RedissonLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
Redisson使用了 Hash 結構來表示一個鎖,這樣 Hash 里面的 key 為執行緒id,value 為鎖的次數,這樣巧妙地解決了可重入鎖的問題,
下面我們來分析下這段 lua 腳本的邏輯(下面說的threadId都是指變數,不是說key就叫’threadId’):
- 如果鎖(hash結構)不存在,則創建,并添加一個鍵值對 (threadId : 1),并設定鎖的過期時間
- 如果鎖存在,則將鍵值對 threadId 對應的值 + 1,并設定鎖的過期時間
- 如果不如何1,2點,則回傳鎖的剩余過期時間
訂閱
讓我們把視線重新回到RedissonLock#tryLock中,當經過一些嘗試獲取鎖,超時判斷之后,代碼來到while回圈中,這個while回圈是個死回圈,只有成功獲取鎖或者超時,才會退出,一般死回圈的設計中,都會有阻塞等待的代碼,否則如果回圈中的邏輯短時間拿不到結果,會造成資源搶占和浪費,阻塞代碼就是下面這段
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
commandExecutor.getNow(subscribeFuture).getLatch() 得到的是一個Semaphore信號量物件,這是jdk的內置物件,Semaphore#tryAcquire表示阻塞并等待喚醒,那么信號量什么時候被喚醒呢?在訂閱方法中RedissonLock#subscribe,訂閱方法的邏輯也不少,咱們直接講其最終呼叫的處理方法
// LockPubSub#onMessage
protected void onMessage(RedissonLockEntry value, Long message) {
// 普通的解鎖走的是這個
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 這里就是喚醒信號量的地方
value.getLatch().release();
// 這個是讀寫鎖用的
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
value.getLatch().release(value.getLatch().getQueueLength());
}
}
value.getLatch().release() 也就是Semaphore#release ,會喚醒Semaphore#tryAcquire阻塞的執行緒
解鎖
上面我們聊了加鎖,本小節來聊下解鎖,呼叫路徑如下
// RedissonLock#unlock
// RedissonBaseLock#unlockAsync(long threadId)
public RFuture<Void> unlockAsync(long threadId) {
// 呼叫lua解鎖
RFuture<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
// 取消看門狗
cancelExpirationRenewal(threadId);
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
解鎖的邏輯不復雜,呼叫lua腳本解鎖以及取消看門狗,看門狗晚點說,先說下lua解鎖
// RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
老規矩,分析下這段lua:
- 如果鎖不存在,回傳null
- 鎖的值減1,如果鎖的值大于0(也就是可重入鎖仍然有加鎖次數),則重新設定過期時間
- 如果鎖的值小于等于0,這說明可以真正解鎖了,洗掉鎖并通過發布訂閱機制發布解鎖訊息
從 lua 中可以看到,解鎖時會發布訊息到 channel 中,加鎖方法RedissonLock#tryLock中有相對應的訂閱操作,
看門狗
試想一個場景:程式執行需要10秒,程式執行完成才去解鎖,而鎖的存活時間只有5秒,也就是程式執行到一半的時候鎖就可以被其他程式獲取了,這顯然不合適,那么怎么解決呢?
-
方式一:鎖永遠存在,直到解鎖,不設定存活時間,
這種方法的弊端在于,如果程式沒解鎖就掛了,鎖就成了死鎖
-
方式二:依然設定鎖存活時間,但是監控程式的執行,如果程式還沒有執行完成,則定期給鎖續期,
方式二就是Redisson的看門狗機制,看門狗只有在沒有顯示指定鎖的持有時間(leaseTime)時才會生效,
// RedissonLock#tryAcquireAsync
// RedissonBaseLock#scheduleExpirationRenewal
protected void scheduleExpirationRenewal(long threadId) {
// 創建ExpirationEntry,并放入EXPIRATION_RENEWAL_MAP中,下面的renewExpiration()方法會從map中再拿出來用
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
// 看門狗的具體邏輯
renewExpiration();
} finally {
// 如果執行緒被中斷了,就取消看門狗
if (Thread.currentThread().isInterrupted()) {
// 取消看門狗
cancelExpirationRenewal(threadId);
}
}
}
}
scheduleExpirationRenewal 方法處理了ExpirationEntry和如果出現例外則取消看門狗,具體看門狗邏輯在 renewExpiration 方法中
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 創建延時任務,延時時間是internalLockLeaseTime / 3
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// lua腳本判斷,如果鎖存在,則續期并回傳true,不存在則回傳false
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// 鎖續期成功,則再啟動一個延時任務,繼續監測
renewExpiration();
} else {
// 取消看門狗
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
Timeout 是一個延時任務,延時 internalLockLeaseTime / 3 時間執行,任務的內容主要是通過 renewExpirationAsync 方法對鎖進行續期,如果續期失敗(解鎖了、鎖到期等),則取消看門狗,如果續期成功,則遞回 renewExpiration 方法,繼續創建延時任務,
internalLockLeaseTime 也就是 lockWatchdogTimeout 引數,默認是 30 秒,
總結
本文介紹了Redisson的加鎖、解鎖、看門狗機制,以及對Redis發布訂閱機制的應用,因為篇幅有限,很多細節聊得不夠深入,此外Redisson的異步機制、對Netty的使用等都是很值得水文章的,
參考資料
萬字長文帶你解讀Redisson分布式鎖的原始碼 - 知乎 (zhihu.com)
Redis分布式鎖-這一篇全了解(Redission實作分布式鎖完美方案)
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/528709.html
標籤:Java
