主頁 >  其他 > Java 深入理解執行緒池

Java 深入理解執行緒池

2021-04-15 12:11:12 其他

文章目錄

  • 一、Java 中的執行緒池
    • 1. 執行緒池狀態
    • 2. 執行緒池主要屬性引數
    • 3. 執行緒池的實作原理
      • 3.1 ThreadPoolExecutor 執行緒池主要處理流程
      • 3.2 執行緒池方法決議
    • 4. 合理地配置執行緒池
    • 5. 執行緒池的監控
  • 二、手寫執行緒池
    • 1. 實作阻塞佇列
    • 2. 實作執行緒池
    • 3. 測驗
    • 4. 拒絕策略
      • 4.1 帶超時的添加任務
      • 4.2 拒絕策略(策略模式)
      • 4.3 測驗利用帶超時時間的拒絕策略
    • 5. 完整代碼

一、Java 中的執行緒池

1. 執行緒池狀態

ThreadPoolExecutor使用int的高3位來表示執行緒池狀態,低29位表示執行緒數量

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

在這里插入圖片描述

執行緒池狀態和執行緒池中執行緒的數量由一個原子整型ctl來共同表示

  • 使用一個數來表示兩個值的主要原因是:可以通過一次CAS同時更改兩個屬性的值
	// 原子整數,前3位保存了執行緒池的狀態,剩余位保存的是執行緒數量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
	//去掉前三位保存執行緒狀態的位數,剩下的用于保存執行緒數量
    private static final int COUNT_BITS = Integer.SIZE - 3;

	// 2^COUNT_BITS次方,表示可以保存的最大執行緒數
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

獲取執行緒池狀態、執行緒數量以及合并兩個值的操作

    // Packing and unpacking ctl
	
	// 傳入 ctl 值 獲取運行狀態 該操作會讓除高3位以外的數全部變為0
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

	// 傳入 ctl 值 獲取運行執行緒數  該操作會讓高3位為0
    private static int workerCountOf(int c)  { return c & CAPACITY; }

	// 傳入 rs 運行狀態 wc 執行緒數量 計算ctl新值
    private static int ctlOf(int rs, int wc) { return rs | wc; }

2. 執行緒池主要屬性引數

	//阻塞佇列,用于存放來不及被核心執行緒執行的任務
    private final BlockingQueue<Runnable> workQueue;
	
	// 全域鎖,解決創建銷毀執行緒等執行緒安全問題
    private final ReentrantLock mainLock = new ReentrantLock();

	// 用于存放核心執行緒的容器,只有當持有鎖時才能夠獲取其中的元素
    private final HashSet<Worker> workers = new HashSet<Worker>();

	//執行緒工廠,給執行緒取名字
    private volatile ThreadFactory threadFactory;

	// 拒絕執行處理器 處理拒絕策略
    private volatile RejectedExecutionHandler handler;

    // 救急執行緒(或者核心執行緒)空閑時的最大生存時間
    private volatile long keepAliveTime;

	// 核心執行緒數
    private volatile int corePoolSize;

	// 最大執行緒數 
	// 最大執行緒數 - 核心執行緒數 = 九級執行緒數
    private volatile int maximumPoolSize;

    // 默認拒絕策略
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
  • corePoolSize : (核心執行緒數量),如果呼叫了執行緒池的 prestartAllCoreThreads( ) 方法,執行緒池會提前創建并啟動所有基本執行緒,否則是懶惰創建
  • workQueue:用于保存等待執行的任務的阻塞佇列,可以選擇以下幾個具體實作
    • ArrayBlockingQueue:是一個基于陣列的有界阻塞佇列,按FIFO(先進先出原則)排序,新任務進來后,會放到該佇列的隊尾,有界的陣列可以防止資源耗盡問題,
    • LinkedBlockingQuene:基于鏈表的無界阻塞佇列(其實最大容量為Interger.MAX),按照FIFO排序,由于該佇列的近似無界性,當執行緒池中執行緒數量達到corePoolSize后,再有新任務進來,會一直存入該佇列,而不會去創建新執行緒直到maxPoolSize,因此使用該作業佇列時,引數maxPoolSize其實是不起作用的,吞吐量通常要高于ArrayBlockingQueue ,靜態工廠方法 Executors.newFixedThreadPool( ) 使用了該佇列
    • ③ SynchronousQuene 是一個不存盤元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則一直處于阻塞狀態,吞吐量通常要高于LinkedBlockingQuene ,靜態工廠方法 Executors.newCachedThreadPool() 用的是此佇列
    • PriorityBlockingQueue:具有優先級的無界阻塞佇列,優先級通過引數Comparator實作,
  • maximumPoolSize:執行緒池最大執行緒數量,包括了核心執行緒數量和救急執行緒數量
  • threadFactory:執行緒工廠,可以給執行緒設定名字等
  • handler:拒絕執行處理器 處理拒絕策略 在處理程序中具體講解
  • keepAliveTime:執行緒活動保持時間,執行緒池作業執行緒空閑后,保持存活的時間,所以,如果任務很多,并且每個任務執行時間很多,可以調大存活時間,提高執行緒利用率
  • unit:空閑執行緒存活時間單位

