
一、JUC的由來
synchronized 關鍵字是JDK官方人員用C++代碼寫的,在JDK6以前是重量級鎖,Java大牛 Doug Lea對 synchronized 在并發編程條件下的性能表現不滿意就自己寫了個JUC,以此來提升并發性能,本文要講的就是JUC并發包下的AbstractQueuedSynchronizer,
在JUC中 CountDownLatch、ReentrantLock、ThreadPoolExecutor、ReentrantReadWriteLock 等底層用的都是AQS,AQS幾乎占據了JUC并發包里的半壁江山,如果想要獲取鎖可以被中斷、超時獲取鎖、嘗試獲取鎖那就用AQS吧,
**二、AQS前置知識點
2.1、模板方法
AbstractQueuedSynchronizer是個抽象類,所有用到方法的類都要繼承此類的若干方法,對應的設計模式就是模版模式,
模版模式定義:一個抽象類公開定義了執行它的方法的方式/模板,它的子類可以按需要重寫方法實作,但呼叫將以抽象類中定義的方式進行,這種型別的設計模式屬于行為型模式,
抽象類:
public abstract class SendCustom {
public abstract void to();
public abstract void from();
public void date() {
System.out.println(new Date());
}
public abstract void send();
// 注意此處 框架方法-模板方法
public void sendMessage() {
to();
from();
date();
send();
}
}
模板方法派生類:
public class SendSms extends SendCustom {
@Override
public void to() {
System.out.println("sowhat");
}
@Override
public void from() {
System.out.println("xiaomai");
}
@Override
public void send() {
System.out.println("Send message");
}
public static void main(String[] args) {
SendCustom sendC = new SendSms();
sendC.sendMessage();
}
}
2.2、LookSupport
LockSupport 是一個執行緒阻塞工具類,所有的方法都是靜態方法,可以讓執行緒在任意位置阻塞,當然阻塞之后肯定得有喚醒的方法,常用方法如下:
public static void park(Object blocker); // 暫停當前執行緒
public static void parkNanos(Object blocker, long nanos); // 暫停當前執行緒,不過有超時時間的限制
public static void parkUntil(Object blocker, long deadline); // 暫停當前執行緒,直到某個時間
public static void park(); // 無期限暫停當前執行緒
public static void parkNanos(long nanos); // 暫停當前執行緒,不過有超時時間的限制
public static void parkUntil(long deadline); // 暫停當前執行緒,直到某個時間
public static void unpark(Thread thread); // 恢復當前執行緒
public static Object getBlocker(Thread t);
叫park是因為park英文意思為停車,我們如果把Thread看成一輛車的話,park就是讓車停下,unpark就是讓車啟動然后跑起來,
與Object類的wait/notify機制相比,park/unpark有兩個優點:
1.以thread為操作物件更符合阻塞執行緒的直觀定義
2.操作更精準,可以準確地喚醒某一個執行緒(notify隨機喚醒一個執行緒,notifyAll 喚醒所有等待的執行緒),增加了靈活性,
park/unpark呼叫的是 Unsafe(提供CAS操作) 中的 native代碼,
park/unpark 功能在Linux系統下是用的Posix執行緒庫pthread中的mutex(互斥量),condition(條件變數)來實作的,mutex和condition保護了一個 _counter 的變數,當 park 時,這個變數被設定為0,當unpark時,這個變數被設定為1,
2.3、CAS
CAS 是 CPU指令級別實作了原子性的比較和交換(Conmpare And Swap)操作,注意CAS不是鎖只是CPU提供的一個原子性操作指令,
CAS在語言層面不進行任何處理,直接將原則操作實作在硬體級別實作,之所以可以實作硬體級別的操作核心是因為CAS操作類中有個核心類UnSafe類,
關于CAS引發的ABA問題、性能開銷問題、只能保證一個共享變數之間的原則性操作問題,以前 CAS 中寫過,在此不再重復講解,
注意:并不是說 CAS 一定比SYN好,如果高并發執行時間久 ,用SYN好, 因為SYN底層用了wait() 阻塞后是不消耗CPU資源的,如果鎖競爭不激烈說明自旋不嚴重,此時用CAS,
三、AQS重要方法
模版方法分為獨占式跟共享式,子類根據需要不同呼叫不同的模版方法(講解有點多,想看底層可直接下滑到第四章節),
3.1 模板方法
3.1.1 獨占式獲取
3.1.1.1 accquire
不可中斷獲取鎖accquire是獲取獨占鎖方法,acquire嘗試獲取資源,成功則直接回傳,不成功則進入等待佇列,這個程序不會被執行緒中斷,被外部中斷也不回應,獲取資源后才再進行自我中斷selfInterrupt(),
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1.acquire(arg) tryAcquire(arg) 顧名思義,它就是嘗試獲取鎖,需要我們自己實作具體細節,一般要求是:
如果該鎖沒有被另一個執行緒保持,則獲取該鎖并立即回傳,將鎖的保持計數設定為 1,
如果當前執行緒已經保持該鎖,則將保持計數加 1,并且該方法立即回傳,
如果該鎖被另一個執行緒保持,則出于執行緒調度的目的,禁用當前執行緒,并且在獲得鎖之前,該執行緒將一直處于休眠狀態,此時鎖保持計數被設定為 1,
2.addWaiter(Node.EXCLUSIVE)
主要功能是 一旦嘗試獲取鎖未成功,就要使用該方法將其加入同步佇列尾部,由于可能有多個執行緒并發加入隊尾產生競爭,因此采用compareAndSetTail鎖方法來保證同步
3.acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
一旦加入同步佇列,就需要使用該方法,自旋阻塞 喚醒來不斷的嘗試獲取鎖,直到被中斷或獲取到鎖,
3.1.1.2 acquireInterruptibly
可中斷獲取鎖acquireInterruptibly相比于acquire支持回應中斷,
1、如果當前執行緒未被中斷,則嘗試獲取鎖,
2、如果鎖空閑則獲鎖并立即回傳,state = 1,
3、如果當前執行緒已持此鎖,state + 1,并且該方法立即回傳,
4、如果鎖被另一個執行緒保持,出于執行緒調度目的,禁用當前執行緒,執行緒休眠ing,除非鎖由當前執行緒獲得或者當前執行緒被中斷了,中斷后會拋出InterruptedException,并且清除當前執行緒的已中斷狀態,
5、此方法是一個顯式中斷點,所以要優先考慮回應中斷,
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException(); // acquireInterruptibly 選擇
interrupted = true; // acquire 的選擇
3.1.1.3 tryAcquireNanos
該方法可以被中斷,增加了超時則失敗的功能,可以說該方法的實作與上述兩方法沒有任何區別,時間功能上就是用的標準超時功能,如果剩余時間小于0那么acquire失敗,如果該時間大于一次自旋鎖時間(spinForTimeoutThreshold = 1000),并且可以被阻塞,那么呼叫LockSupport.parkNanos方法阻塞執行緒,
doAcquireNanos內部:
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
該方法一般會有以下幾種情況產生:
1.在指定時間內,執行緒獲取到鎖,回傳true,
2.當前執行緒在超時時間內被中斷,拋中斷例外后,執行緒退出,
3.到截止時間后執行緒仍未獲取到鎖,此時執行緒獲得鎖失敗,不再等待直接回傳false,
3.1.2 共享式獲取
3.1.2.1 acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
該模版方法的作業:
1.呼叫tryAcquireShared(arg) 嘗試獲得資源,回傳值代表如下含義:
負數表示失敗,
0 表示成功,但沒有剩余可用資源,
正數表示成功,且有剩余資源,
doAcquireShared作用:
創建節點然后加入到佇列中去,這一塊和獨占模式下的 **addWaiter **代碼差不多,不同的是結點的模式是SHARED,在獨占模式 EXCLUSIVE,
3.1.2.2 acquireSharedInterruptibly
無非就是可中斷性的共享方法
public final void acquireSharedInterruptibly(long arg) throws InterruptedException {
if (Thread.interrupted()) // 如果執行緒被中斷,則拋出例外
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
// 如果tryAcquireShared()方法獲取失敗,則呼叫如下的方法
doAcquireSharedInterruptibly(arg);
}
3.1.2.3. tryAcquireSharedNanos
嘗試以共享模式獲取,如果被中斷則中止,如果超過給定超時期則失敗,實作此方法首先要檢查中斷狀態,然后至少呼叫一次 tryacquireshared(long),并在成功時回傳,否則,在成功、執行緒中斷或超過超時期之前,執行緒將加入佇列,可能反復處于阻塞或未阻塞狀態,并一直呼叫 tryacquireshared(long),
public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
3.1.3 獨占式釋放
獨占鎖的釋放呼叫unlock方法,而該方法實際呼叫了AQS的release方法,這段代碼邏輯比較簡單,如果同步狀態釋放成功(tryRelease回傳true)則會執行if塊中的代碼,當head指向的頭結點不為null,并且該節點的狀態值不為0的話才會執行unparkSuccessor()方法,
public final boolean release(long arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
3.1.4 共享式釋放
releaseShared首先去嘗試釋放資源tryReleaseShared(arg),如果釋放成功了,就代表有資源空閑出來,那么就用doReleaseShared()去喚醒后續結點,
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
比如CountDownLatch的countDown()具體實作:
public void countDown() {
sync.releaseShared(1);
}
3.2 子類需實作方法
子類要實作父類方法也分為獨占式跟共享式,
3.2.1 獨占式獲取
tryAcquire 顧名思義,就是嘗試獲取鎖,AQS在這里沒有對其進行功能的實作,只有一個拋出例外的陳述句,我們需要自己對其進行實作,可以對其重寫實作公平鎖、不公平鎖、可重入鎖、不可重入鎖
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
3.2.2 獨占式釋放
tryRelease 嘗試釋放 獨占鎖,需要子類實作,
protected boolean tryRelease(long arg) {
throw new UnsupportedOperationException();
}
3.2.3 共享式獲取
tryAcquireShared 嘗試進行共享鎖的獲得,需要子類實作,
protected long tryAcquireShared(long arg) {
throw new UnsupportedOperationException();
}
3.2.4 共享式釋放
tryReleaseShared嘗試進行共享鎖的釋放,需要子類實作,
protected boolean tryReleaseShared(long arg) {
throw new UnsupportedOperationException();
}
3.3 狀態標志位
state因為用 volatile 修飾 保證了我們操作的可見性,所以任何執行緒通過getState()獲得狀態都是可以得到最新值,但是setState()無法保證原子性,因此AQS給我們提供了compareAndSetState方法利用底層UnSafe的CAS功能來實作原子性,
private volatile long state;
protected final long getState() {
return state;
}
protected final void setState(long newState) {
state = newState;
}
protected final boolean compareAndSetState(long expect, long update) {
return unsafe.compareAndSwapLong(this, stateOffset, expect, update);
}
3.4 查詢是否獨占模式
isHeldExclusively 該函式的功能是查詢當前的作業模式是否是獨占模式,需要子類實作,
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
3.5 自定義實作鎖
這里需要重點說明一點,JUC中一般是用一個子類繼承自Lock,然后在子類中定義一個內部類來實作AQS的繼承跟使用,
public class SowhatLock implements Lock
{
private Sync sync = new Sync();
@Override
public void lock()
{
sync.acquire(1);
}
@Override
public boolean tryLock()
{
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
{
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock()
{
sync.release(1);
}
@Override
public Condition newCondition()
{
return sync.newCondition();
}
@Override
public void lockInterruptibly() throws InterruptedException
{
}
private class Sync extends AbstractQueuedSynchronizer
{
@Override
protected boolean tryAcquire(int arg)
{
assert arg == 1;
if (compareAndSetState(0, 1))
{
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg)
{
assert arg == 1;
if (!isHeldExclusively())
{
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively()
{
return getExclusiveOwnerThread() == Thread.currentThread();
}
Condition newCondition() {
return new ConditionObject();
}
}
}
自定義實作類:
public class SoWhatTest
{
public static int m = 0;
public static CountDownLatch latch = new CountDownLatch(50);
public static Lock lock = new SowhatLock();
public static void main(String[] args) throws Exception
{
Thread[] threads = new Thread[50];
for (int i = 0; i < threads.length ; i++)
{
threads[i] = new Thread(()->{
try{
lock.lock();
for (int j = 0; j <100 ; j++)
{
m++;
}
}finally
{
lock.unlock();
}
latch.countDown();
});
}
for(Thread t : threads) t.start();
latch.await();
System.out.println(m);
}
}
四、AQS底層
4.1 CLH
CLH(Craig、 Landin、 Hagersten locks三個人名字綜合而命名):
1.是一個自旋鎖,能確保無饑餓性,提供先來先服務的公平性,
2.CLH 鎖也是一種基于鏈表的可擴展、高性能、公平的自旋鎖,申請執行緒只在本地變數上自旋,它不斷輪詢前驅的狀態,如果發現前驅釋放了鎖就結束自旋,
4.2 Node
CLH 佇列由Node物件組成,其中Node是AQS中的內部類,
static final class Node {
// 標識共享鎖
static final Node SHARED = new Node();
// 標識獨占鎖
static final Node EXCLUSIVE = null;
// 前驅節點
volatile Node prev;
// 后繼節點
volatile Node next;
// 獲取鎖失敗的執行緒保存在Node節點中,
volatile Thread thread;
// 當我們呼叫了Condition后他也有一個等待佇列
Node nextWaiter;
//在Node節點中一般通過waitStatus獲得下面節點不同的狀態,狀態對應下方,
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
waitStatus 有如下5中狀態:
1.CANCELLED = 1
表示當前結點已取消調度,當超時或被中斷(回應中斷的情況下),會觸發變更為此狀態,進入該狀態后的結點將不會再變化,
2.SIGNAL = -1
表示后繼結點在等待當前結點喚醒,后繼結點入隊時,會將前繼結點的狀態更新為 SIGNAL,
3.CONDITION = -2
表示結點等待在 Condition 上,當其他執行緒呼叫了 Condition 的 signal() 方法后,CONDITION狀態的結點將從等待佇列轉移到同步佇列中,等待獲取同步鎖,
4.PROPAGATE = -3
共享模式下,前繼結點不僅會喚醒其后繼結點,同時也可能會喚醒后繼的后繼結點,
5.INITIAL = 0
新結點入隊時的默認狀態,
4.3 AQS實作
4.3.1 公平鎖和非公平鎖
銀行售票視窗營業中:
公平排隊:每個客戶來了自動在最后面排隊,輪到自己辦理業務的時候拿出身份證等證件取票,
非公平排隊:有個旅客火車馬上開車了,他拿著自己的各種證件著急這想跟視窗作業人員說是否可以加急辦理下,可以的話則直接辦理,不可以的話則去隊尾排隊去,
在JUC中同樣存在公平鎖跟非公平鎖,一般非公平鎖效率好一些,因為非公平鎖狀態下打算搶鎖的執行緒不用排隊掛起了,
4.3.2 AQS細節
AQS內部維護著一個FIFO的佇列,即CLH佇列,提供先來先服務的公平性,AQS的同步機制就是依靠CLH佇列實作的,CLH佇列是FIFO的雙端雙向鏈表佇列(方便尾部節點插入),執行緒通過AQS獲取鎖失敗,就會將執行緒封裝成一個Node節點,通過CAS原子操作插入佇列尾,當有執行緒釋放鎖時,會嘗試讓隊頭的next節點占用鎖,個人理解AQS具有如下幾個特點:
1.在AQS 同步佇列中 -1 表示執行緒在睡眠狀態
2.當前Node節點執行緒會把前一個Node.ws = -1,當前節點把前面節點ws設定為-1,你可以理解為:你自己能知道自己睡著了嗎?只能是別人看到了發現你睡眠了!
3.持有鎖的執行緒永遠不在佇列中,
4.在AQS佇列中第二個才是最先排隊的執行緒,
5.如果是交替型任務或者單執行緒任務,即使用了Lock也不會涉及到AQS 佇列,
6.不到萬不得已不要輕易park執行緒,很耗時的!所以排隊的頭執行緒會自旋的嘗試幾個獲取鎖,
4.4 加鎖跟解鎖流程圖
以最經典的 ReentrantLock 為例逐步分析下 lock 跟 unlock 底層流程圖(要原圖的話公眾號回復:lock),
private Lock lock = new ReentrantLock();
public void test(){
lock.lock();
try{
doSomeThing();
}catch (Exception e){
...
}finally {
lock.unlock();
}
}
4.4.1 獨占式加入同步佇列
同步器AQS中包含兩個節點型別的參考:一個指向頭結點的參考(head),一個指向尾節點的參考(tail),如果加入的節點是OK的則會直接運行該節點,當若干個執行緒搶鎖失敗了那么就會搶著加入到同步佇列的尾部,因為是搶著加入這個時候用CAS來設定尾部節點,入口代碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1.tryAcquire
該方法是需要自我實作的,在上面的demo中可見一斑,就是回傳是否獲得了鎖,
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 是否需要加入佇列,不需要的話則嘗試CAS獲得鎖,獲得成功后 設定當前鎖的擁有者
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 這就是可重入鎖的實作
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
2.addWaiter(Node.EXCLUSIVE,arg)
/**
* 如果嘗試獲取同步狀態失敗的話,則構造同步節點(獨占式的Node.EXCLUSIVE),通過addWaiter(Node node,int args)方法將該節點加入到同步佇列的隊尾,
*/
private Node addWaiter(Node mode) {
// 用當前執行緒構造一個Node物件,mode是一個表示Node型別的欄位,或者說是這個節點是獨占的還是共享的
Node node = new Node(Thread.currentThread(), mode);
// 將目前佇列中尾部節點給pred
Node pred = tail;
// 佇列不為空的時候
if (pred != null) {
node.prev = pred;
// 先嘗試通過AQS方式修改尾節點為最新的節點,如果修改失敗,意味著有并發,
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//第一次嘗試添加尾部失敗說明有并發,此時進入自旋
enq(node);
return node;
}
3.自旋enq 方法將并發添加節點的請求通過CAS跟自旋將尾節點的添加變得串行化起來,說白了就是讓節點放到正確的隊尾位置,
/**
* 這里進行了回圈,如果此時存在了tail就執行同上一步驟的添加隊尾操作,如果依然不存在,
* 就把當前執行緒作為head結點,插入節點后,呼叫acquireQueued()進行阻塞
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
4.acquireQueued 是當前Node節點執行緒在死回圈中獲取同步狀態,而只有前驅節點是頭節點才能嘗試獲取鎖,原因是:
1.頭結點是成功獲取同步狀態(鎖)的節點,而頭節點的執行緒釋放了同步狀態以后,將會喚醒其后繼節點,后繼節點的執行緒被喚醒后要檢查自己的前驅節點是否為頭結點,
2.維護同步佇列的FIFO原則,節點進入同步佇列之后,會嘗試自旋幾次,
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋檢查當前節點的前驅節點是否為頭結點,才能獲取鎖
for (;;) {
// 獲取節點的前驅節點
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 節點中的執行緒回圈的檢查,自己的前驅節點是否為頭節點
// 只有當前節點 前驅節點是頭節點才會 再次呼叫我們實作的方法tryAcquire
// 接下來無非就是將當前節點設定為頭結點,移除之前的頭節點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 否則檢查前一個節點的狀態,看當前獲取鎖失敗的執行緒是否要掛起
if (shouldParkAfterFailedAcquire(p, node) &&
//如果需要掛起,借助JUC包下面的LockSupport類的靜態方法park掛起當前執行緒,直到被喚醒
parkAndCheckInterrupt())
interrupted = true; // 兩個判斷都是true說明 則置true
}
} finally {
//如果等待程序中沒有成功獲取資源(如timeout,或者可中斷的情況下被中斷了),那么取消結點在佇列中的等待,
if (failed)
//取消請求,將當前節點從佇列中移除
cancelAcquire(node);
}
}
如果成功就回傳,否則就執行shouldParkAfterFailedAcquire、parkAndCheckInterrupt來達到阻塞效果,
5.shouldParkAfterFailedAcquire 第二步的addWaiter()構造的新節點,waitStatus的默認值是0,此時,會進入最后一個if判斷,CAS設定pred.waitStatus SIGNAL,最后回傳false,由于回傳false,第四步的acquireQueued會繼續進行回圈,假設node的前繼節點pred仍然不是頭結點或鎖獲取失敗,則會再次進入shouldParkAfterFailedAcquire(),上一輪回圈中已經將pred.waitStatu = -1了,則這次會進入第一個判斷條件,直接回傳true,表示應該阻塞呼叫parkAndCheckInterrupt,
那么什么時候會遇到ws > 0呢?當pred所維護的獲取請求被取消時(也就是node.waitStatus = CANCELLED,這時就會回圈移除所有被取消的前繼節點pred,直到找到未被取消的pred,移除所有被取消的前繼節點后,直接回傳false,
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 獲得前驅節點的狀態
if (ws == Node.SIGNAL) //此處是第二次設定
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 此處是第一次設定 unsafe級別呼叫設定
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
6.parkAndCheckInterrupt 主要任務是暫停當前執行緒然后查看是否已經暫停了,
private final boolean parkAndCheckInterrupt() {
// 呼叫park()使執行緒進入掛起狀態,什么時候呼叫了unpark再繼續執行下面
LockSupport.park(this);
// 如果被喚醒,查看自己是不是已經被中斷了,
return Thread.interrupted();
}
7.cancelAcquireacquireQueued方法的finally會判斷 failed值,正常運行時候自旋出來的時候會是false,如果中斷或者timeout了 則會是true,執行cancelAcquire,其中核心代碼是node.waitStatus = Node.CANCELLED,
8.selfInterrupt
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
4.4.2 獨占式釋放佇列頭節點
release()會呼叫tryRelease方法嘗試釋放當前執行緒持有的鎖,成功的話喚醒后繼執行緒,并回傳true,否則直接回傳false,
public final boolean release(long arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
1.tryRelease 這個是子類需要自我實作的,沒啥說的根據業務需要實作,
2.unparkSuccessor 喚醒頭結點的后繼節點,
private void unparkSuccessor(Node node) {
int ws = node.waitStatus; // 獲得頭節點狀態
if (ws < 0) //如果頭節點裝小于0 則將其置為0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; //這個是新的頭節點
if (s == null || s.waitStatus > 0) {
// 如果新頭節點不滿足要求
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
//從佇列尾部開始往前去找最前面的一個waitStatus小于0的節點
if (t.waitStatus <= 0)
s = t;
}
if (s != null)//喚醒后繼節點對應的執行緒
LockSupport.unpark(s.thread);
}
4.4.3 AQS 中增加跟洗掉形象圖
五、CountDownLatch底層
5.1 共享鎖 CountDownLatch底層
CountDownLatch 雖然相對簡單,但也實作了共享鎖模型,但是如何正確的吹逼 CountDownLatch 呢?如果在理解了上述流程的基礎上,從CountDownLatch入手來看AQS 中關于共享鎖的代碼還比較好看懂,在看的時候可以 以看懂大致內容為主,學習其設計的思路,不要陷入所有條件處理細節中,多執行緒環境中,對與錯有時候不是那么容易看出來的,個人追原始碼繪制了如下圖:
5.2 計數信號量Semaphore
Semaphore 這就是共享鎖的一個實作類,在初始化的時候就規定了共享鎖池的大小N,有一個執行緒獲得了鎖,可用數就減少1個,有一個執行緒釋放鎖可用數就增加1個,如果有 >=2 的執行緒同時釋放鎖,則此時有多個鎖可用,這個時候就可以 同時喚醒 兩個鎖setHeadAndPropagate (流程圖懶的繪制了),
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//找先驅結點
final Node p = node.predecessor();
if (p == head) {
// 嘗試獲取資源
int r = tryAcquireShared(arg);
if (r >= 0) {
// 設定當前結點為頭結點,然后去喚醒后續結點,注意傳播性 喚醒!
setHeadAndPropagate(node, r);
p.next = null; // help GC 釋放頭結點,等待GC
if (interrupted)
selfInterrupt();
failed = false;//獲取到資源
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)//如果最后沒有獲取到資源,則cancel
cancelAcquire(node);
}
}
5.3 ReentrantReadWriteLock
在 ReentrantReadWriteLock 類中也是只有一個32位的int state來表示讀鎖跟寫鎖,如何實作的?
1.后16位用來保存獨享的寫鎖個數,第一次獲得就是01,第二次重入就是10了,這樣的方式來保存,
2.但是多個執行緒都可以獲得讀鎖,并且每個執行緒可能讀多次,如何保存?我們用前16位來保存有多少個執行緒獲得了讀鎖,
3.每個讀鎖執行緒獲得的重入讀鎖個數 由內部類HoldCounter與讀鎖配套使用,
六、Condition
synchronized 可用 wait() 和 notify()/notifyAll() 方法相結合可以實作等待/通知模式,Lock 也提供了 Condition 來提供類似的功能,
Condition是JDK5后引入的Interface,它用來替代傳統的Object的wait()/notify()實作執行緒間的協作,相比使用Object的wait()/notify(),使用Condition的await()/signal()這種方式 實作執行緒間協作更加安全和高效,簡單說,他的作用是使得某些執行緒一起等待某個條件(Condition),只有當該條件具備(signal 或者 signalAll方法被呼叫)時,這些等待執行緒才會被喚醒,從而重新爭奪鎖,wait()/notify()這些都更傾向于底層的實作開發,而Condition介面更傾向于代碼實作的等待通知效果,兩者之間的區別與共通點如下:
6.1 條件等待佇列
條件等待佇列,指的是 Condition 內部自己維護的一個佇列,不同于 AQS 的 同步等待佇列,它具有以下特點:
要加入條件等待佇列的節點,不能在 同步等待佇列,
從 條件等待佇列 移除的節點,會進入同步等待佇列,
一個鎖物件只能有一個同步等待佇列,但可以有多個條件等待佇列,
這里以 AbstractQueuedSynchronizer 的內部類 ConditionObject 為例(Condition 的實作類)來分析下它的具體實作程序,首先來看該類內部定義的幾個成員變數:
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
它采用了 AQS 的 Node 節點構造(前面說過Node類有nextWaiter屬性),并定義了兩個成員變數:firstWaiter、lastWaiter ,說明在 ConditionObject 內部也維護著一個自己的單向等待佇列,目前可知它的結構如下:
6.2 await、signal
比如有執行緒 1、2競爭鎖,下面來說下具體程序 執行緒1:
1、執行緒1 呼叫 reentrantLock.lock時,持有鎖,
2、執行緒1 呼叫 await 方法,進入條件等待佇列 ,同時釋放鎖,
3、執行緒1 獲取到執行緒2 signal 信號,從條件等待佇列進入同步等待佇列,
執行緒2:
1、執行緒2 呼叫 reentrantLock.lock時,由于鎖被執行緒1 持有,進入同步等待佇列 ,
2、由于執行緒1 釋放鎖,執行緒2 從同步等待佇列 移除,獲取到鎖,
3、執行緒2 呼叫 signal 方法,導致執行緒 1 被喚醒,執行緒2 呼叫unlock ,執行緒1 獲取鎖后繼續下走,
6.2.1 await
當我們看await、signal 的原始碼時候不要認為等待佇列跟同步佇列是完全分開的,其實個人感覺底層原始碼是有點 HashMap 中的紅黑樹跟雙向鏈表的意思,
當呼叫await方法時候,說明當前任務佇列的頭節點拿著鎖呢,此時要把該Thread從任務佇列挪到等待佇列再喚醒任務佇列最前面排隊的運行任務,如圖:
- thread 表示節點存放的執行緒,
- waitStatus 表示節點等待狀態,條件等待佇列中的節點等待狀態都是 CONDITION,否則會被清除,
- nextWaiter 表示后指標,
6.2.2 signal
當我們呼叫signal方法的時候,我們要將等待佇列中的頭節點移出來,讓其去搶鎖,如果是公平模式就要去排隊了,流程如圖:
上面只是形象流程圖,如果從代碼級別看的話大致流程如下:
6.2.3 signalAll
signalAll 與 signal 方法的區別體現在 doSignalAll 方法上,前面我們已經知道doSignal方法只會對等待佇列的頭節點進行操作,doSignalAll方法只不過將等待佇列中的每一個節點都移入到同步佇列中,即通知當前呼叫condition.await()方法的每一個執行緒:
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null); // 回圈
}
6.3 End
一個 Condition 物件就有一個單項的等待任務隊列,在一個多執行緒任務中我們可以new出多個等待任務佇列,比如我們new出來兩個等待佇列,
private Lock lock = new ReentrantLock();
private Condition FirstCond = lock.newCondition();
private Condition SecondCond = lock.newCondition();
所以真正的AQS任務中一般是一個任務佇列N個等待佇列的,因此我們盡量呼叫signal而少用signalAll,因為在指定的實體化等待佇列中只有一個可以拿到鎖的,
Synchronized 中的 wait 跟 notify 底層代碼的等待佇列只有一個,多個執行緒呼叫wait的時候我們是無法知道頭節點是那個具體執行緒的,因此只能notifyAll,
寫在最后
歡迎大家關注我的公眾號【風平浪靜如碼】,海量Java相關文章,學習資料都會在里面更新,整理的資料也會放在里面,
覺得寫的還不錯的就點個贊,加個關注唄!點關注,不迷路,持續更新!!!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/244542.html
標籤:Java
上一篇:十大經典排序——java實作
