乐趣区

关于java:60行从零开始自己动手写FutureTask是什么体验

前言

在并发编程当中咱们最常见的需要就是启动一个线程执行一个函数去实现咱们的需要,而在这种需要当中,咱们经常须要函数有返回值。比方咱们须要同一个十分大的数组当中数据的和,让每一个线程求某一个区间外部的和,最终将这些和加起来,那么每个线程都须要返回对应区间的和。而在 Java 当中给咱们提供了这种机制,去实现这一个成果——FutureTask

FutureTask

在本人写 FutureTask 之前咱们首先写一个例子来回顾一下 FutureTask 的编程步骤:

  • 写一个类实现 Callable 接口。
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;}

实现接口就实现 call 即可,能够看到这个函数是有返回值的,而 FutureTask 返回给咱们的值就是这个函数的返回值。

  • new一个 FutureTask 对象,并且 new 一个第一步写的类,new FutureTask<>(callable 实现类)
  • 最初将刚刚失去的 FutureTask 对象传入 Thread 类当中,而后启动线程即可new Thread(futureTask).start();
  • 而后咱们能够调用 FutureTaskget办法失去返回的后果futureTask.get();

如果有一个数组 data,长度为 100000,当初有 10 个线程,第i 个线程求数组 [i * 10000, (i + 1) * 10000) 所有数据的和,而后将这十个线程的后果加起来。

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {int[] data = new int[100000];
    Random random = new Random();
    for (int i = 0; i < 100000; i++) {data[i] = random.nextInt(10000);
    }
    @SuppressWarnings("unchecked")
    FutureTask<Integer>[] tasks = (FutureTask<Integer>[]) Array.newInstance(FutureTask.class, 10);
    // 设置 10 个 futuretask 工作计算数组当中数据的和
    for (int i = 0; i < 10; i++) {
      int idx = i;
      tasks[i] = new FutureTask<>(() -> {
        int sum = 0;
        for (int k = idx * 10000; k < (idx + 1) * 10000; k++) {sum += data[k];
        }
        return sum;
      });
    }
    // 开启线程执行 futureTask 工作
    for (FutureTask<Integer> futureTask : tasks) {new Thread(futureTask).start();}
    int threadSum = 0;
    for (FutureTask<Integer> futureTask : tasks) {threadSum += futureTask.get();
    }
    int sum = Arrays.stream(data).sum();
    System.out.println(sum == threadSum); // 后果始终为 true
  }
}

可能你会对 FutureTask 的应用形式感觉困惑,或者不是很分明,当初咱们来认真捋一下思路。

  1. 首先启动一个线程要么是继承自 Thread 类,而后重写 Thread 类的 run 办法,要么是给 Thread 类传递一个实现了 Runnable 的类对象,当然能够用匿名外部类实现。
  2. 既然咱们的 FutureTask 对象能够传递给 Thread 类,阐明 FutureTask 必定是实现了 Runnable 接口,咱们当初来看一下 FutureTask 的继承体系。

​ 能够发现的是 FutureTask 的确实现了 Runnable 接口,同时还实现了 Future 接口,这个 Future 接口次要提供了前面咱们应用 FutureTask 的一系列函数比方get

  1. 看到这里你应该可能大抵想到在 FutureTask 中的 run 办法会调用 Callable 当中实现的 call 办法,而后将后果保留下来,当调用 get 办法的时候再将这个后果返回。

本人实现 FutureTask

工具筹备

通过上文的剖析你可能曾经大抵理解了 FutureTask 的大抵执行过程了,然而须要留神的是,如果你执行 FutureTaskget办法是可能阻塞的,因为可能 Callablecall办法还没有执行实现。因而在 get 办法当中就须要有阻塞线程的代码,然而当 call 办法执行实现之后须要将这些线程都唤醒。