3. 執行緒池的實作原理

3.1 ThreadPoolExecutor 執行緒池主要處理流程

在這里插入圖片描述

在這里插入圖片描述

  1. 使用者 發布任務
  2. 如果當前運行的執行緒少于 核心執行緒數(corePoolSize),則創建新執行緒來執行任務(這一步需要獲得全域鎖,不然會引發執行緒安全問題)
  3. 如果運行的執行緒等于或者大于corePoolSize 則將任務加入阻塞佇列(BlockQueue)
  4. 如果BlockQueue 已滿,無法將任務加入佇列,則創建新執行緒來處理任務(這同樣需要獲得全域鎖)
    • 此處就用到救急執行緒,其數量就是最大執行緒數減去核心執行緒數的數量
  5. 如果創建新執行緒使當前運行的執行緒超出maximumPoolSize 任務將被拒絕,并呼叫RejectedExecutionHandler.rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法
    • 拒絕策略 jdk 提供了 4 種實作
      在這里插入圖片描述

    • AbortPolicy:讓呼叫者拋出 RejectedExecutionException 例外,這是默認策略

    • CallerRunsPolicy:讓呼叫者運行任務

    • DiscardPolicy:放棄本次任務

    • DiscardOldestPolicy:放棄佇列中最早的任務,本任務取而代之

明白了上述內容 我們就可以看看原始碼是如何實作的

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();

		// 作業執行緒數小于核心執行緒數 就 呼叫addWorker 創建新執行緒
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

		// 如果執行緒池還在運行,并且可以加入阻塞佇列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();

			// 重新檢查執行緒池是否還在運行 或者有的執行緒已經死了 必要時回滾佇列并且采用拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);

			// 執行緒池仍在運行 采用救急執行緒
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

		// 如果創建執行緒失敗,則采用拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }

3.2 執行緒池方法決議

