當前位置:編程學習大全網 - 源碼下載 - 在生產者和消費者的實例中,如何實現線程並發和***享資源?

在生產者和消費者的實例中,如何實現線程並發和***享資源?

解決思路

在現實應用中,很多時候都需要讓多個線程按照壹定的次序來訪問***享資源,例如,經典的生產者和消費者問題。這類問題描述了這樣壹種情況,假設倉庫中只能存放壹件產品,生產者將生產出來的產品放入倉庫,消費者將倉庫中的產品取走消費。如果倉庫中沒有產品,則生產者可以將產品放入倉庫,否則停止生產並等待,直到倉庫中的產品被消費者取走為止。如果倉庫中放有產品,則消費者可以將產品取走消費,否則停止消費並等待,直到倉庫中再次放入產品為止。顯然,這是壹個同步問題,生產者和消費者***享同壹資源,並且,生產者和消費者之間彼此依賴,互為條件向前推進。但是,該如何編寫程序來解決這個問題呢?

傳統的思路是利用循環檢測的方式來實現,這種方式通過重復檢查某壹個特定條件是否成立來決定線程的推進順序。比如,壹旦生產者生產結束,它就繼續利用循環檢測來判斷倉庫中的產品是否被消費者消費,而消費者也是在消費結束後就會立即使用循環檢測的方式來判斷倉庫中是否又放進產品。顯然,這些操作是很耗費CPU資源的,不值得提倡。那麽有沒有更好的方法來解決這類問題呢?

首先,當線程在繼續執行前需要等待壹個條件方可繼續執行時,僅有 synchronized 關鍵字是不夠的。因為雖然synchronized關鍵字可以阻止並發更新同壹個***享資源,實現了同步,但是它不能用來實現線程間的消息傳遞,也就是所謂的通信。而在處理此類問題的時候又必須遵循壹種原則,即:對於生產者,在生產者沒有生產之前,要通知消費者等待;在生產者生產之後,馬上又通知消費者消費;對於消費者,在消費者消費之後,要通知生產者已經消費結束,需要繼續生產新的產品以供消費。

其實,Java提供了3個非常重要的方法來巧妙地解決線程間的通信問題。這3個方法分別是:wait()、notify()和notifyAll()。它們都是Object類的最終方法,因此每壹個類都默認擁有它們。

雖然所有的類都默認擁有這3個方法,但是只有在synchronized關鍵字作用的範圍內,並且是同壹個同步問題中搭配使用這3個方法時才有實際的意義。

這些方法在Object類中聲明的語法格式如下所示:

final void wait() throws InterruptedException

final void notify()

final void notifyAll()

其中,調用wait()方法可以使調用該方法的線程釋放***享資源的鎖,然後從運行態退出,進入等待隊列,直到被再次喚醒。而調用notify()方法可以喚醒等待隊列中第壹個等待同壹***享資源的線程,並使該線程退出等待隊列,進入可運行態。調用notifyAll()方法可以使所有正在等待隊列中等待同壹***享資源的線程從等待狀態退出,進入可運行狀態,此時,優先級最高的那個線程最先執行。顯然,利用這些方法就不必再循環檢測***享資源的狀態,而是在需要的時候直接喚醒等待隊列中的線程就可以了。這樣不但節省了寶貴的CPU資源,也提高了程序的效率。

由於wait()方法在聲明的時候被聲明為拋出InterruptedException異常,因此,在調用wait()方法時,需要將它放入try…catch代碼塊中。此外,使用該方法時還需要把它放到壹個同步代碼段中,否則會出現如下異常:

"java.lang.IllegalMonitorStateException: current thread not owner"

這些方法是不是就可以實現線程間的通信了呢?下面將通過多線程同步的模型: 生產者和消費者問題來說明怎樣通過程序解決多線程間的通信問題。

具體步驟

下面這個程序演示了多個線程之間進行通信的具體實現過程。程序中用到了4個類,其中ShareData類用來定義***享數據和同步方法。在同步方法中調用了wait()方法和notify()方法,並通過壹個信號量來實現線程間的消息傳遞。

// 例4.6.1 CommunicationDemo.java 描述:生產者和消費者之間的消息傳遞過程

class ShareData

