文章目錄
- Time 2021-12-31——Hireek
- LockSupport 工具類
- void park()
- void unpark(Thread thread) 方法
- void parkNanos(long nanos) 方法
- void park(Object blocker)
- void parkNanos(Object blocker, long nanos)
- void parkUtil(Object blocker, long deadline)
- 最后再看一個例子
- hotspot底層實作
- Unsafe_Park
- Unsafe_Unpark
- 抽象同步佇列 AQS 概述
- 架構
- why
- what
- how
- 類圖
- Node——同步佇列的結構
- ConditionObject——條件變數
- state——原子變數
- exclusive mode
- acquire
- release
- shared mode
- acquireShared
- releaseShared
- ReentrantLock
- 類圖
- nonfair or fair
- lock()對比
- tryAcquire(int acquires)對比
- ReentrantReadWritelock
- state狀態
- nonfair or fair
- 寫鎖的獲取
- 讀鎖獲取
- StampedLock
Time 2021-12-31——Hireek
LockSupport 工具類
JDK 中的 rt.jar 包里面的是個 LockSupport 是個工具類,它的主要作用是掛起和喚醒執行緒,該工具類是創建鎖和其他同步類的基礎,
LockSupport 類與每個使用它的執行緒都會關聯一個許可證,在默認情況下呼叫 LockSupport 類的方法的執行緒是不持有許可證的,LockSupport 是使用 Unsafe 類實作的,下面介紹 LockSupport 中的幾個主要函式,
void park()
如果呼叫park方法的執行緒已經拿到了與LockSupport關聯的許可證,則呼叫LockSupport.park()時會馬上回傳,否則呼叫執行緒會被禁止參與執行緒的調度,也就是會被阻塞掛起,
如下代碼直接在main函式里面呼叫park方法,最終只會輸出begin park!,然后當前執行緒被掛起,這是因為在默認情況下呼叫執行緒是不持有許可證的,
public static void main(String[] args) {
System.out.println("begin park! ");
LockSupport.park();
System.out.println("end park! ");
}
在其他執行緒呼叫unpark(Thread thread)方法并且將當前執行緒作為引數時,呼叫park方法而被阻塞的執行緒會回傳,另外,如果其他執行緒呼叫了阻塞執行緒的interrupt()方法,設定了中斷標志或者被虛假喚醒,則阻塞執行緒也會回傳,所以呼叫park方法時最好也使用回圈條件判斷方式,
需要注意的是,因呼叫park()方法而被阻塞的執行緒被其他執行緒中斷而回傳時并不會拋出InterruptedException例外,
void unpark(Thread thread) 方法
當一個執行緒呼叫unpark時,如果引數thread執行緒沒有持有thread與LockSupport類關聯的許可證,則讓thread執行緒持有,如果thread之前因呼叫park()而被掛起,則呼叫unpark后,該執行緒會被喚醒,如果thread之前沒有呼叫park,則呼叫unpark方法后,再呼叫park方法,則會立即回傳,修改代碼如下,
public static void main(String[] args) {
System.out.println("begin park! ");
LockSupport.unpark(Thread.currentThread());
LockSupport.park();
System.out.println("end park! ");
/*
輸出結果:
begin park!
end park!
*/
}
下面再來看一個例子以加深對 park 和 unpark 的理解
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("child thread begin park!");
LockSupport.park();
System.out.println("child thread unpark!");
}
});
thread.start();
Thread.sleep(1000);
System.out.println("main thread begin unpark!");
LockSupport.unpark(thread);
/*
輸出結果
child thread begin park!
main thread begin unpark!
child thread unpark!
*/
}
上邊代碼執行程序如下:
- 首先創建了一個子執行緒 thread,然后子執行緒啟動呼叫 park 方法,由于默認情況下子執行緒沒有持有許可證,因而會把自己掛起,
- 主執行緒休眠 1s 是為了讓主執行緒呼叫 unpark 方法前讓子執行緒輸出 child thread begin park! 并阻塞,
- 主執行緒執行 unpark 方法,引數為創建的子執行緒 thread,這樣做的目的是讓子執行緒持有許可證,然后子執行緒呼叫 park 方法就回傳了,
park 方法回傳時不會告訴你因何種原因回傳,所以呼叫者需要根據之前呼叫 park 方法的原因,再次檢查條件是否滿足,如果不滿足還需再次呼叫 park 方法,
例如,根據呼叫前后中斷狀態的對比就可以判斷是不是因為被中斷才回傳的,
為了說明呼叫 park 方法后的執行緒被中斷后會回傳,我們修改上面的例子代碼,洗掉 LockSupport.unpark(thread),然后添加 thread.interrupt(),具體代碼如下,
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("child thread begin park!");
while (!Thread.currentThread().isInterrupted()){
LockSupport.park();
}
System.out.println("child thread unpark!");
}
});
thread.start();
Thread.sleep(1000);
System.out.println("main thread begin unpark!");
thread.interrupt();
/*
輸出結果
child thread begin park!
main thread begin unpark!
child thread unpark!
*/
}
在如上代碼中,只有中斷子執行緒,子執行緒才會運行結束,如果子執行緒不被中斷,即使呼叫 unpark(thread) 方法子執行緒也不會結束,
void parkNanos(long nanos) 方法
和 park 方法類似,如果呼叫 park 方法的執行緒已經拿到了與 LockSupport 關聯的許可證,則呼叫 LockSupport.parkNanos(Long nanos) 方法后會馬上回傳,該方法的不同在于,如果沒有拿到許可證,則呼叫執行緒會被掛起 nanos 時間后修改為自動回傳,
void park(Object blocker)
park 方法還支持帶有 blocker 引數的方法 void park(Object blocker) 方法,當執行緒在沒有持有許可證的情況下呼叫 park 方法而被阻塞掛起時,這個 blocker 物件會被記錄到該執行緒內部,
使用診斷工具可以觀察執行緒被阻塞的原因,診斷工具是通過呼叫 getBlocker(Thread) 方法來獲取 blocker 物件的,所以 JDK 推薦我們使用帶有 blocker 引數的 park 方法,并且 blocker 被設定為 this,這樣當在列印執行緒堆疊排查問題時就能知道是哪個類被阻塞了,
例如下面的代碼,
/**
* 描述
*
* @author Hireek
* @date 2021/12/31 07:51
*/
public class TestPark {
public void testPark() {
LockSupport.park();
}
public static void main(String[] args) {
TestPark testPark = new TestPark();
testPark.testPark();
}
}
運行代碼后,使用 jps 查看運行行程號,然后通過 jstack 命令查看執行緒堆疊時可以啊看到如下輸出結果,

