乐趣区

关于java:netty系列之可以自动通知执行结果的Future有见过吗

简介

在我的心中,JDK 有两个经典版本,第一个就是当初大部分公司都在应用的 JDK8,这个版本引入了 Stream、lambda 表达式和泛型,让 JAVA 程序的编写变得更加晦涩,缩小了大量的冗余代码。

另外一个版本要早点,还是 JAVA 1.X 的时代,咱们称之为 JDK1.5,这个版本引入了 java.util.concurrent 并发包,从此在 JAVA 中能够欢快的应用异步编程。

尽管先 JDK 曾经倒退到了 17 版本,然而并发这一块的变动并不是很大。受限于 JDK 要保持稳定的需要,所以 concurrent 并发包提供的性能并不能齐全满足某些业务场景。所以依赖于 JDK 的包自行研发了属于本人的并发包。

当然,netty 也不例外,一起来看看 netty 并发包都有那些劣势吧。

JDK 异步缘起

怎么在 java 中创立一个异步工作,或者开启一个异步的线程,每个人可能都有属于本人的答复。

大家第一工夫可能想到的是创立一个实现 Runnable 接口的类,而后将其封装到 Thread 中运行,如下所示:

new Thread(new(RunnableTask())).start()

每次都须要 new 一个 Thread 是 JDK 大神们不可承受的,于是他们产生了一个将 thread 调用进行封装的想法,而这个封装类就叫做 Executor.

Executor 是一个 interface,首先看一下这个 interface 的定义:

public interface Executor {void execute(Runnable command);
}

接口很简略,就是定义了一个 execute 办法来执行传入的 Runnable 命令。

于是咱们能够这样来异步开启工作:

   Executor executor = anExecutor;
   executor.execute(new RunnableTask1());
   executor.execute(new RunnableTask2());

看到这里,聪慧的小伙伴可能就要问了,如同不对呀,Executor 自定义了 execute 接口,如同跟异步和多线程并没有太大的关系呀?

别急,因为 Executor 是一个接口,所以咱们能够有很多实现。比方上面的间接执行 Runnable, 让 Runnable 在以后线程中执行:

 class DirectExecutor implements Executor {public void execute(Runnable r) {r.run();
   }
 }

又比方上面的在一个新的线程中执行 Runnable:

 class ThreadPerTaskExecutor implements Executor {public void execute(Runnable r) {new Thread(r).start();}
 }

又比方上面的将多个工作寄存在一个 Queue 中,执行完一个工作再执行下一个工作的序列执行:

 class SerialExecutor implements Executor {final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
   final Executor executor;
   Runnable active;

   SerialExecutor(Executor executor) {this.executor = executor;}

   public synchronized void execute(final Runnable r) {tasks.offer(new Runnable() {public void run() {
         try {r.run();
         } finally {scheduleNext();
         }
       }
     });
     if (active == null) {scheduleNext();
     }
   }

   protected synchronized void scheduleNext() {if ((active = tasks.poll()) != null) {executor.execute(active);
     }
   }
 }

这些 Executor 都十分完满。然而他们都只能提交工作,提交工作之后就什么都不晓得了。这对于好奇的宝宝们是不可忍耐的,因为咱们须要晓得执行的后果,或者对执行工作进行管控。

于是就有了 ExecutorService。ExecutorService 也是一个接口,不过他提供了 shutdown 办法来进行承受新的工作,和 isShutdown 来判断敞开的状态。

除此之外,它还提供了独自调用工作的 submit 办法和批量调用工作的 invokeAll 和 invokeAny 办法。

既然有了 execute 办法,submit 尽管和 execute 办法基本上执行了雷同的操作,然而在办法参数和返回值上有稍许区别。

首先是返回值,submit 返回的是 Future,Future 示意异步计算的后果。它提供了查看计算是否实现、期待其实现以及检索计算结果的办法。Future 提供了 get 办法,用来获取计算结果。然而如果调用 get 办法的同时,计算结果并没有筹备好,则会产生阻塞。

