主頁 > 軟體設計 > Java 并發編程決議 | 基于JDK原始碼決議Java領域中的并發鎖,我們可以從中學習到什么內容?

Java 并發編程決議 | 基于JDK原始碼決議Java領域中的并發鎖,我們可以從中學習到什么內容?

2022-09-18 07:51:29 軟體設計

蒼穹之邊,浩瀚之摯,眰恦之美; 悟心悟性,善始善終,惟善惟道! —— 朝槿《朝槿兮年說》

Picture-Navigation

寫在開頭

Picture-Header

在Java領域中, 尤其是在并發編程領域,對于多執行緒并發執行一直有兩大核心問題:同步和互斥,其中:

  • 互斥(Mutual Exclusion):一個公共資源同一時刻只能被一個行程或執行緒使用,多個行程或執行緒不能同時使用公共資源,即就是同一時刻只允許一個執行緒訪問共享資源的問題,
  • 同步(Synchronization):兩個或兩個以上的行程或執行緒在運行程序中協同步調,按預定的先后次序運行,即就是執行緒之間如何通信、協作的問題,

針對對于這兩大核心問題,利用管程是能夠解決和實作的,因此可以說,管程是并發編程的萬能鑰匙,

雖然,Java在基于語法層面(synchronized 關鍵字)實作了對管程技術,但是從使用方式和性能上來說,內置鎖(synchronized 關鍵字)的粒度相對過大,不支持超時和中斷等問題,

為了彌補這些問題,從JDK層面對其“重復造輪子”,在JDK內部對其重新設計和定義,甚至實作了新的特性,

關健術語

Picture-Keyword

本文用到的一些關鍵詞語以及常用術語,主要如下:

  • 信號量(Semaphore): 是在多執行緒環境下使用的一種設施,是可以用來保證兩個或多個關鍵代碼段不被并發呼叫,也是作系統用來解決并發中的互斥和同步問題的一種方法,
  • 信號量機制(Semaphores): 用來解決同步/互斥的問題的,它是1965年,荷蘭學者 Dijkstra提出了一種卓有成效的實作行程互斥與同步的方法,
  • 管程(Monitor) : 一般是指管理共享變數以及對共享變數的操作程序,讓它們支持并發的一種機制,

基本概述

在Java領域中,我們可以將鎖大致分為基于Java語法層面(關鍵詞)實作的鎖和基于JDK層面實作的鎖,

在Java領域中,從JDK原始碼分析來看,基于JDK層面實作的鎖大致主要可以分為以下4種方式:

  • 基于Lock介面實作的鎖
  • 基于ReadWriteLock介面實作的鎖
  • 基于AQS基礎同步器實作的鎖
  • 基于自定義API操作實作的鎖

從閱讀原始碼不難發現,在Java SDK 并發包主要通過AbstractQueuedSynchronizer(AQS)實作多執行緒同步機制的封裝與定義,而通過Lock 和 Condition 兩個介面來實作管程,其中 Lock 用于解決互斥問題,Condition 用于解決同步問題,

一. 基本理論

在并發編程領域,有兩大核心問題:一個是互斥,即同一時刻只允許一個執行緒訪問共享資源;另一個是同步,即執行緒之間如何通信、協作,

vq67Hs.png

在作業系統中,一般有如果I/O操作時,對于阻塞和非阻塞是從函式呼叫角度來說的,其中:

  • 阻塞:如果讀寫操作沒有就緒或者完成,則函式一直等待,
  • 非阻塞: 函式立即呼叫,然后讓應用程式輪詢回圈,

而同步和異步則是從“讀寫是主要是由誰完成”的角度來說的,其中:

  • 同步: 讀寫操作主要交給應用程式完成
  • 異步: 讀寫操作主要由作業系統完成,一般完成之后,回呼函式和事件通知應用程式,

其中,信號量機制(Semaphores)是用來解決同步/互斥的問題的,但是信號量(Semaphore)的操作分散在各個行程或執行緒中,不方便進行管理,因每次需呼叫P/V(來自荷蘭語 proberen和 verhogen)操作,還可能導致死鎖或破壞互斥請求的問題,

由于PV操作對于解決行程互斥/同步編程復雜,因而在此基礎上提出了與信號量等價的——“管程技術”,

其中,管程(Monitor)當中定義了共享資料結構只能被管程內部定義的函式所修改,所以如果我們想修改管程內部的共享資料結構的話,只能呼叫管程內部提供的函式來間接的修改這些資料結構,

一般來說,管程(Monitor)和信號量(Semaphore)是等價的,所謂等價指的是用管程能夠實作信號量,也能用信號量實作管程,

在管程的發展歷程上,先后出現過Hasen模型、Hoare模型和MESA模型等三種不同的管程模型,現在正在廣泛使用的是MESA模型,

在MESA模型中,管程中引入了條件變數(Conditional Variable)的概念,而且每個條件變數都對應有一個等待佇列(Wait Queue),其中,條件變數和等待佇列的作用是解決執行緒之間的同步問題,

而對于解決執行緒之間的互斥問題,將共享變數(Shared Variable)及其對共享變數的操作統一封裝起來,一般主要是實作一個執行緒安全的阻塞佇列(Blocking Queue),將執行緒不安全的佇列封裝起來,對外提供執行緒安全的操作方法,例如入隊操作(Enqueue)和出隊操作(Dequeue),

在Java領域中,對于Java語法層面實作的鎖(synchronized 關鍵字), 其實就是參考了 MESA 模型,并且對 MESA 模型進行了精簡,一般在MESA 模型中,條件變數可以有多個,Java 語言內置的管程(synchronized)里只有一個條件變數,

這就意味著,被synchronized 關鍵字修飾的代碼塊或者直接標記靜態方法以及實體方法,在編譯期會自動生成相關加鎖(lock)和解鎖(unlock)的代碼,即就是monitorenter和monitorexit指令,

對于synchronized 關鍵字來說,主要是在Java HotSpot(TM) VM 虛擬機通過Monitor(監視器)來實作monitorenter和monitorexit指令的,

同時,在Java HotSpot(TM) VM 虛擬機中,每個物件都會有一個監視器,監視器和物件一起創建、銷毀,

監視器相當于一個用來監視這些執行緒進入的特殊房間,其義務是保證(同一時間)只有一個執行緒可以訪問被保護的臨界區代碼塊,

本質上,監視器是一種同步工具,也可以說是JVM對管程的同步機制的封裝實作,主要特點是:

  • 同步:監視器所保護的臨界區代碼是互斥地執行的,一個監視器是一個運行許可,任一執行緒進入臨界區代碼都需要獲得這個許可,離開時把許可歸還,
  • 協作:監視器提供Signal機制,允許正持有許可的執行緒暫時放棄許可進入阻塞等待狀態,等待其他執行緒發送Signal去喚醒;其他擁有許可的執行緒可以發送Signal,喚醒正在阻塞等待的執行緒,讓它可以重新獲得許可并啟動執行,

在Hotspot虛擬機中,監視器是由C++類ObjectMonitor實作的,ObjectMonitor類定義在ObjectMonitor.hpp檔案中,其中:

v7MaNT.png

  • Owner: 指向的執行緒即為獲得鎖的執行緒
  • Cxq:競爭佇列(Contention Queue),所有請求鎖的執行緒首先被放在這個競爭佇列中
  • EntryList:物件物體串列,表示Cxq中那些有資格成為候選資源的執行緒被移動到EntryList中,
  • WaitSet:類似于等待佇列,某個擁有ObjectMonitor的執行緒在呼叫Object.wait()方法之后將被阻塞,然后該執行緒將被放置在WaitSet鏈表中,

同時,管程與Java中面向物件原則(Object Oriented Principle)也是非常契合的,主要體現在 java.lang.Object類中wait()、notify()、notifyAll() 這三個方法,其中:

  • wait()方法: 阻塞執行緒并且進入等待佇列
  • notify()方法:隨機地通知等待佇列中的一個執行緒
  • notifyAll()方法: 通知等待佇列中的所有執行緒

不難發現,在Java中synchronized 關鍵字及 java.lang.Object類中wait()、notify()、notifyAll() 這三個方法都是管程的組成部分,

由此可見,我們可以得到一個比較通用的并發同步工具基礎模型,大致包含如下幾個內容,其中:

  • 條件變數(Conditional Variable): 利用執行緒間共享的變數進行同步的一種作業機制
  • 共享變數((Shared Variable)):一般指物件物體物件的成員變數和屬性
  • 阻塞佇列(Blocking Queue):共享變數(Shared Variable)及其對共享變數的操作統一封裝
  • 等待佇列(Wait Queue):每個條件變數都對應有一個等待佇列(Wait Queue),內部需要實作入隊操作(Enqueue)和出隊操作(Dequeue)方法
  • 變數狀態描述機(Synchronization Status):描述條件變數和共享變數之間狀態變化,又可以稱其為同步狀態
  • 作業模式(Operation Mode): 執行緒資源具有排他性,因此定義獨占模式和共享模式兩種作業模式

綜上所述,條件變數和等待佇列的作用是解決執行緒之間的同步問題;共享變數與阻塞佇列的作用是解決執行緒之間的互斥問題,

二.AQS基礎同步器的設計與實作

在Java領域中,同步器是專門為多執行緒并發設計的同步機制,主要是多執行緒并發執行時執行緒之間通過某種共享狀態來實作同步,只有當狀態滿足這種條件時執行緒才往下執行的一種同步機制,

對于多執行緒實作實作并發處理機制來說,一直以來,多執行緒都存在2個問題:

  • 執行緒之間記憶體共享,需要通過加鎖進行控制,但是加鎖會導致性能下降,同時復雜的加鎖機制也會增加編程編碼難度
  • 過多執行緒造成執行緒之間的背景關系切換,導致效率低下

因此,在并發編程領域中,一直有一個很重要的設計原則: “ 不要通過記憶體共享來實作通信,而應該通過通信來實作記憶體共享,”

簡單來說,就是盡可能通過訊息通信,而不是記憶體共享來實作行程或者執行緒之間的同步,

其中,同步器是專門為多執行緒并發設計的同步機制,主要是多執行緒并發執行時執行緒之間通過某種共享狀態來實作同步,只有當狀態滿足這種條件時執行緒才往下執行的一種同步機制,

由于在不同的應用場景中,對于同步器的需求也會有所不同,一般在我們自己去實作和設計一種并發工具的時候,都需會考慮以下幾個問題:

  • 是否支持回應中斷? 如果阻塞狀態的執行緒能夠回應中斷信號,也就是說當我們給阻塞的執行緒發送中斷信號的時候,能夠喚醒它,那它就有機會釋放曾經持有的鎖,
  • 是否支持超時?如果執行緒在一段時間之內沒有獲取到鎖,不是進入阻塞狀態,而是回傳一個錯誤,那這個執行緒也有機會釋放曾經持有的鎖,
  • 是否支持非阻塞地獲取鎖資源 ? 如果嘗試獲取鎖失敗,并不進入阻塞狀態,而是直接回傳,那這個執行緒也有機會釋放曾經持有的鎖,

從閱讀JDK原始碼不難發現,主要是采用設計模式中模板模式的原則,JDK將各種同步器中相同的部分抽象封裝成了一個統一的基礎同步器,然后基于基礎同步器為模板通過繼承的方式來實作不同的同步器,

也就是說,在實際開發程序中,除了直接使用JDK實作的同步器,還可以基于這個基礎同步器我們也可以自己自定義實作符合我們業務需求的同步器,

在JDK原始碼中,同步器位于java.util.concurrent.locks包下,其基本定義是AbstractQueuedSynchronizer類,即就是我們常說的AQS同步器,

1. 設計思想

一個標準的AQS同步器主要有同步狀態機制,等待佇列,條件佇列,獨占模式,共享模式等五大核心要素組成,

JDK的JUC(java.util.concurrent.)包中提供了各種并發工具,但是大部分同步工具的實作基于AbstractQueuedSynchronizer類實作,其內部結構主要如下:

  • 同步狀態機制(Synchronization Status):主要用于實作鎖(Lock)機制,是指同步狀態,其要求對于狀態的更新必須原子性的
  • 等待佇列(Wait Queue):主要用于存放等待執行緒獲取到的鎖資源,并且把執行緒維護到一個Node(節點)里面和維護一個非阻塞的CHL Node FIFO(先進先出)佇列,主要是采用自旋鎖+CAS操作來保證節點插入和移除的原子性操作,
  • 條件佇列(Condition Queue):用于實作鎖的條件機制,一般主要是指替換“等待-通知”作業機制,主要是通過ConditionObject物件實作Condition介面提供的方法實作,
  • 獨占模式(Exclusive Mode):主要用于實作獨占鎖,主要是基于靜態內部類Node的常量標志EXCLUSIVE來標識該節點是獨占模式
  • 共享模式(Shared Mode):主要用于實作共享鎖,主要是基于靜態內部類Node的常量標志SHARED來標識該節點是共享模式

其中,對于AbstractQueuedSynchronizer類的實作原理,我們可以從如下幾個方面來看:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691 L;


    protected AbstractQueuedSynchronizer() {}

    /**
     * 等待佇列: head-頭節點
     */
    private transient volatile Node head;

    /**
     * 等待佇列: tail-尾節點
     */
    private transient volatile Node tail;

    /**
     * 同步狀態:32位整數型別,更新同步狀態(state)時必須保證其是原子性的
     */
    private volatile int state;

    /**
     * 自旋鎖消耗超時時間閥值(threshold): threshold < 1000ns時,表示競爭時選擇自旋;threshold > 1000ns時,表示競爭時選擇系統阻塞
     */
    static final long spinForTimeoutThreshold = 1000 L;

    /**
     * CAS原子性操作
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    /**
     * stateOffset
     */
    private static final long stateOffset;

    /**
     * headOffset
     */
    private static final long headOffset;

    /**
     * tailOffset
     */
    private static final long tailOffset;

    /**
     * waitStatusOffset
     */
    private static final long waitStatusOffset;

    /**
     * nextOffset
     */
    private static final long nextOffset;


    static {
        try {
            stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));

        } catch (Exception ex) {
            throw new Error(ex);
        }
    }
		
		    private final boolean compareAndSetHead(Node update)  {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
		
		    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
		
		    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }
		
		    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }
		
		    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

}

