前言
在使用多執行緒并發編程的時,經常會遇到對共享變數修改操作,此時我們可以選擇ConcurrentHashMap,ConcurrentLinkedQueue來進行安全地存盤資料,但如果單單是涉及狀態的修改,執行緒執行順序問題,使用Atomic開頭的原子組件或者ReentrantLock、CyclicBarrier之類的同步組件,會是更好的選擇,下面將一一介紹它們的原理和用法
- 原子組件的實作原理CAS
- AtomicBoolean、AtomicIntegerArray等原子組件的用法、
- 同步組件的實作原理
- ReentrantLock、CyclicBarrier等同步組件的用法
關注公眾號,一起交流,微信搜一搜: 潛行前行
原子組件的實作原理CAS
- cas的底層實作可以看下之前寫的一篇文章:詳解鎖原理,synchronized、volatile+cas底層實作
應用場景
- 可用來實作變數、狀態在多執行緒下的原子性操作
- 可用于實作同步鎖(ReentrantLock)
原子組件
- 原子組件的原子性操作是靠使用cas來自旋操作volatile變數實作的
- volatile的型別變數保證變數被修改時,其他執行緒都能看到最新的值
- cas則保證value的修改操作是原子性的,不會被中斷
基本型別原子類
AtomicBoolean //布爾型別
AtomicInteger //正整型數型別
AtomicLong //長整型型別
- 使用示例
public static void main(String[] args) throws Exception {
AtomicBoolean atomicBoolean = new AtomicBoolean(false);
//異步執行緒修改atomicBoolean
CompletableFuture<Void> future = CompletableFuture.runAsync(() ->{
try {
Thread.sleep(1000); //保證異步執行緒是在主執行緒之后修改atomicBoolean為false
atomicBoolean.set(false);
}catch (Exception e){
throw new RuntimeException(e);
}
});
atomicBoolean.set(true);
future.join();
System.out.println("boolean value is:"+atomicBoolean.get());
}
---------------輸出結果------------------
boolean value is:false
參考類原子類
AtomicReference
//加時間戳版本的參考類原子類
AtomicStampedReference
//相當于AtomicStampedReference,AtomicMarkableReference關心的是
//變數是否還是原來變數,中間被修改過也無所謂
AtomicMarkableReference
- AtomicReference的原始碼如下,它內部定義了一個
volatile V value,并借助VarHandle(具體子類是FieldInstanceReadWrite)實作原子操作,MethodHandles會幫忙計算value在類的偏移位置,最后在VarHandle呼叫Unsafe.public final native boolean compareAndSetReference(Object o, long offset, Object expected, Object x)方法原子修改物件的屬性
public class AtomicReference<V> implements java.io.Serializable {
private static final long serialVersionUID = -1848883965231344442L;
private static final VarHandle VALUE;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
VALUE = l.findVarHandle(AtomicReference.class, "value", Object.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
private volatile V value;
....
ABA問題
- 執行緒X準備將變數的值從A改為B,然而這期間執行緒Y將變數的值從A改為C,然后再改為A;最后執行緒X檢測變數值是A,并置換為B,但實際上,A已經不再是原來的A了
- 解決方法,是把變數定為唯一型別,值可以加上版本號,或者時間戳,如加上版本號,執行緒Y的修改變為A1->B2->A3,此時執行緒X再更新則可以判斷出A1不等于A3
- AtomicStampedReference的實作和AtomicReference差不多,不過它原子修改的變數是
volatile Pair<V> pair;,Pair是其內部類,AtomicStampedReference可以用來解決ABA問題
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
- 如果我們不關心變數在中間程序是否被修改過,而只是關心當前變數是否還是原先的變數,則可以使用AtomicMarkableReference
- AtomicStampedReference的使用示例
public class Main {
public static void main(String[] args) throws Exception {
Test old = new Test("hello"), newTest = new Test("world");
AtomicStampedReference<Test> reference = new AtomicStampedReference<>(old, 1);
reference.compareAndSet(old, newTest,1,2);
System.out.println("物件:"+reference.getReference().name+";版本號:"+reference.getStamp());
}
}
class Test{
Test(String name){ this.name = name; }
public String name;
}
---------------輸出結果------------------
物件:world;版本號:2
陣列原子類
AtomicIntegerArray //整型陣列
AtomicLongArray //長整型陣列
AtomicReferenceArray //參考型別陣列
- 陣列原子類內部會初始一個final的陣列,它把整個陣列當做一個物件,然后根據下標index計演算法元素偏移量,再呼叫UNSAFE.compareAndSetReference進行原子操作,陣列并沒被volatile修飾,為了保證元素型別在不同執行緒的可見,獲取元素使用到了UNSAFE
public native Object getReferenceVolatile(Object o, long offset)方法來獲取實時的元素值 - 使用示例
//元素默認初始化為0
AtomicIntegerArray array = new AtomicIntegerArray(2);
// 下標為0的元素,期待值是0,更新值是1
array.compareAndSet(0,0,1);
System.out.println(array.get(0));
---------------輸出結果------------------
1
屬性原子類
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicReferenceFieldUpdater
- 如果操作物件是某一型別的屬性,可以使用AtomicIntegerFieldUpdater原子更新,不過類的屬性需要定義成volatile修飾的變數,保證該屬性在各個執行緒的可見性,否則會報錯
- 使用示例
public class Main {
public static void main(String[] args) {
AtomicReferenceFieldUpdater<Test,String> fieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Test.class,String.class,"name");
Test test = new Test("hello world");
fieldUpdater.compareAndSet(test,"hello world","siting");
System.out.println(fieldUpdater.get(test));
System.out.println(test.name);
}
}
class Test{
Test(String name){ this.name = name; }
public volatile String name;
}
---------------輸出結果------------------
siting
siting
累加器
Striped64
LongAccumulator
LongAdder
//accumulatorFunction:運算規則,identity:初始值
public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity)
- LongAccumulator和LongAdder都繼承于Striped64,Striped64的主要思想是和ConcurrentHashMap有點類似,分段計算,單個變數計算并發性能慢時,我們可以把數學運算分散在多個變數,而需要計算總值時,再一一累加起來
- LongAdder相當于LongAccumulator一個特例實作
- LongAccumulator的示例
public static void main(String[] args) throws Exception {
LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
for(int i=0;i<100000;i++){
CompletableFuture.runAsync(() -> accumulator.accumulate(1));
}
Thread.sleep(1000); //等待全部CompletableFuture執行緒執行完成,再獲取
System.out.println(accumulator.get());
}
---------------輸出結果------------------
100000
同步組件的實作原理
- java的多數同步組件會在內部維護一個狀態值,和原子組件一樣,修改狀態值時一般也是通過cas來實作,而狀態修改的維護作業被Doug Lea抽象出AbstractQueuedSynchronizer(AQS)來實作
- AQS的原理可以看下之前寫的一篇文章:詳解鎖原理,synchronized、volatile+cas底層實作
同步組件
ReentrantLock、ReentrantReadWriteLock
- ReentrantLock、ReentrantReadWriteLock都是基于AQS(AbstractQueuedSynchronizer)實作的,因為它們有公平鎖和非公平鎖的區分,因此沒直接繼承AQS,而是使用內部類去繼承,公平鎖和非公平鎖各自實作AQS,ReentrantLock、ReentrantReadWriteLock再借助內部類來實作同步
- ReentrantLock的使用示例
ReentrantLock lock = new ReentrantLock();
if(lock.tryLock()){
//業務邏輯
lock.unlock();
}
- ReentrantReadWriteLock的使用示例
public static void main(String[] args) throws Exception {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
if(lock.readLock().tryLock()){ //讀鎖
//業務邏輯
lock.readLock().unlock();
}
if(lock.writeLock().tryLock()){ //寫鎖
//業務邏輯
lock.writeLock().unlock();
}
}
Semaphore實作原理和使用場景
- Semaphore和ReentrantLock一樣,也有公平和非公平競爭鎖的策略,一樣也是通過內部類繼承AQS來實作同步
- 通俗解釋:假設有一口井,最多有三個人的位置打水,每有一個人打水,則需要占用一個位置,當三個位置全部占滿時,第四個人需要打水,則要等待前三個人中一個離開打水位,才能繼續獲取打水的位置
- 使用示例
public static void main(String[] args) throws Exception {
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 3; i++)
CompletableFuture.runAsync(() -> {
try {
System.out.println(Thread.currentThread().toString() + " start ");
if(semaphore.tryAcquire(1)){
Thread.sleep(1000);
semaphore.release(1);
System.out.println(Thread.currentThread().toString() + " 無阻塞結束 ");
}else {
System.out.println(Thread.currentThread().toString() + " 被阻塞結束 ");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
//保證CompletableFuture 執行緒被執行,主執行緒再結束
Thread.sleep(2000);
}
---------------輸出結果------------------
Thread[ForkJoinPool.commonPool-worker-19,5,main] start
Thread[ForkJoinPool.commonPool-worker-5,5,main] start
Thread[ForkJoinPool.commonPool-worker-23,5,main] start
Thread[ForkJoinPool.commonPool-worker-23,5,main] 被阻塞結束
Thread[ForkJoinPool.commonPool-worker-5,5,main] 無阻塞結束
Thread[ForkJoinPool.commonPool-worker-19,5,main] 無阻塞結束
- 可以看出三個執行緒,因為信號量設定為2,第三個執行緒是無法獲取資訊成功的,會列印阻塞結束
CountDownLatch實作原理和使用場景
- CountDownLatch也是靠AQS實作的同步操作
- 通俗解釋:玩游戲時,假如主線任務需要靠完成五個小任務,主線任務才能繼續進行時,此時可以用CountDownLatch,主線任務阻塞等待,每完成一小任務,就done一次計數,直到五個小任務全部被執行才能觸發主線
- 使用示例
public static void main(String[] args) throws Exception {
CountDownLatch count = new CountDownLatch(2);
for (int i = 0; i < 2; i++)
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
System.out.println(" CompletableFuture over ");
count.countDown();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
//等待CompletableFuture執行緒的完成
count.await();
System.out.println(" main over ");
}
---------------輸出結果------------------
CompletableFuture over
CompletableFuture over
main over
CyclicBarrier實作原理和使用場景
- CyclicBarrier則是靠
ReentrantLock lock和Condition trip屬性來實作同步 - 通俗解釋:CyclicBarrier需要阻塞全部執行緒到await狀態,然后全部執行緒再全部被喚醒執行,想象有一個欄桿攔住五只羊,需要當五只羊一起站在欄桿時,欄桿才會被拉起,此時所有的羊都可以飛跑出羊圈
- 使用示例
public static void main(String[] args) throws Exception {
CyclicBarrier barrier = new CyclicBarrier(2);
CompletableFuture.runAsync(()->{
try {
System.out.println("CompletableFuture run start-"+ Clock.systemUTC().millis());
barrier.await(); //需要等待main執行緒也執行到await狀態才能繼續執行
System.out.println("CompletableFuture run over-"+ Clock.systemUTC().millis());
}catch (Exception e){
throw new RuntimeException(e);
}
});
Thread.sleep(1000);
//和CompletableFuture執行緒相互等待
barrier.await();
System.out.println("main run over!");
}
---------------輸出結果------------------
CompletableFuture run start-1609822588881
main run over!
CompletableFuture run over-1609822589880
StampedLock
- StampedLock不是借助AQS,而是自己內部維護多個狀態值,并配合cas實作的
- StampedLock具有三種模式:寫模式、讀模式、樂觀讀模式
- StampedLock的讀寫鎖可以相互轉換
//獲取讀鎖,自旋獲取,回傳一個戳值
public long readLock()
//嘗試加讀鎖,不成功回傳0
public long tryReadLock()
//解鎖
public void unlockRead(long stamp)
//獲取寫鎖,自旋獲取,回傳一個戳值
public long writeLock()
//嘗試加寫鎖,不成功回傳0
public long tryWriteLock()
//解鎖
public void unlockWrite(long stamp)
//嘗試樂觀讀讀取一個時間戳,并配合validate方法校驗時間戳的有效性
public long tryOptimisticRead()
//驗證stamp是否有效
public boolean validate(long stamp)
- 使用示例
public static void main(String[] args) throws Exception {
StampedLock stampedLock = new StampedLock();
long stamp = stampedLock.tryOptimisticRead();
//判斷版本號是否生效
if (!stampedLock.validate(stamp)) {
//獲取讀鎖,會空轉
stamp = stampedLock.readLock();
long writeStamp = stampedLock.tryConvertToWriteLock(stamp);
if (writeStamp != 0) { //成功轉為寫鎖
//fixme 業務操作
stampedLock.unlockWrite(writeStamp);
} else {
stampedLock.unlockRead(stamp);
//嘗試獲取寫讀
stamp = stampedLock.tryWriteLock();
if (stamp != 0) {
//fixme 業務操作
stampedLock.unlockWrite(writeStamp);
}
}
}
}
歡迎指正文中錯誤
參考文章
- 并發之Striped64(l累加器)
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/245660.html
標籤:java
上一篇:化堆疊為隊Java版(力扣)
下一篇:實作自己的RPC框架(學習筆記)
