并发编程学习笔记并发工具的使用

33次阅读

共计 15812 个字符,预计需要花费 40 分钟才能阅读完成。

并发工具类

极客时间 – 王宝令

Lock 和 Condition

隐藏在并发包中的管程

还记得 死锁 中,破坏不可抢占条件方案吗?以下三种方案可以全面弥补 synchronized 的问题。

  • 能够响应中断
  • 支持超时
  • 非阻塞地获取锁

代码则对应

// 支持中断的 API
void lockInterruptibly() 
  throws InterruptedException;
// 支持超时的 API
boolean tryLock(long time, TimeUnit unit) 
  throws InterruptedException;
// 支持非阻塞获取锁的 API
boolean tryLock();
可重入锁

顾名思义,指的是线程可以重复获取同一把锁。

class X {
  private final Lock rtl =
  new ReentrantLock();
  int value;
  public int get() {
    // 获取锁
    rtl.lock();         ②
    try {return value;} finally {
      // 保证锁能释放
      rtl.unlock();}
  }
  public void addOne() {
    // 获取锁
    rtl.lock();  
    try {value = 1 + get(); ①
    } finally {
      // 保证锁能释放
      rtl.unlock();}
  }
}
公平锁和非公平锁
  • 公平锁:唤醒的策略是谁的等待时间长,便先唤醒谁
  • 非公平锁:有可能等待时间短的反而先被唤醒。
// 无参构造函数:默认非公平锁
public ReentrantLock() {sync = new NonfairSync();
}
// 根据公平策略参数创建锁
public ReentrantLock(boolean fair){sync = fair ? new FairSync() 
                : new NonfairSync();}
用锁的最佳实践
  • 永远只在更新对象的成员变量时加锁
  • 永远只在访问可变的成员变量时加锁
  • 永远不在调用其他对象的方法时加锁

Dubbo 如何使用管程实现异步转同步

Condition 实现了管程里面的条件变量

利用两个条件变量快速实现阻塞队列
public class BlockedQueue<T>{
  final Lock lock =
    new ReentrantLock();
  // 条件变量:队列不满  
  final Condition notFull =
    lock.newCondition();
  // 条件变量:队列不空  
  final Condition notEmpty =
    lock.newCondition();

  // 入队
  void enq(T x) {lock.lock();
    try {while ( 队列已满){
        // 等待队列不满
        notFull.await();}  
      // 省略入队操作...
      // 入队后, 通知可出队
      notEmpty.signal();}finally {lock.unlock();
    }
  }
  // 出队
  void deq(){lock.lock();
    try {while ( 队列已空){
        // 等待队列不空
        notEmpty.await();}  
      // 省略出队操作...
      // 出队后,通知可入队
      notFull.signal();}finally {lock.unlock();
    }  
  }
}
Dubbo 异步转同步
// 创建锁与条件变量
private final Lock lock 
    = new ReentrantLock();
private final Condition done 
    = lock.newCondition();

// 调用方通过该方法等待结果
Object get(int timeout){long start = System.nanoTime();
  lock.lock();
  try {while (!isDone()) {done.await(timeout);
      long cur=System.nanoTime();
      if (isDone() || 
          cur-start > timeout){break;}
    }
  } finally {lock.unlock();
  }
  if (!isDone()) {throw new TimeoutException();
  }
  return returnFromResponse();}
// RPC 结果是否已经返回
boolean isDone() {return response != null;}
// RPC 结果返回时调用该方法   
private void doReceived(Response res) {lock.lock();
  try {
    response = res;
    if (done != null) {done.signal();
    }
  } finally {lock.unlock();
  }
}

Semaphore 如何快速实现一个限流器

static int count;
// 初始化信号量
static final Semaphore s 
    = new Semaphore(1);
// 用信号量保证互斥    
static void addOne() {s.acquire();
  try {count+=1;} finally {s.release();
  }
}

对象池示例代码

class ObjPool<T, R> {
  final List<T> pool;
  // 用信号量实现限流器
  final Semaphore sem;
  // 构造函数
  ObjPool(int size, T t){pool = new Vector<T>(){};
    for(int i=0; i<size; i++){pool.add(t);
    }
    sem = new Semaphore(size);
  }
  // 利用对象池的对象,调用 func
  R exec(Function<T,R> func) {
    T t = null;
    sem.acquire();
    try {t = pool.remove(0);
      return func.apply(t);
    } finally {pool.add(t);
      sem.release();}
  }
}
// 创建对象池
ObjPool<Long, String> pool = 
  new ObjPool<Long, String>(10, 2);
