主頁 > 後端開發 > 創建執行緒那么容易,為什么非要讓我使用執行緒池?(深深深入剖析)

創建執行緒那么容易,為什么非要讓我使用執行緒池?(深深深入剖析)

2020-10-01 08:12:16 後端開發

一、概述

1、問題

先看我們遇到的問題:我們創建執行緒的方式很簡單,new Thread(() -> {...}),就是因為這么簡單粗暴的方式,才帶來了致命的問題,首先執行緒的創建和銷毀都是很耗時很浪費性能的操作,你用執行緒為了什么?為了就是異步,為了就是提升性能,簡單的new三五個Thread還好,我需要一千個執行緒呢?你也for回圈new1000個Thread嗎?用完在銷毀掉,那這一千個執行緒的創建和銷毀的性能是很糟糕的!

2、解決

為了解決上述問題,執行緒池誕生了,執行緒池的核心思想就是:執行緒復用,也就是說執行緒用完后不銷毀,放到池子里等著新任務的到來,反復利用N個執行緒來執行所有新老任務,這帶來的開銷只會是那N個執行緒的創建,而不是每來一個請求都帶來一個執行緒的從生到死的程序,

二、執行緒池

1、概念

還說個雞兒,上面的問題解決方案已經很通俗易懂了,針對特級小白我在舉個生活的案例:

比如找作業面試,涉及到兩個角色:面試官、求職者,求職者成千上萬,每來一個求職者都要為其單獨新找一個面試官來面試嗎?顯然不是,公司都有面試官池子,比如:A、B、C你們三就是這公司的面試官了,有人來面試你們三輪流面就行了,可能不是很恰當,含義就是說我并不需要為每個請求(求職者)都單獨分配一個新的執行緒(面試官) ,而是我固定好幾個執行緒,由他們幾個來處理所有請求,不會反復創建銷毀,

2、引數

2.1、原始碼

public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {}

 

2.2、解釋

  • corePoolSize:核心執行緒數

執行緒池在完成初始化之后,默認情況下,執行緒池中不會有任何執行緒,執行緒池會等有任務來的時候再去創建執行緒,核心執行緒創建出來后即使超出了執行緒保持的存活時間配置也不會銷毀,核心執行緒只要創建就永駐了,就等著新任務進來進行處理,

  • maximumPoolSize:最大執行緒數

核心執行緒忙不過來且任務存盤佇列滿了的情況下,還有新任務進來的話就會繼續開辟執行緒,但是也不是任意的開辟執行緒數量,執行緒數(包含核心執行緒)達到maximumPoolSize后就不會產生新執行緒了,就會執行拒絕策略,

  • keepAliveTime:執行緒保持的存活時間

如果執行緒池當前的執行緒數多于corePoolSize,那么如果多余的執行緒空閑時間超過keepAliveTime,那么這些多余的執行緒(超出核心執行緒數的那些執行緒)就會被回收,

  • unit:執行緒保持的存活時間單位

比如:TimeUnit.MILLISECONDSTimeUnit.SECONDS

  • workQueue:任務存盤佇列

核心執行緒數滿了后還有任務繼續提交到執行緒池的話,就先進入workQueue

workQueue通常情況下有如下選擇:

LinkedBlockingQueue:無界佇列,意味著無限制,其實是有限制,大小是int的最大值,也可以自定義大小,

ArrayBlockingQueue:有界佇列,可以自定義大小,到了閾值就開啟新執行緒(不會超過maximumPoolSize),

SynchronousQueueExecutors.newCachedThreadPool();默認使用的佇列,也不算是個佇列,他不沒有存盤元素的能力,

一般都采取LinkedBlockingQueue,因為他也可以設定大小,可以取代ArrayBlockingQueue有界佇列,

  • threadFactory:當執行緒池需要新的執行緒時,會用threadFactory來生成新的執行緒

默認采用的是DefaultThreadFactory,主要負責創建執行緒,newThread()方法,創建出來的執行緒都在同一個執行緒組且優先級也是一樣的,

  • handler:拒絕策略,任務量超出執行緒池的配置限制或執行shutdown還在繼續提交任務的話,會執行handler的邏輯,

默認采用的是AbortPolicy,遇到上面的情況,執行緒池將直接采取直接拒絕策略,也就是直接拋出例外,RejectedExecutionException

3、原理

