1 同步佇列(CLH)
CLH佇列是Craig, Landin, and Hagersten三人發明的一種基于雙向鏈表資料結構的佇列,是一種基于鏈表的可擴展、高性能、公平的自旋鎖,申請執行緒僅僅在本地變數上自旋,它不斷輪詢前驅的狀態,假設發現前驅釋放了鎖就結束自旋,
JAVA中的CLH佇列是原CLH佇列的一個變種,執行緒由原來的自旋機制改為阻塞機制,

2 AQS簡介
2.1 定義
(1) 全稱是 AbstractQueuedSynchronizer
(2) 阻塞式鎖和相關的同步器工具的框架;
(3) AQS用一個變數(volatile state) 屬性來表示鎖的狀態,子類去維護這個狀態
(4) getState、compareAndSetState cas改變這個變數
(5) 獨占模式是只有一個執行緒能夠訪問資源
(6) 而共享模式可以允許多個執行緒訪問資源(讀寫鎖)
(7) 內部維護了一個FIFO等待佇列,類似于 synchronized關鍵字當中的 Monitor 的 EntryList
(8) 條件變數來實作等待、喚醒機制,支持多個條件變數,類似于 Monitor 的 WaitSet
(9) 內部維護了一個Thread exclusiveOwnerThread 來記錄當前持有鎖的那個執行緒
2.2 功能
(1) 實作阻塞獲取鎖 acquire 拿不到鎖就去阻塞 等待鎖被釋放再次獲取鎖
(2) 實作非阻塞嘗試獲取鎖 tryAcquire 拿不到鎖則直接放棄
(3) 實作獲取鎖超時機制
(4) 實作通過打斷來取消
(5) 實作獨占鎖及共享鎖
(6) 實作條件不滿足的時候等待
3 節點
static final class Node {
/**################節點模式##################*/
/** 共享*/
static final Node SHARED = new Node();
/** 獨占 */
static final Node EXCLUSIVE = null;
/**
* CANCELLED:等待執行緒超時或者被中斷、需要從同步佇列中取消等待(也就是放棄資源的競爭)
* SIGNAL:后繼節點會處于等待狀態,當前節點執行緒如果釋放同步狀態或者被取消則會通知后繼節點執行緒,使后繼節點執行緒的得以運行
* CONDITION:節點在等待佇列中,執行緒在等待在Condition 上,其他執行緒對Condition呼叫singnal()方法后,該節點加入到同步佇列中,
* PROPAGATE:表示下一次共享式獲取同步狀態的時會被無條件的傳播下去,
*/
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/**等待狀態*/
volatile int waitStatus;
/**前指標*/
volatile Node prev;
/**后指標*/
volatile Node next;
/**執行緒*/
volatile Thread thread;
/**
* AQS中條件佇列是使用單向串列保存的,用nextWaiter來連接,
*/
Node nextWaiter;
/** 判斷nextWaiter是否為共享*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**獲取當前節點的前驅節點 */
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
/**
* ######################################
* ##############構造方法################
* ######################################
*/
Node() {
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
4 AQS主要屬性
head、tail、state、exclusiveOwnerThread


5 ReentrantLock
5.1 ReentrantLock和AQS之間的關系

5.2 非公平鎖加鎖程序
5.2.1 Demo
public class Account {
private static final ReentrantLock LOCK = new ReentrantLock();
private int number;
public Account(int number) {
this.number = number;
}
public void addNumber(int count) {
try {
LOCK.lock();
number = number + count;
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}
public int getNumber() {
return this.number;
}
}
public class LockTest {
public static void main(String[] args) throws InterruptedException {
Account account = new Account(0);
//創建
Thread t1 = new Thread(() -> account.addNumber(100), "t1");
Thread t2 = new Thread(() -> account.addNumber(100), "t2");
Thread t3 = new Thread(() -> account.addNumber(100), "t3");
//開啟
t1.start();
t2.start();
t3.start();
//join
t1.join();
t2.join();
t3.join();
System.out.println(account.getNumber());
}
}

5.2.2 AQS初始化狀態

5.2.3 當t1加入時
t1進來時佇列沒有CLH佇列為空,及t1搶到了鎖,compareAndSetState(0,1) 為true,

AQS狀態:

5.2.4 當t2加入時
(1) 呼叫ReentrantLock NonfairSync內部類lock
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//t1睡眠1s,鎖沒有釋放,t2進入acquire(1)
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
(2) 調取AQS方法acquire
public final void acquire(int arg) {
//嘗試獲取鎖、入隊、等待
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
(3) 調取ReentrantLock NonfairSync內部類nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//獲取狀態
int c = getState();
//如果為0,并且搶到鎖,回傳嘗試獲取成功
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果當前執行緒等于持有鎖的執行緒,更新state的值,回傳嘗試獲取成功
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//嘗試獲取失敗
return false;
}
(4) 調取AQS方法addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
//創建一個node節點
Node node = new Node(Thread.currentThread(), mode);
// 獲取隊尾
Node pred = tail;
//如果隊尾存在
if (pred != null) {
//把當前節點的prev設定成隊尾,cas更新隊尾,回傳新建的節點
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//初始化CLH佇列
enq(node);
return node;
}
private Node enq(final Node node) {
//自旋
for (;;) {
//獲取隊尾
Node t = tail;
//如果隊尾為空
if (t == null) {
//創建一個節點,設定為隊頭,隊尾,繼續自旋
if (compareAndSetHead(new Node()))
tail = head;
} else {
//更新隊尾,設定新建節點的前驅為隊尾,cas設定隊尾,更新原隊尾的后繼節點
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
(5) 調取AQS方法 acquireQueued
final boolean acquireQueued(final Node node, int arg) {
//標記acquireQueued方法是否執行成功
boolean failed = true;
try {
//標記,執行緒打斷狀態
boolean interrupted = false;
//自旋
for (;;) {
//獲取新建節點的前驅節點
final Node p = node.predecessor();
//如果當前節點是第二個節點,并且搶到了鎖
if (p == head && tryAcquire(arg)) {
//更新隊頭
setHead(node);
p.next = null; // help GC
//標記acquireQueued方法執行成功
failed = false;
return interrupted;
}
//自旋第一次,cas設定前驅節點為SIGNAL狀態(進入睡眠前,修改前驅節點狀態,告訴它輪我作業時呼我)
//自旋第二次,執行parkAndCheckInterrupt()方法進入睡眠
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
(6) AQS狀態

(7) CLH狀態

5.2.5 當t3加入時

5.3 解鎖
(1) ReentrantLock 呼叫方法

(2) AQS 呼叫方法
public final boolean release(int arg) {
//釋放鎖
if (tryRelease(arg)) {
//獲取頭節點
Node h = head;
//頭結點不能為空并且頭結點狀態不能是0,才能喚醒其它節點
//PS:狀態是0的節點是尾部節點
if (h != null && h.waitStatus != 0)
//喚醒
unparkSuccessor(h);
return true;
}
return false;
}
(3) ReentrantLock 呼叫方法tryRelease
protected final boolean tryRelease(int releases) {
//獲取狀態
int c = getState() - releases;
//判斷當前執行緒是否是鎖的持有執行緒
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//釋放標記
boolean free = false;
//如果狀態為0,釋放標記更新成功,設定鎖的持有執行緒為null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//更新狀態
setState(c);
return free;
}
(4) AQS 調取方法unparkSuccessor
private void unparkSuccessor(Node node) {
//獲取頭節點狀態
int ws = node.waitStatus;
//如果狀態為SIGNAL、CONDITION、PROPAGATE更新狀態為0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//獲取頭節點的下一個節點
Node s = node.next;
//如果下一個節點為null 或 無效的節點,從隊尾往前找,找出最后一個有效的節點(head下第一個有效節點)
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//喚醒
if (s != null)
LockSupport.unpark(s.thread);
}
5.4 公平鎖和非公平鎖加鎖區別
(1) lock方法不同


(2) tryAcquire不同


轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/213043.html
標籤:其他
