主頁 >  其他 > 萬字長文 圖文閱讀ConcurrentHashMap原始碼,吊打面試官系列

萬字長文 圖文閱讀ConcurrentHashMap原始碼,吊打面試官系列

2020-11-23 10:30:50 其他

ConcurrentHashMap結構

陣列+鏈表+紅黑樹,鏈表大于8轉紅黑樹,紅黑樹節點數小于6退回鏈表,跟HashMap是一樣的,只是支持并發的操作
在這里插入圖片描述

存放的key-value的Node節點

結構幾乎一樣,但是有個很重要的地方,val,next都被volatile關鍵字修飾,保證在多個執行緒之間的可見性,

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
}

樹形結構的Node,繼承Node多了幾個紅黑書樹樹特征的屬性,

 static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;
}

繼承結構

在這里插入圖片描述

核心屬性

  • volatile Node<K,V>[] table;
    結構中存放資料的陣列,初始化時并不會立馬初始化,而是等到第一次插入時延遲初始化,大小始終是2的冪
  • volatile Node<K,V>[] nextTable;
    這個屬性在擴容是很重要, 只有在擴容時才會用得到,可以簡單理解為擴容后需要放置的新陣列,
  • transient volatile long baseCount;
    基本計算表中元素個數,主要在無競爭時使用, 通過CAS更新,但是map的size!=baseCount,具體往下看
  • volatile int transferIndex;
    擴容時,執行緒需要負責轉移的開始下標,擴容時分配任務時會頻繁用到,
  • transient volatile CounterCell[] counterCells;
    很重要,用來計算個數的元素,后續在size方法,以及addCount方法時會用到,后續詳細講解,
  • volatile int cellsBusy;
    當counterCells需要擴容了,修改counterCells時的標識,在addCount方法時講解
  • transient volatile int sizeCtl;
    默認為0,用來控制table的初始化和擴容操作
    1 代表table正在初始化
    N 表示有N-1個執行緒正在進行擴容操作:
    如果table未初始化,表示table需要初始化的大小,
    如果table初始化完成,表示table的容量,默認table大小的0.75倍

看懂幾個方法

  • tabAt
    擁有volatile語意的讀取陣列中的相應索引的資料,直接讀取主記憶體中的陣列資料,
  • casTabAt
    使用CAS的方式保證執行緒安全,
  • setTabAt
    對setTabAt的呼叫始終在鎖定區域內發生,并不需要完整的volatile語意,當前被編碼為volatile 是保守的,
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

開始看map的幾個核心方法的實作,

核心方法public V put(K key, V value)

put時存在多種情況,并且需要支持并發執行緒安全,下面分步驟進行決議

  • 陣列table還未初始
    直到put成功才退出
 for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; 
    int n = tab.length;
    if (tab == null || tab.length == 0) {
        tab = initTable();
    }     
}

初始化代碼,一進去就是個回圈一直到table完成初始化,執行緒會根據**sizeCtl(volatile保持執行緒可見性)**的值,判斷進行初始化操作,還是讓當前執行緒從運行狀態變為就緒狀態,讓出CPU,

  • sizeCtl小于0
    使當前執行緒從運行狀態變為就緒狀態讓出CPU
  • sizeCtl大于等于0
    進行CAS操作改變sizeCtl的值為-1,代表已有執行緒競爭到初始化table的權利,準備開始初始化,
    初始化程序:
    如果table未初始化sizeCtl表示table需要初始化的大小,New出陣列,屬性賦值給table, table初始化完成,sizeCtl表示需要擴容的閾值,默認table大小的0.75倍,
private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; 
        //記錄sizeCtl的值,sizeCtl后續CAS修改-1了
        int sc;
        //只要table沒完成初始化就嘗試初始化
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                //讓出Cpu的執行
                Thread.yield(); 
            //執行緒CAS,其中會有一個執行緒CAS成功
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                //CAS成功的執行緒開始初始化table
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //0.75倍
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        //直到table完成初始化退出回圈,回傳table
        return tab;
    }

