springboot-redis-redisson分布式鎖
- 一、Redis分布式鎖實作原理
- 簡介
- 1.普通分布式鎖
- 2.哨兵模式
- 3.集群模式
- 唯一ID
- 看門狗Watchdog
- 可重入鎖
- 獲取鎖
- 釋放鎖
- 二、 完整代碼
- 依賴
- application.properties配置
- 配置類
- 鎖后業務介面規范
- redis加鎖介面規范
- redis加鎖實作類
- 自定義例外
- 測驗類
比較:redis很明顯優于zookeeper;就分布式鎖實作的健壯性而言,zookeeper很明顯優于redis,如何選擇,取決于你的業務!
一、Redis分布式鎖實作原理

簡介
Redis的4種部署架構有:
- 單機模式;
- 主從模式;
- 哨兵模式;
- 集群模式;
Redlock分布式鎖在非單機模式下才有意義,單機模式下可以直接使用普通分布式鎖,
因為RedLock的實作完全基于普通分布式鎖;
RedLock加鎖程序: 向所有節點發送 set key value ex time nx指令,只要過半節點set成功,就認為加鎖成功,
RedLock釋放鎖程序: 向所有節點發送del指令;
因為RedLock操作多個節點,所以效率略有下降;但避免了網路磁區產生的多鎖同時存在的情況;
1.普通分布式鎖
// 構造redisson實作分布式鎖必要的Config
Config config = new Config();
config.useSingleServer().setAddress("1.9.1.0:5379").setPassword("123").setDatabase(0);
// 構造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 設定鎖定資源名稱
RLock disLock = redissonClient.getLock("DISLOCK");
boolean isLock;
try {
//嘗試獲取分布式鎖
isLock = disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS);
if (isLock) {
//TODO if get lock success, do something;
Thread.sleep(15000);
}
} catch (Exception e) {
} finally {
// 無論如何, 最后都要解鎖
disLock.unlock();
}
通過代碼可知,經過Redisson的封裝,實作Redis分布式鎖非常方便,我們再看一下Redis中的value是啥,和前文分析一樣,hash結構,key就是資源名稱,field就是UUID+threadId,value就是重入值,在分布式鎖時,這個值為1(Redisson還可以實作重入鎖,那么這個值就取決于重入次數了):
172.29.1.180:5379> hgetall DISLOCK
1) "01a6d806-d282-4715-9bec-f51b9aa98110:1"
2) "1"
2.哨兵模式
即sentinel模式,實作代碼和單機模式幾乎一樣,唯一的不同就是Config的構造:
Config config = new Config();
config.useSentinelServers().addSentinelAddress(
"redis://127.0.0.1:26378","redis://127.0.0.1:26379", "redis://127.0.0.1:26380")
.setMasterName("mymaster")
.setPassword("a123456").setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
// 還可以getFairLock(), getReadWriteLock()
RLock redLock = redissonClient.getLock("REDLOCK_KEY");
boolean isLock;
try {
isLock = redLock.tryLock();
// 500ms拿不到鎖, 就認為獲取鎖失敗,10000ms即10s是鎖失效時間,
isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
if (isLock) {
//TODO if get lock success, do something;
}
} catch (Exception e) {
} finally {
// 無論如何, 最后都要解鎖
redLock.unlock();
}
3.集群模式
集群模式構造Config如下:
Config config = new Config();
config.useClusterServers().addNodeAddress(
"redis://127.0.0.1:6375","redis://127.0.0.1:6376", "redis://127.0.0.1:6377",
"redis://127.0.0.1:6378","redis://127.0.0.1:6379", "redis://127.0.0.1:6380")
.setPassword("123").setScanInterval(5000);
RedissonClient redissonClient = Redisson.create(config);
// 還可以getFairLock(), getReadWriteLock()
RLock redLock = redissonClient.getLock("REDLOCK_KEY");
boolean isLock;
try {
isLock = redLock.tryLock();
// 500ms拿不到鎖, 就認為獲取鎖失敗,10000ms即10s是鎖失效時間,
isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
if (isLock) {
//TODO if get lock success, do something;
}
} catch (Exception e) {
} finally {
// 無論如何, 最后都要解鎖
redLock.unlock();
}
如果是多個沒有關系的redis單機之間使用RedLock來做分布式鎖
可以直接將各個單機的鎖組成一個鎖:
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
唯一ID
實作分布式鎖的一個非常重要的點就是set的value要具有唯一性,redisson的value是怎樣保證value的唯一性呢?答案是UUID+threadId,入口在redissonClient.getLock(“REDLOCK_KEY”),原始碼在Redisson.java和RedissonLock.java中:
protected final UUID id = UUID.randomUUID();
String getLockName(long threadId) {
return id + ":" + threadId;
}
看門狗Watchdog

