文章目錄
- 簡介
- 分析
- 解決
- 一個變數
- CAS
- 一個佇列
- 加鎖
- 解鎖
- 測驗
- 總代碼
- 總結
- 問題
簡介
學習同步器之前,通過自己動手寫一個鎖,能更好地理解AQS及各種同步器實作的原理,
分析
自己動手寫一個鎖需要準備些什么呢?
synchronized說過它的實作原理是更改物件頭中的MarkWord,標記為已加鎖或未加鎖,但是,我們自己是無法修改物件頭資訊的,那么我們可不可以用一個變數來代替呢?
比如,這個變數的值為1的時候就說明已加鎖,變數值為0的時候就說明未加鎖,其次,我們要保證多個執行緒對上面我們定義的變數的爭用是可控的,所謂可控即同時只能有一個執行緒把它的值修改為1,且當它的值為1的時候其它執行緒不能再修改它的值,這種是不是就是典型的CAS操作,所以我們需要使用Unsafe這個類來做CAS操作,
然后,我們知道在多執行緒的環境下,多個執行緒對同一個鎖的爭用肯定只有一個能成功,那么,其它的執行緒就要排隊,所以我們還需要一個佇列,
最后,這些執行緒排隊的時候干嘛呢?它們不能再繼續執行自己的程式,那就只能阻塞了,阻塞完了當輪到這個執行緒的時候還要喚醒,所以我們還需要Unsfae這個類來阻塞(park)和喚醒(unpark)執行緒,
基于以上四點,我們需要的:一個變數、一個佇列、執行CAS/park/unpark的Unsafe類,
大概的流程圖如下圖所示:

解決
一個變數
這個變數只支持同時只有一個執行緒能把它修改為1,所以它修改完了一定要讓其它執行緒可見,因此,這個變數需要使用volatile來修飾,
private volatile int state;
CAS
這個變數的修改必須是原子操作,所以我們需要CAS更新它,我們這里使用Unsafe來直接CAS更新int型別的state,當然,這個變數如果直接使用AtomicInteger也是可以的
private boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
一個佇列
佇列的實作有很多,陣列、鏈表都可以,我們這里采用鏈表,畢竟鏈表實作佇列相對簡單一些,不用考慮擴容等問題,
這個佇列:放元素的時候都是放到尾部,且可能是多個執行緒一起放,所以對尾部的操作要CAS更新;喚醒一個元素的時候從頭部開始,但同時只有一個執行緒在操作,即獲得了鎖的那個執行緒,所以對頭部的操作不需要CAS去更新,
private static class Node {
// 存盤的元素為執行緒
Thread thread;
// 前一個節點(可以沒有,但實作起來很困難)
Node prev;
// 后一個節點
Node next;
public Node() {
}
public Node(Thread thread, Node prev) {
this.thread = thread;
this.prev = prev;
}
}
// 鏈表頭
private volatile Node head;
// 鏈表尾
private volatile Node tail;
// 原子更新tail欄位
private boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
這個佇列很簡單,存盤的元素是執行緒,需要有指向下一個待喚醒的節點,前一個節點可有可無,但是沒有實作起來很困難
加鎖
-
park:將當前執行緒掛起,unpark:精準的喚醒某個執行緒,
-
park的引數,表示掛起的到期時間,第一個如果是true,表示絕對時間,則var2為絕對時間值,單位是毫秒,第一個引數如果是false,表示相對時間,則var2為相對時間值,單位是納秒,
-
unpark的引數,表示執行緒,
簡單示例:
park(false,0) // 表示永不到期,一直掛起,直至被喚醒
long time = System.currentTimeMillis()+3000;
park(true,time + 3000) // 表示3秒后自動喚醒
park(false,3000000000L) // 表示3秒后自動喚醒
加鎖
public void lock() {
// 嘗試更新state欄位,更新成功說明占有了鎖
if (compareAndSetState(0, 1)) {
return;
}
// 未更新成功則入隊
Node node = enqueue();
Node prev = node.prev;
// 再次嘗試獲取鎖,需要檢測上一個節點是不是head,按入隊順序加鎖
while (node.prev != head || !compareAndSetState(0, 1)) {
// 未獲取到鎖,阻塞
unsafe.park(false, 0L);
}
// 下面不需要原子更新,因為同時只有一個執行緒訪問到這里
// 獲取到鎖了且上一個節點是head
// head后移一位
head = node;
// 清空當前節點的內容,協助GC
node.thread = null;
// 將上一個節點從鏈表中剔除,協助GC
node.prev = null;
prev.next = null;
}
// 入隊
private Node enqueue() {
while (true) {
// 獲取尾節點
Node t = tail;
// 構造新節點
Node node = new Node(Thread.currentThread(), t);
// 不斷嘗試原子更新尾節點
if (compareAndSetTail(t, node)) {
// 更新尾節點成功了,讓原尾節點的next指標指向當前節點
t.next = node;
return node;
}
}
}
(1)嘗試獲取鎖,成功了就直接回傳;
(2)未獲取到鎖,就進入佇列排隊;
(3)入隊之后,再次嘗試獲取鎖(但是必須是頭結點的后一個結點);
(4)如果不成功,就阻塞;
(5)如果成功了,就把頭節點后移一位,并清空當前節點的內容,且與上一個節點斷絕關系;
(6)加鎖結束;
解鎖
// 解鎖
public void unlock() {
// 把state更新成0,這里不需要原子更新,因為同時只有一個執行緒訪問到這里
state = 0;
// 下一個待喚醒的節點
Node next = head.next;
// 下一個節點不為空,就喚醒它
if (next != null)
unsafe.unpark(next.thread);
}
}
(1)把state改成0,這里不需要CAS更新,因為現在還在加鎖中,只有一個執行緒去更新,在這句之后就釋放了鎖;
(2)如果有下一個節點就喚醒它;
(3)喚醒之后就會接著走上面lock()方法的while回圈再去嘗試獲取鎖;
(4)喚醒的執行緒不是百分之百能獲取到鎖的,因為這里state更新成0的時候就解鎖了,之后可能就有執行緒去嘗試加鎖了,
測驗
static int count = 0;
public static void main(String[] args) throws InterruptedException {
MyLock lock = new MyLock();
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
for (int i1 = 0; i1 < 100000; i1++) {
count++;
}
lock.unlock();
}
}).start();
}
Thread.sleep(3000);
System.out.println(count);
}
運行這段代碼的結果是總是列印出10000000(一千萬),說明我們的鎖很nice
總代碼
import sun.misc.Unsafe;
import java.lang.reflect.Field;
public class Demo16 {
static int count = 0;
public static void main(String[] args) throws InterruptedException {
MyLock lock = new MyLock();
for (int i = 0; i < 50; i++) {
new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
for (int i1 = 0; i1 < 10000; i1++) {
count++;
}
lock.unlock();
}
}).start();
}
Thread.sleep(3000);
System.out.println(count);
}
}
// 鏈表結點
class Node {
public Thread thread;
public Node prev; // 前結點
public Node next; // 后結點
public Node(){}
public Node(Thread thread){
this.thread = thread;
}
}
class MyLock{
// o表示沒有鎖
private volatile Node head; // 頭結點
private volatile Node tail; // 尾結點
private static long tailOffset ; // 尾結點偏移量
private volatile int state = 0; // 可見的鎖標志
private static long stateOffset ; // 鎖標志的偏移量
private static Unsafe unsafe = null;
// 初始化頭尾結點
public MyLock(){
head=tail=new Node();
}
static {
Field theUnsafe = null;
try {
theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe= (Unsafe)theUnsafe.get(null);
stateOffset = unsafe.objectFieldOffset(MyLock.class.getDeclaredField("state"));
tailOffset = unsafe.objectFieldOffset(MyLock.class.getDeclaredField("tail"));
} catch (Exception e) {
e.printStackTrace();
}
}
// 入隊操作
public Node push(Node node){
while (true){
Node t = tail;
node.prev = t;
// 更新尾結點
if (compareAndSwapTail(t,node)){
System.out.println(Thread.currentThread().getName()+"入隊成功");
// 更新成功 就把原來的尾結點指向node,因為此時的t還沒有被重繪所以還是原來的tail
t.next = node;
return node;
}
}
}
// 呼叫Unsafe的cas操作
public boolean compareAndSwapTail(Node except,Node update){
return unsafe.compareAndSwapObject(this,tailOffset,except,update);
}
public boolean compareAndSwapState(int except,int update){
return unsafe.compareAndSwapInt(this,stateOffset,except,update);
}
// 上鎖
public void lock(){
if (!compareAndSwapState(0,1)){
// 也就是有執行緒已經獲取到鎖了
System.out.println(Thread.currentThread().getName()+"獲取鎖失敗~");
// 入隊
Node node = push(new Node(Thread.currentThread()));
// node的前結點不是頭結點,或者獲取鎖失敗 都會被阻塞 所以最開的時候就算是頭結點一會被阻塞,知道第一個鎖的執行緒釋放后就繼續while判斷
while(node.prev != head || !compareAndSwapState(0,1)) {
System.out.println(Thread.currentThread().getName()+"阻塞~~");
// 未獲取到鎖,阻塞 并且是一直阻塞
unsafe.park(false, 0L);
}
// 到這里說明node的前面是head結點 并且獲取鎖成功了
System.out.println(Thread.currentThread().getName()+"再次獲取鎖成功~~");
// 相當于把頭結點往后移
head = node;
// 因為head是空結點
// 清空當前節點的內容,協助GC
node.thread = null;
// 將上一個節點從鏈表中剔除,協助GC
node.prev = null;
node.prev.next = null;
}else {
System.out.println(Thread.currentThread().getName()+"獲取鎖成功~");
}
}
public void unlock() {
System.out.println(Thread.currentThread().getName()+"釋放鎖");
// 把state更新成0,這里不需要原子更新,因為同時只有一個執行緒訪問到這里
state = 0;
// 下一個待喚醒的節點
Node next = head.next;
// 下一個節點不為空,就喚醒它
if (next != null) {
unsafe.unpark(next.thread);
System.out.println(next.thread.getName()+"被喚醒");
}
}
}
總結
(1)自己動手寫一個鎖需要做準備:一個變數、一個佇列、Unsafe類,
(2)原子更新變數為1說明獲得鎖成功;
(3)原子更新變數為1失敗說明獲得鎖失敗,進入佇列排隊;
(4)更新佇列尾節點的時候是多執行緒競爭的,所以要使用原子更新;
(5)更新佇列頭節點的時候只有一個執行緒,不存在競爭,所以不需要使用原子更新;
問題
(1)我們實作的鎖支持可重入嗎?
答:不可重入,因為我們每次只把state更新為1,如果要支持可重入也很簡單,獲取鎖時檢測鎖是不是被當前執行緒占有著(AQS中的setExclusiveOwnerThread(Thread.currentThread())設定此執行緒為獨占執行緒就是為了重入時檢測),如果是就把state的值加1,釋放鎖時每次減1即可,減為0時表示鎖已釋放,
(2)我們實作的鎖是公平鎖還是非公平鎖?
答:非公平鎖,因為獲取鎖的時候我們先嘗試了一次,這里并不是嚴格的排隊,所以是非公平鎖,想要公平鎖也很簡單,只需要在加鎖前判斷佇列中是否有執行緒在排隊
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/357099.html
標籤:java
