《Java動手擼原始碼》手寫實作執行緒池
文章目錄
- 《Java動手擼原始碼》手寫實作執行緒池
- 前言
- 一、執行緒池的原理
- 二、簡易版本
- 三、完善版本
- 1.類圖
- 2.重點代碼分析
- 2.1 ThreadPool介面分析
- 2.2 RunableQueue介面分析
- 2.3 拒絕策略
- 2.4 BasicThreadPool(重點)
- 2.5 ThreadPoolTest類代碼測驗
- 總結
前言
執行緒池想必大家都用過,無論是C++還是Java等各種語言里面都有執行緒池,我們通過對Thread的學習得知,Thread是一個重量級的資源,創建、啟動以及銷毀都是比較耗費系統資源的,因此對執行緒的重復利用是非常好的程式設計習慣,價值系統可創建的執行緒數量是有限的,執行緒數量和系統性能是一種拋物線的關系,也就是當執行緒數量到達某個數值時,性能反倒會降低很多,因此對執行緒的管理,尤其是數量的控制更能直接決定程式的性能,
本作者維護了一個倉庫,名稱叫Thread,打算在這個倉庫里面手寫實作Java多執行緒的一些經典技術,歡迎大家的star,本博文的代碼已經上傳到了該倉庫,在com.thread.threadpool包下,
鏈接: 倉庫地址,歡迎大家的star,您的star是我繼續下去的動力,
一、執行緒池的原理
所謂執行緒池,通俗的理解為一個池子,池子里面存放著創建好的執行緒,當有任務提交給執行緒池執行時,池子中的某個執行緒會主動的執行該任務,如果池子中的執行緒數量不夠應付數量眾多的任務時,則需要自動擴充新的執行緒到池子中,但是該數量是優先的,就好比池塘的水界限一樣,當任務比較少的時候,池子中的執行緒能夠自動回收,釋放資源,為了能夠異步地提交任務和快取未被處理的任務,需要有一個任務佇列,如下圖所示,