// 通过对象池获取 t,之后执行  
pool.exec(t -> {System.out.println(t);
    return t.toString();});

ReadWriteLock 如何快速实现一个完备的缓存

适用于读多写少场景

所有的读写锁遵守三条守则

  1. 允许多个线程同时读共享变量
  2. 只允许一个线程写共享变量
  3. 如果一个写线程正在执行写操作,此时禁止读线程读共享变量

简单的 Cache 工具类示例代码

class Cache<K,V> {
  final Map<K, V> m =
    new HashMap<>();
  final ReadWriteLock rwl =
    new ReentrantReadWriteLock();
  // 读锁
  final Lock r = rwl.readLock();
  // 写锁
  final Lock w = rwl.writeLock();
  // 读缓存
  V get(K key) {r.lock();
    try {return m.get(key); }
    finally {r.unlock(); }
  }
  // 写缓存
  V put(String key, Data v) {w.lock();
    try {return m.put(key, v); }
    finally {w.unlock(); }
  }
}

当缓存数据很大时,实现缓存的按需加载

class Cache<K,V> {
  final Map<K, V> m =
    new HashMap<>();
  final ReadWriteLock rwl = 
    new ReentrantReadWriteLock();
  final Lock r = rwl.readLock();
  final Lock w = rwl.writeLock();
 
  V get(K key) {
    V v = null;
    // 读缓存
    r.lock();         ①
    try {v = m.get(key); ②
    } finally{r.unlock();     ③
    }
    // 缓存中存在,返回
    if(v != null) {   ④
      return v;
    }  
    // 缓存中不存在,查询数据库
    w.lock();         ⑤
    try {
      // 再次验证
      // 其他线程可能已经查询过数据库
      v = m.get(key); ⑥
      if(v == null){  ⑦
        // 查询数据库
        v= 省略代码无数
        m.put(key, v);
      }
    } finally{w.unlock();
    }
    return v; 
  }
}

锁的升级是不允许的,但是锁的降级却是允许的。

官方示例代码

class CachedData {
  Object data;
  volatile boolean cacheValid;
  final ReadWriteLock rwl =
    new ReentrantReadWriteLock();
  // 读锁  
  final Lock r = rwl.readLock();
  // 写锁
  final Lock w = rwl.writeLock();
  
  void processCachedData() {
    // 获取读锁
    r.lock();
    if (!cacheValid) {
      // 释放读锁,因为不允许读锁的升级
      r.unlock();
      // 获取写锁
      w.lock();
      try {
        // 再次检查状态  
        if (!cacheValid) {
          data = ...
          cacheValid = true;
        }
        // 释放写锁前,降级为读锁
        // 降级是可以的
        r.lock(); ①} finally {
        // 释放写锁
        w.unlock();}
    }
    // 此处仍然持有读锁
    try {use(data);} 
    finally {r.unlock();}
  }
}

另外缓存的设计,使用 guava cache 就足够了,没必要自己去实现。

  • guava cache

StampedLock 比读写锁更快的锁

Stamplock 三种模式

  • 写锁 对应读写锁的写锁
  • 悲观读锁 对应读写锁的读锁
  • 乐观读 注意这里没有锁的概念

使用 stampLock 示例代码

final StampedLock sl = 
  new StampedLock();
  
// 获取 / 释放悲观读锁示意代码
long stamp = sl.readLock();
try {// 省略业务相关代码} finally {sl.unlockRead(stamp);
}

// 获取 / 释放写锁示意代码
long stamp = sl.writeLock();
try {// 省略业务相关代码} finally {sl.unlockWrite(stamp);
}

stamplock 乐观读的官方示例代码

class Point {
  private int x, y;
  final StampedLock sl = 
    new StampedLock();
  // 计算到原点的距离  
  int distanceFromOrigin() {
    // 乐观读
    long stamp = 
      sl.tryOptimisticRead();
    // 读入局部变量,// 读的过程数据可能被修改
    int curX = x, curY = y;
    // 判断执行读操作期间,// 是否存在写操作,如果存在,// 则 sl.validate 返回 false
    if (!sl.validate(stamp)){
      // 升级为悲观读锁
      stamp = sl.readLock();
      try {
        curX = x;
        curY = y;
      } finally {
        // 释放悲观读锁
        sl.unlockRead(stamp);
      }
    }
    return Math.sqrt(curX * curX + curY * curY);
  }
}

