圖解Janusgraph系列-并發安全:鎖機制(本地鎖+分布式鎖)分析
大家好,我是洋仔,JanusGraph圖解系列文章,實時更新~
圖資料庫文章總目錄:
- 整理所有圖相關文章,請移步(超鏈):圖資料庫系列-文章總目錄
- 地址:https://liyangyang.blog.csdn.net/article/details/111031257
原始碼分析相關可查看github(碼文不易,求個star~): https://github.com/YYDreamer/janusgraph
下述流程高清大圖地址:https://www.processon.com/view/link/5f471b2e7d9c086b9903b629
版本:JanusGraph-0.5.2
轉載文章請保留以下宣告:
作者:洋仔聊編程
微信公眾號:匠心Java
原文地址:https://liyangyang.blog.csdn.net/
在分布式系統中,難免涉及到對同一資料的并發操作,如何保證分布式系統中資料的并發安全呢?分布式鎖!
一:分布式鎖
常用的分布式鎖實作方式:
1、基于資料庫實作分布式鎖
? 針對于資料庫實作的分布式鎖,如mysql使用使用for update共同競爭一個行鎖來實作; 在JanusGraph中,也是基于資料庫實作的分布式鎖,這里的資料庫指的是我們當前使用的第三方backend storage,具體的實作方式也和mysql有所不同,具體我們會在下文分析
2、基于Redis實作的分布式鎖
? 基于lua腳本+setNx實作
3、基于zk實作的分布式鎖
? 基于znode的有序性和臨時節點+zk的watcher機制實作
4、MVCC多版本并發控制樂觀鎖實作
本文主要介紹Janusgraph的鎖機制,其他的實作機制就不在此做詳解了
下面我們來分析一下JanusGraph的鎖機制實作~
二:JanusGraph鎖機制
在JanusGraph中使用的鎖機制是:本地鎖 + 分布式鎖來實作的;
2.1 一致性行為
在JanusGraph中主要有三種一致性修飾詞(Consistency Modifier)來表示3種不同的一致性行為,來控制圖庫使用程序中的并發問題的控制程度;
public enum ConsistencyModifier {
DEFAULT,
LOCK,
FORK
}
原始碼中ConsistencyModifier列舉類主要作用:用于控制JanusGraph在最終一致或其他非事務性后端系統上的一致性行為!其作用分別為:
- DEFAULT:默認的一致性行為,不使用分布式鎖進行控制,對配置的存盤后端使用由封閉事務保證的默認一致性模型,一致性行為主要取決于存盤后端的配置以及封閉事務的(可選)配置;無需顯示配置即可使用
- LOCK:在存盤后端支持鎖的前提下,顯示的獲取分布式鎖以保證一致性!確切的一致性保證取決于所配置的鎖實作;需
management.setConsistency(element, ConsistencyModifier.LOCK);陳述句進行配置 - FORK:只適用于
multi-edges和list-properties兩種情況下使用;使JanusGraph修改資料時,采用先洗掉后添加新的邊/屬性的方式,而不是覆寫現有的邊/屬性,從而避免潛在的并發寫入沖突;需management.setConsistency(element, ConsistencyModifier.FORK);進行配置
LOCK
在查詢或者插入資料時,是否使用分布式鎖進行并發控制,在圖shcema的創建程序中,如上述可以通過配置schema元素為ConsistencyModifier.LOCK方式控制并發,則在使用程序中就會用分布式鎖進行并發控制;
為了提高效率,JanusGraph默認不使用鎖定, 因此,用戶必須為定義一致性約束的每個架構元素決定是否使用鎖定,
使用JanusGraphManagement.setConsistency(element,ConsistencyModifier.LOCK)顯式啟用對架構元素的鎖定
代碼如下所示:
mgmt = graph.openManagement()
name = mgmt.makePropertyKey('consistentName').dataType(String.class).make()
index = mgmt.buildIndex('byConsistentName', Vertex.class).addKey(name).unique().buildCompositeIndex()
mgmt.setConsistency(name, ConsistencyModifier.LOCK) // Ensures only one name per vertex
mgmt.setConsistency(index, ConsistencyModifier.LOCK) // Ensures name uniqueness in the graph
mgmt.commit()
FORK
由于邊緣作為單個記錄存盤在基礎存盤后端中,因此同時修改單個邊緣將導致沖突,
FORK就是為了代替LOCK,可以將邊緣標簽配置為使用ConsistencyModifier.FORK,
下面的示例創建一個新的edge label,并將其設定為ConsistencyModifier.FORK
mgmt = graph.openManagement()
related = mgmt.makeEdgeLabel('related').make()
mgmt.setConsistency(related, ConsistencyModifier.FORK)
mgmt.commit()
經過上述配置后,修改標簽配置為FORK的edge時,操作步驟為:
- 首先,洗掉該邊
- 將修改后的邊作為新邊添加
因此,如果兩個并發事務修改了同一邊緣,則提交時將存在邊緣的兩個修改后的副本,可以在查詢遍歷期間根據需要解決這些副本,
注意edge fork僅適用于MULTI edge, 具有多重性約束的邊緣標簽不能使用此策略,因為非MULTI的邊緣標簽定義中內置了一個唯一性約束,該約束需要顯式鎖定或使用基礎存盤后端的沖突解決機制
下面我們具體來看一下janusgrph的鎖機制的實作:
2.2 LoackID
在介紹鎖機制之前,先看一下鎖應該鎖什么東西呢?
我們都知道在janusgraph的底層存盤中,vertexId作為Rowkey,屬性和邊存盤在cell中,由column+value組成
當我們修改節點的屬性和邊+邊的屬性時,很明顯只要鎖住對應的Rowkey + Column即可;
在Janusgraph中,這個鎖的標識的基礎部分就是LockID:
LockID = RowKey + Column
原始碼如下:
KeyColumn lockID = new KeyColumn(key, column);
2.3 本地鎖
本地鎖是在任何情況下都需要獲取的一個鎖,只有獲取成功后,才會進行下述分布式鎖的獲取!
本地鎖是基于圖實體維度存在的;主要作用是保證當前圖實體下的操作中無沖突!
本地鎖的實作是通過ConcurrentHashMap資料結構來實作的,在圖實體維度下唯一;
基于當前事務+lockId來作為鎖標識;
獲取的主要流程:

結合原始碼如下:
上述圖建議依照原始碼一塊分析,原始碼在LocalLockMediator類中的下述方法,下面原始碼分析模塊會詳細分析
public boolean lock(KeyColumn kc, T requester, Instant expires) {
}
引入本地鎖機制,主要目的: 在圖實體維度來做一層鎖判斷,減少分布式鎖的并發沖突,減少分布式鎖帶來的性能消耗
2.4 分布式鎖
在本地鎖獲取成功之后才會去嘗試獲取分布式鎖;
分布式鎖的獲取整體分為兩部分流程:
分布式鎖資訊插入分布式鎖資訊狀態判斷
分布式鎖資訊插入
該部分主要是通過lockID來構造要插入的Rowkey和column并將資料插入到hbase中;插入成功即表示這部分處理成功!
具體流程如下:

分布式鎖資訊狀態判斷
該部分在上一部分完成之后才會進行,主要是判斷分布式鎖是否獲取成功!
查詢出當前hbase中對應Rowkey的所有column,過濾未過期的column集合,比對集合的第一個column是否等于當前事務插入的column;
等于則獲取成功!不等于則獲取失敗!
具體流程如下:

三:原始碼分析 與 整體流程
原始碼分析已經push到github:https://github.com/YYDreamer/janusgraph
1、獲取鎖的入口
public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException {
// locker是一個一致性key鎖物件
if (locker != null) {
// 獲取當前事務物件
ExpectedValueCheckingTransaction tx = (ExpectedValueCheckingTransaction) txh;
// 判斷:當前的獲取鎖操作是否當前事務的操作中存在增刪改的操作
if (tx.isMutationStarted())
throw new PermanentLockingException("Attempted to obtain a lock after mutations had been persisted");
// 使用key+column組裝為lockID,供下述加鎖使用!!!!!
KeyColumn lockID = new KeyColumn(key, column);
log.debug("Attempting to acquireLock on {} ev={}", lockID, expectedValue);
// 獲取本地當前jvm行程中的寫鎖(看下述的 1:寫鎖獲取分析)
// (此處的獲取鎖只是將對應的KLV存盤到Hbase中!存盤成功并不代表獲取鎖成功)
// 1. 獲取成功(等同于存盤成功)則繼續執行
// 2. 獲取失敗(等同于存盤失敗),會拋出例外,拋出到最上層,列印錯誤日志“Could not commit transaction ["+transactionId+"] due to exception” 并拋出對應的例外,本次插入資料結束
locker.writeLock(lockID, tx.getConsistentTx());
// 執行前提:上述獲取鎖成功!
// 存盤期望值,此處為了實作當相同的key + value + tx多個加鎖時,只處理第一個
// 存盤在事務物件中,標識在commit判斷鎖是否獲取成功時,當前事務插入的是哪個鎖資訊
tx.storeExpectedValue(this, lockID, expectedValue);
} else {
// locker為空情況下,直接拋出一個運行時例外,終止程式
store.acquireLock(key, column, expectedValue, unwrapTx(txh));
}
}
2、執行 locker.writeLock(lockID, tx.getConsistentTx()) 觸發鎖獲取
public void writeLock(KeyColumn lockID, StoreTransaction tx) throws TemporaryLockingException, PermanentLockingException {
if (null != tx.getConfiguration().getGroupName()) {
MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_CALLS).inc();
}
// 判斷當前事務是否在圖實體的維度 已經占據了lockID的鎖
// 此處的lockState在一個事務成功獲取本地鎖+分布式鎖后,以事務為key、value為map,其中key為lockID,value為加鎖狀態(開始時間、過期時間等)
if (lockState.has(tx, lockID)) {
log.debug("Transaction {} already wrote lock on {}", tx, lockID);
return;
}
// 當前事務沒有占據lockID對應的鎖
// 進行(lockLocally(lockID, tx) 本地加鎖鎖定操作,
if (lockLocally(lockID, tx)) {
boolean ok = false;
try {
// 在本地鎖獲取成功的前提下:
// 嘗試獲取基于Hbase實作的分布式鎖;
// 注意!!!(此處的獲取鎖只是將對應的KLV存盤到Hbase中!存盤成功并不代表獲取鎖成功)
S stat = writeSingleLock(lockID, tx);
// 獲取鎖分布式鎖成功后(即寫入成功后),更新本地鎖的過期時間為分布式鎖的過期時間
lockLocally(lockID, stat.getExpirationTimestamp(), tx); // update local lock expiration time
// 將上述獲取的鎖,存盤在標識當前存在鎖的集合中Map<tx,Map<lockID,S>>, key為事務、value中的map為當前事務獲取的鎖,key為lockID,value為當前獲取分布式鎖的ConsistentKeyStatus(一致性密匙狀態)物件
lockState.take(tx, lockID, stat);
ok = true;
} catch (TemporaryBackendException tse) {
// 在獲取分布式鎖失敗后,捕獲該例外,并拋出該例外
throw new TemporaryLockingException(tse);
} catch (AssertionError ae) {
// Concession to ease testing with mocks & behavior verification
ok = true;
throw ae;
} catch (Throwable t) {
// 出現底層存盤錯誤! 則直接加鎖失敗!
throw new PermanentLockingException(t);
} finally {
// 判斷是否成功獲取鎖,沒有獲分布式鎖的,則釋放本地鎖
if (!ok) {
// 沒有成功獲取鎖,則釋放本地鎖
// lockState.release(tx, lockID); // has no effect
unlockLocally(lockID, tx);
if (null != tx.getConfiguration().getGroupName()) {
MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_EXCEPTIONS).inc();
}
}
}
} else {
// 如果獲取本地鎖失敗,則直接拋出例外,不進行重新本地爭用
// Fail immediately with no retries on local contention
throw new PermanentLockingException("Local lock contention");
}
}
包含兩個部分:
- 本地鎖的獲取
lockLocally(lockID, tx) - 分布式鎖的獲取
writeSingleLock(lockID, tx)注意此處只是將鎖資訊寫入到Hbase中,并不代表獲取分布式鎖成功,只是做了上述介紹的第一個階段分布式鎖資訊插入
3、本地鎖獲取 lockLocally(lockID, tx)
public boolean lock(KeyColumn kc, T requester, Instant expires) {
assert null != kc;
assert null != requester;
final StackTraceElement[] acquiredAt = log.isTraceEnabled() ?
new Throwable("Lock acquisition by " + requester).getStackTrace() : null;
// map的value,以事務為核心
final AuditRecord<T> audit = new AuditRecord<>(requester, expires, acquiredAt);
// ConcurrentHashMap實作locks, 以lockID為key,事務為核心value
final AuditRecord<T> inMap = locks.putIfAbsent(kc, audit);
boolean success = false;
// 代表當前map中不存在lockID,標識著鎖沒有被占用,成功獲取鎖
if (null == inMap) {
// Uncontended lock succeeded
if (log.isTraceEnabled()) {
log.trace("New local lock created: {} namespace={} txn={}",
kc, name, requester);
}
success = true;
} else if (inMap.equals(audit)) {
// 代表當前存在lockID,比對舊value和新value中的事務物件是否是同一個
// requester has already locked kc; update expiresAt
// 上述判斷后,事務物件為同一個,標識當前事務已經獲取這個lockID的鎖;
// 1. 這一步進行cas替換,作用是為了重繪過期時間
// 2. 并發處理,如果因為鎖過期被其他事務占據,則占用鎖失敗
success = locks.replace(kc, inMap, audit);
if (log.isTraceEnabled()) {
if (success) {
log.trace("Updated local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
kc, name, requester, inMap.expires, audit.expires);
} else {
log.trace("Failed to update local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
kc, name, requester, inMap.expires, audit.expires);
}
}
} else if (0 > inMap.expires.compareTo(times.getTime())) {
// 比較過期時間,如果鎖已經過期,則當前事務可以占用該鎖
// the recorded lock has expired; replace it
// 1. 當前事務占用鎖
// 2. 并發處理,如果因為鎖過期被其他事務占據,則占用鎖失敗
success = locks.replace(kc, inMap, audit);
if (log.isTraceEnabled()) {
log.trace("Discarding expired lock: {} namespace={} txn={} expired={}",
kc, name, inMap.holder, inMap.expires);
}
} else {
// 標識:鎖被其他事務占用,并且未過期,則占用鎖失敗
// we lost to a valid lock
if (log.isTraceEnabled()) {
log.trace("Local lock failed: {} namespace={} txn={} (already owned by {})",
kc, name, requester, inMap);
log.trace("Owner stacktrace:\n {}", Joiner.on("\n ").join(inMap.acquiredAt));
}
}
return success;
}
如上述介紹,本地鎖的實作是通過ConcurrentHashMap資料結構來實作的,在圖實體維度下唯一!
4、分布式鎖獲取第一個階段:分布式鎖資訊插入
protected ConsistentKeyLockStatus writeSingleLock(KeyColumn lockID, StoreTransaction txh) throws Throwable {
// 組裝插入hbase資料的Rowkey
final StaticBuffer lockKey = serializer.toLockKey(lockID.getKey(), lockID.getColumn());
StaticBuffer oldLockCol = null;
// 進行嘗試插入 ,默認嘗試次數3次
for (int i = 0; i < lockRetryCount; i++) {
// 嘗試將資料插入到hbase中;oldLockCol表示要洗掉的column代表上一次嘗試插入的資料
WriteResult wr = tryWriteLockOnce(lockKey, oldLockCol, txh);
// 如果插入成功
if (wr.isSuccessful() && wr.getDuration().compareTo(lockWait) <= 0) {
final Instant writeInstant = wr.getWriteTimestamp(); // 寫入時間
final Instant expireInstant = writeInstant.plus(lockExpire);// 過期時間
return new ConsistentKeyLockStatus(writeInstant, expireInstant); // 回傳插入物件
}
// 賦值當前的嘗試插入的資料,要在下一次嘗試時洗掉
oldLockCol = wr.getLockCol();
// 判斷插入失敗原因,臨時例外進行嘗試,非臨時例外停止嘗試!
handleMutationFailure(lockID, lockKey, wr, txh);
}
// 處理在嘗試了3次之后還是沒插入成功的情況,洗掉最后一次嘗試插入的資料
tryDeleteLockOnce(lockKey, oldLockCol, txh);
// TODO log exception or successful too-slow write here
// 拋出例外,標識匯入資料失敗
throw new TemporaryBackendException("Lock write retry count exceeded");
}
上述只是將鎖資訊插入,插入成功標識該流程結束
5、分布式鎖獲取第一個階段:分布式鎖鎖定是否成功判定
這一步,是在commit階段進行的驗證
public void commit() throws BackendException {
// 此方法內呼叫checkSingleLock 檢查分布式鎖的獲取結果
flushInternal();
tx.commit();
}
最侄訓呼叫checkSingleLock方法,判斷獲取鎖的狀態!
protected void checkSingleLock(final KeyColumn kc, final ConsistentKeyLockStatus ls,
final StoreTransaction tx) throws BackendException, InterruptedException {
// 檢查是否被檢查過
if (ls.isChecked())
return;
// Slice the store
KeySliceQuery ksq = new KeySliceQuery(serializer.toLockKey(kc.getKey(), kc.getColumn()), LOCK_COL_START,
LOCK_COL_END);
// 此處從hbase中查詢出鎖定的行的所有列! 默認查詢重試次數3
List<Entry> claimEntries = getSliceWithRetries(ksq, tx);
// 從每個回傳條目的列中提取timestamp和rid,然后過濾出帶有過期時間戳的timestamp物件
final Iterable<TimestampRid> iterable = Iterables.transform(claimEntries,
e -> serializer.fromLockColumn(e.getColumnAs(StaticBuffer.STATIC_FACTORY), times));
final List<TimestampRid> unexpiredTRs = new ArrayList<>(Iterables.size(iterable));
for (TimestampRid tr : iterable) { // 過濾獲取未過期的鎖!
final Instant cutoffTime = now.minus(lockExpire);
if (tr.getTimestamp().isBefore(cutoffTime)) {
...
}
// 將還未過期的鎖記錄存盤到一個集合中
unexpiredTRs.add(tr);
}
// 判斷當前tx是否成功持有鎖! 如果我們插入的列是讀取的第一個列,或者前面的列只包含我們自己的rid(因為我們是在第一部分的前提下獲取的鎖,第一部分我們成功獲取了基于當前行程的鎖,所以如果rid相同,代表著我們也成功獲取到了當前的分布式鎖),那么我們持有鎖,否則,另一個行程持有該鎖,我們無法獲得鎖
// 如果,獲取鎖失敗,拋出TemporaryLockingException例外!!!! 拋出到頂層的mutator.commitStorage()處,最侄訓入失敗進行事務回滾等操作
checkSeniority(kc, ls, unexpiredTRs);
// 如果上述步驟未拋出例外,則標識當前的tx已經成功獲取鎖!
ls.setChecked();
}
四:整體流程
總流程如下圖:

整體流程為:
- 獲取本地鎖
- 獲取分布式鎖
- 插入分布式鎖資訊
- commit階段判斷分布式鎖獲取是否成功
- 獲取失敗,則重試
五:總結
JanusGraph的鎖機制主要是通過本地鎖+分布式鎖來實作分布式系統下的資料一致性;
分布式鎖的控制維度為:property、vertex、edge、index都可以;
JanusGraph支持在資料匯入時通過前面一致性行為部分所說的LOCK來開關分布式鎖:
- LOCK:資料匯入時開啟分布式鎖保證分布式一致性
- DEFAULT、FORK:資料匯入時關閉分布式鎖
是否開啟分布式鎖思考:
在開啟分布式鎖的情況下,資料匯入開銷非常大;如果是資料不是要求很高的一致性,并且資料量比較大,我們可以選擇關閉分布式鎖相關,來提高匯入速度;
然后,針對于小資料量的要求高一致性的資料,單獨開啟分布式鎖來保證資料安全;
另外,我們在不開啟分布式鎖定的情況下,可以通過針對于匯入的資料的充分探查來減少沖突!
針對于圖schema的元素開啟還是關閉分布式鎖,還是根據實際業務情況來決定,
本文有任何問題,可加博主微信或評論指出,感謝!
碼文不易,給個贊和star吧~
本文由博客群發一文多發等運營工具平臺 OpenWrite 發布
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/236497.html
標籤:其他
