关于spring:Java9第四篇Reactive-Stream-API响应式编程

27次阅读

共计 3850 个字符,预计需要花费 10 分钟才能阅读完成。

我打算在后续的一段时间内,写一系列对于 java 9 的文章,尽管 java 9 不像 Java 8 或者 Java 11 那样的外围 java 版本,然而还是有很多的个性值得关注。期待您能关注我,我将把 java 9 写成一系列的文章,大略十篇左右。

  • java9 第一篇 - 能够在 interface 中定义公有办法了
  • java9 第二篇 -Java9 改良 try-with-resources 语法
  • java9 第三篇 - 反对多 JDK 版本下运行的 Jar 文件打包形式

Java 9 的 Reactive Streams 是对异步流式编程的一种实现。它基于异步公布和订阅模型,具备非阻塞“背压”数据处理的特点。

Non-blocking Back Pressure(非阻塞背压):它是一种机制,让公布订阅模型中的订阅者防止接管大量数据(超出其解决能力),订阅者能够异步告诉发布者升高或晋升数据生产公布的速率。它是响应式编程实现成果的外围特点!

一、Java9 Reactive Stream API

Java 9 提供了一组定义响应式流编程的接口。所有这些接口都作为动态外部接口定义在 java.util.concurrent.Flow 类外面。

上面是 Java 响应式编程中的一些重要角色和概念,先简略了解一下

  • 发布者(Publisher)是潜在的有限数量的有序数据元素的生产者。它依据收到的需要 (subscription) 向以后订阅者公布肯定数量的数据元素。
  • 订阅者(Subscriber)从发布者那里订阅并接收数据元素。与发布者建设订阅关系后,发布者向订阅者发送订阅令牌(subscription),订阅者能够依据本人的解决能力申请发布者公布数据元素的数量。
  • 订阅令牌(subscription)示意订阅者与发布者之间建设的订阅关系。当建设订阅关系后,发布者将其传递给订阅者。订阅者应用订阅令牌与发布者进行交互,例如申请数据元素的数量或勾销订阅。

二、Java 响应式编程四大接口

2.1.Subscriber Interface(订阅者订阅接口)

public static interface Subscriber<T> {public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();}
  • onSubscribe:在发布者承受订阅者的订阅动作之后,公布任何的订阅音讯之前被调用。新创建的 Subscription 订阅令牌对象通过此办法传递给订阅者。
  • onNext:下一个待处理的数据项的处理函数
  • onError:在发布者或订阅遇到不可复原的谬误时调用
  • onComplete:当没有订阅者调用(包含 onNext()办法)产生时调用。

2.2.Subscription Interface(订阅令牌接口)

订阅令牌对象通过 Subscriber.onSubscribe() 办法传递

public static interface Subscription {public void request(long n);
    public void cancel();}
  • request(long n)是无阻塞背压概念背地的要害办法。订阅者应用它来申请 n 个以上的生产我的项目。这样,订阅者管制了它以后可能接管多少个数据。
  • cancel()由订阅者被动来勾销其订阅,勾销后将不会在接管到任何数据音讯。

2.3.Publisher Interface(发布者接口)

@FunctionalInterface
public static interface Publisher<T> {public void subscribe(Subscriber<? super T> subscriber);
}

调用该办法,建设订阅者 Subscriber 与发布者 Publisher 之间的音讯订阅关系。

2.4.Processor Interface(处理器接口)

解决者 Processor 能够同时充当订阅者和发布者,起到转换发布者——订阅者管道中的元素的作用。用于将发布者 T 类型的数据元素,接管并转换为类型 R 的数据并公布。

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}

二、实战案例

当初咱们要去实现下面的四个接口来实现响应式编程

  • Subscription Interface 订阅令牌接口通常不须要咱们本人编程去实现,咱们只须要在晓得 request()办法和 cancle()办法含意即可。
  • Publisher Interface 发布者接口,Java 9 曾经默认为咱们提供了实现 SubmissionPublisher,该实现类除了实现 Publisher 接口的办法外,提供了一个办法叫做 submit() 来实现音讯数据的发送。
  • Subscriber Interface 订阅者接口,通常须要咱们本人去实现。因为在数据订阅接管之后,不同的业务有不同的解决逻辑。
  • Processor 实际上是 Publisher Interface 和 Subscriber Interface 的集合体,有须要数据类型转换及数据处理的需要才去实现这个接口

上面的例子实现的式字符串的数据音讯订阅解决

实现订阅者 Subscriber Interface

import java.util.concurrent.Flow;

public class MySubscriber implements Flow.Subscriber<String> {

  private Flow.Subscription subscription;  // 订阅令牌

  @Override
  public void onSubscribe(Flow.Subscription subscription) {System.out.println("订阅关系建设 onSubscribe:" + subscription);
      this.subscription = subscription;
      subscription.request(2);
  }

  @Override
  public void onNext(String item) {System.out.println("item:" + item);
      // 一个音讯解决实现之后,能够持续调用 subscription.request(n); 向发布者要求数据发送
      //subscription.request(n);
  }

  @Override
  public void onError(Throwable throwable) {System.out.println("onError:" + throwable);
  }

  @Override
  public void onComplete() {System.out.println("onComplete");
  }
}

SubmissionPublisher 音讯发布者

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class SubmissionPublisherExample {public static void main(String[] args) throws InterruptedException {ExecutorService executor = Executors.newFixedThreadPool(1);
      SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
      sb.subscribe(new MySubscriber());   // 建设订阅关系,能够有多个订阅者
      sb.submit("数据 1");  // 发送音讯 1
      sb.submit("数据 2"); // 发送音讯 2
      sb.submit("数据 3"); // 发送音讯 3

      executor.shutdown();}
}

控制台打印输出后果

订阅关系建设
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: 数据 1
item: 数据 2

请留神:即便发布者 submit 了 3 条数据,MySubscriber 也仅收到了 2 条数据进行了解决。是因为咱们在 MySubscriber#onSubscribe() 办法中应用了subscription.request(2);。这就是“背压”的响应式编程成果,我有能力解决多少数据,就会告诉音讯发布者给多少数据。

欢送关注我的博客,外面有很多精品合集

本文转载注明出处(必须带连贯,不能只转文字):字母哥博客 – zimug.com

感觉对您有帮忙的话,帮我点赞、分享!您的反对是我不竭的创作能源!。另外,笔者最近一段时间输入了如下的精品内容,期待您的关注。

  • 《手摸手教你学 Spring Boot2.0》
  • 《Spring Security-JWT-OAuth2 一本通》
  • 《实战前后端拆散 RBAC 权限管理系统》
  • 《实战 SpringCloud 微服务从青铜到王者》
  • 《VUE 深入浅出系列》

正文完
 0