① 添加執行緒的addWorker( ) 方法

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (; ; ) {
            // 獲取表示狀態和執行緒數的原子整數
            int c = ctl.get();
            // 獲取執行緒池狀態
            int rs = runStateOf(c);

            // 如果執行緒池狀態不是 RUNNING 或者 阻塞佇列中有任務 則創建執行緒失敗
            if (rs >= SHUTDOWN &&
                    !(rs == SHUTDOWN &&
                            firstTask == null &&
                            !workQueue.isEmpty()))
                return false;

            for (; ; ) {
                // 獲取執行緒個數
                int wc = workerCountOf(c);
                // 如果執行緒數大于容量 或者 大于核心執行緒數或者最大執行緒數(用哪個系結取決于傳入的core)則創建執行緒失敗
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 在多執行緒情況下 如果CAS創建執行緒 修改 原子整數 失敗 則回滾到retry 重新回圈
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 重新獲取 表示狀態和執行緒個數的原子整數
                c = ctl.get();
                // 如果 運行狀態和當初不同,則回滾重新回圈
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {

            // 創建新執行緒處理任務
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 創建新執行緒需要獲得全域鎖
                mainLock.lock();
                try {
                    //加鎖的同時再次檢測 避免在釋放鎖之前呼叫了shut down
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        // 再次確認執行緒存活
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 將該執行緒 加入到 HashSet集合中(執行緒池)
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 更新標志位
                        workerAdded = true;
                    }
                } finally {
                    // 釋放鎖
                    mainLock.unlock();
                }
                // 如果作業執行緒成功添加,開始執行緒開始作業 并更新標志位
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果執行緒啟動失敗 
            // 呼叫addWorkerFailed(w)方法: 洗掉該作業執行緒 作業執行緒數減一,并且嘗試終止執行緒池
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

② 向執行緒池提交任務

  • execute() : 上面已經分析過,用于提交不需要回傳值的任務,所以無法判斷任務是否被執行緒池執行成功
  • submit() :用于提交需要回傳值的任務,執行緒池會回傳一個 future 物件,通過這個future物件可以判斷任務是否執行成功,并且可以通過future 的get () 方法來獲取回傳值,get() 方法會阻塞當前執行緒直到任務完成,也可以使用帶超時時間的get() ,這里著重分析該方法

使用

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        // 通過submit執行Callable中的call方法
        Future<String> future = threadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "OKK";
            }
        });

        try {
            // 通過future 來獲得回傳值
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

③ 關閉執行緒池的方法

shutdown() 將執行緒池的狀態設定成 SHUTDOWN 中斷沒有執行任務的執行緒,其他執行緒執行完任務,自己消亡

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        // 獲取全域鎖
        mainLock.lock();
        try {
            // 通過安全管理器看是否有權關閉執行緒池
            checkShutdownAccess();
            // 將執行緒池狀態設定為 SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 打斷 空閑的作業執行緒
            interruptIdleWorkers();
            // 給子類提供一些擴展方法
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 嘗試終結執行緒池
        tryTerminate();
    }
    final void tryTerminate() {
        for (; ; ) {

            // 獲取存盤狀態和執行緒數量的 原子整數
            int c = ctl.get();
            // 如果存在以下三種情況,嘗試終結執行緒池失敗
            // 1、執行緒池狀態為RUNNING
            // 2、執行緒池狀態為 RUNNING SHUTDOWN STOP (狀態值大于TIDYING)
            // 3、執行緒池狀態為SHUTDOWN,但阻塞佇列中還有任務等待執行
            if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                return;
            // 如果存活執行緒數不為0 打斷空閑的執行緒
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            // 獲取全域鎖
            mainLock.lock();
            try {
                // 嘗試使用CAS將執行緒池狀態改為 TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        // 通過CAS將執行緒池狀態改為TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

shutdownNow() 首先將執行緒池狀態設定為STOP,然后嘗試停止所有的正在執行或暫停人物的執行緒,并回傳等待執行的任務的串列

    public List<Runnable> shutdownNow() {
        // 回傳的任務串列
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        // 獲取全域鎖
        mainLock.lock();
        try {
            // 通過安全管理器看是否有權關閉執行緒池
            checkShutdownAccess();
            // 將執行緒池狀態設定為STOP
            advanceRunState(STOP);
            // 遍歷打斷所有執行緒
            interruptWorkers();
            // 將未執行的任務從佇列中移除,然后回傳給呼叫者
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

4. 合理地配置執行緒池

可以從以下角度分析:

  • 任務的性質:CPU密集型任務、IO密集型任務、混合型任務
    • CPU密集型應該配置盡可能小的執行緒,通常采用cpu核數+1能夠實作最優的CPU利用率,+1是保證當執行緒由于頁缺失故障(作業系統或其它原因導致暫停時,額外的這個執行緒就能頂上去,保證CPU時鐘周期不被浪費
    • IO密集型應該配置盡可能多的執行緒,因為CPU不總是處于繁忙狀態,例如,當你執行業務計算時,這時候會使用CPU資源,但當你執行IO操作時、遠程RPC呼叫時,包括進行資料庫操作時,這時候CPU就閑下來了,你可以利用多執行緒提高它的利用率,
  • 任務的優先級:高、中和低
  • 任務的執行時間:長、中和短
  • 任務的依賴性:是否依賴其他系統資源,如資料庫連接

注意

建議使用有界佇列,有界佇列能增加系統的穩定性和預警能力,可以根據需要設大一點,比如幾千,
有一次,我們系統里后臺任務執行緒的佇列和執行緒池全滿了,不斷拋出拋棄任務的例外,通過排查發現是資料庫出現的問題,導致執行SQL變得非常緩慢,因為后臺任務執行緒池里的任務全是需要向資料庫查詢和插入資料的,所以導致執行緒池里的作業執行緒全部阻塞,任務積壓在執行緒池里,如果當時我們設定成無界佇列,那么執行緒池的佇列就會越來越多,最多可能會撐滿記憶體,OutOfMemory,導致整個系統不可用

5. 執行緒池的監控

如果在系統中大量使用執行緒池,則有必要對執行緒池進行監控,方便在出現問題時,快速定位問題,可以通過執行緒池提供的引數進行監控:

  • largestPoolSize:執行緒池里曾經創建過的最大執行緒數量,可以判斷知道執行緒池是否滿過
  • completedTaskCount:執行緒池已完成的任務數量
  • getPoolSize( ):執行緒池的執行緒數量
  • getActiveCount():獲取活動的執行緒數

也可以通過擴展執行緒池進行監控,通過繼承執行緒池來自定義執行緒池,重寫執行緒池的下列方法,在一些特定的時間段進行一些監控

  • protected void beforeExecute(Thread t, Runnable r) { }
  • protected void afterExecute(Runnable r, Throwable t) { }
  • protected void terminated() { }

二、手寫執行緒池

1. 實作阻塞佇列

主要欄位

  • 任務佇列queue 用于存放發布的任務
  • ReentrantLock加鎖保證取放任務的執行緒安全
  • fullWaitSet 和 emptyWaitSet 作為任務佇列滿或者空 時的等待佇列

主要方法

  • 執行緒池獲取任務 T take()
    • 多載方法,分別用于沒有時間限制的獲取任務以及帶超時的獲取任務
    • 若任務佇列為空 進入等待佇列
    • 不為空則選取第一個
  • 主執行緒 添加任務 void put(T task)
  • 獲取 任務佇列的任務個數 size ()
/**
 * 阻塞佇列
 * @param <T>
 */
class BlockQueue<T> {

    /**
     * 任務佇列
     */
    private Deque<T> queue = new ArrayDeque<>();

    /**
     * 鎖
     */
    private ReentrantLock lock = new ReentrantLock();

    /**
     * 任務佇列滿后 生產者進入等待佇列等待
     */
    private Condition fullWaitSet = lock.newCondition();

    /**
     * 任務佇列空時,消費者進入等待佇列等待
     */
    private Condition emptyWaitSet = lock.newCondition();

    /**
     * 容量
     */
    private int capacity;

    public BlockQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * 從任務佇列獲取任務
     * @return
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T task = queue.removeFirst();
            fullWaitSet.signal();
            return task;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 有超時時間的獲取任務
     * @param timeout
     * @param unit
     * @return
     */
    public T take(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            //將timeout 轉換成納秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) {
                        return null;
                    }
                    //回傳剩余的時間
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T task = queue.removeFirst();
            fullWaitSet.signal();
            return task;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 添加任務方法
     * @param task
     */
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            emptyWaitSet.signal();
        }finally {
            lock.unlock();
        }
    }

    /**
     * 獲取任務佇列任務數
     * @return
     */
    public int size () {
        lock.lock();
        try {
            return queue.size();
        }finally {
            lock.unlock();
        }
    }
}

2. 實作執行緒池

主要方法

  1. void execute (Runnable task)
    • 如果 任務數量小于 執行緒池中的執行緒數,則創建Worker物件(實作Thread類)來執行任務
    • 如果 大于執行緒數,則先放入阻塞佇列中存放
  2. Worker 的 run 方法
    • 如果傳入的任務不為空 則執行傳入的任務
    • 執行完成之后 繼續執行任務佇列中的任務
    • 全部結束之后洗掉該執行緒
/**
 * 執行緒池
 */
class ThreadPool {

    /**
     * 任務佇列
     */
    private BlockQueue<Runnable> taskQueue;

    /**
     * 執行緒集合
     */
    private HashSet<Worker> workers = new HashSet();

    /**
     * 核心執行緒數
     */
    private int coreSize;

    /**
     * 獲取任務超時時間
     */
    private long timeout;

    /**
     * 時間工具
     */
    private TimeUnit timeUnit;


    /**
     * 執行任務
     * @param task
     */
    public void execute (Runnable task) {
        synchronized (workers) {
            // 當任務數沒有超過核心執行緒數,直接交給執行緒執行
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                System.out.println("新增worker物件"+worker);
                worker.start();
            }else { //超過核心執行緒數 就加入任務佇列暫存
                System.out.println("執行緒數滿,將任務加入任務佇列" + task);
                taskQueue.put(task);
            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockQueue<>(queueCapacity);
    }

    /**
     * 真正執行任務的執行緒
     */
    private class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            while (task != null || (task = taskQueue.take(timeout, timeUnit)) != null) {
                try {
                    System.out.println("正在執行任務"+task);
                    task.run();
                }catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    task = null;
                }
            }
            synchronized (workers) {
                System.out.println("洗掉"+this+"執行緒");
                workers.remove(this);
            }
        }
    }
}

3. 測驗

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MILLISECONDS,10);

        for (int i = 0; i < 5; i++) {
            int j = i;
            threadPool.execute(() -> {
                System.out.println("任務"+j + "執行完成");
            });
        }
    }

在這里插入圖片描述

  • 設定執行緒數為2個,執行代碼 前兩個任務進入時 創建兩個執行緒執行任務,之后任務則無法執行進入任務佇列
  • 等兩個執行緒執行完任務繼續 獲取任務佇列中的任務執行,如果超過任務獲取等待時間,退出執行任務回圈,沒有任務后洗掉執行緒,

以上實作了基本的執行緒池,但是如果任務數量龐大,并且執行任務比較緩慢,任務佇列滿后,遲遲等不到解決,并且有新的任務來,會一直處于等待狀態,所以要添加拒絕策略

4. 拒絕策略

4.1 帶超時的添加任務

    /**
     * 待超時的添加任務
     * @param task
     * @param timeout
     * @param timeUnit
     * @return
     */
    public boolean put(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity) {
                try {
                    if (nanos <= 0) {
                        return false;
                    }
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }

4.2 拒絕策略(策略模式)

佇列滿后,新的任務可以選擇繼續死等,帶超時的等待,放棄執行任務,拋出例外等等很多解決策略,所以應該把選擇權交給工程師,提高可擴展性,把所有操作抽象成介面,讓使用者自己實作

/**
 * 拒絕策略
 * @param <T>
 */
@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockQueue<T> queue, T task);
}