[1]. AbstractQueuedSynchronizer類的實作原理是繼承了基于AbstractOwnableSynchronizer類的抽象類,其中主要對AQS同步器的通用特性和方法進行抽象封裝定義,主要包括如下方法:

public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {


    private static final long serialVersionUID = 3737899427754241961 L;


    protected AbstractOwnableSynchronizer() {}

    /**
     *  同步器擁有者
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * 設定同步器擁有者:把執行緒當作引數傳入,指定某個執行緒為獨享
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    /**
     * 獲取同步器擁有者:獲取指定的某個執行緒
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}
  • setExclusiveOwnerThread(Thread thread)方法: 把某個執行緒作為引數傳入,從而設定AQS同步器的所有者,即就是我們設定的某個執行緒
  • getExclusiveOwnerThread()方法: 獲取當前AQS同步器的所有者,即就是我們指定的某個執行緒

[2]. 對于同步狀態(state),其型別是32位整數型別,并且是被volatile修飾的,表示在更新同步狀態(state)時必須保證其是原子性的,

[3]. 對于等待佇列的結構,主要是在Node定義了head和tail變數,其中head表示頭部節點,tail表示尾部節點

[4].對于等待佇列的結構提到的Node類來說,主要內容如下:

  static final class Node {
      /** Marker to indicate a node is waiting in shared mode */
      static final Node SHARED = new Node();
			
      /** Marker to indicate a node is waiting in exclusive mode */
      static final Node EXCLUSIVE = null;

      /** waitStatus value to indicate thread has cancelled */
      static final int CANCELLED = 1;
			
      /** waitStatus value to indicate successor's thread needs unparking */
      static final int SIGNAL = -1;
			
      /** waitStatus value to indicate thread is waiting on condition */
      static final int CONDITION = -2;
			
      /**
       * waitStatus value to indicate the next acquireShared should
       * unconditionally propagate
       */
      static final int PROPAGATE = -3;

      /**
       * Status field, taking on only the values:
       *   SIGNAL:     The successor of this node is (or will soon be)
       *               blocked (via park), so the current node must
       *               unpark its successor when it releases or
       *               cancels. To avoid races, acquire methods must
       *               first indicate they need a signal,
       *               then retry the atomic acquire, and then,
       *               on failure, block.
       *   CANCELLED:  This node is cancelled due to timeout or interrupt.
       *               Nodes never leave this state. In particular,
       *               a thread with cancelled node never again blocks.
       *   CONDITION:  This node is currently on a condition queue.
       *               It will not be used as a sync queue node
       *               until transferred, at which time the status
       *               will be set to 0. (Use of this value here has
       *               nothing to do with the other uses of the
       *               field, but simplifies mechanics.)
       *   PROPAGATE:  A releaseShared should be propagated to other
       *               nodes. This is set (for head node only) in
       *               doReleaseShared to ensure propagation
       *               continues, even if other operations have
       *               since intervened.
       *   0:          None of the above
       *
       *
       * The field is initialized to 0 for normal sync nodes, and
       * CONDITION for condition nodes.  It is modified using CAS
       * (or when possible, unconditional volatile writes).
       */
      volatile int waitStatus;

      /**
       * Link to predecessor node that current node/thread relies on
       */
      volatile Node prev;

      /**
       * Link to the successor node that the current node/thread
       */
      volatile Node next;

      /**
       * The thread that enqueued this node.  Initialized on
       * construction and nulled out after use.
       */
      volatile Thread thread;

      /**
       * Link to next node waiting on condition, or the special
       */
      Node nextWaiter;

      /**
       * Returns true if node is waiting in shared mode.
       */
      final boolean isShared() {
          return nextWaiter == SHARED;
      }


      final Node predecessor() throws NullPointerException {
          Node p = prev;
          if (p == null)
              throw new NullPointerException();
          else
              return p;
      }

      Node() { // Used to establish initial head or SHARED marker
      }

      Node(Thread thread, Node mode) { // Used by addWaiter
          this.nextWaiter = mode;
          this.thread = thread;
      }

      Node(Thread thread, int waitStatus) { // Used by Condition
          this.waitStatus = waitStatus;
          this.thread = thread;
      }
  }

  • 標記Node的作業模式常量標記:主要維護了SHARED和EXCLUSIVE等2個靜態字面常量,其中 SHARED 用于標記Node中是共享模式,EXCLUSIVE:用于標記Node中是獨享模式
  • 標記等待狀態的靜態字面常量標記: 主要維護了0(表示無狀態),SIGNAL(-1,表示后續節點中的執行緒通過park進入等待,當前節點在釋放和取消時,需要通過unpark解除后后續節點的等待),CANCELLED(1,表示當前節點中的執行緒因為超時和中斷被取消),CONDITION(-2,表示當前節點在條件佇列中),PROPAGATE(-3,SHARED共享模式的頭節點描述狀態,表示無條件往下傳播)等5個靜態字面常量
  • 維護了一個等待狀態(waitStatus): 主要用于描述等待佇列中節點的狀態,其取值范圍為0(waitStatus=0,表示無狀態),SIGNAL(waitStatus=-1,表示等待信號狀態),CANCELLED(waitStatus=1,表示取消狀態),CONDITION(waitStatus=-2,表示條件狀態),PROPAGATE(waitStatus=-3,表示SHARED共享模式狀態)等5個靜態字面常量,CAS操作時寫入,默認值為0,
  • 維護了Node的2個結構節點變數: 主要是prev和next,其中,prev表示前驅節點,next表示后續節點,表示構成雙向向鏈表,構成了等待佇列的資料結構
  • 維護了一個狀態作業模式標記: 主要是維護了一個nextWaiter,用于表示在等待佇列中當前節點在是共享模式還是獨享模式,而對于條件佇列來說,用于組成單向鏈表結構
  • 維護了一個執行緒物件變數: 主要用于記錄當前節點中的執行緒thread

[5].對于自旋鎖消耗超時時間閥值(spinForTimeoutThreshold),主要表示系統依據這個閥值來選擇自旋方式還是系統阻塞,一般假設這個threshold,當 threshold < 1000ns時,表示競爭時選擇自旋;否則,當threshold > 1000ns時,表示競爭時選擇系統阻塞

[6].對于帶有Offset 等變數對應各自的句柄,主要用于執行CAS操作,在JDK1.8版本之前,CAS操作主要通過Unsafe類來說實作;在JDK1.8版本之后,已經開始利用VarHandle來替代Unsafe類操作實作,

[7].對于CAS操作來說,主要提供了如下幾個方法:

  • compareAndSetState(int expect, int update)方法:CAS操作原子更新狀態
  • compareAndSetHead(Node update)方法:CAS操作原子更新頭部節點
  • compareAndSetTail(Node expect, Node update)方法:CAS操作原子更新尾部節點
  • compareAndSetWaitStatus(Node node, int expect,int update)方法:CAS操作原子更新等待狀態
  • compareAndSetNext(Node node,Node expect,Node update)方法:CAS操作原子更新后續節點