在這里插入圖片描述
再接著往下看,會發現其他else 陳述句都進不去了,元素還沒put進去呢,這不白瞎了,別忘記了開頭進來是個一直的回圈,等table初始化完成后,回圈還沒結束呢,只有在元素插入后才會break跳出回圈,table初始化完成了接著往下看,

  • 存放時陣列table相應位置為Null
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
        break;
}

在這里插入圖片描述
執行緒2競爭CAS失敗,依舊需要下次回圈,下次回圈時table此處已經存放了Node節點(忽略移除的情況),接下來再看執行緒2要怎么處理,

  • 存放的結構是鏈表(陣列中相應位置已有節點)
    上述到執行緒1和執行緒2CAS競爭,執行緒2失敗,再次進到回圈處,前兩個條件已經進不去了(table已初始化,table存在元素)先暫時忽略正在擴容的情況,后面說,執行緒2只能進到最后一個else,代碼挺長耐心看,
    代碼第三行看到 synchronized,其實只要看到就可以松一口氣,因為其中是同步代碼塊同時只有一個執行緒可以進入,只要考慮單執行緒的就好了,基本和HashMap插入元素一樣了,
    在這里插入圖片描述
//鎖住頭結點
synchronized (f) {
    if (tabAt(tab, i) == f) {
        //頭結點的Hash值如果小于0代表在擴容,這個后面擴容時說
        if (fh >= 0) {
            //統計鏈表的節點個數
            binCount = 1;
            for (Node<K,V> e = f;; ++binCount) {
                K ek;
                //存在key相同的情況,替換其中的value值
                if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {
                    oldVal = e.val;
                    if (!onlyIfAbsent)
                        e.val = value;
                    break;
                }
                Node<K,V> pred = e;
                //沒有存在key相等,構造節點插入到鏈表后
                if ((e = e.next) == null) {
                    pred.next = new Node<K,V>(hash, key,
                    value, null);
                    break;
                }
            }
        }
    }
}
if (binCount != 0) {
    //大于閾值8需要進行鏈表轉紅黑樹
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}
addCount(1L, binCount);
  • 存放的結構是 紅黑樹
    鎖住頭節點,如果頭節點是TreeBin結構,那么呼叫putTreeVal方法將節點插入紅黑樹,至于紅黑樹的插入與洗掉篇幅較長,下次在另一篇文章中介紹,
//鎖住頭節點,形成同步代碼塊
synchronized (f) {
    if (f instanceof TreeBin) {
        Node<K,V> p;
        binCount = 2;
        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
            oldVal = p.val;
            if (!onlyIfAbsent)
                p.val = value;
        } 
    }
}
addCount(1L, binCount);

對于上述 synchronized (f),做一個補充,因為只是鎖住了單個節點,并不會影響操作其他鏈表或者其他紅黑樹的執行緒,只要執行緒tabAt(tab, i = (n - 1) & hash)獲取的節點不一致,就不會產生沖突,

核心方法private final void addCount(long x, int check)

再執行緒添加完節點后最后都呼叫了addCount(1L, binCount);到目前為止,是不是沒有發現擴容在何處觸發,因為我們說當hashMap存放的元素>容量*負載因子(默認0.75)就會擴容,那么這個方法一定是在put增加元素時檢查是否需要擴容,
一起看看這個方法干嘛用的,是否有觸發擴容,再看這個方法前,先看看size()方法,是怎么計數的,

核心方法 public int size()

先大概看一下原始碼,發現并不是直接回傳baseCount這個屬性,而是遍歷counterCells陣列中所有的值累加到baseCount上回傳,

    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
    }