給 ThreadPool 執行緒池加入拒絕策略屬性,并在構造方法中初始化

    /**
     * 拒絕策略
     */
    private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

將執行任務中 加入任務佇列的方法改成 tryPut() 嘗試加入佇列

    /**
     * 執行任務
     * @param task
     */
    public void execute (Runnable task) {
        synchronized (workers) {
            // 當任務數沒有超過核心執行緒數,直接交給執行緒執行
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                System.out.println("新增worker物件"+worker);
                worker.start();
            }else { //超過核心執行緒數 就加入任務佇列暫存
                //taskQueue.put(task);
                taskQueue.tryPut(rejectPolicy, task);

            }
        }
    }

并且在阻塞佇列中實作該方法

  • 上鎖,保證佇列的執行緒安全
  • 如果佇列滿了,則呼叫自己實作的拒絕策略處理
  • 沒有滿則直接加入任務佇列
    /**
     * 嘗試加入任務佇列
     * @param rejectPolicy
     * @param task
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if (queue.size() == capacity) {
                rejectPolicy.reject(this,task);
            }else {
                System.out.println("加入任務佇列"+task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }

4.3 測驗利用帶超時時間的拒絕策略

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,
                1, (queue, task) -> {
            queue.put(task,500,TimeUnit.MILLISECONDS);//測驗使用有超時時間的拒絕策略
        });

        for (int i = 0; i < 3; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任務"+j + "執行完成");
            });
        }
    }

在這里插入圖片描述

  • 設定阻塞佇列大小為 1, 執行緒池執行緒數為1,執行第一個任務時, 第二個任務進入任務佇列,第三個任務等待進入佇列,0.5秒后第一個任務沒有完成,任務佇列還是滿的,所以第三個任務放棄加入任務佇列,所以最后只完成了兩個任務,

5. 完整代碼

package threadPool_Test;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Description: 自定義執行緒池
 * @Author: Aiguodala
 * @CreateDate: 2021/4/11 14:59
 */

