ConcurrentHashMap結構
陣列+鏈表+紅黑樹,鏈表大于8轉紅黑樹,紅黑樹節點數小于6退回鏈表,跟HashMap是一樣的,只是支持并發的操作

存放的key-value的Node節點
結構幾乎一樣,但是有個很重要的地方,val,next都被volatile關鍵字修飾,保證在多個執行緒之間的可見性,
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
樹形結構的Node,繼承Node多了幾個紅黑書樹樹特征的屬性,
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
}
繼承結構

核心屬性
- volatile Node<K,V>[] table;
結構中存放資料的陣列,初始化時并不會立馬初始化,而是等到第一次插入時延遲初始化,大小始終是2的冪 - volatile Node<K,V>[] nextTable;
這個屬性在擴容是很重要, 只有在擴容時才會用得到,可以簡單理解為擴容后需要放置的新陣列, - transient volatile long baseCount;
基本計算表中元素個數,主要在無競爭時使用, 通過CAS更新,但是map的size!=baseCount,具體往下看 - volatile int transferIndex;
擴容時,執行緒需要負責轉移的開始下標,擴容時分配任務時會頻繁用到, - transient volatile CounterCell[] counterCells;
很重要,用來計算個數的元素,后續在size方法,以及addCount方法時會用到,后續詳細講解, - volatile int cellsBusy;
當counterCells需要擴容了,修改counterCells時的標識,在addCount方法時講解 - transient volatile int sizeCtl;
默認為0,用來控制table的初始化和擴容操作
1 代表table正在初始化
N 表示有N-1個執行緒正在進行擴容操作:
如果table未初始化,表示table需要初始化的大小,
如果table初始化完成,表示table的容量,默認table大小的0.75倍
看懂幾個方法
- tabAt
擁有volatile語意的讀取陣列中的相應索引的資料,直接讀取主記憶體中的陣列資料, - casTabAt
使用CAS的方式保證執行緒安全, - setTabAt
對setTabAt的呼叫始終在鎖定區域內發生,并不需要完整的volatile語意,當前被編碼為volatile 是保守的,
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
開始看map的幾個核心方法的實作,
核心方法public V put(K key, V value)
put時存在多種情況,并且需要支持并發執行緒安全,下面分步驟進行決議
- 陣列table還未初始
直到put成功才退出
for (Node<K,V>[] tab = table;;) {
Node<K,V> f;
int n = tab.length;
if (tab == null || tab.length == 0) {
tab = initTable();
}
}
初始化代碼,一進去就是個回圈一直到table完成初始化,執行緒會根據**sizeCtl(volatile保持執行緒可見性)**的值,判斷進行初始化操作,還是讓當前執行緒從運行狀態變為就緒狀態,讓出CPU,
- sizeCtl小于0
使當前執行緒從運行狀態變為就緒狀態讓出CPU - sizeCtl大于等于0
進行CAS操作改變sizeCtl的值為-1,代表已有執行緒競爭到初始化table的權利,準備開始初始化,
初始化程序:
如果table未初始化sizeCtl表示table需要初始化的大小,New出陣列,屬性賦值給table, table初始化完成,sizeCtl表示需要擴容的閾值,默認table大小的0.75倍,
private final Node<K,V>[] initTable() {
Node<K,V>[] tab;
//記錄sizeCtl的值,sizeCtl后續CAS修改-1了
int sc;
//只要table沒完成初始化就嘗試初始化
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
//讓出Cpu的執行
Thread.yield();
//執行緒CAS,其中會有一個執行緒CAS成功
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//CAS成功的執行緒開始初始化table
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//0.75倍
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
//直到table完成初始化退出回圈,回傳table
return tab;
}

再接著往下看,會發現其他else 陳述句都進不去了,元素還沒put進去呢,這不白瞎了,別忘記了開頭進來是個一直的回圈,等table初始化完成后,回圈還沒結束呢,只有在元素插入后才會break跳出回圈,table初始化完成了接著往下看,
- 存放時陣列table相應位置為Null
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break;
}