3.1、原理

  • 執行緒池剛啟動的時候核心執行緒數為0

  • 丟任務給執行緒池的時候,執行緒池會新開啟執行緒來執行這個任務

  • 如果執行緒數小于corePoolSize,即使作業執行緒處于空閑狀態,也會創建一個新執行緒來執行新任務

  • 如果執行緒數大于或等于corePoolSize,則會將任務放到workQueue,也就是任務佇列

  • 如果任務佇列滿了,且執行緒數小于maximumPoolSize,則會創建一個新執行緒來運行任務

  • 如果任務佇列滿了,且執行緒數大于或等于maximumPoolSize,則直接采取拒絕策略

3.2、圖解

創建執行緒那么容易,為什么非要讓我使用執行緒池?(深深深入剖析)

3.3、舉例

執行緒池引數配置:核心執行緒5個,最大執行緒數10個,佇列長度為100,

那么執行緒池啟動的時候不會創建任何執行緒,假設請求進來6個,則會創建5個核心執行緒來處理五個請求,另一個沒被處理到的進入到佇列,這時候有進來99個請求,執行緒池發現核心執行緒滿了,佇列還在空著99個位置,所以會進入到佇列里99個,加上剛才的1個正好100個,這時候再次進來5個請求,執行緒池會再次開辟五個非核心執行緒來處理這五個請求,目前的情況是執行緒池里執行緒數是10個RUNNING狀態的,佇列里100個也滿了,如果這時候又進來1個請求,則直接走拒絕策略,

3.4、原始碼

public void execute(Runnable command) {
    int c = ctl.get();
    // workerCountOf(c):作業執行緒數
    // worker數量比核心執行緒數小,直接創建worker執行任務
    if (workerCountOf(c) < corePoolSize) {
        // addWorker里面負責創建執行緒且執行任務
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // worker數量超過核心執行緒數,任務直接進入佇列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 執行緒池狀態不是RUNNING狀態,說明執行過shutdown命令,需要對新加入的任務執行reject()操作,
        // 這兒為什么需要recheck,是因為任務入佇列前后,執行緒池的狀態可能會發生變化,
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 這兒為什么需要判斷0值,主要是在執行緒池構造方法中,核心執行緒數允許為0
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果執行緒池不是運行狀態,或者任務進入佇列失敗,則嘗試創建worker執行任務,
    // 這兒有3點需要注意:
    // 1. 執行緒池不是運行狀態時,addWorker內部會判斷執行緒池狀態
    // 2. addWorker第2個引數表示是否創建核心執行緒
    // 3. addWorker回傳false,則說明任務執行失敗,需要執行reject操作
    else if (!addWorker(command, false))
        reject(command);
}

 

4、Executors

4.1、概念

首先這不是一個執行緒池,這是執行緒池的工具類,他能方便的為我們創建執行緒,但是阿里巴巴開發手冊上說明不推薦用Executors創建執行緒池,推薦自己定義執行緒池,這是因為Executors創建的任何一種執行緒池都可能引發血案,具體是什么問題下面會說,

4.2、固定執行緒數

4.2.1、描述

核心執行緒數和最大執行緒數是一樣的,所以稱之為固定執行緒數,

其他引數配置默認為:永不超時(0ms),無界佇列(LinkedBlockingQueue)、默認執行緒工廠(DefaultThreadFactory)、直接拒絕策略(AbortPolicy),

4.2.2、api

Executors.newFixedThreadPool(n);

4.2.3、demo

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Description: 創建2個執行緒來執行10個任務,
 *
 * @author TongWei.Chen 2020-07-09 21:28:34
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 10; i++) {
            // 從結果中可以發現執行緒name永遠都是兩個,不會有第三個,
            executorService.execute(() -> System.out.println(Thread.currentThread().getName()));
        }
    }
}

 

4.2.4、問題

問題就在于它是無界佇列,佇列里能放int的最大值個任務,并發巨高的情況下極大可能直接OOM了然后任務還在堆積,畢竟直接用的是jvm記憶體,所以建議自定義執行緒池,自己按照需求指定合適的佇列大小,自定義拒絕策略將超出佇列大小的任務放到對外記憶體做補償,比如Redis,別把業務系統壓垮就行,

4.2.5、原始碼

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
                // 核心執行緒數和最大執行緒數都是nThreads
                nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  // 無界佇列!!!致命問題的關鍵所在,
                                  new LinkedBlockingQueue<Runnable>());
}

 

4.3、單個執行緒

4.3.1、描述

