同步框架AbstractQueuedSynchronizer
Java并發編程核心在于java.concurrent.util包 而juc當中的大多數同步器實作都是圍繞著共同的基礎行為,比如等待佇列、條件佇列、獨占獲取、共享獲取等,而這個行為的抽象就是基于AbstractQueuedSynchronizer簡稱AQS,AQS定義了一套多執行緒訪問共享資源的同步器框架,是一個依賴狀態(state)的同步器,
AQS具備特性 阻塞等待佇列、 共享/獨占、 公平/非公平、 可重入(同一把鎖同一個執行緒可重復拿)、 允許中斷,
一般通過定義內部類Sync繼承AQS 將同步器所有呼叫都映射到Sync對應的方法

state 記錄加鎖的次數,體現鎖的可重入性;訪問方式喲有三種 getState()、setState()、compareAndSetState()
exclusiveOwnerThread 體現獨占執行緒的特性,記錄的是當前獨占的執行緒,
AQS定義兩種資源共享方式 Exclusive-獨占,只有一個執行緒能執行,如ReentrantLock Share-共享,多個執行緒可以同時執行,如Semaphore/CountDownLatch
Node,阻塞佇列的體現:等待佇列,條件佇列,各種屬性定義如下
static final class Node {
/**
* 標記節點未共享模式
* */
static final Node SHARED = new Node();
/**
* 標記節點為獨占模式
*/
static final Node EXCLUSIVE = null;
/**
* 在同步佇列中等待的執行緒等待超時或者被中斷,需要從同步佇列中取消等待
* */
static final int CANCELLED = 1;
/**
* 后繼節點的執行緒處于等待狀態,而當前的節點如果釋放了同步狀態或者被取消,
* 將會通知后繼節點,使后繼節點的執行緒得以運行,
*/
static final int SIGNAL = -1;
/**
* 節點在等待佇列中,節點的執行緒等待在Condition上,當其他執行緒對Condition呼叫了signal()方法后,
* 該節點會從等待佇列中轉移到同步佇列中,加入到同步狀態的獲取中
*/
static final int CONDITION = -2;
/**
* 表示下一次共享式同步狀態獲取將會被無條件地傳播下去
*/
static final int PROPAGATE = -3;
/**
* 標記當前節點的信號量狀態 (1,0,-1,-2,-3)5種狀態
* 使用CAS更改狀態,volatile保證執行緒可見性,高并發場景下,
* 即被一個執行緒修改后,狀態會立馬讓其他執行緒可見,
*/
volatile int waitStatus;
/**
* 前驅節點,當前節點加入到同步佇列中被設定
*/
volatile Node prev;
/**
* 后繼節點
*/
volatile Node next;
/**
* 節點同步狀態的執行緒
*/
volatile Thread thread;
/**
* 等待佇列中的后繼節點,如果當前節點是共享的,那么這個欄位是一個SHARED常量,
* 也就是說節點型別(獨占和共享)和等待佇列中的后繼節點共用同一個欄位,
*/
Node nextWaiter;
如果節點存在條件佇列中,只能是獨占模式不能是共享方式,
等待佇列其實是一個雙向鏈表結構,每個節點記錄的有前驅節點和后驅節點,
兩種佇列都是基于Node節點構建的,每個節點的Node都會有信號量,也就是屬性waitSate

條件佇列其實是單向鏈表;

ReentrantLock是一把可重入的,獨占的顯示鎖,構造的時候可以傳入一個布林值來決定是不是公平/非公平鎖
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
true為公平鎖,false為非公平鎖
如何理解公平鎖和非公平鎖
假如執行緒A結束后喚醒了執行緒B,此時新來一個執行緒C,如果執行緒C能和執行緒B搶鎖,那么這個是非公平鎖,如果新來的執行緒C只能怪怪去排隊,那么就是公平鎖,
加鎖程序,每加一次鎖state都會加1,釋放一次鎖,state都會減1,exclusiveOwnerThread記錄的當前持有鎖的執行緒,

分析一下取鎖的原始碼
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {--------表示沒有執行緒占用鎖,可以取拿鎖 if (!hasQueuedPredecessors() && -------------判斷佇列里面沒有執行緒在等待才能去搶鎖,這就是公平的體現 compareAndSetState(0, acquires)) { -------cas演算法原子操作改變state值,state值又被volitale修飾,保證并發下修改state的安全性, setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }

執行緒被阻塞之后會被放在等待佇列里面,即我們那個雙鏈表結夠,注意我們這個雙鏈表的head的屬性thread是null,也就是說head不放任何執行緒資訊,僅僅是做head標記位使用,

ReentrantLock加鎖的程序
public final void acquire(int arg) { // 嘗試獲取鎖,獲取鎖失敗,addWaiter方法加入到CLH等待佇列,Node.EXCLUSIVE表示以獨占的方式入隊; if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
private Node addWaiter(Node mode) {
// 1. 將當前執行緒構建成Node型別
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 2. 1當前尾節點是否為null?
if (pred != null) {
// 2.2 將當前節點尾插入的方式
node.prev = pred;
// 2.3 CAS將節點插入同步佇列的尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
注意這里的for(;;)回圈;
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//佇列為空需要初始化,創建空的頭節點
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//set尾部節點
if (compareAndSetTail(t, node)) {//當前節點置為尾部
t.next = node; //前驅節點的next指標指向當前節點
return t;
}
}
}
}
注意AQS里面的執行緒喚醒不會喚醒所有的等待執行緒,而回喚醒頭節點的next執行緒(head頭節點不放執行緒),做到順序喚醒,而object的notify方法和notifyall方法會喚醒所以有的執行緒,無序,
執行緒的阻塞和喚醒用的是魔術類Unsafe里面的park()和unpark()方法,底層是呼叫Pthead_mutex_lock指令庫方法,
代碼;
public final void acquire(int arg) { // 嘗試獲取鎖,獲取鎖失敗,addWaiter方法加入到CLH等待佇列,Node.EXCLUSIVE表示以獨占的方式入隊; // acquireQueued對入隊的執行緒進行阻塞 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
/**
* 已經在佇列當中的Thread節點,準備阻塞等待獲取鎖
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//死回圈
final Node p = node.predecessor();//找到當前結點的前驅結點
if (p == head && tryAcquire(arg)) {//如果前驅結點是頭結點,才tryAcquire,其他結點是沒有機會tryAcquire的,
setHead(node);//獲取同步狀態成功,將當前結點設定為頭結點,
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* 如果前驅節點不是Head,通過shouldParkAfterFailedAcquire判斷是否應該阻塞
* 前驅節點信號量為-1,當前執行緒可以安全被parkAndCheckInterrupt用來阻塞執行緒
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public class LockSupport {
private LockSupport() {} // Cannot be instantiated.
private static void setBlocker(Thread t, Object arg) { //unsafe魔術類阻塞執行緒,
// Even though volatile, hotspot doesn't need a write barrier here.
UNSAFE.putObject(t, parkBlockerOffset, arg);
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/140439.html
標籤:Java
上一篇:Git使用教程