執行緒2競爭CAS失敗,依舊需要下次回圈,下次回圈時table此處已經存放了Node節點(忽略移除的情況),接下來再看執行緒2要怎么處理,
- 存放的結構是鏈表(陣列中相應位置已有節點)
上述到執行緒1和執行緒2CAS競爭,執行緒2失敗,再次進到回圈處,前兩個條件已經進不去了(table已初始化,table存在元素)先暫時忽略正在擴容的情況,后面說,執行緒2只能進到最后一個else,代碼挺長耐心看,
代碼第三行看到 synchronized,其實只要看到就可以松一口氣,因為其中是同步代碼塊同時只有一個執行緒可以進入,只要考慮單執行緒的就好了,基本和HashMap插入元素一樣了,

//鎖住頭結點
synchronized (f) {
if (tabAt(tab, i) == f) {
//頭結點的Hash值如果小于0代表在擴容,這個后面擴容時說
if (fh >= 0) {
//統計鏈表的節點個數
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//存在key相同的情況,替換其中的value值
if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//沒有存在key相等,構造節點插入到鏈表后
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
}
}
if (binCount != 0) {
//大于閾值8需要進行鏈表轉紅黑樹
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
addCount(1L, binCount);
- 存放的結構是 紅黑樹
鎖住頭節點,如果頭節點是TreeBin結構,那么呼叫putTreeVal方法將節點插入紅黑樹,至于紅黑樹的插入與洗掉篇幅較長,下次在另一篇文章中介紹,
//鎖住頭節點,形成同步代碼塊
synchronized (f) {
if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
addCount(1L, binCount);
對于上述 synchronized (f),做一個補充,因為只是鎖住了單個節點,并不會影響操作其他鏈表或者其他紅黑樹的執行緒,只要執行緒tabAt(tab, i = (n - 1) & hash)獲取的節點不一致,就不會產生沖突,
核心方法private final void addCount(long x, int check)
再執行緒添加完節點后最后都呼叫了addCount(1L, binCount);到目前為止,是不是沒有發現擴容在何處觸發,因為我們說當hashMap存放的元素>容量*負載因子(默認0.75)就會擴容,那么這個方法一定是在put增加元素時檢查是否需要擴容,
一起看看這個方法干嘛用的,是否有觸發擴容,再看這個方法前,先看看size()方法,是怎么計數的,
核心方法 public int size()
先大概看一下原始碼,發現并不是直接回傳baseCount這個屬性,而是遍歷counterCells陣列中所有的值累加到baseCount上回傳,
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}
final long sumCount() {
CounterCell[] as = counterCells;
CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
那么為什么這個中的所有值和baseCount累加在一起就等于Map的size呢,先看看他的結構很簡單,只有一個volatile修飾的value值,這個時候再回頭看addCount(long x, int check)方法
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
核心方法private final void addCount(long x, int check)

大致流程圖如上,如果CounterCell為Null,說明還沒有執行緒增加節點,直接CAS baseCount,如果成功了則表示添加成功,失敗的話說明還有其他執行緒在操作baseCount,準備開始修改CounterCell陣列中節點的值,隨機生成一個亂數取余CounterCell陣列長度,如果節點為Null或者CAS修改失敗進入fullAddCount,再來看fullAddCount函式
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//如果counterCells不為空,說明不是第一次add,直接進入if
//只有ounterCells為空,才會CAS競爭baseCount
//CAS操作失敗進入if 避免多個執行緒競爭baseCount
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a;
long v;
int m;
boolean uncontended = true;
//以下條件只要有一個為true,就會執行fullAddCount(x, uncontended),賊長的一個方法,
//CounterCell陣列為空
//ThreadLocalRandom.getProbe() 生成每個執行緒的一個亂數,&m 取余CounterCell陣列的長度得到的節點為Null
//CAS增加節點失敗
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
}
}
fullAddCount函式:從上圖可以看出,
- 當直接CAS操作BASECOUNT失敗
- CounterCell為Null(未初始化)
- 隨機值取余CounterCell表長度下標元素為Null
- 以及CAS修改下標元素值失敗
都會進入fullAddCount方法,一起看看fullAddCount做了什么,如果你不想看代碼可以看下面的圖和總結,
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//如果當前執行緒的probe的值為0,則初始化當前執行緒的probe的值,probe就是亂數
if ((h = ThreadLocalRandom.getProbe()) == 0) {
//force initialization
ThreadLocalRandom.localInit();
//重新獲取probe,亂數
h = ThreadLocalRandom.getProbe();
// 重新生成了probe,未沖突標志位設定為true
wasUncontended = true;
}
//如果最后一個插槽非空則為true
boolean collide = false;
//自旋
for (;;) {
CounterCell[] as;
CounterCell a;
int n;
long v;
//counterCells已經完成初始化過,可以先看初始化部分
if ((as = counterCells) != null && (n = as.length) > 0) {
//亂數快速取余表長度,定位陣列中的節點
if ((a = as[(n - 1) & h]) == null) {
//該位置不存在節點元素
//cellsBusy=0表示counterCells不在初始化或者擴容狀態下
if (cellsBusy == 0) {
//構造一個CounterCell的值
CounterCell r = new CounterCell(x);
//cellsBusy=0表示counterCells不在初始化或者擴容狀態下,并且沒有其他執行緒在添加節點
//通過cas設定cellsBusy標識,防止其他執行緒來對counterCells并發處理
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
CounterCell[] rs; int m, j;
//將初始化的r物件的元素個數放在對應下標的位置
//cas成功后,檢查一些必要條件是否依舊成立
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
//完成陣列相應元素的賦值
rs[j] = r;
created = true;
}
} finally {
//初始化結束恢復標識
cellsBusy = 0;
}
//資料添加成功,退出自旋
if (created)
break;
//資料添加不成功,進入下一次回圈
continue;
}
}
collide = false;
} else if (!wasUncontended)
wasUncontended = true;
//由于指定下標位置的cell值不為空,則直接通過cas進行原子累加,如果成功,則直接退出
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
//CounterCells大于CPU核心數(執行緒的并發數不會超過cpu核心數)
else if (counterCells != as || n >= NCPU)
//設定當前執行緒的本次回圈失敗不進行其他操作,下次回圈再看情況做選擇
collide = false;
//上述步驟依舊未完成,執行緒的并發數小于cpu核心數,恢復collide狀態,標識下次回圈會進行操作
else if (!collide)
collide = true;
//進入這個步驟,說明CounterCell陣列容量不夠,
//執行緒競爭比較大了,才會到這步
//所以先設定一個標識表示為正在擴容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {
//擴容一倍 2變成4,這個擴容比較簡單
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
//恢復標識
cellsBusy = 0;
}
collide = false;
//繼續下一次自旋
continue;
}
//繼續下一次自旋
h = ThreadLocalRandom.advanceProbe(h);
}
//counterCellsw未初始化,進行初始化
//cellsBusy=0表示沒有執行緒執行初始化操作
//cellsBusy volatile變數保持執行緒可見性
//通過cas更新cellsbusy的值標注當前執行緒正在做初始化操作
else if (cellsBusy == 0 &&
counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
// Initialize table
try {
if (counterCells == as) {
//初始化容量為2
CounterCell[] rs = new CounterCell[2];
//將個數 放在指定的陣列下標位置
rs[h & 1] = new CounterCell(x);
//賦值給counterCells
counterCells = rs;
//設定初始化完成標識
init = true;
}
} finally {
//初始化結束恢復標識
cellsBusy = 0;
}
if (init)
break;
}
//競爭激烈,以上情況都不滿足,其它執行緒占據cell 陣列,直接累加在base變數中
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;
}
}