final long sumCount() {
        CounterCell[] as = counterCells; 
        CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

那么為什么這個中的所有值和baseCount累加在一起就等于Map的size呢,先看看他的結構很簡單,只有一個volatile修飾的value值,這個時候再回頭看addCount(long x, int check)方法

@sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

核心方法private final void addCount(long x, int check)

在這里插入圖片描述
大致流程圖如上,如果CounterCell為Null,說明還沒有執行緒增加節點,直接CAS baseCount,如果成功了則表示添加成功,失敗的話說明還有其他執行緒在操作baseCount,準備開始修改CounterCell陣列中節點的值,隨機生成一個亂數取余CounterCell陣列長度,如果節點為Null或者CAS修改失敗進入fullAddCount,再來看fullAddCount函式

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //如果counterCells不為空,說明不是第一次add,直接進入if
        //只有ounterCells為空,才會CAS競爭baseCount
        //CAS操作失敗進入if 避免多個執行緒競爭baseCount
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; 
            long v; 
            int m;
            boolean uncontended = true;
            //以下條件只要有一個為true,就會執行fullAddCount(x, uncontended),賊長的一個方法,
            //CounterCell陣列為空
            //ThreadLocalRandom.getProbe() 生成每個執行緒的一個亂數,&m 取余CounterCell陣列的長度得到的節點為Null
            //CAS增加節點失敗
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
        }
    }

fullAddCount函式:從上圖可以看出,

  • 當直接CAS操作BASECOUNT失敗
  • CounterCell為Null(未初始化)
  • 隨機值取余CounterCell表長度下標元素為Null
  • 以及CAS修改下標元素值失敗
    都會進入fullAddCount方法,一起看看fullAddCount做了什么,如果你不想看代碼可以看下面的圖和總結,
private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        //如果當前執行緒的probe的值為0,則初始化當前執行緒的probe的值,probe就是亂數
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            //force initialization
            ThreadLocalRandom.localInit();
            //重新獲取probe,亂數
            h = ThreadLocalRandom.getProbe();
            // 重新生成了probe,未沖突標志位設定為true
            wasUncontended = true;
        }
        //如果最后一個插槽非空則為true
        boolean collide = false;

        //自旋
        for (;;) {
            CounterCell[] as; 
            CounterCell a; 
            int n; 
            long v;
 
            //counterCells已經完成初始化過,可以先看初始化部分
            if ((as = counterCells) != null && (n = as.length) > 0) {
 
                //亂數快速取余表長度,定位陣列中的節點
                if ((a = as[(n - 1) & h]) == null) {
                    //該位置不存在節點元素
                    //cellsBusy=0表示counterCells不在初始化或者擴容狀態下
                    if (cellsBusy == 0) {
                        //構造一個CounterCell的值
                        CounterCell r = new CounterCell(x); 
 
                        //cellsBusy=0表示counterCells不在初始化或者擴容狀態下,并且沒有其他執行緒在添加節點
                        //通過cas設定cellsBusy標識,防止其他執行緒來對counterCells并發處理
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               
                                CounterCell[] rs; int m, j;
                                //將初始化的r物件的元素個數放在對應下標的位置
                                //cas成功后,檢查一些必要條件是否依舊成立
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    //完成陣列相應元素的賦值
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                 //初始化結束恢復標識
                                cellsBusy = 0;
                            }
                            //資料添加成功,退出自旋
                            if (created)
                                break;
                            //資料添加不成功,進入下一次回圈
                            continue;
                        }
                    }
                    collide = false;
                } else if (!wasUncontended)
                    wasUncontended = true;
                //由于指定下標位置的cell值不為空,則直接通過cas進行原子累加,如果成功,則直接退出
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                //CounterCells大于CPU核心數(執行緒的并發數不會超過cpu核心數)
                else if (counterCells != as || n >= NCPU)
                    //設定當前執行緒的本次回圈失敗不進行其他操作,下次回圈再看情況做選擇
                    collide = false;  

                //上述步驟依舊未完成,執行緒的并發數小于cpu核心數,恢復collide狀態,標識下次回圈會進行操作
                else if (!collide)
                    collide = true;
 
                //進入這個步驟,說明CounterCell陣列容量不夠,
                //執行緒競爭比較大了,才會到這步
                //所以先設定一個標識表示為正在擴容
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {
                            //擴容一倍 2變成4,這個擴容比較簡單
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        //恢復標識
                        cellsBusy = 0;
                    }
                    collide = false;
                    //繼續下一次自旋
                    continue;
                }
                //繼續下一次自旋
                h = ThreadLocalRandom.advanceProbe(h);
            }
            //counterCellsw未初始化,進行初始化
            //cellsBusy=0表示沒有執行緒執行初始化操作
            //cellsBusy volatile變數保持執行緒可見性
            //通過cas更新cellsbusy的值標注當前執行緒正在做初始化操作
            else if (cellsBusy == 0 && 
                     counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                // Initialize table
                try {                           
                    if (counterCells == as) {
                        //初始化容量為2
                        CounterCell[] rs = new CounterCell[2];
                        //將個數 放在指定的陣列下標位置
                        rs[h & 1] = new CounterCell(x);
                        //賦值給counterCells
                        counterCells = rs;
                        //設定初始化完成標識
                        init = true;
                    }
                } finally {
                    //初始化結束恢復標識
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //競爭激烈,以上情況都不滿足,其它執行緒占據cell 陣列,直接累加在base變數中
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;
        }
    }