[8].對于條件佇列(ConditionObject)來說,主要內容如下:

  public class ConditionObject implements Condition, java.io.Serializable {

      private static final long serialVersionUID = 1173984872572414699 L;

      /** First node of condition queue. */
      private transient Node firstWaiter;

      /** Last node of condition queue. */
      private transient Node lastWaiter;

      /** Mode meaning to reinterrupt on exit from wait */
      private static final int REINTERRUPT = 1;

      /** Mode meaning to throw InterruptedException on exit from wait */
      private static final int THROW_IE = -1;

      /**
       * Creates a new {@code ConditionObject} instance.
       */
      public ConditionObject() {}
			
  }

  • 基于Condition的介面實作條件佇列,其核心主要是實作阻塞和喚醒的作業機制
  • 基于Node定義了firstWaiter和lastWaiter變數,其中,firstWaiter表示的是頭節點,lastWaiter是尾節點
  • 還定義了2個字面常量REINTERRUPT和THROW_IE,其中REINTERRUPT=1,描述的是當中斷是退出條件佇列,THROW_IE=-1表示的是發生例外時退出

[8].除此之外,在AQS基礎同步器中,一般可以通過構造方法直接將引數值賦給對應變數,也可以通過變數句柄進行賦值操作:

  • isShared()方法: 用于判斷等待佇列是否為共享模式
  • predecessor()方法: 用于獲取當前節點對應的前驅節點,如果為空,則 throw new NullPointerException();

2. 基本實作

一個標準的AQS同步器最核心底層設計實作是一個非阻塞的CHL Node FIFO(先進先出)佇列資料結構,通過采用自旋鎖+CAS操作的方法來保證原子性操作,

總的來說,一個AQS基礎同步器,底層的資料結構采用的是一個非阻塞的CHL Node FIFO(先進先出)佇列資料結構,而實作的核心演算法則是采用自旋鎖+CAS操作的方法,

首先,對于非阻塞的CHL Node FIFO(先進先出)佇列資料結構,一般來說,FIFO(First In First Out,先進先出)佇列是一個有序串列,屬于抽象型資料型別(Abstract Data Type,ADT),所有的插入和洗掉操作都發生在隊首(Front)和隊尾(Rear)兩端,具有先進先出的特性,


    /**
     * 等待佇列: head-頭節點
     */
    private transient volatile Node head;

    /**
     * 等待佇列: tail-尾節點
     */
    private transient volatile Node tail;
		
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

在AQS同步器的原始碼中,主要是通過靜態內部類Node來實作的這個非阻塞的CHL Node FIFO(先進先出)佇列資料結構, 維護了兩個變數head和tail,其中head對應隊首(Front),tail對應隊尾(Rear),同時,還定義了addWaiter(Node mode)方法來表示入隊操作,其中有個enq(final Node node)方法,主要用于初始化佇列中head和tail的設定,

其次,AQS同步器以CLH鎖為基礎,其中CLH鎖是一種自旋鎖,對于自旋鎖的實作方式來看,主要可以分為普通自旋鎖和自適應自旋鎖,CLH鎖和MCS鎖等4種,其中:

  • 普通自旋鎖:多個執行緒不斷自旋,不斷嘗試獲取鎖,其不具備公平性和由于要保證CPU和快取以及主存之間的資料一致性,其開銷較大,
  • 自適應自旋鎖:主要是為解決普通自旋鎖的公平性問題,引入了一個排隊機制,一般稱為排他自旋鎖,其具備公平性,但是沒有解決保證CPU和快取以及主存之間的資料一致性問題,其開銷較大,
  • CLH鎖:通過一定手段將執行緒對于某一個共享變數的輪詢競爭轉化為一個執行緒佇列,且佇列中的執行緒各自輪詢自己本地變數,
  • MCS鎖:主旨在于解決 CLH鎖的問題,也是基于FIFO佇列,與CLH鎖不同是,只對本地變數自旋,前驅節點負責通知MCS鎖中執行緒自適結束,

自旋鎖是一種實作同步的方案,屬于一種非阻塞鎖,與常規鎖主要的區別就在于獲取鎖失敗之后的處理方式不同,主要體現在:

  • 一般情況下,常規鎖在獲取鎖失敗之后,會將執行緒阻塞并適當時重新喚醒
  • 而自旋鎖則是使用自旋來替換阻塞操作,主要是執行緒會不斷回圈檢查該鎖是否被釋放,一旦釋放執行緒便會獲取鎖資源,

從本質上講,自旋是一鐘忙等待狀態,會一直消耗CPU的執行時間,一般情況下,常規互斥鎖適用于持有鎖長時間的情況,自旋鎖適合持有時間短的情況,

其中,對于CLH鎖來說,其核心是為解決同步帶來的花銷問題,Craig,Landim,Hagersten三人發明了CLH鎖,其中主要是:

  • 構建一個FIFO(先進先出)佇列,構建時主要通過移動尾部節點tail來實作佇列的排隊,每個想獲得鎖的執行緒都會創建一個新節點(next)并通過CAS操作原子操作將新節點賦予給tail,當前執行緒輪詢前一個節點的狀態,
  • 執行完執行緒后,只需將當前執行緒對應節點狀態設定為解鎖即可,主要是判斷當前節點是否為尾部節點,如果是直接設定尾部節點設定為空,由于下一個節點一直在輪詢,所以可以獲得鎖,

CLH鎖將眾多執行緒長時間對資源的競爭,通過有序化這些執行緒將其轉化為只需要對本地變數檢測,唯一存在競爭的地方就是入隊之前對尾部節點tail 的競爭,相對來說,當前執行緒對資源的競爭次數減少,這節省了CPU快取同步的消耗,從而提升了系統性能,

但是同時也有一個問題,CLH鎖雖然解決了大量執行緒同時操作同一個變數時帶來的開銷問題,如果前驅節點和當前節點在本地主存中不存在,則訪問時間過長,也會引起性能問題,

為了讓CLH鎖更容易實作取消和超時的功能,AQS同步器在設計時進行了改造,主要體現在:節點的結構和節點等待機制,其中:

  • 節點的結構: 主要引入了頭節點和尾節點,分別指向佇列頭部和尾部,對于鎖的相關操作都與其息息相關,并且每個節點都引入了前驅節點和后繼節點,
  • 節點等待機制: 主要在原來的自旋基礎上增加了系統阻塞喚醒,主要體現在 自旋鎖消耗超時時間閥值(threshold): threshold < 1000ns時,表示競爭時選擇自旋;threshold > 1000ns時,表示競爭時選擇系統阻塞,