ConcurrentHashMap單純做一個size+1的操作都是這般鬼斧神工,太難了,不知道你看完上面的圖沒有,做個總結,

size+1是如何做到執行緒安全的
繞來繞去的,心中是不是在想直接CAS操作baseCount累加就不用這么麻煩了,但是這會造成多個執行緒在put完元素,同時競爭baseCount,導致性能低下,
選擇size=baseCount+counterCells陣列中的所有值加一塊,這樣結合上圖就會發現,多個執行緒可以同時競爭baseCount,counterCells的每個資料節點(改變counterCells需要競爭cellsBusy),這樣是不是有可能10個執行緒競爭一個baseCount和多個counterCells陣列節點(默認陣列長度為2,當競爭激烈時,會有執行緒擴容成2倍),10個競爭3個,是不是比10個競爭1個好呢,
下面看下,什么情況下會CAS競爭什么節點
- counterCells未初始化,CAS競爭cellsBusy置為1,開始初始化
- 執行緒競爭激烈,counterCells擴容成2倍,CAS競爭cellsBusy置為1
- counterCells相應下標元素為Null,CAS競爭cellsBusy置為1,需要對counterCells做修改
- counterCells相應下標元素不為Null,CAS競爭相應下標元素,+x
瞬間由多個執行緒競爭一個baseCount,到多個執行緒競爭多個元素,并且當競爭激烈時,還會進行擴容,可以競爭元素增多,
你以為addCount方法結束了嗎,上文說了還要判斷是否需要擴容,到現在目前為止只是完成了size的+1操作,后面還有一半的代碼,來判斷是否需要擴容,
private final void addCount(long x, int check) {
/******省略size+1的代碼**************/
//統計map中有多少個元素
long s = sumCount();
//remove也會呼叫addCount,但是check為-1代表不需要檢查擴容
if (check >= 0) {
Node<K,V>[] tab;
Node<K,V>[] nt;
int n;
int sc;
//為什么是回圈而不是if
//首先內部的幾處CAS都有可能會失敗,失敗了就需要再做一次判斷
//其次考慮一種情況,是否會存在,該執行緒剛剛完成擴容,又有大量的元素進到map中
//while中完成擴容后,最后一行代碼又計算了一下map的元素個數,看是否還需要擴容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//已經存在執行緒擴容了(回顧一下sizeCtl各個數字的含義)
if (sc < 0) {
// 沒看懂,前面幾個關于sc的判斷沒看懂,有明白的辛苦評論告知
// 能進入if的說明sc小于0,但是還存在一個情況就是進入if后擴容完成了
//在擴容的分配任務中可以看到對nextTable,transferIndex 的修改
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//CAS修改使得sizeCtl表示的正在擴容的執行緒+1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
//CAS修改SIZECTL的值,第一個進入擴容的執行緒
}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
至此要進入最核心的代碼transfer擴容了,做好戰斗準備
核心方法void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) ;擴容最難的方法
ConcurrentHashMap是支持多個執行緒擴容的,在什么時候觸發多個執行緒后續再分析,
當有多個執行緒擴容時,需要先分配任務,將將陣列拆分成多塊,指定執行緒處理某一些桶的擴容,例如這樣

下面先分析執行緒是怎么確定自己需要擴容的桶的,
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//確認一個執行緒需要處理多少個桶
int n = tab.length;
//執行緒單次需要處理的桶的數量
int stride;
//如果CPU個數大于1,需要處理的數量為, 將 length / 8 然后除以 CPU核心數,如果得到的結果小于 16,那么就使用 16,
//讓每個 CPU 處理的桶一樣多,避免出現轉移任務不均勻的現象
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
//nextTab為Null擴容執行緒,不為Null是幫助擴容執行緒
if (nextTab == null) { // initiating
try {
//初始化一下新的陣列
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
//賦值,這樣其他執行緒訪問nextTable時就不會為空了
nextTable = nextTab;
//這個下標很重要,這里是等于舊的陣列的容量,代表的含義是未處理的最后一個位置,
transferIndex = n;
}
//新陣列的容量,也很重要,先記著
int nextn = nextTab.length;
//上文說過的一個標記的用的Node節點,hash值為-1
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//也是一個標識,標識是否前進到下一個桶的位置(是否處理下一個桶)
boolean advance = true;
//當前執行緒的擴容任務是否結束
boolean finishing = false;
//開始自旋處理
for (int i = 0, bound = 0;;) {
Node<K,V> f;
int fh;
while (advance) {
int nextIndex, nextBound;
//這個兩個if暫時不要看,看完最后一個分支再看
//只需要記得其中完成了--i,nextIndex = transferIndex(舊的table的長度 )
//(看完最后一個分支再看)當經歷過下面的任務分配后,
// 如果--i,大于等于bound上限,說明已經分配了任務并且沒執行完不用再執行分配任務了
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//CAS設定transferIndex的值(未分配處理的最后一個結點的位置),如果CAS成功就計算需要處理的桶的開始和結束位置,
//其中兩個分重要的屬性nextBound,nextIndex
//處理的桶下限:nextBound = 陣列長度-單次需要處理的桶個數
//處理的桶上限:i = 陣列的最后一個
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
//任務分配完成,結束while回圈
advance = false;
}
}
}
}


