主頁 > 後端開發 > 這才是圖文并茂:我寫了1萬多字,就是為了讓你了解AQS是怎么運行的

這才是圖文并茂:我寫了1萬多字,就是為了讓你了解AQS是怎么運行的

2020-11-18 12:41:15 後端開發

前言

如果你想深入研究Java并發的話,那么AQS一定是繞不開的一塊知識點,Java并發包很多的同步工具類底層都是基于AQS來實作的,比如我們作業中經常用的Lock工具ReentrantLock、柵欄CountDownLatch、信號量Semaphore等,而且關于AQS的知識點也是面試中經常考察的內容,所以,無論是為了更好的使用還是為了應付面試,深入學習AQS都很有必要,

CAS

學習AQS之前,我們有必要了解一個知識點,就是AQS底層中大量使用的CAS,關于CAS,大家應該都不陌生,如果還有哪位同學不清楚的話,可以看看我之前的文章《面試必備知識點:悲觀鎖和樂觀鎖的那些事兒》 ,這里不多復述,哈哈,給自己舊文章加了閱讀量

此時,好幾塊搬磚朝我飛了過來,,,,,

好吧,開個玩笑,還是大概講解一下吧,了解的同學可以跳過這一段,

CAS是樂觀鎖的一種思想,它假設執行緒對資源的訪問是沒有沖突的,同時所有的執行緒執行都不需要等待,可以持續執行, 如果有沖突的話,就用比較+交換的方式來檢測沖突,有沖突就不斷重試,

CAS的全稱是Compare-and-Swap,也就是比較并交換,它包含了三個引數:V,A,B,V表示要讀寫的記憶體位置,A表示舊的預期值,B表示新值,當執行CAS時,只有當V的值等于預期值A時,才會把V的值改為B,這樣的方式可以讓多個執行緒同時去修改,但也會因為執行緒操作失敗而不斷重試,對CPU有一定程式上的開銷,

AQS簡介

本文主角正式登場,

AQS,全名AbstractQueuedSynchronizer,是一個抽象類的佇列式同步器,它的內部通過維護一個狀態volatile int state(共享資源),一個FIFO執行緒等待佇列來實作同步功能,

state用關鍵字volatile修飾,代表著該共享資源的狀態一更改就能被所有執行緒可見,而AQS的加鎖方式本質上就是多個執行緒在競爭state,當state為0時代表執行緒可以競爭鎖,不為0時代表當前物件鎖已經被占有,其他執行緒來加鎖時則會失敗,加鎖失敗的執行緒會被放入一個FIFO的等待佇列中,這些執行緒會被UNSAFE.park()操作掛起,等待其他獲取鎖的執行緒釋放鎖才能夠被喚醒,

而這個等待佇列其實就相當于一個CLH佇列,用一張原理圖來表示大致如下:

基礎定義

AQS支持兩種資源分享的方式:Exclusive(獨占,只有一個執行緒能執行,如ReentrantLock)和Share(共享,多個執行緒可同時執行,如Semaphore/CountDownLatch),

自定義的同步器繼承AQS后,只需要實作共享資源state的獲取和釋放方式即可,其他如執行緒佇列的維護(如獲取資源失敗入隊/喚醒出隊等)等操作,AQS在頂層已經實作了,

AQS代碼內部提供了一系列操作鎖和執行緒佇列的方法,主要操作鎖的方法包含以下幾個:

  • compareAndSetState():利用CAS的操作來設定state的值
  • tryAcquire(int):獨占方式獲取鎖,成功則回傳true,失敗則回傳false,
  • tryRelease(int):獨占方式釋放鎖,成功則回傳true,失敗則回傳false,
  • tryAcquireShared(int):共享方式釋放鎖,負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源,
  • tryReleaseShared(int):共享方式釋放鎖,如果釋放后允許喚醒后續等待結點回傳true,否則回傳false,

像ReentrantLock就是實作了自定義的tryAcquire-tryRelease,從而操作state的值來實作同步效果,

除此之外,AQS內部還定義了一個靜態類Node,表示CLH佇列的每一個結點,該結點的作用是對每一個等待獲取資源做了封裝,包含了需要同步的執行緒本身、執行緒等待狀態.....

我們可以看下該類的一些重點變數:

static final class Node {
        /** 表示共享模式下等待的Node */
        static final Node SHARED = new Node();
        /** 表示獨占模式下等待的mode */
        static final Node EXCLUSIVE = null;

