本文部分摘自《Java 并發編程的藝術》
概述
Java 中的執行緒池是運行場景最多的并發框架,合理使用執行緒池能夠帶來三個好處:
- 降低資源消耗,通過重復利用已有的執行緒降低執行緒創建和銷毀造成的消耗
- 提高回應速度,當任務到達時,任務可以不需要等待執行緒創建就能立即執行
- 提高執行緒可管理性,執行緒是稀缺資源,使用執行緒池進行統一分配、調優和監控,可以降低資源消耗,提高系統穩定性
執行緒池的實作原理

從圖中可以看到,當提交一個新任務到執行緒池時,執行緒池的處理流程如下:
- 執行緒池判斷核心執行緒池里的執行緒是否都在執行任務,如果不是,創建一個新的作業執行緒執行任務,否則進入下一流程
- 執行緒池判斷作業佇列是否已滿,如果作業佇列沒有滿,將新提交的任務存盤在作業佇列中,否則進入下一流程
- 執行緒池判斷執行緒池里的執行緒是否都處于作業狀態,如果沒有,創建一個新的作業執行緒執行任務,否則交給飽和策略來處理這個任務
使用執行緒池
1. 創建執行緒池
我們可以通過 ThreadPoolExecutor 來創建一個執行緒池
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
創建一個執行緒需要輸入幾個引數,如下:
-
corePoolSize(執行緒池的基本大小)
當提交一個任務到執行緒池時,執行緒池會創建一個執行緒來執行任務,即時其他空閑的基本執行緒能夠執行新任務也會創建執行緒,等到需要執行的任務數大于執行緒池基本大小時就不再創建
-
maximumPoolSize(執行緒池最大數量)
執行緒池允許創建的最大執行緒數,如果佇列滿了,并且已創建的執行緒數小于最大執行緒數,則執行緒池會再創建新的執行緒執行任務,值得注意的是,如果使用無界阻塞佇列做任務佇列,則這個引數沒有什么效果
-
keepAliveTime(執行緒活動保持時間)
執行緒池的作業執行緒空閑后,保持存活的時間,如果任務很多,并且每個任務的執行時間都比較短,可以調大時間,提高執行緒利用率
-
unit(執行緒保持活動時間的單位)
可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微妙(MICROSECONDS)和納秒(NANOSECONDS)
-
workQueue(任務佇列)
用于保存等到執行的任務的阻塞佇列,可以選擇以下幾個阻塞佇列:
-
ArrayBlockingQueue
是一個基于陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序
-
LinkedBlockingQueue
一個基于鏈表結構的阻塞佇列,此佇列按 FIFO 排序元素,吞吐量通常高于 ArrayBlockingQueue
-
SynchronousQueue
一個不存盤元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一致處于阻塞狀態,吞吐量通常要高于 LinkedBlockingQueue
-
PriorityBlockingQueue
一個具有優先級的無界阻塞佇列
-
-
threadFactory
用于設定創建執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字
-
handler(飽和策略)
當任務和執行緒池都滿了,說明執行緒池處于飽和狀態,必須采取一種策略處理提交的新任務,在 JDK5 中執行緒池框架提供了以下四種策略:
- AbortPolicy:直接拋出例外,默認采取這種策略
- CallerRunsPolicy:使用呼叫者所在執行緒來運行任務
- DiscardOldestPolicy:丟棄佇列最近的一個任務,并執行當前任務
- DiscardPolicy:不處理,丟棄掉
也可以根據需要實作 RejectedExecutionHandler 介面自定義策略
2. 向執行緒池提交任務
可以使用 execute() 和 submit() 方法向執行緒池提交任務
-
execute() 方法用于提交不需要回傳值的任務,所以無法判斷任務是否被執行緒池執行成功
threadsPool.execute(new Runnable() { @Override public void run() { //... } }) -
submit() 方法用于提交需要回傳值的任務,執行緒池會回傳一個 future 物件,通過這個物件可以判斷任務是否執行成功
Future<Object> future = executor.submit(hasReturnValueTask); try { Object s = future.get(); } catch(InterruptedException e) { // 處理中斷例外 } catch(ExecutionException e) { // 處理無法執行任務例外 } finally { // 關閉執行緒池 executor.shutdown(); }
3. 關閉執行緒池
可以通過呼叫執行緒池的 shutdown 或 shutdownNow 方法來關閉執行緒池,它們的原理是遍歷執行緒池中的作業執行緒,逐個呼叫執行緒的 interrupt 方法來中斷執行緒,所以無法回應中斷的任務可能永遠無法終止
shutdown 方法和 shutdownNow 方法存在一定的區別:
- shutdownNow 方法首先將執行緒池狀態設定成 STOP,然后嘗試停止所有正在執行或暫停任務的執行緒,并回傳等待執行任務的串列
- shutdown 方法只是將執行緒池狀態設定成 SHUTDOWN 狀態,然后中斷所有沒有正在執行任務的執行緒
只要呼叫了這兩個關閉方法中的任意一個,isShutdown 方法就會回傳 true,當所有任務都已關閉,才表示執行緒池關閉成功,這時呼叫 isTerminaed 方法會回傳 true,至于應該采用哪種方法關閉執行緒池,應該由提交到執行緒池的任務特性決定,通常呼叫 shutdown 方法關閉執行緒池,如果任務不一定要執行完成,可以呼叫 shutdownNow 方法
基于執行緒池技術的簡單 Web 服務器
目前的瀏覽器都支持多執行緒訪問,比如請求一個頁面的時候,頁面包含的圖片等靜態資源會被瀏覽器并發的獲取,如果 Web 服務器是單執行緒的,按順序處理發送過來的請求,無疑會影響用戶體驗,因此大部分 Web 服務器都支持并發訪問
下面使用執行緒池來構造一個簡單的 Web 服務器,這個 Web 服務器用來處理 HTTP 請求,目前只能處理簡單的文本和圖片內容,該 Web 服務器使用 main 執行緒不斷接受客戶端的 Socket 連接,將連接以及請求提交給執行緒池處理,這樣使得 Web 服務器能同時處理多個客戶端的請求
public class SimpleHttpServer {
static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
5, 10, 60L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
static ServerSocket serverSocket;
static int port = 8080;
public static void setPort(int port) {
if (port > 0) {
SimpleHttpServer.port = port;
}
}
/**
* 啟動 SimpleHttpServer
*/
public static void start() throws Exception {
serverSocket = new ServerSocket(port);
Socket socket = null;
while ((socket = serverSocket.accept()) != null) {
// 接收一個客戶端Socket,生成一個HttpRequestHandler,放入執行緒池執行
threadPool.execute(new HttpRequestHandler(socket));
}
serverSocket.close();
}
static class HttpRequestHandler implements Runnable {
private Socket socket;
public HttpRequestHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
String line;
BufferedReader br = null;
BufferedReader reader = null;
PrintWriter out = null;
InputStream in = null;
try {
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String header = reader.readLine();
// 計算絕對路徑
String filePath = SimpleHttpServer.class.getResource(header.split(" ")[1]).getPath();
out = new PrintWriter(socket.getOutputStream());
// 如果請求資源的后綴為 jpg 或 ico,則讀取資源并輸出
if (filePath.endsWith("jpg") || filePath.endsWith("ico")) {
in = new FileInputStream(filePath);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int i;
while ((i = in.read()) != -1) {
baos.write(i);
}
byte[] array = baos.toByteArray();
out.println("HTTP/1.1 200 OK");
out.println("Server: YeeQ");
out.println("Content-Type: image/jpeg");
out.println("Content-Length: " + array.length);
out.println("");
socket.getOutputStream().write(array, 0, array.length);
} else {
br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
out = new PrintWriter(socket.getOutputStream());
out.println("HTTP/1.1 200 OK");
out.println("Server: YeeQ");
out.println("Content-Type: text/html; charset=UTF-8");
out.println("");
while ((line = br.readLine()) != null) {
out.println(line);
}
}
out.flush();
} catch (Exception e) {
if (out != null) {
out.println("HTTP/1.1 500");
out.println("");
out.flush();
}
} finally {
close(br, in, reader, out, socket);
}
}
}
/**
* 關閉流或者socket
*/
private static void close(Closeable... closeables) {
if (closeables != null) {
for (Closeable closeable : closeables) {
if (closeable != null) {
try {
closeable.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) throws Exception {
SimpleHttpServer.start();
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/270643.html
標籤:其他