分配任務的重點在于,先CAS修改transferIndex的值,修改成功再分配,這樣在其他執行緒讀取到transferIndex的值時,再去分配任務時就不會造成任務重復分配分配完任務后,看執行緒是怎么處理這些分配到自己頭上的這些桶的,先看一個Node結構,其HASH值為-1,用來做特定標識的,標記當前桶已經被處理了
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
//Hash值MOVED為負一
super(MOVED, null, null, null);
this.nextTable = tab;
}
}

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
/*************************以下代碼為分配任務*************************/
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
/*************************以上代碼為分配任務*************************/
for (int i = 0, bound = 0;;) {
//先不用去看這個if,這個是判斷當前執行緒是否已經將任務處理完畢
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果這個桶是空的,直接CAS一個Hash值為-1的ForwardingNode節點
//代表這個桶已經處理完畢,然后進入下一次回圈,i在while回圈中會-1
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//桶不為空,但是是ForwardingNode節點,直接進入下次回圈(處理下個桶)
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//開始鎖住頭節點,往新的陣列遷移陣列(里面細節先不看之后都是同步代碼塊,先分析一下整體流程)
synchronized (f) {
//判斷是鏈表還是樹,按結構遷移資料
}
}
}
}
至此為止擴容部分大部分結束,其實簡單來說,就兩個步驟