StampedLock 使用注意事项

  1. 它不支持重入
  2. 悲观读和写 都不支持条件变量
  3. StampedLock 的 readLock 或者 writeLock 时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升。如示例代码。如果要支持可中断操作,一定记得要调用 readLockInterruptibly() 和写锁 writeLockInterruptibly()
final StampedLock lock
  = new StampedLock();
Thread T1 = new Thread(()->{
  // 获取写锁
  lock.writeLock();
  // 永远阻塞在此处,不释放写锁
  LockSupport.park();});
T1.start();
// 保证 T1 获取写锁
Thread.sleep(100);
Thread T2 = new Thread(()->
  // 阻塞在悲观读锁
  lock.readLock());
T2.start();
// 保证 T2 阻塞在读锁
Thread.sleep(100);
// 中断线程 T2
// 会导致线程 T2 所在 CPU 飙升
T2.interrupt();
T2.join();

StampedLock 读写模板代码

读模板

final StampedLock sl = 
  new StampedLock();

// 乐观读
long stamp = 
  sl.tryOptimisticRead();
// 读入方法局部变量
......
// 校验 stamp
if (!sl.validate(stamp)){
  // 升级为悲观读锁
  stamp = sl.readLock();
  try {
    // 读入方法局部变量
    .....
  } finally {
    // 释放悲观读锁
    sl.unlockRead(stamp);
  }
}
// 使用方法局部变量执行业务操作
......

写模板

long stamp = sl.writeLock();
try {
  // 写共享变量
  ......
} finally {sl.unlockWrite(stamp);
}

CountDownLatch 和 CyclicBarrier, 让多线程步调一致

使用 Thread join 保持步调一致代码

while(存在未对账订单){
  // 查询未对账订单
  Thread T1 = new Thread(()->{pos = getPOrders();
  });
  T1.start();
  // 查询派送单
  Thread T2 = new Thread(()->{dos = getDOrders();
  });
  T2.start();
  // 等待 T1、T2 结束
  T1.join();
  T2.join();
  // 执行对账操作
  diff = check(pos, dos);
  // 差异写入差异库
  save(diff);
} 

使用 CountDownLatch 实现

// 创建 2 个线程的线程池
Executor executor = 
  Executors.newFixedThreadPool(2);
while(存在未对账订单){
  // 计数器初始化为 2
  CountDownLatch latch = 
    new CountDownLatch(2);
  // 查询未对账订单
  executor.execute(()-> {pos = getPOrders();
    latch.countDown();});
  // 查询派送单
  executor.execute(()-> {dos = getDOrders();
    latch.countDown();});
  
  // 等待两个查询操作结束
  latch.await();
  
  // 执行对账操作
  diff = check(pos, dos);
  // 差异写入差异库
  save(diff);
}

使用 CyclicBarrier 实现线程同步

// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池 
Executor executor = 
  Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
  new CyclicBarrier(2, ()->{executor.execute(()->check());
  });
  
void check(){P p = pos.remove(0);
  D d = dos.remove(0);
  // 执行对账操作
  diff = check(p, d);
  // 差异写入差异库
  save(diff);
}
  
void checkAll(){
  // 循环查询订单库
  Thread T1 = new Thread(()->{while( 存在未对账订单){
      // 查询订单库
      pos.add(getPOrders());
      // 等待
      barrier.await();}
  });
  T1.start();  
  // 循环查询运单库
  Thread T2 = new Thread(()->{while( 存在未对账订单){
      // 查询运单库
      dos.add(getDOrders());
      // 等待
      barrier.await();}
  });
  T2.start();}

总结

  • CountDownLatch: 主要用来解决一个线程等待多个线程的场景。可以类比旅游团长要等待所有的游客到齐才能去下一个景点。且计数器无法重置
  • CyclicBarrier: 一组线程的相互等待,更像是几个驴友之间不离不弃。计数器可自动重置。

并发容器:都有哪些“坑”需要我们填