由此可見,主要是通過前驅節點和后繼節點的參考連接起來形成一個鏈表佇列,其中對于入隊,檢測節點,出隊,判斷超時,取消節點等操作主要如下:

  • 入隊(enqueue): 主要采用一個無限回圈進行CAS操作,即就是使用自旋方式競爭直到成功,
  • 檢測節點(checkedPrev): 一般在入隊完成后,主要是檢測判斷當前節點的前驅節點是否為頭節點, 一般自旋方式是直接進入回圈檢測,而系統阻塞方式是當前執行緒先檢測,其中如果是頭節點并成功獲取鎖,則直接回傳,當前執行緒不阻塞,否則對當前執行緒進行阻塞,
  • 出隊(dequeue):主要負責喚醒等待佇列中的后繼節點,并且按照條件往下傳播有序執行
  • 判斷超時(checkedTimeout): 佇列中等待鎖的執行緒可能因為中斷或者超時的情況,當總耗時大于等于自定義耗時就直接回傳,即就是
  • 取消節點(cancel): 主要是對于中斷和超時而涉及到取消操作,而且這樣的情況不再參與鎖競爭,即就是一般通過呼叫compareAndSetNext(Node node, Node expect,Node update)來進行CAS操作,

特別值得注意的是,AQS基礎同步器中主要有等待佇列和條件佇列兩種對列結構,對比便不難發現:等待佇列采用的底層資料結構是雙向鏈表結構,而對于條件佇列則是單向鏈表結構,

最后,AQS同步器中使用了CAS操作,其中CAS(Compare And Swap,比較并交換)操作時一種樂觀鎖策略,主要涉及三個操作資料:記憶體值,預期值,新值,主要是指當且僅當預期值和記憶體值相等時才去修改記憶體值為新值,

一般來說,CAS操作的具體邏輯,主要可以分為三個步驟:

  • 首先,檢查某個記憶體值是否與該執行緒之前取到值一樣,
  • 其次,如果不一樣,表示此記憶體值已經被別的執行緒修改,需要舍棄本次操作,
  • 最后,如果時一樣,表示期間沒有執行緒更改過,則需要用新值執行更新記憶體值,

除此之外,需要注意的是CAS操作具有原子性,主要是由CPU硬體指令來保證,并且通過Java本地介面(Java Native Interface,JNI)呼叫本地硬體指令實作,

當然,CAS操作避免了悲觀策略獨占物件的 問題,同時提高了并發性能,但是也有以下三個問題:

  • 樂觀策略只能保證一個共享變數的原子操作,如果是多個變數,CAS便不如互斥鎖,主要是CAS操作的局限所致,
  • 長時間回圈操作可能導致開銷過大,
  • 經典的ABA問題: 主要是檢查某個記憶體值是否與該執行緒之前取到值一樣,這個判斷邏輯不嚴謹,解決ABA問題的核心在于,引入版本號,每次更新變數值更新版本號,

而在AQS同步器中,為了保證并發實作保證原子性,而且是硬體級別的原子性,一般是通過JNI(Java native interface,Java 本地介面)方式讓Java代碼呼叫C/C++本地代碼,

通過分析原始碼可知,一般使用Unsafe類需要只用關注如下方法即可:


public final class Unsafe {

    private static final Unsafe theUnsafe;

    private static native void registerNatives();

    private Unsafe() {}

    @CallerSensitive
    public static Unsafe getUnsafe() {
        Class var0 = Reflection.getCallerClass();
        if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
            throw new SecurityException("Unsafe");
        } else {
            return theUnsafe;
        }
    }

    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

    public native void unpark(Object var1);

    public native void park(boolean var1, long var2);

}
  • rgisterNatives()方法:是一個靜態 方法,主要用于注冊本地方法
  • 建構式是private私有化的,一般無法通過建構式來實體化Unsafe物件
  • getUnsafe()方法:是用來獲取Unsafe物件的,雖然是公有化的,但是如果Java語言開發層面的對進行安全檢查

一般地,由于Unsafe類的操作涉及到硬體底層的操作,JDK對其實體化做了安全校驗,只有受系統信任的代碼才對其實體化,主要是通過類加載器來決議,其實體化方式主要有如下方式:

  • 第一種:直接呼叫該方法,主要新式是Unsafe.getUnsafe(),但是,對于我們實際開發來說,這種方式無法通過安全校驗行不通,系統會拋出 throw new SecurityException("Unsafe")資訊
  • 第二種:通過反射機制繞過安全檢查,主要是修改Unsafe類中theUnsafe欄位的訪問權限,讓其能被訪問從而達到獲取Unsafe物件的目的

需要注意的是,在Java領域中,對于CAS操作實作,主要有兩點問題:

  • JDK1.8版本之前,CAS操作主要使用Unsafe類來執行底層操作,一般并發和執行緒操作時,主要用compareAndSwapObject,compareAndSwapInt,compareAndSwapLong等來實作CAS,而對于執行緒調度主要是park和unpark方法,其主要在sun.misc包下面,
  • JDK1.8版本之后,JDK1.9的CAS操作主要使用VarHandle類,只是用VarHandle替代了一部分Unsafe類的操作,但是對于新版本中Unsafe,本質上Unsafe類會間接呼叫jdk.internal.misc包下面Unsafe類來實作,

3. 具體實作

在Java領域中,AQS同步器利用獨享模式和共享模式來實作同步機制,主要為解決多線并發執行中資料競爭和競爭條件問題,

v7QNMd.png

為解決多線并發執行中資料競爭和競爭條件問題,引入了同步機制,主要是通過控制共享資料和臨界區的訪問,一般比較通用的方式是通過鎖機制來實作,

在Java領域中,JDK對于AQS基礎同步器抽象封裝了鎖的獲取和釋放操作,主要提供了獨享和共享兩種作業模式:

  • 獨享模式(Exclusive Mode) :對應著獨享鎖(Exclusive Lock),表示著對于鎖的獲取和釋放,一次只能至多一個或者只有一個執行緒把持,其他執行緒無法獲得并獲得持有,必須等待持有執行緒釋放鎖,
  • 共享模式(Shared Mode) :對應著共享鎖(Shared Lock),表示著對于鎖的獲取和釋放,一次可以至少一個或者允許多個執行緒把持,其他執行緒可以獲得并獲得持有,不用等待持有執行緒釋放鎖,

其中,AQS基礎同步器對于獨享模式和共享模式的作業模式的基本流程,主要如下:

  • 獲取鎖流程: 先嘗試獲取鎖,如果獲取成功則往下繼續進行,否則把執行緒維護到等待佇列中,執行緒可能會掛起,
  • 釋放鎖流程:喚醒等待佇列中的一個或者多個執行緒去嘗試獲取需要釋放的鎖,

一般地,AQS基礎同步器對于獨享模式和共享模式的封裝和實作,其中:

3.1. 獨享模式的技術實作

[1].獲取鎖操作相關的核心邏輯,主要如下:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    /**
     * 獨占模式:[1].通過acquire獲取鎖操作
     */

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }


    /**
     * 獨占模式:[2].通過tryAcquire嘗試獲取鎖操作
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }



    /**
     * 獨占模式:[3].通過addWaiter入隊操作
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * 獨占模式:[4].通過acquireQueued檢測節點
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * 獨占模式:[5].通過cancelAcquire取消鎖獲取
     */
    private void cancelAcquire(Node node) {

        if (node == null)
            return;

        node.thread = null;


        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;


        Node predNext = pred.next;


        node.waitStatus = Node.CANCELLED;


        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {

            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                    (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; 
        }
    }


}

