Java程式員都曾被問到的一個問題是:
- 為什么HashMap是執行緒不安全的?
- 為什么ConcurrentHashMap是執行緒安全的?
為什么HashMap是執行緒不安全的?
Fail-Fast 機制
- 如果在使用迭代器的程序中有其他執行緒修改了map,那么將拋出ConcurrentModificationException,這就是所謂fail-fast策略,
- 看原始碼的時候我們會看到,在ArrayList,LinkedList,HashMap的內部,有一個int型別的modCount物件,對上述集合內容的修改都將增加這個值,modCount會在迭代器Iterator中使用,在迭代器的建構式中,有這么一行代碼,expectedModCount = modCount,在nextEntity和remove方法的呼叫程序中,如果modCount != expectedModCount,拋出ConcurrentModificationException例外,
final Entry<K,V> nextEntry() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
public void remove() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
死回圈
- HashMap中有兩個重要的屬性,一個是容量,一個是加載因子,容量必須是2的n次方,這是為了保證key hash之后的值可以均勻的分散在陣列里,當前結點個數等于容量*加載因子之后,需要進行擴容,擴展為原來的二倍,這個程序稱為rehash,
- 因為hashmap是頭插入法,新的結點插入在陣列的頭結點,當執行緒1執行rehash到newTable[i] = e時,執行緒1被掛起,此時執行緒2開始執行rehash并完成,這樣會導致結點的回圈參考問題,
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {// table是老陣列
while(null != e) {
Entry<K,V> next = e.next;
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];
newTable[i] = e;
e = next;
}
}
}
為什么ConcurrentHashMap是執行緒安全的?
java 7中的結構

- ConcurrentHashMap的資料結構是由一個Segment陣列和多個HashEntry組成,Segment陣列的意義就是將一個大的table分割成多個小的table來進行加鎖,而每一個Segment元素存盤的是HashEntry陣列+鏈表,
- 在進行資料的定位時,會首先找到 segment, 然后在 segment 中定位 bucket,
- 如果多執行緒操作同一個 segment, 就會觸發 segment 的鎖 ReentrantLock, 這就是分段鎖的基本實作原理,
- Segment 繼承 ReentrantLock 鎖,用于存放陣列 HashEntry[],segment在初始化后不可以擴容,但是HashEntry可以擴容,
static final class Segment<K,V> extends ReentrantLock implements Serializable {
}
- 在插入資料的時候,首先需要找到segment,呼叫該segment的put方法,將新節點插入segment中,在segment中獲取鎖,如果獲取失敗,則走scanAndLockForPut方法,如果獲取成功,回傳,如果失敗,則在回圈中一直獲取,直到成功,
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 根據key的hash再次進行hash運算
int hash = hash(key.hashCode());
// 基于hash定位segment陣列的索引,
// hash值是int值,32bits,segmentShift=28,無符號右移28位,剩下高4位,其余補0,
// segmentMask=15,二進制低4位全部是1,所以j相當于hash右移后的低4位,
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// 找到對應segment
s = ensureSegment(j);
// 呼叫該segment的put方法,將新節點插入segment中
return s.put(key, hash, value, false);
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 是否獲取鎖,失敗自旋獲取鎖(直到成功)
// 拿到結點之后,對結點進行插入操作,
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
}
java 8中的結構

- Java8中不再使用segment,不再使用分段鎖,而是使用大的陣列,為了解決hash碰撞,在鏈表長度超過一定值(默認8)的情況下,將鏈表轉為紅黑樹實作,尋址的復雜度從O(n)轉換為Olog(n),
- 并發控制使用Synchronized和CAS來操作,
- ConcurrentHashMap的建構式是一個空函式,初始化是在put中實作的,
- Node中value和next都用volatile修飾,保證并發的可見性,value和next都用volatile修飾,保證并發的可見性,
悲觀鎖:可以完全保證資料的獨占性和正確性,因為每次請求都會先對資料進行加鎖, 然后進行資料操作,最后再解鎖,而加鎖釋放鎖的程序會造成消耗,所以性能不高;
樂觀鎖:假設沒有沖突發生,如果檢測到沖突,就失敗重試,直到成功,資料庫的樂觀鎖是通過版本控制來實作的,
- 在put的程序中,不斷的嘗試,因為在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject,這是一種樂觀鎖的實作方式,
- 對要加入的特定陣列的節點tab[i]加鎖,遍歷鏈表或紅黑樹,對資料插入
// 不參與序列化
transient volatile Node<K,V>[] table; // volatile保證執行緒間可見
// 記錄容器的容量大小,通過CAS更新
private transient volatile long baseCount;
/**
* 初始化和擴容控制引數,為負數時表示table正在被初始化或resize:-1(初始化),-(1+擴容執行緒數)
* sizeCtl默認值為0,大于0是擴容的閥值
*/
private transient volatile int sizeCtl;
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
// while(true)回圈,不斷的嘗試,因為在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果陣列(HashMap)還未初始化,進行初始化操作(CAS操作)
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 計算新插入資料在陣列(HashMap)中的位置,如果該位置上的節點為null,直接放入資料(CAS操作)
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果該節點的hash值為MOVED,說明正在進行擴容操作或者已經擴容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//
else {
V oldVal = null;
synchronized (f) { // 對特定陣列節點tab[i]加鎖
if (tabAt(tab, i) == f) { // 判斷tab[i]是否有變化
if (fh >= 0) { // 插入操作
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { // 遍歷鏈表
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // 如果新插入值和tab[i]處的hash值和key值一樣,進行替換
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { // 如果此節點為尾部節點,把此節點的next參考指向新資料節點
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 如果是一顆紅黑樹
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) //如果陣列節點的鏈表長度超過限定長度,轉換為一顆紅黑樹
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/131888.html
標籤:其他