修改代碼 (1) 為 LockSupport.park(this) 后運行代碼,則使用 jstack 命令輸出結果為:

使用帶 blocker 引數的 park 方法,執行緒堆疊可以提供更多有關阻塞物件的資訊,
接下來看看內部實作
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
Thread 類里面有個變數 volatile Object parkBlocker,用來存放 park 方法傳遞的 blocker 物件,也就是把 blocker 變數存放到了呼叫 park 方法的執行緒的成員變數里面,
void parkNanos(Object blocker, long nanos)
相比于 park(Object blocker) 方法多了個超時時間,
void parkUtil(Object blocker, long deadline)
它的代碼如下:
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(true, deadline);
setBlocker(t, null);
}
其中引數 deadline 的時間單位為 ms,改時間是從 1970 年到現在某一時間點的毫秒值,這個方法和 park(Object blocker, long nanos) 方法的區別是,后則會是從當前算等待 nanos 秒時間,而前者是指定一個時間點,比如需要等到 2019.11.11 日 11:11:11, 則把這個時間點轉化為從 1970 年到這個時間點的總毫秒數,
最后再看一個例子
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
/**
* 描述
*
* @author Hireek
* @date 2021/12/31 07:57
*/
public class FIFOMutex {
private final AtomicBoolean locked = new AtomicBoolean((false));
private final Queue<Thread> waiters = new ConcurrentLinkedDeque<>();
public void lock() {
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
waiters.add(current);
while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
LockSupport.park(this);
if (Thread.interrupted()) {
wasInterrupted = true;
}
}
waiters.remove();
if (wasInterrupted) {
current.interrupt();
}
}
public void unlock() {
locked.set(false);
LockSupport.unpark(waiters.peek());
}
}
這是一個先進先出的鎖,也就是只有佇列的首元素可以獲取鎖,在代碼 (1) 處,如果當前執行緒不是隊首或者當前鎖已經被其他執行緒獲取,則呼叫 park 方法掛起自己,
然后在代碼 (2) 處判斷,如果 park 方法是因為被中斷而回傳,則忽略中斷,并且重置中斷標志,做個標記,然后再次判斷當前執行緒是不是隊首元素或者當前鎖是否已經被其他執行緒獲取,如果是則繼續呼叫 park 方法掛起自己,
然后再代碼 (3) 中,判斷標記,如果標記為true則中斷該執行緒,這個怎么理解呢?其實就是其他執行緒中斷了該執行緒,雖然我對中斷信號不感興趣,忽略它,但是不代表其他執行緒對該標志不感興趣,所以要恢復下,
上述都是原文,為了加深下使用和理解,下面我們探討下cpp原始碼
hotspot底層實作
Unsafe_Park
jdk12->os_posix.cpp檔案
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
EventThreadPark event;
JavaThreadParkedState jtps(thread, time != 0);
thread->parker()->park(isAbsolute != 0, time);
if (event.should_commit()) {
const oop obj = thread->current_park_blocker();
if (time == 0) {
post_thread_park_event(&event, obj, min_jlong, min_jlong);
} else {
if (isAbsolute != 0) {
post_thread_park_event(&event, obj, min_jlong, time);
} else {
post_thread_park_event(&event, obj, time, min_jlong);
}
}
}
HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
} UNSAFE_END
// 每個執行緒都有一個Parker型別的_parker變數,最終park()在Parker實作
void Parker::park(bool isAbsolute, jlong time) {
// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
// Optional optimization -- avoid state transitions if there's
// an interrupt pending.
if (Thread::is_interrupted(thread, false)) {
return;
}
// Next, demultiplex/decode time arguments
struct timespec absTime;
if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
return;
}
if (time > 0) {
to_abstime(&absTime, time, isAbsolute);
}
// Enter safepoint region
// Beware of deadlocks such as 6317397.
// The per-thread Parker:: mutex is a classic leaf-lock.
// In particular a thread must never block on the Threads_lock while
// holding the Parker:: mutex. If safepoints are pending both the
// the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
ThreadBlockInVM tbivm(jt);
// Don't wait if cannot get lock since interference arises from
// unparking. Also re-check interrupt before trying wait.
if (Thread::is_interrupted(thread, false) ||
pthread_mutex_trylock(_mutex) != 0) { // 鎖住信號量
return;
}
int status;
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "invariant");
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
return;
}
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
assert(_cur_index == -1, "invariant");
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait(&_cond[_cur_index], _mutex); //釋放信號量,并在條件變數上等待
assert_status(status == 0, status, "cond_timedwait");
}
else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
assert_status(status == 0 || status == ETIMEDOUT,
status, "cond_timedwait");
}
_cur_index = -1;
_counter = 0;
status = pthread_mutex_unlock(_mutex); // 釋放信號量
assert_status(status == 0, status, "invariant");
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
}
Unsafe_Unpark
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread)) {
Parker* p = NULL;
if (jthread != NULL) {
ThreadsListHandle tlh;
JavaThread* thr = NULL;
oop java_thread = NULL;
(void) tlh.cv_internal_thread_to_JavaThread(jthread, &thr, &java_thread);
if (java_thread != NULL) {
// This is a valid oop.
jlong lp = java_lang_Thread::park_event(java_thread);
if (lp != 0) {
// This cast is OK even though the jlong might have been read
// non-atomically on 32bit systems, since there, one word will
// always be zero anyway and the value set is always the same
p = (Parker*)addr_from_java(lp);
} else {
// Not cached in the java.lang.Thread oop yet (could be an
// older version of library).
if (thr != NULL) {
// The JavaThread is alive.
p = thr->parker();
if (p != NULL) {
// Cache the Parker in the java.lang.Thread oop for next time.
java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
}
}
}
}
} // ThreadsListHandle is destroyed here.
if (p != NULL) {
HOTSPOT_THREAD_UNPARK((uintptr_t) p);
p->unpark();
}
} UNSAFE_END
決議可以參考:https://blog.csdn.net/weixin_43767015/article/details/107207643
抽象同步佇列 AQS 概述
架構
why
/**
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state.
* This class provides an efficient and scalable basis for synchronization in part by specializing its range of * use to synchronizers that can rely on int state, acquire, and release parameters, and an internal FIFO wait * queue.
/
為了實作一個同步器的框架(模板,基石),
what
一個同步器框架,一個抽象同步器模板,依賴CLH佇列和一個原子變數(volatile修飾),
how
如何實作呢?其實借鑒的是jvm底層加鎖的一套機制,
有同步佇列(雙向鏈表->內部類Node實作),ConditionObject(維護一個單向鏈表->等待佇列),一個原子變數,LockSupport,CAS…
具體變數參照下面類圖
類圖

Node——同步佇列的結構
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
/** 注意是下一個執行緒需要阻塞 */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
}
ConditionObject——條件變數
ConditionObject是條件變數,每個條件變數對應一個條件佇列(單向鏈表佇列),其用來存放呼叫條件變數的await方法后被阻塞的執行緒,
類似wait()和notify()配合synchronized實作執行緒同步(通信),區別是ConditionObject(條件變數)可以有多個,而synchronized只有一個共享變數,
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
// Internal methods
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// ...
}
state——原子變數
/**
* The synchronization state.
*/
private volatile int state;
在AQS中維持了一個單一的狀態資訊state,可以通過getState、setState、compareAndSetState函式修改其值,對于ReentrantLock的實作來說,state可以用來表示當前執行緒獲取鎖的可重入次數;對于讀寫鎖ReentrantReadWriteLock來說,state的高16位表示讀狀態,也就是獲取該讀鎖的次數,低16位表示獲取到寫鎖的執行緒的可重入次數;對于semaphore來說,state用來表示當前可用信號的個數:對于CountDownlatch來說,state用來表示計數器當前的值,
To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using getState, setState and/or compareAndSetState:
- tryAcquire // 獨占鎖
- tryRelease // 獨占鎖
- tryAcquireShared // 共享
- tryReleaseShared // 共享
- isHeldExclusively // 如果同步是獨占的,則為true ; 否則為false
下面講講獨占和共享兩種模式(不對中斷回應)
exclusive mode
acquire
當一個執行緒呼叫 acquire(int arg)方法獲取獨占資源時,會首先使用 tryAcquire方 法嘗試獲取資源, 具體是設定狀態變數 state 的值,成功則直接回傳,失敗則將當前執行緒 封裝為型別為 Node.EXCLUSIVE 的 Node 節點后插入到 AQS 阻塞 佇列的尾部,并呼叫 LockSupport.park(this) 方法掛起自己 ,
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
@ReservedStackAccess
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
release
當一個執行緒呼叫 release(int arg)方法時會嘗試使用 tryRelease操作釋放資源,這里 是設定狀態變 量 state 的值,然后呼叫 LockSupport.unpark(thread)方法激活 AQS 佇列 里 面 被阻塞的一個執行緒(thread), 被激活的執行緒則使用 tryAcquire嘗試,看當前狀態變數 state 的值是否能滿足自己的需要,滿足則該執行緒被激活,然后繼續 向下運行,否則還是會被放 入 AQS 佇列并被掛起,
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
@ReservedStackAccess
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
shared mode
acquireShared
當執行緒呼叫 acquireShared(intarg)獲取共享資源時,會首先使用 trγAcq山reShared 嘗試獲取資源 , 具體是設定狀態變數 state 的 值,成功則 直接返 回,失敗則將當前線 程 封 裝為型別為 Node.SHARED 的 Node 節 點后插入 到 AQS 阻 塞 佇列的尾部,并使用 LockSupport.park(this) 方法掛起自己,
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
releaseShared
當一個執行緒呼叫 releaseShared(int a電)時會嘗試使用 tryReleaseShared 操作釋放資 源,這里是設定狀態變數 state 的值,然后使用 LockSupport.unpark (thread)激活 AQS 隊 列里面被阻塞的一個執行緒 (thread),被激活的執行緒則使用 tryReleaseShared查看當前狀態變 量 state 的值是否能 滿足自 己的 需要,滿足 則 該執行緒被撤活,然后繼續向下運行,否則還 是會被放入 AQS 佇列并被掛起 ,
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
@ReservedStackAccess
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
下面就來談談基于AQS具體的鎖實作ReentrantLock
ReentrantLock
如果對AQS十分了解的話,就能快速了解ReentrantLock的思想,AQS獨占模式一種具體實作,
在看檔案時,Serialization of this class behaves in the same way as built-in locks: a deserialized lock is in the unlocked state, regardless of its state when serialized.鎖和序列化的關系,為什么ReentrantLock要實作序列化介面?還有AQS僅僅在https://stackoverflow.com/questions/17979009/why-locks-are-serializable-in-java找到了一點答案,之后在研究…
類圖

