关于java:使用Reactor完成类似的Flink的操作

38次阅读

共计 2822 个字符,预计需要花费 8 分钟才能阅读完成。

一、背景

Flink 在解决流式工作的时候有很大的劣势,其中 windows 等操作符能够很不便的实现聚合工作,然而 Flink 是一套独立的服务,业务流程中如果想应用须要将数据发到 kafka,用 Flink 解决完再发到 kafka,而后再做业务解决,流程很繁琐。

比方在业务代码中想要实现相似 Flink 的 window 按工夫批量聚合性能,如果纯手动写代码比拟繁琐,应用 Flink 又太重,这种场景下应用响应式编程 RxJava、Reactor 等的 window、buffer 操作符能够很不便的实现。

响应式编程框架也早已有了背压以及丰盛的操作符反对,能不能用响应式编程框架解决相似 Flink 的操作呢,答案是必定的。

本文应用 Reactor 来实现 Flink 的 window 性能来举例,其余操作符实践上雷同。文中波及的代码:github

二、实现过程

Flink 对流式解决做的很好的封装,应用 Flink 的时候简直不必关怀线程池、积压、数据失落等问题,然而应用 Reactor 实现相似的性能就必须对 Reactor 运行原理比拟理解,并且通过不同场景下测试,否则很容易出问题。

上面列举出实现过程中的外围点:

1、创立 Flux 和发送数据拆散

入门 Reactor 的时候给的示例都是创立 Flux 的时候同时就把数据赋值了,比方:Flux.just、Flux.range 等,从 3.4.0 版本后先创立 Flux,再发送数据可应用 Sinks 实现。有两个比拟容易混同的办法:

  • Sinks.many().multicast() 如果没有订阅者,那么接管的音讯间接抛弃
  • Sinks.many().unicast() 如果没有订阅者,那么保留接管的音讯直到第一个订阅者订阅
  • Sinks.many().replay() 不论有多少订阅者,都保留所有音讯

在此示例场景中,抉择的是 Sinks.many().unicast()

官网文档:https://projectreactor.io/doc…

2、背压反对

下面办法的对象背压策略反对两种:BackpressureBuffer、BackpressureError,在此场景必定是抉择 BackpressureBuffer,须要指定缓存队列,初始化办法如下:Queues.<String>get(queueSize).get()

数据提交有两个办法:

  • emitNext 指定提交失败策略同步提交
  • tryEmitNext 异步提交,返回提交胜利、失败状态

在此场景咱们不心愿丢数据,可自定义失败策略,提交失败有限重试,当然也能够调用异步办法本人重试。

 Sinks.EmitFailureHandler ALWAYS_RETRY_HANDLER = (signalType, emitResult) -> emitResult.isFailure();

在此之后就就能够调用 Sinks.asFlux 开心的应用各种操作符了。

3、窗口函数

Reactor 反对两类窗口聚合函数:

  • window 类:返回 Mono(Flux<T>)
  • buffer 类:返回 List<T>

在此场景中,应用 buffer 即可满足需要,bufferTimeout(int maxSize, Duration maxTime) 反对最大个数,最大等待时间操作,Flink 中的 keys 操作能够用 groupBy、collectMap 来实现。

4、消费者解决

Reactor 通过 buffer 后是一个一个的发送数据,如果应用 publishOn 或 subscribeOn 解决的话,只期待上游的 subscribe 解决实现才会从新 request 新的数据,buffer 操作符才会从新发送数据。如果此时 subscribe 消费者耗时较长,数据流会在 buffer 流程阻塞,显然并不是咱们想要的。

现实的操作是消费者在一个线程池里操作,可多线程并行处理,如果线程池满,再阻塞 buffer 操作符。解决方案是自定义一个线程池,并且当然线程池如果工作满 submit 反对阻塞,能够用自定义 RejectedExecutionHandler 来实现:

 RejectedExecutionHandler executionHandler = (r, executor) -> {
     try {executor.getQueue().put(r);
     } catch (InterruptedException e) {Thread.currentThread().interrupt();
         throw new RejectedExecutionException("Producer thread interrupted", e);
     }
 };
 
 new ThreadPoolExecutor(poolSize, poolSize,
         0L, TimeUnit.MILLISECONDS,
         new SynchronousQueue<>(),
         executionHandler);

三、总结

1、总结一下整体的执行流程

  1. 提交工作:提交数据反对同步异步两种形式,反对多线程提交,失常状况下响应很快,同步的办法如果队列满则阻塞。
  2. 丰盛的操作符解决流式数据。
  3. buffer 操作符产生的数据多线程解决:同步提交到独自的消费者线程池,线程池工作满则阻塞。
  4. 消费者线程池:反对阻塞提交,保障不丢音讯,同时队列长度设置成 0,因为后面曾经有队列了。
  5. 背压:消费者线程池阻塞后,会背压到 buffer 操作符,并背压到缓冲队列,缓存队列满背压到数据提交者。

2、和 Flink 的比照

实现的 Flink 的性能:

  • 不输 Flink 的丰盛操作符
  • 反对背压,不丢数据

劣势:轻量级,可间接在业务代码中应用

劣势:

  • 外部执行流程简单,容易踩坑,不如 Flink 傻瓜化
  • 没有 watermark 性能,也就意味着只反对无序数据处理
  • 没有 savepoint 性能,尽管咱们用背压解决了局部问题,然而宕机后开始会失落缓存队列和消费者线程池里的数据,补救措施是增加 Java Hook 性能
  • 只反对单机,意味着你的缓存队列不能设置无限大,要思考线程池的大小,且没有 flink globalWindow 等性能
  • 需思考对上游数据源的影响,Flink 的上游个别是 mq,数据量大时可主动沉积,如果本文的计划上游是 http、rpc 调用,产生的阻塞影响就不能疏忽。弥补计划是每次提交数据都应用异步办法,如果失败则提交到 mq 中缓冲并生产该 mq 有限重试。

四、附录

本文源码地址:https://github.com/sofn/react…

Reactor 官网文档:https://projectreactor.io/doc…

Flink 文档:https://ci.apache.org/project…

Reactive 操作符:http://reactivex.io/documenta…


本文作者: 木小丰 ,美团 Java 高级工程师,关注架构、软件工程、全栈等,不定期分享软件研发过程中的实际、思考。欢送关注公共号:Java 研发

本文链接:https://lesofn.com/archives/s…

正文完
 0