好好學習,天天向上
本文已收錄至我的Github倉庫DayDayUP:github.com/RobodLee/DayDayUP,歡迎Star
- 暢購商城(一):環境搭建
- 暢購商城(二):分布式檔案系統FastDFS
- 暢購商城(三):商品管理
- 暢購商城(四):Lua、OpenResty、Canal實作廣告快取與同步
- 暢購商城(五):Elasticsearch實作商品搜索
- 暢購商城(六):商品搜索
- 暢購商城(七):Thymeleaf實作靜態頁
- 暢購商城(八):微服務網關和JWT令牌
- 暢購商城(九):Spring Security Oauth2
- 暢購商城(十):購物車
- 暢購商城(十一):訂單
- 暢購商城(十二):接入微信支付
- 暢購商城(十三):秒殺系統「上」
- 暢購商城(十四):秒殺系統「下」
防止秒殺重復排隊
回顧一下上一篇文章中講到的下單的流程,當用戶點擊下單之后,用戶名和商品id就會組裝成一個SeckillStatus物件存入Redis佇列中等待被處理,這個程序叫做排隊,所以說,只要用戶點擊了一次下單后不論最后是否下單成功,他都會進入到排隊的狀態,如果用戶重復點擊下單,那么Redis佇列中就會有很多個相同的SeckillStatus物件,也就是一個用戶排隊多次,這顯然是不符合邏輯的,一個用戶應該只能排隊一次,
為了避免用戶重復排隊的情況,可以為每個用戶在Redis中設定一個自增值,每次排隊的時候加1,如果大于1,說明重復排隊了,那么直接拋出例外,告訴用戶重復排隊了,
//SeckillOrderServiceImpl
@Override
public boolean add(Long id, String time, String username) {
Long increment = redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).increment(username, 1);
if (increment>1) { //記錄指定hashkey的增量,大于1說明排隊次數超過1次,重復排隊
throw new RuntimeException("重復排隊");
}
…………
}
這段代碼中,添加了對用戶重復排隊的判斷,先自增1,再進行判斷,這里的key設定的是username,因為一個用戶只能下單一件商品,如果去下單其它商品,同樣也是重復排隊,

測驗了一下,是成功的,但是有一個問題:如果用戶在這里排隊未成功,該怎么清理排隊資訊呢?這個下一步就會說,接著往下看👇
并發超賣問題解決
現在的代碼看似很完美,但是漏洞百出,比如就存在并發超賣的問題,為什么這么說,看代碼說話:

這個是多執行緒下單的方法,流程是查庫存——>下單——>減庫存,假如現在有件商品還剩1件,正好有多個執行緒同時走到了查詢庫存這一步,結果查出來都是一件,然后這三個執行緒就可以往下接著走,最后三個執行緒都成功下單了,不就多賣了兩件嘛,所以這段代碼還存在問題,那怎么解決呢?可不可以采用加鎖的方法,不可以,因為如果是在集群環境下,一臺機器上多個執行緒走到了同一步確實可以鎖住防止超賣,但是不同機器上的執行緒走到了同一部就鎖不住了,
所以可以采用Redis佇列的方式去解決,

給每個sku創建一個佇列,比如id為4399的商品數量為4,那么就在4399的佇列里放入4件商品,然后每次查詢就從佇列里去取,假如現在有五個執行緒去查庫存,因為只有4件商品,所以5個執行緒只有4個執行緒能夠查詢出庫存,因為Redis是單執行緒的,所以不會出現多個執行緒同時訪問資料出錯的情況,這樣就可以避免并發超賣的問題,
之前在SeckillGoodsPushTask中只是將商品存入Redis中,現在再加一步,為每個sku都創建一個佇列并存入庫存數量的資料到佇列中,
//定時將秒殺商品加載到redis中
@Scheduled(cron = "0/5 * * * * ?")
public void loadGoodsPushRedis() {
…………
for (SeckillGoods seckillGood : seckillGoods) {
boundHashOperations.put(seckillGood.getId(),seckillGood); //把商品存入到redis
redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGood.getId())
.leftPushAll(getGoodsNumber(seckillGood.getNum())); //存到Redis佇列
}
}
}
//獲取秒殺商品數量的陣列
public Byte[] getGoodsNumber(int num) {
Byte[] arr = new Byte[num];
for (int i = 0; i < num; i++) {
arr[i] = '0';
}
return arr;
}
佇列的內容就是商品數量的Byte,視頻中用的是商品id,但是商品id是Long型的,Byte比Long要省空間,而且放什么無所謂關鍵是放幾個,所以我就放了對應數量的Byte進去,
接下來就該在下單之前獲取庫存的資訊:
@Async
public void createOrder() {
…………
//從秒殺商品佇列中獲取資料,如果獲取不到則說明已經賣完了,清除掉排隊資訊
Object o = redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGoods.getId())
.rightPop();
if (o == null) {
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).delete(seckillStatus.getUsername()); //清除排隊佇列
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(seckillStatus.getUsername()); //排隊狀態佇列
return;
}
//創建秒殺訂單
…………
}
如果商品庫存不足,那么應該清除掉排隊的資訊,否則用戶該商品下不了單還不能下單其它商品,這里將排隊的佇列以及查詢狀態的佇列清除了,
同步庫存不精準問題