/** Synchronizer providing all implementation mechanics */
private final Sync sync; //
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
// 默認nonfair
}
nonfair or fair
默認非公平鎖,搶奪策略,新的執行緒可以直接搶占,可能不需要阻塞、排隊、喚醒,也就減少了背景關系的切換,吞吐量更高,
lock()對比
/**
* nonfair
* Performs lock. Try immediate barge, backing up to normal 立即進行cas搶占鎖,否則正常流程
* acquire on failure.
*/
@ReservedStackAccess // This lock supports a maximum of 2147483647 recursive locks by the same thread. 增加額外堆疊空間,但也有可能延遲拋出StackOverflowError
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
/**
* fair 正常流程
*/
final void lock() {
acquire(1);
}
tryAcquire(int acquires)對比
/**
* nonfair
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { // 直接嘗試cas
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 當前執行緒和當前已獲得鎖的執行緒是同一個 可重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && // no waiters or is first才會嘗試cas
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReentrantReadWritelock
解決執行緒安全問題使用ReentrantLock就可以,但是ReentrantLock是獨占鎖,某時只有一個執行緒可以獲取該鎖,而實際中會有寫少讀多的場景,顯然ReentrantLock滿足不了這個需求,所以ReentrantReadWriteLock應運而生,ReentrantReadWriteLock采用讀寫分離的策略,允許多個執行緒可以同時獲取讀鎖**,AQS的獨占和共享模式結合使用**,支持鎖降級(寫鎖降級到讀鎖,不支持升級),
state狀態
之前講AQS也提過,用state的高16位表示讀狀態,也就是獲取到讀鎖的次數;使用低16位表示獲取到寫鎖的執行緒的可重入次數,
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
nonfair or fair
在這兩個方法的區分是否公平
/**
* Returns true if the current thread, when trying to acquire
* the read lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean readerShouldBlock();
/**
* Returns true if the current thread, when trying to acquire
* the write lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean writerShouldBlock();
寫鎖的獲取
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); // 獲取寫鎖可重入次數
if (c != 0) { // 有鎖
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) // 讀鎖或當前執行緒是不是該寫鎖的持有者
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 判斷邊界
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() || // 非公平 writerShouldBlock false,公平 判斷當前執行緒節點是否有前驅節點
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
讀鎖獲取
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 重入次數記錄
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 重試
return fullTryAcquireShared(current);
}
StampedLock
未完待遇…
2022年愿你成為光!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/400455.html
標籤:其他
上一篇:《Python入門到精通》函式
