介紹與配置
Redisson官方檔案:https://github.com/redisson/redisson/wiki/Redisson%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8D
Springboot 自動配置類: RedissonAutoConfiguration
pom配置:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.13.6</version> </dependency>
如果什么都不配置的話,會默認使用單Redis節點模式,代碼中直接就可以使用 RedissonClient
具體配置可參考官方檔案:https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95
分布式鎖測驗
@Slf4j @SpringBootTest(classes = DemoWebApplication.class) public class RedissonTest { @Resource private RedissonClient redissonClient; @Test public void redissonTest() throws InterruptedException { log.info("===redissonTest====start==============="); for (int i = 0; i < 10; i++) { new Thread(() -> { lockTest(); }).start(); } Thread.sleep(30000); log.info("===redissonTest====end==============="); }
private void tryLockTest() {
// 見下文
}
private void lockTest() {
// 見下文
}
}
private void tryLockTest() { String threadName = Thread.currentThread().getName(); log.info("===Thread=={}===start===", threadName); RLock lock = redisson.getLock("DistributedRedisLockTest"); // 嘗試加鎖,最多等待10秒,上鎖以后30秒自動解鎖 boolean lockFlag = false; try { // 嘗試去加鎖,10秒沒獲取到鎖,則回傳false // res = lock.tryLock(10, TimeUnit.SECONDS); // 嘗試去加鎖,10秒沒獲取到鎖,則回傳false,獲取到則回傳true,獲取到鎖后30秒自動釋放 // 當waitTime設定為0時,就相當于setNx,獲取不到鎖直接退出 lockFlag = lock.tryLock(5, 1, TimeUnit.SECONDS); if (!lockFlag) { log.info("===Thread=={}==res={}==沒有獲取到鎖,退出===", threadName, lockFlag); return; } log.info("===Thread=={}============getLock===", threadName); // 模擬業務邏輯 Thread.sleep(2000); } catch (Exception e) { log.error("執行例外,e:{}", ExceptionUtils.getStackTrace(e)); } finally { log.info("===Thread=={}==========isHeldByCurrentThread={}", threadName, lock.isHeldByCurrentThread()); // 釋放鎖也可能出現例外,比如業務代碼沒執行完,鎖就過期,此時進行釋放會拋例外,加個當前執行緒是否持有所的判斷 if (lock.isHeldByCurrentThread()) { lock.unlock(); } } log.info("===Thread=={}==lockFlag={}=end===", threadName, lockFlag); }
private void lockTest() { String threadName = Thread.currentThread().getName(); log.info("===Thread=={}===start===", threadName); RLock lock = redisson.getLock("DistributedRedisLockTest"); // lock表示去加鎖,加鎖成功,沒有回傳值,繼續執行下面代碼;加鎖失敗,它會一直阻塞,直到鎖被釋放,再繼續往下執行 // lock.lock(); // 1秒自動釋放時間,但是后續執行unlock操作時會報錯(自己只能解鎖自己的,第一個執行緒釋放之后執行到unlock方法,但是此時鎖已經是第二個執行緒的了) lock.lock(1, TimeUnit.SECONDS); log.info("===Thread=={}============getLock===", threadName); try { Thread.sleep(2000); } catch (Exception e) { log.error("執行例外,e:{}", ExceptionUtils.getStackTrace(e)); } finally { log.info("===Thread=={}==========isHeldByCurrentThread={}", threadName, lock.isHeldByCurrentThread()); // 釋放鎖也可能出現例外,比如業務代碼沒執行完,鎖就過期,此時進行釋放會拋例外,加個當前執行緒是否持有所的判斷 if (lock.isHeldByCurrentThread()) { lock.unlock(); } } log.info("===Thread=={}===end===", threadName); }
特點
- 分布式
- 可以自動釋放鎖,防止死鎖
- 可重入鎖 (相同執行緒不需要在等待鎖,而是可以直接進行相應操作)
- 防誤刪,當前執行緒只能洗掉當前執行緒的鎖 (業務執行時間過長,超過鎖失效時間,鎖被釋放,第二個執行緒獲取鎖,此時第一個執行緒執行到釋放鎖代碼時,不能洗掉第二個執行緒的鎖)
- 可阻塞等待
- 看門狗機制,延長過期時間(沒有設定過期時間的情況,leaseTime=-1,默認失效時間為30秒,啟動看門狗執行緒,定時檢查是否需要延長時間scheduleExpirationRenewal)
- 鎖種類多樣:可重入鎖、公平鎖、聯鎖、紅鎖、讀寫鎖
存在的問題
分布式架構中的CAP理論,分布式系統只能同時滿足兩個
-
- 一致性(Consistency)
- 可用性(Availability)
- 磁區容錯性(Partition tolerance)
- Redisson分布式鎖是AP模式,當鎖存在的redis節點宕機,可能會被誤判為鎖失效,或者沒有加鎖,(Zookeeper實作的分布式鎖,是CP理論)
原理
本文中Redisson版本為 redisson-spring-boot-starter 3.13.6
先看下介面方法:
public interface RRLock extends Lock, RLockAsync{ //----------------------Lock介面方法----------------------- /** * 加鎖 鎖的有效期默認30秒 */ void lock(); /** * tryLock()方法是有回傳值的,它表示用來嘗試獲取鎖,如果獲取成功,則回傳true,如果獲取失敗(即鎖已被其他執行緒獲取),則回傳false . */ boolean tryLock(); /** * tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,只不過區別在于這個方法在拿不到鎖時會等待一定的時間, * 在時間期限之內如果還拿不到鎖,就回傳false,如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則回傳true, * * @param time 等待時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ boolean tryLock(long time, TimeUnit unit) throws InterruptedException; /** * 解鎖 */ void unlock(); /** * 中斷鎖 表示該鎖可以被中斷 假如A和B同時調這個方法,A獲取鎖,B為獲取鎖,那么B執行緒可以通過 * Thread.currentThread().interrupt(); 方法真正中斷該執行緒 */ void lockInterruptibly(); //----------------------RLock介面方法----------------------- /** * 加鎖 上面是默認30秒這里可以手動設定鎖的有效時間 * * @param leaseTime 鎖有效時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ void lock(long leaseTime, TimeUnit unit); /** * 這里比上面多一個引數,多添加一個鎖的有效時間 * * @param waitTime 等待時間 * @param leaseTime 鎖有效時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; /** * 檢驗該鎖是否被執行緒使用,如果被使用回傳True */ boolean isLocked(); /** * 檢查當前執行緒是否獲得此鎖(這個和上面的區別就是該方法可以判斷是否當前執行緒獲得此鎖,而不是此鎖是否被執行緒占有) * 這個比上面那個實用 */ boolean isHeldByCurrentThread(); /** * 中斷鎖 和上面中斷鎖差不多,只是這里如果獲得鎖成功,添加鎖的有效時間 * @param leaseTime 鎖有效時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ void lockInterruptibly(long leaseTime, TimeUnit unit); }
-
tryLock方法
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // 獲取到鎖,則直接回傳true if (ttl == null) { return true; } // 沒獲取到鎖,校驗是否超過等待時長,超過則回傳false time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis();
// 訂閱監聽redis訊息,并且創建RedissonLockEntry,其中RedissonLockEntry中比較關鍵的是一個 Semaphore屬性物件,用來控制本地的鎖請求的信號量同步,回傳的是netty框架的Future實作, RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // 阻塞等待subscribe的future的結果物件,如果subscribe方法呼叫超過了time,說明已經超過了客戶端設定的最大wait time,則直接回傳false,取消訂閱,不再繼續申請鎖了,
// 只有await回傳true,才進入回圈嘗試獲取鎖
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } try { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 如果沒有超過嘗試獲取鎖的等待時間,那么通過While一直獲取鎖,最終只會有兩種結果
// 1) 在等待時間內獲取鎖成功 回傳true 2)等待時間結束了還沒有獲取到鎖那么回傳false, while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // 獲取鎖成功 if (ttl == null) { return true; } // 獲取鎖失敗 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 如果指定了失效時間,就按指定的失效時間執行,然后回傳 if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); }
// 如果沒有指定失效時間(leaseTime=-1),則默認配置30秒 (getLockWatchdogTimeOut()=30) RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); // 加鎖完畢之后,啟動看門狗執行緒,定時的延期失效時間(定時任務為 internalLockLeaseTime / 3 毫秒之后執行)
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) { return; } // lock acquired if (ttlRemaining == null) {
// 啟動看門狗任務 scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); // 通過lua腳本訪問Redis,保證操作的原子性, 以及達到批量操作的效果,提升性能
// KEYS[1] :需要加鎖的key,這里需要是字串型別,
// ARGV[1] :鎖的超時時間,防止死鎖
// ARGV[2] :鎖的唯一標識,id(UUID.randomUUID()) + “:” + threadId
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 檢查是否key已經被占用,如果沒有則設定超時時間和唯一標識,初始化value=https://www.cnblogs.com/dong320/archive/2020/11/17/1
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果鎖重入,需要判斷鎖的key field 都一直情況下 value 加一
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " +
// 回傳剩余的過期時間 "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
注:tryLock一般用于特定滿足需求的場合,但不建議作為一般需求的分布式鎖,一般分布式鎖建議用void lock(long leaseTime, TimeUnit unit),因為從性能上考慮,在高并發情況下后者效率是前者的好幾倍
- unlock方法
@Override public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<Void>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> {
// 釋放鎖后取消重繪鎖失效時間的調度任務 cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); return; } // 非鎖的持有者釋放鎖時拋出例外 if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } result.trySuccess(null); }); return result; } // 通過 Lua 腳本執行 Redis 命令釋放鎖
// KEYS[1] :需要加鎖的key,這里需要是字串型別,
// KEYS[2] :redis訊息的ChannelName,一個分布式鎖對應唯一的一個channelName:“redisson_lock__channel__{” + getName() + “}”
// ARGV[1] :reids訊息體,這里只需要一個位元組的標記就可以,主要標記redis的key已經解鎖,再結合redis的Subscribe,能喚醒其他訂閱解鎖訊息的客戶端執行緒申請鎖,=
// ARGV[2] :鎖的超時時間,防止死鎖
// ARGV[3] :鎖的唯一標識,也就是剛才介紹的 id(UUID.randomUUID()) + “:” + threadId
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// key和field不匹配,說明當前客戶端執行緒沒有持有鎖,不能主動解鎖, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 如果counter>0說明鎖在重入,不能洗掉key "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " +
// 洗掉key并且publish 解鎖訊息 "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
使用 EVAL 命令執行 Lua 腳本來釋放鎖:
- key 不存在,說明鎖已釋放,直接執行
publish命令發布釋放鎖訊息并回傳1, - key 存在,但是 field 在 Hash 中不存在,說明自己不是鎖持有者,無權釋放鎖,回傳
nil, - 因為鎖可重入,所以釋放鎖時不能把所有已獲取的鎖全都釋放掉,一次只能釋放一把鎖,因此執行
hincrby對鎖的值減一, - 釋放一把鎖后,如果還有剩余的鎖,則重繪鎖的失效時間并回傳
0;如果剛才釋放的已經是最后一把鎖,則執行del命令洗掉鎖的 key,并發布鎖釋放訊息,回傳1,
注意這里有個實際開發程序中,容易出現很容易出現上面第二步例外,非鎖的持有者釋放鎖時拋出例外,
參考:
Redisson實作分布式鎖(1)---原理
Redisson實作分布式鎖(2)—RedissonLock
利用Redisson實作分布式鎖及其底層原理決議
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/222969.html
標籤:其他
