关于java:JDK-9新特性之Flow-API-初探

39次阅读

共计 9625 个字符,预计需要花费 25 分钟才能阅读完成。

自身我是只打算介绍 JDK 11 的 新的 Http Client 的,然而又碰见 Flow API 响应式流,只好将这部分货色独立进去,简略介绍一下。

[TOC]

响应式流的引入

Reactive Stream 反应式流或响应式流,这个词我是在介绍 JDK 11 中的 HttpClient 中碰到的:

HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://openjdk.java.net/"))
.POST(HttpRequest.BodyPublishers.ofString("aaaa"))
                .build();
// BodyHandlers.fromLineSubscriber 要求的参数是 Subscriber 类实例
// 而后我点了点发现 Subscriber 位于 Flow 类内是一个动态接口
client.sendAsync(request, HttpResponse.BodyHandlers.fromLineSubscriber())

往上翻了一下发现这个 Flow 出自 Doug Lea 大佬之手,下面还写了 Since 9,也就是说这个类是在 JDK 9 之后才进入到 JDK 外面的。

Doug Lea 的正文一贯是正文比代码多,咱们先看正文看,看看引入这个 Flow 类是为了什么?

Interrelated interfaces and static methods for establishing flow-controlled components in which {@link Publisher Publishers} produce items consumed by one or more {@link Subscriber Subscribers}, each managed by a {@link Subscription Subscription}.
这些接口和静态方法都是为了建设一起公布 - 订阅者模式 (Publisher 发布者公布 一个或多个 Subscriber 订阅者生产, 每个订阅者被 Subscription 治理) 的流式管制组件。