在這里插入圖片描述
ConcurrentHashMap單純做一個size+1的操作都是這般鬼斧神工,太難了,不知道你看完上面的圖沒有,做個總結,
在這里插入圖片描述

size+1是如何做到執行緒安全的

繞來繞去的,心中是不是在想直接CAS操作baseCount累加就不用這么麻煩了,但是這會造成多個執行緒在put完元素,同時競爭baseCount,導致性能低下,
選擇size=baseCount+counterCells陣列中的所有值加一塊,這樣結合上圖就會發現,多個執行緒可以同時競爭baseCount,counterCells的每個資料節點(改變counterCells需要競爭cellsBusy),這樣是不是有可能10個執行緒競爭一個baseCount和多個counterCells陣列節點(默認陣列長度為2,當競爭激烈時,會有執行緒擴容成2倍),10個競爭3個,是不是比10個競爭1個好呢,
下面看下,什么情況下會CAS競爭什么節點

  • counterCells未初始化,CAS競爭cellsBusy置為1,開始初始化
  • 執行緒競爭激烈,counterCells擴容成2倍,CAS競爭cellsBusy置為1
  • counterCells相應下標元素為Null,CAS競爭cellsBusy置為1,需要對counterCells做修改
  • counterCells相應下標元素不為Null,CAS競爭相應下標元素,+x
    瞬間由多個執行緒競爭一個baseCount,到多個執行緒競爭多個元素,并且當競爭激烈時,還會進行擴容,可以競爭元素增多,

你以為addCount方法結束了嗎,上文說了還要判斷是否需要擴容,到現在目前為止只是完成了size的+1操作,后面還有一半的代碼,來判斷是否需要擴容,

