关于java:60行从零开始自己动手写FutureTask是什么体验

前言

在并发编程当中咱们最常见的需要就是启动一个线程执行一个函数去实现咱们的需要,而在这种需要当中,咱们经常须要函数有返回值。比方咱们须要同一个十分大的数组当中数据的和,让每一个线程求某一个区间外部的和,最终将这些和加起来,那么每个线程都须要返回对应区间的和。而在Java当中给咱们提供了这种机制,去实现这一个成果——FutureTask

FutureTask

在本人写FutureTask之前咱们首先写一个例子来回顾一下FutureTask的编程步骤:

  • 写一个类实现Callable接口。
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

实现接口就实现call即可,能够看到这个函数是有返回值的,而FutureTask返回给咱们的值就是这个函数的返回值。

  • new一个FutureTask对象,并且new一个第一步写的类,new FutureTask<>(callable实现类)
  • 最初将刚刚失去的FutureTask对象传入Thread类当中,而后启动线程即可new Thread(futureTask).start();
  • 而后咱们能够调用FutureTaskget办法失去返回的后果futureTask.get();

如果有一个数组data,长度为100000,当初有10个线程,第i个线程求数组[i * 10000, (i + 1) * 10000)所有数据的和,而后将这十个线程的后果加起来。

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    int[] data = new int[100000];
    Random random = new Random();
    for (int i = 0; i < 100000; i++) {
      data[i] = random.nextInt(10000);
    }
    @SuppressWarnings("unchecked")
    FutureTask<Integer>[] tasks = (FutureTask<Integer>[]) Array.newInstance(FutureTask.class, 10);
    // 设置10个 futuretask 工作计算数组当中数据的和
    for (int i = 0; i < 10; i++) {
      int idx = i;
      tasks[i] = new FutureTask<>(() -> {
        int sum = 0;
        for (int k = idx * 10000; k < (idx + 1) * 10000; k++) {
          sum += data[k];
        }
        return sum;
      });
    }
    // 开启线程执行 futureTask 工作
    for (FutureTask<Integer> futureTask : tasks) {
      new Thread(futureTask).start();
    }
    int threadSum = 0;
    for (FutureTask<Integer> futureTask : tasks) {
      threadSum += futureTask.get();
    }
    int sum = Arrays.stream(data).sum();
    System.out.println(sum == threadSum); // 后果始终为 true
  }
}

可能你会对FutureTask的应用形式感觉困惑,或者不是很分明,当初咱们来认真捋一下思路。

  1. 首先启动一个线程要么是继承自Thread类,而后重写Thread类的run办法,要么是给Thread类传递一个实现了Runnable的类对象,当然能够用匿名外部类实现。
  2. 既然咱们的FutureTask对象能够传递给Thread类,阐明FutureTask必定是实现了Runnable接口,咱们当初来看一下FutureTask的继承体系。

​ 能够发现的是FutureTask的确实现了Runnable接口,同时还实现了Future接口,这个Future接口次要提供了前面咱们应用FutureTask的一系列函数比方get

  1. 看到这里你应该可能大抵想到在FutureTask中的run办法会调用Callable当中实现的call办法,而后将后果保留下来,当调用get办法的时候再将这个后果返回。

本人实现FutureTask

工具筹备

通过上文的剖析你可能曾经大抵理解了FutureTask的大抵执行过程了,然而须要留神的是,如果你执行FutureTaskget办法是可能阻塞的,因为可能Callablecall办法还没有执行实现。因而在get办法当中就须要有阻塞线程的代码,然而当call办法执行实现之后须要将这些线程都唤醒。