注意的坑

List list = Collections.
  synchronizedList(new ArrayList());
Iterator i = list.iterator(); 
while (i.hasNext())
  foo(i.next());

应该加同步遍历使用的时候

List list = Collections.
  synchronizedList(new ArrayList());
synchronized (list) {Iterator i = list.iterator(); 
  while (i.hasNext())
    foo(i.next());
}    

原子类: 无锁工具类的典范

无锁方案的实现原理

其实原子类性能高的秘密很简单,硬件支持而已。CPU 为了解决并发问题,提供了 CAS 指令。

CAS 指令包含三个参数:共享变量的内存地址 A,用于比较的值 B,共享变量的新值 C。并且只有当内存中地址 A 的值等于 B 的值,才允许将内存中地址 A 值改变为 C 值。

模拟示例代码

class SimulatedCAS{
  int count;synchronized int cas(int expect, int newValue){
    // 读目前 count 的值
    int curValue = count;
    // 比较目前 count 值是否 == 期望值
    if(curValue == expect){
      // 如果是,则更新 count 的值
      count = newValue;
    }
    // 返回写入前的值
    return curValue;
  }
}

使用 CAS,一般都会伴随着自旋。示例代码如下

class SimulatedCAS{
  volatile int count;
  // 实现 count+=1
  addOne(){
    do {newValue = count+1; //①}while(count !=
      cas(count,newValue) //②
  }
  // 模拟实现 CAS,仅用来帮助理解
  synchronized int cas(int expect, int newValue){
    // 读目前 count 的值
    int curValue = count;
    // 比较目前 count 值是否 == 期望值
    if(curValue == expect){
      // 如果是,则更新 count 的值
      count= newValue;
    }
    // 返回写入前的值
    return curValue;
  }
}

另外如果使用 CAS 方案,请注意小心 ABA 的问题。

原子类概览

Executor 与线程池:如何创建正确的线程池

java 中的线程池

ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  ThreadFactory threadFactory,
  RejectedExecutionHandler handler) 
  • corePoolSize: 表示线程池保有的最小线程数。有些项目很闲,但是也不能把人撤了,至少要保留 corePoolSize 个人坚守阵地
  • maximumPoolSize: 表示线程池创建的最大线程数。当项目很忙时,就要加人,但是也不能无限制的加,最多只能加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到只有 corePoolSize 个人。
  • keepAliveTime & unit: 一个线程如果在一段时间内都没有执行任务,就说明很闲。当线程池的线程数大于 corePoolSize, 那么这个空闲的线程就需要被撤走了。
  • threadFactory: 通过这个参数你可以自定义如何创建线程,可以给线程指定一个有意义的名字。在实际工作中,我一般是这么干的 new ThreadFactoryBuilder().setNameFormat("reportFile-pool-%d").build()
  • handler: 通过这个参数可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,且队列已经满了。此时提交任务,就会执行拒绝策略

    • CallerRunsPolicy: 提交任务的线程自己去执行该任务
    • AbortPolicy: 默认的拒绝策略,throw RejectExecutionException
    • DiscardPolicy: 直接丢弃任务,没有异常抛出。
    • DiscardOldestPolicy: 丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列中
  • allowCoreThreadTimeOut(boolean): java1.6 版本加入了可以让所有线程都支持超时的功能。很舒服。

Future: 如何用多线程实现最优的 烧水泡茶 程序

Future API

// 提交 Runnable 任务
Future<?> 
  submit(Runnable task);
// 提交 Callable 任务
<T> Future<T> 
  submit(Callable<T> task);
// 提交 Runnable 任务及结果引用  
<T> Future<T> 
  submit(Runnable task, T result);

// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否已取消  
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);

FutureTask API

FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);

// 创建 FutureTask
FutureTask<Integer> futureTask
  = new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es = 
  Executors.newCachedThreadPool();
// 提交 FutureTask 
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();

// 创建 FutureTask
FutureTask<Integer> futureTask
  = new FutureTask<>(()-> 1+2);
// 创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取计算结果
Integer result = futureTask.get();

实现最优的烧水泡茶程序

// 创建任务 T2 的 FutureTask
FutureTask<String> ft2
  = new FutureTask<>(new T2Task());
// 创建任务 T1 的 FutureTask
FutureTask<String> ft1
  = new FutureTask<>(new T1Task(ft2));
