前言
上一篇文章中,著重介紹了執行緒池的搭建和jdk8發起任務的API CompletableFuture,說這些事情的意義是什么,強調的是在面對大量請求的時候,為了更好地支持并發、管控資源,咱們使用手動創建執行緒池的方法把許多的任務牢牢地拿在手中, 本篇文章介紹另外一個知識點,分布式鎖,鎖這個概念大家都不陌生,java提供了volatile鎖、synchronized鎖、Lock鎖等等為我們解決執行緒的安全問題,但是除了這些之外,還存在一個非常重要的概念,分布式鎖,下面將從為什么需要分布式鎖、如何利用springboot框架、redisson框架結合redis實作分布式鎖,展開一、為什么需要分布式鎖
在開發應用時,當多個客戶或者多個執行緒需要對某個共享的資料進行操作時,就需要使用執行緒同步,在Java開發中,對于單機應用,因為是在同一個JVM內部,所以我們可以采用Java提供的各種多執行緒操作的技巧來實作執行緒同步, 而對于分布式系統來說,由于多個請求可能被分發到不同的機器上去處理,如果這多個請求都是對同一個資源進行操作,那么使用基本的Java多執行緒執行緒同步技術可能就解決不了這個問題, 大家能理解上面兩段話嗎,假設我現在有一個共享的變數存在于資料庫,我將它抽象為A,如果我只是單機模式訪問這個A,那么在操作這個大A的地方加一個synchronized鎖是一點問題都沒有的,執行緒都需要排隊去操作A, 但問題在于,如果不是單機模式這就顯然可能出現問題的,比方兩臺服務都可以去操作A,及時你給每個操作A的服務都在操作A的地方增加了分布式鎖,那么依舊可能出現多個執行緒同時操作A的情形,畢竟兩個服務之間又不感知!
如上圖,請求A、B、C都是發起扣減同一個商品的庫存操作,三個請求被分發到三臺不同的服務部署機器上進行處理,而三臺機器并不在同一個JVM,所以Java提供的執行緒同步技巧就發揮不了作用了,但是對于扣減庫存這樣的場景,必須要使用執行緒同步來保證同一個商品的庫存不會被漏扣或者多扣,
為了保證在高并發的場景下,臨界資源(共享資源)同時只能被一個執行緒執行,在傳統單體應用單機部署的情況下,可以使用Java并發處理相關的API(如ReentrantLock或Synchronized)進行互斥控制,在單機環境中,Java中提供了很多并發處理相關的API,
但是在分布式系統中,由于分布式系統多執行緒、多行程并且分布在不同機器上,這將使原單機部署情況下的并發控制鎖策略失效,單純的Java API并不能提供分布式鎖的能力,為了解決這個問題就需要一種跨JVM的互斥機制來控制共享資源的訪問,這就是分布式鎖要解決的問題!
二、springboot整合redisson實作分布式鎖
1.引入依賴、撰寫啟動類、撰寫組態檔
首先來引入下依賴,其實不需要這么多,我太懶了,就不改了
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
</parent>
<dependencies>
<!--web專案-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!--使用Redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.9.1</version>
</dependency>
<!--測驗-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--lombok簡化pojo代碼-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
</dependency>
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>persistence-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.1-jre</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
<version>2.3</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
撰寫啟動類
package com.cmdc;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RedissonApplication {
/**
* 啟動類方法
* @param args 引數
*/
public static void main(String[] args) {
SpringApplication.run(RedissonApplication.class,args);
}
}
撰寫springboot默認會讀取的組態檔application.yml(其實你不寫也會走默認)
server:
port: 8111
spring:
application:
name: springboot-redisson
撰寫redisson需要的組態檔redisson-config.yml
#Redisson配置
singleServerConfig:
address: "redis://127.0.0.1:6379"
password: null
clientName: null
database: 7 #選擇使用哪個資料庫0~15
idleConnectionTimeout: 10000
pingTimeout: 1000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
reconnectionTimeout: 3000
failedAttempts: 3
subscriptionsPerConnection: 5
subscriptionConnectionMinimumIdleSize: 1
subscriptionConnectionPoolSize: 50
connectionMinimumIdleSize: 32
connectionPoolSize: 64
dnsMonitoringInterval: 5000
#dnsMonitoring: false
threads: 0
nettyThreads: 0
codec:
class: "org.redisson.codec.JsonJacksonCodec"
transportMode: "NIO"
ok.現在專案長下面這個樣子