[2]. 釋放鎖操作相關的核心邏輯,主要如下:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {


    /**
     * 獨占模式:[1].通過release釋放鎖操作
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * 獨占模式:[2].通過tryRelease嘗試釋放鎖操作
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 獨占模式:[3].通過unparkSuccessor喚醒后繼節點
     */
    private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);


        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

}

由此可見,對于獨占模式的鎖獲取和釋放,主要是依據acquire和release等方法來實作,

3.1. 共享模式的技術實作

[1].獲取鎖操作相關的核心邏輯,主要如下:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    /**
     * 共享模式:[1].通過acquireShared獲取鎖操作
     */

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * 共享模式:[2].通過tryAcquireShared嘗試獲取鎖操作
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 共享模式:[3].通過doAcquireShared入隊操作
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

}

[2].釋放鎖操作相關的核心邏輯,主要如下:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    /**
     * 共享模式:[1].通過releaseShared釋放鎖操作
     */

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    /**
     * 共享模式:[2].通過tryReleaseShared嘗試釋放鎖操作
     */


    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 共享模式:[3].通過doReleaseShared釋放鎖就緒操作
     */
    private void doReleaseShared() {

        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    unparkSuccessor(h);
                } else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            if (h == head)
                break;
        }
    }
		
}

由此可見,對于共享模式的鎖獲取和釋放,主要是依據acquireShared和releaseShared等方法來實作,

v7QaqI.png

綜上所述,不難看出,AQS同步器設計思想是通過繼承的方式提供一個模板,其核心原理是管理一個共享狀態,通過對狀態的控制來實作不同的控制,

二. LockSupport的設計與實作

在Java領域中,LockSupport主要從執行緒資源角度為同步器和鎖提供基本執行緒阻塞和喚醒原語,是“等待-通知”作業機制的實作,

vLYc3n.png

一般來說,當一個執行緒(Thread)只要參與鎖競爭時,其經歷的主要流程有:

  • 一旦當前執行緒進行鎖競爭時,執行緒都會嘗試獲取鎖,根據獲取鎖的情況進行后續處理,
  • 如果獲取鎖失敗,則會創建節點插入到佇列的尾部,會二次嘗試重新獲取鎖,并不會阻塞當前執行緒,
  • 如果獲取鎖成功,則直接回傳,否則會將節點設定為待運行狀態(SIGNAL),
  • 最后對當前執行緒進行阻塞,當前驅節點運行完成后會喚醒后繼節點,

在Java領域中,對于執行緒的阻塞和喚醒,也許我們最早在學習面向物件原則的時候,一般都使用java.lang.Object類中wait()、notify()、notifyAll() 這三個方法,可以用它們幫助我們實作等待-通知”作業機制,

同時,在講解AQS基礎同步器的實作時,提到說CAS操作的核心是使用Unsafe類來執行底層操作,對于執行緒調度主要是park和unpark方法,但是一般的Java語言層 main開發對其呼叫又有安全檢查的限制,

但是,在AQS基礎同步的的阻塞和喚醒操作咋在獲取鎖餓的鎖操作中需要使用,一般地:

  • 如果獲取不到鎖的當前執行緒在進入排到佇列之后需要阻塞當前執行緒,
  • 并且,排到佇列中前驅節點運行完成后,需要負責喚醒后繼節點,

于是,在AQS基礎同步器的設計與實作中,封裝一個專門用于實作“等待-通知”作業機制的LockSupport類,

對于LockSupport類,主要是為同步器和鎖提供基本執行緒阻塞和喚醒原語,AQS同步器和鎖都是使用它來阻塞和喚醒執行緒,主要原始碼如下:


public class LockSupport {

    /**
     *  阻塞操作:利用park阻塞某個執行緒(指定引數)
     */
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0 L);
        setBlocker(t, null);
    }

    /**
     *  阻塞操作:利用park阻塞某個執行緒(無指定引數)
     */
    public static void park() {
        UNSAFE.park(false, 0 L);
    }


    /**
     *  阻塞操作:根據nanos許可,利用parkNanos阻塞某個執行緒
     */
    public static void parkNanos(long nanos) {
        if (nanos > 0)
            UNSAFE.park(false, nanos);
    }

    /**
     *  阻塞操作:根據nanos許可,利用parkNanos阻塞某個執行緒,但是指定阻塞物件
     */
    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }

    /**
     *  阻塞操作:根據deadline最大等待時間,利用parkUntil阻塞某個執行緒
     */
    public static void parkUntil(long deadline) {
        UNSAFE.park(true, deadline);
    }

    /**
     *  阻塞操作:根據deadline最大等待時間,利用parkUntil阻塞某個執行緒,需要指定阻塞物件
     */
    public static void parkUntil(Object blocker, long deadline) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(true, deadline);
        setBlocker(t, null);
    }


    /**
     *  喚醒操作:利用unpark喚醒某個執行緒
     */
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

    /**
     *  設定阻塞器: 指定執行緒和物件
     */
    private static void setBlocker(Thread t, Object arg) {
        // Even though volatile, hotspot doesn't need a write barrier here.
        UNSAFE.putObject(t, parkBlockerOffset, arg);
    }

    /**
     *  獲取阻塞器中執行緒物件
     */
    public static Object getBlocker(Thread t) {
        if (t == null)
            throw new NullPointerException();
        return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
    }

    /**
     *  獲取阻塞器中執行緒物件
     */
    static final int nextSecondarySeed() {
        int r;
        Thread t = Thread.currentThread();
        if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
            r ^= r << 13; // xorshift
            r ^= r >>> 17;
            r ^= r << 5;
        } else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
            r = 1; // avoid zero
        UNSAFE.putInt(t, SECONDARY, r);
        return r;
    }

    // Hotspot implementation via intrinsics API
    private static final sun.misc.Unsafe UNSAFE;

    private static final long parkBlockerOffset;

    private static final long SEED;

    private static final long PROBE;

    private static final long SECONDARY;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class <? > tk = Thread.class;
            parkBlockerOffset = UNSAFE.objectFieldOffset(tk.getDeclaredField("parkBlocker"));
            SEED = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

}

  • 設計思想: 相比用java.lang.Object類中的wait/notify方式,其LockSupport類更關注執行緒本身,解耦了執行緒之間的同步,
  • 實作原理: 主要還是使用Unsafe類來執行底層操作,主要是間接呼叫是park和unpark本地方法
  • 阻塞操作涉及方法: 一般以park開頭的方法來阻塞執行緒操作,大致可以分為自定義阻塞物件引數和非自定義阻塞物件引數等阻塞方法
  • 喚醒執行緒操作: 主要通過unpark方法來對當前執行緒設定可用,相對于喚醒操作

三. Condition介面的設計與實作

在Java領域中,Condition介面是用來實作管程技術,其中 Condition用于解決同步問題,

vLNCoF.png

