當前位置:編程學習大全網 - 編程語言 - RxJava怎麽實現多個線程同時執行,怎麽實現retryWhen

RxJava怎麽實現多個線程同時執行,怎麽實現retryWhen

在編寫壹個類時,如果該類中的代碼可能運行於多線程環境下,那麽就要考慮同步的問題,Java實現線程同步的方法很多,具體如下。

(1)synchronized關鍵字

在Java中內置了語言級的同步原語synchronized關鍵字,其在多線程條件下實現了對***享資源的同步訪問。根據synchronized關鍵字修飾的對象不同可以分為以下幾種情況。

*synchronized關鍵字同步方法

public synchronized void method(){

//do something

}

註意: 如果使用synchronized關鍵字同步方法,很容易誤認為同步關鍵字鎖住了它所包圍的代碼。但是實際情況卻不是這樣,同步加鎖的是對象而並非代碼。因此。如果在壹個類中有壹個同步方法,該方法是可以被兩個不同的線程同時執行的,只要每個線程自己創建壹個該類的實例即可。

示例代碼:

package newthread;

public class TestSync {

public static void main(String[] args) {

MyThread1 my1=new MyThread1(1);

MyThread1 my2=new MyThread1(3);

my1.start();

my2.start();

}

}

class MyThread1 extends Thread{

private int val;

public MyThread1(int v){

val=v;

}

public synchronized void printVal(int v){

for(int i=0;i<100;i++){

System.out.print(v);

}

}

public void run(){

printVal(val);

}

}

執行代碼結果是1和3交叉輸出的,即1和3兩個線程在並發執行printVal方法,並沒有實現同步功能。原因在於synchronized關鍵字鎖定的是對象而並非代碼塊,如果需要實現真正的同步,必須同步壹個全局對象或者對類進行同步。synchronized關鍵字同步類的格式如下:

synchronized(MyThread.class){}

改進代碼

package newthread;

public class TestSync_2 {

public static void main(String[] args) {

MyThread_2 my1=new MyThread_2(1);

my1.start();

MyThread_2 my2=new MyThread_2(2);

my2.start();

}

}

class MyThread_2 extends Thread{

private int val;

public MyThread_2(int v){

val=v;

}

public void printVal(int v){

synchronized(MyThread_2.class){

for(int i=0;i<100;i++){

System.out.print(v);

}

}

}

public void run(){

printVal(val);

}

}

在上面的實例中,printVal()方法的功能不再對個別的類實行同步,而是對當前類進行同步。對於MyThread而言,它只有惟壹的類定義,兩個線程在相同的鎖上同步,因此在同壹時刻只有壹個線程可以執行printVal()方法。至於輸出結果的兩種可能,則是由於Java線程調度的搶占實現模式所決定的。

*synchronized關鍵字同步公***的靜態成員變量

在上面的示例中,通過對當前類進行加鎖,達到了線程同步的效果,但是基於線程同步的壹般原理是應該盡量減小同步的粒度以達到更高的性能。其實針對上文中的示例,也可以通過對公***對象加鎖,即添加壹個靜態成員變量來實現,兩種方法都通過同步該對象而達到線程安全。示例代碼如下:

package newthread;

public class TestSync_3 {

public static void main(String[] args) {

MyThread_3 my1=new MyThread_3(2);

my1.start();

MyThread_3 my2=new MyThread_3(5);

my2.start();

}

}

class MyThread_3 extends Thread{

private int val;

private static Object lock=new Object();

public MyThread_3(int v){

val=v;

}

public void printVal(int v){

synchronized(lock){

for(int i=0;i<100;i++){

System.out.print(v);

}

}

}

public void run(){

printVal(val);

}

}

註意:為了能夠提高程序的性能,針對示例代碼中對於對象的選取做壹個簡單的介紹:基於JVM的優化機制,由於String類型的對象是不可變的,因此當用戶使用“”的形式引用字符串時,如果JVM發現內存已經有壹個這樣的對象,那麽它就使用該對象而不再生成壹個新的String對象,這樣是為了減小內存的使用;而零長度的byte數組對象創建起來將比任何對象要實用,生成零長度的byte[]對象只需要3條操作碼,而Object lock=new Object()則需要7行操作碼。

*synchronized關鍵字同步遊離塊

synchronized{

//do something

}

