作者:鹽汽水
鏈接:https://juejin.cn/post/7116401645323288613
問題拋出
在近期的專案里面有一個功能是領取優惠券的功能,
問題描述:
每一個優惠券一共發行多少張,每個用戶可以領取多少張:
如:A優惠券一共發行120張,每一個用戶可以領取140張,當一個用戶領取優惠券成功的時候,把領取的記錄寫入到另外一個表中(這張表我們暫且稱為表B)


<!--減優惠券庫存的SQL-->
<update id="reduceStock">
update coupon set stock = stock - 1 where id = #{coupon_id}
</update>
上面的代碼按照我們的邏輯是沒有問題,我通過使用PostMan軟體測驗也是沒有問題,但是上面的代碼確實是有問題的,
往往我們寫的一些業務功能,在低并發的時候很多的問題會體現不出來,所以這個領取優惠券的功能我通過Jmeter軟體來進行壓測,

這里配置了一下,大概會發送500次請求,那來驗證下優惠券會不會出現超發的問題

執行結果,里面沒有出現例外什么的,樣本為500,包括在匯總報告里面也出現了一些回傳資訊是優惠券不足的資訊,那來看下資料庫里面的優惠券的總發行數量有沒有變成負數呢?也就是有沒有超發,

在測驗的時候是測驗的id為19的這條資料,測驗完之后這里的總發行數量(stock)居然變成了-1(也就是超發了一張),
問題引發
在解決這個問題之前,先來看下這個問題是如何引發出來的,

上面這張圖是整個領取優惠券的流程(上圖并沒有使用流程圖來畫,我覺的這樣畫可能表達更清楚一些),在藍色的框那里就是出現超扣減庫存的時候,為啥這樣說呢?
如果同時來了兩個執行緒(你可以理解成是兩個請求),比如先來的那個請求通過了檢查(執行緒A),這時執行緒A還沒有扣減庫存,這時執行緒B經過一翻操作也通過了這個檢查優惠券是否可領取的方法,然后執行緒A和執行緒B依次扣減庫存或者是同時扣減庫存,這樣就會出現優惠券超領的情況,

清楚了問題引發的原因,那就來看看如何解決它們,
推薦一個開源免費的 Spring Boot 最全教程:
https://github.com/javastacks/spring-boot-best-practice
解決方案一(Java代碼加鎖)
在引起超發原因的那張圖內可以看出,導致這一問題的根本原因是多個執行緒同時訪問這個領取優惠券的方法,那只要保證在同一段只有一個執行緒進入到這個方法就可以了,
上面貼的代碼就可以改成下面這樣:
synchronized (this){
LoginUser loginUser = LoginInterceptor.threadLocal.get();
CouponDO couponDO = couponMapper.selectOne(new QueryWrapper<CouponDO>()
.eq("id", couponId)
.eq("category", categoryEnum.name()));
if(couponDO == null){
throw new BizException(BizCodeEnum.COUPON_NO_EXITS);
}
this.checkCoupon(couponDO,loginUser.getId());
//構建領券記錄
CouponRecordDO couponRecordDO = new CouponRecordDO();
BeanUtils.copyProperties(couponDO,couponRecordDO);
couponRecordDO.setCreateTime(new Date());
couponRecordDO.setUseState(CouponStateEnum.NEW.name());
couponRecordDO.setUserId(loginUser.getId());
couponRecordDO.setUserName(loginUser.getName());
couponRecordDO.setCouponId(couponDO.getId());
couponRecordDO.setId(null);
int row = couponMapper.reduceStock(couponId);
if(row == 1){
couponRecordMapper.insert(couponRecordDO);
}else{
log.info("發送優惠券失敗:{},用戶:{}",couponDO,loginUser);
}
}

這樣,經過Jmeter的壓測優惠券并沒有出現超發的情況,
雖然這樣可以解決超發的問題,但是在專案中我們不可以這樣寫,原因如下:
- synchronized的作用范圍是單個JVM實體,如果是集群部署系統這里的加鎖你可以理解成失效
- 在使用了synchronized加鎖后,就會形成串行等待的問題,當一個執行緒A在領取優惠券方法內執行過久時,其它執行緒會等待直到執行緒A執行結束
解決方案二(Sql層面解決超發)
<update id="reduceStock">
update coupon set stock = stock - 1 where id = #{coupon_id} and stock > 0
</update>
Mysql默認使用的是InnoDB引擎,使用InnoDB時在修改某一個記錄的時候會將這條記錄上鎖,所以這個修改資料時不會出現多個執行緒同時修改資料,這樣也可以避免優惠券超發,
如果在業務中只要有庫存就可以發放優惠券的可以使用上面這種方式,
還有一種Sql的方式,可以將stock自身做為樂觀鎖,
<update id="reduceStock">
update product set stock=stock-1 where stock=#{上一次的庫存} and id = 1 and stock>0
</update>
上面這種方式會存在ABA的問題,當然如果業務不在意ABA問題可以使用上面的sql,不過性能可能差一點,如果stock不匹配,這條sql也就失效了,
如果業務在意ABA問題的話也可以在表中加一個version的欄位,每次修改資料的時候這個欄位會加1,這樣就可以避免ABA問題
<update id="reduceStock">
update product set stock=stock-1,versioin = version+1 where id = 1 and stock>0 and version=#{上一次的版本號}
</update>
上面的這三條Sql層面的代碼都可以解決優惠券超發的問題,具體使用那種就根據業務來選擇了
解決方案三(通過Redis分布式鎖來解決問題)
引入Redis后,當領取優惠券時會先去Redis里面去獲取鎖,當鎖獲取成功后才可以對資料庫進行操作