// 线程 T1 执行任务 ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程 T2 执行任务 ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程 T1 执行结果
System.out.println(ft1.get());

// T1Task 需要执行的任务:// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String>{
  FutureTask<String> ft2;
  // T1 任务需要 T2 任务的 FutureTask
  T1Task(FutureTask<String> ft2){this.ft2 = ft2;}
  @Override
  String call() throws Exception {System.out.println("T1: 洗水壶...");
    TimeUnit.SECONDS.sleep(1);
    
    System.out.println("T1: 烧开水...");
    TimeUnit.SECONDS.sleep(15);
    // 获取 T2 线程的茶叶  
    String tf = ft2.get();
    System.out.println("T1: 拿到茶叶:"+tf);

    System.out.println("T1: 泡茶...");
    return "上茶:" + tf;
  }
}
// T2Task 需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
  @Override
  String call() throws Exception {System.out.println("T2: 洗茶壶...");
    TimeUnit.SECONDS.sleep(1);

    System.out.println("T2: 洗茶杯...");
    TimeUnit.SECONDS.sleep(2);

    System.out.println("T2: 拿茶叶...");
    TimeUnit.SECONDS.sleep(1);
    return "龙井";
  }
}
// 一次执行结果:T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井 

CompletableFuture: 异步编程没那么难

使用 CompletableFuture 实现 烧水泡茶 程序

// 任务 1:洗水壶 -> 烧开水
CompletableFuture<Void> f1 = 
  CompletableFuture.runAsync(()->{System.out.println("T1: 洗水壶...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T1: 烧开水...");
  sleep(15, TimeUnit.SECONDS);
});
// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{System.out.println("T2: 洗茶壶...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T2: 洗茶杯...");
  sleep(2, TimeUnit.SECONDS);

  System.out.println("T2: 拿茶叶...");
  sleep(1, TimeUnit.SECONDS);
  return "龙井";
});
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
CompletableFuture<String> f3 = 
  f1.thenCombine(f2, (__, tf)->{System.out.println("T1: 拿到茶叶:" + tf);
    System.out.println("T1: 泡茶...");
    return "上茶:" + tf;
  });
// 等待任务 3 执行结果
System.out.println(f3.join());

void sleep(int t, TimeUnit u) {
  try {u.sleep(t);
  }catch(InterruptedException e){}}
// 一次执行结果:T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井 

如何理解 CompletionStage 接口

任务是有时序关系的。

  • 串行关系
  • 并行关系
  • 汇聚关系

串行关系
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);
CompletableFuture<String> f0 = 
  CompletableFuture.supplyAsync(() -> "Hello World")      //①
  .thenApply(s -> s + "QQ")  //②
  .thenApply(String::toUpperCase);//③

System.out.println(f0.join());
// 输出结果
HELLO WORLD QQ
AND 汇聚关系
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);
OR 汇聚关系
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
异常处理

其中 whenComplete 和 handler 的区别在于:whenComplete 不支持返回结果,而 handler 支持返回结果。

CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);
CompletableFuture<Integer> 
  f0 = CompletableFuture
    .supplyAsync(()->7/0))
    .thenApply(r->r*10)
    .exceptionally(e->0);
System.out.println(f0.join());

总结

对异步编程感兴趣,建议关注一下 RxJava 这个项目。

CompletionService: 如何批量执行异步任务

使用 CompletionService 实现询价系统

先查询出来的结果先进数据库,互不影响。

// 创建线程池
ExecutorService executor = 
  Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new 
  ExecutorCompletionService<>(executor);
// 异步向电商 S1 询价
cs.submit(()->getPriceByS1());
// 异步向电商 S2 询价
cs.submit(()->getPriceByS2());
// 异步向电商 S3 询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
for (int i=0; i<3; i++) {Integer r = cs.take().get();
  executor.execute(()->save(r));
}

利用 CompletionService 实现 Dubbo 的 Forking Cluster

Dubbo 中有一种叫做 Forking 的集群模式,这种集群模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。

// 创建线程池
ExecutorService executor =
  Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs =
  new ExecutorCompletionService<>(executor);
// 用于保存 Future 对象
List<Future<Integer>> futures =
  new ArrayList<>(3);