看門狗是客戶端的行為,創建鎖后每隔10秒幫你把key的超時時間設為30s這樣的話,就算一直持有鎖也不會出現key過期了,其他執行緒獲取到鎖的問題了,
lockWatchdogTimeout(監控鎖的看門狗超時,單位:毫秒)
默認值:30000
監控鎖的看門狗超時時間單位為毫秒,該引數只適用于分布式鎖的加鎖請求中未明確使用leaseTimeout引數的情況,如果該看門狗未使用lockWatchdogTimeout去重新調整一個分布式鎖的lockWatchdogTimeout超時,那么這個鎖將變為失效狀態,這個引數可以用來避免由Redisson客戶端節點宕機或其他原因造成死鎖的情況,
可重入鎖
key是唯一的,key里面的field為uuid:執行緒Id,uuid由呼叫系統生成,一個系統中同一個鎖只會用同一個uuid,不通系統的相同鎖的uuid不一樣,
所以根據uuid+執行緒id就可以確定當前執行緒是否擁有鎖,如果擁有,就把value+1,釋放鎖就-1;
獲取鎖
獲取鎖的代碼為redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),兩者的最終核心原始碼都是下面這段代碼,只不過前者獲取鎖的默認租約時間(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 獲取鎖時向5個redis實體發送的命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 首先分布式鎖的KEY不能存在,如果確實不存在,那么執行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通過pexpire設定失效時間(也是鎖的租約時間)
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果分布式鎖的KEY已經存在,并且value也匹配,表示是當前執行緒持有的鎖,那么重入次數加1,并且設定失效時間
"if (redis.call('hexists', KEYS[1], [2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 獲取分布式鎖的KEY的失效時間毫秒數
"return redis.call('pttl', KEYS[1]);",
// 這三個引數分別對應KEYS[1],ARGV[1]和ARGV[2]
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
獲取鎖的命令中,
KEYS[1]就是Collections.singletonList(getName()),表示分布式鎖的key,即REDLOCK_KEY;
ARGV[1]就是internalLockLeaseTime,即鎖的租約時間,默認30s;
ARGV[2]就是getLockName(threadId),是獲取鎖時set的唯一值,即UUID+threadId:
釋放鎖
釋放鎖的代碼為redLock.unlock(),核心原始碼如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
// 向5個redis實體都執行如下命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果分布式鎖KEY不存在,那么向channel發布一條訊息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// 如果分布式鎖存在,但是value不匹配,表示鎖已經被占用,那么直接回傳
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 如果就是當前執行緒占有分布式鎖,那么將重入次數減1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 重入次數減1后的值如果大于0,表示分布式鎖有重入過,那么只設定失效時間,還不能洗掉
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 重入次數減1后的值如果為0,表示分布式鎖只獲取過1次,那么洗掉這個KEY,并發布解鎖訊息
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
// 這5個引數分別對應KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
二、 完整代碼
依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>springboot-redis</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-redis</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties配置
# Redis資料庫索引(默認為0)
spring.redis.database=0
# Redis服務器地址
spring.redis.host=192.168.1.7
#spring.redis.cluster.nodes=127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005
# Redis服務器連接埠
spring.redis.port=6379
# Redis服務器連接密碼(默認為空)
#spring.redis.password=123
# 連接池最大連接數(使用負值表示沒有限制)
spring.redis.jedis.pool.max-active=8
# 連接池最大阻塞等待時間(使用負值表示沒有限制)
spring.redis.jedis.pool.max-wait=
# 連接池中的最大空閑連接
spring.redis.jedis.pool.max-idle=200
# 連接池中的最小空閑連接
spring.redis.jedis.pool.min-idle=0
# 連接超時時間(毫秒)
spring.redis.timeout=1000
配置類
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* Redis組態檔的引數
*/
@Data
@Component
@ConfigurationProperties(prefix = "spring.redis")
public class RedisProperties {
private String host;
private int port;
private String password;
private int database;
}
Redission配置類,主要作用就是建立redis連接,生成client物件
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Redission配置類,主要作用就是建立redis連接,生成client物件
*/
@Slf4j
@Configuration
public class RedissionConfig {
@Autowired
private RedisProperties redisProperties;
@Bean
public RedissonClient redissonClient() {
RedissonClient redissonClient;
Config config = new Config();
String url = redisProperties.getHost() + ":" + redisProperties.getPort();
config.useSingleServer().setAddress(url)
.setPassword(redisProperties.getPassword())
.setDatabase(redisProperties.getDatabase());
try {
redissonClient = Redisson.create(config);
return redissonClient;
} catch (Exception e) {
log.error("RedissonClient init redis url:[{}], Exception:", url, e);
return null;
}
}
}
鎖后業務介面規范
規定了獲得鎖后,如何呼叫業務邏輯
/**
* 鎖后業務介面規范
*/
public interface AquiredLockWorker<T> {
T invokeAfterLockAquire() throws Exception;
}
redis加鎖介面規范
規定了如何呼叫加鎖規范
/**
* redis加鎖介面規范
*/
public interface RedisLocker {
/**
* 獲取鎖
* @param lockName 鎖的名稱
* @param worker 獲取鎖后的處理類
* @param <T>
* @return 處理完具體的業務邏輯要回傳的資料
* @throws UnableToAquireLockException
* @throws Exception
*/
<T> T lock(String lockName, AquiredLockWorker<T> worker) throws UnableToAquireLockException, Exception;
<T> T lock(String lockName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception;
}
redis加鎖實作類
實作了如何加鎖、釋放鎖;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* redis加鎖實作類
*/
@Component
public class RedisLockerImpl implements RedisLocker {
private final static String LOCKER_PREFIX = "lock:";
@Resource(name = "redissonClient")
private RedissonClient redissonClient;
@Override
public <T> T lock(String lockName, AquiredLockWorker<T> worker) throws InterruptedException, UnableToAquireLockException, Exception {
return lock(lockName, worker, 100);
}
@Override
public <T> T lock(String lockName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception {
//加鎖
RLock lock = redissonClient.getLock(LOCKER_PREFIX + lockName);
// Wait for 100 seconds seconds and automatically unlock it after lockTime seconds
//加鎖等待100秒,100秒后仍未獲得鎖則回傳false,獲得鎖后lockTime后自動釋放鎖,
boolean success = lock.tryLock(100, lockTime, TimeUnit.SECONDS);
if (success) {
try {
return worker.invokeAfterLockAquire();
} finally {
lock.unlock();
}
}
throw new UnableToAquireLockException();
}
}
自定義例外
/**
* 例外類
*/
public class UnableToAquireLockException extends RuntimeException {
public UnableToAquireLockException() {
}
public UnableToAquireLockException(String message) {
super(message);
}
public UnableToAquireLockException(String message, Throwable cause) {
super(message, cause);
}
}
測驗類
生成多個執行緒,進行鎖競爭,完成業務邏輯,
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
/**
* @Author ccl
* @Date 2021/1/17 14:18
*/
@Slf4j
@RestController
public class LockTestController {
@Autowired
private RedisLocker redisLocker;
@RequestMapping(value = "/redlock")
public String testRedlock() throws Exception {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(5);
for (int i = 0; i < 5; ++i) {
// create and start threads
new Thread(() -> {
try {
startSignal.await();
//主要代碼在這里,第一個引數為鎖的名字,第二個引數為加鎖后要執行的業務
redisLocker.lock("test", () -> {
doTask();
doneSignal.countDown();
return null;
});
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
startSignal.countDown(); // let all threads proceed
doneSignal.await();
log.info("All processors done. Shutdown connection");
return "redlock";
}
/**
* 加鎖后要執行的業務邏輯
*/
void doTask() {
log.info(Thread.currentThread().getName() + " start");
Random random = new Random();
int _int = random.nextInt(200);
log.info(Thread.currentThread().getName() + " sleep{}millis", _int);
try {
Thread.sleep(_int);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("{}end", Thread.currentThread().getName());
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/250202.html
標籤:其他