<p>These interfaces correspond to the reactive-streams
specification. They apply in both concurrent and distributed
asynchronous settings: All (seven) methods are defined in {@code
void} “one-way” message style. Communication relies on a simple form
of flow control (method {@link Subscription#request}) that can be
used to avoid resource management problems that may otherwise occur
in “push” based systems.
这些接口遵循响应式流的标准,他们被利用于并发和分布式异步设置: 所有七个办法都被定义为返回值为 void 的单向音讯格调。
音讯的交换依赖于简略的流式管制 (Subscription 的 request 办法) 能够用来防止基于推送零碎的一些资源管理问题。

这个响应流标准是啥? 我关上了 href 的这个链接进行查看。

为什么要引入响应流标准

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.
响应流式一种倡导,旨在为具备非阻塞背压的异步流解决提供规范,这包含针对 JVM 运行时环境、javaScript、网络协议的工作。

Handling streams of data—especially“live”data whose volume is not predetermined—requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be controlled such that a fast data source does not overwhelm the stream destination. Asynchrony is needed in order to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine.

在异步零碎中解决, 解决数据流, 尤其是数据量未预先确定的施行数据要特地小心。最为突出而又常见的问题是资源生产管制的问题,以便避免大量数据疾速到来吞没目的地。
为了让让一片网络的计算机或者一台计算机内的多核 CPU 在执行计算工作的时候应用并行模式,咱们须要异步。

The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary—think passing elements on to another thread or thread-pool—while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, back pressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the communication of back pressure were synchronous (see also the Reactive Manifesto), therefore care has to be taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation.

响应流的次要指标是管制逾越异步边界的数据交换, 行将一个元素传递到令外一个线程或线程中要确保接管方不会被迫缓冲任意数量的数据。换句话说,背压是该模型的重要组成,该模型能够让线程之间的队列是有界的。如果采取背压形式的通信是同步的,那么异步解决的形式将会被否定的 (详见响应式宣言)。因而必须要求所有的反应式流实现都是异步和非阻塞的。
It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application.
恪守本标准的实现能够实现交互操作,从而在整个流利用的处理过程中受害。

JDK 9 的正式公布工夫是 2017 年 9 月, 如果你点搜寻 Reactive Manifesto,会发现这个宣言于 14 年 9 月 16 日公布,这是一种编程理念,对应的有响应式编程,同面向对象编程、函数式编程一样,是一种理念。推出标准就是为了束缚其实现,防止每个库有有本人的一套响应式实现,这对于开发者来说是一件很头痛的事件。响应式编程的提出如上文所示次要是为了解决异步数据处理的背压景象,那什么是背压。

背压的解释

背压并不是响应式编程独有的概念,背压的英文是 BackPressure,不是一种机制,也不是一种策略,而是一种景象: 在数据流从上游生产者向上游消费者传输的过程中,上游生产速度大于上游生产速度,导致上游的 Buffer 溢出,这种景象咱们称之为 Backpressure 呈现。背压的重点在于上游的生产速度大于上游生产速度,而在于 Buffer 溢出。

举一个例子就是在 Java 中,咱们向线程池中提交工作,队列满了触发回绝策略(拒绝接受新工作还是抛弃旧的解决新的)。写到这里可能有同学会说,那你用无界队列不行吗?那如果提交的工作一直收缩,导致你整个零碎解体掉了怎么办?如果上游零碎生产速度快到能够把零碎搞解体,那么就须要设置 Buffer 下限。

梳理一下

首先呈现响应式编程理念,而后呈现响应式编程实现,再而后呈现响应式标准,响应流次要解决解决元素流的问题—如何将元素流从发布者传递到订阅者,不而不须要发布者阻塞,或者要求订阅者有有限的缓冲区,无限的缓冲区在达到缓冲上界的时候,对达到的元素进行抛弃或者回绝,订阅者能够异步告诉发布者升高或晋升数据生产公布的速率,它是响应式编程实现成果的外围特点。

而响应式标准则是一种倡导,遵循此倡导的零碎能够让数据在各个响应式零碎中都实现响应式的解决数据,标准在 Java 中的模式就是接口,也就是咱们本篇的主题 Flow 类,对于一项规范而言,它的目标天然是用更少的协定来形容交互。而响应流模型也非常简单:

  • 订阅者异步的向发布者申请 N 个元素
  • 发布者异步的向订阅者发送 (0 < M <= N) 个元素。

写到这里可能有同学会问了,为啥不是订阅者要多少元素,发布者给多少啊?这其实上是一种协调机制,在生产数据中有以下两种状况值得咱们留神:

  • 订阅者生产过快(在响应式模型中, 解决这种状况是告诉发布者产生元素快一点,相似于去包子店吃包子, 饭量比拟大的顾客来,包子店生产不迭,就会通知包子店做的快一点,说完还接着吃包子)

  • 发布者公布过快(在响应式模型中,解决这种状况是告诉生产者降低生产速率,还是去包子店吃包子,尽管顾客饭量比拟大,然而吃的比较慢,很快摆不下了,就会通知包子店做的慢一些)

Flow 的大抵介绍

Flow 是一个被 final 关键字润饰的类,外面是几组 public static 接口和 buffer 变量长度:

  • Publisher 发布者
  • Subscriber 订阅者
  • Subscription 订阅函件(或订阅令牌), 通过此实例, 用于订阅者和发布者之间协调申请元素数量和申请订阅元素数量
  • Processor 继承 Publisher 和 Subscriber,用于连贯 Publisher 和 Subscriber, 也能够连贯其余处理器

简略示例

public class FlowDemo {
    static class SampleSubscriber<T> implements Flow.Subscriber<T> {
        final Consumer<? super T> consumer;
        Flow.Subscription subscription;
        SampleSubscriber(Consumer<? super T> consumer) {
            this.bufferSize = bufferSize;
            this.consumer = consumer;
        }
        @Override
        public void onSubscribe(Flow.Subscription subscription) {System.out.println("建设订阅关系");
            this.subscription = subscription; // 赋值
            subscription.request(2);
        }
        public void onNext(T item) {System.out.println("收到发送者的音讯"+ item);
            consumer.accept(item);
            // 可调用 subscription.request 接着申请发布者发消息
          //  subscription.request(1);
        }
        public void onError(Throwable ex) {ex.printStackTrace(); }
        public void onComplete() {}
    }

    public static void main(String[] args) {
        SampleSubscriber subscriber = new SampleSubscriber<>(200L,o->{System.out.println("hello ....." + o);
        });
        ExecutorService executor = Executors.newFixedThreadPool(1);
        SubmissionPublisher<Boolean> submissionPublisher = new SubmissionPublisher(executor,Flow.defaultBufferSize());
        submissionPublisher.subscribe(subscriber);
        submissionPublisher.submit(true);
        submissionPublisher.submit(true);
        submissionPublisher.submit(true);
        executor.shutdown();}
}

输入后果:

为啥发布者公布了三条音讯,你订阅者只解决了两条啊,因为在建设订阅关系的时候订阅者就跟发布者阐明了, 我只有两条音讯, 以后生产能力有余, 在生产之后, 还能够再申请发布者发送。

上面咱们来演示一下背压成果, 咱们当初设定缓冲池大小的工作是 Flow 定义的默认值, 256。咱们当初尝试提交 1000 个工作试试看:

public class FlowDemo {
    static class SampleSubscriber<T> implements Flow.Subscriber<T> {
        final Consumer<? super T> consumer;
        Flow.Subscription subscription;
        SampleSubscriber(Consumer<? super T> consumer) {this.consumer = consumer;}
        @Override
        public void onSubscribe(Flow.Subscription subscription) {System.out.println("建设订阅关系");
            this.subscription = subscription; // 赋值
            subscription.request(1);
        }
        public void onNext(T item) {
            try {System.out.println("thread name 0"+Thread.currentThread().getName());
                TimeUnit.SECONDS.sleep(30);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println("收到发送者的音讯"+ item);
            consumer.accept(item);
            // 可调用 subscription.request 接着申请发布者发消息
            subscription.request(1);
        }
        public void onError(Throwable ex) {ex.printStackTrace(); }
        public void onComplete() {}
    }

    public static void main(String[] args) {
        SampleSubscriber subscriber = new SampleSubscriber<>(o->{System.out.println("hello ....." + o);
        });
        ExecutorService executor = Executors.newFixedThreadPool(1);
        SubmissionPublisher<Boolean> submissionPublisher = new SubmissionPublisher(executor,Flow.defaultBufferSize());
        submissionPublisher.subscribe(subscriber);
        for (int i = 0; i < 1000; i++) {System.out.println("开始公布第"+i+"条音讯");
            submissionPublisher.submit(true);
            System.out.println("开始公布第"+i+"条音讯公布结束");
        }
        executor.shutdown();}
}

为什么到第 257 条被阻塞住了, 那是因为缓冲区满了, 缓冲区呈现闲暇才会被容许接着生产。

public class MyProcessor extends SubmissionPublisher<Boolean> implements Flow.Processor<Boolean, Boolean> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void onNext(Boolean item) {if (item){
            item = false;
            // 处理器将此条信息转发
            this.submit(item);
            System.out.println("将 true 转换为 false");
        }
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {throwable.printStackTrace();
        this.subscription.cancel();}

    @Override
    public void onComplete() {System.out.println("处理器处理完毕");
        this.close();}
}
public class FlowDemo {
    static class SampleSubscriber<T> implements Flow.Subscriber<T> {
        final Consumer<? super T> consumer;
        Flow.Subscription subscription;
        SampleSubscriber(Consumer<? super T> consumer) {this.consumer = consumer;}
        @Override
        public void onSubscribe(Flow.Subscription subscription) {System.out.println("建设订阅关系");
            this.subscription = subscription; // 赋值
            subscription.request(1);
        }
        public void onNext(T item) {System.out.println("收到发送者的音讯"+ item);
            consumer.accept(item);
            // 可调用 subscription.request 接着申请发布者发消息
            subscription.request(1);
        }
        public void onError(Throwable ex) {ex.printStackTrace(); }
        public void onComplete() {}
    }

    public static void main(String[] args) throws Exception{
        SampleSubscriber subscriber = new SampleSubscriber<>(o->{System.out.println("hello ....." + o);
        });
        ExecutorService executor = Executors.newFixedThreadPool(1);
        SubmissionPublisher<Boolean> submissionPublisher = new SubmissionPublisher(executor,Flow.defaultBufferSize());
        MyProcessor myProcessor = new MyProcessor();
        // 做信息转发
        submissionPublisher.subscribe(myProcessor);
        myProcessor.subscribe(subscriber);
        for (int i = 0; i < 2; i++) {System.out.println("开始公布第"+i+"条音讯");
            submissionPublisher.submit(true);
            System.out.println("开始公布第"+i+"条音讯公布结束");
        }
        TimeUnit.SECONDS.sleep(2);
        executor.shutdown();}
}

输入后果:

总结一下

咱们由 JDK 11 的 HTTP Client 的申请参数看到了 Flow API, 在 Flow 类中的正文中看到了 Reactive Stream, 由 Reactive Stream 看到了响应式标准, 由标准引出响应流解决的问题, 即协调发布者和订阅者,发布者公布太快, 订阅者申请发布者减缓生产速度,生产太慢,订阅者申请发布者加快速度。在 Java 畛域曾经有了响应式的一些实现:

  • RXJava 是 ReactiveX 我的项目中的 Java 实现,Rxjava 早于 Reactive Streams 标准, RXJava 2.0+ 的确实现了 Reactive Streams API 标准。
  • Reactor 是 Pivotal 提供的 Java 实现,它作为 Spring Framework 5 的重要组成部分,是 WebFlux 采纳的默认反应式框架
  • Akka Streams 齐全实现了 Reactive Streams 标准,但 Akka Streams API 与 Reactive Streams API 齐全拆散。

为了对立标准,JDK 9 引入了 Flow,Flow 能够相似于 JDBC, 属于 API 标准,理论应用时须要应用 API 对应的具体实现,Reactive Streams 为咱们提供了一个咱们能够代码的接口,而无需关怀其实现。

参考资料

  • 反应式流 Reactive Streams 入门介绍 https://zhuanlan.zhihu.com/p/…
  • Reactive Streams http://www.reactive-streams.org/
  • 如何形象的形容反应式编程中的背压 (Backpressure) 机制?https://www.zhihu.com/questio…
  • Java9 个性 - 响应式流(Reactive Stream) https://zhuanlan.zhihu.com/p/…
  • 反应式编程摸索与总结 https://developer.aliyun.com/…
  • 反应式宣言 https://www.reactivemanifesto…
  • Java9-Reactive Stream API 响应式编程 https://zhuanlan.zhihu.com/p/…

正文完
 0