簡介:
因為博主前面寫過HashMap的決議,所以這里只分析和HashMap不同點,類似的地方會略過,有需要的朋友可以結合HashMap的文章一起看,
不容易,終于把這篇文章更新出來了,ConcurrentHashMap實在是太難了,終于熬完了這鍋雞湯,one day day的,現在幾乎可以背出來了,

廢話不多說,跳過前戲,直接看代碼(提醒一下,精華都是各種if判斷),
先列幾個重要的引數:
//閾值
//=0:默認值
//-1:表示正在初始化
//<-1:表示多個執行緒在擴容(-2是第一個擴容執行緒,后面每多一個就+1)
//低16位是擴容執行緒數量+1,因為第一個擴容執行緒是+2,高16位的最高位是1,剩下15位是陣列的長度)
//>1:表示需要擴容閾值
private transient volatile int sizeCtl;
//擴容時執行緒每次領取完還剩余的任務
private transient volatile int transferIndex;
//標志陣列的下標正在擴容
static final int MOVED = -1;
//標志陣列的下標存放的是樹結構
static final int TREEBIN = -2;
方法串列
- putVal第一部分
- spread():
- initTable():
- putVal第二部分
- treeifyBin():
- tryPresize():
- TreeBin():
- addCount():
- 判斷CAS是否失敗:
- 判斷需不需要擴容:
- resizeStamp():
- transfer():
- 分配任務
- 領取任務
- 真正擴容
- `擴容結束的善后
- helpTransfer():
putVal第一部分
//跟hashMap不一樣的地方,不允許key為null
if (key == null || value == null) throw new NullPointerException();
//計算hash
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//初始化
tab = initTable();
//計算下標,跟hashMap一樣
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//跟hashMap不一樣的地方,CAS新增頭節點
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
第一部分還是比較簡單的:
- 計算hash
- 沒有初始化就先初始化
- 根據hash算出下標,如果下標為null,就用CAS新增
spread():
這里的計算下標跟hashMap有點不一樣,就是這里不允許hash為負數,
//跟hashmap有點區別,不僅高16參與了計算,并且還再次 & 計算
return (h ^ (h >>> 16)) & HASH_BITS;
簡單來說就是避免hash值是負數,具體原因下篇文章分析,
initTable():
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl剛開始是0,sizeCtl<0,說明有別的執行緒CAS成功了
if ((sc = sizeCtl) < 0)
//所以當前執行緒就先讓出CPU時間片,等待下次的系統調度,再次走上面的while判斷
Thread.yield(); // lost initialization race; just spin
//用CAS去修改SIZECTL為-1,表示在初始化,擴容是-2
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//雙重校驗
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
//sizeCtl>0,就是擴容閾值
sizeCtl = sc;
}
break;
}
}
return tab;
}
initTable的作用比較簡單,就是初始化table的,但是要避免多執行緒初始化,
關于這里的雙重校驗額外說明一下:
仔細想想,這個雙重校驗不是為了這里,因為CAS是立即得到回傳結果的,不是Lock或synchronized,不會阻塞
即便是多執行緒同時initTable也不會出現問題,這里是為了防止putAll的
在沒有初始化的時候,一個執行緒put,一個執行緒putAll,就會出現問題,
putAll的初始化是調的tryPresize,這里的二次校驗是防止tryPresize也在初始化
putVal第二部分
//判斷hash是否為MOVED,就是判斷是否有執行緒在擴容
else if ((fh = f.hash) == MOVED)
//幫助擴容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//跟hashMap不一樣的地方,如果此下標已經有值了,鎖住頭節點
synchronized (f) {
//獲取該下標的值,用的getObjectVolatile,強制從主存中獲取屬性值
if (tabAt(tab, i) == f) {
//判斷是不是鏈表
if (fh >= 0) {
binCount = 1;
//遍歷鏈表,尾插
for (Node<K,V> e = f;; ++binCount) {
//...省略掉
}
//判斷是紅黑樹結構
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//...省略掉
}
}
}
//binCount初始是0,0代表插入的下標沒值,插入的是頭節點
//binCount!= 0,1代表插入的是鏈表,2代表是樹
//所以先判斷binCount!= 0,再去判斷是否達到轉樹的閾值
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
//鏈表轉樹
treeifyBin(tab, i);
if (oldVal != null)
//說明是替換值,直接return了,下面的擴容不需要判斷了
return oldVal;
break;
}
}
}
//數量+1,并且判斷是否需要擴容
addCount(1L, binCount);
return null;
}
這里就是精華了:
- 判斷是否正在擴容,是的話就幫助擴容
- 插入鏈表或者
Tree里面去 - 判斷是否需要鏈表轉
Tree,或者是否需要替換舊值(treeifyBin) - 總數量+1,且判斷是否需要擴容(addCount)
treeifyBin():
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
//數量不夠,先擴容,不轉樹
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
//這里遍歷鏈表,把Node轉成TreeNode,跟hashMap一樣
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
//將新的TreeNode改成Tree結構
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
treeifyBin(Node<K,V>[] tab, int index)的作用:
- 判斷總數是否小于64,小于就先擴容,不轉Tree
- 遍歷鏈表,把Node轉成TreeNode
- 把TreeNode改成Tree結構
tryPresize():
private final void tryPresize(int size) {
//判斷是否大于最大容量的一半,是的話就取最大容量,否則就取擴容2倍之后的最小的2的冪次方
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
//判斷是不是要初始化
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
//用CAS去修改SIZECTL為-1,表示在初始化,擴容是-2
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//二次校驗,同樣,校驗的不是這里,是initTable那邊的初始化
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
//這里是判斷是否需要擴容
//c <= sc:判斷的是putAll丟進來的集合數量是否小于擴容閾值
//n >= MAXIMUM_CAPACITY:判斷的是陣列長度是否大于最大容量
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
//走到這,說明要擴容
else if (tab == table) {
//這里的判斷跟addCount方法的判斷一致,在addCount會詳細說明
int rs = resizeStamp(n);
//省略掉
.....
}
}
}
tryPresize(int size):其實還挺簡單的,判斷要初始化還是擴容,擴容的判斷細節留到addCount里面詳細講解,
TreeBin():
TreeBin(TreeNode<K,V> b) {
//注意這一步,把Node的hash改成TREEBIN,也就是是-2,所以判斷 < 0就知道是Tree結構了
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
//紅黑樹的新增,跟hashMap,感興趣的朋友可以去看博主寫的hashMap篇
for (TreeNode<K,V> x = b, next; x != null; x = next) {
//省略掉
.....
}
this.root = r;
assert checkInvariants(root);
}
TreeBin(TreeNode<K,V> b):也很簡單,比起hashMap,就多了一步,把Node的hash改成-2用來區分鏈表和紅黑樹的,
addCount():
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//判斷CAS是否失敗
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
//省略掉
.....
}
//判斷需不需要擴容
if (check >= 0) {
//省略掉
.....
}
}
也是相當復雜的,一步步看,先看下大概思路:
- 先判斷有沒有執行緒CAS增加總數量失敗的,或者說自己CAS失敗了,有就去記錄
- 判斷需不需要擴容
好,簡簡單單的2步,但是復雜的令人想吐,
判斷CAS是否失敗:
這部分就留到下篇文章(細節篇)里面去分析,因為不是很重要,這里就不影響大家的思路了,接著看高潮部分,擴容(下篇文章跟這篇會同步更新),
判斷需不需要擴容:
//判斷是否是新增,洗掉是-1
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//判斷是否需要擴容,sizeCtl是擴容閾值
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
//n是陣列的長度,計算結果的低15位就是n,只不過第16位是1而已
//n最小是16,最大 < MAXIMUM_CAPACITY
//那么rs的范圍就是 32795 ~ 32769,大概率是10000000000xxxxx
int rs = resizeStamp(n);
//判斷是否有別的執行緒在擴容
if (sc < 0) {
//極端情況的判斷
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//SIZECTL + 1 ,其實就是低16位+1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
//擴容
transfer(tab, nt);
}
//這一步就是當前沒有執行緒在擴容,CAS去標志
//此時SIZECTL的低16位是2,高16位的最高位是1,剩下15位是陣列的長度
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
resizeStamp():
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
Integer.numberOfLeadingZeros(n):
Integer.numberOfLeadingZeros(n)是回傳高位的所有的0的個數,具體代碼就不分析了,反正就是一頓操作
Java的int型別是32位的
假設n = 2,二進制就是10,那么Integer.numberOfLeadingZeros(n) = 30
n = 20,二進制就是10100,那么Integer.numberOfLeadingZeros(n) = 27
1 << (RESIZE_STAMP_BITS - 1):
RESIZE_STAMP_BITS = 16
1 << (RESIZE_STAMP_BITS - 1) = 1000000000000000,是16位
Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)):
n是陣列的長度,最小16,最大MAXIMUM_CAPACITY(1 << 30)
那么Integer.numberOfLeadingZeros(n)的結果就是最小是1,最大是27
( 1 ~ 27 ) | 1000000000000000
計算的結果就是低15位肯定是Integer.numberOfLeadingZeros(n),只不過第16位是1而已
一頓操作,回傳值高16位全是0,第16位是1,低15位就是( 1 ~ 27 )之間的值,
第一個復雜點來了,關于已有執行緒在擴容,協助幫助擴容的邏輯 ,看下這幾個極端情況的判斷:
//判斷是否有別的執行緒在擴容
if (sc < 0) {
//就是這里的判斷
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
//出現上述的情況,就跳出回圈
break;
}
再強調一下上面的邏輯是判斷是否幫助擴容,而第一個執行緒進transfer之后:
sizeCtl:低16位是2,高16位的最高位是1,剩下15位是陣列的長度,最高位是1,表示是負數
看下幫助擴容的邏輯:
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
判斷1:(sc >>> RESIZE_STAMP_SHIFT) != rs:
rs:低15位就是陣列長度,只不過第16位是1而已,大概率是10000000000xxxxx,高16位都是0,正數
RESIZE_STAMP_SHIFT:16
sc < 0,sc = sizeCtl(低16位是2,高16位的最高位是1,剩下15位是陣列的長度)
sc >>> 16:就回到了resizeStamp(n)的值,也就是得到高16位
rs = resizeStamp(n),理論上應該是相等的
這里判斷不相等,只可能是sc的高16位變化了,而高16代表了陣列的長度,意味著擴容成功了
判斷2:sc == rs + 1:
sc == rs + 1:因為sc是負數,rs是正數,不可能相等
判斷3:sc == rs + MAX_RESIZERS:
MAX_RESIZERS = 65535,sc是負數,rs是正數,MAX_RESIZERS 是正數,sc == rs + MAX_RESIZERS,不可能
關于這2點我想了很久,各種查資料,發現有大佬已經向JDK提出了這個Bug,且被收錄了,JDK bug link:https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427
上面2個判斷應該改成:
判斷2:sc == rs << 16 + 1:
rs << 16 + 1:左移16位之后,+1,那么低16位就是1,而sc的低16表示的擴容執行緒數
這里判斷相等,就是判斷的擴容是否結束擴容,第一個擴容執行緒的低16是+2,后面每結束一個是-1,最后應該是1
判斷3:sc == rs + MAX_RESIZERS:
跟判斷2的思路一樣,只不過這里是判斷擴容執行緒是否達到上限
判斷4:(nt = nextTable) == null:
nextTable只有是擴容的時候才有值, == null也表示擴容結束
判斷5:transferIndex <= 0:
transferIndex 只會 == 0,表示最后一個任務已經被領取了,具體實作在擴容方法里面會詳解
這幾個判斷屬實牛逼,Doug Lea牛逼,經常出現一個值表示多種狀態的,大佬就是大佬,
transfer():
好了,判斷結束了,終于到核心中的核心了,擴容,
同樣的,由于較為復雜,先說下大概思路,再show代碼:
- 分配任務
- 領取任務
- 真正擴容
- 擴容結束的善后
分配任務
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//用CPU數量去判斷,每個CPU擴容的陣列長度,最少16
//為什么是(n >>> 3) / NCPU,而不是直接n / NCPU呢?
//我覺得大概是因為(個人想法):
//1:真正擴容的執行緒一次不需要領取太多任務,給擴容執行緒一個機會,讓別的執行緒有機會幫助擴容
//2:盡可能的利用每個CPU,這樣put場景多的情況,就會分散壓力到每個CPU
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//剛開始擴容,nextTab == null
if (nextTab == null) {
try {
//擴容2倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
//上面的代碼會報錯,原因只有2個
//1:n << 1結果過大,但是 < Integer.MAX_VALUE,導致OOM,堆空間溢位
//2:n << 1結果超出Integer.MAX_VALUE,導致結果是負數
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//這里transferIndex是老陣列的長度
transferIndex = n;
}
第一部分很簡單,就是把陣列分成幾組,最少16個一組,執行緒一次擴容一組,
領取任務
int nextn = nextTab.length;
//定義一個ForwardingNode,并把hash變成了MOVED
//當執行緒發現此下標的hash是MOVED,就知道了,這個下標正在擴容,會幫助擴容
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//標志位,用來標志這個執行緒需要處理的陣列的下標,通過--i,一直遍歷
boolean advance = true;
//標志位,用來退出
boolean finishing = false; // to ensure sweep before committing nextTab
//i:是遍歷的下標
//bound:表示當前執行緒需要處理的區間的最小下標(先找到最小下標,然后往后遍歷擴容)
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
//判斷當前執行緒領取的任務有沒有做完呢,比如說領取了16個,此時的bound為剩余的任務數量
//回圈每次--去判斷,沒完成,就直接false跳出這個回圈,繼續去干活
if (--i >= bound || finishing)
advance = false;
//transferIndex == 0的時候,表示最后一份任務被領取了
//而走到這個else if說明--i < bound,表示最后一份任務完成了,就會進這個if
else if ((nextIndex = transferIndex) <= 0) {
//將 i 標志位 -1,下面有用
i = -1;
advance = false;
}
//用CAS去標志TRANSFERINDEX,成功即代表該執行緒領取任務成功
//并把TRANSFERINDEX更新成剩下的區間,最后一份任務是0,這里對應著上面進擴容邏輯判斷5
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//把bound改為剩余的任務數量
bound = nextBound;
//i = 剩余任務數量 -1,就是剩下陣列的下標最大值
i = nextIndex - 1;
advance = false;
}
}
第二部分看上去挺復雜,接下來也不難,就是遍歷陣列,然后死回圈的去用CAS更新transferIndex,更新成剩余任務,比如原來陣列64,任務每組16個,就把transferIndex更新成48,最后一份就是更新成0,
幾個判斷也都有詳細注釋,判斷了每組任務做完沒有?任務領完沒有?
真正擴容
//獲取該下標的值,用的getObjectVolatile,強制從主存中獲取屬性值
else if ((f = tabAt(tab, i)) == null)
//如果為null就用fwd標志當前下標
advance = casTabAt(tab, i, null, fwd);
//到這說明當前下標不是null,并且有執行緒處理過了
else if ((fh = f.hash) == MOVED)
advance = true;
else {
//鎖住下標的頭節點
synchronized (f) {
//二次判斷一下是否相等
if (tabAt(tab, i) == f) {
//如果不需要改變下標:ln和hn其中一個是null,一個是原來的鏈表
//如果需要改變下標:ln和hn其中一個是正反一半的鏈表,一個是反向鏈表
Node<K,V> ln, hn;
//判斷是不是鏈表
if (fh >= 0) {
//計算值,用來判斷是否需要改變下標,跟hashMap一樣
int runBit = fh & n;
//lastRun要么是頭節點,可以全部挪到新陣列
//要么是需要拆分的其中一個鏈表的尾節點
Node<K,V> lastRun = f;
//遍歷鏈表
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
//判斷是否有下標不一致的節點
if (b != runBit) {
runBit = b;
//此時的lastRun是另一個鏈表的頭節點
lastRun = p;
}
}
//為0則代表不需要改變下標
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//同樣是遍歷,但是多了個p != lastRun:用來判斷整個鏈表是否需要拆分的
//只有走到上面的for里面的if里面,才會不一樣
//反之就是一樣的,如果一樣的,這個for就不會走了
//這里就是為啥遍歷2次的原因(減少new Node的次數):
//判斷第一個for是否可以直接把鏈表挪到新陣列,就不需要new了
//需要變成2個鏈表,lastRun后面的同下標的節點不需要new了
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
//ln有值,那么這個鏈表就是一個反向鏈表,hn就是一個正向鏈表
ln = new Node<K,V>(ph, pk, pv, ln);
else
//同樣
hn = new Node<K,V>(ph, pk, pv, hn);
}
//ln掛到新陣列的原下標
setTabAt(nextTab, i, ln);
//hn掛到新陣列的原下標 + 老陣列長度,跟HashMap一樣
setTabAt(nextTab, i + n, hn);
//用fwd占位,進去遍歷下一個下標
setTabAt(tab, i, fwd);
advance = true;
}
//紅黑樹的擴容,跟HashMap一樣,其實也是復制的鏈表,Tree同樣維護了一份鏈表
else if (f instanceof TreeBin) {
//...省略掉
}
}
}
第三部分,其實真的走到擴容這一步,反而很簡單的,難的是上面的各種判斷,就是很簡單的遍歷鏈表(紅黑樹結構也是遍歷的鏈表),然后判斷每個節點是否要掛到新的下標,然后CAS掛到新下標,基本上跟跟HashMap一樣,
有一個小小的區別,也是一個優化(區別基本上都是優化):
這里遍歷了2次,hashmap也就遍歷了1次,其實遍歷1次就夠了,遍歷鏈表,分成2個鏈表,1個改變下標的鏈表,1個不改變的,為啥這里遍歷2次呢?
第一次for:
//遍歷鏈表
for (Node<K,V> p = f.next; p != null; p = p.next) {
//計算值
int b = p.hash & n;
//判斷是否不一致,不一致說明整個鏈表不能直接挪到新陣列,需要拆分挪到2個不一樣的下標
if (b != runBit) {
runBit = b;
//此時的lastRun是另一個鏈表的頭節點
lastRun = p;
}
}
第1個for,其實為了lastRun,
如果此下標的值都不需要改變下標,那么lastRun = f,f是頭節點,
那第2個for的判斷p != lastRun就不滿足,就不需要走第2個for了,
如果此下標的值有需要改變下標的,那么lastRun就是另一個鏈表的頭節點
打個比方:比如此時的陣列長度是64
有1,65,193,321,129,257,此時準備擴容,那么走完第1個for,此時的lastRun = 129 ,并且后面掛著257,也就是所有在129后面的不需要改變下標的節點,
有1,129,257,65,193,321,此時準備擴容,那么走完第1個for,此時的lastRun = 65 ,并且后面掛著193,321,也就是所有在65后面的不需要改變下標的節點,
第2個for:
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
//一開始ln有值,那么這個鏈表就是一個正反一半的鏈表,hn就是一個反向鏈表
ln = new Node<K,V>(ph, pk, pv, ln);
else
//同樣
hn = new Node<K,V>(ph, pk, pv, hn);
}
注意此時的p != lastRun滿足,就會走這個for,
假設是1,65,193,321,129,257,那么lastRun = 129
走完之后ln是1,129,257,hn是321,193,65,
假設是1,129,65,193,321,257,那么lastRun = 257
走完之后ln是129,1,257,hn是321,193,65,
一個是正反一半的鏈表,一個是反向鏈表,
仔細看這個for,從頭開始遍歷,每次指向前面的節點,應該全是反向節點,為啥會出現正反一半呢,因為p != lastRun,
遍歷到lastRun的時候,因為lastRun后面跟著原來在他后面的節點(跟lastRun下標一樣的),就不需要遍歷了,
這樣就少new了一些node,細想一下,真的超級牛叉,concurrentHashMap真的精華非常多,很多細小的點,都是極致性能優化,
`擴容結束的善后
//i < 0:只會是-1,表示當前執行緒是最后一份任務,并且完成了
//i >= n,i + n >= nextn說實話,這2個判斷真沒看明白,有大佬看到這里,麻煩指點一下
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//判斷擴容是否完成
if (finishing) {
nextTable = null;
table = nextTab;
//更新sizeCtl,也就是閾值,1.5倍(用的位運算,最大程度的提高性能)
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//如果 sc == 識別符號 + 1 (擴容結束了,不再有執行緒進行擴容)
//默認第一個執行緒設定sc的低16 +2,一個執行緒結束了,就會將 sc -1,最后就是1也就是rs +1
//把SIZECTL-1,表示擴容執行緒-1,注意sc = sizeCtl,是沒-1之前的
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//判斷是不是最后一個擴容執行緒
//因為第一個執行緒擴容是SIZECTL+2,后面再來是SIZECTL+1,每個擴容執行緒退出時,SIZECTL-1,到最后一個(-1之前),應該是SIZECTL+2
//而(sc - 2)也就是(sizeCtl - 2),sizeCtl的低16位代表了擴容執行緒,如果最后一個那就是2,-2就是0
//resizeStamp(n) << RESIZE_STAMP_SHIFT的低16位全是0,要是不相等說明不是最后一個
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
//到這說明是最后一個擴容執行緒,把finishing改為true,把i改成n
//i改成n 是為了再檢查一遍,是否全部擴容完畢,--i >= bound每次會滿足,然后再走下面的else if去一一校驗
//finishing改為true 是為了上面的一步全部檢查之后,就可以真正退出了
i = n; // recheck before commit
}
}
這里其實還是擴容之前的判斷,記住擴容的時候sizeCtl的高16表示陣列的長度,最高位是1,低16表示擴容的執行緒數+1,因為第一個執行緒擴容是+2,后面是每多一個執行緒就是+1,
想通了這個,這些判斷就很好理解了,Doug Lea牛逼,
長舒一口氣,擴容終于結束了,

helpTransfer():
好了,大家冷靜一下,回到put的第二部分的helpTransfer:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
哈哈,沒有一句注釋,其實這里跟準備擴容的判斷一模一樣,大家可以自己加注釋,復習一遍,
好了,整個put全結束了,精華太多了,本次就分享到這,還有一些細節,大家可以去看我的下篇文章:細節篇,下篇分析ThreadPoolExecutor的,或者大家也可以留言分析啥,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/233542.html
標籤:其他
上一篇:秋招結束,特來回饋各位大佬,阿里、騰訊、美團、滴滴、大廠面經總結
下一篇:微前端學習