其次是 submit 的参数,一般来说只有 Callable 才会有返回值,所以咱们罕用的调用形式是这样的:

<T> Future<T> submit(Callable<T> task);

如果咱们传入 Runnable,那么尽管也返回一个 Future,然而返回的值是 null:

Future<?> submit(Runnable task);

如果我又想传入 Runnable,又想 Future 有返回值怎么办呢?

今人通知咱们,鱼和熊掌不可兼得!然而当初是 2021 年了,有些事件是能够产生扭转了:

<T> Future<T> submit(Runnable task, T result);

下面咱们能够传入一个 result,当 Future 中的工作执行结束之后间接将 result 返回。

既然 ExecutorService 这么弱小,如何创立 ExecutorService 呢?

最简略的方法就是用 new 去创立对应的实例。然而这样不够优雅,于是 JDK 提供了一个 Executors 工具类,他提供了多种创立不同 ExecutorService 的静态方法,十分好用。

netty 中的 Executor

为了兼容 JDK 的并发框架,尽管 netty 中也有 Executor,然而 netty 中的 Executor 都是从 JDK 的并发包中衍生进去的。

具体而言,netty 中的 Executor 叫做 EventExecutor, 他继承自 EventExecutorGroup:

public interface EventExecutor extends EventExecutorGroup 

而 EventExecutorGroup 又继承自 JDK 的 ScheduledExecutorService:

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor>

为什么叫做 Group 呢?这个 Group 的意思是它外面蕴含了一个 EventExecutor 的汇合。这些联合中的 EventExecutor 通过 Iterable 的 next 办法来进行遍历的。

这也就是为什么 EventExecutorGroup 同时继承了 Iterable 类。

而后 netty 中的其余具体 Executor 的实现再在 EventExecutor 的根底之上进行扩大。从而失去了 netty 本人的 EventExecutor 实现。

Future 的窘境和 netty 的实现

那么 JDK 中的 Future 会有什么问题呢?后面咱们也提到了 JDK 中的 Future 尽管保留了计算结果,然而咱们要获取的时候还是须要通过调用 get 办法来获取。

然而如果以后计算结果还没进去的话,get 办法会造成以后线程的阻塞。

别怕,这个问题在 netty 中被解决了。

先看下 netty 中 Future 的定义:

public interface Future<V> extends java.util.concurrent.Future<V> 

能够看到 netty 中的 Future 是继承自 JDK 的 Future。同时增加了 addListener 和 removeListener, 以及 sync 和 await 办法。

先讲一下 sync 和 await 办法,两者都是期待 Future 执行完结。不同之处在于,如果在执行过程中, 如果 future 失败了,则会抛出异样。而 await 办法不会。

那么如果不想同步调用 Future 的 get 办法来取得计算结果。则能够给 Future 增加 listener。

这样当 Future 执行完结之后,会主动告诉 listener 中的办法,从而实现异步告诉的成果,其应用代码如下:

EventExecutorGroup group = new DefaultEventExecutorGroup(4); // 4 threads
Future<?> f = group.submit(new Runnable() {...});
f.addListener(new FutureListener<?> {public void operationComplete(Future<?> f) {..}
});

还有一个问题,每次咱们提交工作的时候,都须要创立一个 EventExecutorGroup,有没有不须要创立就能够提交工作的办法呢?

有的!

netty 为那些没有工夫创立新的 EventExecutorGroup 的同志们,特意创立一个全局的 GlobalEventExecutor,这是能够间接应用的:

GlobalEventExecutor.INSTANCE.execute(new Runnable() {...});

GlobalEventExecutor 是一个单线程的工作执行器,每隔一秒钟回去检测有没有新的工作,有的话就提交到 executor 执行。

总结

netty 为 JDK 的并发包提供了十分有用的扩大。大家能够间接应用。

本文已收录于 http://www.flydean.com/46-netty-future-executor/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」, 懂技术,更懂你!

退出移动版