文章目錄
- 悲觀鎖與樂觀鎖
- 悲觀鎖
- 樂觀鎖
- 原子操作類
- CAS
- 原始碼決議
- unsafe
- java8對于原子操作類的優化
- 偽共享
悲觀鎖與樂觀鎖
悲觀鎖
如果是mysql資料庫,利用for update關鍵字+事務,這樣的效果就是當A執行緒走到for update的時候,會把指定的記錄上鎖,然后B執行緒過來,就只能等待,A執行緒修改完資料之后,提交事務,鎖就被釋放了,這個時候B執行緒就可以繼續做自己的事情了,悲觀鎖往往是互斥的,這么做是相當影響性能的,
樂觀鎖
在資料表中加一個版本號的欄位:version,這個欄位不需要程式員手動維護,是資料庫主動維護的,每次修改資料,version都會發生修改,
當version現在是1:
- A執行緒進來,讀到的
version是1, - B執行緒進來,讀到的
version是1, - A執行緒執行了更新的操作:
update stu set name='codebar' where id=1 and version=1,成功,資料庫主動把version改成了2, - B執行緒執行了更新操作:
update stu set name='hello' where id=1 and version=1
,失敗,因為這個時候version的欄位已經不是1了,
悲觀鎖的典型代表是synchronized,樂觀鎖的典型代表是CAS,
原子操作類
在java中提供了很多的原子操作類,比如AtomicInteger,其中有一個自增的方法,
public class Main {
public static void main(String[] args) {
Thread[] threads = new Thread[20];
AtomicInteger atomicInteger = new AtomicInteger();
for (int i = 0; i < 20; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
atomicInteger.incrementAndGet();
}
});
threads[i].start();
}
join(threads);
System.out.println("x=" + atomicInteger.get());
}
private static void join(Thread[] threads) {
for (int i = 0; i < 20; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
運行結果為:
x=20000
這就是原子操作的神奇之處,在高并發的情況下,這種方法會比synchronized更有優勢,因為synchronized關鍵字會讓代碼串行化,失去了多執行緒的優勢,
例子:有一個需求,一個欄位的初始值為0,開三個執行緒:
- 一個執行緒執行:當
x=0時,x修改為100 - 一個執行緒執行:當
x=100時,x修改為50 - 一個執行緒執行:當
x=50時,x修改為60
public static void main(String[] args) {
AtomicInteger atomicInteger=new AtomicInteger();
new Thread(() -> {
if(!atomicInteger.compareAndSet(0,100)){
System.out.println("0-100:失敗");
}
}).start();
new Thread(() -> {
try {
Thread.sleep(500);注意這里睡了一會兒,目的是讓第三個執行緒先執行判斷的操作,從而讓第三個執行緒修改失敗
} catch (InterruptedException e) {
e.printStackTrace();
}
if(!atomicInteger.compareAndSet(100,50)){
System.out.println("100-50:失敗");
}
}).start();
new Thread(() -> {
if(!atomicInteger.compareAndSet(50,60)){
System.out.println("50-60:失敗");
}
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
運行結果為:
50-60:失敗
在這里,呼叫的方法compareAndSet,首字母就是CAS,而且傳遞了兩個引數,這兩個引數是在原生CAS操作中必須要傳遞的,
CAS
CAS的全稱是Compare And Swap,即比較交換,呼叫原生CAS操作需要確定三個值:
- 要更新的欄位:有時被拆分為兩個引數:1. 實體 2. 偏移地址
- 預期值
- 新值
原始碼決議
首先,呼叫這個方法需要傳遞兩個引數,一個是預期值,一個是新值,這個預期值就是舊值,新值是我們希望修改的值,該方法的內部實作:
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
該方法內部呼叫了unsafe下的compareAndSwapInt方法,除了傳遞了我們傳到此方法的兩個引數之外,又傳遞了兩個引數,這兩個引數就是我們之前說的實體和偏移地址,this代表是當前類的實體,即AtomicInteger類的實體,偏移地址是我們需要修改的欄位在實體的哪個位置,
確定欄位的程序不是在java中做的,而是在更底層中做的,
偏移地址是在本類的靜態代碼中獲得的:
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
unsafe.objectFileldOffset接收得是Fileld型別的引數,得到的就是對應欄位的偏移地址了,這里就是獲得value欄位在本類,即AtomicInteger中的偏移地址,
value欄位的定義:
private volatile int value;
compareAndSwapInt和objectFieldOffset這兩個方法的寫法是JNI,會呼叫C或者C++,最終將對應的指定發送給CPU,這是可以保證原子性的,兩個方法的定義為:
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public native long objectFieldOffset(Field var1);
可以看到這兩個方法被native標記了,
對compareAndSwapInt方法的形象解釋:
當我們執行compareAndSwapInt方法時,傳入10和100,java會和更底層進行通信:老鐵,我給你了欄位的所屬實體和偏移地址,你幫我看一下這個欄位的值是不是10,如果是10的話,你就改成100,并且回傳true,如果不是的話,不用修改,直接回傳false,
其中比較的程序是compare,修改值的程序就是swap,因為是把舊值替換成新值,所以稱該程序為CAS,
incrementAndGet的原始碼:
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//根據實體和偏移地址獲得對應的值
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
incrementAndGet方法會呼叫到getAndAddInt方法,這里有三個引數:
- var1:實體
- var2:偏移地址
- var4:需要自增的值,這里是1
getAndAddInt方法內部有一個while回圈,回圈體內部根據實體和偏移地址獲得對應的值,這里先稱為A,再來看看while里面的判斷內容,JDK和更底層進行通訊:嘿!我把實體和偏移地址給你,你幫我看一下這個值是不是A,如果是的話,幫我修改為A+1,回傳true,如果不是的話,回傳false吧,
問題:為什么需要while回圈?
比如同時有兩個執行緒執行到了getIntVolatile方法,拿到的值都是10,其中執行緒A執行native方法,修改成功,但是執行緒B就修改失敗了,因為CAS操作是可以保證原子性的,所以執行緒B只能苦逼的再一次回圈,這一次拿到的是11,又去執行native方法,修改成功,
像這樣的while回圈,叫做CAS自旋,
如果并發很高,大量的執行緒在進行CAS自旋,就會很浪費CPU,在java8之后,對原子操作類進行了一定的優化,
unsafe
Unsafe是不安全的,Unsafe下面有比較多的方法:
objectFieldOffset:接收一個Field型別的引數,回傳偏移地址compareAndSwapInt:比較交換,接收四個引數:實體,偏移地址,預期值,新值getIntVolatile:獲得值,支持Volatile,接收兩個引數:實體,偏移地址
java8對于原子操作類的優化
上面有提到,在高并發之下,N多執行緒進行自旋競爭同一個欄位,這會給CPU造成一定的壓力,所以在java8中,提供了更為完善的原子操作類:LongAdder
優化方式:內部維護了一個陣列Cell[]和base,Cell[]里面維護了value,在出現競爭的時候,JDK會根據演算法,選擇一個Cell,對其中的value進行操作,如果還出現競爭,則換一個Cell再次嘗試,最終把Cell[]里面的value和base相加,得到最終的結果,
LongAdder類的UML圖:

add()方法:
public void add(long x) {
Cell[] cs; long b, v; int m; Cell c;
if ((cs = cells) != null || !casBase(b = base, b + x)) {//第一行
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||//第二行
(c = cs[getProbe() & m]) == null ||//第三行
!(uncontended = c.cas(v = c.value, v + x)))//第四行
longAccumulate(x, null, uncontended);//第五行
}
}
- 第一行:||判斷,前者判斷
cells是否不為空,后者判斷CAS是否不成功,
casBase的實作:就是呼叫compareAndSet方法,判斷是否成功:
(1)如果當前沒有競爭,回傳true;
(2)如果當前有競爭,有執行緒會回傳false,
final boolean casBase(long cmp, long val) {
return BASE.compareAndSet(this, cmp, val);
}
所以第一行代碼的意思是:如果cell[]已經被初始化了,或者有競爭,才會進入第二行代碼,如果沒有競爭,也沒有初始化,就不會進入到第二行代碼,
如果沒有競爭,只會對base進行操作,
- 第二行:||判斷,前者判斷cs是否為
null,后者判斷(cs的長度減1)是否大于0,這兩個判斷,都是判斷Cell[]是否初始化的,如果沒有初始化,會進入第五行代碼, - 第三行:如果
cell進行了初始化,通過getProbe()& m演算法得到一個數字,判斷cs[數字]是否為null,并且把cs[數字]賦值給了c,如果為null,會進入第五行代碼,
getProbe()函式:
static final int getProbe() {
return (int) THREAD_PROBE.get(Thread.currentThread());
}
private static final VarHandle THREAD_PROBE;
這個演算法是根據THREAD_PROBE算出來的,
- 第四行代碼:對c進行
CAS操作,看是否成功,并把回傳值賦值給uncontended,如果當前沒有競爭,就會成功,如果當前有競爭,就會失敗,在外面有一個取非符號,所以如果CAS失敗了,會進入第五行代碼,需要注意的是,這里已經對cell元素進行操作了, - 第五行代碼:具體實作:

有三個if:
- 判斷
cells是否被初始化了,如果被初始化了,進入這個if
該if中的具體內容:

第一個判斷:根據演算法,拿出cs[]中的一個元素,并且賦值給c,然后判斷是否為NULL,如果為NULL,進入這個if,其中的具體內容:
if (cellsBusy == 0) { // 如果cellsBusy==0,代表現在“不忙”,進入這個if
Cell r = new Cell(x); //創建一個Cell
if (cellsBusy == 0 && casCellsBusy()) {//再次判斷cellsBusy ==0,加鎖,這樣只有一個執行緒可以進入這個if
//把創建出來Cell元素加入到Cell[]
try {
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
break done;
}
} finally {
cellsBusy = 0;//代表現在“不忙”
}
continue; // Slot is now non-empty
}
}
collide = false;
初始化Cell[]的時候,其中一個元素是Null,這里對這個為空的元素初始化,也就是只有用到了這個元素,才去初始化,
第六個判斷:判斷cellsBusy是否為0,并且加鎖,如果成功,進入這個if,對Cell[]進行擴容,
try {
if (cells == cs) // Expand table unless stale
cells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue;
擴容Cell[]的時候,利用CAS加了鎖,所以保證執行緒的安全性
最外面是一個for(;;)死回圈,只有break了,才終止回圈,
一開始collode為false,在第三個if中,對cell進行CAS操作,如果成功,就break了,所以我們需要假設它是失敗的,進入第四了if,第四個if中會判斷cell的長度是否大于CPU核心數,如果小于核心數,就會進入第五個判斷,這個時候collode為false,會進入這個if,把collode改為true,代表有沖突,然后跑到advanceProbe方法,生成一個新的THREAD_PROBE,再次回圈,
如果在第三個if中,CAS還是失敗,再次判斷Cell[]的長度是否大于核心數,如果小于核心數,會進入第五個判斷,這個時候collode為true,所以不會進入到第五個if中去了,這樣就進入了第六個判斷,進行擴容,
cell[]的擴容時機:當cell[]的長度小于CPU核心數,并且已經兩次Cell CAS失敗了,
第三個判斷:
final boolean casCellsBusy() {
return CELLSBUSY.compareAndSet(this, 0, 1);
}
cas設定CELLSBUSY為1,可以理解為加了個鎖,因為馬上就要初始化了,
try { // Initialize table
if (cells == cs) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
break done;
}
} finally {
cellsBusy = 0;
}
初始化cell[],可以看到長度為2,根據演算法,對其中的一個元素進行初始化,也就是此時cell[]的長度為2,但是里面有一個元素還是Null,現在只是對其中一個元素進行了初始化,最終把cellsBusy修改成了0,代表現在不忙了,
當出現競爭,且cell[]還沒有被初始化的時候,會初始化cell[]
初始化的規則是創建長度為2的陣列,但是只會初始化其中的一個元素,另外一個元素為NULL,
在對cell[]進行初始化的時候,是利用CAS加了鎖,所以可以保證執行緒安全,
3. 如果上面都失敗了,對base進行CAS操作

Contended是用來解決偽共享的,
偽共享
CPU與記憶體的關系:
當CPU需要一個資料,先去快取中找,如果快取沒有,會去記憶體找,找到了,就把資料復制到快取中去,下次直接去快取中取出即可,
在快取中的資料,是以快取行的形式存盤的,就是一個快取行可能不止一個資料,假如一個快取行的大小是64位元組,CPU去記憶體中取資料,會把臨近的64位元組的資料都取出來,然后復制到快取中,對于單執行緒,這是一種優化,如果CPU需要A資料,把臨近的BCDE資料都從記憶體中取出來,并放入快取中,CPU如果再需要BCDE資料,就可以直接去快取中取了,但是在多執行緒下就有劣勢了,因為同一快取行的資料,同時只能被一個執行緒讀取,這就叫偽共享,有一個解決辦法是:如果快取行的大小是64位元組,可以加上一些冗余欄位來填充到64位元組,但是這種方式不夠優雅,所以在Java8中推出了@jdk.internal.vm.annotation.Contended注解,來解決偽共享問題,但是如果開發者想用這個注解,需要添加JVM引數,
感謝并參考:
https://mp.weixin.qq.com/s/GPEHpzQvAXNvH0i__Hok4Q
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/279939.html
標籤:其他
上一篇:微信加密與登錄驗證分析