        /** 下面幾個為waitStatus的具體值 */
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
    
        volatile int waitStatus;
        
         /** 表示前面的結點 */
        volatile Node prev;
         /** 表示后面的結點 */
        volatile Node next;
         /**當前結點裝載的執行緒,初始化時被創建,使用后會置空*/
        volatile Thread thread;
         /**鏈接到下一個節點的等待條件,用到Condition的時候會使用到*/
        Node nextWaiter;
    
    }

代碼里面定義了一個表示當前Node結點等待狀態的欄位waitStatus,該欄位的取值包含了CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、0,這五個值代表了不同的特定場景:

  • CANCELLED:表示當前結點已取消調度,當timeout或被中斷(回應中斷的情況下),會觸發變更為此狀態,進入該狀態后的結點將不會再變化,
  • SIGNAL:表示后繼結點在等待當前結點喚醒,后繼結點入隊時,會將前繼結點的狀態更新為SIGNAL(記住這個-1的值,因為后面我們講的時候經常會提到)
  • CONDITION:表示結點等待在Condition上,當其他執行緒呼叫了Condition的signal()方法后,CONDITION狀態的結點將從等待佇列轉移到同步佇列中,等待獲取同步鎖,(注:Condition是AQS的一個組件,后面會細說)
  • PROPAGATE:共享模式下,前繼結點不僅會喚醒其后繼結點,同時也可能會喚醒后繼的后繼結點,
  • 0:新結點入隊時的默認狀態,

也就是說,當waitStatus為負值表示結點處于有效等待狀態,為正值的時候表示結點已被取消,

在AQS內部中還維護了兩個Node物件headtail,一開始默認都為null

private transient volatile Node head;
private transient volatile Node tail;   

講完了AQS的一些基礎定義,我們就可以開始學習同步的具體運行機制了,為了更好的演示,我們用ReentrantLock作為使用入口,一步步跟進原始碼探究AQS底層是如何運作的,這里說明一下,因為ReentrantLock底層呼叫的AQS是獨占模式,所以下文講解的AQS原始碼也是針對獨占模式的操作

好了,熱身正式結束,來吧,

獨占模式

加鎖程序

我們都知道,ReentrantLock的加鎖和解鎖方法分別為lock()和unLock(),我們先來看獲取鎖的方法,

final void lock() {
	if (compareAndSetState(0, 1))
		setExclusiveOwnerThread(Thread.currentThread());
	else
		acquire(1);
}

邏輯很簡單,執行緒進來后直接利用CAS嘗試搶占鎖,如果搶占成功state值回被改為1,且設定物件獨占鎖執行緒為當前執行緒,否則就呼叫acquire(1)再次嘗試獲取鎖,

我們假定有兩個執行緒A和B同時競爭鎖,A進來先搶占到鎖,此時的AQS模型圖就類似這樣:

繼續走下面的方法,