相對于LockSupport的設計與實作來說,Condition介面只是在JDK層面對于阻塞和喚醒提供了一個模板的定義,是AQS基礎同步器中條件佇列的定義,而ConditionObject是在AQS基礎同步器具體實作,

對于Condition介面而言,是提供了可替代wait/notify機制的條件佇列模式,其中:


public interface Condition {

    /**
     * 條件佇列模式:等待await操作
     */
    void await() throws InterruptedException;

    /**
     * 條件佇列模式:等待awaitUninterruptibly操作,可中斷模式
     */
    void awaitUninterruptibly();

    /**
     * 條件佇列模式:等待awaitNanos操作,可超時模式
     */
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    /**
     * 條件佇列模式:等待await操作,可超時模式
     */
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    /**
     *條件佇列模式:等待awaitUntil操作,可超時模式
     */
    boolean awaitUntil(Date deadline) throws InterruptedException;

    /**
     * 條件佇列模式:通知signal操作
     */
    void signal();

    /**
     * 條件佇列模式:通知signalAll操作
     */
    void signalAll();
}
  • 定義了關于“等待(wait)”機制的相關實作:而以await開頭的所有方法都是關于等待機制的定義,
  • 定義了關于“通知(signal)”機制的相關實作: signal()方法和signalAll()方法,其中 signal()方法是隨機地通知等待佇列中的一個執行緒,而signalAll()方法是通知等待佇列中的所有執行緒,

四. Lock介面的設計與實作

在Java領域中,Lock介面是用來實作管程技術,其中 Lock 用于解決互斥問題,

vqfsu8.png

Lock介面位于java.util.concurrent.locks包中,是JUC顯式鎖的一個抽象,Lock介面的主要抽象方法:

public interface Lock {

    /**
     * Lock介面-獲取鎖
     */
    void lock();

    /**
     * Lock介面-獲取鎖(可中斷)
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * Lock介面-嘗試獲取鎖
     *
     */
    boolean tryLock();

    /**
     *Lock介面-嘗試獲取鎖(支持超時)
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     *Lock介面-釋放鎖
     *
     */
    void unlock();

    /**
     *  Lock介面-設定條件變數
     */
    Condition newCondition();
}

  • 獲取鎖: lock()
  • 釋放鎖:unlock()
  • 條件變數: Condition

在JDK中,對于Lock介面的具體實作主要是ReentrantLock類,其中:


public class ReentrantLock implements Lock, java.io.Serializable {

    private static final long serialVersionUID = 7373984872572414699 L;

    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;


    /**
     * 構造鎖的非公平模式(默認模式)
     */
    public ReentrantLock() {
            sync = new NonfairSync();
        }
				
    /**
     * 構造鎖的公平和非公平模式(可選公平或者非公平)
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    /**
     * Lock介面-實作嘗試獲取鎖
     */
    public void lock() {
        sync.lock();
    }

    /**
     * Lock介面-實作嘗試獲取鎖
     */
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /**
     * Lock介面-實作嘗試獲取鎖
     */
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    /**
     * Lock介面-實作嘗試獲取鎖
     */
    public boolean tryLock(long timeout, TimeUnit unit)
    throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    /**
     * Lock介面-釋放鎖
     */
    public void unlock() {
        sync.release(1);
    }

    /**
     * Lock介面-創建條件變數
     */
    public Condition newCondition() {
        return sync.newCondition();
    }


}

  • 包含了一個同步器Sync,主要是基于AbstractQueuedSynchronizer實作,同時基于Sync類還實作FairSync類和NonfairSync類,其中FairSync類對應著公平模式,NonfairSync類對應非公平模式,
  • 實作Lock介面設計和定義的相關方法,可以設定其鎖是公平和非公的,默認是非公平模式的,

相對于Java內置鎖,Java SDK 并發包里的 Lock介面主要區別有能夠回應中斷、支持超時和非阻塞地獲取鎖等三個特性,

五. ReadWriteLock介面的設計與實作

在Java領域中,ReadWriteLock介面主要是基于Lock介面來封裝了ReadLock鎖和WriteLock鎖等2種鎖的實作方法,其中ReadLock鎖是讀鎖的介面定義,WriteLock鎖是寫鎖的介面定義,

vqffCn.png

ReadWriteLock介面的內部實作,主要是基于Lock介面來獲取ReadLock鎖和WriteLock鎖的,其具體代碼如下:


public interface ReadWriteLock {
    /**
     * ReadWriteLock介面-基于Lock介面實作ReadLock
     */
    Lock readLock();

    /**
     * ReadWriteLock介面-基于Lock介面實作WriteLock
     */
    Lock writeLock();
}

  • readLock()方法: 獲取讀鎖,主要是基于基于Lock介面來定義,表示著其具體實作類都會基于AQS基礎同步器實作
  • writeLock()方法:獲取寫鎖,主要是基于基于Lock介面來定義,表示著其具體實作類都會基于AQS基礎同步器實作

在JDK中,對于ReadWriteLock介面的具體實作主要是ReentrantReadWriteLock類,其中:


public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {

    private static final long serialVersionUID = -6992448646407690164 L;

    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;

    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;

    /** Performs all synchronization mechanics */
    final Sync sync;


    /**
     * 同步器-基于AbstractQueuedSynchronizer實作
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        //.....

        abstract boolean readerShouldBlock();

        abstract boolean writerShouldBlock();
    }

    /**
     * 同步器-非公平模式
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037 L;

        final boolean writerShouldBlock() {
            return false;
        }

        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }

    /**
     * 同步器-公平模式
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451 L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }


    /**
     * 構造鎖的非公平模式(默認預設)
     */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /**
     * 構造鎖的公平和非公平模式(可選公平或者非公平)
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    /**
     * ReadWriteLock介面-基于Lock具體實作的writeLock()
     */
    public ReentrantReadWriteLock.WriteLock writeLock() {
        return writerLock;
    }

    /**
     * ReadWriteLock介面-基于Lock具體實作的readLock()
     */
    public ReentrantReadWriteLock.ReadLock readLock() {
        return readerLock;
    }

    /**
     * ReadWriteLock介面-ReadLock內置類
     */
    public static class ReadLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -5992448646407690164 L;

        private final Sync sync;

        //.....
    }

    /**
     * ReadWriteLock介面-WriteLock內置類
     */
    public static class WriteLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -4992448646407690164 L;

        private final Sync sync;

        //.....
    }

}

  • 包含了一個同步器Sync,主要是基于AbstractQueuedSynchronizer實作,同時基于Sync類還實作FairSync類和NonfairSync類,其中FairSync類對應著公平模式,NonfairSync類對應非公平模式,
  • 主要是內置定義了ReadLock類和WriteLock類等兩個內部類,其中ReadLock類為讀鎖,而而WriteLock類為寫鎖,可以設定其鎖是公平和非公的,默認是非公平模式的,
  • 實作了ReadWriteLock介面,通過內部類的定義來具體實作對應的鎖,其中ReadLock類對應readLock()方法,而WriteLock類對應writeLock()方法,