// 提交异步任务,并保存 future 到 futures 
futures.add(cs.submit(()->geocoderByS1()));
futures.add(cs.submit(()->geocoderByS2()));
futures.add(cs.submit(()->geocoderByS3()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {
  // 只要有一个成功返回,则 break
  for (int i = 0; i < 3; ++i) {r = cs.take().get();
    // 简单地通过判空来检查是否成功返回
    if (r != null) {break;}
  }
} finally {
  // 取消所有任务
  for(Future<Integer> f : futures)
    f.cancel(true);
}
// 返回结果
return r;

Fork/Join: 单机版的 mapreduce

Fork/Join 的使用

ForkJoinTask 是一个抽象类,方法很多,最核心的属 fork 和 join 方法。

fork 方法会异步地执行一个子任务
join 方法则会阻塞当前线程来等待子任务的执行结果

它有两个子类,RecursiveAction 和 RecuisiveTask。他们的区别在于一个没有返回值,而一个是有返回值的。

使用 Fork/Join 实现斐波那契数列

static void main(String[] args){
  // 创建分治任务线程池  
  ForkJoinPool fjp = 
    new ForkJoinPool(4);
  // 创建分治任务
  Fibonacci fib = 
    new Fibonacci(30);   
  // 启动分治任务  
  Integer result = 
    fjp.invoke(fib);
  // 输出结果  
  System.out.println(result);
}
// 递归任务
static class Fibonacci extends 
    RecursiveTask<Integer>{
  final int n;
  Fibonacci(int n){this.n = n;}
  protected Integer compute(){if (n <= 1)
      return n;
    Fibonacci f1 = 
      new Fibonacci(n - 1);
    // 创建子任务  
    f1.fork();
    Fibonacci f2 = 
      new Fibonacci(n - 2);
    // 等待子任务结果,并合并结果  
    return f2.compute() + f1.join();
  }
}

ForkJoinPool 工作原理

ForkJoinPool 相比较于 ThreadPoolExecutor 内部有多个队列。它支持一种叫做 工作窃取 的机制。如果工作线程空了,那它会窃取其他工作队列里的任务。

ForkJoinPool 中的队列采用的是双端队列,工作线程正常获取任务和窃取任务分别是从任务队列不同的端消费的,这样能避免很多不必要的数据竞争。

模拟 Mapreduce 统计单词数量

static void main(String[] args){String[] fc = {"hello world",
          "hello me",
          "hello fork",
          "hello join",
          "fork join in world"};
  // 创建 ForkJoin 线程池    
  ForkJoinPool fjp = 
      new ForkJoinPool(3);
  // 创建任务    
  MR mr = new MR(fc, 0, fc.length);  
  // 启动任务    
  Map<String, Long> result = 
      fjp.invoke(mr);
  // 输出结果    
  result.forEach((k, v)->
    System.out.println(k+":"+v));
}
//MR 模拟类
static class MR extends 
  RecursiveTask<Map<String, Long>> {private String[] fc;
  private int start, end;
  // 构造函数
  MR(String[] fc, int fr, int to){
    this.fc = fc;
    this.start = fr;
    this.end = to;
  }
  @Override protected 
  Map<String, Long> compute(){if (end - start == 1) {return calc(fc[start]);
    } else {int mid = (start+end)/2;
      MR mr1 = new MR(fc, start, mid);
      mr1.fork();
      MR mr2 = new MR(fc, mid, end);
      // 计算子任务,并返回合并的结果    
      return merge(mr2.compute(),
          mr1.join());
    }
  }
  // 合并结果
  private Map<String, Long> merge(
      Map<String, Long> r1, 
      Map<String, Long> r2) {
    Map<String, Long> result = 
        new HashMap<>();
    result.putAll(r1);
    // 合并结果
    r2.forEach((k, v) -> {Long c = result.get(k);
      if (c != null)
        result.put(k, c+v);
      else 
        result.put(k, v);
    });
    return result;
  }
  // 统计单词数量
  private Map<String, Long> 
      calc(String line) {
    Map<String, Long> result =
        new HashMap<>();
    // 分割单词    
    String [] words = 
        line.split("\\s+");
    // 统计单词数量    
    for (String w : words) {Long v = result.get(w);
      if (v != null) 
        result.put(w, v+1);
      else
        result.put(w, 1L);
    }
    return result;
  }
}

正文完
 0