public class ThreadPoolDemo {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,
                1, (queue, task) -> {
            queue.put(task,500,TimeUnit.MILLISECONDS);//測驗使用有超時時間的拒絕策略
        });

        for (int i = 0; i < 3; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任務"+j + "執行完成");
            });
        }
    }
}

/**
 * 拒絕策略
 * @param <T>
 */
@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockQueue<T> queue, T task);
}

/**
 * 執行緒池
 */
class ThreadPool {

    /**
     * 任務佇列
     */
    private BlockQueue<Runnable> taskQueue;

    /**
     * 執行緒集合
     */
    private HashSet<Worker> workers = new HashSet();

    /**
     * 核心執行緒數
     */
    private int coreSize;

    /**
     * 獲取任務超時時間
     */
    private long timeout;

    /**
     * 時間工具
     */
    private TimeUnit timeUnit;

    /**
     * 拒絕策略
     */
    private RejectPolicy<Runnable> rejectPolicy;


    /**
     * 執行任務
     * @param task
     */
    public void execute (Runnable task) {
        synchronized (workers) {
            // 當任務數沒有超過核心執行緒數,直接交給執行緒執行
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                System.out.println("新增worker物件"+worker);
                worker.start();
            }else { //超過核心執行緒數 就加入任務佇列暫存
                //taskQueue.put(task);
                taskQueue.tryPut(rejectPolicy, task);

            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 真正執行任務的執行緒
     */
    private class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            while (task != null || (task = taskQueue.take(timeout, timeUnit)) != null) {
                try {
                    System.out.println("正在執行任務"+task);
                    task.run();
                }catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    task = null;
                }
            }
            synchronized (workers) {
                System.out.println("洗掉"+this+"執行緒");
                workers.remove(this);
            }
        }
    }
}