綜上所述,從JDK中對于ReadWriteLock介面的具體實作來看,我們不難發現,ReadWriteLock介面是實作讀寫鎖一種模板定義,我們可以基于這個介面來實作滿足我們實際業務需求的讀寫鎖,

寫在最后

Picture-Footer

在并發編程領域,有兩大核心問題:一個是互斥,即同一時刻只允許一個執行緒訪問共享資源;另一個是同步,即執行緒之間如何通信、協作,

主要原因是,對于多執行緒實作實作并發,一直以來,多執行緒都存在2個問題:

  • 執行緒之間記憶體共享,需要通過加鎖進行控制,但是加鎖會導致性能下降,同時復雜的加鎖機制也會增加編程編碼難度
  • 過多執行緒造成執行緒之間的背景關系切換,導致效率低下

因此,在并發編程領域中,一直有一個很重要的設計原則: “ 不要通過記憶體共享來實作通信,而應該通過通信來實作記憶體共享,”

簡單來說,就是盡可能通過訊息通信,而不是記憶體共享來實作行程或者執行緒之間的同步,

最后,技術研究之路任重而道遠,愿我們熬的每一個通宵,都撐得起我們想在這條路上走下去的勇氣,未來仍然可期,與各位程式編程君共勉!

著作權宣告:本文為博主原創文章,遵循相關著作權協議,如若轉載或者分享請附上原文出處鏈接和鏈接來源,

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/508963.html

標籤:其他

上一篇:Java 并發編程決議 | 基于JDK原始碼決議Java領域中的并發鎖,我們可以從中學習到什么內容?

下一篇:Redis 生產架構選型對比,一文整治選擇困難癥

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 面試突擊第一季,第二季,第三季

    第一季必考 https://www.bilibili.com/video/BV1FE411y79Y?from=search&seid=15921726601957489746 第二季分布式 https://www.bilibili.com/video/BV13f4y127ee/?spm_id_fro ......

    uj5u.com 2020-09-10 05:35:24 more
  • 第三單元作業總結

    1.前言 這應該是本學期最后一次寫作業總結了吧。總體來說,對作業的節奏也差不多掌握了,作業做起來的效率也更高了。雖然和之前的作業一樣,作業中都要用到新的知識,但是相比之前,更加懂得了如何利用工具以及資料。雖然之間卡過殼,但總體而言,這幾次作業還算完成的比較好。 2.作業程序總結 相比前兩個單元,此單 ......

    uj5u.com 2020-09-10 05:35:41 more
  • 北航OO(2020)第四單元博客作業暨課程總結博客

    北航OO(2020)第四單元博客作業暨課程總結博客 本單元作業的架構設計 在本單元中,由于UML圖具有比較清晰的樹形結構,因此我對其中需要進行查詢操作的元素進行了包裝,在樹的父節點中存盤所有孩子的參考。考慮到性能問題,我采用了快取機制,一次查詢后盡可能快取已經遍歷過的資訊,以減少遍歷次數。 本單元我 ......

    uj5u.com 2020-09-10 05:35:48 more
  • BUAA_OO_第四單元

    一、UML決議器設計 ? 先看下題目:第四單元實作一個基于JDK 8帶有效性檢查的UML(Unified Modeling Language)類圖,順序圖,狀態圖分析器 MyUmlInteraction,實際上我們要建立一個有向圖模型,UML中的物件(元素)可能與同級元素連接,也可與低級元素相連形成 ......

    uj5u.com 2020-09-10 05:35:54 more
  • 6.1邏輯運算子

    邏輯運算子 1. && 短路與 運算式1 && 運算式2 01.運算式1為true并且運算式2也為true 整體回傳為true 02.運算式1為false,將不會執行運算式2 整體回傳為false 03.只要有一個運算式為false 整體回傳為false 2. || 短路或 運算式1 || 運算式2 ......

    uj5u.com 2020-09-10 05:35:56 more
  • BUAAOO 第四單元 & 課程總結

    1. 第四單元:StarUml檔案決議 本單元采用了圖模型決議UML。 UML檔案可以抽象為圖、子圖、邊的邏輯結構。 在實作中,圖的節點包括類、介面、屬性,子圖包括狀態圖、順序圖等。 采用了三次遍歷UML元素的方法建圖,第一遍遍歷建點,第二、三次遍歷設定屬性、連邊,實作圖物件的初始化。這里借鑒了一些 ......

    uj5u.com 2020-09-10 05:36:06 more
  • 談談我對C# 多型的理解

    面向物件三要素:封裝、繼承、多型。 封裝和繼承,這兩個比較好理解,但要理解多型的話,可就稍微有點難度了。今天,我們就來講講多型的理解。 我們應該經常會看到面試題目:請談談對多型的理解。 其實呢,多型非常簡單,就一句話:呼叫同一種方法產生了不同的結果。 具體實作方式有三種。 一、多載 多載很簡單。 p ......

    uj5u.com 2020-09-10 05:36:09 more
  • Python 資料驅動工具:DDT

    背景 python 的unittest 沒有自帶資料驅動功能。 所以如果使用unittest,同時又想使用資料驅動,那么就可以使用DDT來完成。 DDT是 “Data-Driven Tests”的縮寫。 資料:http://ddt.readthedocs.io/en/latest/ 使用方法 dd. ......

    uj5u.com 2020-09-10 05:36:13 more
  • Python里面的xlrd模塊詳解

    那我就一下面積個問題對xlrd模塊進行學習一下: 1.什么是xlrd模塊? 2.為什么使用xlrd模塊? 3.怎樣使用xlrd模塊? 1.什么是xlrd模塊? ?python操作excel主要用到xlrd和xlwt這兩個庫,即xlrd是讀excel,xlwt是寫excel的庫。 今天就先來說一下xl ......

    uj5u.com 2020-09-10 05:36:28 more
  • 當我們創建HashMap時,底層到底做了什么?

    jdk1.7中的底層實作程序(底層基于陣列+鏈表) 在我們new HashMap()時,底層創建了默認長度為16的一維陣列Entry[ ] table。當我們呼叫map.put(key1,value1)方法向HashMap里添加資料的時候: 首先,呼叫key1所在類的hashCode()計算key1 ......

    uj5u.com 2020-09-10 05:36:38 more
最新发布
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:20:47 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:20:25 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:20:17 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:20:10 more
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:19:44 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:19:07 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:18:57 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:18:49 more
  • 05單件模式

    #經典的單件模式 public class Singleton { private static Singleton uniqueInstance; //一個靜態變數持有Singleton類的唯一實體。 // 其他有用的實體變數寫在這里 //構造器宣告為私有,只有Singleton可以實體化這個類! ......

    uj5u.com 2023-04-19 08:42:51 more
  • 【架構與設計】常見微服務分層架構的區別和落地實踐

    軟體工程的方方面面都遵循一個最基本的道理:沒有銀彈,架構分層模型更是如此,每一種都有各自優缺點,所以請根據不同的業務場景,并遵循簡單、可演進這兩個重要的架構原則選擇合適的架構分層模型即可。 ......

    uj5u.com 2023-04-19 08:42:41 more