解決了并發超賣的問題之后,還有一個庫存數量不精準的問題,這個問題出現的原因和超賣問題類似,假如現在同時有兩個執行緒下單完成了開始遞減庫存,A執行緒查詢出庫存有3個,B執行緒也查詢出庫存有3個,然后它們同時遞減,都是2個,寫到了資料庫中,其實此時庫存應該還剩一個,
解決的辦法也很簡單,因為現在是呼叫**seckillGoods.getStockCount()**查詢出的庫存,那我們就不用這個查詢,直接用上一節中的佇列,佇列中剩余多少就說明現在的庫存是多少,絕對準確,
@Async
public void createOrder() {
…………
//減庫存,如果庫存沒了就從redis中洗掉,并將庫存資料寫到MySQL中
//seckillGoods.setStockCount(seckillGoods.getStockCount()-1);
Long size = redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGoods.getId()).size();//獲取庫存
//if (seckillGoods.getStockCount() <= 0) {
seckillGoods.setNum(size.intValue());
if (size <= 0) {
seckillGoodsBoundHashOps.delete(seckillStatus.getGoodsId());
seckillGoodsMapper.updateByPrimaryKeySelective(seckillGoods);
} else {
seckillGoodsBoundHashOps.put(seckillStatus.getGoodsId(),seckillGoods);
}
//創建秒殺訂單
…………
}
秒殺支付
改造二維碼創建及支付結果通知方法

秒殺支付的流程和之前做的類似,只不過現在秒殺訂單的支付狀態發送到Queue2中,普通訂單還是發送到queue1中,但是我們怎么知道該將訂單的支付狀態發送給queue1還是queue2呢?如果微信服務器可以將MQ佇列的exchange和routingKey回傳給我們就好了,這樣我們就可以動態地指定要發送的MQ了,
從微信支付的官方檔案中我們可以知道在創建二維碼和接收支付結果的引數中都有一個attach引數

這個是自定義的資料,也就是說我們在創建二維碼的時候發送給微信服務器什么,回傳支付結果的時候就會回傳給我們什么,所以在創建二維碼的時候由前端將指定的exchange和routingKey發送給后端,然后再添加到attach引數中,就可以實作將不同的訂單動態地發送到指定的佇列了,
普通訂單:exchange:exchange.order routingKey:routing.order
秒殺訂單:exchange:exchange.seckill_order routingKey:routing.seckill_order
由于之前寫的createNative方法是接收一個order物件,所以在Order里面添加兩個欄位:
private String exchange; //mq交換機的名稱
private String routingKey; //mq的路由鍵
修改之前**createNative()**的代碼,添加attach引數,
@Override
public Map<String, String> createNative(Order order) {
…………
//獲取exchange和routingKey,封裝程map集合,添加到attach引數中
String exchange = order.getExchange();
String routingKey = order.getRoutingKey();
Map<String,String> attachMap = new HashMap<>(2);
attachMap.put("exchange",exchange);
attachMap.put("routingKey",routingKey);
String attach = JSON.toJSONString(attachMap);
map.put("attach",attach);
…………
}
然后再修改**WeChatPayController.notifyUrl()**方法,從服務器回傳的Map集合中獲取attach,并從attach中獲取exchange和routingKey,
@RequestMapping("/notify/url")
public String notifyUrl(HttpServletRequest request) throws Exception {
…………
Map<String, String> xmlMap = WXPayUtil.xmlToMap(xmlString);
String attach = xmlMap.get("attach");
Map<String, String> attachMap = JSONObject.parseObject(attach, Map.class);
//將java物件轉換成amqp訊息發送出去,呼叫的是send方法
//rabbitTemplate.convertAndSend("exchange.order","routing.order", xmlString);
rabbitTemplate.convertAndSend(attachMap.get("exchange"),attachMap.get("routingKey"), xmlString);
…………
}
監聽秒殺
前面已經將訊息發送到訊息佇列中了現在就可以去監聽訊息佇列了,

