摘要:JDK 1.5 开始 JUC 包下提供的 Exchanger 类可用于两个线程之间替换信息。

本文分享自华为云社区《一行Java代码实现两玩家替换配备【并发编程】》,作者:陈皮的JavaLib 。

1 Exchanger 是什么

JDK 1.5 开始 JUC 包下提供的 Exchanger 类可用于两个线程之间替换信息。Exchanger 对象可了解为一个蕴含2个格子的容器,通过调用 exchanger 办法向其中的格子填充信息,当两个格子中的均被填充信息时,主动替换两个格子中的信息,而后将替换的信息返回给调用线程,从而实现两个线程的信息替换。

性能看似简略,但这在某些场景下是很有用途的,例如游戏中两个玩家替换配备;交友软件男女心仪对象匹配。

上面简略模仿下两个玩家替换配备的场景。

package com.chenpi;import java.util.concurrent.Exchanger;/** * @Description * @Author 陈皮 * @Date 2021/7/11 * @Version 1.0 */public class ChenPiMain {  public static void main(String[] args) throws InterruptedException {    Exchanger<String> exchanger = new Exchanger<>();    new Thread(() -> {      String str = null;      try {        str = exchanger.exchange("屠龙刀");      } catch (InterruptedException e) {        e.printStackTrace();      }      System.out.println("交易胜利," + Thread.currentThread().getName() + "取得" + str);    }, "周芷若").start();    new Thread(() -> {      String str = null;      try {        str = exchanger.exchange("倚天剑");      } catch (InterruptedException e) {        e.printStackTrace();      }      System.out.println("交易胜利," + Thread.currentThread().getName() + "取得" + str);    }, "张无忌").start();  }}// 输入后果如下交易胜利,张无忌取得屠龙刀交易胜利,周芷若取得倚天剑

2 Exchanger 详解

Exchager 类可用于两个线程之间替换信息,如果一个线程调用了 Exchanger 对象的 exchange 办法之后,会始终阻塞直到另一个线程来和它替换信息,替换之后的信息返回给调用线程,从而实现两个线程的信息替换。

Exchager 底层也是应用到了自旋和 cas 机制。

留神,如果超过两个线程调用同一个 Exchanger 对象 exchange 办法时,后果是不可预计的,只有有2个线程满足条件了,就认为匹配胜利并替换信息。而剩下的未能失去配对的线程,则会被阻塞始终期待直到有另一个线程能与它匹配与之配对。

package com.chenpi;import java.util.concurrent.Exchanger;/** * @Description * @Author 陈皮 * @Date 2021/7/11 * @Version 1.0 */public class ChenPiMain {  public static void main(String[] args) {    Exchanger<String> exchanger = new Exchanger<>();    new Thread(() -> {      String str = null;      try {        str = exchanger.exchange("屠龙刀");      } catch (InterruptedException e) {        e.printStackTrace();      }      System.out.println("交易胜利," + Thread.currentThread().getName() + "取得" + str);    }, "周芷若").start();    new Thread(() -> {      String str = null;      try {        str = exchanger.exchange("倚天剑");      } catch (InterruptedException e) {        e.printStackTrace();      }      System.out.println("交易胜利," + Thread.currentThread().getName() + "取得" + str);    }, "张无忌").start();    new Thread(() -> {      String str = null;      try {        str = exchanger.exchange("假的倚天剑");      } catch (InterruptedException e) {        e.printStackTrace();      }      System.out.println("交易胜利," + Thread.currentThread().getName() + "取得" + str);    }, "成昆").start();  }}// 输入后果如下交易胜利,周芷若取得假的倚天剑交易胜利,成昆取得屠龙刀

当然,在期待替换信息的线程是能够被中断的,就比方玩家在期待交易过程中,忽然玩家下线了,那就应该中断线程期待。