private final void addCount(long x, int check) {
         /******省略size+1的代碼**************/
        //統計map中有多少個元素
        long    s = sumCount();
        //remove也會呼叫addCount,但是check為-1代表不需要檢查擴容
        if (check >= 0) {
            Node<K,V>[] tab;
             Node<K,V>[] nt; 
            int n;
            int sc;
            //為什么是回圈而不是if
            //首先內部的幾處CAS都有可能會失敗,失敗了就需要再做一次判斷
            //其次考慮一種情況,是否會存在,該執行緒剛剛完成擴容,又有大量的元素進到map中
            //while中完成擴容后,最后一行代碼又計算了一下map的元素個數,看是否還需要擴容
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                //已經存在執行緒擴容了(回顧一下sizeCtl各個數字的含義)
                if (sc < 0) {
                    // 沒看懂,前面幾個關于sc的判斷沒看懂,有明白的辛苦評論告知
                    // 能進入if的說明sc小于0,但是還存在一個情況就是進入if后擴容完成了
                    //在擴容的分配任務中可以看到對nextTable,transferIndex 的修改
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    //CAS修改使得sizeCtl表示的正在擴容的執行緒+1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                //CAS修改SIZECTL的值,第一個進入擴容的執行緒
                }else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

至此要進入最核心的代碼transfer擴容了,做好戰斗準備

核心方法void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) ;擴容最難的方法

ConcurrentHashMap是支持多個執行緒擴容的,在什么時候觸發多個執行緒后續再分析
當有多個執行緒擴容時,需要先分配任務,將將陣列拆分成多塊,指定執行緒處理某一些桶的擴容,例如這樣
在這里插入圖片描述
下面先分析執行緒是怎么確定自己需要擴容的桶的,

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        //確認一個執行緒需要處理多少個桶
        int n = tab.length;
        //執行緒單次需要處理的桶的數量
        int stride;
        //如果CPU個數大于1,需要處理的數量為, 將 length / 8 然后除以 CPU核心數,如果得到的結果小于 16,那么就使用 16,
        //讓每個 CPU 處理的桶一樣多,避免出現轉移任務不均勻的現象 
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE;
         
        //nextTab為Null擴容執行緒,不為Null是幫助擴容執行緒
        if (nextTab == null) {            // initiating
            try {
                //初始化一下新的陣列
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            //賦值,這樣其他執行緒訪問nextTable時就不會為空了  
            nextTable = nextTab;
            //這個下標很重要,這里是等于舊的陣列的容量,代表的含義是未處理的最后一個位置,
            transferIndex = n;
        }
        //新陣列的容量,也很重要,先記著
        int nextn = nextTab.length;
        //上文說過的一個標記的用的Node節點,hash值為-1
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        //也是一個標識,標識是否前進到下一個桶的位置(是否處理下一個桶)
        boolean advance = true;
        //當前執行緒的擴容任務是否結束
        boolean finishing = false;
        //開始自旋處理
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; 
            int fh;
            while (advance) {
                int nextIndex, nextBound;
                //這個兩個if暫時不要看,看完最后一個分支再看
                //只需要記得其中完成了--i,nextIndex = transferIndex(舊的table的長度 )
                //(看完最后一個分支再看)當經歷過下面的任務分配后,
                // 如果--i,大于等于bound上限,說明已經分配了任務并且沒執行完不用再執行分配任務了
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //CAS設定transferIndex的值(未分配處理的最后一個結點的位置),如果CAS成功就計算需要處理的桶的開始和結束位置,
                //其中兩個分重要的屬性nextBound,nextIndex
                //處理的桶下限:nextBound = 陣列長度-單次需要處理的桶個數
                //處理的桶上限:i = 陣列的最后一個
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    //任務分配完成,結束while回圈
                    advance = false;
                }
            }
       }
    }

在這里插入圖片描述
在這里插入圖片描述
分配任務的重點在于,先CAS修改transferIndex的值,修改成功再分配,這樣在其他執行緒讀取到transferIndex的值時,再去分配任務時就不會造成任務重復分配分配完任務后,看執行緒是怎么處理這些分配到自己頭上的這些桶的,先看一個Node結構,其HASH值為-1,用來做特定標識的,標記當前桶已經被處理了

static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            //Hash值MOVED為負一
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
}

在這里插入圖片描述

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        /*************************以下代碼為分配任務*************************/
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        /*************************以上代碼為分配任務*************************/
        for (int i = 0, bound = 0;;) {
            //先不用去看這個if,這個是判斷當前執行緒是否已經將任務處理完畢
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            //如果這個桶是空的,直接CAS一個Hash值為-1的ForwardingNode節點
            //代表這個桶已經處理完畢,然后進入下一次回圈,i在while回圈中會-1
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //桶不為空,但是是ForwardingNode節點,直接進入下次回圈(處理下個桶)
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                //開始鎖住頭節點,往新的陣列遷移陣列(里面細節先不看之后都是同步代碼塊,先分析一下整體流程)
                synchronized (f) {
                   //判斷是鏈表還是樹,按結構遷移資料
                }
            }
        }
    }

至此為止擴容部分大部分結束,其實簡單來說,就兩個步驟
在這里插入圖片描述
但是每個步驟拆開細節都很多,

什么時候才會觸發分配任務

怎么判斷所有任務已經分配完畢,

 //分配任務的代碼
 while (advance) {
    int nextIndex, nextBound;
    if (--i >= bound || finishing)
        advance = false;
    else if ((nextIndex = transferIndex) <= 0) {
        i = -1;
        advance = false;
    }
    else if (U.compareAndSwapInt
             (this, TRANSFERINDEX, nextIndex,
              nextBound = (nextIndex > stride ?
                           nextIndex - stride : 0))) {
        bound = nextBound;
        i = nextIndex - 1;
        advance = false;
    }
}

