概述
單機架構下,一個行程中的多個執行緒競爭同一共享資源時,通常使用 JVM 級別的鎖即可保證互斥,以對商品下單并扣庫存為例:
public String deductStock() {
synchronized (this){
// 獲取庫存值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "")
System.out.println("扣減成功,剩余庫存:" + realStock);
} else {
System.out.println("扣減失敗,庫存不足");
}
}
return "end";
}
然而,當使用分布式架構時,這種方式就不管用了,因為 JVM 鎖只能控制自家應用,其他機器的應用時管不了的,這時候分布式鎖就派上用場了,它能保證分布式系統下不同行程對共享資源訪問的互斥性
案例分析
下面對使用 Redis 實作分布式鎖的案例進行分析:
1. Case1
使用 Redis 中的 setnx() 設計一個入門級別的分布式鎖
public String deductStock1() {
String localKey = "lock:product:0001";
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true");
if (!aBoolean){
return "當前系統繁忙";
}
try {
// 獲取庫存值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣減成功,剩余庫存:" + realStock);
} else {
System.out.println("扣減失敗,庫存不足");
}
} finally {
// 即使中間的任何一處邏輯拋出例外,也能保證鎖釋放
stringRedisTemplate.delete(localKey);
}
return "end";
}
存在的問題:鎖沒有釋放,機器卻宕機了,這時候其他機器將無法獲取鎖
2. Case2
設定一個過期時間,解決 Case1 中存在的宕機沒有釋放鎖的問題
public String deductStock2() {
String localKey = "lock:product:0001";
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true");
stringRedisTemplate.expire(localKey,10,TimeUnit.SECONDS);
if (!aBoolean){
return "當前系統繁忙";
}
try {
// 獲取庫存值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣減成功,剩余庫存:" + realStock);
} else {
System.out.println("扣減失敗,庫存不足");
}
} finally {
stringRedisTemplate.delete(localKey);
}
return "end";
}
存在的問題:有可能還沒有執行到 expire() 就宕機了,沒有保證原子性
3. Case3
在加鎖時就設定超時時間,保證加鎖和設定超時時間是原子操作
public String deductStock3() {
String localKey = "lock:product:0001";
// 這條命令能夠保證原子性
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true",10,TimeUnit.SECONDS);
if (!aBoolean){
return "當前系統繁忙";
}
try {
// 獲取庫存值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣減成功,剩余庫存:" + realStock);
} else {
System.out.println("扣減失敗,庫存不足");
}
} finally {
stringRedisTemplate.delete(localKey);
}
return "end";
}
存在問題:如果系統并發量不是特別的大,那么問題不大,但如果并發量很大,就會出現嚴重的并發問題:
- 假設執行緒 A 的時間超過了超時時間,鎖失效了,此時該執行緒 A 還沒有執行 delete 方法
- 執行緒 B 這時候加鎖成功了,與此同時執行緒 A 執行了 delete 方法,但是這時候執行緒 A 釋放的鎖是執行緒 B 的
- 于是極端情況下就會出現:執行緒 A 釋放執行緒 B 的鎖,B 釋放 C 的,C 釋放 D 的 ......
4. Case4
Case3 存在的問題的根本原因就是在執行 delete 方法的時候,自己的鎖被其他的執行緒釋放了,所以解決辦法就是給每個執行緒生成一個唯一 ID,在最后釋放鎖的時候判斷是否是自己的鎖,如果是自己的才釋放
public String deductStock4() {
String localKey = "lock:product:0001";
String uuid = UUID.randomUUID().toString();
// 這條命令能夠保證原子性
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, uuid,10,TimeUnit.SECONDS);
if (!aBoolean){
return "當前系統繁忙";
}
try {
// 獲取庫存值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣減成功,剩余庫存:" + realStock);
} else {
System.out.println("扣減失敗,庫存不足");
}
} finally {
if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){
stringRedisTemplate.delete(localKey);
}
}
return "end";
}
存在問題:存在原子性問題,問題代碼如下:
if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){
stringRedisTemplate.delete(localKey);
}
有可能出現當前執行緒執行完 if 判斷卻還沒執行 delete 操作的時候當前鎖過期了,于是又會出現當前執行緒釋放了其他執行緒的鎖的情況
5. Case5
對于 Case4 的問題,本質是 「判斷是不是當前執行緒加的鎖」和「釋放鎖」不是一個原子操作,可以用 Lua 腳本代替,Redis 會將整個腳本作為一個整體執行
String redisScript = "
if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('del',KEYS[1])
else
return 0
end;"
public String deductStock5() {
String localKey = "lock:product:0001";
String uuid = UUID.randomUUID().toString();
// 這條命令能夠保證原子性
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, uuid,10,TimeUnit.SECONDS);
if (!aBoolean){
return "當前系統繁忙";
}
try {
// 獲取庫存值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣減成功,剩余庫存:" + realStock);
} else {
System.out.println("扣減失敗,庫存不足");
}
} finally {
redisTemplate.execute(redisScript, Arrays.asList(localKey), uuid);
}
return "end";
}
也可以使用鎖續命的方式解決,即創建一個守護執行緒,每過一段時間,判斷業務的主執行緒有沒有結束(是否還加著鎖),如果還加著鎖,將鎖的超時時間重新設定
public String deductStock5() {
String localKey = "lock:product:0001";
String uuid = UUID.randomUUID().toString();
// 這條命令能夠保證原子性
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, uuid,10,TimeUnit.SECONDS);
if (!aBoolean){
return "當前系統繁忙";
} else {
// 續命
Thread demo = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Boolean expire = redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
// 有可能已經主動洗掉key,不需要在續命
if(!expire){
return;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
demo.setDaemon(true);
demo.start();
}
try {
// 獲取庫存值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣減成功,剩余庫存:" + realStock);
} else {
System.out.println("扣減失敗,庫存不足");
}
} finally {
if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){
stringRedisTemplate.delete(localKey);
}
}
return "end";
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/503022.html
標籤:其他
上一篇:Redis 分布式鎖
