并發編程AQS原始碼分析
AQS的全稱為(AbstractQueuedSynchronizer),這個類在java.util.concurrent.locks包下面,它是一個Java提高的底層同步工具類,比如CountDownLatch、ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的
簡單來說:是用一個int型別的變數表示同步狀態,并提供了一系列的CAS操作來管理這個同步狀態物件
一個是 state(用于計數器,為0時釋放鎖)
一個是執行緒標記(當前執行緒是誰加鎖的),
一個是阻塞佇列Node(用于存放其他未拿到鎖的執行緒)
例子:執行緒A呼叫了lock()方法,通過CAS將state賦值為1,然后將該鎖標記為執行緒A加鎖,如果執行緒A還未釋放鎖時,執行緒B來請求,會查詢鎖標記的狀態,因為當前的鎖標記為 執行緒A,執行緒B未能匹配上,所以執行緒B會加入阻塞佇列,直到執行緒A觸發了 unlock() 方法,這時執行緒B才有機會去拿到鎖,但是不一定肯定拿到
原始碼分析階段
直接看ReentrantLock中的lock方法加鎖是如何使用到AQS的、ReentrantLock分為公平鎖和非公平鎖、默認是非公平鎖
公平鎖:按照佇列先進先出的順序進行加鎖解鎖
非公平鎖:哪個執行緒先搶到鎖就是哪個的、有可能造成某一個執行緒一直搶不到鎖
非公平鎖
// 非公平鎖
final void lock() {
// 進行cas操作、state值為0則賦值為1、成功獲取鎖
if (compareAndSetState(0, 1))
// 設定執行緒標記、執行緒標記用來檢測是否是重入鎖
setExclusiveOwnerThread(Thread.currentThread());
else
// 呼叫AQS中的acquire方法、在呼叫ReentrantLock類下定義的Sync類中的nonfairTryAcquire方法進行具體的鎖操作細節
// 傳參1、里面一直進行for回圈CAS賦值、直到哪個執行緒搶到回傳
acquire(1);
}
// acquire方法是操作state狀態位的方法、通過tryAcquire里面的CAS原子操作檢測鎖狀態
public final void acquire(int arg) {
// tryAcquire呼叫nonfairTryAcquire方法
if (!tryAcquire(arg) &&
// addWaiter: 根據給定模式創建一個當前執行緒的Node節點并回傳、當前是創建一個獨占鎖的Node節點
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 中斷當前執行緒
selfInterrupt();
}
final boolean nonfairTryAcquire(int acquires) {
// 先獲取當前執行緒識別符號、用于檢測當前物件的執行緒識別符號是否是同一個、同一個則為可重入鎖、可直接回傳、不是則回傳后呼叫acquireQueued
final Thread current = Thread.currentThread();
// 獲取計數器、表明當前執行緒重入了多少次鎖
int c = getState();
// 為0則代表當前執行緒的鎖全部解鎖完畢
if (c == 0) {
// 給state計數器加1、代表上鎖
if (compareAndSetState(0, acquires)) {
// 設定物件的執行緒識別符號為當前執行緒
setExclusiveOwnerThread(current);
// 直接回傳
return true;
}
}
// 代表當前有執行緒在使用該物件鎖、檢測是否是同一執行緒、是則進入、為可重入鎖
else if (current == getExclusiveOwnerThread()) {
// 計算state
int nextc = c + acquires;
// 小于0則代表鎖過多、即0x7fffffff + 1為負數
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 賦值state
setState(nextc);
// 直接回傳
return true;
}
// 回傳false、表示當前物件鎖被其他執行緒占用了、需要等到加鎖的執行緒解鎖才進行搶占
return false;
}
// 根據給定模式創建一個當前執行緒的Node節點并回傳
private Node addWaiter(Node mode) {
// 根據當前執行緒創建一個Node節點、并傳入模式(獨占鎖、共享鎖)
Node node = new Node(Thread.currentThread(), mode);
// 獲取佇列尾部Node節點
Node pred = tail;
// 檢測佇列是否存在尾部節點、即是否存在節點
if (pred != null) {
// 新創建的Node節點上一個節點設為佇列的尾部節點、然后下面直接將新創建節點插入佇列尾部
node.prev = pred;
// 通過cas操作更換節點順序、即新創建的節點為尾部、原先尾節點的下一個節點設定為當前新創建的Node節點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 將新創建的Node節點插入佇列尾部
enq(node);
return node;
}
// 里面for死回圈無限呼叫nonfairTryAcquire方法檢測鎖是否被釋放、然后進行搶占加鎖
final boolean acquireQueued(final Node node, int arg) {
// 錯誤位、當finally執行時該變數為true、則代表有例外發生、取消當前的操作
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 回傳Node節點的上一個節點
final Node p = node.predecessor();
// 檢測是否是頭節點、是的話在進行cas檢測鎖是否解鎖在搶占
if (p == head && tryAcquire(arg)) {
// 搶占到鎖了、設定當前執行緒的Node節點為等待佇列的頭節點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 檢測當前執行緒是否呼叫了interrupt中斷執行緒方法、并且檢測狀態位進行阻塞、呼叫LockSupport.park(this)方式阻塞自己
parkAndCheckInterrupt())
// 如果回圈走到這里在return回傳的話、代表執行緒將中斷
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/505587.html
標籤:其他
