為了演示Exchanger類,我們將創建生產者和消費者任務。ExchangerProducer和ExchangerConsumer使用壹個List<Fat>作為要求交換的對象,它們都包含壹個用於這個List<Fat>的Exchanger。當妳調用Exchanger.exchange()方法時,它將阻塞直至對方任務調用它自己的exchange()方法,那時,這兩個exchange()方法將同時完成,而List<Fat>被交換:
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class ExchangerProducer implements Runnable {
private List<Fat> holder;
private Exchanger<List<Fat>> exchanger;
public ExchangerProducer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {
this.exchanger = exchanger;
this.holder = holder;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
//填充列表
for (int i = 0;i < ExchangerDemo.size; i++) {
holder.add(new Fat());
}
//等待交換
holder = exchanger.exchange(holder);
}
} catch (InterruptedException e) {
}
System.out.println("Producer stopped.");
}
}
class ExchangerConsumer implements Runnable {
private List<Fat> holder;
private Exchanger<List<Fat>> exchanger;
private volatile Fat value;
private static int num = 0;
public ExchangerConsumer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {
this.exchanger = exchanger;
this.holder = holder;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
//等待交換
holder = exchanger.exchange(holder);
//讀取列表並移除元素
for (Fat x : holder) {
num++;
value = x;
//在循環內刪除元素,這對於CopyOnWriteArrayList是沒有問題的
holder.remove(x);
}
if (num % 10000 == 0) {
System.out.println("Exchanged count=" + num);
}
}
} catch (InterruptedException e) {
}
System.out.println("Consumer stopped. Final value: " + value);
}
}
public class ExchangerDemo {
static int size = 10;
static int delay = 5; //秒
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
List<Fat> producerList = new CopyOnWriteArrayList<>();
List<Fat> consumerList = new CopyOnWriteArrayList<>();
Exchanger<List<Fat>> exchanger = new Exchanger<>();
exec.execute(new ExchangerProducer(exchanger, producerList));
exec.execute(new ExchangerConsumer(exchanger, consumerList));
TimeUnit.SECONDS.sleep(delay);
exec.shutdownNow();
}
}
class Fat {
private volatile double d;
private static int counter = 1;
private final int id = counter++;
public Fat() {
//執行壹段耗時的操作
for (int i = 1; i<10000; i++) {
d += (Math.PI + Math.E) / (double)i;
}
}
public void print() {System.out.println(this);}
public String toString() {return "Fat id=" + id;}
}
執行結果(可能的結果):
Exchanged count=10000
Exchanged count=20000
Exchanged count=30000
Exchanged count=40000
Exchanged count=50000
Exchanged count=60000
Exchanged count=70000
Exchanged count=80000
Consumer stopped. Final value: Fat id=88300
Producer stopped.
在main()中,創建了用於兩個任務的單壹的Exchanger,以及兩個用於互換的CopyOnWriteArrayList。這個特定的List變體允許列表在被遍歷的時候調用remove()方法,而不會拋出ConcurrentModifiedException異常。ExchangerProducer將填充這個List,然後將這個滿列表跟ExchangerConsumer的空列表交換。交換之後,ExchangerProducer可以繼續的生產Fat對象,而ExchangerConsumer則開始使用滿列表中的對象。因為有了Exchanger,填充壹個列表和消費另壹個列表便同時發生了。