{

private char c;

private boolean isProduced = false; // 信號量

public synchronized void putShareChar(char c) // 同步方法putShareChar()

{

if (isProduced) // 如果產品還未消費,則生產者等待

{

try

{

wait(); // 生產者等待

} catch (InterruptedException e) {

e.printStackTrace();

}

}

this.c = c;

isProduced = true; // 標記已經生產

notify(); // 通知消費者已經生產,可以消費

}

public synchronized char getShareChar() // 同步方法getShareChar()

{

if (!isProduced) // 如果產品還未生產,則消費者等待

{

try

{

wait(); // 消費者等待

} catch (InterruptedException e) {

e.printStackTrace();

}

}

isProduced = false; // 標記已經消費

notify(); // 通知需要生產

return this.c;

}

}

class Producer extends Thread // 生產者線程

{

private ShareData s;

Producer(ShareData s)

{

this.s = s;

}

public void run()

{

for (char ch = 'A'; ch <= 'D'; ch++)

{

try

{

Thread.sleep((int) (Math.random() * 3000));

} catch (InterruptedException e) {

e.printStackTrace();

}

s.putShareChar(ch); // 將產品放入倉庫

System.out.println(ch + " is produced by Producer.");

}

}

}

class Consumer extends Thread // 消費者線程

{

private ShareData s;

Consumer(ShareData s)

{

this.s = s;

}

public void run()

{

char ch;

do {

try

{

Thread.sleep((int) (Math.random() * 3000));

} catch (InterruptedException e) {

e.printStackTrace();

}

ch = s.getShareChar(); // 從倉庫中取出產品

System.out.println(ch + " is consumed by Consumer. ");

} while (ch != 'D');

}

}

class CommunicationDemo

{

public static void main(String[] args)

{

ShareData s = new ShareData();

new Consumer(s).start();

new Producer(s).start();

}

}

上面的程序演示了生產者生產出A、B、C、D四個字符,消費者消費這四個字符的全過程,程序結果如圖4.6.1所示:

圖4.6.1 生產者和消費者舉例

通過程序的運行結果可以看到,盡管在主方法中先啟動了Consumer線程,但是,由於倉庫中沒有產品,因此,Consumer線程就會調用 wait()方法進入等待隊列進行等待,直到Producer線程將產品生產出來並放進倉庫,然後使用notify()方法將其喚醒。

由於在兩個線程中都指定了壹定的休眠時間,因此也可能出現這樣的情況:生產者將產品生產出來放入倉庫,並通知等待隊列中的Consumer線程,然而,由於休眠時間過長,Consumer線程還沒有打算消費產品,此時,Producer線程欲生產下壹個產品,結果由於倉庫中的產品沒有被消費掉,故 Producer線程執行wait()方法進入等待隊列等待,直到Consumer線程將倉庫中的產品消費掉以後通過notify()方法去喚醒等待隊列中的Producer線程為止。可見,兩個線程之間除了必須保持同步之外,還要通過相互通信才能繼續向前推進。

前面這個程序中,生產者壹次只能生產壹個產品,而消費者也只能壹次消費壹個產品。那麽現實中也有這樣的情況,生產者可以壹次生產多個產品,只要倉庫容量夠大,就可以壹直生產。而消費者也可以壹次消費多個產品,直到倉庫中沒有產品為止。

但是,無論是生產產品到倉庫,還是從倉庫中消費,每壹次都只能允許壹個操作。顯然,這也是個同步問題,只不過在這個問題中***享資源是壹個資源池,可以存放多個資源。下面就以棧結構為例給出如何在這個問題中解決線程通信的程序代碼。

// 例4.6.2 CommunicationDemo2.java

class SyncStack // 同步堆棧類,可以壹次放入多個數據

{

private int index = 0; // 堆棧指針初始值為0

private char[] buffer = new char[5]; // 堆棧有5個字符的空間

public synchronized void push(char c) // 入棧同步方法

{

if (index == buffer.length) // 堆棧已滿,不能入棧

{

try

{

this.wait(); // 等待出棧線程將數據出棧

} catch (InterruptedException e) {

}

}

buffer[index] = c; // 數據入棧

index++; // 指針加1,棧內空間減少

this.notify(); // 通知其他線程把數據出棧

}

public synchronized char pop() // 出棧同步方法

{

if (index == 0) // 堆棧無數據,不能出棧

{

try

{

this.wait(); // 等待入棧線程把數據入棧

} catch (InterruptedException e) {

}

}

this.notify(); // 通知其他線程入棧

index--; // 指針向下移動

return buffer[index]; // 數據出棧

}

}

class Producer implements Runnable // 生產者類