通過上面的描述可知,一個完整的執行緒池應該具有一下幾個要素:
- 任務佇列:用戶快取提交的任務
- 執行緒數量管理功能:一個執行緒池必須能夠很好的管理和控制執行緒的數量,可通過如下的三個引數來實作,
- 創建執行緒池時初始的執行緒數量init
- 執行緒池自動擴充時的最大執行緒數量max;
- 執行緒空閑時需要釋放一部分執行緒,但是也要維護一定數量的核心執行緒core
三者的關系是init<= core <= max
- 任務拒絕策略,如果執行緒數量已經達到上限且任務佇列已滿,則需要有相應的拒絕策略來通知任務
- 執行緒工廠,主要用于個性化定制執行緒,比如將執行緒設定為守護執行緒以及設定執行緒名稱等,
- QueueSize:任務佇列主要存放提交的Runnable,但是為了防止記憶體溢位,需要有limit數量對其進行控制,
- KeepedAlive時間:改時間主要決定執行緒各個重要引數自動維護的時間間隔,
二、簡易版本
其實這個版本除了不能自動維護執行緒的數量,其他功能都差不多實作了,而且也比較好理解,
package com.thread.threadpool;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
public class SimpleThreadPool {
private static final int DEFAULT_MAX_THREAD_SIZE = 10;
private static final LinkedList<Runnable> TASK_QUEUE = new LinkedList<Runnable>();
private static final String THREAD_POOL_PREFIX = "SIMPLE_THREAD_POOL-";
private static final int DEFAULT_MAX_TASK_SIZE = 2000;
private final List<WorkerThread> THREAD_QUEUE = new ArrayList<WorkerThread>();
private static final DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {
throw new DiscardException("Discard this Task...");
};
private int seq = 0;
private int threadSize;
private int taskSize;
private DiscardPolicy discardPolicy;
private ThreadGroup threadGroup = new ThreadGroup("simpleThreadGroup");
private volatile boolean isDestory = false;
public SimpleThreadPool(int threadSize, int taskSize, DiscardPolicy discardPolicy) {
this.threadSize = threadSize;
this.taskSize = taskSize;
this.discardPolicy = discardPolicy;
init();
}
public SimpleThreadPool() {
this(DEFAULT_MAX_THREAD_SIZE, DEFAULT_MAX_TASK_SIZE, DEFAULT_DISCARD_POLICY);
}
private void init() {
for (int i = 0; i < threadSize; i++) {
WorkerThread WorkerThread = new WorkerThread(threadGroup, THREAD_POOL_PREFIX + seq++);
WorkerThread.start();
THREAD_QUEUE.add(WorkerThread);
}
}
public void submit(Runnable runner) throws Exception {
if (isDestory) {
throw new RuntimeException("The thread pool is already destoryed and not allow to submit");
}
synchronized (TASK_QUEUE) {
if (TASK_QUEUE.size() > taskSize)
discardPolicy.discard();
TASK_QUEUE.addLast(runner);
TASK_QUEUE.notifyAll();
}
}
public void shutdown() throws InterruptedException {
System.out.println("shutdown");
while (!TASK_QUEUE.isEmpty()) {
Thread.sleep(10);
}
int size = THREAD_QUEUE.size();
while (size > 0) {
for (WorkerThread task : THREAD_QUEUE) {
if (task.TASK_STATE == TaskState.BLOCK) {
task.interrupt();
task.close();
size--;
} else {
Thread.sleep(10);
}
}
}
this.isDestory = true;
System.out.println("The Thread Pool shutdown...");
}
public int getThreadSize() {
return threadSize;
}
public int getTaskSize() {
return taskSize;
}
private enum TaskState {FREE, RUNNING, BLOCK, DEAD}
private static class DiscardException extends RuntimeException {
public DiscardException(String message) {
super(message);
}
}
private static interface DiscardPolicy {
public void discard() throws DiscardException;
}
private static class WorkerThread extends Thread {
private volatile TaskState TASK_STATE = TaskState.FREE;
public WorkerThread(ThreadGroup threadGroup, String threadName) {
super(threadGroup, threadName);
}
@Override
public void run() {
OUTER:
while (TASK_STATE != TaskState.DEAD) {
Runnable runner = null;
synchronized (TASK_QUEUE) {
while (TASK_QUEUE.size() == 0) {
try {
TASK_STATE = TaskState.BLOCK;
TASK_QUEUE.wait();
} catch (InterruptedException e) {
//e.printStackTrace();
break OUTER;
}
}
runner = TASK_QUEUE.removeFirst();
}
if (runner != null) {
TASK_STATE = TaskState.RUNNING;
runner.run();
TASK_STATE = TaskState.FREE;
}
}
}
public void close() {
TASK_STATE = TaskState.DEAD;
}
}
public static void main(String[] args) throws InterruptedException {
SimpleThreadPool simpleThreadPool = new SimpleThreadPool();
for (int i = 0; i < 40; i++) {
final int j = i;
try {
simpleThreadPool.submit(() -> {
System.out.println("The runnable " + j + "be served as " + Thread.currentThread().getName() + " start");
try {
Thread.sleep(1000);
System.out.println("The runnable " + j + "be served as " + Thread.currentThread().getName() + " end");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
} catch (Exception e) {
// e.printStackTrace();
System.out.println(e);
}
}
Thread.sleep(9000);
simpleThreadPool.shutdown();
try {
simpleThreadPool.submit(() -> {
System.out.println("嘗試再次提交...");
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
三、完善版本
1.類圖
我沒有找到好的UML類圖設計工具,然后就把代碼寫完之后,用IDEA生成的,

如圖所示,一共14個類和介面,基本實作了執行緒池的功能,
2.重點代碼分析
2.1 ThreadPool介面分析

代碼如下:
package com.thread.threadpool;
public interface ThreadPool {
// 提交任務到執行緒池
void execute(Runnable runnable);
// 關閉執行緒池
void shutdown();
// 獲取執行緒池的初始化大小
int getInitSize();
// 獲取執行緒池最大的執行緒數
int getMaxSize();
// 獲取執行緒池核心執行緒數量
int getCoreSize();
// 獲取執行緒池中用于快取任務佇列的大小
int getQueueSize();
// 獲取執行緒池活躍的執行緒數量
int getActiveCount();
// 查看執行緒池是否已經被shutdown
boolean isShutdown();
}
ThreadPool 介面就是定義了一系列的規范,比如提交任務到執行緒池,關閉執行緒池,獲取執行緒池的初始大小、最大支持的執行緒數、執行緒池的核心執行緒數量、執行緒池快取任務佇列的大小、執行緒池中活躍的執行緒數量等,
2.2 RunableQueue介面分析
RunableQueue是任務的快取佇列,任務是做快取,有任務來的時候進入佇列,FIFO先進先出,所以要提供進入佇列和彈出佇列的方法,

代碼如下:
package com.thread.threadpool;
public interface RunableQueue {
// 當有新的任務進來時首先會offer到佇列
void offer(Runnable runnable);
// 作業執行緒通過take方法獲取Runnable,執行緒獲取程序中可能會拋出例外,
Runnable take() throws InterruptedException;
// 獲取任務佇列中任務的數量
int size();
}
2.3 拒絕策略

這里不貼代碼了,因為很簡單,DenyPolicy是一個函式式介面,定義了拒絕策略的介面函式,下面三個是實作類,AbortDenyPolicy的拒絕策略是拋出RunntimeException;DiscardDenyPolicy的策略是直接丟棄當前的任務,并且不做任何處理;RunnerDenyPolicy的拒絕策略是讓任務提交者在自己所在的執行緒中執行任務,
2.4 BasicThreadPool(重點)

代碼如下:
package com.thread.threadpool;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class BasicThreadPool extends Thread implements ThreadPool {
// 初始化執行緒數量
private final int initSize;
// 執行緒池的最大執行緒數量
private final int maxSize;
// 執行緒池核心執行緒數量
private final int coreSize;
// 當前活躍的執行緒數量
private int activeCount;
// 創建執行緒所需的工廠
private final ThreadFactory threadFactory;
// 任務佇列
private final RunableQueue runableQueue;
// 執行緒池是否已經被shutdown
private volatile boolean isShutdown = false;
// 作業執行緒佇列
private final Queue<ThreadTask> threadTaskQueue = new ArrayDeque<>();
// 默認的拒絕策略是丟棄的策略
private final static DenyPolicy DEFAULT_DENY_POLICY = new DiscardDenyPolicy();
// 默認的執行緒工廠實作
private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();
// 默認的存活時間
private final long keepAliveTime;
private final TimeUnit timeUnit;
private static class ThreadTask {
Thread thread;
WorkThread workThread;
public ThreadTask(Thread thread, WorkThread workThread) {
this.thread = thread;
this.workThread = workThread;
}
}
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger GROUP_COUNT = new AtomicInteger(1);
private static final ThreadGroup group = new ThreadGroup("MyThreadPool-" + GROUP_COUNT.getAndDecrement());
private static AtomicInteger THREAD_COUNTER = new AtomicInteger(0);
@Override
public Thread createThread(Runnable runnable) {
return new Thread(group, runnable, "thread-pool-" + THREAD_COUNTER.getAndIncrement());
}
}
public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory,
int queueSize, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) {
this.initSize = initSize;
this.maxSize = maxSize;
this.coreSize = coreSize;
this.activeCount = activeCount;
this.threadFactory = threadFactory;
this.runableQueue = new LinkedRunableQueue(queueSize, denyPolicy, this);
this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit;
init();
}
public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize) {
this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY, queueSize, DEFAULT_DENY_POLICY, 10, TimeUnit.SECONDS);
}
private void newThread() {
WorkThread workThread = new WorkThread(runableQueue);
Thread thread = this.threadFactory.createThread(workThread);
ThreadTask threadTask = new ThreadTask(thread, workThread);
threadTaskQueue.offer(threadTask);
this.activeCount++;
thread.start();
}
void init() {
start();
for (int i = 0; i < initSize; i++) {
newThread();
}
}
@Override
public void execute(Runnable runnable) {
if (this.isShutdown)
throw new IllegalStateException("The ThreadPool is destory");
this.runableQueue.offer(runnable);
}
@Override
public void shutdown() {
synchronized (this) {
if (isShutdown) return;
isShutdown = true;
threadTaskQueue.forEach(threadTask -> {
threadTask.workThread.stop();
threadTask.thread.interrupt();
});
}
}
// 從執行緒池中移除某個執行緒
private void removeThread() {
ThreadTask threadTask = threadTaskQueue.remove();
threadTask.workThread.stop();
this.activeCount--;
}
@Override
public void run() {
while (!isShutdown && !interrupted()) {
try {
timeUnit.sleep(keepAliveTime);
} catch (InterruptedException e) {
isShutdown = true;
break;
}
synchronized (this) {
if (isShutdown) {
break;
}
// 第一次擴容:當前佇列中有任務尚未處理,并且activeCount < coreSize
if (runableQueue.size() > 0 && activeCount < coreSize) {
// 因為是首次擴容,所以起點就是初試大小
for (int i = initSize; i < coreSize; i++) {
newThread();
}
continue;//先擴容到coreSize大小
}
//第二次擴容:當前的佇列中有任務尚未處理,并且activeCount < maxSize則繼續擴容
if (runableQueue.size() > 0 && activeCount < maxSize) {
// 擴容到coreSize之后,發現佇列中還有任務沒有得到處理,則繼續擴容到maxSize,
for (int i = coreSize; i < maxSize; i++) {
newThread();
}
}
// 擴容結束:如果任務佇列中沒有任務,則需要回收部分執行緒,如果執行緒當前正在執行著任務,就等任務執行完之后回收,
if (runableQueue.size() == 0 && activeCount > coreSize) {
removeThread();
}
}
}
}
@Override
public int getInitSize() {
if (this.isShutdown)
throw new IllegalStateException("The ThreadPool is destory");
return initSize;
}
@Override
public int getMaxSize() {
if (this.isShutdown)
throw new IllegalStateException("The ThreadPool is destory");
return maxSize;
}
@Override
public int getCoreSize() {
if (this.isShutdown)
throw new IllegalStateException("The ThreadPool is destory");
return coreSize;
}
@Override
public int getQueueSize() {
if (this.isShutdown)
throw new IllegalStateException("The ThreadPool is destory");
return runableQueue.size();
}
@Override
public int getActiveCount() {
if (this.isShutdown)
throw new IllegalStateException("The ThreadPool is destory");
return activeCount;
}
@Override
public boolean isShutdown() {
return this.isShutdown;
}
}
這個實作類是最復雜的,也是最關鍵的代碼,執行緒池的實作原理就是:執行緒池維護了一個快取佇列,這個佇列用來存放用戶提交的任務,執行緒池動態的從佇列里面獲取任務去執行,并且根據任務的數量動態的改變執行緒池中執行執行緒的大小,所以基于如上的說明,執行緒池其實本身也是一個Thread執行緒,他的執行單元里面的邏輯是動態改變執行緒池大小的關鍵,具體大家直接去我的github倉庫下載代碼,用IDEA打開看一下更直觀,鏈接: 倉庫地址,大家方便的話可以給我一個star,您的鼓勵是我繼續下去的動力,加油,
2.5 ThreadPoolTest類代碼測驗
ThreadPoolTest類啟動了20個任務,并通過列印,可以直觀的查看執行緒池的變化情況,
代碼如下:
package com.thread.threadpool;
import java.util.concurrent.TimeUnit;
// 執行緒池的測驗
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
//定義執行緒池,初始執行緒數為2,核心執行緒數為4,最大執行緒數為6,任務最多允許1000個任務,
final ThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000);
for (int i = 0; i < 20; i++) {
threadPool.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + "is running");
TimeUnit.SECONDS.sleep(10);
System.out.println(Thread.currentThread().getName() + "is done");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
for (; ; ) {
// 不斷輸出執行緒池的資訊
System.out.println("getActiveCount:" + threadPool.getActiveCount());
System.out.println("getQueueSize:" + threadPool.getQueueSize());
System.out.println("getCore:" + threadPool.getCoreSize());
System.out.println("getMaxSize:" + threadPool.getMaxSize());
System.out.println("------------------------------------------------");
TimeUnit.SECONDS.sleep(5);
}
}
}
控制臺列印如下,可以看到執行緒池一開始啟動了兩個執行緒進行任務處理,后來經過第一次擴容到coreSize(4)個,第二次擴容到maxSize(6)個,之后執行緒執行的差不多之后,將執行緒池的大小回收到了coreSize(4)個,
com.thread.threadpool.ThreadPoolTest
getActiveCount:2
thread-pool-0is running
thread-pool-1is running
getQueueSize:18
getCore:4
getMaxSize:6
------------------------------------------------
getActiveCount:2
getQueueSize:18
getCore:4
getMaxSize:6
------------------------------------------------
thread-pool-2is running
thread-pool-3is running
thread-pool-0is done
thread-pool-1is done
thread-pool-1is running
thread-pool-0is running
getActiveCount:4
getQueueSize:14
getCore:4
getMaxSize:6
------------------------------------------------
getActiveCount:4
getQueueSize:14
getCore:4
getMaxSize:6
------------------------------------------------
thread-pool-2is done
thread-pool-2is running
thread-pool-3is done
thread-pool-3is running
thread-pool-4is running
thread-pool-5is running
getActiveCount:6
thread-pool-0is done
thread-pool-0is running
thread-pool-1is done
getQueueSize:9
getCore:4
getMaxSize:6
------------------------------------------------
thread-pool-1is running
getActiveCount:6
getQueueSize:8
getCore:4
getMaxSize:6
------------------------------------------------
thread-pool-2is done
thread-pool-3is done
thread-pool-3is running
thread-pool-2is running
thread-pool-4is done
thread-pool-5is done
thread-pool-5is running
thread-pool-4is running
thread-pool-0is done
thread-pool-1is done
thread-pool-0is running
thread-pool-1is running
getActiveCount:6
getQueueSize:2
getCore:4
getMaxSize:6
------------------------------------------------
getActiveCount:6
getQueueSize:2
getCore:4
getMaxSize:6
------------------------------------------------
thread-pool-3is done
thread-pool-2is done
thread-pool-3is running
thread-pool-2is running
thread-pool-4is done
thread-pool-5is done
thread-pool-0is done
thread-pool-1is done
getActiveCount:6
getQueueSize:0
getCore:4
getMaxSize:6
------------------------------------------------
getActiveCount:6
getQueueSize:0
getCore:4
getMaxSize:6
------------------------------------------------
thread-pool-3is done
thread-pool-2is done
getActiveCount:5
getQueueSize:0
getCore:4
getMaxSize:6
------------------------------------------------
getActiveCount:5
getQueueSize:0
getCore:4
getMaxSize:6
------------------------------------------------
getActiveCount:4
getQueueSize:0
getCore:4
getMaxSize:6
------------------------------------------------
getActiveCount:4
getQueueSize:0
getCore:4
getMaxSize:6
------------------------------------------------
getActiveCount:4
getQueueSize:0
getCore:4
getMaxSize:6
------------------------------------------------
getActiveCount:4
getQueueSize:0
getCore:4
getMaxSize:6
------------------------------------------------
getActiveCount:4
getQueueSize:0
getCore:4
getMaxSize:6
------------------------------------------------
getActiveCount:4
getQueueSize:0
getCore:4
getMaxSize:6
------------------------------------------------
getActiveCount:4
getQueueSize:0
getCore:4
getMaxSize:6
------------------------------------------------
Process finished with exit code -1
總結
執行緒池,還是多執行緒領域一個非常重要的技術,很值得大家去學習,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/254928.html
標籤:java
上一篇:創建SpringMvc專案流程--史上最詳細步驟教程
下一篇:服務介面呼叫:OpenFeign
