不適用分布式鎖會怎樣?
以搶購商品的高并發場景為例,通常單體的應用可以通過同步代碼來實作順序對資料庫的操作,保證程式是按照預想來執行扣減操作的,不發生超賣情況,
但是在分布式系統中,同一個服務多實體部署,同步代碼就不能解決該問題,簡單來講就是同步代碼在多實體的情況下只能管好自己,管不了別人,而且因為synchronized的粗粒度,單執行緒執行造成請求擠壓情況,
使用redis實作分布式鎖
使用redis實作分布式鎖主要是使用其SETNX操作:SETNX [key] [value],當key不存在時,將value存入redis,成功回傳1,否則回傳0,由于redis單執行緒的特點,所以可以通過使用SETNX來實作分布式鎖,
實作思路:當第一個請求進入代碼后,在執行下單扣減庫存之前,向redis中SETNX一個商品id為key的value(value可以為時間戳),當下單扣減庫存執行結束之后,再洗掉這個key value,由于key已經存在,其他請求SETNX不進去,所以就保證了多實體間的執行緒安全,
注意:
- 要考慮執行業務代碼拋例外之后無法解鎖的問題
- 要考慮某個執行緒執行完加鎖操作后,突然程式掛了,其他實體無法解鎖的情況
手寫redis實作分布式鎖
這里使用的是springboot2.x,所以spring-data-redis是lettuce實作的
為了避免意外死鎖的情況,就要設定redis鎖的超時時間,但是設定超時時間會存在如下問題:
- 超時時間內一旦沒有執行完成業務代碼之后洗掉鎖,此時就會被其他客戶端請求在某些分布式實體節點上獲取到鎖
- 當執行緩慢的執行緒執行完成后,洗掉的可能是別的執行緒的鎖,高并發情況下,可能造成鎖連環失效(洗掉鎖的時候要判斷是否是自己的鎖)
以下demo中解決了以上問題
controller:
package com.leolee.msf.controller;
import com.google.gson.Gson;
import com.leolee.msf.service.serviceInterface.DistributedTransactionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
* @ClassName DistributedTransactionController
* @Description: 分布式事務測驗
* 產品下單為例
* @Author LeoLee
* @Date 2020/11/20
* @Version V1.0
**/
@RestController
@RequestMapping("/product")
public class DistributedTransactionController {
//限購一件
private final int num = 1;
@Autowired
Gson gson;
@Autowired
DistributedTransactionService distributedTransactionService;
@GetMapping("/{id}")
public String productQuantity(@PathVariable(name = "id")String productId) {
return gson.toJson(distributedTransactionService.getProductQuantity(productId));
}
@GetMapping("/order/{id}")
public String order(@PathVariable(name = "id")String productId) {
boolean b = distributedTransactionService.orderByProductId(productId);
Map<String, Object> resultMap = distributedTransactionService.getProductQuantity(productId);
if (b) {
resultMap.put("msg", "搶購成功");
resultMap.put("code", true);
} else {
resultMap.put("msg", "搶購成功");
resultMap.put("code", false);
}
return gson.toJson(resultMap);
}
}
service:
package com.leolee.msf.service;
import com.leolee.msf.service.serviceInterface.DistributedTransactionService;
import com.leolee.msf.utils.RedisLockUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @ClassName DistributedTransactionServiceImpl
* @Description: TODO
* @Author LeoLee
* @Date 2020/11/20
* @Version V1.0
**/
@Service("distributedTransactionService")
public class DistributedTransactionServiceImpl implements DistributedTransactionService {
@Autowired
RedisTemplate redisTemplate;
@Autowired
RedisLockUtil redisLockUtil;
//庫存 key-productId value-數量
private HashMap<String, Long> productStockQuantity;
//訂單 key-uuid value-productId
private HashMap<String, String> order;
//總量 key-productId value-數量
private HashMap<String, Long> total;
//搶購商品id寫死
private final String productId = "123";
public DistributedTransactionServiceImpl() {
this.total = new HashMap<String, Long>();
this.productStockQuantity = new HashMap<String, Long>();
this.total.put(productId, 10000l);
this.productStockQuantity.put(productId, 10000l);
this.order = new HashMap<String, String>();
}
@Override
public Map<String, Object> getProductQuantity(String productId) {
Map<String, Object> info = new HashMap<>();
info.put("productId", productId);
info.put("soldOut", order.size());//已售
info.put("total", total.get(productId));
info.put("stock", productStockQuantity.get(productId));
return info;
}
/*該方案存在問題
1.當前鎖過期之后,高并發情況下多個客戶端同時執行getAndSet方法,那么雖然最終只有一個客戶端可以加鎖,雖然其他沒有獲得鎖的請求沒有成功執行業務操作,但是覆寫了鎖的value時間戳
2.雖然這樣為了處理死鎖問題,由于存在一個客戶端請求在鎖失效前還是沒有執行完畢,甚至計算庫存是否>0都沒有完成,下一個客戶端請求的時候,判斷前一個鎖已經失效,覆寫了前一個鎖,所以兩個執行緒間還是會出現超賣的問題,
*/
@Override
public boolean orderByProductId(String productId) {
//加分布式鎖
//value設定為10秒后
String cuurentTimeMills = String.valueOf(System.currentTimeMillis() + 10000);
if (!redisLockUtil.redisLock(productId, cuurentTimeMills)) {
return false;
}
boolean result = false;
try {
//=======================執行業務邏輯=========================
//判斷是否存在該商品
if (checkExist(productId)) {
try {
//模擬資料庫操作
Thread.sleep(1000);
//產生訂單,扣減庫存
order.put(UUID.randomUUID().toString(), productId);
productStockQuantity.put(productId, productStockQuantity.get(productId) - 1);
result = true;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//=======================業務邏輯結束=========================
} catch (Exception e) {
e.printStackTrace();
} finally {
//解鎖
redisLockUtil.deleteLock(productId, cuurentTimeMills);
}
return result;
}
/*
* 功能描述: <br>
* 〈檢查商品是否存在,是否有庫存〉
* @Param: [productId]
* @Return: boolean
* @Author: LeoLee
* @Date: 2020/11/20 10:52
*/
private boolean checkExist(String productId) {
return total.containsKey(productId) && productStockQuantity.containsKey(productId) && productStockQuantity.get(productId) > 0 ? true : false;
}
}
RedisLock:
package com.leolee.msf.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
/**
* @ClassName RedisLockUtil
* @Description: 分布式鎖(還是存在超賣的情況,該實體僅供理解學習redis分布式鎖)
* @Author LeoLee
* @Date 2020/11/20
* @Version V1.0
**/
public class RedisLockUtil {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
/*
* 功能描述: <br>
* 〈分布式鎖——加鎖〉
* @Param: [key, timeMillis 設定要大于當前時間戳]
* @Return: boolean
* @Author: LeoLee
* @Date: 2020/11/20 12:24
*/
public boolean redisLock(String key, String timeMillis) {
//加鎖成功直接回傳true,證明目前還沒有該key
if (redisTemplate.opsForValue().setIfAbsent(key, timeMillis)) {
return true;
}
//解決死鎖
String current = (String) redisTemplate.opsForValue().get(key);
if (current != null && Long.valueOf(current) < System.currentTimeMillis()) {//之前的鎖過期了,應該允許新的請求獲取鎖
//Set value of key and return its old value.設定新值回傳舊值,
// 考慮多執行緒并發的情況,只有一個執行緒的設定值和當前值相同,它才有權利加鎖
String old = (String) redisTemplate.opsForValue().getAndSet(key, timeMillis);//getAndSet執行緒安全
if (old != null && old.equals(current)) {
return true;
}
}
return false;
}
public boolean deleteLock(String key, String timeMillis) {
if (String.valueOf(redisTemplate.opsForValue().get(key)).equals(timeMillis)) {
return redisTemplate.delete(key);
}
return false;
}
}
上面就是一個簡單的redis鎖實作,但是!!!!!這個實作方式還是存在問題:
- 各個服務端實體所在服務器的時間要一致
- 當前鎖過期之后,高并發情況下多個客戶端同時執行getAndSet方法,那么雖然最終只有一個客戶端可以加鎖,雖然其他沒有獲得鎖的請求沒有成功執行業務操作,但是覆寫了鎖的value時間戳
- 雖然這樣為了處理死鎖問題,由于存在一個客戶端請求在鎖失效前還是沒有執行完畢,甚至計算庫存是否>0都沒有完成,下一個客戶端請求的時候,判斷前一個鎖已經失效,覆寫了前一個鎖,所以兩個執行緒間還是會出現超賣的問題,
廢了這么大勁只是為了解釋清楚redis分布式鎖的實作思路,以及可能存在哪些問題,
那什么方案才是最完美的呢?
問題的根本是可能存在當前獲取到鎖的客戶端,業務執行過慢,超過了鎖的有效期,又不能把鎖的有效期設定的過大
這里就需要一個機制來在業務執行超過鎖有效期后能延長鎖的有效時間,這就引出了一種新的解決思路:
- 當某一個客戶端獲取到一把鎖之后,另起一個持有該客戶端鎖特征的執行緒
- 這個執行緒通過該客戶端鎖的特征可以去redis中找到對應的鎖,每個一定時間就延長該鎖的有效期
- 當該客戶端對應執行緒執行完畢后,停止其對應的延長鎖有效期的執行緒,并洗掉redis鎖
這種延長分布式鎖存活時間的思想是和Redisson框架實作分布式鎖的思想一致的,之后會詳細介紹一下Redisson
依照上面的思路,做出了如下改動:
- 當第一個客戶端請求進來之后,使用SETNX創建了一個key為productId,value為UUID的分布式鎖,超時時間是5000毫秒
- 設定分布式鎖成功之后,開始執行下單扣減庫存的邏輯,同時通過另起一個執行緒來延長當前執行緒的redis鎖超時時間,延長超時時間的邏輯為
- 根據分布式鎖的超時時間/3,作為檢查分布式鎖超時的輪詢間隔
- 在每一次輪詢中獲取當前分布式鎖剩余的存活時間,如果剩余存活時間小于輪詢時間,則續期2秒
- 當業務執行完成后會去洗掉當前的分布式鎖,并通知該續期執行緒interrupt,由于輪詢執行在Runnable中,并使用了Thread.sleep(),無法向上層代碼拋出InterruptedException(只能try/catch)并且isInterrupted狀態會被變為false,執行緒并沒有停下來,所以在catch到InterruptedException: sleep interruped之后,手動執行Thread.currentThread().interrupt()重置執行緒中斷狀態為true,這樣就可以在上層代碼判斷該執行緒的狀態來進行可控的操作,由于我這里還是需要讓該執行緒停下,所以我最侄訓是呼叫了extensionExpirationTime.stop();,但是這種中斷執行緒的邏輯是沒有問題的,具體可以參考:https://blog.csdn.net/trackle400/article/details/81775189
controller不變
service:
package com.leolee.msf.service;
import com.leolee.msf.service.serviceInterface.DistributedTransactionService;
import com.leolee.msf.utils.redisLock.RedisLockUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @ClassName DistributedTransactionServiceImpl
* @Description: TODO
* @Author LeoLee
* @Date 2020/11/20
* @Version V1.0
**/
@Service("distributedTransactionService")
public class DistributedTransactionServiceImpl implements DistributedTransactionService {
@Autowired
RedisTemplate redisTemplate;
@Autowired
RedisLockUtil redisLockUtil;
//庫存 key-productId value-數量
private HashMap<String, Long> productStockQuantity;
//訂單 key-uuid value-productId
private HashMap<String, String> order;
//總量 key-productId value-數量
private HashMap<String, Long> total;
//搶購商品id寫死
private final String productId = "123";
public DistributedTransactionServiceImpl() {
this.total = new HashMap<String, Long>();
this.productStockQuantity = new HashMap<String, Long>();
this.total.put(productId, 10000l);
this.productStockQuantity.put(productId, 10000l);
this.order = new HashMap<String, String>();
}
@Override
public Map<String, Object> getProductQuantity(String productId) {
Map<String, Object> info = new HashMap<>();
info.put("productId", productId);
info.put("soldOut", order.size());//已售
info.put("total", total.get(productId));
info.put("stock", productStockQuantity.get(productId));
return info;
}
//====================================================================================================
public boolean orderByProductId2(String productId) {
//加分布式鎖
String uuid = UUID.randomUUID().toString();
redisLockUtil.redisLock(productId, uuid, 5000);
boolean result = false;
try {
//=======================執行業務邏輯=========================
//判斷是否存在該商品
if (checkExist(productId)) {
try {
//模擬資料庫操作
Thread.sleep(4000);
//產生訂單,扣減庫存
order.put(UUID.randomUUID().toString(), productId);
productStockQuantity.put(productId, productStockQuantity.get(productId) - 1);
result = true;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//=======================業務邏輯結束=========================
} catch (Exception e) {
e.printStackTrace();
} finally {
//解鎖
redisLockUtil.newDeleteLock(productId, uuid);
}
return result;
}
/*
* 功能描述: <br>
* 〈檢查商品是否存在,是否有庫存〉
* @Param: [productId]
* @Return: boolean
* @Author: LeoLee
* @Date: 2020/11/20 10:52
*/
private boolean checkExist(String productId) {
return total.containsKey(productId) && productStockQuantity.containsKey(productId) && productStockQuantity.get(productId) > 0 ? true : false;
}
}
RedisLock:
package com.leolee.msf.utils.redisLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @ClassName RedisLockUtil
* @Description: 分布式鎖(還是存在超賣的情況,該實體僅供理解學習redis分布式鎖)
* @Author LeoLee
* @Date 2020/11/20
* @Version V1.0
**/
public class RedisLockUtil {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
//========================方案2============================
ExtensionExpirationTime extensionExpirationTime = null;
/*
* 功能描述: <br>
* 〈分布式加鎖,延長過期時間版〉
* @Param: [key, value, time]
* @Return: boolean
* @Author: LeoLee
* @Date: 2020/11/20 17:02
*/
public boolean redisLock(String key, String value, long time) {
if (redisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.MILLISECONDS)) {
extensionExpirationTime = new ExtensionExpirationTime(key, value, time, redisTemplate);
extensionExpirationTime.start();
return true;
}
return false;
}
public boolean newDeleteLock(String key, String value) {
if (String.valueOf(redisTemplate.opsForValue().get(key)).equals(value)) {
extensionExpirationTime.interrupt();//終止續期執行緒
if (extensionExpirationTime.isInterrupted()) {
try {
extensionExpirationTime.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
return redisTemplate.delete(key);
}
return false;
}
}
延長時間的執行緒類:
package com.leolee.msf.utils.redisLock;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.concurrent.TimeUnit;
/**
* @ClassName ExtensionExpirationTime
* @Description: TODO
* @Author LeoLee
* @Date 2020/11/20
* @Version V1.0
**/
public class ExtensionExpirationTime extends Thread {
private String productId;
private String value;
private long checkTime;
private RedisTemplate<String,Object> redisTemplate;
private int i;
public ExtensionExpirationTime(String productId, String value, long time, RedisTemplate<String,Object> redisTemplate) {
this.productId = productId;
this.value = value;
this.checkTime = time/3 > 0 ? time/3 : 5000;
this.redisTemplate = redisTemplate;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(checkTime);
//延長過期時間
System.out.println("prudctId:" + productId + ",第" + ++i + "次續期");
checkExpiretion();
} catch (InterruptedException e) {
e.printStackTrace();
//中斷狀態在拋出例外前,被清除掉,因此在此處重置中斷狀態
Thread.currentThread().interrupt();
}
}
}
private void checkExpiretion() {
long currentExpire = redisTemplate.opsForValue().getOperations().getExpire(productId);
if (currentExpire < checkTime/1000) {
redisTemplate.expire(productId, checkTime + currentExpire * 2000, TimeUnit.MILLISECONDS);
}
}
}
執行結果:


那么這樣一個簡單的分布式鎖就完成了,目前測驗是沒有出現問題的,如果誰發現了什么問題,請評論聯系我,
對于使用該代碼出現的任何問題,本人不負責任QaQ~~~
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/225799.html
標籤:其他
上一篇:off by null 小結
