目錄
- 一、簡介
- 二、maven依賴
- 三、配置類
- 3.1、屬性配置
- 3.2、redis配置
- 3.3、redisson配置
- 四、分布式鎖使用
- 4.1、service層
- 4.2、controller層
- 五、組態檔application.yml
- 5.1、單節點配置
- 5.2、集群節點配置
- 六、測驗
- 6.1、多實體準備和Nginx配置
- 6.2、jemeter并發測驗
- 6.2.1、不加鎖并發結果
- 6.2.2、使用redisson加鎖結果
- 七、redisson原理(注釋都在代碼里)
- 7.1、獲取鎖物件
- 7.1.1、加鎖getLock()
- 7.2、加鎖lock()
- 7.2.1、加鎖lock()(重要)
- 7.2.2、tryAcquire()
- 7.2.3、tryAcquireAsync()
- 7.2.4、tryLockInnerAsync()(核心)
- 7.2.5、流程圖
- 7.3、釋放鎖unlock
- 7.3.1、unlock()
- 7.3.2、unlockAsync()
- 7.3.3、unlockInnerAsync()(核心)
- 7.3.4、流程圖
- 7.4、鎖的續期
- 7.4.1、scheduleExpirationRenewal()
- 7.4.2、renewExpiration()(重要)
- 7.4.3、renewExpirationAsync()(核心)
- 7.5、lua腳本
一、簡介
??在分布式系統中,為了保證同一時間只能由一個客戶端對共享資源操作,我們通常采用加鎖來實作,常見有三種方式:
- 基于資料庫實作的分布式鎖
- 基于zookeeper實作的分布式鎖
- 基于redis實作的分布式鎖
??而在使用redis實作時Redisson是官方推薦的一種方案也是我們今天的主角,
二、maven依賴
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.2</version>
<relativePath/>
</parent>
<groupId>com.alian</groupId>
<artifactId>redisson</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>redisson</name>
<description>Spring Boot分布式鎖之redisson</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${parent.version}</version>
</dependency>
<!--redis依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>${parent.version}</version>
</dependency>
<!--用于序列化-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.10</version>
</dependency>
<!--java 8時間序列化-->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.9.10</version>
</dependency>
<!--JSON-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!--redisson版本-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
??今天的主角就是redisson,如果關于redis不知道整合的可以參考我之前的文章:SpringBoot整合redis(redis支持單節點和集群),這里我就不過多的介紹了,
<!--redisson版本-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.3</version>
</dependency>
三、配置類
3.1、屬性配置
AppProperties.java
package com.alian.redisson.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "app.redisson")
public class AppProperties {
/**
* redis地址
*/
private String address;
/**
* redis資料庫
*/
private int database;
}
3.2、redis配置
??這里使用redis配置只是為了模擬分布式環境下資料庫減庫存,
RedisConfig.java
package com.alian.redisson.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
@Configuration
public class RedisConfig {
/**
* redis配置
*
* @param redisConnectionFactory
* @return
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
// 實體化redisTemplate
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
//設定連接工廠
redisTemplate.setConnectionFactory(redisConnectionFactory);
// key采用String的序列化
redisTemplate.setKeySerializer(keySerializer());
// value采用jackson序列化
redisTemplate.setValueSerializer(valueSerializer());
// Hash key采用String的序列化
redisTemplate.setHashKeySerializer(keySerializer());
// Hash value采用jackson序列化
redisTemplate.setHashValueSerializer(valueSerializer());
//執行函式,初始化RedisTemplate
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/**
* key型別采用String序列化
*
* @return
*/
private RedisSerializer<String> keySerializer() {
return new StringRedisSerializer();
}
/**
* value采用JSON序列化
*
* @return
*/
private RedisSerializer<Object> valueSerializer() {
//設定jackson序列化
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
//設定序列化物件
jackson2JsonRedisSerializer.setObjectMapper(getMapper());
return jackson2JsonRedisSerializer;
}
/**
* 使用com.fasterxml.jackson.databind.ObjectMapper
* 對資料進行處理包括java8里的時間
*
* @return
*/
private ObjectMapper getMapper() {
ObjectMapper mapper = new ObjectMapper();
//設定可見性
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
//默認鍵入物件
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
//設定Java 8 時間序列化
JavaTimeModule timeModule = new JavaTimeModule();
timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
timeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
timeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
//禁用把時間轉為時間戳
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
mapper.registerModule(timeModule);
return mapper;
}
}
3.3、redisson配置
RedissonConfig.java
package com.alian.redisson.config;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class RedissonConfig {
private static final String COMMA = ",";
@Autowired
private AppProperties appProperties;
@Bean
public Redisson redisson() {
//實體化redisson配置實體
Config config = new Config();
//獲取組態檔的配置的地址
String address = appProperties.getAddress();
//如果配置項中包含逗號,我們認為是集群模式(也就是相當于內部一個規定,沒必要去較真哦,這里哨兵模式就不考慮了)
if (address != null && address.contains(COMMA)) {
//通過逗號分隔為字符陣列
String[] nodeAddress = address.split(COMMA);
//集群模式
config.useClusterServers().addNodeAddress(nodeAddress).setScanInterval(5000);
} else {
//單機模式
config.useSingleServer().setAddress(address).setDatabase(appProperties.getDatabase());
}
//設定加鎖時間,默認情況下,加鎖的時間是30秒,此處單位為毫秒(實際中你還可以設定)
config.setLockWatchdogTimeout(appProperties.getLeaseTime());
return (Redisson) Redisson.create(config);
}
}
四、分布式鎖使用
4.1、service層
??模擬啟動系統初始化庫存為1000,要注意的是setIfAbsent方法,表示不存在就設定,存在就不會設定了,
RedisService.java
package com.alian.redisson.service;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Slf4j
@Service
public class RedisService {
private final String stock = "com.alian.redisson.stock";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@PostConstruct
public void init() {
log.info("模擬初始化庫存為:1000");
redisTemplate.opsForValue().setIfAbsent(stock, 1000);
log.info("初始化庫存完成");
}
}
??具體減庫存業務實作
RedissonLockService.java
package com.alian.redisson.service;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class RedissonLockService {
private final String stock = "com.alian.redisson.stock";
@Autowired
private Redisson redisson;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 使用redisson分布式鎖
*
* @return
*/
public int deductInventoryWithLock() {
//簡單定義鎖物件,實際中可以對訂單號(202110141605211234565)或者其他組合(幾個條件一起)作為業務的鎖
String lockKey = "redisson_lock";
// 獲取鎖物件
RLock lock = redisson.getLock(lockKey);
try {
// 加鎖
lock.lock();
//獲取快取里的庫存數量
Integer stockNum = (Integer) redisTemplate.opsForValue().get(stock);
if (stockNum != null && stockNum > 0) {
//扣減庫存
stockNum--;
//快取資料
redisTemplate.opsForValue().set(stock, stockNum);
//更新資料庫等其他操作
log.info("使用redisson鎖庫存減1,剩余:{}", stockNum);
return stockNum;
} else {
log.info("使用redisson鎖,獲取庫存例外或者已經沒有庫存了");
return 0;
}
} finally {
try {
// 釋放鎖
lock.unlock();
} catch (Exception e) {
log.error("釋放鎖例外",e);
}
}
}
/**
* 不使用任何鎖
*
* @return
*/
public int deductInventoryWithOutLock() {
//獲取快取里的庫存數量
Integer stockNum = (Integer) redisTemplate.opsForValue().get(stock);
if (stockNum != null && stockNum > 0) {
//扣減庫存
stockNum--;
//快取資料
redisTemplate.opsForValue().set(stock, stockNum);
//更新資料庫等其他操作
log.info("不加鎖庫存減1,剩余:{}", stockNum);
return stockNum;
} else {
log.info("不加鎖,獲取庫存例外或者已經沒有庫存了");
return 0;
}
}
}
4.2、controller層
RedissonController.java
package com.alian.redisson.controller;
import com.alian.redisson.service.RedissonLockService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RequestMapping("/test")
@RestController
public class RedissonController {
@Autowired
RedissonLockService redissonLockService;
@RequestMapping("/deductWithLock")
public int deductInventoryWithLock() {
return redissonLockService.deductInventoryWithLock();
}
@RequestMapping("/deductWithOutLock")
public int deductInventoryWithOutLock() {
return redissonLockService.deductInventoryWithOutLock();
}
}
五、組態檔application.yml
如果是使用我的配置類,呼叫加鎖時不傳入任何引數,則默認值還是一定要配置的,
否則就會出現
java.lang.IllegalMonitorStateException: attempt to unlock lock, not locked by current thread by node id: c305b4a2-42fe-4384-9e06-309675d21258 thread-id: 86
因為config.setLockWatchdogTimeout();會設定為0,鎖設定的時間0小于了被鎖程式的執行時間,導致redisson解鎖時,鎖已經因為超時被釋放掉了,故拋出改例外
5.1、單節點配置
server:
port: 8084
servlet:
context-path: /redisson
tomcat:
accept-count: 200
threads:
max: 500
spring:
redis:
database: 0
host: 192.168.0.193
port: 6379
password:
timeout: 1000
jedis:
pool:
max-active: 10
min-idle: 5
max-idle: 10
max-wait: -1
app:
redisson:
database: 0
address: redis://192.168.0.193:6379
lease-time: 30000
5.2、集群節點配置
server:
port: 8084
servlet:
context-path: /redisson
tomcat:
accept-count: 200
threads:
max: 500
spring:
redis:
cluster:
nodes: 192.168.0.111:6379,192.168.0.112:6379,192.168.0.113:6379,192.168.0.101:6379,192.168.0.102:6379,192.168.0.103:6379,192.168.0.114:6379,192.168.0.104:6379
app:
redisson:
database: 0
address: redis://192.168.0.111:6379,redis://192.168.0.112:6379,redis://192.168.0.113:6379,redis://192.168.0.101:6379,redis://192.168.0.102:6379,redis://192.168.0.103:6379,redis://192.168.0.114:6379,redis://192.168.0.104:6379
lease-time: 30000
六、測驗
6.1、多實體準備和Nginx配置
??既然是分布式,所以我們的請求能通過nginx發送到不同的實體上,在idea下,我們分別啟動兩個實體,埠分別為8086、8087,如果對于多實體啟動及nginx配置這塊不懂的可以參考另一篇文章:windows下Nginx配置及負載均衡使用,里面有詳細的介紹,linux下也差不多,案例寫的是windows而已,
#nginx轉發配置
location ~ ^/redisson/ {
proxy_redirect off;
#埠
proxy_set_header Host $host;
#遠程地址
proxy_set_header X-Real-IP $remote_addr;
#程式可獲取遠程ip地址
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
#此處會用的upstream.conf,此檔案在nginx.conf已經引入了
proxy_pass http://redisson-lock;
}
#負載均衡配置
upstream redisson-lock {
server 127.0.0.1:8086 ;
server 127.0.0.1:8087 ;
}
6.2、jemeter并發測驗
??既然是分布式的并發,所以我們使用測驗工具jemeter來進行測驗,因為結果太多,我們使用10個執行緒進行并發測驗,
6.2.1、不加鎖并發結果
我們請求介面為:http://localhost/redisson/test/deductWithOutLock
-
埠為8086實體輸出結果
2021-10-16 19:01:36 362 [http-nio-8086-exec-6] INFO :不加鎖庫存減1,剩余:999
2021-10-16 19:01:36 363 [http-nio-8086-exec-1] INFO :不加鎖庫存減1,剩余:999
2021-10-16 19:01:36 363 [http-nio-8086-exec-2] INFO :不加鎖庫存減1,剩余:999
2021-10-16 19:01:36 363 [http-nio-8086-exec-5] INFO :不加鎖庫存減1,剩余:999
2021-10-16 19:01:36 363 [http-nio-8086-exec-3] INFO :不加鎖庫存減1,剩余:999 -
埠為8087實體輸出結果
2021-10-16 19:01:36 362 [http-nio-8087-exec-1] INFO :不加鎖庫存減1,剩余:999
2021-10-16 19:01:36 362 [http-nio-8087-exec-2] INFO :不加鎖庫存減1,剩余:999
2021-10-16 19:01:36 363 [http-nio-8087-exec-3] INFO :不加鎖庫存減1,剩余:999
2021-10-16 19:01:36 363 [http-nio-8087-exec-4] INFO :不加鎖庫存減1,剩余:999
2021-10-16 19:01:36 364 [http-nio-8087-exec-5] INFO :不加鎖庫存減1,剩余:999
從結果我們可以看到當10個執行緒并發請求時,分別轉發到兩個實體上了,nginx是沒有問題的,為什么是這個結果呢?因為當庫存還沒有扣減更新時,獲取的庫存都是1000,然后都做庫存減1的操作,變成999,當然也和我機器配置高有關系,你有可能會得到998或997等等,
6.2.2、使用redisson加鎖結果
-
埠為8086實體輸出結果
2021-10-16 19:11:29 737 [http-nio-8087-exec-9] INFO :使用redisson鎖庫存減1,剩余:999 2021-10-16 19:11:29 804 [http-nio-8087-exec-5] INFO :使用redisson鎖庫存減1,剩余:996 2021-10-16 19:11:29 827 [http-nio-8087-exec-3] INFO :使用redisson鎖庫存減1,剩余:995 2021-10-16 19:11:29 842 [http-nio-8087-exec-1] INFO :使用redisson鎖庫存減1,剩余:994 2021-10-16 19:11:29 884 [http-nio-8087-exec-6] INFO :使用redisson鎖庫存減1,剩余:991 -
埠為8087實體輸出結果
2021-10-16 19:11:29 772 [http-nio-8086-exec-1] INFO :使用redisson鎖庫存減1,剩余:998 2021-10-16 19:11:29 787 [http-nio-8086-exec-5] INFO :使用redisson鎖庫存減1,剩余:997 2021-10-16 19:11:29 855 [http-nio-8086-exec-4] INFO :使用redisson鎖庫存減1,剩余:993 2021-10-16 19:11:29 866 [http-nio-8086-exec-3] INFO :使用redisson鎖庫存減1,剩余:992 2021-10-16 19:11:29 901 [http-nio-8086-exec-6] INFO :使用redisson鎖庫存減1,剩余:990
從結果看到,我們扣減庫存是正確的,可能會有小伙伴說
//獲取快取里的庫存數量
Integer stockNum = (Integer) redisTemplate.opsForValue().get(stock);
//扣減庫存
stockNum--;
//快取資料
redisTemplate.opsForValue().set(stock, stockNum);
就不能寫成下面這個就解決了問題么?
redisTemplate.opsForValue().decrement(stock);
兄弟,我這里只是一個模擬業務的程序,比如你的業務里可能還有會有對資料庫的操作等以及key的超時等,分多步執行,每一步都會有并發的情況,也可能每一步都可能出現例外,我們的著重點在用分布式鎖,也就是三步,
- 獲取鎖
- 加鎖
- 釋放鎖
七、redisson原理(注釋都在代碼里)
每個版本原始碼都有不同,我這里redisson版本是3.16.3
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.3</version>
</dependency>
7.1、獲取鎖物件
7.1.1、加鎖getLock()
首先是:RLock lock = redisson.getLock(lockKey);
public RLock getLock(String name) {
return new RedissonLock(commandExecutor, name);
}
從這里我們知道默認實作是用的:org.redisson.RedissonLock
7.2、加鎖lock()
7.2.1、加鎖lock()(重要)
然后是加鎖方法:lock.lock();
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//獲取執行緒Id
long threadId = Thread.currentThread().getId();
//嘗試獲取鎖
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired即獲取鎖
if (ttl == null) {
return;
}
// 異步訂閱redis chennel
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
//同步訂閱中斷
commandExecutor.syncSubscriptionInterrupted(future);
} else {
// 同步訂閱
commandExecutor.syncSubscription(future);
}
try {
while (true) {
// 回圈嘗試獲取鎖
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired,也就是獲取到了鎖
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 取消訂閱
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
7.2.2、tryAcquire()
接下里我們看tryAcquire方法,意思是嘗試獲取鎖,
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// get(future)實作同步,通過異步獲取鎖tryAcquireAsync
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
7.2.3、tryAcquireAsync()
接下里是tryAcquireAsync方法,嘗試異步方式獲取鎖,
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
//1 如果設定了超時時間,直接呼叫 tryLockInnerAsync
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//如果leaseTime==-1,則默認超時時間為30s,本文中的配置類可修改默認值:config.setLockWatchdogTimeout()
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
//監聽Future,獲取Future回傳值ttlRemaining(剩余過期時間),獲取鎖成功,且ttlRemaining為空leaseTime為-1,則重繪過期時間
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired,也就是獲取到了鎖
if (ttlRemaining == null) {
if (leaseTime != -1) {
//過期時間轉為毫秒
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//計劃到期續期,也就是重繪過期時間(重要)
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
7.2.4、tryLockInnerAsync()(核心)
下面就是最重要的方法:tryLockInnerAsync,意思是嘗試異步方式獲取內部鎖
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
//此處是判斷key是否存在,這里的KEYS[1]是我們傳入的鎖的名字
"if (redis.call('exists', KEYS[1]) == 0) then " +
//ARGV[2]值是getLockName(threadId),得到型別UUID+執行緒Id的欄位
//key不存在,則在redis中key為KEYS[1],欄位ARGV[2]設定值為1,(資料結構為Hash)
//相當于Hash型別, redisTemplate.opsForHash().increment(key,欄位,欄位的值)
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//設定key的過期日期
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//key存在,則判斷ARGV[2]的值是否相等,也就是1
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//值相等,設定一個欄位ARGV[2]的值為1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//重新設定過期時間
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//回傳key的生存時間,后續監聽用
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
其中KEYS[]和ARGV[]實際是個占位符,具體的引數得看下面這個單位
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
7.2.5、流程圖

7.3、釋放鎖unlock
7.3.1、unlock()
首先我們看下lock.unlock(),
public void unlock() {
try {
//同步get(future),里面通過異步釋放鎖實作
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
// Future<Void> future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);
}
7.3.2、unlockAsync()
接下里看下unlockAsync方法
@Override
public RFuture<Void> unlockAsync(long threadId) {
//給redisson一個承諾
RPromise<Void> result = new RedissonPromise<>();
//異步方式釋放內部鎖
RFuture<Boolean> future = unlockInnerAsync(threadId);
//取消看門狗機制
future.onComplete((opStatus, e) -> {
//取消到期續約
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
protected abstract RFuture<Boolean> unlockInnerAsync(long threadId);
7.3.3、unlockInnerAsync()(核心)
釋放鎖最重要的方法unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//判斷是否存在key,也就是我們鎖的名字,此ARGV[3]值是getLockName(threadId),得到型別UUID+執行緒Id的欄位
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//存在key,則設定一個欄位ARGV[3]的值減1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//判斷結果是否大于0
"if (counter > 0) then " +
//ARGV[2]過期時間
//大于0,則重新設定過期時間
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
//回傳0
"return 0; " +
"else " +
//小于0則洗掉key
"redis.call('del', KEYS[1]); " +
//KEYS[2]為getChannelName(),也就是:KEYS[1]__channel:{KEYS[1]}
//ARGV[1]為LockPubSub.unlckMessage,也就是0
//想通道中廣播一個訊息ARGV[1],內容也就是0
"redis.call('publish', KEYS[2], ARGV[1]); " +
//回傳1
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
同樣的引數KEYS[]和ARGV[],主要還是看下面這個
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
7.3.4、流程圖

7.4、鎖的續期
在tryAcquireAsync方法中有個看門狗,里面有段代碼如下:
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
//過期時間轉為毫秒
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//計劃到期續期,也就是重繪過期時間(重要)
scheduleExpirationRenewal(threadId);
}
}
});
7.4.1、scheduleExpirationRenewal()
這里的scheduleExpirationRenewal(threadId);就是實作了鎖的續期,我們進去看看
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
//續期(重要)
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
7.4.2、renewExpiration()(重要)
我們看下renewExpiration方法,
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//timeTask() 任務,過期時間的三分之一的時間開始執行,
//如果說起時間是30秒,那么第10秒的時候執行,再次設定為30秒,實際上對于這個key過期時間就變成了10秒+30秒=40秒
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself 也就是執行成功再次呼叫
renewExpiration();
} else {
//取消到期續期
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
7.4.3、renewExpirationAsync()(核心)
重要方法
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//KEYS[1]是鎖的名字, ARGV[2]是getLockName(threadId),得到型別UUID+執行緒Id的欄位
//是否存在一個KEYS[1]物件,它的欄位ARGV[2]的值為1
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//存在則設定過期時間,ARGV[1]為過期時間
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
7.5、lua腳本
??上面的鎖的實作都用到了lua腳本,因為redis執行時是單執行緒的,redis會保證這個lua腳本的原子性,他會把這個lua腳本當成一行代碼去執行,要么全部成功,要么全部失敗,其中nil 型別表示一種沒有任何有效值,它只有一個值nil,關于lua腳本大家可以去相關教程學習,我這里就簡單寫本文里見到的兩個語法,
if(布爾運算式)
then
--[ 在布爾運算式為 true 時執行的陳述句 --]
end
if(布爾運算式)
then
--[ 布爾運算式為 true 時執行該陳述句塊 --]
else
--[ 布爾運算式為 false 時執行該陳述句塊 --]
end
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/325527.html
標籤:其他
上一篇:如何對以逗號分隔的數字進行排序
