當前位置:編程學習大全網 - 源碼下載 - Java多線程(五)之BlockingQueue深入分析

Java多線程(五)之BlockingQueue深入分析

  壹 概述

 BlockingQueue作為線程容器 可以為線程同步提供有力的保障

  二 BlockingQueue定義的常用方法

  BlockingQueue定義的常用方法如下

 拋出異常 特殊值 阻塞 超時

 插入 add(e) offer(e) put(e) offer(e time unit)

 移除 remove() poll() take() poll(time unit)

 檢查 element() peek() 不可用 不可用

  )add(anObject) 把anObject加到BlockingQueue裏 即如果BlockingQueue可以容納 則返回true 否則招聘異常

  )offer(anObject) 表示如果可能的話 將anObject加到BlockingQueue裏 即如果BlockingQueue可以容納 則返回true 否則返回false

  )put(anObject) 把anObject加到BlockingQueue裏 如果BlockQueue沒有空間 則調用此方法的線程被阻斷直到BlockingQueue裏面有空間再繼續

  )poll(time) 取走BlockingQueue裏排在首位的對象 若不能立即取出 則可以等time參數規定的時間 取不到時返回null

  )take() 取走BlockingQueue裏排在首位的對象 若BlockingQueue為空 阻斷進入等待狀態直到Blocking有新的對象被加入為止

 其中 BlockingQueue 不接受null 元素 試圖add put 或offer 壹個null 元素時 某些實現會拋出NullPointerException null 被用作指示poll 操作失敗的警戒值

  三 BlockingQueue的幾個註意點

  BlockingQueue 可以是限定容量的 它在任意給定時間都可以有壹個remainingCapacity 超出此容量 便無法無阻塞地put 附加元素 沒有任何內部容量約束的BlockingQueue 總是報告Integer MAX_VALUE 的剩余容量

  BlockingQueue 實現主要用於生產者 使用者隊列 但它另外還支持Collection 接口 因此 舉例來說 使用remove(x) 從隊列中移除任意壹個元素是有可能的 然而 這種操作通常不 會有效執行 只能有計劃地偶爾使用 比如在取消排隊信息時

  BlockingQueue 實現是線程安全的 所有排隊方法都可以使用內部鎖或其他形式的並發控制來自動達到它們的目的 然而 大量的 Collection 操作(addAll containsAll retainAll 和removeAll)沒有 必要自動執行 除非在實現中特別說明 因此 舉例來說 在只添加了c 中的壹些元素後 addAll(c) 有可能失敗(拋出壹個異常)

  BlockingQueue 實質上不 支持使用任何壹種 close 或 shutdown 操作來指示不再添加任何項 這種功能的需求和使用有依賴於實現的傾向 例如 壹種常用的策略是 對於生產者 插入特殊的end of stream 或 poison 對象 並根據使用者獲取這些對象的時間來對它們進行解釋

  四 簡要概述BlockingQueue常用的四個實現類

 

  )ArrayBlockingQueue:規定大小的BlockingQueue 其構造函數必須帶壹個int參數來指明其大小 其所含的對象是以FIFO(先入先出)順序排序的

 

  )LinkedBlockingQueue:大小不定的BlockingQueue 若其構造函數帶壹個規定大小的參數 生成的BlockingQueue有大小限制 若不帶大小參數 所生成的BlockingQueue的大小由Integer MAX_VALUE來決定 其所含的對象是以FIFO(先入先出)順序排序的

  )PriorityBlockingQueue:類似於LinkedBlockQueue 但其所含對象的排序不是FIFO 而是依據對象的自然排序順序或者是構造函數的Comparator決定的順序

  )SynchronousQueue:特殊的BlockingQueue 對其的操作必須是放和取交替完成的

 其中LinkedBlockingQueue和ArrayBlockingQueue比較起來 它們背後所用的數據結構不壹樣 導致LinkedBlockingQueue的數據吞吐量要大於ArrayBlockingQueue 但在線程數量很大時其性能的可預見性低於ArrayBlockingQueue

  五 具體BlockingQueue的實現類的內部細節

  有耐心的同學請看具體實現類細節

  ArrayBlockingQueue

 ArrayBlockingQueue是壹個由數組支持的有界阻塞隊列 此隊列按 FIFO(先進先出)原則對元素進行排序 隊列的頭部 是在隊列中存在時間最長的元素 隊列的尾部 是在隊列中存在時間最短的元素 新元素插入到隊列的尾部 隊列檢索操作則是從隊列頭部開始獲得元素

 這是壹個典型的 有界緩存區 固定大小的數組在其中保持生產者插入的元素和使用者提取的元素 壹旦創建了這樣的緩存區 就不能再增加其容量 試圖向已滿隊列中放入元素會導致放入操作受阻塞 試圖從空隊列中檢索元素將導致類似阻塞

 ArrayBlockingQueue創建的時候需要指定容量capacity(可以存儲的最大的元素個數 因為它不會自動擴容)以及是否為公平鎖(fair參數)

 在創建ArrayBlockingQueue的時候默認創建的是非公平鎖 不過我們可以在它的構造函數裏指定 這裏調用ReentrantLock的構造函數創建鎖的時候 調用了

 public ReentrantLock(boolean fair) {

 sync = (fair)? new FairSync() : new NonfairSync()

 }

 FairSync/ NonfairSync是ReentrantLock的內部類

 線程按順序請求獲得公平鎖 而壹個非公平鎖可以闖入 且當它尚未進入等待隊列 就會和等待隊列head結點的線程發生競爭 如果鎖的狀態可用 請求非公平鎖的線程可在等待隊列中向前跳躍 獲得該鎖 內部鎖synchronized沒有提供確定的公平性保證

 分三點來講這個類

  添加新元素的方法 add/put/offer

  該類的幾個實例變量 takeIndex/putIndex/count/

  Condition實現

  添加新元素的方法 add/put/offer

 首先 談到添加元素的方法 首先得分析以下該類同步機制中用到的鎖

  Java代碼

 [java]

 lock = new ReentrantLock(fair)

 notEmpty = lock newCondition() //Condition Variable

 notFull =? lock newCondition() //Condition Variable

 這三個都是該類的實例變量 只有壹個鎖lock 然後lock實例化出兩個Condition notEmpty/noFull分別用來協調多線程的讀寫操作

 Java代碼

 [java]

 public boolean offer(E e) {

 if (e == null) throw new NullPointerException()

 final ReentrantLock lock = this lock;//每個對象對應壹個顯示的鎖

 lock lock() //請求鎖直到獲得鎖(不可以被interrupte)

 try {

 if (count == items length)//如果隊列已經滿了

 return false;

 else {

 insert(e)

 return true;

 }

 } finally {

 lock unlock() //

 }

 }

 看insert方法

 private void insert(E x) {

 items[putIndex] = x;

 //增加全局index的值

 /*

 Inc方法體內部

 final int inc(int i) {

 return (++i == items length)? : i;

 }

 這裏可以看出ArrayBlockingQueue采用從前到後向內部數組插入的方式插入新元素的 如果插完了 putIndex可能重新變為 (在已經執行了移除操作的前提下 否則在之前的判斷中隊列為滿)

 */

 putIndex = inc(putIndex)

 ++count;

 notEmpty signal() //wake up one waiting thread

 }

  Java代碼

 [java]

 public void put(E e) throws InterruptedException {

 if (e == null) throw new NullPointerException()

 final E[] items = ems;

 final ReentrantLock lock = this lock;

 lock lockInterruptibly() //請求鎖直到得到鎖或者變為interrupted

 try {

 try {

 while (count == items length)//如果滿了 當前線程進入noFull對應的等waiting狀態

 notFull await()

 } catch (InterruptedException ie) {

 notFull signal() // propagate to non interrupted thread

 throw ie;

 }

 insert(e)

 } finally {

 lock unlock()

 }

 }

  Java代碼

 [java]

 public boolean offer(E e long timeout TimeUnit unit)

 throws InterruptedException {

 if (e == null) throw new NullPointerException()

 long nanos = unit toNanos(timeout)

 final ReentrantLock lock = this lock;

 lock lockInterruptibly()

 try {

 for ( ) {

 if (count != items length) {

 insert(e)

 return true;

 }

 if (nanos <= )

 return false;

 try {

 //如果沒有被 signal/interruptes 需要等待nanos時間才返回

 nanos = notFull awaitNanos(nanos)

 } catch (InterruptedException ie) {

 notFull signal() // propagate to non interrupted thread

 throw ie;

 }

 }

 } finally {

 lock unlock()

 }

 }

  Java代碼

 [java]

 public boolean add(E e) {

 return super add(e)

 }

 父類

 public boolean add(E e) {

 if (offer(e))

 return true;

 else

 throw new IllegalStateException( Queue full )

 }

  該類的幾個實例變量 takeIndex/putIndex/count

  Java代碼

 [java]

 用三個數字來維護這個隊列中的數據變更

 /** items index for next take poll or remove */

 private int takeIndex;

 /** items index for next put offer or add */

 private int putIndex;

 /** Number of items in the queue */

 private int count;

 提取元素的三個方法take/poll/remove內部都調用了這個方法

 Java代碼

 [java]

 private E extract() {

 final E[] items = ems;

 E x = items[takeIndex];

 items[takeIndex] = null;//移除已經被提取出的元素

 takeIndex = inc(takeIndex) //策略和添加元素時相同

  count;

 notFull signal() //提醒其他在notFull這個Condition上waiting的線程可以嘗試工作了

 return x;

 }

 從這個方法裏可見 tabkeIndex維護壹個可以提取/移除元素的索引位置 因為takeIndex是從 遞增的 所以這個類是FIFO隊列

 putIndex維護壹個可以插入的元素的位置索引

 count顯然是維護隊列中已經存在的元素總數

  Condition實現

 Condition現在的實現只有ncurrent locks AbstractQueueSynchoronizer內部的ConditionObject 並且通過ReentranLock的newCondition()方法暴露出來 這是因為Condition的await()/sinal()壹般在lock lock()與lock unlock()之間執行 當執行condition await()方法時 它會首先釋放掉本線程持有的鎖 然後自己進入等待隊列 直到sinal() 喚醒後又會重新試圖去拿到鎖 拿到後執行await()下的代碼 其中釋放當前鎖和得到當前鎖都需要ReentranLock的tryAcquire(int arg)方法來判定 並且享受ReentranLock的重進入特性

  Java代碼

 [java]

 public final void await() throws InterruptedException {

 if (Thread interrupted())

 throw new InterruptedException()

 //加壹個新的condition等待節點

 Node node = addConditionWaiter()

 //釋放自己的鎖

 int savedState = fullyRelease(node)

 int interruptMode = ;

 while (!isOnSyncQueue(node)) {

 //如果當前線程 等待狀態時CONDITION park住當前線程 等待condition的signal來解除

 LockSupport park(this)

 if ((interruptMode = checkInterruptWhileWaiting(node)) != )

 break;

 }

 if (acquireQueued(node savedState) && interruptMode != THROW_IE)

 interruptMode = REINTERRUPT;

 if (node nextWaiter != null)

 unlinkCancelledWaiters()

 if (interruptMode != )

 reportInterruptAfterWait(interruptMode)

 }

  SynchronousQueue

 壹種阻塞隊列 其中每個 put 必須等待壹個 take 反之亦然 同步隊列沒有任何內部容量 甚至連壹個隊列的容量都沒有 不能在同步隊列上進行 peek 因為僅在試圖要取得元素時 該元素才存在 除非另壹個線程試圖移除某個元素 否則也不能(使用任何方法)添加元素 也不能叠代隊列 因為其中沒有元素可用於叠代 隊列的頭 是嘗試添加到隊列中的首個已排隊線程元素 如果沒有已排隊線程 則不添加元素並且頭為 null 對於其他Collection 方法(例如 contains) SynchronousQueue 作為壹個空集合 此隊列不允許 null 元素

 同步隊列類似於 CSP 和 Ada 中使用的 rendezvous 信道 它非常適合於傳遞性設計 在這種設計中 在壹個線程中運行的對象要將某些信息 事件或任務傳遞給在另壹個線程中運行的對象 它就必須與該對象同步

 對於正在等待的生產者和使用者線程而言 此類支持可選的公平排序策略 默認情況下不保證這種排序 但是 使用公平設置為 true 所構造的隊列可保證線程以 FIFO 的順序進行訪問 公平通常會降低吞吐量 但是可以減小可變性並避免得不到服務

  LinkedBlockingQueue

 壹個基於已鏈接節點的 範圍任意的 blocking queue 此隊列按 FIFO(先進先出)排序元素 隊列的頭部 是在隊列中時間最長的元素 隊列的尾部 是在隊列中時間最短的元素 新元素插入到隊列的尾部 並且隊列檢索操作會獲得位於隊列頭部的元素 鏈接隊列的吞吐量通常要高於基於數組的隊列 但是在大多數並發應用程序中 其可預知的性能要低

 單向鏈表結構的隊列 如果不指定容量默認為Integer MAX_VALUE 通過putLock和takeLock兩個鎖進行同步 兩個鎖分別實例化notFull和notEmpty兩個Condtion 用來協調多線程的存取動作 其中某些方法(如remove toArray toString clear等)的同步需要同時獲得這兩個鎖 並且總是先putLock lock緊接著takeLock lock(在同壹方法fullyLock中) 這樣的順序是為了避免可能出現的死鎖情況(我也想不明白為什麽會是這樣?)

  PriorityBlockingQueue

 壹個無界的阻塞隊列 它使用與類 PriorityQueue 相同的順序規則 並且提供了阻塞檢索的操作 雖然此隊列邏輯上是無界的 但是由於資源被耗盡 所以試圖執行添加操作可能會失敗(導致 OutOfMemoryError) 此類不允許使用 null 元素 依賴自然順序的優先級隊列也不允許插入不可比較的對象(因為這樣做會拋出ClassCastException)

  看它的三個屬性 就基本能看懂這個類了

  Java代碼

 [java]

 private final PriorityQueue q;

 private final ReentrantLock lock = new ReentrantLock(true)

 private final Condition notEmpty = lock newCondition()

 lock說明本類使用壹個lock來同步讀寫等操作

 notEmpty協調隊列是否有新元素提供 而隊列滿了以後會調用PriorityQueue的grow方法來擴容

  DelayQueue

 Delayed 元素的壹個無界阻塞隊列 只有在延遲期滿時才能從中提取元素 該隊列的頭部 是延遲期滿後保存時間最長的 Delayed 元素 如果延遲都還沒有期滿 則隊列沒有頭部 並且 poll 將返回 null 當壹個元素的getDelay(TimeUnit NANOSECONDS) 方法返回壹個小於或等於零的值時 則出現期滿 此隊列不允許使用 null 元素

 Delayed接口繼承自Comparable 我們插入的E元素都要實現這個接口

 DelayQueue的設計目的間API文檔

 An unbounded blocking queue of Delayed elements in which an element can only be taken when its delay has expired The head of the queue is that Delayed element whose delay expired furthest in the past If no delay has expired there is no head and poll will returnnull Expiration occurs when an element s getDelay(TimeUnit NANOSECONDS) method returns a value less than or equal to zero Even though unexpired elements cannot be removed using take or poll they are otherwise treated as normal elements For example the size method returns the count of both expired and unexpired elements This queue does not permit null elements

 因為DelayQueue構造函數了裏限定死不允許傳入parator(之前的PriorityBlockingQueue中沒有限定死) 即只能在pare方法裏定義優先級的比較規則 再看上面這段英文 The head of the queue is that Delayed element whose delay expired furthest in the past 說明pare方法實現的時候要保證最先加入的元素最早結束延時 而 Expiration occurs when an element s getDelay(TimeUnit NANOSECONDS) method returns a value less than or equal to zero 說明getDelay方法的實現必須保證延時到了返回的值變為<= 的int

 上面這段英文中 還說明了 在poll/take的時候 隊列中元素會判定這個elment有沒有達到超時時間 如果沒有達到 poll返回null 而take進入等待狀態 但是 除了這兩個方法 隊列中的元素會被當做正常的元素來對待 例如 size方法返回所有元素的數量 而不管它們有沒有達到超時時間 而協調的Condition available只對take和poll是有意義的

 另外需要補充的是 在ScheduledThreadPoolExecutor中工作隊列類型是它的內部類DelayedWorkQueue 而DelayedWorkQueue的Task容器是DelayQueue類型 而ScheduledFutureTask作為Delay的實現類作為Runnable的封裝後的Task類 也就是說ScheduledThreadPoolExecutor是通過DelayQueue優先級判定規則來執行任務的

  BlockingDque+LinkedBlockingQueue

lishixinzhi/Article/program/Java/gj/201311/27544

  • 上一篇:支撐壓力位指標
  • 下一篇:如何獲取新浪微博的登錄參數
  • copyright 2024編程學習大全網