核心執行緒數和最大執行緒數是1,內部默認的,不可更改,所以稱之為單執行緒數的執行緒池,

類似于Executors.newFixedThreadPool(1);

其他引數配置默認為:永不超時(0ms),無界佇列(LinkedBlockingQueue)、默認執行緒工廠(DefaultThreadFactory)、直接拒絕策略(AbortPolicy),

4.3.2、api

Executors.newSingleThreadExecutor();

 

4.3.3、demo

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Description: 創建1個執行緒來執行10個任務,
 *
 * @author TongWei.Chen 2020-07-09 21:28:34
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            // 從結果中可以發現執行緒name永遠都是pool-1-thread-1,不會有第二個出現,
            executorService.execute(() -> System.out.println(Thread.currentThread().getName()));
        }
    }
}

 

4.3.4、問題

同【4.2、固定執行緒數】的問題,都是無界佇列惹的禍,

4.3.5、原始碼

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(
        // 核心執行緒數和最大執行緒數都是1,寫死的,客戶端不可更改,
                 1, 1,
                                0L, TimeUnit.MILLISECONDS,
                 // 無界佇列!!!致命問題的關鍵所在,
                                new LinkedBlockingQueue<Runnable>()));
}

 

4.4、帶快取的執行緒池

4.4.1、描述

他的功能是來個任務我就開辟個執行緒去處理,不會進入佇列,SynchronousQueue佇列也不帶存盤元素的功能,那這意味著來一億個請求就會開辟一億個執行緒去處理,keepAliveTime為60S,意味著執行緒空閑時間超過60S就會被殺死;這就叫帶快取功能的執行緒池,

核心執行緒數是0,最大執行緒數是int的最大值,內部默認的,不可更改,

其他引數配置默認為:1min超時(60s),SynchronousQueue佇列、默認執行緒工廠(DefaultThreadFactory)、直接拒絕策略(AbortPolicy),

4.4.2、api

Executors.newCachedThreadPool();

4.4.3、demo

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Description: 創建個帶快取功能的執行緒池來執行10個任務,
 *
 * @author TongWei.Chen 2020-07-09 21:28:34
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            // 從結果中可以發現執行緒name有10個,也就是有幾個任務就會開辟幾個執行緒,
            executorService.execute(() -> System.out.println(Thread.currentThread().getName()));
        }
    }
}

 

4.4.4、問題

問題就在于他的最大執行緒數是int的最大值,因為他內部采取的佇列是SynchronousQueue,這個佇列沒有容納元素的能力,這將意味著只要來請求我就開啟執行緒去作業,巔峰期能創建二十幾億個執行緒出來作業,你自己想想多么可怕!!!

4.4.5、原始碼

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
                // 核心執行緒數是0,最大執行緒數都是Integer.MAX_VALUE,這個可致命了!!!
                0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

 

4.5、有調度功能的執行緒池

4.5.1、描述

RocketMQ內部大量采用了此種執行緒池來做心跳等任務,

核心執行緒數手動傳進來,最大執行緒數是Integer.MAX_VALUE,最大執行緒數是內部默認的,不可更改,

其他引數配置默認為:永不超時(0ns),帶延遲功能的佇列(DelayedWorkQueue)、默認執行緒工廠(DefaultThreadFactory)、直接拒絕策略(AbortPolicy),

4.5.2、api

Executors.newScheduledThreadPool(n);

4.5.3、demo

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Description: 創建個帶調度功能的執行緒池來執行任務,
 *
 * @author TongWei.Chen 2020-07-09 21:28:34
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        // 五秒一次
        scheduledExecutorService.schedule(() -> System.out.println(Thread.currentThread().getName()), 5, TimeUnit.SECONDS);
        // 首次五秒后執行,其次每隔1s執行一次
        scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println(Thread.currentThread().getName()), 5, 1, TimeUnit.SECONDS);
    }
}

 

4.5.4、問題

【同4.4、帶快取的執行緒池的問題】

問題就在于他的最大執行緒數是int的最大值,這將意味海量并發期能創建二十幾億個執行緒出來作業,你自己想想多么可怕!!!

4.5.5、原始碼

