ReentrantReadWriteLock讀寫鎖原始碼分析
讀寫狀態的設計
ReentrantReadWriteLock也是通過自定義AQS(抽象佇列同步器)實作,同步器內部只有一個狀態,而讀寫鎖需要維護兩個狀態:讀狀態與寫狀態,
ReentrantReadWriteLock將同步器內部的狀態state按位進行拆分:高16位代表讀狀態,低16位代表寫狀態,
java.util.concurrent.locks.ReentrantReadWriteLock.Sync
static final int SHARED_SHIFT = 16; // 讀鎖偏移的位數
static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 每多獲取一次讀鎖,state=state+SHARED_UNIT,相當于左移一位
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 寫鎖可重入的最大次數、讀鎖最大可獲取次數(不等于獲取讀鎖的執行緒數,一個執行緒可能獲取多次)
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 寫鎖的掩碼,用于計算寫鎖的值
static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 讀鎖計數
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 寫鎖的計數,也就是它的重入次數
舉個例子來說明一下state加鎖時值的變化:
- 假如現在state=65537:
- 讀鎖計數:將65537無符號右移16位,獲得讀鎖的計數為1,
- 寫鎖計數:將65537的高16位全部置為0,也就是65537&65535=1,獲取寫鎖的計數為1,
t1和t2同時加讀鎖
讀讀共享并發,
package com.morris.concurrent.lock.reentrantreadwritelock.trace;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 兩個執行緒加讀鎖
*/
@Slf4j
public class ReadReadDemo {
private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private static ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
public static void main(String[] args) {
new Thread(ReadReadDemo::read, "t1").start();
new Thread(ReadReadDemo::read, "t2").start();
}
private static void read() {
readLock.lock();
try {
log.info(Thread.currentThread().getName() + " get readLock");
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}
}

t1讀鎖的重入
package com.morris.concurrent.lock.reentrantreadwritelock.trace;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 讀鎖的重入
*/
@Slf4j
public class ReadLockDemo2 {
private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private static ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
readLock.lock();
try {
log.info(Thread.currentThread().getName() + " get readLock");
readLock.lock();
try {
log.info(Thread.currentThread().getName() + " get readLock");
} finally {
readLock.unlock();
}
} finally {
readLock.unlock();
}
}, "t1").start();
TimeUnit.SECONDS.sleep(1);
}
}

t1寫鎖的重入
package com.morris.concurrent.lock.reentrantreadwritelock.trace;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@Slf4j
public class Write2Demo {
private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private static ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
public static void main(String[] args) {
new Thread(() -> {
writeLock.lock();
try {
writeLock.lock();
try {
} finally {
writeLock.unlock();
}
} finally {
writeLock.unlock();
}
}, "t1").start();
}
}

t1寫鎖的降級
鎖降級指的是寫鎖降級成為讀鎖,鎖降級是指當前執行緒持有寫鎖,然后再去獲取到讀鎖,隨后釋放之前擁有的寫鎖的程序,如果當前執行緒擁有寫鎖,然后將其釋放,最后再獲取讀鎖,這種分段完成的程序不能稱之為鎖降級,
為什么可以降級?因為當執行緒持有寫鎖的時候沒有其它執行緒可以獲取讀寫鎖,因此再獲取讀鎖是安全的,而此時再釋放寫鎖就會降級為讀鎖,其它執行緒也可以獲得讀鎖,
為什么不可以升級?因為這會導致死鎖,假設有A、B、C三個執行緒,它們都已持有讀鎖,假設執行緒A嘗試從讀鎖升級到寫鎖,那么它必須等待B和C釋放掉已經獲取到的讀鎖,如果隨著時間推移,B和C逐漸釋放了它們的讀鎖,此時執行緒 A確實是可以成功升級并獲取寫鎖,但是我們考慮一種特殊情況,假設執行緒A和B都想升級到寫鎖,那么對于執行緒A而言,它需要等待其他所有執行緒,包括執行緒B在內釋放讀鎖,而執行緒B也需要等待所有的執行緒,包括執行緒A釋放讀鎖,這就是一種非常典型的死鎖的情況,誰都愿不愿意率先釋放掉自己手中的鎖,
package com.morris.concurrent.lock.reentrantreadwritelock.trace;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@Slf4j
public class WriteReadDemo {
private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private static ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
private static ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
public static void main(String[] args) {
new Thread(() -> {
writeLock.lock();
try {
readLock.lock();
try {
} finally {
readLock.unlock();
}
} finally {
writeLock.unlock();
}
}, "t1").start();
}
}