1.處理位置i要大于下限bound=》分配給當前執行緒的任務沒處理完,
2. transferIndex(代表還未分配的桶的最后一個的位置)大于0=》還有未處理的桶
什么時候transferIndex會被設定成等于0呢,執行到CAS處,當transferIndex不大于stride(每次被分配的個數),如果CAS成功說明我將剩下所有的任務都分配給A執行緒去執行了,那么這個時候B執行緒進來發現transferIndex等于0,那就是任務分配完了,設定i = -1;advance = false;(i=-1這個很重要)
在這里插入圖片描述

再看什么時候會退出自旋

整個transfer方法看下來,只有這個if陳述句內有return陳述句,(忽略例外return),上面一個點說到了當判斷transferIndex等于0時說明任務分配完了,i = -1,還有就是while內 if (–i >= bound || finishing),–i遞減到-1,才會滿足以下條件,說明已經將所有任務分配完,并且當前執行緒的任務已經執行完,

//后面兩個判斷條件沒看懂,有看懂的辛苦評論區告知
if (i < 0 || i >= n || i + n >= nextn) {
    int sc;
    //什么時候finishing會為true呢,只有執行了下面的CAS才會為true
    //所以先看看下面的額CAS干什么了
    //經過下面的CAS后,finishing設定為True,并且先設定i=n,進行下次回圈
    //這個時候只有在--i到-1時,再進到這個if陳述句,也就是將所有元素再檢查一遍后
    //才會再進到這個方法,這時finishing為true退出自旋,完成擴容,
    if (finishing) {
        nextTable = null;
        table = nextTab;
        sizeCtl = (n << 1) - (n >>> 1);
        return;
    }
    //CAS設定正在擴容的執行緒-1
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        //看不懂為什么這么操作,有知道的辛苦評論區告知
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        //設定擴容已經結束
        finishing = advance = true;
        //把i重新設定為N,再重新檢查一遍,結束本次回圈
        i = n; // recheck before commit
    }
}

i = n,這個設定很巧妙,當所有任務都分配出去后,執行緒便會進到這個方法中,但是想退出自旋,完成擴容,還需要做最后一件事,檢查一遍所有的桶是否都已經完成遷移,以及可能幫助承接最后一個任務的執行緒完成剩余作業,當檢查完畢,–i到負數時,再次進入這個判斷條件退出擴容,

幫助擴容的時機

在Put元素時如果節點的Hash為-1說明,正在擴容,而且該桶的元素已經發生了遷移,幫助擴容,helpTransfer跟addCount中判斷幫助擴容差不多就不再進入helpTransfer函式了,

f = tabAt(tab, i = (n - 1) & hash)
else if ((fh = f.hash) == MOVED) {
    tab = helpTransfer(tab, f);
}

在addCount后,需要擴容,并且已經有其他執行緒在擴容了(sizeCtl的值已經為負數了 )

if (sc < 0) {
    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
        transfer(tab, nextTable);
}

最后我們來大概的看看同步代碼塊中的代碼

不是很重要,畢竟在synchronized中,是同步代碼,其操作與HashMap中遷移資料類似
HashMap原始碼閱讀