public ScheduledThreadPoolExecutor(int corePoolSize) {
    // 致命的問題跟newCachedThreadPool一樣,最大執行緒數能開到幾十億(Integer.MAX_VALUE)!!!
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

 

4.6、停止執行緒

4.6.1、shutdown

平緩的結束執行緒池,比如當前執行緒池還在執行任務,還沒執行完,這時候執行了shutdown的話,執行緒池并不會立即停止作業,而是會等待執行緒池中的任務都執行完成后才會shutdown掉,但是如果執行shutdown了,外界還在繼續提交任務到執行緒池,那么執行緒池會直接采取拒絕策略,

4.6.2、isShutdown

判斷執行緒池是否已經shutdown,

4.6.3、shutdownNow

暴力結束執行緒池,不管你當前執行緒池有沒有任務在執行,佇列里有沒有堆積訊息,我都直接讓執行緒池掛掉,但是他的回傳值是佇列里那些未被執行的任務,有需要的可以記錄下log啥的,

4.7、疑問

這幾種執行緒池為什么要采取不一樣的佇列?比如newFixedThreadPool為什么采取LinkedBlockingQueue,而newCachedThreadPool又為什么采取SynchronousQueue

因為newFixedThreadPool執行緒數量有限,他又不想丟失任務,只能采取無界佇列,而newCachedThreadPool的話本身自帶int最大值個執行緒數,所以沒必要用無界佇列,他的宗旨就是我有執行緒能處理,不需要佇列,

5、總結幾個問題

1、執行緒池的狀態

  • RUNNING:接受新任務并處理排隊任務,
  • SHUTDOWN:不接受新任務,但是會處理排隊任務,【見:停止執行緒的4.6.1、shutdown】
  • STOP:不接受新任務,也不處理排隊任務,并中端正在進行的任務,
  • TIDYING:所有任務都已經完事,作業執行緒為0的時候 ,執行緒會進入這個狀態并執行terminate()鉤子方法,
  • TERMINATED:terminate()鉤子方法運行完成,

2、執行緒池自動創建還是手動?

那肯定是手動了,因為Executors自動創建的那些執行緒池都存在致命的問題,手動創建執行緒池我們能自己控制執行緒數大小以及佇列大小,還可以指定組名稱等等個性化配置,重點不會出現致命問題,風險都把控在我們手里,

3、執行緒數多少合適?

  • CPU密集型(比如加密、各種復雜計算等):建議設定為CPU核數+1,
  • 耗時IO操作(比如讀寫資料庫,壓縮解壓縮大檔案等等):一般會設定CPU核數的2倍,當然也有個很牛X的計算公式:執行緒數=CPU核數 *(1+平均等待時間/平均作業時間)

4、before&after

在執行緒執行前后可以通過兩個方法來進行列印log或其他作業,

原始碼如下:

// 執行前的before
beforeExecute(wt, task);
Throwable thrown = null;
try {
    // 真正執行
    task.run();
} catch (RuntimeException x) {
    thrown = x; throw x;
} catch (Error x) {
    thrown = x; throw x;
} catch (Throwable x) {
    thrown = x; throw new Error(x);
} finally {
    // 執行完成后after
    afterExecute(task, thrown);
}

 

6、核心原始碼(全)

1、常用變數的解釋

// 1. `ctl`,可以看做一個int型別的數字,高3位表示執行緒池狀態,低29位表示worker數量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 2. `COUNT_BITS`,`Integer.SIZE`為32,所以`COUNT_BITS`為29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 3. `CAPACITY`,執行緒池允許的最大執行緒數,1左移29位,然后減1,即為 2^29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 4. 執行緒池有5種狀態,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// 5. `runStateOf()`,獲取執行緒池狀態,通過按位與操作,低29位將全部變成0
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 6. `workerCountOf()`,獲取執行緒池worker數量,通過按位與操作,高3位將全部變成0
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 7. `ctlOf()`,根據執行緒池狀態和執行緒池worker數量,生成ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
// 8. `runStateLessThan()`,執行緒池狀態小于xx
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
// 9. `runStateAtLeast()`,執行緒池狀態大于等于xx
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

 

2、構造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 基本型別引數校驗
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    // 空指標校驗
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    // 根據傳入引數`unit`和`keepAliveTime`,將存活時間轉換為納秒存到變數`keepAliveTime `中
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

 

3、提交執行task的程序

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // worker數量比核心執行緒數小,直接創建worker執行任務
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // worker數量超過核心執行緒數,任務直接進入佇列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 執行緒池狀態不是RUNNING狀態,說明執行過shutdown命令,需要對新加入的任務執行reject()操作,
        // 這兒為什么需要recheck,是因為任務入佇列前后,執行緒池的狀態可能會發生變化,
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 這兒為什么需要判斷0值,主要是在執行緒池構造方法中,核心執行緒數允許為0
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果執行緒池不是運行狀態,或者任務進入佇列失敗,則嘗試創建worker執行任務,
    // 這兒有3點需要注意:
    // 1. 執行緒池不是運行狀態時,addWorker內部會判斷執行緒池狀態
    // 2. addWorker第2個引數表示是否創建核心執行緒
    // 3. addWorker回傳false,則說明任務執行失敗,需要執行reject操作
    else if (!addWorker(command, false))
        reject(command);
}

 