public final void acquire(int arg) {
	if (!tryAcquire(arg) &&
		acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}

acquire包含了幾個函式的呼叫,

tryAcquire:嘗試直接獲取鎖,如果成功就直接回傳;

addWaiter:將該執行緒加入等待佇列FIFO的尾部,并標記為獨占模式;

acquireQueued:執行緒阻塞在等待佇列中獲取鎖,一直獲取到資源后才回傳,如果在整個等待程序中被中斷過,則回傳true,否則回傳false,

selfInterrupt:自我中斷,就是既拿不到鎖,又在等待時被中斷了,執行緒就會進行自我中斷selfInterrupt(),將中斷補上,

我們一個個來看原始碼,并結合上面的兩個執行緒來做場景分析,

tryAcquire

不用多說,就是為了再次嘗試獲取鎖

protected final boolean tryAcquire(int acquires) {
	return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
	final Thread current = Thread.currentThread();
	int c = getState();
	if (c == 0) {
		if (compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true;
		}
	}
	else if (current == getExclusiveOwnerThread()) {
		int nextc = c + acquires;
		if (nextc < 0) // overflow
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	return false;
}

當執行緒B進來后,nonfairTryAcquire方法首先會獲取state的值,如果為0,則正常獲取該鎖,不為0的話判斷是否是當前執行緒占用了,是的話就累加state的值,這里的累加也是為了配合釋放鎖時候的次數,從而實作可重入鎖的效果,

當然,因為之前鎖已經被執行緒A占領了,所以這時候tryAcquire會回傳false,繼續下面的流程,

addWaiter

private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode);
	// Try the fast path of enq; backup to full enq on failure
	Node pred = tail;
	if (pred != null) {
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	enq(node);
	return node;
}

這段代碼首先會創建一個和當前執行緒系結的Node節點,Node為雙向鏈表,此時等待佇列中的tail指標為空,直接呼叫enq(node)方法將當前執行緒加入等待佇列尾部,然后回傳當前結點的前驅結點,

private Node enq(final Node node) {
	// CAS"自旋",直到成功加入隊尾
    for (;;) {
        Node t = tail;
        if (t == null) {
        	// 佇列為空,初始化一個Node結點作為Head結點,并將tail結點也指向它
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
        	// 把當前結點插入佇列尾部
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

第一遍回圈時,tail指標為空,初始化一個Node結點,并把head和tail結點都指向它,然后第二次回圈進來之后,tail結點不為空了,就將當前的結點加入到tail結點后面,也就是這樣:

todo 如果此時有另一個執行緒C進來的話,發現鎖已經被A拿走了,然后佇列里已經有了執行緒B,那么執行緒C就只能乖乖排到執行緒B的后面去,

acquireQueued

接著解讀方法,通過tryAcquire()和addWaiter(),我們的執行緒還是沒有拿到資源,并且還被排到了佇列的尾部,如果讓你來設計的話,這個時候你會怎么處理執行緒呢?其實答案也很簡單,能做的事無非兩個:

1、回圈讓執行緒再搶資源,但仔細一推敲就知道不合理,因為如果有多個執行緒都參與的話,你搶我也搶只會降低系統性能

2、進入等待狀態休息,直到其他執行緒徹底釋放資源后喚醒自己,自己再拿到資源

毫無疑問,選擇2更加靠譜,acquireQueued方法做的也是這樣的處理:

final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
		// 標記是否會被中斷
		boolean interrupted = false;
		// CAS自旋
		for (;;) {
			// 獲取當前結點的前結點
			final Node p = node.predecessor();
			if (p == head && tryAcquire(arg)) {
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			if (shouldParkAfterFailedAcquire(p, node) &&
				parkAndCheckInterrupt())
				interrupted = true;
		}
	} finally {
		if (failed)
			// 獲取鎖失敗,則將此執行緒對應的node的waitStatus改為CANCEL
			cancelAcquire(node);
	}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	int ws = pred.waitStatus;
	if (ws == Node.SIGNAL)
		// 前驅結點等待狀態為"SIGNAL",那么自己就可以安心等待被喚醒了
		return true;
	if (ws > 0) {
		/*
		 * 前驅結點被取消了,通過回圈一直往前找,直到找到等待狀態有效的結點(等待狀態值小于等于0) ,
		 * 然后排在他們的后邊,至于那些被當前Node強制"靠后"的結點,因為已經被取消了,也沒有參考鏈,
		 * 就等著被GC了
		 */
		do {
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		pred.next = node;
	} else {
		// 如果前驅正常,那就把前驅的狀態設定成SIGNAL
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	}
	return false;
}
private final boolean parkAndCheckInterrupt() {
	LockSupport.park(this);
	return Thread.interrupted();
}

acquireQueued方法的流程是這樣的:

1、CAS自旋,先判斷當前傳入的Node的前結點是否為head結點,是的話就嘗試獲取鎖,獲取鎖成功的話就把當前結點置為head,之前的head置為null(方便GC),然后回傳

2、如果前驅結點不是head或者加鎖失敗的話,就呼叫shouldParkAfterFailedAcquire,將前驅節點的waitStatus變為了SIGNAL=-1,最后執行parkAndChecknIterrupt方法,呼叫LockSupport.park()掛起當前執行緒,parkAndCheckInterrupt在掛起執行緒后會判斷執行緒是否被中斷,如果被中斷的話,就會重新跑acquireQueued方法的CAS自旋操作,直到獲取資源,

ps:LockSupport.park方法會讓當前執行緒進入waitting狀態,在這種狀態下,執行緒被喚醒的情況有兩種,一是被unpark(),二是被interrupt(),所以,如果是第二種情況的話,需要回傳被中斷的標志,然后在acquire頂層方法的視窗那里自我中斷補上

此時,因為執行緒A還未釋放鎖,所以執行緒B狀態都是被掛起的,

到這里,加鎖的流程就分析完了,其實整體來說也并不復雜,而且當你理解了獨占模式加鎖的程序,后面釋放鎖和共享模式的運行機制也沒什么難懂的了,所以整個加鎖的程序還是有必要多消化下的,也是AQS的重中之重,

為了方便你們更加清晰理解,我加多一張流程圖吧(這個作者也太暖了吧,哈哈)

釋放鎖

說完了加鎖,我們來看看釋放鎖是怎么做的,AQS中釋放鎖的方法是release(),當呼叫該方法時會釋放指定量的資源 (也就是鎖) ,如果徹底釋放了(即state=0),它會喚醒等待佇列里的其他執行緒來獲取資源,

還是一步步看原始碼吧,

public final boolean release(int arg) {
	if (tryRelease(arg)) {
		Node h = head;
		if (h != null && h.waitStatus != 0)
			unparkSuccessor(h);
		return true;
	}
	return false;
}

tryRelease

代碼上可以看出,核心的邏輯都在tryRelease方法中,該方法的作用是釋放資源,AQS里該方法沒有具體的實作,需要由自定義的同步器去實作,我們看下ReentrantLock代碼中對應方法的原始碼:

protected final boolean tryRelease(int releases) {
	int c = getState() - releases;
	if (Thread.currentThread() != getExclusiveOwnerThread())
		throw new IllegalMonitorStateException();
	boolean free = false;
	if (c == 0) {
		free = true;
		setExclusiveOwnerThread(null);
	}
	setState(c);
	return free;
}

tryRelease方法會減去state對應的值,如果state為0,也就是已經徹底釋放資源,就回傳true,并且把獨占的執行緒置為null,否則回傳false,

此時AQS中的資料就會變成這樣:

完全釋放資源后,當前執行緒要做的就是喚醒CLH佇列中第一個在等待資源的執行緒,也就是head結點后面的執行緒,此時呼叫的方法是unparkSuccessor()

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
    	//將head結點的狀態置為0
        compareAndSetWaitStatus(node, ws, 0);
	//找到下一個需要喚醒的結點s
    Node s = node.next;
    //如果為慷訓已取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從后向前,直到找到等待狀態小于0的結點,前面說了,結點waitStatus小于0時才有效
        for (Node t = tail; t != null && t != node; t = t.prev) 
            if (t.waitStatus <= 0)
                s = t;
    }
    // 找到有效的結點,直接喚醒
    if (s != null)
        LockSupport.unpark(s.thread);//喚醒
}

方法的邏輯很簡單,就是先將head的結點狀態置為0,避免下面找結點的時候再找到head,然后找到佇列中最前面的有效結點,然后喚醒,我們假設這個時候執行緒A已經釋放鎖,那么此時佇列中排最前邊競爭鎖的執行緒B就會被喚醒,

然后被喚醒的執行緒B就會嘗試用CAS獲取鎖,回到acquireQueued方法的邏輯,

for (;;) {
	// 獲取當前結點的前結點
	final Node p = node.predecessor();
	if (p == head && tryAcquire(arg)) {
		setHead(node);
		p.next = null; // help GC
		failed = false;
		return interrupted;
	}
	if (shouldParkAfterFailedAcquire(p, node) &&
		parkAndCheckInterrupt())
		interrupted = true;
}

當執行緒B獲取鎖之后,會把當前結點賦值給head,然后原先的前驅結點 (也就是原來的head結點) 去掉參考鏈,方便回收,這樣一來,執行緒B獲取鎖的整個程序就完成了,此時AQS的資料就會變成這樣:

到這里,我們已經分析完了AQS獨占模式下加鎖和釋放鎖的程序,也就是tryAccquire->tryRelease這一鏈條的邏輯,除此之外,AQS中還支持共享模式的同步,這種模式下關于鎖的操作核心其實就是tryAcquireShared->tryReleaseShared這兩個方法,我們可以簡單看下

共享模式

獲取鎖

AQS中,共享模式獲取鎖的頂層入口方法是acquireShared,該方法會獲取指定數量的資源,成功的話就直接回傳,失敗的話就進入等待佇列,直到獲取資源,

public final void acquireShared(int arg) {
	if (tryAcquireShared(arg) < 0)
		doAcquireShared(arg);
}

該方法里包含了兩個方法的呼叫,

tryAcquireShared:嘗試獲取一定資源的鎖,回傳的值代表獲取鎖的狀態,

doAcquireShared:進入等待佇列,并回圈嘗試獲取鎖,直到成功,

tryAcquireShared

tryAcquireShared在AQS里沒有實作,同樣由自定義的同步器去完成具體的邏輯,像一些較為常見的并發工具Semaphore、CountDownLatch里就有對該方法的自定義實作,雖然實作的邏輯不同,但方法的作用是一樣的,就是獲取一定資源的資源,然后根據回傳值判斷是否還有剩余資源,從而決定下一步的操作,

回傳值有三種定義:

  • 負值代表獲取失敗;
  • 0代表獲取成功,但沒有剩余的資源,也就是state已經為0;
  • 正值代表獲取成功,而且state還有剩余,其他執行緒可以繼續領取

當回傳值小于0時,證明此次獲取一定數量的鎖失敗了,然后就會走doAcquireShared方法

doAcquireShared

此方法的作用是將當前執行緒加入等待佇列尾部休息,直到其他執行緒釋放資源喚醒自己,自己成功拿到相應量的資源后才回傳,這是它的原始碼:

private void doAcquireShared(int arg) {
	// 加入佇列尾部
	final Node node = addWaiter(Node.SHARED);
	boolean failed = true;
	try {
		boolean interrupted = false;
		// CAS自旋
		for (;;) {
			final Node p = node.predecessor();
			// 判斷前驅結點是否是head
			if (p == head) {
				// 嘗試獲取一定數量的鎖
				int r = tryAcquireShared(arg);
				if (r >= 0) {
					// 獲取鎖成功,而且還有剩余資源,就設定當前結點為head,并繼續喚醒下一個執行緒
					setHeadAndPropagate(node, r);
					// 讓前驅結點去掉參考鏈,方便被GC
					p.next = null; // help GC
					if (interrupted)
						selfInterrupt();
					failed = false;
					return;
				}
			}
			// 跟獨占模式一樣,改前驅結點waitStatus為-1,并且當前執行緒掛起,等待被喚醒
			if (shouldParkAfterFailedAcquire(p, node) &&
				parkAndCheckInterrupt())
				interrupted = true;
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    // head指向自己
    setHead(node);
     // 如果還有剩余量,繼續喚醒下一個鄰居執行緒
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

看到這里,你會不會一點熟悉的感覺,這個方法的邏輯怎么跟上面那個acquireQueued() 那么類似啊?對的,其實兩個流程并沒有太大的差別,只是doAcquireShared()比起獨占模式下的獲取鎖上多了一步喚醒后繼執行緒的操作,當獲取完一定的資源后,發現還有剩余的資源,就繼續喚醒下一個鄰居執行緒,這才符合"共享"的思想嘛,

這里我們可以提出一個疑問,共享模式下,當前執行緒釋放了一定數量的資源,但這部分資源滿足不了下一個等待結點的需要的話,那么會怎么樣?

按照正常的思維,共享模式是可以多個執行緒同時執行的才對,所以,多個執行緒的情況下,如果老大釋放完資源,但這部分資源滿足不了老二,但能滿足老三,那么老三就可以拿到資源,可事實是,從原始碼設計中可以看出,如果真的發生了這種情況,老三是拿不到資源的,因為等待佇列是按順序排列的,老二的資源需求量大,會把后面量小的老三以及老四、老五等都給卡住,從這一個角度來看,雖然AQS嚴格保證了順序,但也降低了并發能力

接著往下說吧,喚醒下一個鄰居執行緒的邏輯在doReleaseShared()中,我們放到下面的釋放鎖來決議,

釋放鎖

共享模式釋放鎖的頂層方法是releaseShared,它會釋放指定量的資源,如果成功釋放且允許喚醒等待執行緒,它會喚醒等待佇列里的其他執行緒來獲取資源,下面是releaseShared()的原始碼:

public final boolean releaseShared(int arg) {
	if (tryReleaseShared(arg)) {
		doReleaseShared();
		return true;
	}
	return false;
}

該方法同樣包含兩部分的邏輯:

tryReleaseShared:釋放資源,

doAcquireShared:喚醒后繼結點,

tryAcquireShared方法一樣,tryReleaseShared在AQS中沒有具體的實作,由子同步器自己去定義,但功能都一樣,就是釋放一定數量的資源,

釋放完資源后,執行緒不會馬上就收工,而是喚醒等待佇列里最前排的等待結點,

doAcquireShared

喚醒后繼結點的作業在doReleaseShared()方法中完成,我們可以看下它的原始碼:

private void doReleaseShared() {
	for (;;) {
		// 獲取等待佇列中的head結點
		Node h = head;
		if (h != null && h != tail) {
			int ws = h.waitStatus;
			// head結點waitStatus = -1,喚醒下一個結點對應的執行緒
			if (ws == Node.SIGNAL) {
				if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
					continue;            // loop to recheck cases
				// 喚醒后繼結點
				unparkSuccessor(h);
			}
			else if (ws == 0 &&
					 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
				continue;                // loop on failed CAS
		}
		if (h == head)                   // loop if head changed
			break;
	}
}

代碼沒什么特別的,就是如果等待佇列head結點的waitStatus為-1的話,就直接喚醒后繼結點,喚醒的方法unparkSuccessor()在上面已經講過了,這里也沒必要再復述,

總的來看,AQS共享模式的運作流程和獨占模式很相似,只要掌握了獨占模式的流程運轉,共享模式什么的不就那樣嗎,沒難度,這也是我為什么共享模式講解中不畫流程圖的原因,沒必要嘛,

Condition

介紹完了AQS的核心功能,我們再擴展一個知識點,在AQS中,除了提供獨占/共享模式的加鎖/解鎖功能,它還對外提供了關于Condition的一些操作方法,

Condition是個介面,在jdk1.5版本后設計的,基本的方法就是await()signal()方法,功能大概就對應Objectwait()notify(),Condition必須要配合鎖一起使用,因為對共享狀態變數的訪問發生在多執行緒環境下,一個Condition的實體必須與一個Lock系結,因此Condition一般都是作為Lock的內部實作 ,AQS中就定義了一個類ConditionObject來實作了這個介面,

那么它應該怎么用呢?我們可以簡單寫個demo來看下效果

public class ConditionDemo {

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Thread tA = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("執行緒A加鎖成功");
                System.out.println("執行緒A執行await被掛起");
                condition.await();
                System.out.println("執行緒A被喚醒成功");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("執行緒A釋放鎖成功");
            }
        });

        Thread tB = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("執行緒B加鎖成功");
                condition.signal();
                System.out.println("執行緒B喚醒執行緒A");
            } finally {
                lock.unlock();
                System.out.println("執行緒B釋放鎖成功");
            }
        });
        tA.start();
        tB.start();
    }
}

