java并發編程之Lock鎖原理
- 概述
- 鎖的型別
- 使用場景
- Lock鎖原理詳解
- 總結
事先宣告:本文為原創文章,禁止直接轉載和抄襲,若要轉載需經過本人同意
概述
java鎖機制是一個老生常談的問題了,那么在接觸到鎖的時候有沒有想過什么情況下需要使用鎖?鎖的原理又是什么?接下來,筆者就這兩個問題展開對鎖的解述,
鎖的型別
常用的鎖分為以下幾類:
1、資料庫鎖
2、java內置鎖
3、分布式鎖
本篇文章筆者將對java內置鎖–Lock鎖做深入分析,其他兩個鎖筆者會陸續做出分享,
使用場景
首先,需要加鎖的資源一定是臨界資源,所謂臨界資源就是在多執行緒的情況下,各個執行緒會進行搶占的資源,下面以一段代碼來說明:
import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class LockTest {
private int count = 500;
@Test
public void unLockTest() throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 500; i++){
threadPool.submit(() -> {
count--;
});
}
Thread.sleep(5000);
System.out.println(count);
}
}
結果:2
代碼很簡單,創建了個核心執行緒數為10的執行緒池(這里需要注意的是在真實開發中禁止使用這種方式創建執行緒池),然后創建了500個任務,把這五百個任務里丟到執行緒池去處理,每個任務會對count進行減一,理想情況下,count應該為0,但實際count最后為2,其原因就是因為在多執行緒環境下,各個執行緒對count發生了資源的爭奪,導致了資料的不安全性,其中,count就是臨界資源,多執行緒就是我們常說的并發環境,
下面對代碼進行修改:
private int count = 500;
@Test
public void lockTest() throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
Lock lock = new ReentrantLock();
for (int i = 0; i < 500; i++){
threadPool.submit(() -> {
lock.lock();
count--;
lock.unlock();
});
}
Thread.sleep(5000);
System.out.println(count);
}
結果:0
代碼整體沒有太大變化,只是在count–前后做了加鎖和減鎖操作,最后無論代碼運行多少次,結果都是0(需要注意的是這里忽略了可見性),沒有出現未加鎖時的少減情況,這就是鎖的使用場景,無論是資料庫鎖、java內置鎖還是分布式鎖,他們的使用場景都大同小異,使用鎖的目的就是為了控制臨界資源的安全性,
Lock鎖原理詳解