4、addworker原始碼決議

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 外層自旋
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 這個條件寫得比較難懂,我對其進行了調整,和下面的條件等價
        // (rs > SHUTDOWN) || 
        // (rs == SHUTDOWN && firstTask != null) || 
        // (rs == SHUTDOWN && workQueue.isEmpty())
        // 1. 執行緒池狀態大于SHUTDOWN時,直接回傳false
        // 2. 執行緒池狀態等于SHUTDOWN,且firstTask不為null,直接回傳false
        // 3. 執行緒池狀態等于SHUTDOWN,且佇列為空,直接回傳false
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // 內層自旋
        for (;;) {
            int wc = workerCountOf(c);
            // worker數量超過容量,直接回傳false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 使用CAS的方式增加worker數量,
            // 若增加成功,則直接跳出外層回圈進入到第二部分
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // 執行緒池狀態發生變化,對外層回圈進行自旋
            if (runStateOf(c) != rs)
                continue retry;
            // 其他情況,直接內層回圈進行自旋即可
            // else CAS failed due to workerCount change; retry inner loop
        } 
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // worker的添加必須是串行的,因此需要加鎖
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 這兒需要重新檢查執行緒池狀態
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // worker已經呼叫過了start()方法,則不再創建worker
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // worker創建并添加到workers成功
                    workers.add(w);
                    // 更新`largestPoolSize`變數
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 啟動worker執行緒
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // worker執行緒啟動失敗,說明執行緒池狀態發生了變化(關閉操作被執行),需要進行shutdown相關操作
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

 

5、執行緒池worker任務單元

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 這兒是Worker的關鍵所在,使用了執行緒工廠創建了一個執行緒,傳入的引數為當前worker
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // 省略代碼...
}

 

6、核心執行緒執行邏輯-runworker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 呼叫unlock()是為了讓外部可以中斷
    w.unlock(); // allow interrupts
    // 這個變數用于判斷是否進入過自旋(while回圈)
    boolean completedAbruptly = true;
    try {
        // 這兒是自旋
        // 1. 如果firstTask不為null,則執行firstTask;
        // 2. 如果firstTask為null,則呼叫getTask()從佇列獲取任務,
        // 3. 阻塞佇列的特性就是:當佇列為空時,當前執行緒會被阻塞等待
        while (task != null || (task = getTask()) != null) {
            // 這兒對worker進行加鎖,是為了達到下面的目的
            // 1. 降低鎖范圍,提升性能
            // 2. 保證每個worker執行的任務是串行的
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 如果執行緒池正在停止,則對當前執行緒進行中斷操作
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            // 執行任務,且在執行前后通過`beforeExecute()`和`afterExecute()`來擴展其功能,
            // 這兩個方法在當前類里面為空實作,
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                // 幫助gc
                task = null;
                // 已完成任務數加一 
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 自旋操作被退出,說明執行緒池正在結束
        processWorkerExit(w, completedAbruptly);
    }
}

 

7、自建執行緒池注意點

  • 阻塞任務佇列數
  • 執行緒池的名字,最好跟業務相關
  • 核心執行緒池大小,看業務實際情況,可以參考【執行緒數多少合適?】
  • 最大執行緒池大小,看業務實際情況,可以參考【執行緒數多少合適?】
  • 拒絕策略,我個人一般都是記錄log,如果主要的業務我會根據log做補償,

比如:

ThreadPoolExecutor executor = new ThreadPoolExecutor(CPU核數 + 1, 2 * CPU核數 + 1,
      5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
         // 執行緒池名字pay-account
          new DefaultThreadFactory("pay-account"), (r1, executor) -> {
         // 記錄log 重新入佇列做補償
 });

 

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/144564.html

標籤:Java

上一篇:c語言陣列長度

下一篇:scrapy自定義擴展(extensions)實作實時監控scrapy爬蟲的運行狀態

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more