在本篇文章当中应用锁ReentrantLock和条件变量Condition进行线程的阻塞和唤醒,在咱们本人入手实现FutureTask之前,咱们先相熟一下下面两种工具的应用办法。

  • ReentrantLock次要有两个办法:

    • lock对临界区代码块进行加锁。
    • unlock对临界区代码进行解锁。
  • Condition次要有三个办法:

    • await阻塞调用这个办法的线程,期待其余线程唤醒。
    • signal唤醒一个被await办法阻塞的线程。
    • signalAll唤醒所有被await办法阻塞的线程。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {

  private ReentrantLock lock;
  private Condition condition;

  LockDemo() {
    lock = new ReentrantLock();
    condition = lock.newCondition();
  }

  public void blocking() {
    lock.lock();
    try {
      System.out.println(Thread.currentThread() + " 筹备期待被其余线程唤醒");
      condition.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }finally {
      lock.unlock();
    }
  }

  public void inform() throws InterruptedException {
    // 先休眠两秒 等他其余线程先阻塞
    TimeUnit.SECONDS.sleep(2);
    lock.lock();
    try {
      System.out.println(Thread.currentThread() + " 筹备唤醒其余线程");
      condition.signal(); // 唤醒一个被 await 办法阻塞的线程
      // condition.signalAll(); // 唤醒所有被 await 办法阻塞的线程
    }finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) {
    LockDemo lockDemo = new LockDemo();
    Thread thread = new Thread(() -> {
      lockDemo.blocking(); // 执行阻塞线程的代码
    }, "Blocking-Thread");
    Thread thread1 = new Thread(() -> {
      try {
        lockDemo.inform(); // 执行唤醒线程的代码
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }, "Inform-Thread");
    thread.start();
    thread1.start();
  }
}

下面的代码的输入:

Thread[Blocking-Thread,5,main] 筹备期待被其余线程唤醒
Thread[Inform-Thread,5,main] 筹备唤醒其余线程

FutureTask设计与实现

在前文当中咱们曾经谈到了FutureTask的实现原理,次要有以下几点:

  • 构造函数须要传入一个实现了Callable接口的类对象,这个将会在FutureTaskrun办法执行,而后失去函数的返回值,并且将返回值存储起来。
  • 当线程调用get办法的时候,如果这个时候Callable当中的call曾经执行实现,间接返回call函数返回的后果就行,如果call函数还没有执行实现,那么就须要将调用get办法的线程挂起,这里咱们能够应用condition.await()将线程挂起。
  • call函数执行实现之后,须要将之前被get办法挂起的线程唤醒继续执行,这里应用condition.signalAll()将所有挂起的线程唤醒。
  • 因为是咱们本人实现FutureTask,性能不会那么齐全,只须要可能满足咱们的次要需要即可,次要是帮忙大家理解FutureTask原理。

实现代码如下(剖析都在正文当中):

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// 这里须要实现 Runnable 接口,因为须要将这个对象放入 Thread 类当中
// 而 Thread 要求传入的对象实现了 Runnable 接口
public class MyFutureTask<V> implements Runnable {

  private final Callable<V> callable;
  private Object returnVal; // 这个示意咱们最终的返回值
  private final ReentrantLock lock;
  private final Condition condition;

  public MyFutureTask(Callable<V> callable) {
    // 将传入的 callable 对象存储起来 不便在前面的 run 办法当中调用
    this.callable = callable;
    lock = new ReentrantLock();
    condition = lock.newCondition();
  }

  @SuppressWarnings("unchecked")
  public V get(long timeout, TimeUnit unit) {
    if (returnVal != null) // 如果符合条件 阐明 call 函数曾经执行实现 返回值曾经不为 null 了
      return (V) returnVal; // 间接将后果返回即可 这样不必竞争锁资源 进步程序执行效率
    lock.lock();
    try {
      // 这里须要进行二次判断 (双重查看)
      // 因为如果一个线程在第一次判断 returnVal 为空
      // 而后这个时候它可能因为获取锁而被挂起
      // 而在被挂起的这段时间,call 可能曾经执行实现
      // 如果这个时候不进行判断间接执行 await办法
      // 那前面这个线程将无奈被唤醒
      if (returnVal == null)
        condition.await(timeout, unit);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      lock.unlock();
    }
    return (V) returnVal;
  }

  @SuppressWarnings("unchecked")
  public V get() {
    if (returnVal != null)
      return (V) returnVal;
    lock.lock();
    try {
      // 同样的须要进行双重查看
      if (returnVal == null)
          condition.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      lock.unlock();
    }
    return (V) returnVal;
  }


  @Override
  public void run() {
    if (returnVal != null)
      return;
    try {
      // 在 Runnable 的 run 办法当中
      // 执行 Callable 办法的 call 失去返回后果
      returnVal = callable.call();
    } catch (Exception e) {
      e.printStackTrace();
    }
    lock.lock();
    try {
      // 因为曾经失去了后果
      // 因而须要将所有被 await 办法阻塞的线程唤醒
      // 让他们从 get 办法返回
      condition.signalAll();
    }finally {
      lock.unlock();
    }
  }
    // 上面是测试代码
  public static void main(String[] args) {
    MyFutureTask<Integer> ft = new MyFutureTask<>(() -> {
      TimeUnit.SECONDS.sleep(2);
      return 101;
    });
    Thread thread = new Thread(ft);
    thread.start();
    System.out.println(ft.get(100, TimeUnit.MILLISECONDS)); // 输入为 null
    System.out.println(ft.get()); // 输入为 101
  }
}

咱们当初用咱们本人写的MyFutureTask去实现在前文当中数组求和的例子:

public static void main(String[] args) throws ExecutionException, InterruptedException {
  int[] data = new int[100000];
  Random random = new Random();
  for (int i = 0; i < 100000; i++) {
    data[i] = random.nextInt(10000);
  }
  @SuppressWarnings("unchecked")
  MyFutureTask<Integer>[] tasks = (MyFutureTask<Integer>[]) Array.newInstance(MyFutureTask.class, 10);
  for (int i = 0; i < 10; i++) {
    int idx = i;
    tasks[i] = new MyFutureTask<>(() -> {
      int sum = 0;
      for (int k = idx * 10000; k < (idx + 1) * 10000; k++) {
        sum += data[k];
      }
      return sum;
    });
  }
  for (MyFutureTask<Integer> MyFutureTask : tasks) {
    new Thread(MyFutureTask).start();
  }
  int threadSum = 0;
  for (MyFutureTask<Integer> MyFutureTask : tasks) {
    threadSum += MyFutureTask.get();
  }
  int sum = Arrays.stream(data).sum();
  System.out.println(sum == threadSum); // 输入后果为 true
}

总结

在本篇文章当中次要给大家介绍了FutureTask的外部原理,并且咱们本人通过应用ReentrantLockCondition实现了咱们本人的FutureTask,本篇文章的次要内容如下:

  • FutureTask的外部原理:

    • FutureTask首先会继承Runnable接口,这样就能够将FutureTask的对象间接放入Thread类当中,作为构造函数的参数。
    • 咱们在应用FutureTask的时候须要传入一个Callable实现类的对象,在函数call当中实现咱们须要执行的函数,执行实现之后,将call函数的返回值保留下来,当有线程调用get办法时候将保留的返回值返回。
  • 咱们应用条件变量进行对线程的阻塞和唤醒。

    • 当有线程调用get办法时,如果call曾经执行实现,那么能够间接将后果返回,否则须要应用条件变量将线程挂起。
    • call函数执行实现的时候,须要应用条件变量将所有阻塞在get办法的线程唤醒。
  • 双重查看:

    • 咱们在get办法当中首先判断returnVal是否为空,如果不为空间接将后果返回,这就能够不必去竞争锁资源了,能够进步程序执行的效率。
    • 然而咱们在应用锁爱护的临界区还须要进行判断,判断returnVal是否为空,因为如果一个线程在第一次判断 returnVal 为空,而后这个时候它可能因为获取锁而被挂起, 而在被挂起的这段时间,call 可能曾经执行实现,如果这个时候不进行判断间接执行 await办法,那前面这个线程将无奈被唤醒,因为在call函数执行实现之后调用了condition.signalAll(),如果线程在这之后执行await办法,那么未来再没有线程去将这些线程唤醒。

更多精彩内容合集可拜访我的项目:https://github.com/Chang-LeHu…

关注公众号:一无是处的钻研僧,理解更多计算机(Java、Python、计算机系统根底、算法与数据结构)常识。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理