在本篇文章当中应用锁 ReentrantLock 和条件变量 Condition 进行线程的阻塞和唤醒,在咱们本人入手实现 FutureTask 之前,咱们先相熟一下下面两种工具的应用办法。

  • ReentrantLock次要有两个办法:

    • lock对临界区代码块进行加锁。
    • unlock对临界区代码进行解锁。
  • Condition次要有三个办法:

    • await阻塞调用这个办法的线程,期待其余线程唤醒。
    • signal唤醒一个被 await 办法阻塞的线程。
    • signalAll唤醒所有被 await 办法阻塞的线程。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {

  private ReentrantLock lock;
  private Condition condition;

  LockDemo() {lock = new ReentrantLock();
    condition = lock.newCondition();}

  public void blocking() {lock.lock();
    try {System.out.println(Thread.currentThread() + "筹备期待被其余线程唤醒");
      condition.await();} catch (InterruptedException e) {e.printStackTrace();
    }finally {lock.unlock();
    }
  }

  public void inform() throws InterruptedException {
    // 先休眠两秒 等他其余线程先阻塞
    TimeUnit.SECONDS.sleep(2);
    lock.lock();
    try {System.out.println(Thread.currentThread() + "筹备唤醒其余线程");
      condition.signal(); // 唤醒一个被 await 办法阻塞的线程
      // condition.signalAll(); // 唤醒所有被 await 办法阻塞的线程}finally {lock.unlock();
    }
  }

  public static void main(String[] args) {LockDemo lockDemo = new LockDemo();
    Thread thread = new Thread(() -> {lockDemo.blocking(); // 执行阻塞线程的代码
    }, "Blocking-Thread");
    Thread thread1 = new Thread(() -> {
      try {lockDemo.inform(); // 执行唤醒线程的代码
      } catch (InterruptedException e) {e.printStackTrace();
      }
    }, "Inform-Thread");
    thread.start();
    thread1.start();}
}

下面的代码的输入:

Thread[Blocking-Thread,5,main] 筹备期待被其余线程唤醒
Thread[Inform-Thread,5,main] 筹备唤醒其余线程

FutureTask 设计与实现

在前文当中咱们曾经谈到了 FutureTask 的实现原理,次要有以下几点:

  • 构造函数须要传入一个实现了 Callable 接口的类对象,这个将会在 FutureTaskrun办法执行,而后失去函数的返回值,并且将返回值存储起来。
  • 当线程调用 get 办法的时候,如果这个时候 Callable 当中的 call 曾经执行实现,间接返回 call 函数返回的后果就行,如果 call 函数还没有执行实现,那么就须要将调用 get 办法的线程挂起,这里咱们能够应用 condition.await() 将线程挂起。
  • call 函数执行实现之后,须要将之前被 get 办法挂起的线程唤醒继续执行,这里应用 condition.signalAll() 将所有挂起的线程唤醒。
  • 因为是咱们本人实现 FutureTask,性能不会那么齐全,只须要可能满足咱们的次要需要即可,次要是帮忙大家理解FutureTask 原理。

实现代码如下(剖析都在正文当中):

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// 这里须要实现 Runnable 接口,因为须要将这个对象放入 Thread 类当中
// 而 Thread 要求传入的对象实现了 Runnable 接口
public class MyFutureTask<V> implements Runnable {

  private final Callable<V> callable;
  private Object returnVal; // 这个示意咱们最终的返回值
  private final ReentrantLock lock;
  private final Condition condition;

  public MyFutureTask(Callable<V> callable) {
    // 将传入的 callable 对象存储起来 不便在前面的 run 办法当中调用
    this.callable = callable;
    lock = new ReentrantLock();
    condition = lock.newCondition();}

  @SuppressWarnings("unchecked")
  public V get(long timeout, TimeUnit unit) {if (returnVal != null) // 如果符合条件 阐明 call 函数曾经执行实现 返回值曾经不为 null 了
      return (V) returnVal; // 间接将后果返回即可 这样不必竞争锁资源 进步程序执行效率
    lock.lock();
    try {// 这里须要进行二次判断 (双重查看)
      // 因为如果一个线程在第一次判断 returnVal 为空
      // 而后这个时候它可能因为获取锁而被挂起
      // 而在被挂起的这段时间,call 可能曾经执行实现
      // 如果这个时候不进行判断间接执行 await 办法
      // 那前面这个线程将无奈被唤醒
      if (returnVal == null)
        condition.await(timeout, unit);
    } catch (InterruptedException e) {e.printStackTrace();
    } finally {lock.unlock();
    }
    return (V) returnVal;
  }

