當前位置:編程學習大全網 - 熱門推薦 - java 線程池ThreadPoolExecutor ***同完成壹個任務

java 線程池ThreadPoolExecutor ***同完成壹個任務

線程池可以解決兩個不同問題:由於減少了每個任務調用的開銷,它們通常可以在執行大量異步任務時提供增強的性能,並且還可以提供綁定和管理資源(包括執行集合任務時使用的線程)的方法。每個ThreadPoolExecutor 還維護著壹些基本的統計數據,如完成的任務數。

為了便於跨大量上下文使用,此類提供了很多可調整的參數和擴展掛鉤。但是,強烈建議程序員使用較為方便的 Executors 工廠方法 Executors.newCachedThreadPool()(無界線程池,可以進行自動線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)和 Executors.newSingleThreadExecutor()(單個後臺線程),它們均為大多數使用場景預定義了設置。否則,在手動配置和調整此類時,使用以下指導:

核心和最大池大小

ThreadPoolExecutor 將根據 corePoolSize(參見 getCorePoolSize())和 maximumPoolSize(參見getMaximumPoolSize())設置的邊界自動調整池大小。當新任務在方法 execute(java.lang.Runnable) 中提交時,如果運行的線程少於 corePoolSize,則創建新線程來處理請求,即使其他輔助線程是空閑的。如果運行的線程多於corePoolSize 而少於 maximumPoolSize,則僅當隊列滿時才創建新線程。如果設置的 corePoolSize 和 maximumPoolSize相同,則創建了固定大小的線程池。如果將 maximumPoolSize 設置為基本的無界值(如 Integer.MAX_VALUE),則允許池適應任意數量的並發任務。在大多數情況下,核心和最大池大小僅基於構造來設置,不過也可以使用setCorePoolSize(int) 和 setMaximumPoolSize(int) 進行動態更改。

按需構造

默認情況下,即使核心線程最初只是在新任務需要時才創建和啟動的,也可以使用方法 prestartCoreThread()或 prestartAllCoreThreads() 對其進行動態重寫。

創建新線程

使用 ThreadFactory 創建新線程。如果沒有另外說明,則在同壹個 ThreadGroup 中壹律使用Executors.defaultThreadFactory() 創建線程,並且這些線程具有相同的 NORM_PRIORITY 優先級和非守護進程狀態。通過提供不同的 ThreadFactory,可以改變線程的名稱、線程組、優先級、守護進程狀態,等等。如果從 newThread返回 null 時 ThreadFactory 未能創建線程,則執行程序將繼續運行,但不能執行任何任務。

保持活動時間

如果池中當前有多於 corePoolSize 的線程,則這些多出的線程在空閑時間超過 keepAliveTime 時將會終止(參見getKeepAliveTime(java.util.concurrent.TimeUnit))。這提供了當池處於非活動狀態時減少資源消耗的方法。如果池後來變得更為活動,則可以創建新的線程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 動態地更改此參數。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在關閉前有效地從以前的終止狀態禁用空閑線程。

排隊

所有 BlockingQueue 都可用於傳輸和保持提交的任務。可以使用此隊列與池大小進行交互:

A. 如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。

B. 如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。

C. 如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。

排隊有三種通用策略:

直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造壹個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集合時出現鎖定。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。

無界隊列。使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙的情況下將新任務加入隊列。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。

有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。

被拒絕的任務

當 Executor 已經關閉,並且 Executor 將有限邊界用於最大線程和工作隊列容量,且已經飽和時,在方法execute(java.lang.Runnable) 中提交的新任務將被拒絕。在以上兩種情況下,execute 方法都將調用其RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四種預定義的處理程序策略:

A. 在默認的 ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運行時 RejectedExecutionException。

B. 在 ThreadPoolExecutor.CallerRunsPolicy 中,線程調用運行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。

C. 在 ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。

D. 在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然後重試執行程序(如果再次失敗,則重復此過程)。

定義和使用其他種類的 RejectedExecutionHandler 類也是可能的,但這樣做需要非常小心,尤其是當策略僅用於特定容量或排隊策略時。

掛鉤方法

此類提供 protected 可重寫的 beforeExecute(java.lang.Thread, java.lang.Runnable) 和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,這兩種方法分別在執行每個任務之前和之後調用。它們可用於操縱執行環境;例如,重新初始化ThreadLocal、搜集統計信息或添加日誌條目。此外,還可以重寫方法 terminated() 來執行 Executor 完全終止後需要完成的所有特殊處理。

如果掛鉤或回調方法拋出異常,則內部輔助線程將依次失敗並突然終止。

隊列維護

方法 getQueue() 允許出於監控和調試目的而訪問工作隊列。強烈反對出於其他任何目的而使用此方法。remove(java.lang.Runnable) 和 purge() 這兩種方法可用於在取消大量已排隊任務時幫助進行存儲回收。

壹、例子

創建 TestThreadPool 類:

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