synchronized關鍵字同步代碼塊和上文中所提到的synchronized關鍵字同步公***的靜態成員變量類似,都是為了降低同步粒度避免對整個類進行同步而極大降低了程序的性能

*synchronized關鍵字同步靜態方法

public synchronized static void methodAAA(){

//do something

}

public void methodBBB(){

synchronized(Test.class){

//do something

}

}

synchronized關鍵字同步靜態方法與synchronized關鍵字同步類實現的效果相同,唯壹不同的是獲得的鎖對象不同,當同壹個object對象訪問methodAAA()和methodBBB()時,同步靜態方法獲得的鎖就是該object類,而同步類方法獲得的對象鎖則是object對象所屬的那個類

(2)Metux互斥體的設計和使用

Metux稱為互斥體,同樣廣泛應用於多線程編程中。。其中以Doug Lea教授編寫的concurrent工具包中實現的Mutex最為典型,本文將以此為例,對多線程編程中互斥體的設計和使用做簡單介紹。在Doug Lea的concurrent工具包中,Mutex實現了Sync借口,該接口是concurrent工具包中所有鎖(lock)、門(gate)、和條件變量(condition)的公***接口,SyncD的實現類主要有Metux、Semaphore及其子類、Latch、CountDown和ReentrantLock等。這也體現了面向對象抽象編程的思想,使開發者可以在不改變代碼或者改變少量代碼的情況下,選擇使用Sync的不同實現。Sync接口的定義如下:

package newthread;

public interface Sync {

//獲取許可

public void acquire() throws InterruptedException;

//嘗試獲取許可

public boolean attempt(long msecs)throws InterruptedException;

//釋放許可

public void realse();

}

通過使用Sync接口可以替代synchronized關鍵字,並提供更加靈活的同步控制,但是並不是說concurrent工具包是獨立於synchronized的技術,其實concurrent工具包也是在synchronized的基礎上搭建的。區別在於synchronized關鍵字僅在方法內或者代碼塊內有效,而使用Sync卻可以跨越方法甚至通過在對象之間傳遞,跨越對象進行同步。這是Sync及concurrent工具包比直接使用synchronized更加強大的地方。

需要註意的是Sync中的acquire()和attempt()方法都會拋出InterruptedException異常,因此在使用Sync及其子類時,調用這些方法壹定要捕獲InterruptedException。而release()方法並不會拋出InterruptedException異常,這是因為在acquire()和attemp()方法中都有可能會調用wait()方法等待其他線程釋放鎖。因此,如果對壹個並沒有acquire()方法的線程調用release()方法不會存在什麽問題。而由於release()方法不會拋出InterruptedException,因此必須在catch或finally子句中調用release()方法以保證獲得的鎖能夠被正確釋放。示例代碼如下:

package newthread;

public class TestSync_4 {

Sync gate;

public void test(){

try {

gate.acquire();

try{

//do something

}finally{

gate.realse();

}

} catch (InterruptedException ex) {

}

}

}

Mutex是壹個非重入的互斥鎖。廣泛應用於需要跨越方法的before or after類型的同步環境中。下面是壹個Doug Lea的concurrent工具包中的Mutex的實現,代碼如下:

package newthread;

import com.sun.corba.se.impl.orbutil.concurrent.Sync;

public class TestMutex implements Sync{

protected boolean flg=false;

public void acquire() throws InterruptedException {

if(Thread.interrupted())

throw new InterruptedException;

synchronized(this){

try{

while(flg)

wait();

flg=true;

}catch(InterruptedException ex){

notify();

throw ex;

}

}

}

public boolean attempt(long msecs) throws InterruptedException {

if(Thread.interrupted())

throw new InterruptedException;

synchronized(this){

if(!flg){

flg=true;

return true;

}else if(msecs<=0){

return false;

}else{

long waitTime=msecs;

long start=System.currentTimeMillis();

try{

for(;;){

wait(waitTime);

if(!flg){

flg=true;

return true;

}else{

waitTime=msecs-(System.currentTimeMillis()-start);

if(waitTime<=0)

return false;

}

}

}catch(InterruptedException ex){

notify();

throw ex;

}

}

}

}

public void release() {

flg=false;

notify();

}

}

  • 上一篇:周末發給女朋友的短信
  • 下一篇:簡易攻略正當防衛2
  • copyright 2024編程學習大全網