執行main函式后結果輸出為:

執行緒A加鎖成功
執行緒A執行await被掛起
執行緒B加鎖成功
執行緒B喚醒執行緒A
執行緒B釋放鎖成功
執行緒A被喚醒成功
執行緒A釋放鎖成功

代碼執行的結果很容易理解,執行緒A先獲取鎖,然后呼叫await()方法掛起當前執行緒并釋放鎖,執行緒B這時候拿到鎖,然后呼叫signal喚醒執行緒A,

毫無疑問,這兩個方法讓執行緒的狀態發生了變化,我們仔細來研究一下,

翻看AQS的原始碼,我們會發現Condition中定義了兩個屬性firstWaiterlastWaiter,前面說了,AQS中包含了一個FIFO的CLH等待佇列,每個Conditon物件就包含這樣一個等待佇列,而這兩個屬性分別表示的是等待佇列中的首尾結點,

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

注意:Condition當中的等待佇列和AQS主體的同步等待佇列是分開的,兩個佇列雖然結構體相同,但是作用域是分開的

await

先看await()的原始碼:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 將當前執行緒加入到等待佇列中
    Node node = addConditionWaiter();
    // 完全釋放占有的資源,并回傳資源數
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 回圈判斷當前結點是不是在Condition的佇列中,是的話掛起
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

