文章目錄
- LongAdder簡介
- 原理
- 原始碼分析
- 主要內部類
- 主要屬性
- add(x)方法
- longAccumulate()方法
- sum()方法
- LongAdder VS AtomicLong
- 總結
LongAdder簡介
LongAdder是java8中新增的原子類,在多執行緒環境中,它比AtomicLong性能要高出不少,特別是寫多的場景
原理
LongAdder的原理是,在最初無競爭時,只更新base的值,當有多執行緒競爭時通過分段的思想,讓不同的執行緒更新不同的段,最后把這些段相加就得到了完整的LongAdder存盤的值,

原始碼分析
LongAdder繼承自Striped64抽象類,Striped64中定義了Cell內部類和各重要屬性,
主要內部類
- 這是java8之后的版本對Cell有所改變 注解采用
jdk.internal.**

- java8版本
// Striped64中的內部類,使用@sun.misc.Contended注解,說明里面的值消除偽共享
@sun.misc.Contended static final class Cell {
// 存盤元素的值,使用volatile修飾保證可見性
volatile long value;
Cell(long x) { value = x; }
// CAS更新value的值
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe 實體
private static final sun.misc.Unsafe UNSAFE;
// value欄位的偏移量
private static final long valueOffset;
// 靜態代碼塊初始化UNSAFE、valueOffset
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Cell類使用@sun.misc.Contended注解,說明是要避免偽共享的,
偽共享是什么?可以參考 偽共享
使用Unsafe的CAS更新value的值,其中value的值使用volatile修飾,保證可見性,
主要屬性
- Striped64中的屬性
/**
* cells陣列,存盤各個段的值
*/
transient volatile Cell[] cells;
/**
* 最初無競爭時使用的,也算一個特殊的段
*/
transient volatile long base;
/**
* 標記當前是否有執行緒在創建或擴容cells,或者在創建Cell
* 通過CAS更新該值,相當于是一個鎖
*/
transient volatile int cellsBusy;
-
最初無競爭或有其它執行緒在創建cells陣列時使用base更新值,有過競爭時使用cells更新值,
-
最初無競爭是指一開始沒有執行緒之間的競爭,但也有可能是多執行緒在操作,只是這些執行緒沒有同時去更新base的值,
-
有過競爭是指只要出現過競爭不管后面有沒有競爭都使用cells更新值,規則是不同的執行緒hash到不同的cell上去更新,減少競爭,
add(x)方法
add(x)方法是LongAdder的主要方法,使用它可以使LongAdder中存盤的值增加x,x可為正可為負,
public void add(long x) {
/**
* as: Striped64中的cells屬性
* b: Striped64中的base屬性
* v: 當前執行緒hash到的Cell中存盤的值
* m: cells的長度減1,hash時作為掩碼使用
* a: 當前執行緒hash到的Cell
*/
Cell[] as; long b, v; int m; Cell a;
/**
* 條件1:cells不為空,說明出現過競爭,cells已經創建
* 條件2:cas操作base失敗,說明其它執行緒先一步修改了base,正在出現競爭
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
// true表示當前競爭還不激烈、false表示競爭激烈,多個執行緒hash到同一個Cell,可能要擴容
boolean uncontended = true;
/**
* 條件1:cells為空,說明正在出現競爭,上面是從條件2過來的
* 條件2:應該不會出現
* 條件3:當前執行緒所在的Cell為空,說明當前執行緒還沒有更新過Cell,應初始化一個Cell
* 條件4:更新當前執行緒所在的Cell失敗,說明現在競爭很激烈,多個執行緒hash到了同一個Cell,應擴容
*/
if (as == null || (m = as.length - 1) < 0 ||
// getProbe()方法回傳的是執行緒中的threadLocalRandomProbe欄位
// 它是通過亂數生成的一個值,對于一個確定的執行緒這個值是固定的
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 呼叫Striped64中的方法處理
longAccumulate(x, null, uncontended);
}
}
(1)最初無競爭時只更新base;
(2)直到更新base失敗時,創建cells陣列;
(3)當多個執行緒競爭同一個Cell比較激烈時,可能要擴容;
longAccumulate()方法
這個方法太長了,先看看其中一些變數
- base: 類似于AtomicLong中全域的value值,在沒有競爭情況下資料直接累加到base上,或者cells擴容時,也需要將資料寫入到base上
- collide:表示擴容意向,false 一定不會擴容,true可能會擴容,
- cellsBusy:初始化cells或者擴容cells需要獲取鎖, 0:表示無鎖狀態 1:表示其他執行緒已經持有了鎖
- casCellsBusy(): 通過CAS操作修改cellsBusy的值,CAS成功代表獲取鎖,回傳true
- NCPU:當前計算機CPU數量,Cell陣列擴容時會使用到
- getProbe(): 獲取當前執行緒的threadLocalRandomProbe欄位
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 存盤執行緒的probe值
int h;
// 如果getProbe()方法回傳0,說明亂數未初始化
if ((h = getProbe()) == 0) {
// 強制初始化
ThreadLocalRandom.current(); // force initialization
// 重新獲取probe值
h = getProbe();
// 都未初始化,肯定還不存在競爭激烈
wasUncontended = true;
}
// 是否發生碰撞(也就是是否hash碰撞) collide:表示擴容意向,false 一定不會擴容,true可能會擴容,
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// cells已經初始化過
if ((as = cells) != null && (n = as.length) > 0) {
// 當前執行緒所在的Cell未初始化
if ((a = as[(n - 1) & h]) == null) {
// 當前無其它執行緒在創建或擴容cells,也沒有執行緒在創建Cell cellsBusy:初始化cells或者擴容cells需要獲取鎖, 0:表示無鎖狀態 1:表示其他執行緒已經持有了鎖
if (cellsBusy == 0) { // Try to attach new Cell
// 新建一個Cell,值為當前需要增加的值
Cell r = new Cell(x); // Optimistically create
// 再次檢測cellsBusy,casCellsBusy()并嘗試cas更新它為1
// 相當于當前執行緒加鎖
if (cellsBusy == 0 && casCellsBusy()) {
// 是否創建成功
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// 重新獲取cells,并找到當前執行緒hash到cells陣列中的位置
// 這里一定要重新獲取cells,因為as并不在鎖定范圍內
// 有可能已經擴容了,這里要重新獲取
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 把上面新建的Cell放在cells的j位置處
rs[j] = r;
// 創建成功
created = true;
}
} finally {
// 相當于釋放鎖
cellsBusy = 0;
}
// 創建成功了就回傳
// 值已經放在新建的Cell里面了
if (created)
break;
continue; // Slot is now non-empty
}
}
// 標記當前未出現沖突
collide = false;
}
// 當前執行緒所在的Cell不為空,且更新失敗了 也就是競爭比較激烈
// 這里簡單地設為true,相當于簡單地自旋一次,下次回圈用
// 通過下面的陳述句修改執行緒的probe再重新嘗試
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 再次嘗試CAS更新當前執行緒所在Cell的值,如果成功了就回傳
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 還是不成功就擴容
// 如果cells陣列的長度達到了CPU核心數,或者cells擴容了
// 設定collide為false并通過下面的陳述句修改執行緒的probe再重新嘗試
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 上上個elseif都更新失敗了,且上個條件不成立,說明出現沖突了
else if (!collide)
collide = true;
// 明確出現沖突了,嘗試占有鎖,并擴容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 檢查是否有其它執行緒已經擴容過了
if (cells == as) { // Expand table unless stale
// 新陣列為原陣列的兩倍
Cell[] rs = new Cell[n << 1];
// 把舊陣列元素拷貝到新陣列中
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 重新賦值cells為新陣列
cells = rs;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
// 已解決沖突
collide = false;
// 使用擴容后的新陣列重新嘗試
continue; // Retry with expanded table
}
// 更新失敗或者達到了CPU核心數,重新生成probe,并重試
h = advanceProbe(h);
}
// 未初始化過cells陣列,嘗試占有鎖并初始化cells陣列
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 是否初始化成功
boolean init = false;
try {
// 檢測是否有其它執行緒初始化過
if (cells == as) {
// 新建一個大小為2的Cell陣列
Cell[] rs = new Cell[2];
// 找到當前執行緒hash到陣列中的位置并創建其對應的Cell
rs[h & 1] = new Cell(x);
// 賦值給cells陣列
cells = rs;
// 初始化成功
init = true;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
// 初始化成功直接回傳
// 因為增加的值已經同時創建到Cell中了
if (init)
break;
}
// 如果有其它執行緒在初始化cells陣列中,就嘗試更新base
// 如果成功了就回傳
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
(1)如果cells陣列未初始化,當前執行緒會嘗試占有cellsBusy鎖并創建cells陣列;
(2)如果當前執行緒嘗試創建cells陣列時,發現有其它執行緒已經在創建了,就嘗試更新base,如果成功就回傳;
(3)通過執行緒的probe值再與陣列長度-1相與找到當前執行緒應該更新cells陣列中的哪個Cell;
(4)如果當前執行緒所在的Cell未初始化,就占有占有cellsBusy鎖并在相應的位置創建一個Cell;
(5)嘗試CAS更新當前執行緒所在的Cell,如果成功就回傳,如果失敗說明出現沖突;
(5)當前執行緒更新Cell失敗后并不是立即擴容,而是嘗試更新probe值后再重試一次;
(6)如果在重試的時候還是更新失敗,就擴容;
(7)擴容時當前執行緒占有cellsBusy鎖,并把陣列容量擴大到兩倍,再遷移原cells陣列中元素到新陣列中;
(8)cellsBusy在創建cells陣列、創建Cell、擴容cells陣列三個地方用到;
(9)cells長度是2的冪次方數與陣列長度取模可以轉化為按位與運算,提升計算性能,(n - 1) & h這里類似HashMap,參考HashMap
sum()方法
sum()方法是獲取LongAdder中真正存盤的值的大小,通過把base和所有段相加得到,
public long sum() {
Cell[] as = cells; Cell a;
// sum的初始值為 base
long sum = base;
if (as != null) {
// 遍歷所有的Cell
for (int i = 0; i < as.length; ++i) {
// 所在的cell不為空就把次cell的value累加到sum中
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
可以看到sum()方法是把base和所有段的值相加得到,這里有一個問題,如果前面已經累加到sum上的Cell的value,在sum方法還沒結束value就又修改了,就沒法計算到了,所以LongAdder可以說不是強一致性的,它是最終一致性的,
LongAdder VS AtomicLong
public static void main(String[] args) throws InterruptedException{
testLongAdderVSAtomicLong(1,10000000);
testLongAdderVSAtomicLong(20,10000000);
testLongAdderVSAtomicLong(40,10000000);
testLongAdderVSAtomicLong(60,10000000);
}
/**
* @param threadCount 執行緒數
* @param times 自增次數
*/
public static void testLongAdderVSAtomicLong(int threadCount, int times) throws InterruptedException{
System.out.println("執行緒數:"+threadCount+"自增次數:"+times);
long start1 = System.currentTimeMillis();
testLongAdder(threadCount, times);
long end1 = System.currentTimeMillis();
System.out.println("LongAdder的時間:" + (end1 - start1));
long start2 = System.currentTimeMillis();
testAtomicLong(threadCount, times);
long end2 = System.currentTimeMillis();
System.out.println("AtomicLong的時間:" + (end2 - start2));
}
public static void testAtomicLong(int threadCount, int times) throws InterruptedException{
AtomicLong atomicLong = new AtomicLong();
ArrayList<Thread> list = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
list.add(new Thread(new Runnable() {
@Override
public void run() {
for (int i1 = 0; i1 < times; i1++) {
atomicLong.incrementAndGet();
}
}
}));
}
for (Thread thread : list) {
thread.start();
}
for (Thread thread : list) {
thread.join();
}
}
public static void testLongAdder(int threadCount, int times) throws InterruptedException {
LongAdder longAdder = new LongAdder();
ArrayList<Thread> list = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
list.add(new Thread(new Runnable() {
@Override
public void run() {
for (int i1 = 0; i1 < times; i1++) {
longAdder.add(1);
}
}
}));
}
for (Thread thread : list) {
thread.start();
}
for (Thread thread : list) {
thread.join();
}
}
輸出結果
執行緒數:1自增次數:10000000
LongAdder的時間:84
AtomicLong的時間:65
執行緒數:20自增次數:10000000
LongAdder的時間:263
AtomicLong的時間:3707
執行緒數:40自增次數:10000000
LongAdder的時間:411
AtomicLong的時間:7951
執行緒數:60自增次數:10000000
LongAdder的時間:581
AtomicLong的時間:9388
可以看到當只有一個執行緒的時候,AtomicLong反而性能更高,隨著執行緒越來越多,AtomicLong的性能急劇下降,而LongAdder的性能影響很小,
總結
(1)LongAdder通過base和cells陣列來存盤值;
(2)不同的執行緒會hash到不同的cell上去更新,減少了競爭;
(3)LongAdder的性能非常高,最侄訓達到一種無競爭的狀態;
在longAccumulate()方法中有個條件是
n>=NCPU就不會走到擴容邏輯了,而n是2的倍數,那是不是代表cells陣列最大只能達到大于等于NCPU的最小2次方?
確實,因為同一個CPU核心同時只會運行一個執行緒,而更新失敗了說明有兩個不同的核心更新了同一個Cell,這時會重新設定更新失敗的那個執行緒的probe值,這樣下一次它所在的Cell很大概率會發生改變,如果運行的時間足夠長,最侄訓出現同一個核心的所有執行緒都會hash到同一個Cell(大概率,但不一定全在一個Cell上)上去更新,所以,這里cells陣列中長度并不需要太長,達到CPU核心數足夠了
參考
小劉講原始碼
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/356105.html
標籤:java