從流程圖中可以看到,在寫監聽的方法之前,需要有兩個方法:改訂單狀態和洗掉訂單,
SeckillOrderServiceImpl.updatePayStatus
public void updatePayStatus(String username, String transactionId, String endTime) {
//從Redis中將訂單資訊查詢出來
SeckillOrder order = (SeckillOrder) redisTemplate
.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY)
.get(username);
if (order != null) {
try {
order.setStatus("1");
order.setTransactionId(transactionId);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
order.setPayTime(simpleDateFormat.parse(endTime));
seckillOrderMapper.insertSelective(order); //將訂單資訊存到mysql中
//洗掉redis中的訂單資訊
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).delete(username);
//洗掉用戶的排隊資訊
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).delete(username); //清除排隊佇列
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(username); //排隊狀態佇列
} catch (ParseException e) {
e.printStackTrace();
}
}
}
首先將訂單資訊從Redis中查詢出來,將訂單狀態改為已支付,然后將交易流水號和支付時間補充完整存入MySQL,這時候交易已經完成了,可以將訂單資訊從Redis中洗掉,并將用戶的排隊資訊也一并洗掉,
SeckillOrderServiceImpl.deleteOrder
public void deleteOrder(String username) {
//洗掉Redis中的訂單
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).delete(username);
//洗掉用戶的排隊資訊
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).delete(username); //清除排隊佇列
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(username); //排隊狀態佇列
//查詢出秒殺的狀態資訊
SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY)
.get(username);
//回滾庫存
SeckillGoods seckillGoods = (SeckillGoods) redisTemplate
.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + seckillStatus.getTime())
.get(seckillStatus.getGoodsId());
if (seckillGoods == null) {
seckillGoodsMapper.selectByPrimaryKey(seckillGoods.getId());
seckillGoods.setStockCount(seckillGoods.getStockCount()+1);
seckillGoodsMapper.updateByPrimaryKeySelective(seckillGoods);
} else {
seckillGoods.setStockCount(seckillGoods.getStockCount()+1);
}
redisTemplate
.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + seckillStatus.getTime())
.put(seckillGoods.getId(),seckillGoods);
//將商品放入佇列
redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGoods.getId())
.leftPush("0");
}
支付失敗了,應該將訂單洗掉掉,首先將Redis中的訂單洗掉,然后洗掉用戶的排隊資訊,接著回滾庫存,如果Redis中沒有則說明已經賣完了,就從MySQL中查詢出來然后將商品數量加1再存入MySQL;如果Redis中有資料就將Redis中的商品數量加1即可,上面講防止并發超賣的時候不是為每個商品都在Redis佇列中存放了一下么,所以最后將商品放回到佇列中,
SeckillMessageListener
@Component
@RabbitListener(queues = "queue.seckillorder")
public class SeckillMessageListener {
@Autowired
private SeckillOrderService seckillOrderService;
//https://pay.weixin.qq.com/wiki/doc/api/native.php?chapter=9_7&index=8
@RabbitHandler
public void getMessage(String message) {
try {
Map<String, String> resultMap = JSON.parseObject(message,Map.class);
String returnCode = resultMap.get("return_code"); //狀態碼
if ("SUCCESS".equals(returnCode)) {
String resultCode = resultMap.get("result_code"); //業務結果
String attach = resultMap.get("attach");
Map<String,String> attachMap = JSON.parseObject(attach,Map.class);
if ("SUCCESS".equals(resultCode)) {
//改訂單狀態
seckillOrderService.updatePayStatus(attachMap.get("username"),
resultMap.get("transaction_id"),resultMap.get("time_end"));
} else {
//洗掉訂單
seckillOrderService.deleteOrder(attachMap.get("username"));
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
這個方法使用來監聽秒殺佇列的訊息的,exchange和queue需要我們手動地在RabbitMQ的網頁中創建并進行系結

在該方法中,首先去讀取狀態碼和業務結果,如果都為“SUCCESS”的話則說明訂單支付成功,修改訂單的狀態,反之訂單支付失敗,洗掉訂單,
總結
文章鴿了快一個月了,終于補上了,主要是上篇文章寫完后就在做個小東西,然后就是國慶節放假,在家待著有點懶,回校后又在參加電賽,沒時間,所以一路鴿到現在,
這篇文章主要是將之前的秒殺流程進行一個完善,實作了防止秒殺重復排隊,解決并發超賣的問題,并解決了同步庫存不精準的問題,最后實作了秒殺支付,
碼字不易,可以的話,給我來個
點贊,收藏,關注
如果你喜歡我的文章,歡迎關注微信公眾號 『 R o b o d 』
代碼:https://github.com/RobodLee/changgou
本文已收錄至我的Github倉庫DayDayUP:github.com/RobodLee/DayDayUP,歡迎Star
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/180756.html
標籤:其他
下一篇:攤牌了,我要手寫一個RPC