但是每個步驟拆開細節都很多,
什么時候才會觸發分配任務
怎么判斷所有任務已經分配完畢,
//分配任務的代碼
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
1.處理位置i要大于下限bound=》分配給當前執行緒的任務沒處理完,
2. transferIndex(代表還未分配的桶的最后一個的位置)大于0=》還有未處理的桶
什么時候transferIndex會被設定成等于0呢,執行到CAS處,當transferIndex不大于stride(每次被分配的個數),如果CAS成功說明我將剩下所有的任務都分配給A執行緒去執行了,那么這個時候B執行緒進來發現transferIndex等于0,那就是任務分配完了,設定i = -1;advance = false;(i=-1這個很重要)
再看什么時候會退出自旋
整個transfer方法看下來,只有這個if陳述句內有return陳述句,(忽略例外return),上面一個點說到了當判斷transferIndex等于0時說明任務分配完了,i = -1,還有就是while內 if (–i >= bound || finishing),–i遞減到-1,才會滿足以下條件,說明已經將所有任務分配完,并且當前執行緒的任務已經執行完,
//后面兩個判斷條件沒看懂,有看懂的辛苦評論區告知
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//什么時候finishing會為true呢,只有執行了下面的CAS才會為true
//所以先看看下面的額CAS干什么了
//經過下面的CAS后,finishing設定為True,并且先設定i=n,進行下次回圈
//這個時候只有在--i到-1時,再進到這個if陳述句,也就是將所有元素再檢查一遍后
//才會再進到這個方法,這時finishing為true退出自旋,完成擴容,
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//CAS設定正在擴容的執行緒-1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//看不懂為什么這么操作,有知道的辛苦評論區告知
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//設定擴容已經結束
finishing = advance = true;
//把i重新設定為N,再重新檢查一遍,結束本次回圈
i = n; // recheck before commit
}
}
i = n,這個設定很巧妙,當所有任務都分配出去后,執行緒便會進到這個方法中,但是想退出自旋,完成擴容,還需要做最后一件事,檢查一遍所有的桶是否都已經完成遷移,以及可能幫助承接最后一個任務的執行緒完成剩余作業,當檢查完畢,–i到負數時,再次進入這個判斷條件退出擴容,
幫助擴容的時機
在Put元素時如果節點的Hash為-1說明,正在擴容,而且該桶的元素已經發生了遷移,幫助擴容,helpTransfer跟addCount中判斷幫助擴容差不多就不再進入helpTransfer函式了,
f = tabAt(tab, i = (n - 1) & hash)
else if ((fh = f.hash) == MOVED) {
tab = helpTransfer(tab, f);
}
在addCount后,需要擴容,并且已經有其他執行緒在擴容了(sizeCtl的值已經為負數了 )
if (sc < 0) {
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nextTable);
}
最后我們來大概的看看同步代碼塊中的代碼
不是很重要,畢竟在synchronized中,是同步代碼,其操作與HashMap中遷移資料類似
HashMap原始碼閱讀
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
//節點的hash值&table容量,要么等于0,要么不為0
int runBit = fh & n;
//記錄節點,這個節點的意思代表后面的所有hash&N都跟lastRun一樣了
Node<K,V> lastRun = f;
//遍歷鏈表
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
//只要后一個節點的p.hash & n不等于前一個節點,就更新
//這樣在后面lastRun后面的節點都跟lastRun一致
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//如果runBit==0保留在原處的節點
//這點在HashMap中有過說明
if (runBit == 0) {
ln = lastRun;
hn = null;
}else {
//位置變為當前位置+table長度
hn = lastRun;
ln = null;
}
//將節點分開,但是終止條件是p != lastRun;因為lastRun節點后面的存放的位置與lastRun一致,在前面的代碼已經處理過
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash;
K pk = p.key;
V pv = p.val;
//為什么ph & n==0就放在原處,否則i+n在hashmap一文中說過了,不再贅述
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//具有volatile語意的設定節點到新的陣列
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 將舊的鏈表設定成占位符
setTabAt(tab, i, fwd);
advance = true;
}
//紅黑樹的拆分,后續在說紅黑樹時再分析
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果樹的節點少于6需要轉回鏈表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
最后的最后做個總結
ConcurrentHashMap使用了volatile關鍵字保證執行緒之間可見性,以及CAS,synchronized來做到支持并發的,
其中Put方法時,先去判斷table是否已經初始化,如果還未初始化執行緒CAS修改SIZECTL的值為-1進行初始化,如果此時其他執行緒讀到SIZECTL為負數會讓出Cpu的執行,
存放元素時如果table中相應位置為Null則會CAS修改節點,如果該節點的Hash值為-1說明正在擴容,并且這個桶已經擴容完成了就會幫助擴容,
如果CAS失敗就會進入下一次的回圈,這時如果該位置存在節點就用synchronized鎖住這個節點再判斷是紅黑樹結構還是鏈表結構插入元素,
在插入元素后要進行size+1操作,以及判斷是否需要擴容,
在做size+1的操作時,先CAS修改BaseCount屬性,如果修改失敗就會轉而去操作CounterCell陣列,這樣就把所有執行緒競爭baseCount變成了競爭baseCount和CounterCell陣列中的每個節點,
當然如果執行緒之間競爭激烈會有執行緒CAS修改cellsBusy的值為1,去擴容CounterCell,
當執行完size+1操作后,計算map的size,將baseCount+CounterCell陣列中每個節點的值,如果這個值大于map容量的0.75則需要擴容,
擴容支持多執行緒擴容,擴容第一步會為每個執行緒分配任務,需要處理桶的長度,執行緒會判斷分配的任務范圍下每個位置的節點的情況,如果是null CAS為ForwardingNode一個Hash值為-1的標識節點,
如果不為空也不為ForwardingNode節點,就用synchronized鎖住頭節點類似HashMap一樣將元素移動到新的table,
當transferIndex=0時,說明任務分配完畢,會CAS設定SIZECTL-1代表正在擴容的執行緒-1,CAS成功后悔再檢查一遍所有得元素是否都已經轉移成功再退出擴容
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/226640.html
標籤:其他
下一篇:Java集合總結大全--史上最強