/**
 * 阻塞佇列
 * @param <T>
 */
class BlockQueue<T> {

    /**
     * 任務佇列
     */
    private Deque<T> queue = new ArrayDeque<>();

    /**
     * 鎖
     */
    private ReentrantLock lock = new ReentrantLock();

    /**
     * 任務佇列滿后 生產者進入等待佇列等待
     */
    private Condition fullWaitSet = lock.newCondition();

    /**
     * 任務佇列空時,消費者進入等待佇列等待
     */
    private Condition emptyWaitSet = lock.newCondition();

    /**
     * 容量
     */
    private int capacity;

    public BlockQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * 從任務佇列獲取任務
     * @return
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T task = queue.removeFirst();
            fullWaitSet.signal();
            return task;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 有超時時間的獲取任務
     * @param timeout
     * @param unit
     * @return
     */
    public T take(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            //將timeout 轉換成納秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) {
                        return null;
                    }
                    //回傳剩余的時間
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T task = queue.removeFirst();
            fullWaitSet.signal();
            return task;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 添加任務方法
     * @param task
     */
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                try {
                    System.out.println("等待加入任務佇列"+task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("加入任務佇列"+task);
            queue.addLast(task);
            emptyWaitSet.signal();
        }finally {
            lock.unlock();
        }
    }


    /**
     * 待超時的添加任務
     * @param task
     * @param timeout
     * @param timeUnit
     * @return
     */
    public boolean put(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity) {
                try {
                    if (nanos <= 0) {
                        return false;
                    }
                    System.out.println("等待加入任務佇列");
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("加入任務佇列"+task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 獲取任務佇列任務數
     * @return
     */
    public int size () {
        lock.lock();
        try {
            return queue.size();
        }finally {
            lock.unlock();
        }
    }

    /**
     * 嘗試加入任務佇列
     * @param rejectPolicy
     * @param task
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if (queue.size() == capacity) {
                rejectPolicy.reject(this,task);
            }else {
                System.out.println("加入任務佇列"+task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}

參考《Java 并發編程的藝術》

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/276318.html

標籤:其他

上一篇:Kotlin實戰---Retrofit網路模型

下一篇:什么是OWASP Top 10?web安全必讀

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more