public class TestThreadPool {

private static int produceTaskSleepTime = 2;

private static int produceTaskMaxNumber = 10;

public static void main(String[] args) {

// 構造壹個線程池

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,

TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),

new ThreadPoolExecutor.DiscardOldestPolicy());

for (int i = 1; i <= produceTaskMaxNumber; i++) {

try {

String task = "task@ " + i;

System.out.println("創建任務並提交到線程池中:" + task);

threadPool.execute(new ThreadPoolTask(task));

Thread.sleep(produceTaskSleepTime);

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

view plain

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

public class TestThreadPool {

private static int produceTaskSleepTime = 2;

private static int produceTaskMaxNumber = 10;

public static void main(String[] args) {

// 構造壹個線程池

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,

TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),

new ThreadPoolExecutor.DiscardOldestPolicy());

for (int i = 1; i <= produceTaskMaxNumber; i++) {

try {

String task = "task@ " + i;

System.out.println("創建任務並提交到線程池中:" + task);

threadPool.execute(new ThreadPoolTask(task));

Thread.sleep(produceTaskSleepTime);

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

創建 ThreadPoolTask類:

view plaincopy to clipboardprint?

import java.io.Serializable;

public class ThreadPoolTask implements Runnable, Serializable {

private Object attachData;

ThreadPoolTask(Object tasks) {

this.attachData = tasks;

}

public void run() {

System.out.println("開始執行任務:" + attachData);

attachData = null;

}

public Object getTask() {

return this.attachData;

}

}

view plain

import java.io.Serializable;

public class ThreadPoolTask implements Runnable, Serializable {

private Object attachData;

ThreadPoolTask(Object tasks) {

this.attachData = tasks;

}

public void run() {

System.out.println("開始執行任務:" + attachData);

attachData = null;

}

public Object getTask() {

return this.attachData;

}

}

執行結果:

創建任務並提交到線程池中:task@ 1

開始執行任務:task@ 1

創建任務並提交到線程池中:task@ 2

開始執行任務:task@ 2

創建任務並提交到線程池中:task@ 3

創建任務並提交到線程池中:task@ 4

開始執行任務:task@ 3

創建任務並提交到線程池中:task@ 5

開始執行任務:task@ 4

創建任務並提交到線程池中:task@ 6

創建任務並提交到線程池中:task@ 7

創建任務並提交到線程池中:task@ 8

開始執行任務:task@ 5

開始執行任務:task@ 6

創建任務並提交到線程池中:task@ 9

開始執行任務:task@ 7

創建任務並提交到線程池中:task@ 10

開始執行任務:task@ 8

開始執行任務:task@ 9

開始執行任務:task@ 10

ThreadPoolExecutor配置

壹、ThreadPoolExcutor為壹些Executor提供了基本的實現,這些Executor是由Executors中的工廠 newCahceThreadPool、newFixedThreadPool和newScheduledThreadExecutor返回的。 ThreadPoolExecutor是壹個靈活的健壯的池實現,允許各種各樣的用戶定制。

二、線程的創建與銷毀

1、核心池大小、最大池大小和存活時間***同管理著線程的創建與銷毀。

2、核心池的大小是目標的大小;線程池的實現試圖維護池的大小;即使沒有任務執行,池的大小也等於核心池的大小,並直到工作隊列充滿前,池都不會創建更多的線程。如果當前池的大小超過了核心池的大小,線程池就會終止它。

3、最大池的大小是可同時活動的線程數的上限。

4、如果壹個線程已經閑置的時間超過了存活時間,它將成為壹個被回收的候選者。

5、newFixedThreadPool工廠為請求的池設置了核心池的大小和最大池的大小,而且池永遠不會超時

6、newCacheThreadPool工廠將最大池的大小設置為Integer.MAX_VALUE,核心池的大小設置為0,超時設置為壹分鐘。這樣創建了無限擴大的線程池,會在需求量減少的情況下減少線程數量。

三、管理

1、 ThreadPoolExecutor允許妳提供壹個BlockingQueue來持有等待執行的任務。任務排隊有3種基本方法:無限隊列、有限隊列和同步移交。

2、 newFixedThreadPool和newSingleThreadExectuor默認使用的是壹個無限的 LinkedBlockingQueue。如果所有的工作者線程都處於忙碌狀態,任務會在隊列中等候。如果任務持續快速到達,超過了它們被執行的速度,隊列也會無限制地增加。穩妥的策略是使用有限隊列,比如ArrayBlockingQueue或有限的LinkedBlockingQueue以及 PriorityBlockingQueue。

3、對於龐大或無限的池,可以使用SynchronousQueue,完全繞開隊列,直接將任務由生產者交給工作者線程

4、可以使用PriorityBlockingQueue通過優先級安排任務

  • 上一篇:禦龍在天有接紫車紫雞毛紫經書的技巧麽?
  • 下一篇:讀書有哪些方法和技巧
  • copyright 2024編程學習大全網