文章目錄
- 1. 無鎖解決執行緒安全問題
- 2. CAS與volatile
- 3. 原子整數
- 4. 原子參考
- 4.1 ABA問題
- 4.2 AtomicStampedReference
- 4.3 AtomicMarkableReference
- 5. 原子陣列
- 6. 原子欄位更新器
- 7. LongAdder原理
- 7.1 LongAdder 主要屬性
- 7.2 偽共享
- 7.3 累加
- 8. Unsafe
- 8.1 自己模擬實作原子整數
1. 無鎖解決執行緒安全問題
舉例:
package top.nnzi.concurrent.chapter02;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Description: 無鎖解決執行緒安全問題
* @Author: Aiguodala
* @CreateDate: 2021/4/9 22:55
*/
public class CASTest {
public static void main(String[] args) {
Account.demo(new AccountUnsafe(10000));
Account.demo(new AccountCas(10000));
}
}
interface Account {
Integer getBalance();
void withdraw(Integer amount);
/**
* 方法內會啟動 1000 個執行緒,每個執行緒做 -10 元 的操作 * 如果初始余額為 10000 那么正確的結果應當是 0
*/
static void demo(Account account) {
List<Thread> ts = new ArrayList<>();
long start = System.nanoTime();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(10);
}));
}
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance() + " cost: " + (end - start) / 1000_000 + " ms");
}
}
//執行緒不安全的做法
class AccountUnsafe implements Account {
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public Integer getBalance() {
return this.balance;
}
@Override
public synchronized void withdraw(Integer amount) {
balance -= amount;
}
}
//執行緒安全的做法
class AccountCas implements Account {
//使用原子整數
private AtomicInteger balance;
public AccountCas(int balance) {
this.balance = new AtomicInteger(balance);
}
@Override
public Integer getBalance() {
//得到原子整數的值
return balance.get();
}
@Override
public void withdraw(Integer amount) {
while(true) {
//獲得修改前的值
int prev = balance.get();
//獲得修改后的值
int next = prev - amount;
//比較并設值
if(balance.compareAndSet(prev, next)) {
break;
}
}
}
}
2. CAS與volatile
前面看到的 AtomicInteger 的解決方法,內部并沒有用鎖來保護共享變數的執行緒安全,那么它是如何實作的呢?
其中的關鍵是 compareAndSwap(比較并設定值),它的簡稱就是 CAS (也有 Compare And Swap 的說法),它必須是原子操作,