當一個執行緒呼叫Condition.await()方法,將會以當前執行緒構造結點,這個結點的waitStatus賦值為Node.CONDITION,也就是-2,并將結點從尾部加入等待佇列,然后尾部結點就會指向這個新增的結點,

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

我們依然用上面的demo來演示,此時,執行緒A獲取鎖并呼叫Condition.await()方法后,AQS內部的資料結構會變成這樣:

在Condition佇列中插入對應的結點后,執行緒A會釋放所持有的資源,走到while回圈那層邏輯,

while (!isOnSyncQueue(node)) {
	LockSupport.park(this);
	if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
		break;
}

isOnSyncQueue方法的會判斷當前的執行緒節點是不是在同步佇列中,這個時候此結點還在Condition佇列中,所以該方法回傳false,這樣的話回圈會一直持續下去,執行緒被掛起,等待被喚醒,此時,執行緒A的流程暫時停止了,

當執行緒A呼叫await()方法掛起的時候,執行緒B獲取到了執行緒A釋放的資源,然后執行signal()方法:

signal

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

先判斷當前執行緒是否為獲取鎖的執行緒,如果不是則直接拋出例外, 接著呼叫doSignal()方法來喚醒執行緒,

private void doSignal(Node first) {
	// 回圈,從佇列一直往后找不為空的首結點
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
	// CAS回圈,將結點的waitStatus改為0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
	// 上面已經分析過,此方法會把當前結點加入到等待佇列中,并回傳前驅結點
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

doSignal的代碼中可以看出,這時候程式尋找的是Condition等待佇列中首結點firstWaiter的結點,此時該結點指向的是執行緒A的結點,所以之后的流程作用的都是執行緒A的結點,

這里分析下transferForSignal方法,先通過CAS自旋將結點waitStatus改為0,然后就把結點放入到同步佇列 (此佇列不是Condition的等待佇列) 中,然后再用CAS將同步佇列中該結點的前驅結點waitStatus改為Node.SIGNAL,也就是-1,此時AQS的資料結構大概如下 (額.....少畫了個箭頭,大家就當head結點是執行緒A結點的前驅結點就好):

回到await()方法,當執行緒A的結點被加入同步佇列中時,isOnSyncQueue()會回傳true,跳出回圈,

while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);