2.整合
1.撰寫類RedissonConfig,將redisson的核心類RedissonClient交給spring管理,
package com.cmdc.config;
import com.cmdc.lockimpl.RedissonDistributeLocker;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
/**
* redisson bean管理
*/
@Configuration
public class RedissonConfig {
/**
* Redisson客戶端注冊
* 單機模式
*/
@Bean(destroyMethod = "shutdown")
public RedissonClient createRedissonClient() throws IOException {
// Config config = new Config();
// SingleServerConfig singleServerConfig = config.useSingleServer();
// singleServerConfig.setAddress("redis://127.0.0.1:6379");
// singleServerConfig.setPassword("12345");
// singleServerConfig.setTimeout(3000);
// return Redisson.create(config)
// 本例子使用的是yaml格式的組態檔,讀取使用Config.fromYAML,如果是Json檔案,則使用Config.fromJSON
Config config = Config.fromYAML(RedissonConfig.class.getClassLoader().getResource("redisson-config.yml"));
return Redisson.create(config);
}
/**
* 主從模式 哨兵模式
*
**/
/* @Bean
public RedissonClient getRedisson() {
RedissonClient redisson;
Config config = new Config();
config.useMasterSlaveServers()
//可以用"rediss://"來啟用SSL連接
.setMasterAddress("redis://***(主服務器IP):6379").setPassword("web2017")
.addSlaveAddress("redis://***(從服務器IP):6379")
.setReconnectionTimeout(10000)
.setRetryInterval(5000)
.setTimeout(10000)
.setConnectTimeout(10000);//(連接超時,單位:毫秒 默認值:3000);
// 哨兵模式config.useSentinelServers().setMasterName("mymaster").setPassword("web2017").addSentinelAddress("***(哨兵IP):26379", "***(哨兵IP):26379", "***(哨兵IP):26380");
redisson = Redisson.create(config);
return redisson;
}*/
}
撰寫類DistributeLocker,這是一個介面,用來操作redisson提供給我們的核心類RedissonClient,
package com.cmdc.abstractlock;
import java.util.concurrent.TimeUnit;
/**
*
*/
public interface DistributeLocker {
/**
* 加鎖
* @param lockKey key
*/
void lock(String lockKey);
/**
* 加鎖鎖,設定有效期
*
* @param lockKey key
* @param timeout 有效時間,默認時間單位在實作類傳入
*/
void lock(String lockKey, int timeout);
/**
* 加鎖,設定有效期并指定時間單位
* @param lockKey key
* @param timeout 有效時間
* @param unit 時間單位
*/
void lock(String lockKey, int timeout, TimeUnit unit);
/**
* 釋放鎖
*
* @param lockKey key
*/
void unlock(String lockKey);
/**
* 嘗試獲取鎖,獲取到則持有該鎖回傳true,未獲取到立即回傳false
* @param lockKey 鎖
* @return true-獲取鎖成功 false-獲取鎖失敗
*/
boolean tryLock(String lockKey);
/**
* 嘗試獲取鎖,獲取到則持有該鎖leaseTime時間.
* 若未獲取到,在waitTime時間內一直嘗試獲取,超過waitTime還未獲取到則回傳false
* @param lockKey key
* @param waitTime 嘗試獲取時間
* @param leaseTime 鎖持有時間
* @param unit 時間單位
* @return true-獲取鎖成功 false-獲取鎖失敗
* @throws InterruptedException e
*/
boolean tryLock(String lockKey, long waitTime, long leaseTime, TimeUnit unit)
throws InterruptedException;
/**
* 鎖是否被任意一個執行緒鎖持有
* @param lockKey 鎖
* @return true-被鎖 false-未被鎖
*/
boolean isLocked(String lockKey);
/**
* isHeldByCurrentThread()的作用是查詢當前執行緒是否保持此鎖定
* @param lockKey 鎖
* @return true or false
*/
boolean isHeldByCurrentThread(String lockKey);
}
下面將其實作,撰寫DistributeLocker的實作類RedissonDistributeLocker,將上述方法全部實作,方法的作用介面類中已經描述的比較清楚了,咱們這把不研究API,感興趣的童鞋自己看看鴨
package com.cmdc.lockimpl;
import com.cmdc.abstractlock.DistributeLocker;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import java.util.concurrent.TimeUnit;
/**
* redisson實作分布式鎖介面
*/
public class RedissonDistributeLocker implements DistributeLocker {
private final RedissonClient redissonClient;
/**
* 構造方法 賦予本類的redisClient以實體
* @param redissonClient client
*/
public RedissonDistributeLocker(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
@Override
public void lock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
}
@Override
public void lock(String lockKey, int leaseTime) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock(leaseTime, TimeUnit.MILLISECONDS);
}
@Override
public void lock(String lockKey, int timeout, TimeUnit unit) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock(timeout, unit);
}
@Override
public void unlock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
}
@Override
public boolean tryLock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
return lock.tryLock();
}
@Override
public boolean tryLock(String lockKey, long waitTime, long leaseTime,
TimeUnit unit) throws InterruptedException {
RLock lock = redissonClient.getLock(lockKey);
return lock.tryLock(waitTime, leaseTime, unit);
}
@Override
public boolean isLocked(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
return lock.isLocked();
}
@Override
public boolean isHeldByCurrentThread(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
return lock.isHeldByCurrentThread();
}
}
ok,將RedissonDistributeLocker交給spring進行管理,別問,問就是習慣了~
這個需要在之前的配置類RedissonConfig中增加一個注入的方法,
@Bean
public RedissonDistributeLocker redissonLocker(RedissonClient redissonClient) {
// redissonClient 是本來就由redisson提供給我們,我們創建RedissonDistributeLocker實體交給spring進行管理
RedissonDistributeLocker locker = new RedissonDistributeLocker(redissonClient);
return locker;
}
繼續,增加一個注解RedissonLockAnnotation用于標記需要用上分布式鎖的方法以及提供表示鎖的字串
package com.cmdc.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 分布式鎖自定義注解
* 注解在方法
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedissonLockAnnotation {
/**
* 指定組成分布式鎖的key
* @return 分布式鎖的key
*/
String lockRedisKey();
}
將這個注解AOP增強,賦予它生命
package com.cmdc.annotationimpl;
import com.cmdc.abstractlock.DistributeLocker;
import com.cmdc.annotation.RedissonLockAnnotation;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* 分布式鎖的 aop
*
* 無論是否拋出例外,也無論從什么地方return回傳,finally陳述句塊總是會執行,這樣你有機會呼叫Close來關閉資料庫連接(即使未打開或打開失敗,關閉操作永遠是可以執行的),以便于釋放已經產生的連接,釋放資源,
*
* 順便說明,return是可以放在try陳述句塊中的,但不管在什么時機回傳,在回傳前,finally將會執行,
* 小結:
* try { //執行的代碼,其中可能有例外,一旦發現例外,則立即跳到catch執行,否則不會執行catch里面的內容 }
* catch { //除非try里面執行代碼發生了例外,否則這里的代碼不會執行 }
* finally { //不管什么情況都會執行,包括try catch 里面用了return ,可以理解為只要執行了try或者catch,就一定會執行 finally }
*
* Case2:
* 至少有兩種情況下finally陳述句是不會被執行的:
* (1)try陳述句沒有被執行到,如在try陳述句之前return就回傳了,這樣finally陳述句就不會執行,這也說明了finally陳述句被執行的必要而非充分條件是:相應的try陳述句一定被執行到,
* (2)在try塊|catch塊中有System.exit(0);這樣的陳述句,System.exit(0)是終止Java虛擬機JVM的,連JVM都停止了,所有都結束了,當然finally陳述句也不會被執行到,
*
* 在try-catch-finally中, 當return遇到finally,return對finally無效,即:
*
* 1.在try catch塊里return的時候,finally也會被執行,
*
* 2.finally里的return陳述句會把try catch塊里的return陳述句效果給覆寫掉,
*
* 結論:return陳述句并不一定都是函式的出口,執行return時,只是把return后面的值復制了一份到回傳值變數里去了,
*/
@Aspect
@Component
@Slf4j
public class RedissonLockAop {
public static final int WAIT_GET_LOCK_TIME = 3000;
public static final int WAIT_RELEASE_LOCK_TIME = 5000;
@Autowired
private DistributeLocker locker;
/**
* 切點,攔截被 @RedissonLockAnnotation 修飾的方法
* 說白了就是你這把面向切面從哪里切
*/
@Pointcut("@annotation(com.cmdc.annotation.RedissonLockAnnotation)")
public void redissonLockPoint() {
}
/**
*
* @param pjp 代表當前正在運行的方法
* @return string
* @throws InterruptedException e
*/
@Around("redissonLockPoint()")
@ResponseBody
public String checkLock(ProceedingJoinPoint pjp) throws InterruptedException {
// 當前執行緒名
String threadName = Thread.currentThread().getName();
log.info("執行緒{}------進入分布式鎖aop------", threadName);
// 獲取該注解的實體物件
RedissonLockAnnotation annotation = ((MethodSignature) pjp.getSignature()).
getMethod().getAnnotation(RedissonLockAnnotation.class);
// 生成分布式鎖key的鍵名,以逗號分隔
String lockRedisKey = annotation.lockRedisKey();
log.info("存在于注解中的key值是:{}",lockRedisKey);
// 獲取存在于請求頭中的唯一id值
String lockRedisValue = ((ServletRequestAttributes) Objects.requireNonNull(
RequestContextHolder.getRequestAttributes()))
.getRequest()
.getHeader(lockRedisKey);
if (StringUtils.isEmpty(lockRedisKey)) {
log.info("執行緒{} lockRedisKey設定為空,不加鎖", threadName);
try {
pjp.proceed();
} catch (Throwable throwable) {
throwable.printStackTrace();
log.info("process method failed...now print message:{}",throwable.getMessage());
}
return "NULL LOCK";
} else {
log.info("執行緒{} 鎖的value值是:{}", threadName, lockRedisValue);
// 獲取鎖 3000 等到獲取鎖的時間 leaseTime 獲取鎖后持有時間 時間單位 MILLISECONDS:毫秒
if (locker.tryLock(lockRedisValue, WAIT_GET_LOCK_TIME, WAIT_RELEASE_LOCK_TIME, TimeUnit.MILLISECONDS)) {
// 下面的邏輯我想說一下,大家應該非常清楚try catch finally的邏輯 關于這一塊的邏輯已經卸載頂層注釋上了
try {
log.info("執行緒{} 獲取鎖成功", threadName);
return (String) pjp.proceed();
} catch (Throwable throwable) {
throwable.printStackTrace();
log.info("process method failed...now print message:{}",throwable.getMessage());
} finally {
if (locker.isLocked(lockRedisValue)) {
log.info("key={}對應的鎖被持有,執行緒{}",lockRedisValue, threadName);
if (locker.isHeldByCurrentThread(lockRedisValue)) {
log.info("當前執行緒 {} 保持鎖定", threadName);
locker.unlock(lockRedisValue);
log.info("執行緒{} 釋放鎖", threadName);
}
}
}
} else {
log.info("執行緒{} 獲取鎖失敗", threadName);
return " GET LOCK FAIL";
}
}
return null;
}
}
這邊稍微說一下,增強的思想就是,需要分布式鎖的介面有多個執行緒進來的時候,每個執行緒都在請求頭中放置一個唯一的id,這個唯一的id就抽象地對應一個共享變數,利用redis set值是一個原子操作,讓執行緒去set 這個唯一的id,完成這個set的執行緒才能執行業務處理,不能完成的則不能進行業務的處理,當持有鎖的執行緒完成業務處理之后即釋放鎖,以此回圈往復,具體的low一眼代碼也就明白啦,
下面咱們弄一個介面出來執行測驗~
package com.cmdc.controller;
import com.cmdc.annotation.RedissonLockAnnotation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
*
*/
@RestController
@Slf4j
public class TestController {
public static final int THREAD_SLEEP_TIME = 5000;
/**
* 測驗介面
* @return 回傳值
*/
@PostMapping(value = "testLock", consumes = "application/json")
@RedissonLockAnnotation(lockRedisKey = "the-only-id")
public String testLock() {
/**
* 請求總攜帶一個唯一的id 誰拿到誰執行,非常的好理解
*/
try {
Thread.sleep(THREAD_SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("執行相關業務...");
log.info("業務執行中.....");
log.info("業務執行結束.....");
return "success";
}
}
專案的結構最終長下面這個樣子

沉睡5秒鐘模擬業務的處理,咱們同時發送兩個請求,看看會發生什么,我有postman和Insomnia兩個測驗的工具,就分別發請求,如果你只有一個工具,就一個請求復制兩份就可以了
先發postman,看測驗結果

接著是Insomnia來看測驗的結果

Insomnia后發的,未成功獲取到鎖,和我們的預期一致的!
總結
執行緒池是為了支持并發、管控資源,
分布式鎖是為了限制并發、解決執行緒安全問題,這兩個都非常深刻,我們慢慢體會,
其實不一定需要用redisson,純redis實作分布式鎖也一點問題沒有!我這里只是不想自己造輪子了~
不用redis用zookeeper也行,都沒問題,看你喜歡什么啦!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/263034.html
標籤:其他
上一篇:攻防世界-wp-PWN-新手區-5-hello_pwn
下一篇:致未來的那個她