{

SyncStack s; // 生產者類生成的字母都保存到同步堆棧中

public Producer(SyncStack s)

{

this.s = s;

}

public void run()

{

char ch;

for (int i = 0; i < 5; i++)

{

try

{

Thread.sleep((int) (Math.random() * 1000));

} catch (InterruptedException e) {

}

ch = (char) (Math.random() * 26 + 'A'); // 隨機產生5個字符

s.push(ch); // 把字符入棧

System.out.println("Push " + ch + " in Stack"); // 打印字符入棧

}

}

}

class Consumer implements Runnable // 消費者類

{

SyncStack s; // 消費者類獲得的字符都來自同步堆棧

public Consumer(SyncStack s)

{

this.s = s;

}

public void run()

{

char ch;

for (int i = 0; i < 5; i++)

{

try

{

Thread.sleep((int) (Math.random() * 3000));

} catch (InterruptedException e) {

}

ch = s.pop(); // 從堆棧中讀取字符

System.out.println("Pop " + ch + " from Stack"); // 打印字符出棧

}

}

}

public class CommunicationDemo2

{

public static void main(String[] args)

{

SyncStack stack = new SyncStack();

// 下面的消費者類對象和生產者類對象所操作的是同壹個同步堆棧對象

Thread t1 = new Thread(new Producer(stack)); // 線程實例化

Thread t2 = new Thread(new Consumer(stack)); // 線程實例化

t2.start(); // 線程啟動

t1.start(); // 線程啟動

}

}

程序中引入了壹個堆棧數組buffer[]來模擬資源池,並使生產者類和消費者類都實現了Runnable接口,然後在主程序中通過前面介紹的方法創建兩個***享同壹堆棧資源的線程,並且有意先啟動消費者線程,後啟動生產者線程。請在閱讀程序的時候仔細觀察例4.6.1和本例的相似點以及區別之處,體會作者的用心。程序結果輸出如圖4.6.2所示:

圖4.6.2 ***享資源池的生產者和消費者問題

由於是棧結構,所以符合後進先出原則。有興趣的讀者還可以用符合先進先出原則的隊列結構來模擬線程間通信的過程,相信可以通過查閱相關的資料來解決這個問題,在這裏就不再給出程序代碼了,作為壹個思考題供讀者練習。

專家說明

本小節介紹了三個重要的方法:wait()、notify()和notifyAll()。使用它們可以高效率地完成多個線程間的通信問題,這樣在通信問題上就不必再使用循環檢測的方法來等待某個條件的發生,因為這種方法是極為浪費CPU資源的,當然這種情況也不是所期望的。在例4.6.1中,為了更好地通信,引入了壹個專門用來傳遞信息的信號量。利用信號量來決定線程是否等待無疑是壹種非常安全的操作,值得提倡。此外,在例4.6.2中引入了資源池作為***享資源,並解決了在這種情況下如何實現多線程之間的通信問題。希望讀者能夠舉壹反三,編寫出解決更加復雜問題的程序。

專家指點

可以肯定的是,合理地使用wait()、notify()和notifyAll()方法確實能夠很好地解決線程間通信的問題。但是,也應該了解到這些方法是更復雜的鎖定、排隊和並發性代碼的構件。尤其是使用 notify()來代替notifyAll()時是有風險的。除非確實知道每壹個線程正在做什麽,否則最好使用notifyAll()。其實,在 JDK1.5中已經引入了壹個新的包:java.util.concurrent 包,該包是壹個被廣泛使用的開放源碼工具箱,裏面都是有用的並發性實用程序。完全可以代替wait()和notify()方法用來編寫自己的調度程序和鎖。有關信息可以查閱相關資料,本書中不再贅述。

相關問題

Java提供了各種各樣的輸入輸出流(stream),使程序員能夠很方便地對數據進行操作。其中,管道(pipe)流是壹種特殊的流,用於在不同線程間直接傳送數據。壹個線程發送數據到輸出管道,另壹個線程從輸入管道中讀出數據。通過使用管道,達到實現多個線程間通信的目的。那麽,如何創建和使用管道呢?

Java提供了兩個特殊的專門用來處理管道的類,它們就是PipedInputStream類和PipedOutputStream類。

其中,PipedInputStream代表了數據在管道中的輸出端,也就是線程從管道讀出數據的壹端;PipedOutputStream代表了數據在管道中的輸入端,也就是線程向管道寫入數據的壹端,這兩個類壹起使用就可以創建出數據輸入輸出的管道流對象。

  • 上一篇:核桃編程壹年學費多少
  • 下一篇:關於dmi指標源公式
  • copyright 2024編程學習大全網