package com.chenpi;import java.lang.Thread.State;import java.util.ArrayList;import java.util.List;import java.util.concurrent.Exchanger;/** * @Description * @Author 陈皮 * @Date 2021/7/11 * @Version 1.0 */public class ChenPiMain {  public static void main(String[] args) throws InterruptedException {    Exchanger<String> exchanger = new Exchanger<>();    List<Thread> threads = new ArrayList<>(3);    Thread thread1 = new Thread(() -> {      String str = null;      try {        str = exchanger.exchange("屠龙刀");      } catch (InterruptedException e) {        e.printStackTrace();      }      System.out.println("交易胜利," + Thread.currentThread().getName() + "取得" + str);    }, "周芷若");    threads.add(thread1);    Thread thread2 = new Thread(() -> {      String str = null;      try {        str = exchanger.exchange("倚天剑");      } catch (InterruptedException e) {        e.printStackTrace();      }      System.out.println("交易胜利," + Thread.currentThread().getName() + "取得" + str);    }, "张无忌");    threads.add(thread2);    Thread thread3 = new Thread(() -> {      String str = null;      try {        str = exchanger.exchange("假的屠龙刀");      } catch (InterruptedException e) {        e.printStackTrace();      }      System.out.println("交易胜利," + Thread.currentThread().getName() + "取得" + str);    }, "成昆");    threads.add(thread3);    for (Thread thread : threads) {      thread.start();    }    // 期待5秒    Thread.sleep(5000);    for (Thread thread : threads) {      System.out.println(thread.getName() + ":" + thread.getState());      // 如果还在阻塞期待则中断线程      if (thread.getState() == State.WAITING) {        thread.interrupt();      }    }  }}// 输入后果如下交易胜利,张无忌取得屠龙刀交易胜利,周芷若取得倚天剑周芷若:TERMINATED张无忌:TERMINATED成昆:WAITING交易胜利,成昆取得nulljava.lang.InterruptedException    at java.util.concurrent.Exchanger.exchange(Exchanger.java:568)    at com.chenpi.ChenPiMain.lambda$main$2(ChenPiMain.java:47)    at java.lang.Thread.run(Thread.java:748)

下面演示的是线程如果等不到另一个线程和它替换信息,则会始终期待上来。其实 Exchanger 还能够设置期待指定工夫。比方零碎设置玩家替换配备匹配工夫为60秒,如果超出工夫则终止交易。

package com.chenpi;import java.util.concurrent.Exchanger;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;/** * @Description * @Author 陈皮 * @Date 2021/7/11 * @Version 1.0 */public class ChenPiMain {  public static void main(String[] args) {    Exchanger<String> exchanger = new Exchanger<>();    new Thread(() -> {      try {        // 超时工夫设置为5秒        String str = exchanger.exchange("屠龙刀", 5, TimeUnit.SECONDS);        System.out.println("交易胜利," + Thread.currentThread().getName() + "取得" + str);      } catch (TimeoutException e) {        System.out.println("交易超时!");        e.printStackTrace();      } catch (InterruptedException e) {        System.out.println("交易异样终止");        e.printStackTrace();      }    }, "周芷若").start();  }}// 输入后果如下交易超时!java.util.concurrent.TimeoutException    at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)    at com.chenpi.ChenPiMain.lambda$main$0(ChenPiMain.java:22)    at java.lang.Thread.run(Thread.java:748)

3 Exchanger 利用

Exchager 在遗传算法和管道设计等利用中是十分有用的。比方两个线程之间替换缓冲区,填充缓冲区的线程在须要时从另一个线程取得一个刚清空的缓冲区,并将填充的缓冲区传递给清空缓冲区的线程。

package com.chenpi;import java.awt.image.DataBuffer;import java.util.concurrent.Exchanger;/** * @Description * @Author 陈皮 * @Date 2021/7/11 * @Version 1.0 */public class ChenPiMain {  Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();  DataBuffer initialEmptyBuffer = ... a made-up type  DataBuffer initialFullBuffer = ...  class FillingLoop implements Runnable {    public void run() {      DataBuffer currentBuffer = initialEmptyBuffer;      try {        while (currentBuffer != null) {          addToBuffer(currentBuffer);          if (currentBuffer.isFull()) {            currentBuffer = exchanger.exchange(currentBuffer);          }        }      } catch (InterruptedException ex) { ...handle ...}    }  }  class EmptyingLoop implements Runnable {    public void run() {      DataBuffer currentBuffer = initialFullBuffer;      try {        while (currentBuffer != null) {          takeFromBuffer(currentBuffer);          if (currentBuffer.isEmpty()) {            currentBuffer = exchanger.exchange(currentBuffer);          }        }      } catch (InterruptedException ex) { ...handle ...}    }  }  void start() {    new Thread(new FillingLoop()).start();    new Thread(new EmptyingLoop()).start();  }}

点击关注,第一工夫理解华为云陈腐技术~