synchronized (f) {
    if (tabAt(tab, i) == f) {
        Node<K,V> ln, hn;
        if (fh >= 0) {
            //節點的hash值&table容量,要么等于0,要么不為0
            int runBit = fh & n;
            //記錄節點,這個節點的意思代表后面的所有hash&N都跟lastRun一樣了
            Node<K,V> lastRun = f;
            //遍歷鏈表
            for (Node<K,V> p = f.next; p != null; p = p.next) {
                int b = p.hash & n;
                //只要后一個節點的p.hash & n不等于前一個節點,就更新
                //這樣在后面lastRun后面的節點都跟lastRun一致
                if (b != runBit) {
                    runBit = b;
                    lastRun = p;
                }
            }
            //如果runBit==0保留在原處的節點
            //這點在HashMap中有過說明
            if (runBit == 0) {
                ln = lastRun;
                hn = null;
            }else {
                //位置變為當前位置+table長度
                hn = lastRun;
                ln = null;
            }
            //將節點分開,但是終止條件是p != lastRun;因為lastRun節點后面的存放的位置與lastRun一致,在前面的代碼已經處理過
            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                int ph = p.hash; 
                K pk = p.key;
                V pv = p.val;
                //為什么ph & n==0就放在原處,否則i+n在hashmap一文中說過了,不再贅述
                if ((ph & n) == 0)
                    ln = new Node<K,V>(ph, pk, pv, ln);
                else
                    hn = new Node<K,V>(ph, pk, pv, hn);
            }
            //具有volatile語意的設定節點到新的陣列
            setTabAt(nextTab, i, ln);
            setTabAt(nextTab, i + n, hn);
            // 將舊的鏈表設定成占位符
            setTabAt(tab, i, fwd);
            advance = true;
        }
        //紅黑樹的拆分,后續在說紅黑樹時再分析
        else if (f instanceof TreeBin) {
            TreeBin<K,V> t = (TreeBin<K,V>)f;
            TreeNode<K,V> lo = null, loTail = null;
            TreeNode<K,V> hi = null, hiTail = null;
            int lc = 0, hc = 0;
            for (Node<K,V> e = t.first; e != null; e = e.next) {
                int h = e.hash;
                TreeNode<K,V> p = new TreeNode<K,V>
                    (h, e.key, e.val, null, null);
                if ((h & n) == 0) {
                    if ((p.prev = loTail) == null)
                        lo = p;
                    else
                        loTail.next = p;
                    loTail = p;
                    ++lc;
                }
                else {
                    if ((p.prev = hiTail) == null)
                        hi = p;
                    else
                        hiTail.next = p;
                    hiTail = p;
                    ++hc;
                }
            }
            //如果樹的節點少于6需要轉回鏈表
            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                (hc != 0) ? new TreeBin<K,V>(lo) : t;
            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                (lc != 0) ? new TreeBin<K,V>(hi) : t;
            setTabAt(nextTab, i, ln);
            setTabAt(nextTab, i + n, hn);
            setTabAt(tab, i, fwd);
            advance = true;
        }
    }
}

最后的最后做個總結

ConcurrentHashMap使用了volatile關鍵字保證執行緒之間可見性,以及CAS,synchronized來做到支持并發的,
其中Put方法時,先去判斷table是否已經初始化,如果還未初始化執行緒CAS修改SIZECTL的值為-1進行初始化,如果此時其他執行緒讀到SIZECTL為負數會讓出Cpu的執行,
存放元素時如果table中相應位置為Null則會CAS修改節點,如果該節點的Hash值為-1說明正在擴容,并且這個桶已經擴容完成了就會幫助擴容,
如果CAS失敗就會進入下一次的回圈,這時如果該位置存在節點就用synchronized鎖住這個節點再判斷是紅黑樹結構還是鏈表結構插入元素,
在插入元素后要進行size+1操作,以及判斷是否需要擴容,
在做size+1的操作時,先CAS修改BaseCount屬性,如果修改失敗就會轉而去操作CounterCell陣列,這樣就把所有執行緒競爭baseCount變成了競爭baseCount和CounterCell陣列中的每個節點,
當然如果執行緒之間競爭激烈會有執行緒CAS修改cellsBusy的值為1,去擴容CounterCell,
當執行完size+1操作后,計算map的size,將baseCount+CounterCell陣列中每個節點的值,如果這個值大于map容量的0.75則需要擴容,
擴容支持多執行緒擴容,擴容第一步會為每個執行緒分配任務,需要處理桶的長度,執行緒會判斷分配的任務范圍下每個位置的節點的情況,如果是null CAS為ForwardingNode一個Hash值為-1的標識節點,
如果不為空也不為ForwardingNode節點,就用synchronized鎖住頭節點類似HashMap一樣將元素移動到新的table,
當transferIndex=0時,說明任務分配完畢,會CAS設定SIZECTL-1代表正在擴容的執行緒-1,CAS成功后悔再檢查一遍所有得元素是否都已經轉移成功再退出擴容

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/226640.html

標籤:其他

上一篇:【聯邦學習】讀書筆記(二) 隱私保護技術

下一篇:Java集合總結大全--史上最強

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more