如圖,Lock只是一個介面,它有很多實作類,本文著重講ReentrantLock,
ReetrantLock
在使用Lock前,先要new一個Lock出來,先來分析new ReentrantLock()做了什么,代碼如下:
//初始化Lock
Lock lock = new ReentrantLock();
//部分ReentrantLock()原始碼
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
}
原始碼1-1
如上原始碼所示,在初始化Lock時,會給ReentrantLock的成員變數sync賦值NonfairSync的實體,先來看看Sync是什么,代碼如下:
//部分Sync原始碼
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
}
原始碼1-2
Sync是ReentrantLock的抽象內部類并且它實作了AQS抽象類,這里注意這個lock方法,該方法是一個抽象方法,Sync有兩個實作類,分別是NonfairSync以及FairSync(其實這兩個實作類就是Lock鎖的公平鎖與非公平鎖機制),看到這里就可以結合上面的原始碼串起來了,在使用無參的ReentrantLock創建Lock實體時,默認使用的是NonfairSync非公平鎖機制,
lock方法
Lock鎖加鎖的方法是lock方法,下面來分析lock原始碼:
//部分ReentrantLock()原始碼
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
public void lock() {
sync.lock();
}
}
原始碼2-1
如上原始碼所示,Lock類的lock方法實際呼叫的是其子類ReentrantLock的lock方法,而ReentrantLock中的lock方法則又呼叫了Sync的lock方法,而在“原始碼1-1”的時候我就已經知道sync的默認值為NonfairSync實體,所以這里的lock方法最侄訓呼叫到NonfairSync中的lock方法,
NonfairSync原始碼分析:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
原始碼3-1
先來看lock方法,首先會呼叫compareAndSetState(0, 1)方法,該方法是一個CAS操作,其作用就是修改state值為1,修改成功則回傳true,修改失敗則回傳false,若是修改成功,則會執行setExclusiveOwnerThread(Thread.currentThread()),該方法的作用是修改exclusiveOwnerThread為當前執行緒id,下面先說明status和exclusiveOwnerThread這兩個變數是什么,看原始碼:
//部分AQS原始碼
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() { }
//CLH佇列頭結點
private transient volatile Node head;
//CLH佇列尾結點
private transient volatile Node tail;
//鎖標識state變數
private volatile int state;
}
原始碼3-2
//AQS父類原始碼
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
//獨占執行緒id變數
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
原始碼3-3
在看完原始碼3-2和原始碼3-3后已經可以得知變數state和exclusiveOwnerThread的來源,那么在講明這兩個變數的作用,在Lock鎖中,一個執行緒想要獲取鎖,那么需要滿足一下兩個條件之一:
條件一:該執行緒可以成功的將state值從0改為1
條件二:若state值不為1時,則檢測持有鎖的執行緒是否為當前執行緒,也就是exclusiveOwnerThread的值是否為當前執行緒id,若是則將state值加1,這就是重入鎖,
看到這里對原始碼3-1的if邏輯也有了認知,那么接下來講原始碼3-1的else邏輯,也就是acquire(1)方法,原始碼如下:
acquire方法
//AQS部分原始碼
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
原始碼4-1
該方法中主要呼叫了3個方法,分別是if判斷中的tryAcquire以及acquireQueued方法和if體中的selfInterrupt方法,首先分析tryAcquire方法,原始碼如下:
tryAcquire原始碼
//NonFairSync部分原始碼
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
//tryAcquire方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
//Sync部分原始碼
abstract static class Sync extends AbstractQueuedSynchronizer {
//nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {
//獲取當前執行緒id
final Thread current = Thread.currentThread();
//獲取鎖標識
int c = getState();
//若鎖標識為0,則當前執行緒嘗試去獲取鎖
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
//當前執行緒獲取鎖成功,回傳true
return true;
}
}
//如果持有鎖執行緒id和當前執行緒id相等,則進行重入鎖
else if (current == getExclusiveOwnerThread()) {
//鎖標識加1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//設定鎖標識state的值為nextc
setState(nextc);
//當前執行緒獲取鎖成功,回傳true
return true;
}
//當前執行緒獲取鎖失敗,回傳false
return false;
}
原始碼4-2
如上原始碼所示,tryAcquire最終呼叫Sync中的nonfairTryAcquire方法,該方法主要邏輯是當前執行緒去獲取鎖,如果能夠獲取到則回傳true,若不能獲取到則回傳false,具體細節看注釋,接下來繼續看acquireQueued方法原始碼,首先在原始碼4-1中可以看到該方法的引數是addWaiter(Node.EXCLUSIVE), arg),這里先看addWaiter(Node.EXCLUSIVE), arg)方法原始碼,如下:
addWaiter原始碼
//AQS部分原始碼
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() { }
//CLH佇列頭結點
private transient volatile Node head;
//CLH佇列尾結點
private transient volatile Node tail;
//鎖標識state變數
private volatile int state;
//addWaiter方法
private Node addWaiter(Node mode) {
//將當前執行緒包裝成一個Node物件
Node node = new Node(Thread.currentThread(), mode);
//獲取CLH佇列的尾結點
Node pred = tail;
//如果尾結點不為空,則說明整個CLH佇列不為空,那么就將隊尾結點設定為當前結點
if (pred != null) {
node.prev = pred;
//cas操作將隊尾結點設定為當前node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//若呼叫到這里,則說明上述cas操作失敗或CLH佇列為空
enq(node);
return node;
}
//enq方法
private Node enq(final Node node) {
//cas自旋操作
for (;;) {
//獲取尾結點
Node t = tail;
//若尾結點為空,說明CLH佇列為空,則初始化CLH佇列
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//若尾結點不為空,則cas操作將隊尾結點設定為當前node(注意這里使用了自旋,所以這個替換最終是會成功的)
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
}
//Node原始碼
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//生命狀態1,表明該結點需要移除
static final int CANCELLED = 1;
//生命狀態-1,等待觸發狀態
static final int SIGNAL = -1;
//生命狀態-2,表明該節點在條件佇列
static final int CONDITION = -2;
//生命狀態-3,表明是共享的,需要向后傳播
static final int PROPAGATE = -3;
//生命狀態
volatile int waitStatus;
//前驅結點
volatile Node prev;
//后繼結點
volatile Node next;
//當前結點對應執行緒
volatile Thread thread;
Node nextWaiter;
}
原始碼4-3
以上就是addWaiter方法原始碼,其作用就是將沒獲取到鎖的執行緒包裝成一個node結點,然后將CLH佇列的尾結點設定為該node結點,這里需注意Node類中的的變數,尤其是waitStatus,具體細節看注釋,接下來繼續看acquireQueued方法原始碼,如下:
acquireQueued原始碼
//AQS部分原始碼
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
//該變數在lock方法中的呼叫鏈中無實際作用
boolean failed = true;
try {
//設定執行緒中斷信號為false
boolean interrupted = false;
//cas自旋
for (;;) {
//獲取當前node的前一個結點
final Node p = node.predecessor();
//若前一個結點是頭結點,則去獲取鎖,若獲取鎖成功,則將當前node設定為頭結點
//(在setHead中會將當前node的前驅結點以及對應的執行緒置為null),原來的head指向空(指向空,在gc時就是回收原來的head)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
//回傳中斷信號
return interrupted;
}
//該if判斷中主要是設定了當前node的前驅節點的生命狀態(waitStatus)以及阻塞當前執行緒操作
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//回傳中斷信號
interrupted = true;
}
} finally {
//將當前node從CLH佇列中剔除掉,在lock方法的呼叫鏈中無實際作用
if (failed)
cancelAcquire(node);
}
}
}
原始碼4-4
以上就是acquireQueued方法原始碼,該方法的作用就是執行緒嘗試獲取鎖,若獲取不到則進行執行緒阻塞操作,其中在if判斷中呼叫了兩個方法:shouldParkAfterFailedAcquire和parkAndCheckInterrupt(該方法阻塞執行緒),這里需要注意的是提到了結點的生命狀態(waitStatus),接下來繼續看shouldParkAfterFailedAcquire方法原始碼,如下:
shouldParkAfterFailedAcquireuy原始碼
//AQS部分原始碼
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//shouldParkAfterFailedAcquire方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取當前結點前驅結點的生命狀態
int ws = pred.waitStatus;
//如果前驅節點的生命狀態為SIGNAL,則回傳true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//如果前驅節點的生命狀態大于0,則為取消狀態,進行結點移除
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//將前驅節點的waitStaus設定為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
}
原始碼4-5
以上就是shouldParkAfterFailedAcquire方法原始碼,該方法就是將當前節點的前驅結點waiteStatus設定為-1,那么為什么設定為-1呢?在CLH佇列中,只有當前結點的waitStatus為-1,那么當前結點對應執行緒在釋放鎖時才會喚醒下一個結點,接下來繼續看parkAndCheckInterrupt原始碼:
parkAndCheckInterrupt原始碼
//AQS部分原始碼
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private final boolean parkAndCheckInterrupt() {
//阻塞執行緒
LockSupport.park(this);
//回傳執行緒的中斷狀態
return Thread.interrupted();
}
}
以上就是parkAndCheckInterrupt方法原始碼,該方法主要方法是LockSupport.park(this),此方法會呼叫Unsafe類中的park方法繼而呼叫到作業系統的函式,最終阻塞當前執行緒,
以上就是Lock鎖的lock方法原理,接下來繼續講unLock方法,
unLock原始碼
//部分ReentrantLock原始碼
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
//unLock方法
public void unlock() {
sync.release(1);
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
//release方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//喚醒CLH佇列第一個執行緒
unparkSuccessor(h);
return true;
}
return false;
}
//unparkSuccessor方法
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//喚醒指定執行緒id的執行緒
LockSupport.unpark(s.thread);
}
以上就是unLock方法的呼叫鏈以及原始碼,其核心方法LockSupport.unpark(s.thread)會繼續呼叫Unsafe類的unpark方法,該方法會呼叫到作業系統的函式繼而去喚醒指定執行緒,
總結
到這里,Lock鎖的地原理和機制已經全部梳理完了,不過還有一些細節,比如中斷信號,cas原子操作沒有詳細講,但這些不影整體的邏輯梳理,若是對文章中的一些點有疑惑,可在評論區留言或私信我,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/279545.html
標籤:其他