在分布式鎖中我們應該考濾如下:
- 排他性,在分布式集群中,同一個方法,在同一個時間只能被某一臺機器上的一個執行緒執行
- 容錯性,當一個執行緒上鎖后,如果機器突然的宕機,如果不釋放鎖,此時這條資料將會被鎖死
- 還要注意鎖的粒度,鎖的開銷
- 滿足高可用,高性能,可重入
我們可以使用Redis里面的setnx命令來設定鎖,因為setnx是原子性的操作不可被打斷

當這個命令執行成功的時候會回傳1,執行失敗會回傳0,我們就可以通過這個特性來判斷是否獲取到了鎖,
先看下偽代碼:
String key = "lock:coupon:" + couponId;
try{
if(setnx(key,"1")){
//獲取到鎖
//設定Key的時期時間
exp(key,30,TimeUnit.MILLISECONDS);
try{
//業務邏輯
}finally{
del(key);
}
}else{
//獲取鎖失敗,遞回呼叫這個方法,或者使用for進行自旋獲取鎖
}
}
這方法里面設定key的過期時間的原因是,當機器突然的宕機后,即使沒有釋放掉鎖,他也會在一段時間后將這個鎖釋放,避免導致死鎖,
雖然看上面的代碼是沒有問題的,但是它是存在一個誤洗掉key的問題

為了避免這個問題,可以將setnx命令設定的那個值,設定成當前執行緒的ID,在洗掉的時候判斷這個執行緒ID是不是與當前執行緒的Id相同就可以了,
String key = "lock:coupon:" + couponId;
String threadId = Thread.currentThread().getId();
try{
if(setnx(key,threadId)){
//獲取到鎖
//設定Key的時期時間
exp(key,30,TimeUnit.MILLISECONDS);
try{
//業務邏輯
}finally{
if(get(key) == threadId){
del(key);
}
}
}else{
//獲取鎖失敗,遞回呼叫這個方法,或者使用for進行自旋獲取鎖
}
}
通過上面這種方法就可以解決誤洗掉key的問題,
在finally中的這個判斷和洗掉key的代碼不是原子性的,我們可以通過lua腳本的方式來實作它們之間的原子性,將洗掉key的代碼修改成如下:
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<>(script, Integer.class), Arrays.asList(key), threadId);
這里的threadId其實也可以不用,寫成uuid也可以,但是在上面setnx的時候,那個值也要寫成uuid
但是這樣還要存在一個鎖自動續期的問題,你可以開一個守護執行緒,每隔多久給他續期一次,或者是直接將這個過期時間延長一些,
在Redis中也有一些官方推薦的分布式鎖的方式,我最后是使用的這種方式
解決方案四(使用Redis推薦的方式)
官網地址:
https://redis.io/docs/manual/patterns/distributed-locks/
這個有多種實作方式,比如:Golang,Java,Php
引入Redisson包
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.4</version>
</dependency>
配置RedissoneClient
@Configuration
public class AppConfig {
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.port}")
private String redisPort;
@Bean
public RedissonClient redisson(){
Config config = new Config();
config.useSingleServer().setAddress("redis://" + redisHost + ":" + redisPort);
return Redisson.create(config);
}
}
配置好RedissonClient后,通過getLock方法獲取到鎖物件后,在我們的Service層中就可以通過lock和unlock來進行加鎖和釋放鎖了,這樣還是很方便的,
public JsonData addCoupon(long couponId, CouponCategoryEnum categoryEnum) {
String key = "lock:coupon:" + couponId;
RLock rLock = redisson.getLock(key);
LoginUser loginUser = LoginInterceptor.threadLocal.get();
rLock.lock();
try{
//業務邏輯
}finally {
rLock.unlock();
}
return JsonData.buildSuccess();
}
通過這種方法也可以解決優惠券超發的問題 ,這也是Rediss官網推薦的一種方式,
使用這種方式也無需關心key過期時間續期的問題,因為在Redisson一旦加鎖成功,就會啟動一個watch dog,你可以將它理解成一個守護執行緒,它默認會每隔30秒檢查一下,如果當前客戶端還占有這把鎖,它會自動對這個鎖的過期時間進行延長,
也可以通過下面的方法設定watch dog的檢測時間間隔
Config config = new Config();
config.setLockWatchdogTimeout();
如上就是我在解決優惠券超發時的一個思路,
近期熱文推薦:
1.1,000+ 道 Java面試題及答案整理(2022最新版)
2.勁爆!Java 協程要來了,,,
3.Spring Boot 2.x 教程,太全了!
4.別再寫滿屏的爆爆爆炸類了,試試裝飾器模式,這才是優雅的方式!!
5.《Java開發手冊(嵩山版)》最新發布,速速下載!
覺得不錯,別忘了隨手點贊+轉發哦!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/537985.html
標籤:Java