執行緒重入計數器
AQS的同步狀態的高16位只能記錄當前持有讀鎖的次數,那么每個執行緒對讀鎖的重入次數怎么記錄呢?使用HoldCounter,
static final class HoldCounter {
int count = 0; // 讀鎖的重入次數
final long tid = getThreadId(Thread.currentThread()); // 獲取讀鎖的執行緒id
}
//使用ThreadLocal保證每個執行緒都有一份HoldCounter,執行緒安全
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
private transient ThreadLocalHoldCounter readHolds; // 保存所有執行緒的HoldCounter,HoldCounter里面存的每個執行緒重入讀鎖的次數
private transient HoldCounter cachedHoldCounter; // 快取當前執行緒的HoldCounter
private transient Thread firstReader = null; // 記錄第一個獲取讀鎖的執行緒,不會放入readHolds
private transient int firstReaderHoldCount; // 記錄第一個獲取讀鎖的執行緒持有讀鎖的次數,不會放入readHolds
Sync() {
readHolds = new ThreadLocalHoldCounter(); // 初始化ThreadLocal
setState(getState()); // ensures visibility of readHolds
}
寫鎖的獲取
寫鎖的邏輯主要在java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquire,其他邏輯跟ReentrantLock差不多,
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); // 獲取寫鎖的次數
// c!=0 說明存在鎖
if (c != 0) {
// c != 0 && w == 0 說明存在寫鎖
// current != getExclusiveOwnerThread() 表示當前執行緒不是獲取鎖的執行緒,不是重入,其他執行緒獲取了寫鎖,所以回傳false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 鎖的重入
setState(c + acquires);
return true;
}
// writerShouldBlock()由子類實作是否要公平
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
寫鎖的釋放
寫鎖的邏輯主要在java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0; // 判斷寫鎖的次數是否為0,其他與獨占鎖的釋放邏輯一致
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
讀鎖的獲取
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#lock
public void lock() {
sync.acquireShared(1);
}
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread(); // 獲取當前執行緒
int c = getState(); // 獲取鎖的狀態
// exclusiveCount(c) != 0 有執行緒獲取寫鎖
// getExclusiveOwnerThread() != current // 獲取寫鎖的執行緒不是自己
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 到這里有兩種情況
// 1. 沒有執行緒獲取寫鎖
// 2. 有執行緒獲取了寫鎖,并且獲取寫鎖的執行緒是自己,也就寫鎖降級為讀鎖
int r = sharedCount(c); // 獲取讀鎖的次數
if (!readerShouldBlock() && // 處理公平與非公平
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 進入這里說明獲取到了讀鎖
if (r == 0) { // 說明沒有執行緒獲取讀鎖
firstReader = current; // 記錄第一個獲取讀鎖的執行緒
firstReaderHoldCount = 1; // 記錄第一個獲取讀鎖的執行緒的重入次數
} else if (firstReader == current) {
firstReaderHoldCount++; // 第一個獲取讀鎖的執行緒的重入次數+1
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get(); // 快取當前執行緒的HoldCounter
else if (rh.count == 0)
readHolds.set(rh); // 這里只有最后一個獲取鎖的的執行緒先持有了鎖,然后釋放了,再獲取才會進入(此時前面還有個執行緒持有了鎖)
rh.count++; // 重入次數+1
}
return 1;
}
return fullTryAcquireShared(current);
}
// 快速嘗試獲取讀鎖失敗,則改為自旋獲取,與上面的邏輯大同小異
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
為了提高讀鎖的效率,會記錄第一個獲取鎖的執行緒及它的重入次數,也會記錄最后一個持有鎖的執行緒的重入次數,而其他持有鎖的執行緒的重入次數會存到每個執行緒對應的ThreadLocal中,
當上面嘗試獲取讀鎖失敗后,后進入到佇列然后開始休眠:
java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 加入到同步佇列
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg); // 頭節點嘗試獲取鎖
if (r >= 0) {
setHeadAndPropagate(node, r); // 當前執行緒獲取到讀鎖后會喚醒下一個獲取讀鎖的執行緒
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 休眠
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); // 當前執行緒對應的節點會成為頭節點,以前的頭結點會被GC回收
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) // 下一個節點是讀
doReleaseShared(); // 喚醒
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); // 喚醒
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
讀鎖的釋放
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#unlock
public void unlock() {
sync.releaseShared(1);
}
java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { 只有讀鎖全部都被釋放了才會回傳true
doReleaseShared(); // 喚醒獲取寫鎖的執行緒
return true;
}
return false;
}
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove(); // ThreadLocal用完要洗掉,避免記憶體泄漏
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) // 獲取讀鎖的次數-1
return nextc == 0; // 沒有執行緒持了鎖的
}
}
公平的實作
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
writerShouldBlock和readerShouldBlock方法都表示當有別的執行緒也在嘗試獲取鎖時,是否應該阻塞,
對于公平模式,hasQueuedPredecessors()方法表示前面是否有等待執行緒,一旦前面有等待執行緒,那么為了遵循公平,當前執行緒也就應該被加入同步佇列,然后阻塞,
非公平的實作
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // 寫鎖一直能夠被獲取
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
// 如果同步佇列中的第一個節點獲取寫鎖,則讀鎖不能獲取成功,為了避免寫鎖的饑餓
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/131745.html
標籤:AI
上一篇:Java將檔案或者檔案夾壓縮成zip(修復檔案夾中存在多個檔案報Stream Closed錯誤問題)
下一篇:全域添加日志-自定義注解