接著執行acquireQueued()方法,這里就不用多說了吧,嘗試重新獲取鎖,如果獲取鎖失敗繼續會被掛起,直到另外執行緒釋放鎖才被喚醒,

所以,當執行緒B釋放完鎖后,執行緒A被喚醒,繼續嘗試獲取鎖,至此流程結束,

對于這整個通信程序,我們可以畫一張流程圖展示下:

總結

說完了Condition的使用和底層運行機制,我們再來總結下它跟普通 wait/notify 的比較,一般這也是問的比較多的,Condition大概有以下兩點優勢:

  • Condition 需要結合 Lock 進行控制,使用的時候要注意一定要對應的unlock(),可以對多個不同條件進行控制,只要new 多個 Condition物件就可以為多個執行緒控制通信,wait/notify 只能和 synchronized 關鍵字一起使用,并且只能喚醒一個或者全部的等待佇列;
  • Condition 有類似于 await 的機制,因此不會產生加鎖方式而產生的死鎖出現,同時底層實作的是 park/unpark 的機制,因此也不會產生先喚醒再掛起的死鎖,一句話就是不會產生死鎖,但是 wait/notify 會產生先喚醒再掛起的死鎖,

最后

對AQS的原始碼分析到這里就全部結束了,雖然還有很多知識點沒講解,比如公平鎖/非公平鎖下AQS是怎么作用的,篇幅所限,部分知識點沒有擴展還請見諒,盡管如此,如果您能看完文章的話,相信對AQS也算是有足夠的了解了,

回顧本篇文章,我們不難發現,無論是獨占還是共享模式,或者結合是Condition工具使用,AQS本質上的同步功能都是通過對鎖和佇列中結點的操作來實作的,從設計上講,AQS的組成結構并不算復雜,底層的運轉機制也不會很繞,所以,大家如果看原始碼的時候覺得有些困難的話也不用灰心,多看幾遍,順便畫個圖之類的,理清下流程還是沒什么問題的,

當然,自己看得懂是一回事,寫出來讓別人看懂又是另一回事了,就像這篇文章,我花了好長的時間來準備,又是畫圖又是理流程的,期間還參考了不少網上大神的博文,肝了幾天才算是成文了,雖然我知道本文不算什么高質文,但我也算是費盡心力了,寫技術文真是挺累的,大家看的覺得不錯的話還請幫忙轉發下或點個贊吧!這也是對我最好的鼓勵了


作者:鄙人薛某,一個不拘于技術的互聯網人,技術三流,吹水一流,想看更多精彩文章可以關注我的公眾號哦~~~

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/223883.html

標籤:Java

上一篇:Flink Native Kubernetes實戰

下一篇:Java中不可或缺的59個小技巧,賊好用!

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more