作業流程
- 當一個執行緒要去修改Account物件中的值時,先獲取值pre(呼叫get方法),然后再將其設定為新的值next,在呼叫cas方法時,會將pre與Account中的余額進行比較,
- 如果兩者相等,就說明該值還未被其他執行緒修改,此時便可以進行修改操作,
- 如果兩者不相等,就不設定值,重新獲取值pre(呼叫get方法),然后再將其設定為新的值next(呼叫cas方法),直到修改成功為止,
注意
-
其實 CAS 的底層是 lock cmpxchg 指令(X86 架構),在單核 CPU 和多核 CPU 下都能夠保證【比較-交換】的原子性,
-
在多核狀態下,某個核執行到帶 lock 的指令時,CPU 會讓總線鎖住,當這個核把此指令執行完畢,再開啟總線,這個程序中不會被執行緒的調度機制所打斷,保證了多個執行緒對記憶體操作的準確性,是原子的,
volatile
- 獲取共享變數時,為了保證該變數的可見性,需要使用 volatile 修飾,
- 它可以用來修飾成員變數和靜態成員變數,他可以避免執行緒從自己的作業快取中查找變數的值,必須到主存中獲取 它的值,執行緒操作 volatile 變數都是直接操作主存,即一個執行緒對 volatile 變數的修改,對另一個執行緒可見,
注意
- volatile 僅僅保證了共享變數的可見性,讓其它執行緒能夠看到新值,但不能解決指令交錯問題(不能保證原子性)
- CAS 必須借助 volatile 才能讀取到共享變數的新值來實作【比較并交換】的效果
CAS特點
結合 CAS 和 volatile 可以實作無鎖并發,適用于執行緒數少、多核 CPU 的場景下,
- CAS 是基于樂觀鎖的思想:樂觀的估計,不怕別的執行緒來修改共享變數,就算改了也沒關系,我吃虧點再重試唄,
- synchronized 是基于悲觀鎖的思想:悲觀的估計,得防著其它執行緒來修改共享變數,我上了鎖你們都別想改,我改完了解開鎖,你們才有機會,
- CAS 體現的是無鎖并發、無阻塞并發,請仔細體會這兩句話的意思
- 因為沒有使用 synchronized,所以執行緒不會陷入阻塞,這是效率提升的因素之一
- 但如果競爭激烈,可以想到重試必然頻繁發生,反而效率會受影響
3. 原子整數
J.U.C 并發包提供了
- AtomicBoolean
- AtomicInteger
- AtomicLong
package top.nnzi.concurrent.chapter02;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Description:
* @Author: Aiguodala
* @CreateDate: 2021/4/9 23:29
*/
public class AtomicTest {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(0);
// 獲取并自增(i = 0, 結果 i = 1, 回傳 0),類似于 i++
System.out.println(i.getAndIncrement());
// 自增并獲取(i = 1, 結果 i = 2, 回傳 2),類似于 ++i
System.out.println(i.incrementAndGet());
// 自減并獲取(i = 2, 結果 i = 1, 回傳 1),類似于 --i
System.out.println(i.decrementAndGet());
// 獲取并自減(i = 1, 結果 i = 0, 回傳 1),類似于 i--
System.out.println(i.getAndDecrement());
// 獲取并加值(i = 0, 結果 i = 5, 回傳 0)
System.out.println(i.getAndAdd(5));
// 加值并獲取(i = 5, 結果 i = 0, 回傳 0)
System.out.println(i.addAndGet(-5));
// 獲取并更新(i = 0, p 為 i 的當前值, 結果 i = -2, 回傳 0)
// 其中函式中的操作能保證原子,但函式需要無副作用
System.out.println(i.getAndUpdate(p -> p - 2));
// 更新并獲取(i = -2, p 為 i 的當前值, 結果 i = 0, 回傳 0)
// 其中函式中的操作能保證原子,但函式需要無副作用
System.out.println(i.updateAndGet(p -> p + 2));
// 獲取并計算(i = 0, p 為 i 的當前值, x 為引數1, 結果 i = 10, 回傳 0)
// 其中函式中的操作能保證原子,但函式需要無副作用 // getAndUpdate 如果在 lambda 中參考了外部的區域變數,要保證該區域變數是 final 的
// getAndAccumulate 可以通過 引數1 來參考外部的區域變數,但因為其不在 lambda 中因此不必是
final int a = 0;
System.out.println(i.getAndAccumulate(a, (p, x) -> p + x));
// 計算并獲取(i = 10, p 為 i 的當前值, x 為引數1, 結果 i = 0, 回傳 0)
// 其中函式中的操作能保證原子,但函式需要無副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
}
}
4. 原子參考
用于解決需要進行原子操作的資料不是基本型別
public interface DecimalAccount {
BigDecimal getBalance();
void withdraw(BigDecimal amount);
/**
* 方法內會啟動 1000 個執行緒,每個執行緒做 -10 元 的操作
* 如果初始余額為 10000 那么正確的結果應當是 0
*/
static void demo(DecimalAccountImpl account) {
List<Thread> ts = new ArrayList<>();
long start = System.nanoTime();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(BigDecimal.TEN);
}));
}
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance() + " cost: " + (end - start) / 1000_000 + " ms");
}
}
class DecimalAccountImpl implements DecimalAccount {
//原子參考,泛型型別為小數型別
AtomicReference<BigDecimal> balance;
public DecimalAccountImpl(BigDecimal balance) {
this.balance = new AtomicReference<BigDecimal>(balance);
}
@Override
public BigDecimal getBalance() {
return balance.get();
}
@Override
public void withdraw(BigDecimal amount) {
while(true) {
BigDecimal pre = balance.get();
BigDecimal next = pre.subtract(amount);
if(balance.compareAndSet(pre, next)) {
break;
}
}
}
public static void main(String[] args) {
DecimalAccount.demo(new DecimalAccountImpl(new BigDecimal("10000")));
}
}
4.1 ABA問題
public class Demo3 {
static AtomicReference<String> str = new AtomicReference<>("A");
public static void main(String[] args) {
new Thread(() -> {
String pre = str.get();
System.out.println("change");
try {
other();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//把str中的A改為C
System.out.println("change A->C " + str.compareAndSet(pre, "C"));
}).start();
}
static void other() throws InterruptedException {
new Thread(()-> {
System.out.println("change A->B " + str.compareAndSet("A", "B"));
}).start();
Thread.sleep(500);
new Thread(()-> {
System.out.println("change B->A " + str.compareAndSet("B", "A"));
}).start();
}
}
主執行緒僅能判斷出共享變數的值與初值 A 是否相同,不能感知到這種從 A 改為 B 又 改回 A 的情況,如果主執行緒希望:只要有其它執行緒【動過了】共享變數,那么自己的 cas 就算失敗,這時,僅比較值是不夠的,需要再加一個版本號
4.2 AtomicStampedReference
AtomicStampedReference 可以給原子參考加上版本號,追蹤原子參考整個的變化程序,如: A -> B -> A -> C ,通過AtomicStampedReference,我們可以知道,參考變數中途被更改了幾次,
public class Demo3 {
//指定版本號
static AtomicStampedReference<String> str = new AtomicStampedReference<>("A", 0);
public static void main(String[] args) {
new Thread(() -> {
String pre = str.getReference();
//獲得版本號
int stamp = str.getStamp();
System.out.println("change");
try {
other();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//把str中的A改為C,并比對版本號,如果版本號相同,就執行替換,并讓版本號+1
System.out.println("change A->C stamp " + stamp + str.compareAndSet(pre, "C", stamp, stamp+1));
}).start();
}
static void other() throws InterruptedException {
new Thread(()-> {
int stamp = str.getStamp();
System.out.println("change A->B stamp " + stamp + str.compareAndSet("A", "B", stamp, stamp+1));
}).start();
Thread.sleep(500);
new Thread(()-> {
int stamp = str.getStamp();
System.out.println("change B->A stamp " + stamp + str.compareAndSet("B", "A", stamp, stamp+1));
}).start();
}
}
4.3 AtomicMarkableReference
但是有時候,并不關心參考變數更改了幾次,只是單純的關心是否更改過,所以就有了 AtomicMarkableReference
public class Demo4 {
//指定版本號
static AtomicMarkableReference<String> str = new AtomicMarkableReference<>("A", true);
public static void main(String[] args) {
new Thread(() -> {
String pre = str.getReference();
System.out.println("change");
try {
other();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//把str中的A改為C,并比對版本號,如果版本號相同,就執行替換,并讓版本號+1
System.out.println("change A->C mark " + str.compareAndSet(pre, "C", true, false));
}).start();
}
static void other() throws InterruptedException {
new Thread(() -> {
System.out.println("change A->A mark " + str.compareAndSet("A", "A", true, false));
}).start();
}
}
兩者的區別
-
AtomicStampedReference 需要我們傳入整型變數作為版本號,來判定是否被更改過
-
AtomicMarkableReference需要我們傳入布爾變數作為標記,來判斷是否被更改過
5. 原子陣列
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
6. 原子欄位更新器
- AtomicReferenceFieldUpdater // 域 欄位
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdate
public class Demo1 {
public static void main(String[] args) {
Student student = new Student();
// 獲得原子更新器
// 泛型
// 引數1 持有屬性的類 引數2 被更新的屬性的類
// newUpdater中的引數:第三個為屬性的名稱
AtomicReferenceFieldUpdater<Student, String> updater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
// 修改
updater.compareAndSet(student, null, "lby");
System.out.println(student);
}
}
class Student {
volatile String name;
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
'}';
}
}
7. LongAdder原理
LongAdder 是原子累加器,提供了更快的性能,
7.1 LongAdder 主要屬性
/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int cellsBusy;
- transient volatile Cell[] cells; 用于每個執行緒獨一份的累加單元陣列,懶惰初始化,
- transient volatile long base; 基礎值,如果沒有競爭就用CAS累加這個域
- transient volatile int cellsBusy; 在cells陣列創建或者擴容時,賦值為1 表示加鎖,
7.2 偽共享


因為 CPU 與 記憶體的速度差異很大,需要靠預讀資料至快取來提升效率,
而快取以快取行為單位,每個快取行對應著一塊記憶體,一般是 64 byte(8 個 long)
快取的加入會造成資料副本的產生,即同一份資料會快取在不同核心的快取行中
CPU 要保證資料的一致性,如果某個 CPU 核心更改了資料,其它 CPU 核心對應的整個快取行必須失效

因為 Cell 是陣列形式,在記憶體中是連續存盤的,一個 Cell 為 24 位元組(16 位元組的物件頭和 8 位元組的 value),因 此快取行可以存下 2 個的 Cell 物件,這樣問題來了:
- Core-0 要修改 Cell[0]
- Core-1 要修改 Cell[1]
無論誰修改成功,都會導致對方 Core 的快取行失效,
比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000 ,這時會讓 Core-1 的快取行失效
@sun.misc.Contended 用來解決這個問題,它的原理是在使用此注解的物件或欄位的前后各增加 128 位元組大小的 padding(空白),從而讓 CPU 將物件預讀至快取時占用不同的快取行,這樣,不會造成對方快取行的失效

@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
7.3 累加
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}


累加完成之后最后對所有單元陣列求和
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
8. Unsafe
Unsafe 物件提供了非常底層的,操作記憶體、執行緒的方法,Unsafe 物件不能直接呼叫,只能通過反射獲得
import sun.misc.Unsafe;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
/**
* @Description:
* @Author: Aiguodala
* @CreateDate: 2021/4/10 14:54
*/
public class GetUnsafe {
public static void main(String[] args) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchFieldException {
// 通過反射獲得Unsafe物件
Class unsafeClass = Unsafe.class;
// 獲得建構式,Unsafe的建構式為私有的
Constructor constructor = unsafeClass.getDeclaredConstructor();
// 設定為允許訪問私有內容
constructor.setAccessible(true);
// 創建Unsafe物件
Unsafe unsafe = (Unsafe) constructor.newInstance();
// 創建Person物件
Person person = new Person();
// 獲得其屬性 name 的偏移量
Field field = Person.class.getDeclaredField("name");
long offset = unsafe.objectFieldOffset(field);
// 通過unsafe的CAS操作改變值
unsafe.compareAndSwapObject(person, offset, null, "AIguodala");
System.out.println(person);
}
}
class Person {
// 配合CAS操作,必須用volatile修飾
volatile String name;
@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
'}';
}
}
8.1 自己模擬實作原子整數
package objectShare;
import sun.misc.Unsafe;
import java.lang.reflect.Constructor;
/**
* @Description:
* @Author: Aiguodala
* @CreateDate: 2021/4/10 16:02
*/
public class MyAtomicInteger {
private volatile int value;
private static final long valueOffset;
private static final Unsafe UNSAFE;
static {
Class unsafeClass = Unsafe.class;
try {
Constructor constructor = unsafeClass.getDeclaredConstructor();
constructor.setAccessible(true);
UNSAFE = (Unsafe) constructor.newInstance();
valueOffset = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException();
}
}
public int getValue() {
return value;
}
public void decrement(int x) {
while (true) {
int pre = getValue();
int next = pre - x;
if (UNSAFE.compareAndSwapInt(this,value,pre,next)) {
break;
}
}
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/275108.html
標籤:其他
