作者:京東科技 張石磊
1 案例引入
名詞簡介:
資源:可以理解為一條內容,或者圖+文字+鏈接的載體,
檔位ID: 資源的分類組,資源必須歸屬于檔位,
問題描述:當同一個檔位下2條資源同時審批通過時,收到擎天審批系統2條訊息,消費者應用部署了2臺機器,此時正好由2臺機器分別消費,在并發消費時,先更新資源狀態,然后寫快取,每次取前100條資源,類似select * from resource where gear_id=xxx limit 100 order by id desc;
在寫檔位快取,此時事務未提交,并發查詢時根據檔位Id查詢時查詢不到對方的資料,全量寫快取時導致后寫的快取覆寫了先寫的快取,即快取被覆寫,導致投放資源缺失,
方案思考 :
方案1:一臺機器消費mq–單點問題
方案2:將同檔位ID的資源路由到同一個queue,需要審批系統配合根據檔位Id做路由,審批系統發的訊息不只是cms審批資料,此方案不適用,
方案3:在檔位級別加分布式鎖,
經比較,最終采用方案3是合適的方案.
2 鎖說明和分布式鎖選擇
synchronized鎖的粒度是JVM行程維度,集群模式下,不能對共享資源加鎖,此時需要跨JVM行程的分布式鎖,
分布式鎖方式核心實作方式優點缺點分析
1 資料庫:
悲觀鎖,lock
樂觀鎖,通過版本號實作version
實作簡單,不依賴中間件
資料庫IO瓶頸,性能差
單實體存在單點問題,主從架構存在資料不一致,主從切換時其他客戶端可重復加鎖,
2 zookeeper
創建臨時節點
CP模型,可靠性高,不存在主從切換不一致問題
頻繁創建和銷毀臨時節點,且
集群模式下,leader資料需要同步到follower才算加鎖成功,性能不如redis
主從切換服務不可用
3 redis集群
setnx+expire
性能高
有封裝好的框架redission
支持超時自動洗掉鎖
集群支持高可用,AP模型
主從切換時其他客戶端可重復加鎖,
R2M是基于開源的Redis cluster(Redis 3.0以上版本)構建的高性能分布式快取系統,我們系統一直在使用,3.2.5版本開始支持分布式鎖,
3 r2m分布式鎖原理
示例代碼:
String lockKey = CacheKeyHelper.getGearLockKey(EnvEnum.getEnvFlagEnum(envCode),resource.getGearId());
R2mLock lock = (R2mLock) r2mClusterClient.getLock(lockKey);
//獲取鎖,鎖的默認有效期30s,獲取不到鎖就阻塞
lock.lock();
try {
//業務代碼
resourceService.afterApprovedHandle(resource);
} finally {
//釋放鎖
lock.unlock();
}
1 加鎖核心流程:
加鎖流程圖:
1):嘗試獲取鎖,通過執行加鎖Lua腳本來做;
2):若第一步未獲取到鎖,則去redis訂閱解鎖訊息
3):一旦持有鎖的執行緒釋放了鎖,就會廣播解鎖訊息,其他執行緒自旋重新嘗試獲取鎖,
核心加鎖原理:使用lua腳本封裝了hset和pexpire命令,保證是一個原子操作, KEYS[1]是加鎖的key,argv[2]是加鎖的客戶端ID(UUID+執行緒ID),ARGV[1]是鎖的有效期,默認30s.
private Object acquireInternal(List<String> args) {
if (!this.setLocked() && this.getHolderId() != Thread.currentThread().getId()) {
return -1L;
} else {
try {
//hash結構,hash的key是加鎖的key,鍵值對的key為客戶端的UUID+執行緒id,value為鎖的重入計數器值,
return this.lockSha() != null ? this.executor.evalR2mLockSha(this.lockSha(),
"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
return -2;", Collections.singletonList(this.lockName), args) : this.executor. == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return -2;", Collections.singletonList(this.lockName), args);
} catch (Exception var3) {
this.setUnlocked();
throw new R2mLockException("Failed to acquire lock " + this.lockName + ".", var3);
}
}
}
args引數
private List<String> acquireArgs(long leaseTime) {
List<String> args = new ArrayList();
if (leaseTime != -1L) {
args.add(String.valueOf(leaseTime));
} else {
//默認30s
args.add(String.valueOf(this.internalLockLeaseTime()));
}
//UUID+當前執行緒id
args.add(this.currentThreadLockId(Thread.currentThread().getId()));
return args;
}
?
獲取鎖失敗訂閱鎖的channel
//獲取鎖失敗,訂閱釋放鎖的訊息
private boolean failedAcquire() {
this.subLock();
return false;
}
private void subLock() {
CompletableFuture<Void> cf = R2mLock.this.executor.lockSubscribe(R2mLock.this.lockPubSub(), R2mLock.this.getLockChannelName(), R2mLock.this);
if (cf != null) {
cf.handleAsync(this::reSubIfEx);
}
}
鎖釋放后,訂閱者通過自旋嘗試獲取鎖,
//tryAcquire獲取鎖,!tryAcquire就是獲取鎖失敗,鎖釋放后,通知執行緒喚醒后回傳false,然后通過自旋,嘗試獲取鎖,
public final void acquire(long arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, long arg) {
boolean failed = true;
try {
boolean interrupted = false;
//內部自旋獲取鎖
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2 釋放鎖核心邏輯:
1)洗掉分布式鎖key(如果可重入鎖計數為0)
- 發釋放鎖的廣播訊息
3)取消watchdog
private Object unlockInternal(List<String> args) {
logger.debug("{} trying to unlock.", Thread.currentThread().getId());
Object var2;
try {
//判斷鎖 key 是否存在,如果存在,然后遞減hash的value值,當value值為0時再洗掉鎖key,并且廣播釋放鎖的訊息
if (this.unlockSha() == null) {
var2 = this.executor. == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args);
return var2;
}
var2 = this.executor.evalR2mLockSha(this.unlockSha(), "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args);
} catch (Exception var6) {
throw new R2mLockException("Failed to unlock " + this.lockName + ".", var6);
} finally {
this.finalizeRelease();
}
return var2;
}
//取消當前執行緒的watchdog
private void finalizeRelease() {
long threadId = Thread.currentThread().getId();
R2mLock.ExpirableEntry entry = (R2mLock.ExpirableEntry)this.entryCache.get(threadId);
if (entry != null) {
entry.release(threadId);
if (entry.isReleased()) {
//取消這個執行緒watchdog定時任務
entry.getExtTask().cancel();
this.expEntry.compareAndSet(entry, (Object)null);
//從快取watchdog執行緒的map中洗掉該執行緒
this.entryCache.remove(threadId);
}
}
}
3 鎖的健壯性思考
1 業務沒執行完,鎖超時過期怎么辦?
客戶端加鎖默認有效期30s,超過有效期后如果業務沒執行完,還需要持有這把鎖,r2m客戶端提供了續期機制,也就是watchdog機制,
watchdog原理:客戶端執行緒維度(UUID+執行緒ID,客戶端維護一個MAP,key就是UUID+執行緒ID)的后臺定時執行緒,獲取鎖成功后,如果客戶端還持有當前鎖,每隔10s(this.internalLockLeaseTime() / 3L),去延長鎖的有效期(internalLockLeaseTime)
//watchdog核心機制 ,internalLockLeaseTime默認30s
private void extendLock(long holderId) {
if (this.expEntry.get() != null) {
R2mLock.ExpirableEntry holderEntry = (R2mLock.ExpirableEntry)this.entryCache.get(holderId);
if (holderEntry != null) {
//每隔10s,如果當前執行緒持有鎖,則續期30s
if (this.expEntry.compareAndSet(holderEntry, holderEntry)) {
Timeout task = this.timer().newTimeout((timeout) -> {
if (this.extendLockInternal(holderId)) {
this.extendLock(holderId);
}
}, this.internalLockLeaseTime() / 3L, TimeUnit.MILLISECONDS);
if (this.expEntry.get() != null) {
((R2mLock.ExpirableEntry)this.expEntry.get()).setExtTask(task);
}
}
}
}
}
//執行續期lua腳本
private boolean extendLockInternal(long threadId) {
Object result;
try {
//只續期
if (this.extendLockSha() != null) {
result = this.executor.evalR2mLockSha(this.extendLockSha(), "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId));
} else {
result = this.executor. == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId));
}
} catch (Exception var5) {
return false;
}
return Long.parseLong(result.toString()) == 1L;
}
2 客戶端宕機,鎖如何釋放?
分布式鎖是有效期的,客戶端宕機后,watchdog機制失效,鎖過期自動失效,
3 redis分布式鎖集群模式下缺陷
r2m集群模式,極端情況,master加鎖成功,宕機,還未來得及同步到slave,主從切換,slave切換成master,可以繼續加鎖,對于非及其嚴格加鎖場景,該方案可滿足,屬于AP;對于嚴格場景下的分布式鎖,可采用基于zookeeper的分布式鎖,屬于CP,leader宕機,folllower選舉時不可用,性能上redis更優,
4 鎖的釋放問題
注意鎖的釋放在finally中釋放,必須由鎖的持有者釋放,不能由其他執行緒釋放別人的鎖,示例代碼中lock放到try的外面,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/543203.html
標籤:架構設計
上一篇:構建億級別的訊息推送基礎模型
下一篇:返回列表