面試官聽我講完ConcurrentHashMap執行緒安全直呼要為我點贊
前言
這是Homer與面試官系列第二篇了,周五那天是元宵,準備一下班就奔赴樂山吃個特色炸串串,再去峨眉山泡個溫泉放松一下,結果專案經理說周六可能要加班,心都一下涼了半截,最后挑選了兩位幸運同事加班,hhh,不過悲催的是去了峨眉山山腳,溫泉已經停止營業了,一萬個無語,,,今天一回成都,就開始創作,第二篇文章homer還是得認真寫,畢竟上一篇還識訓了好幾個粉絲,hhhh,謝謝大家支持,
開場
面試開始,面試大叔拖著沉重的腳步進入會議室,小伙子上次HashMap講的還不錯,也說了存在執行緒安全問題,可能你造成環結構和資料丟失,談談怎么解決呢?
哎呀,一上來就問的這么直接,都不寒暄幾句嗎?那我開始吧,Java中有HashTable、Collections.synchronizedMap、以及ConcurrentHashMap可以實作執行緒安全的Map,
那你先說說Collections.synchronizedMap如何執行緒安全呢?
Collections.synchronizedMap是在內部維護一個Map物件,由創建時傳入Map物件,
public static void main(String[] args) {
HashMap map = new HashMap(1);
Collections.synchronizedMap(map);
}
同時還維護有一個排斥鎖mutex,可以看到有兩個構造器,如果傳入了mutex引數,則將物件排斥鎖賦值為傳入的物件,如果未傳入則是當前this物件,
private static class SynchronizedMap<K,V>
implements Map<K,V>, Serializable {
private static final long serialVersionUID = 1978198479659022715L;
private final Map<K,V> m; // 維護的map物件
final Object mutex; // 排斥鎖
SynchronizedMap(Map<K,V> m) {
this.m = Objects.requireNonNull(m);
mutex = this;
}
SynchronizedMap(Map<K,V> m, Object mutex) {
this.m = m;
this.mutex = mutex;
}
這樣就執行緒安全了???
不不不,不是的,你別急啊,你等我慢慢說吧!
行,你繼續
創建出synchronizedMap之后,再操作map的時候,就會對方法內的代碼塊上鎖,以傳入物件或者this為鎖,
public int size() {
synchronized (mutex) {return m.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return m.isEmpty();}
}
public boolean containsKey(Object key) {
synchronized (mutex) {return m.containsKey(key);}
}
public boolean containsValue(Object value) {
synchronized (mutex) {return m.containsValue(value);}
}
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}
這么冷門的玩意兒你也回答的清楚,那你再說說HashTable呢?
跟HashMap相比Hashtable是執行緒安全的,可以在多執行緒的情況下使用,HashTable為方法都加了synchronized,保證其執行緒安全,
public synchronized int size() { return count; }
@SuppressWarnings("unchecked")
public synchronized V get(Object key) {
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
return (V)e.value;
}
}
return null;
}
既然HashTable執行緒安全,為什么不直接使用這個替代了HashMap?
因為HashTable是各處加鎖的,在執行緒對資料結構進行操作時候,會損耗一定的效率,雖然JDK對synchronized做了很大程度的優化,但效率依舊是低于HashMap的,
他們除了這些,還有啥不同呢?
Hashtable 是不允許鍵或值為 null 的,HashMap 的鍵值則都可以為 null,當然HashMap只能有一個鍵為null,Hashtable在我們put 空值的時候會直接拋空指標例外,但是HashMap卻做了特殊處理,
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
HashMap中,null可以作為鍵,這樣的鍵只有一個;可以有一個或多個鍵所對應的值為null,當get()方法回傳null值時,可能是 HashMap中沒有該鍵,也可能使該鍵所對應的值為null,因此,在HashMap中不能由get()方法來判斷HashMap中是否存在某個鍵, 而應該用containsKey()方法來判斷,
Hashtable使用的是安全失敗機制(fail-safe),這種機制會使你此次讀到的資料不一定是最新的資料,
嗯哼,講講什么是安全失敗機制?
fail-safe是一種基于容器的一個克隆的遍歷方式,因此,對容器內容的修改不影響遍歷,java.util.concurrent包下的容器都是安全失敗的,可以在多執行緒下并發使用,并發修改,
采用安全失敗機制的集合容器,在遍歷時不是直接在集合內容上訪問的,而是先復制原有集合內容,在拷貝的集合上進行遍歷,由于迭代時是對原集合的拷貝進行遍歷,所以在遍歷程序中對原集合所作的修改并不能被迭代器檢測到,所以不會觸發Concurrent Modification Exception,
基于拷貝內容的優點是避免了Concurrent Modification Exception,但同樣地,迭代器并不能訪問到修改后的內容,即:迭代器遍歷的是開始遍歷那一刻拿到的集合拷貝,在遍歷期間原集合發生的修改迭代器是不知道的,
還有啥不同呢?
- 繼承的父類不同:父類也是不一樣的,HashMap是繼承自AbstractMap類,而HashTable是繼承自Dictionary類,不過它們都實作了同時實作了map、Cloneable(可復制)、Serializable(可序列化)這三個介面,Dictionary類是一個已經被廢棄的類(見其原始碼中的注釋),父類都被廢棄,自然而然也沒人用它的子類Hashtable了,
- 對外提供的介面不同:Hashtable比HashMap多提供了elments() 和contains() 兩個方法,(了解即可)
- 遍歷方式的內部實作上不同:HashMap都使用了 Iterator,Hashtable還使用了Enumeration的方式 ,HashMap的Iterator是fail-fast迭代器,當有其它執行緒改變了HashMap的結構(增加,洗掉,修改元素),將會拋出ConcurrentModificationException,不過,通過Iterator的remove()方法移除元素則不會拋出ConcurrentModificationException例外,但這并不是一個一定發生的行為,要看JVM,
- 初始容量大小和每次擴充容量大小的不同:Hashtable默認的初始大小為11,之后每次擴充,容量變為原來的2n+1,HashMap默認的初始化大小為16,之后每次擴充,容量變為原來的2倍,創建時,如果給定了容量初始值,那么Hashtable會直接使用你給定的大小,而HashMap會將其擴充為2的冪次方大小,也就是說Hashtable會盡量使用素數、奇數,而HashMap則總是使用2的冪作為哈希表的大小,Hashtable的側重點是哈希的結果更加均勻,使得哈希沖突減少,當哈希表的大小為素數時,簡單的取模哈希的結果會更加均勻,而HashMap則更加關注hash的計算效率問題,在取模計算時,如果模數是2的冪,那么我們可以直接使用位運算來得到結果,效率要大大高于做除法,HashMap為了加快hash的速度,將哈希表的大小固定為了2的冪,當然這引入了哈希分布不均勻的問題,所以HashMap為解決這問題,又對hash演算法做了一些改動,
那JDK8后HashTable有啥改變嗎?
哦哦哦,是這樣的,HashTable在1.8以后采用了快速失敗機制(fail-fast),同HashMap,使用一個引數modCount來確保資料安全性,modCount的使用類似于并發編程中的CAS(Compare and Swap)技術,每次在發生增刪改的時候都會出現modCount++的動作,而modcount可以理解為是當前hashtable的狀態,每發生一次操作,狀態就向前走一步,設定這個狀態,主要是由于hashtable等容器類在迭代時,判斷資料是否過時時使用的,一旦在迭代的程序中狀態發生了改變,則會快速拋出一個例外,終止迭代行為,
那你既然說HashTable涉及到效率問題,那有啥解決方案?
我們在開發程序中都是使用ConcurrentHashMap,他的并發的相比前兩者好很多,在1.7中資料結構如下:

是由 Segment 陣列、HashEntry 組成,和 HashMap 一樣,仍然是陣列加鏈表,Segment 是 ConcurrentHashMap 的一個內部類,主要的組成如下:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
// 和 HashMap 中的 HashEntry 作用一樣,真正存放資料的桶
transient volatile HashEntry<K,V>[] table;
transient int count;
// 記得快速失敗(fail—fast)么?
transient int modCount;
// 大小
transient int threshold;
// 負載因子
final float loadFactor;
}
這種結構怎么實作高并發嗎?
ConcurrentHashMap 采用了分段鎖技術,其中 Segment 繼承于 ReentrantLock,不會像 HashTable 那樣不管是 put 還是 get 操作都需要做同步處理,理論上 ConcurrentHashMap 支持 CurrencyLevel (Segment 陣列數量)的執行緒并發,每當一個執行緒占用鎖訪問一個 Segment 時,不會影響到其他的 Segment,就是說如果容量大小是16他的并發度就是16,可以同時允許16個執行緒操作16個Segment,而且還是執行緒安全的,
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();//這就是為啥他不可以put null值的原因
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
先定位到哪個Segment,然后再進行put操作,
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 將當前 Segment 中的 table 通過 key 的 hashcode 定位到 HashEntry
//這里說明一下,tryLock 和 lock 是 ReentrantLock 中的方法,
//區別是 tryLock 不會阻塞,搶鎖成功就回傳true,失敗就立馬回傳false,
//而 lock 方法是,搶鎖成功則回傳,失敗則會進入同步佇列,阻塞等待獲取鎖,
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
// 遍歷該 HashEntry,如果不為空則判斷傳入的 key 和當前遍歷的 key 是否相等,相等則覆寫舊的 value,
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
// 不為空則需要新建一個 HashEntry 并加入到 Segment 中,同時會先判斷是否需要擴容,
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
//釋放鎖
unlock();
}
return oldValue;
}
首先第一步的時候會嘗試獲取鎖,如果獲取失敗肯定就有其他執行緒存在競爭,則利用 scanAndLockForPut() 自旋獲取鎖,
- 嘗試自旋獲取鎖,
- 如果重試的次數達到了 MAX_SCAN_RETRIES 則改為阻塞鎖獲取,保證能獲取成功,
那是如何擴容的呢?
當 put 方法時,發現元素個數超過了閾值,則會擴容,需要注意的是,每個Segment只管它自己的擴容,互相之間并不影響,換句話說,可以出現這個 Segment的長度為2,另一個Segment的長度為4的情況(只要是2的n次冪),
那get呢?
get 邏輯比較簡單,只需要將 Key 通過 Hash 之后定位到具體的 Segment ,再通過一次 Hash 定位到具體的元素上,
由于 HashEntry 中的 value 屬性是用 volatile 關鍵詞修飾的,保證了記憶體可見性,所以每次獲取時都是最新值,
ConcurrentHashMap 的 get 方法是非常高效的,因為整個程序都不需要加鎖,
那1.7中采用Segment,但是還是存在一些問題?
是的,因為基本上還是陣列加鏈表的方式,我們去查詢的時候,還得遍歷鏈表,會導致效率很低,這個跟jdk1.7的HashMap是存在的一樣問題,所以他在jdk1.8完全優化了,
那你跟我談談1.8中的呢?
其中拋棄了原有的 Segment 分段鎖,而采用了 CAS + synchronized 來保證并發安全性,
跟HashMap很像,也把之前的HashEntry改成了Node,但是作用不變,把值和next采用了volatile去修飾,保證了可見性,并且也引入了紅黑樹,在鏈表大于一定值的時候會轉換(默認是8),鎖的粒度降低了,并且,用的是 Synchronized 鎖,
不是都說同步鎖是重量級鎖嗎,這樣不是會影響并發效率嗎?
確實之前同步鎖是一個重量級鎖,但是在 JDK1.6 之后進行了各種優化之后,它已經不再那么重了,引入了偏向鎖,輕量級鎖,以及鎖升級的概念,而且,據說在更細粒度的代碼層面上,同步鎖已經可以媲美 Lock 鎖,甚至是趕超了,具體下一次面試再詳聊吧,
那是如何執行緒安全呢?
ConcurrentHashMap在進行put操作的還是比較復雜的,大致可以分為以下步驟:
- 根據 key 計算出 hashcode ,
- 判斷是否需要進行初始化,
- 即為當前 key 定位出的 Node,如果為空表示當前位置可以寫入資料,利用 CAS 嘗試寫入,失敗則自旋保證成功,
- 如果當前位置的 hashcode == MOVED == -1,則需要進行擴容,
- 如果都不滿足,則利用 synchronized 鎖寫入資料,
- 如果數量大于 TREEIFY_THRESHOLD 則要轉換為紅黑樹,
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
//可以看到,在并發情況下,key 和 value 都是不支持為空的,
if (key == null || value == null) throw new NullPointerException();
//這里和1.8 HashMap 的hash 方法大同小異,只是多了一個操作,如下
//( h ^ (h >>> 16)) & HASH_BITS; HASH_BITS = 0x7fffffff;
// 0x7fffffff ,二進制為 0111 1111 1111 1111 1111 1111 1111 1111 ,
//所以,hash值除了做了高低位異或運算,還多了一步,保證最高位的 1 個 bit 位總是0,
int hash = spread(key.hashCode());
//用來計算當前鏈表上的元素個數
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果表為空,則說明還未初始化,
if (tab == null || (n = tab.length) == 0)
//初始化表,只有一個執行緒可以初始化成功,
tab = initTable();
//若表已經初始化,則找到當前 key 所在的桶,并且判斷是否為空
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//若當前桶為空,則通過 CAS 原子操作,把新節點插入到此位置,
//這保證了只有一個執行緒可以 CAS 成功,其它執行緒都會失敗,
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//若所在桶不為空,則判斷節點的 hash 值是否為 MOVED(值是-1)
else if ((fh = f.hash) == MOVED)
//若為-1,說明當前陣列正在進行擴容,則需要當前執行緒幫忙遷移資料
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//這里用加同步鎖的方式,來保證執行緒安全,給桶中第一個節點物件加鎖
synchronized (f) {
//recheck 一下,保證當前桶的第一個節點無變化,后邊很多這樣類似的操作,不再贅述
if (tabAt(tab, i) == f) {
//如果hash值大于等于0,說明是正常的鏈表結構
if (fh >= 0) {
binCount = 1;
//從頭結點開始遍歷,每遍歷一次,binCount計數加1
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果找到了和當前 key 相同的節點,則用新值替換舊值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//若遍歷到了尾結點,則把新節點尾插進去
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//否則判斷是否是樹節點,這里提一下,TreeBin只是頭結點對TreeNode的再封裝
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//注意下,這個判斷是在同步鎖外部,因為 treeifyBin內部也有同步鎖,并不影響
if (binCount != 0) {
//如果節點個數大于等于 8,則轉化為紅黑樹
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
//把舊節點值回傳
if (oldVal != null)
return oldVal;
break;
}
}
}
//給元素個數加 1,并有可能會觸發擴容,比較復雜,稍后細講
addCount(1L, binCount);
return null;
}
ConcurrentHashMap的get操作又是怎么樣子的呢?
- 根據計算出來的 hashcode 尋址,如果就在桶上那么直接回傳值,
- 如果是紅黑樹那就按照樹的方式獲取值,
- 就不滿足那就按照鏈表的方式遍歷獲取值,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/265350.html
標籤:其他
上一篇:開源資產/漏洞管理平臺使用測評
下一篇:單例模式的詳解與實作(終極)
