文章目錄
- 一、java.util.concurrent.locks.Lock介面
- 二、公平鎖介紹
- 簡單使用
ReentrantLock是juc中的一個可重入鎖獨占鎖,該鎖分為公平鎖和非公平鎖,獨占鎖是這個鎖一旦被一個執行緒占據其他執行緒就不能搶占,可重入鎖是如果這個執行緒重復獲取這個鎖不需要進行等待,直接進入,公平鎖和非公平鎖是剛進入的執行緒是否可以和在佇列中等待的第一個執行緒進行競爭,如果是非公平鎖則剛進入的執行緒在當前鎖空閑的時候可以和等待佇列中的第一個執行緒進行競爭,如果是公平鎖則不能,剛進入的執行緒必須進入佇列等待,
一、java.util.concurrent.locks.Lock介面
public class ReentrantLock implements Lock, java.io.Serializable {
.....
}
從原始碼中我們可以看到ReentrantLock 實作了Lock介面,Lock介面是juc中定義的一個鎖的介面,
public interface Lock {
// 獲取鎖
void lock();
// 獲取鎖的程序能回應中斷
void lockInterruptibly() throws InterruptedException;
// 非阻塞式回應中斷能立即回傳,獲取鎖回傳true反之回傳false
boolean tryLock();
// 超時獲取鎖,在超時內或者未中斷的情況下能獲取鎖
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
// 獲取與lock系結的等待通知組件,當前執行緒必須獲得了鎖才能進行等待,進行等待時會先釋放鎖,當再次獲取鎖時才能從等待中回傳
Condition newCondition();
}
進行加鎖主要有lock()、tryLock()、lockInterruptibly()這三個方法,tryLock()方法只是嘗試進行獲取當前鎖,獲取到了則回傳true,獲取不到回傳false,不會直接讓執行緒加入到等待佇列中進行阻塞等待,lock()方法則會直接讓執行緒阻塞等待,使用unlock才能喚醒執行緒,lockInterruptibly()和lock()方法主要的區別在于對例外的處理,
二、公平鎖介紹
從原始碼中我們可以看出ReentrantLock使用了配接器設計模式,ReentrantLock繼承了lock方法,同時擁有Sync類,Lock的方法都是大部分呼叫Sync類實作的,Sync類繼承了AbstractQueuedSynchronizer類,該類是佇列同步器會將等待的執行緒加入到佇列的末端等待喚醒,通過公平鎖的例子,講解一下lock幾個主要的方法,
// ReentrantLock的lock方法呼叫了sync的lock方法
public void lock() {
sync.lock();
}
// 公平鎖類
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// ReentrantLock公平鎖模式下lock呼叫的方法
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
// 獲取當前呼叫獲取鎖方法的執行緒
final Thread current = Thread.currentThread();
// 獲取當前鎖的狀態,如果c=0表示還沒有被解鎖,可以進行競爭獲取鎖
int c = getState();
// 表明重入鎖沒有被搶占
if (c == 0) {
// 等待物件中沒有其他需要處理的 就cas將鎖的狀態設定為acquires,公平鎖與非公平鎖主要的區別在于是否是要要讓佇列等待的先進行執行,
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 把獨占執行緒設定為自己
setExclusiveOwnerThread(current);
return true;
}
}
// 走到這里表明鎖被鎖住了
// 如果鎖的獨占執行緒是自己,則直接重入
else if (current == getExclusiveOwnerThread()) {
// 記錄鎖被重入的次數,在釋放鎖的時候判斷是否滿足釋放條件
int nextc = c + acquires;
// 出現例外
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 因為當前鎖被獨占了所有其他執行緒是無法獲取這個鎖的,state的狀態只能被自己修改,所以不需要處理并發的情況,
setState(nextc);
// 獲取鎖成功
return true;
}
// 獲取鎖失敗
return false;
}
}
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
// 同步狀態不允許獲取
// 如果不在佇列,當前執行緒放入佇列
// 可能會阻塞當前前程
// 如果當前執行緒在佇列,將其移出佇列
public final void acquire(int arg) {
// 先嘗試tryAcquire方法獲取鎖,如果失敗了不進行阻塞直接回傳,呼叫acquireQueued將當前執行緒封裝成一個節點并進行阻塞,加入到等待佇列中,
// Node.EXCLUSIVE
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 將當前的執行緒封裝成一個Node,在ReentrantLock鎖是獨占模式,所以是Node.EXCLUSIVE,
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 獲取等待佇列中的最后一個節點
Node pred = tail;
if (pred != null) {
// 走到這里表明tail節點存在
// 將當前node的prev節點設定為tail節點
node.prev = pred;
// cas將tail節點設定為新的節點,如果設定成功表明tail節點沒有被修改過,如果失敗表明其他執行緒提前修改的tail節點,出現了沖突,
if (compareAndSetTail(pred, node)) {
// 如果沒有出現沖突,表明當前節點添加節點到末尾成功
pred.next = node;
return node;
}
}
// tail節點被其他執行緒修改時走到這里
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
// 當tail節點不存在時或者tail節點cas替換失敗時呼叫這個方法
private Node enq(final Node node) {
for (;;) {
// 獲取佇列的末尾節點
Node t = tail;
// 如果tail節點不存在表明這個佇列還沒有進行初始化或者已經被清空了
if (t == null) { // Must initialize
// cas初始化了一個佇列,防止多個執行緒進行初始化出現沖突
if (compareAndSetHead(new Node()))
// 初始化佇列
tail = head;
} else {
// 如果佇列存在則不斷呼叫這個方法進行cas替換,知道成功插入到佇列末尾,
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
// cas搶占鎖的時候失敗,將執行緒插入到等待佇列中
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取當前節點的前一個節點
final Node p = node.predecessor();
// 如果前一個節點就是head,表明當前執行緒可以嘗試競爭鎖
if (p == head && tryAcquire(arg)) {
// 走到這里表明當前node競爭到了鎖
// 將當前節點設定為head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果不是head,或者競爭鎖失敗了,呼叫shouldParkAfterFailedAcquire讓前一個節點完事兒了把自己喚醒,同時進入等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取前一個node的waitStatus
int ws = pred.waitStatus;
// 如果已經是SIGNAL表明前一個節點一直知道要通知自己了,可以放心進行睡眠等待了
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 走到這里表明前面一個節點不為SIGNAL
// 下面的方法是尋找一個非cancel節點,把它設定為signal用來喚醒自己
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 把前一個節點設定為signal用來喚醒自己
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
// 阻塞當前執行緒
private final boolean parkAndCheckInterrupt() {
// 呼叫LockSupprt方法將當前執行緒進行阻塞
LockSupport.park(this);
return Thread.interrupted();
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
// 更新同步狀態
// 同步狀態是否允許被阻塞的執行緒獲取
public final boolean release(int arg) {
// 1. 釋放鎖
if (tryRelease(arg)) {
// 2. 如果獨占鎖釋放"完全",喚醒后繼節點
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果重入次數為零表示可以進行釋放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
// 獲取最后一個沒有被cancel的節點
s = t;
}
if (s != null)
// 呼叫LockSupport.unpark釋放下一個被阻塞且沒有被取消的執行緒來競爭鎖
LockSupport.unpark(s.thread);
}
簡單使用
public class Main {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread thread1 = new Thread(()->{
System.out.println("thread1 wait lock");
lock.lock();
System.out.println("thread1 get lock");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
System.out.println("thread1 unlock");
});
Thread thread2 = new Thread(()->{
System.out.println("thread2 wait lock");
lock.lock();
System.out.println("thread2 get lock");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
System.out.println("thread2 unlock");
});
thread1.start();
thread2.start();
}
}

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/277641.html
標籤:java