  @SuppressWarnings("unchecked")
  public V get() {if (returnVal != null)
      return (V) returnVal;
    lock.lock();
    try {
      // 同样的须要进行双重查看
      if (returnVal == null)
          condition.await();} catch (InterruptedException e) {e.printStackTrace();
    } finally {lock.unlock();
    }
    return (V) returnVal;
  }


  @Override
  public void run() {if (returnVal != null)
      return;
    try {
      // 在 Runnable 的 run 办法当中
      // 执行 Callable 办法的 call 失去返回后果
      returnVal = callable.call();} catch (Exception e) {e.printStackTrace();
    }
    lock.lock();
    try {
      // 因为曾经失去了后果
      // 因而须要将所有被 await 办法阻塞的线程唤醒
      // 让他们从 get 办法返回
      condition.signalAll();}finally {lock.unlock();
    }
  }
    // 上面是测试代码
  public static void main(String[] args) {MyFutureTask<Integer> ft = new MyFutureTask<>(() -> {TimeUnit.SECONDS.sleep(2);
      return 101;
    });
    Thread thread = new Thread(ft);
    thread.start();
    System.out.println(ft.get(100, TimeUnit.MILLISECONDS)); // 输入为 null
    System.out.println(ft.get()); // 输入为 101
  }
}

咱们当初用咱们本人写的 MyFutureTask 去实现在前文当中数组求和的例子:

public static void main(String[] args) throws ExecutionException, InterruptedException {int[] data = new int[100000];
  Random random = new Random();
  for (int i = 0; i < 100000; i++) {data[i] = random.nextInt(10000);
  }
  @SuppressWarnings("unchecked")
  MyFutureTask<Integer>[] tasks = (MyFutureTask<Integer>[]) Array.newInstance(MyFutureTask.class, 10);
  for (int i = 0; i < 10; i++) {
    int idx = i;
    tasks[i] = new MyFutureTask<>(() -> {
      int sum = 0;
      for (int k = idx * 10000; k < (idx + 1) * 10000; k++) {sum += data[k];
      }
      return sum;
    });
  }
  for (MyFutureTask<Integer> MyFutureTask : tasks) {new Thread(MyFutureTask).start();}
  int threadSum = 0;
  for (MyFutureTask<Integer> MyFutureTask : tasks) {threadSum += MyFutureTask.get();
  }
  int sum = Arrays.stream(data).sum();
  System.out.println(sum == threadSum); // 输入后果为 true
}

总结

在本篇文章当中次要给大家介绍了 FutureTask 的外部原理,并且咱们本人通过应用 ReentrantLockCondition实现了咱们本人的FutureTask,本篇文章的次要内容如下:

  • FutureTask的外部原理:

    • FutureTask首先会继承 Runnable 接口,这样就能够将 FutureTask 的对象间接放入 Thread 类当中,作为构造函数的参数。
    • 咱们在应用 FutureTask 的时候须要传入一个 Callable 实现类的对象,在函数 call 当中实现咱们须要执行的函数,执行实现之后,将 call 函数的返回值保留下来,当有线程调用 get 办法时候将保留的返回值返回。
  • 咱们应用条件变量进行对线程的阻塞和唤醒。

    • 当有线程调用 get 办法时,如果 call 曾经执行实现,那么能够间接将后果返回,否则须要应用条件变量将线程挂起。
    • call 函数执行实现的时候,须要应用条件变量将所有阻塞在 get 办法的线程唤醒。
  • 双重查看:

    • 咱们在 get 办法当中首先判断 returnVal 是否为空,如果不为空间接将后果返回,这就能够不必去竞争锁资源了,能够进步程序执行的效率。
    • 然而咱们在应用锁爱护的临界区还须要进行判断,判断 returnVal 是否为空,因为如果一个线程在第一次判断 returnVal 为空,而后这个时候它可能因为获取锁而被挂起,而在被挂起的这段时间,call 可能曾经执行实现,如果这个时候不进行判断间接执行 await 办法,那前面这个线程将无奈被唤醒,因为在 call 函数执行实现之后调用了 condition.signalAll(),如果线程在这之后执行await 办法,那么未来再没有线程去将这些线程唤醒。

更多精彩内容合集可拜访我的项目:https://github.com/Chang-LeHu…

关注公众号:一无是处的钻研僧,理解更多计算机(Java、Python、计算机系统根底、算法与数据结构)常识。

退出移动版