关于reactor:聊聊Kafka-Reactor线程模型

1、是否简略形容下reactor线程模型?三个点,事件驱动、一个或者多个数据源、多路复用事件驱动简略了解 : 场景1 : 比如说小A HR要招聘一个NB的程序员,既要Java根底厉害,又波及大数据畛域(不单单只会皮毛的)的人,忽然小A在xxx招聘网站看到了一个小B,发现小B就是我想要找的人哈,怎么办?打个电话吧?嘟嘟嘟...,接通了,聊了聊,挺好,过去下班吧场景2 : 还有一种状况就是,小A和小B聊了聊,小A说,你先关注订阅一下咱们公司,咱们有新音讯了告诉你,因为我本人做不了主,须要主管看看,一个工夫,主管看了看,桌子一拍,唉吆喂,这不就是我想要找的人么,回调发一个邮件,来下班吧,小A收到邮件,高高兴兴来下班了 同步了解音讯驱动 :HR 小A在各种招聘网站都看,要么Java根底不行的,要么大数据不会的,肿么办?要不在xxx上公布一个招聘音讯吧,一旦有这样能力的人看到,说不定会分割我 总结 :简略了解 : 事件驱动就是我是被动的,比如说RPC,我被动来调用你,你有事件告诉我 ; 音讯驱动,我把音讯处理完毕了,放到这里了,你想生产就生产哈 一个或者多个数据源 : NIO 多路复用能够反对百万client连贯,承受客户端的Accept、Read、Write事件 多路复用 : client channel的注册到一个selector,轮询channel事件,事件处理 2、Kafka的 reactor线程模型能说说么?最好画个图吧?简略说说,Acceptor线程疯狂的承受客户端的连贯,come on,来和我建设连贯吧,Acceptor领有Processor线程的援用,过去了,是吧。好,打入到Processor线程中,一直的轮询打。一旦客户端有新的事件,我要读,好,Processor线程感知到了,而后又摔到一个线程平安的队列中,Handler线程池中,感知到了,会从队列中获取,比如说真正的IO操作(磁盘读/写操作,可能很浪费时间,没事,搞到线程池中,你干你的,不要阻塞我上游的线程),一旦Handler线程操作实现,因为Handler线程领有Processor中的响应队列援用,间接写回去,Processor线程在轮询的时候,吆,Handler处理完毕了,好吧,我写给客户端,好的,客户端实现了一次读取操作 欢送点赞加关注,用最简略的话,聊艰涩难懂的技术

April 2, 2023 · 1 min · jiezi

关于reactor:Reactor响应式编程-之-简介

1 reactor 呈现的背景、初衷和要达到什么样的指标Reactor 我的项目始于 2012 年。 通过长时间的外部孵化,于 2013 年公布 Reactor 1.x 版本。 Reactor 1 在各种架构下都能胜利部署,包含开源的(如 Meltdown)和商业的(如 Pivotal RTI)。2014年,通过与一些新兴的响应式数据流标准单干,从新设计并于 2015 年 4 月公布 Reactor 2.0 版本。 1.1 阻塞浪费资源互联网企业基本上都有着大量的用户,即便当代硬件的性能曾经晋升了很多,然而性能问题始终是互联网企业不能疏忽的一个问题。通常有两种形式来晋升利用的性能: 应用更多的线程和硬件资源达到并行化。这也是很多企业采纳的形式;在以后应用的资源上寻求更高效的解决。这在寰球经济上行的背景下,是一种老本更低的形式;1.2 异步能援救所有嘛?通过编写异步非阻塞的代码,能够将执行切换到应用了雷同底层资源的另一流动工作上,而后在异步实现之后返回到当前任务。晋升资源利用率。 java 提供了两种编写异步(异步不肯定非阻塞)代码的形式。 Callbacks:不立刻返回对象,然而提供了一个 callback 参数,当后果可返回时调用。Future:这也是当初大部分程序员在应用的形式。异步办法会立刻返回一个 Future<T>。Future 对象对获取该值进行了包装,这个对象能够始终轮询晓得返回(除非设置了超时工夫)。例如,ExecutorService 应用 Future 对象执行 Callable<T> 工作。这些技术都有本人的问题: callback 不好组合,编写有难度,且很容易导致代码难以浏览和保护。 Future 比callback好很多,然而也有本人的问题。 调用 get() 办法会阻塞;不足对多值和高级错误处理的反对。1.3 从命令式到响应式作为响应式编程方向上的第一步,Microsoft在.NET生态中创立了响应式(Rx)扩大库。而后RxJava实现了JVM上的响应式编程。随着工夫的推移,通过Reactive Streams的致力,一套基于JVM为响应式库定义接口与交互规定的标准规范Reactive Streams 呈现了。其接口曾经集成到了Java9中的 Flow 类下。响应式旨在解决上述 JVM 提供的异步形式的毛病,同时关注了其余一些方面: 组合型和易读性数据作为 流 操作,有着丰盛的操作符在订阅之前什么都不会产生(有什么长处?)背压,消费者能够向生产者发送信号示意公布速率太快与并发无关的高阶形象reactor 是响应式编程的一种实现。 古代应用程序须要解决大量并发申请并解决大量数据。规范的阻塞代码不再足以满足这些要求。 反应式设计模式是一种基于事件的架构办法,用于异步解决来自单个或多个服务处理程序的大量并发服务申请。 Project Reactor 基于这种模式,并有一个明确而雄心勃勃的指标,即在 JVM 上构建非阻塞、反应式应用程序。 2 reactor 劣势和劣势别离是什么劣势异步非阻塞代码可读性高背压 解决音讯的生产可能比生产慢。劣势对于非响应式 java 开发者来说,学习曲线平缓。debug 难度高3 reactor 的实用场景创立事件驱动程序;亚马逊等大型在线购物平台的告诉服务为银行业提供宏大的交易解决服务股票价格同时变动的股票交易业务4 reactor 组成部分和要害节点4.1 Mono一种生成数据流的形式。蕴含0-1个后果的异步序列。 ...

October 24, 2022 · 1 min · jiezi

关于reactor:Reactor-之-flatMap-vs-map-详解

1 作用不同1.2 映射?展平?map 只执行映射flatMap 既执行映射,也执行展平什么叫只能执行映射? 我了解是把一个数据执行一个办法,转换成另外一个数据。举个例子:mapper 函数把输出的字符串转换成大写。map()办法执行这个 mapper 函数。 Function<String, String > mapper = String::toUpperCase; Flux<String> inFlux = Flux.just("hello", ".", "com"); Flux<String> outFlux = inFlux.map(mapper); // reactor 测试包提供的测试方法 StepVerifier.create(outFlux) .expectNext("HELLO", ".", "COM") .expectComplete() .verify();什么叫展平? mapper 函数把字符串转成大写,而后宰割成一个一个字符。 Function<String, Publisher<String>> mapper = s -> Flux.just(s.toUpperCase().split("")); Flux<String> inFlux = Flux.just("hello", ".", "com"); // 这里只能应用 flatMap,因为参数是 Function<T, Publisher<V>> 模式 Flux<String> outFlux = inFlux.flatMap(mapper); List<String> output = new ArrayList<>(); outFlux.subscribe(output::add); // 输入 [H, E, L, L, O, ., C, O, M] System.out.println(output); 请留神,因为来自不同起源的数据项交织,它们在输入中的程序可能与咱们在输出中看到的不同。 ...

August 19, 2022 · 2 min · jiezi

关于reactor:GrowingIO-Reactor速成指南

简介Reactor响应式编程(Reactive Programming): Reactor is a fully non-blocking reactive programming foundation for the JVM, with efficient demand management (in the form of managing “backpressure”). It integrates directly with the Java 8 functional APIs, notably CompletableFuture, Stream, and Duration. It offers composable asynchronous sequence APIs — Flux (for [N] elements) and Mono (for [0|1] elements) — and extensively implements the Reactive Streams specification.翻译一下就是:Reactor是JVM的一个齐全无阻塞的响应式编程根底,具备高效的需要治理(以治理“背压”的模式)。它间接与Java 8性能api集成,尤其是CompletableFuture、Stream和Duration。它提供了可组合的异步序列api——Flux(用于[N]元素)和Mono(用于[0|1]元素)——并且宽泛地实现了反馈流标准。 背景咱们服务端的我的项目大多采纳了Spring WebFlux,reactor是 Spring WebFlux 的首选反馈库,WebFlux 须要 Reactor 作为外围依赖项。Reactor 存在肯定的学习老本,在开发中咱们遇到了些bug,相当一部分是因为咱们不够理解 reactor ,踩了很多坑。所以在本文档中咱们次要针对的是一些学习过程容易让新人感到迷茫的知识点(map、flatMap、异步、并发),冀望能让新人更好上手 Spring WebFlux。 ...

December 20, 2021 · 5 min · jiezi

关于reactor:reactor模式

IO线程模型概述目前的IO线程解决模型个别能够分为以下三类 单线程阻塞I/O服务模型多线程阻塞IO服务模型Reactor模式依据Reactor的数量和解决资源池线程的数量不同,Reactor模式有如下3种典型的实现 单Reactor单线程单Reactor多线程主从Reactor多线程传统阻塞I/O线程模型传统阻塞的IO线程线程模型在解决IO事件的时候其实就是一直应用一个循环监听端口是否有新的套接字连贯,如果有就进行相应的解决,然而在业务逻辑较为简单的状况下,无奈疾速返回进行新的连贯操作,会导致后续的连贯阻塞,效率太低。其线程模型如下图所示:(对于多线程阻塞IO这里就不再赘述,就是一个线程对应一个连贯) Reactor模型针对传统阻塞I/O服务模型的毛病,解决方案个别有两个 基于 I/O 复用模型:多个连贯共用一个阻塞对象,应用程序只须要在一个阻塞对象期待,无需阻塞期待所有连贯。当某个连贯有新的数据能够解决时,操作系统告诉应用程序,线程从阻塞状态返回,开始进行业务解决。基于线程池复用线程资源:不用再为每个连贯创立线程,将连贯实现后的业务解决任务分配给线程进行解决,一个线程能够解决多个连贯的业务。I/O复用联合线程池,就是Reactor模式根本设计思维,如图所示: 单Reactor单线程模式长处:模型简略,没有多线程、过程通信、竞争的问题,全副都在一个线程中实现毛病: 性能问题,只有一个线程,无奈齐全施展多核 CPU 的性能。Handler 在解决某个连贯上的业务时,整个过程无奈解决其余连贯事件,很容易导致性能瓶颈。可靠性问题,线程意外终止,或者进入死循环,会导致整个零碎通信模块不可用,不能接管和解决内部音讯,造成节点故障应用场景:客户端的数量无限,业务解决十分疾速,比方 Redis在业务解决的工夫复杂度 O(1) 的状况 单Reactor多线程模型长处:能够充沛的利用多核cpu 的解决能力毛病:多线程数据共享和拜访比较复杂, reactor 解决所有的事件的监听和响应,在单线程运行, 在高并发场景容易呈现性能瓶颈. 主从Reactor多线程模型长处:父线程与子线程的数据交互简略职责明确,父线程只须要接管新连贯,子线程实现后续的业务解决。Reactor 主线程只须要把新连贯传给子线程,子线程无需返回数据。毛病:编程复杂度较高联合实例:这种模型在许多我的项目中宽泛应用,包含 Nginx 主从 Reactor 多过程模型,Memcached 主从多线程,Netty 主从多线程模型也进行了反对 Netty中的reactor模型Netty次要是基于主从Reactor多线程模式做了肯定的改良,其中主从Reactor都由单线程一个变成了多线程。Server 端蕴含 1 个 Boss的NioEventLoopGroup 和 1 个 Worker的NioEventLoopGroup。NioEventLoopGroup 相当于 1 个事件循环组,这个组里蕴含多个事件循环 NioEventLoop,每个 NioEventLoop 蕴含 1 个 Selector 和 1 个事件循环线程。每个Boss的NioEventLoop循环执行的工作蕴含 3 步: 轮询 Accept 事件;解决 Accept I/O 事件,与 Client 建设连贯,生成 NioSocketChannel,并将 NioSocketChannel 注册到某个 Worker NioEventLoop 的 Selector 上;解决工作队列中的工作,runAllTasks。工作队列中的工作包含用户调用eventloop.execute或schedule执行的工作,或者其余线程提交到该eventloop的工作。每个 Worker NioEventLoop 循环执行的工作蕴含 3 步: ...

July 23, 2021 · 1 min · jiezi

关于reactor:源于生活的Reactor设计模式浅析

1 源于生存的设计模式 咱们在生活中有一个十分不违心去然而却又不得不去的中央就是医院,咱们去医院看病须要经验一下流程: 第1步:挂号 第2步:期待叫号 第3步:找对应的医生讲述病情,医生依据症状开化验单 第4步:咱们带上化验单应用设施化验 第5步:期待化验后果进去之后咱们要拿上化验单再去找医生 第6步:医生依据化验后果隔靴搔痒。 每个人都不违心去医院,因为咱们感觉去医院十分麻烦,经验下面的流程一天的工夫就过来了,在这所有的流程中期待叫号和等化验后果工夫是最长的,有的化验单可能好几天能力进去,化验单进去之后也须要排队让医生看化验后果。 作为病人心愿到医院之后有一对一的服务,到医院之后有对应的医生给咱们全程跟踪看病。 然而如果咱们作为医院的管理者会怎么思考这个问题呢,医院的医生和设施数量是无限的,然而每天的病人却很多,基本上是医生的几十倍,如何应用无限的医生和设施资源解决更多病人的问题呢。 假如医院有10个医生,20台设施,医生执行第3步须要10分钟,执行第6步须要10分钟,设施执行第4步须要20分钟,第5步须要20分钟。如果医生接到病人之后给病人开化验单,而后等着病人出后果之后给病人开药,再送走,就是从第3步到第6步始终有医生跟着,这样会十分耗时,一个医生看一个病人须要破费 10+20+20+10=60分钟 的工夫,一天8小时只能给8病人看病,一个医院每天只能接待108=80个病人。然而如果医生开完化验单之后病人去化验这段时间给下个挂过号的病人看病,即执行完第3步之后,医生能够持续叫号执行第2步,病人拿到化验单之后即执行完第5步之后能够再去找医生执行第6步,这样就把资源充分利用起来了,一个医生看一个病人只须要精力第3步和第6不,破费10+10=20分钟,每天可能给860/20=24集体看病,整个医院每天能接待10*24=240个病人。这就是Reactor设计模式最外围的思维,解决了一对多的问题,大大晋升了资源的使用率。 【文章福利】须要C/C++ Linux服务器架构师学习材料加群1106747042(材料包含C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等) 2 Reactor设计模式实现 NIO是应用reactor设计模式十分经典的案例,传统的IO客户端过去一个连贯,服务端就须要专门一个线程来解决,每一个线程会将一次交互操作全副解决实现,包含读取和返回,外表上仿佛连贯不在线程外面,然而如果线程不够,新连贯将无奈失去解决。所以一个线程就肩负了连贯、读取、写入的责任。 这种解决模式海量并发的状况下即便引入线程池也不能满足需要,这个时候思考将一次残缺的申请切分为几个小的工作,就像医院把医生问诊和仪器查看离开一样,每个小工作都是非阻塞的,对于读写操作应用NIO对其进行读写。所以咱们把一次连贯操作拆分为多个工作,每个工作都是非阻塞的,这样就大大晋升了效率,然而这样做线程池中的线程数量业也会大大增长,然而线程更加简略且工作繁多,有点像咱们微服务的思维。 Reactor从线程池和Reactor的抉择上能够细分为:Reactor单线程模型,Reactor多线程模型,Reactor主从模型。 2.1 Reactor单线程模型 单线程模型是针对客户端申请应用一个专门的线程去解决,这个线程循环监听是否有客户端的申请到达,一旦收到客户端申请,将其分发给相应解决线程进行解决。这种模式采纳了基于事件驱动的设计,当有事件触发时才会调用处理器进行数据处理。应用Reactor模式能够对线程的数量进行管制,能够应用一个线程去解决大量的事件。 2.2 Reactor多线程模型 应用一个线程能够反对所有的IO解决,然而瓶颈也是不言而喻的,如果客户端屡次申请时在业务线程中解决较慢,后续的客户端会被积压,导致响应变慢,所以须要引入Reactor多线程模型。能够将工作线程引入线程池,将处理器的执行放入线程池,并应用多线程解决业务逻辑。 2.3 Reactor主从模型 对于多个CPU的机器,为了充分利用系统资源会将Reactor拆分为两局部。 1)Main Reactor 负责监听连贯,将监听到的连贯交给Sub Reactor解决,主Reactor用于响应连贯申请。 2)Sub Reactor 解决连贯,从Reactor用于解决IO操作申请。 3 架构模型 Handle:操作系统的句柄,能够是关上的文件、一个Socket连贯、Timer定时器等。 Synchronous Event Demultiplexer :同步(多路)事件分离器,阻塞期待Handles中的事件产生。 Initiation Dispatcher :初始事件散发器,提供了注册、删除、转发Event Handler的办法 Event Handler :事件处理器的接口 Concrete Event Handler :事件处理器的理论实现,而且绑定了一个Handle。因为在理论状况中,咱们往往不止一种事件处理器,因而这里将事件处理器接口和实现离开,与C++、Java这些高级语言中的多态相似。 ————————————————

May 28, 2021 · 1 min · jiezi

关于reactor:带你彻底搞懂高性能网络模式Reactor-和-Proactor

摘要:无论是 Reactor,还是 Proactor,都是一种基于「事件散发」的网络编程模式,区别在于 Reactor 模式是基于「待实现」的 I/O 事件,而 Proactor 模式则是基于「已实现」的 I/O 事件。本文分享自华为云社区《高性能网络框架:Reactor和 Proactor》,原文作者:小林 coding。 这次就来图解 Reactor 和 Proactor 这两个高性能网络模式。 别小看这两个货色,特地是 Reactor 模式,市面上常见的开源软件很多都采纳了这个计划,比方 Redis、Nginx、Netty 等等,所以学好这个模式设计的思维,不仅有助于咱们了解很多开源软件,而且也能在面试时吹逼。 发车! 演进如果要让服务器服务多个客户端,那么最间接的形式就是为每一条连贯创立线程。 其实创立过程也是能够的,原理是一样的,过程和线程的区别在于线程比拟轻量级些,线程的创立和线程间切换的老本要小些,为了形容简述,前面都以线程为例。 解决完业务逻辑后,随着连贯敞开后线程也同样要销毁了,然而这样不停地创立和销毁线程,不仅会带来性能开销,也会造成浪费资源,而且如果要连贯几万条连贯,创立几万个线程去应答也是不事实的。 要这么解决这个问题呢?咱们能够应用「资源复用」的形式。 也就是不必再为每个连贯创立线程,而是创立一个「线程池」,将连贯调配给线程,而后一个线程能够解决多个连贯的业务。 不过,这样又引来一个新的问题,线程怎样才能高效地解决多个连贯的业务? 当一个连贯对应一个线程时,线程个别采纳「read -> 业务解决 -> send」的解决流程,如果以后连贯没有数据可读,那么线程会阻塞在 read 操作上( socket 默认状况是阻塞 I/O),不过这种阻塞形式并不影响其余线程。 然而引入了线程池,那么一个线程要解决多个连贯的业务,线程在解决某个连贯的 read 操作时,如果遇到没有数据可读,就会产生阻塞,那么线程就没方法持续解决其余连贯的业务。 要解决这一个问题,最简略的形式就是将 socket 改成非阻塞,而后线程一直地轮询调用 read 操作来判断是否有数据,这种形式尽管该可能解决阻塞的问题,然而解决的形式比拟粗犷,因为轮询是要耗费 CPU 的,而且随着一个 线程解决的连贯越多,轮询的效率就会越低。 下面的问题在于,线程并不知道以后连贯是否有数据可读,从而须要每次通过 read 去试探。 那有没有方法在只有当连贯上有数据的时候,线程才去发动读申请呢?答案是有的,实现这一技术的就是 I/O 多路复用。 I/O 多路复用技术会用一个零碎调用函数来监听咱们所有关怀的连贯,也就说能够在一个监控线程外面监控很多的连贯。 咱们相熟的 select/poll/epoll 就是内核提供给用户态的多路复用零碎调用,线程能够通过一个零碎调用函数从内核中获取多个事件。 PS:如果想晓得 select/poll/epoll 的区别,能够看看小林之前写的这篇文章:这次许可我,一举拿下 I/O 多路复用! select/poll/epoll 是如何获取网络事件的呢? ...

May 14, 2021 · 4 min · jiezi

并发异步同步阻塞非阻塞模型

网络从网卡到线程过程 1.一个以太网接口接收发送到它的单播地址和以太网广播地址的帧。当一个完整的帧可用时,接口就产生一个硬中断,并且内核调用接口层函数leintr2.leintr检测硬件,并且如果有一个帧到达,就调用leread把这个帧从接口转移到一个mbuf(各层之间传输数据都用这个)链中,构造单独的地址信息etherheaher。etherinput检查结构etherheaher来判断接收到的数据的类型,根据以太网类型字段来跳转。对于一个IP分组,schednetisr调度一个IP软件中断,并选择IP输入队列,ipintrq。对于一个ARP分组,调度ARP软件中断,并选择arpintrq。并将接收到的分组加入到队列中等待处理。3.当收到的数据报的协议字段指明这是一个TCP报文段时,ipintrq(通过协议转换表中的prinput函数)会调用tcpinp t进行处理.从mbuf中取ip,tcp首部,寻找pcb,发送给插口层3.1个关于pcb每个线程的task_struct都有打开文件描述符,如果是sock类型还会关联到全局inpcb中.listen某个fd时(new socket()=>bind(listen的端口)=>lisnten())在pcb中该fd是listen状态3.2in_pcblookup搜索它的整个Internet PCB表,找到一个匹配。完全匹配获得最高优先级,包含通配的最小通配数量的优先级高。所以当同一个端口已经建立连接后有了外部地址和端口,再有数据会选择该插口。比如当140.252.1.11:1500来的数据直接就匹配到第三个的插口,其他地址的发送到第一个插口。4.插口层4.1 listen 如果监听插口收到了报文段listen状态下只接收SYN,即使携带了数据也等建立连接后才发送,创建新的so,在收到三次握手的最后一个报文段后,调用soisconnected唤醒插口层accept 4.2 插口层acceptwhile (so->so_qlen == 0 && so->so_error == 0) {tsleep((caddr_t)&so->so_timeo, PSOCK | PCATCH,netcon, 0))} 当so_qlen不是0,调用falloc(p, &fp, &tmpfd)创建新的插口fd,从插口队列中复制,返回,此时该fd也在线程的打开文件中了。调用soqremque将插口从接收队列中删除。4.3 插口层read,send 从缓冲区复制mbuf。略5.线程调用内核的accept,从调用开始会sleep直到此时可以返回fd。【后文若用了epoll在lsfd有事件时通知线程再调用accept会节省调用到sleep时间。】 epoll原理 poll/select/epoll的实现都是基于文件提供的poll方法(f_op->poll),该方法利用poll_table提供的_qproc方法向文件内部事件掩码_key对应的的一个或多个等待队列(wait_queue_head_t)上添加包含唤醒函数(wait_queue_t.func)的节点(wait_queue_t),并检查文件当前就绪的状态返回给poll的调用者(依赖于文件的实现)。当文件的状态发生改变时(例如网络数据包到达),文件就会遍历事件对应的等待队列并调用回调函数(wait_queue_t.func)唤醒等待线程。数据结构:epoll_crete 创建event_poll,实际上创建了一个socketfd,称epfd。epoll_ctl 将回调为ep_poll_callback的节点 加入到epitem对应fd的等待队列中(即sk_sleep的wait_queue_head_t),关联到event_poll的红黑树等结构体中epoll_wait 将回调为try_to_wake_up的节点 加入到epfd的等待队列中你。 当发生事件,socket调用ep_poll_callback 会调用try_to_wake_up 进而唤醒wait的线程,向用户赋值rdlist数据,用户线程继续执行 (以scoket为例,当socket数据ready,终端会调用相应的接口函数比如rawv6_rcv_skb,此函数会调用sock_def_readable然后,通过sk_has_sleeper判断sk_sleep上是否有等待的进程,如果有那么通过wake_up_interruptible_sync_poll函数调用ep_poll_callback。从wait队列中调出epitem,检查状态epitem的event.events,若是感兴趣的事情发生,加入到rdllist或者ovflist中,调用try_to_wake_up。)数据拷贝: 1.拷贝新添加的events YSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,struct epoll_event __user *, event) copy_from_user(&epds, event, sizeof(struct epoll_event))) 2.传给用户的就绪的event 在ep_send_events_proc时__put_user(revents, &uevent->events。__put_user(epi->event.data, &uevent->data)与select区别 重复读入参数,全量的扫描文件描述符;调用开始,将进程加入到每个文件描述符的等待队列,在调用结束后又把进程从等待队列中删除;(每次发生事件fd位图结构改变,重新清除再select)select最多支持1024个文件描述符。 epoll 注册事件只需要一次拷贝(增量拷贝,依靠回调),另外返回就绪fd,不需要遍历所有的。运行模型是否立即返回。 阻塞:空cpu,IO阻塞线程非阻塞是否由本线程执行 同步IO异步1.所有请求单进程/线程2.一个请求一个线程/进程accept后一个连接全部过程在一个进程/线程 ,结束后结束线程/进程,每次有新连接创建一个新的进程/线程去处理请求 一个连接一个进程:父进程fork子进程 =》fork代价大 百级别prefork 多个进程会accept同一个lsfd,linux2.6已经支持就觉多进程同时accept一个时的惊群现象。一个连接一个线程 万级别限制,线程相互影响不稳定prethread 多线程共享数据,可以直接accept后分配给线程,也可以多个线程共同accept(accept实现了线程安全?).3.一个进程/线程处理多个请求线程池/进程池+非阻塞+IO多路复用 (非阻塞+IO多路复用 少了哪个这种模型都没有意义)reactor 监听所有类型事件,区分accept和业务处理 ...

May 11, 2019 · 1 min · jiezi

处理Empty Mono的方法

在Reactor编程中有时候我们需要对empty Mono<T>做一些特定业务逻辑。下面看一段非reactor编程的代码:public void oldCheck(Token token) { if (token == null) { // business logic return; } if (token.isExpired) { // business logic return; } // business logic return;}如果让你改成reactor你也许会改成这样:public Mono<Void> badCheck(Mono<Token> tokenMono) { return tokenMono .flatMap(token -> { if (token == null) { // CAUTION: You will never be in here // business logic return Mono.empty(); } if (token.isExpired) { // business logic return Mono.empty(); } // business logic return Mono.empty(); });}上面的示例代码里的注释已经写了if (token == null) {}的这个条件是永远成立的,这是因为当Mono<Token>是empty时,它是不会触发flatMap的。诸如flatMap的绝大部分Operator都依赖于Publisher(Mono和Flux都是Pubisher)推送数据(详情请看javadoc),如果Publisher本身无数据可推送,那么就不会触发Operator。换句话说flatMap内部是不可能得到null的。那么怎么做才可以?你可以使用Java 8的Optional来作为中间值:public Mono<Void> goodCheck(Mono<Token> tokenMono) { return tokenMono // Transform Mono<Token> to Mono<Optional<Token>>. // If Mono<Token> is empty, flatMap will not be triggered, // then we will get a empty Mono<Optional<Token>> .flatMap(token -> Mono.just(Optional.of(token))) // If Mono<Optional<Token>> is empty, provide an empty Optional<Token>, // then we will get a non-empty Mono<Optional<Token>> anyway .defaultIfEmpty(Optional.empty()) // Since Mono<Optional<Token>> is not empty, flatMap will always be triggered. .flatMap(tokenOptional -> { if (!tokenOptional.isPresent()) { // business logic return Mono.empty(); } Token token = tokenOptional.get(); if (token.isExpired) { // business logic return Mono.empty(); } // business logic return Mono.empty(); });}除了defaultIfEmpty之外,Reactor还提供了switchIfEmpty、repeatWhenEmpty来处理empty Mono/Flux。 ...

April 19, 2019 · 1 min · jiezi

Reactor学习笔记

本文基于:https://htmlpreview.github.io…://github.com/get-set/reactor-core/blob/master-zh/src/docs/index.html#flux1 Reactor简介Reactor是一个用于JVM的完全非阻塞的响应式编程框架,具备高效的需求管理能力。与Java8函数式API直接集成(CompletableFuture,Stream以及Duration)。它提供了异步序列API Flux(用于N个元素)和Mono(用于0|1个元素)。适用于微服务架构,并且完整支持响应式编解码。2 响应式编程Reactor是响应式编程范式的实现。响应式编程通常作为面向对象编程中的观察者模式的一种扩展。Reactor主要是弥补一些经典的JVM异步方式所带来的不足,此外还关注几个方面:可编排性以及可读性使用丰富的操作符阿里处理形如流的数据在订阅(subscribe)之前什么都不会发生背压(backpressure)具体来说就是消费者能够反向告知生产者生产内容的速度的能力高层次的抽象,从而达到并发无关的效果3 Reactor核心特性3.1 Flux,包含0-N个元素的异步序列Flux<T>是一个能够发出0到N个元素的标准的Publisher<T>,它会被一个”error”或者“completion”信号终止,因此,一个flux的可能结果是一个value、completion或error。3.2 Mono,异步的0-1结果Mono<T>是一种特殊的Publisher<T>,它最多发出一个元素,然后终止于一个onComplete信号或一个onError信号。它只适用其中一部分可用于Flux的操作。比如,(两个Mono的)结合类操作可以忽略其中之一而发出另一个Mono,也可以将两个都发出,对于后一种情况会切换为一个Flux。

February 21, 2019 · 1 min · jiezi

Spring Flux中Request与HandlerMapping关系的形成过程

一、前言Spring Flux中的核心DispatcherHandler的处理过程分为三步,其中首步就是通过HandlerMapping接口查找Request所对应的Handler。本文就是通过阅读源码的方式,分析一下HandlerMapping接口的实现者之一——RequestMappingHandlerMapping类,用于处理基于注解的路由策略,把所有用@Controller和@RequestMapping标记的类中的Handler识别出来,以便DispatcherHandler调用的。HandlerMapping接口的另外两种实现类:1、RouterFunctionMapping用于函数式端点的路由;2、SimpleUrlHandlerMapping用于显式注册的URL模式与WebHandler配对。<!– more –>文章系列关于非阻塞IO:《从时间碎片角度理解阻塞IO模型及非阻塞模型》关于SpringFlux新手入门:《快速上手Spring Flux框架》为什么Spring要引入SpringFlux框架 尚未完成Spring Flux中Request与HandlerMapping关系的形成过程 本文Spring Flux中执行HandlerMapping的过程 尚未完成Spring Flux中是如何处理HandlerResult的 尚未完成Spring Flux与WEB服务器之Servlet 3.1+ 尚未完成Spring Flux与WEB服务器之Netty 尚未完成二、对基于注解的路由控制器的抽象Spring中基于注解的控制器的使用方法大致如下:@Controllerpublic class MyHandler{ @RequestMapping("/") public String handlerMethod(){ }}在Spring WebFlux中,对上述使用方式进行了三层抽象模型。Mapping用户定义的基于annotation的映射关系该抽象对应的类是:org.springframework.web.reactive.result.method.RequestMappingInfo比如上述例子中的 @RequestMapping("/")所代表的映射关系Handler代表控制器的类该抽象对应的类是:java.lang.Class比如上述例子中的MyHandler类Method具体处理映射的方法该抽象对应的类是:java.lang.reflect.Method比如上述例子中的String handlerMethod()方法基于上述三层抽象模型,进而可以作一些组合。HandlerMethodHandler与Method的结合体,Handler(类)与Method(方法)搭配后就成为一个可执行的单元了Mapping vs HandlerMethod把Mapping与HandlerMethod作为字典存起来,就可以根据请求中的关键信息(路径、头信息等)来匹配到Mapping,再根据Mapping找到HandlerMethod,然后执行HandlerMethod,并传递随请求而来的参数。理解了这个抽象模型后,接下来分析源码来理解Spring WebFlux如何处理请求与Handler之间的Mapping关系时,就非常容易了。HandlerMapping接口及其各实现类负责上述模型的构建与运作。三、HandlerMapping接口实现的设计模式HandlerMapping接口实现,采用了"模版方法"这种设计模式。1层:AbstractHandlerMapping implements HandlerMapping, Ordered, BeanNameAware ^ | 2层:AbstractHandlerMethodMapping implements InitializingBean ^ | 3层:RequestMappingInfoHandlerMapping ^ | 4层:RequestMappingHandlerMapping implements EmbeddedValueResolverAware 下面对各层的职责作简要说明:第1层主要实现了对外提供模型的接口即重载了HandlerMapping接口的"Mono<Object> getHandler(ServerWebExchange exchange) “方法,并定义了骨架代码。第2层有两个责任 —— 解析用户定义的HandlerMethod + 实现对外提供模型接口实现所需的抽象方法通过实现了InitializingBean接口的"void afterPropertiesSet()“方法,解析用户定义的Handler和Method。实现第1层对外提供模型接口实现所需的抽象方法:“Mono<?> getHandlerInternal(ServerWebExchange exchange)“第3层提供根据请求匹配Mapping模型实例的方法第4层实现一些高层次用到的抽象方法来创建具体的模型实例。小结一下,就是HandlerMapping接口及其实现类,把用户定义的各Controller等,抽象为上述的Mapping、Handler及Method模型,并将Mapping与HandlerMethod作为字典关系存起来,还提供通过匹配请求来获得HandlerMethod的公共方法。接下来的章节,将先分析解析用户定义的模型并缓存模型的过程,然后再分析一下匹配请求来获得HandlerMethod的公共方法的过程。四、解析用户定义的模型并缓存模型的过程4-1、实现InitializingBean接口第2层AbstractHandlerMethodMapping抽象类中的一个重要方法——实现了InitializingBean接口的"void afterPropertiesSet()“方法,为Spring WebFlux带来了解析用户定义的模型并缓存模型的机会 —— Spring容器初初始化完成该类的具体类的Bean后,将会回调这个方法。在该方法中,实现获取用户定义的Handler、Method、Mapping以及缓存Mapping与HandlerMethod映射关系的功能。@Overridepublic void afterPropertiesSet() { initHandlerMethods(); // Total includes detected mappings + explicit registrations via registerMapping.. …}4-2、找到用户定义的HandlerafterPropertiesSet方法中主要是调用了void initHandlerMethods()方法,具体如下:protected void initHandlerMethods() { //获取Spring容器中所有Bean名字 String[] beanNames = obtainApplicationContext().getBeanNamesForType(Object.class); for (String beanName : beanNames) { if (!beanName.startsWith(SCOPED_TARGET_NAME_PREFIX)) { Class<?> beanType = null; try { //获取Bean的类型 beanType = obtainApplicationContext().getType(beanName); } catch (Throwable ex) { // An unresolvable bean type, probably from a lazy bean - let’s ignore it. if (logger.isTraceEnabled()) { logger.trace(“Could not resolve type for bean ‘” + beanName + “’”, ex); } } //如果获取到类型,并且类型是Handler,则继续加载Handler方法。 if (beanType != null && isHandler(beanType)) { detectHandlerMethods(beanName); } } } //初始化后收尾工作 handlerMethodsInitialized(getHandlerMethods());}这儿首先获取Spring容器中所有Bean名字,然后循环处理每一个Bean。如果Bean名称不是以SCOPED_TARGET_NAME_PREFIX常量开头,则获取Bean的类型。如果获取到类型,并且类型是Handler,则继续加载Handler方法。isHandler(beanType)调用,检查Bean的类型是否符合handler定义。AbstractHandlerMethodMapping抽象类中定义的抽象方法"boolean isHandler(Class<?> beanType)",是由RequestMappingHandlerMapping类实现的。具体实现代码如下:protected boolean isHandler(Class<?> beanType) { return (AnnotatedElementUtils.hasAnnotation(beanType, Controller.class) || AnnotatedElementUtils.hasAnnotation(beanType, RequestMapping.class));}不难看出,对于RequestMappingHandlerMapping这个实现类来说,只有拥有@Controller或者@RequestMapping注解的类,才是Handler。(言下之意对于其他实现类来说Handler的定义不同)。具体handler的定义,在HandlerMapping各实现类来说是不同的,这也是isHandler抽象方法由具体实现类来实现的原因。4-3、发现Handler的Method接下来我们要重点看一下"detectHandlerMethods(beanName);“这个方法调用。protected void detectHandlerMethods(final Object handler) { Class<?> handlerType = (handler instanceof String ? obtainApplicationContext().getType((String) handler) : handler.getClass()); if (handlerType != null) { //将handlerType转换为用户类型(通常等同于被转换的类型,不过诸如CGLIB生成的子类会被转换为原始类型) final Class<?> userType = ClassUtils.getUserClass(handlerType); //寻找目标类型userType中的Methods,selectMethods方法的第二个参数是lambda表达式,即感兴趣的方法的过滤规则 Map<Method, T> methods = MethodIntrospector.selectMethods(userType, //回调函数metadatalookup将通过controller定义的mapping与手动定义的mapping合并起来 (MethodIntrospector.MetadataLookup<T>) method -> getMappingForMethod(method, userType)); if (logger.isTraceEnabled()) { logger.trace(“Mapped " + methods.size() + " handler method(s) for " + userType + “: " + methods); } methods.forEach((key, mapping) -> { //再次核查方法与类型是否匹配 Method invocableMethod = AopUtils.selectInvocableMethod(key, userType); //如果是满足要求的方法,则注册到全局的MappingRegistry实例里 registerHandlerMethod(handler, invocableMethod, mapping); }); }}首先将参数handler(即外部传入的BeanName或者BeanType)转换为Class<?>类型变量handlerType。如果转换成功,再将handlerType转换为用户类型(通常等同于被转换的类型,不过诸如CGLIB生成的子类会被转换为原始类型)。接下来获取该用户类型里所有的方法(Method)。循环处理每个方法,如果是满足要求的方法,则注册到全局的MappingRegistry实例里。4-4、解析Mapping信息其中,以下代码片段有必要深入探究一下 Map<Method, T> methods = MethodIntrospector.selectMethods(userType, (MethodIntrospector.MetadataLookup<T>) method -> getMappingForMethod(method, userType));MethodIntrospector.selectMethods方法的调用,将会把用@RequestMapping标记的方法筛选出来,并交给第二个参数所定义的MetadataLookup回调函数将通过controller定义的mapping与手动定义的mapping合并起来。第二个参数是用lambda表达式传入的,表达式中将method、userType传给getMappingForMethod(method, userType)方法。getMappingForMethod方法在高层次中是抽象方法,具体的是现在第4层RequestMappingHandlerMapping类中实现。在具体实现getMappingForMethod时,会调用到RequestMappingHandlerMapping类的下面这个方法。从该方法中,我们可以看到,首先会获得参数element(即用户在Controller中定义的方法)的RequestMapping类型的类实例,然后构造代表Mapping抽象模型的RequestmappingInfo类型实例并返回。private RequestMappingInfo createRequestMappingInfo(AnnotatedElement element) { RequestMapping requestMapping = AnnotatedElementUtils.findMergedAnnotation(element, RequestMapping.class); RequestCondition<?> condition = (element instanceof Class ? getCustomTypeCondition((Class<?>) element) : getCustomMethodCondition((Method) element)); return (requestMapping != null ? createRequestMappingInfo(requestMapping, condition) : null); }构造代表Mapping抽象模型的RequestmappingInfo类型实例,用的是createRequestMappingInfo方法,如下。可以看到RequestMappingInfo所需要的信息,包括paths、methods、params、headers、consumers、produces、mappingName,即用户定义@RequestMapping注解时所设定的可能的参数,都被存在这儿了。拥有了这些信息,当请求来到时,RequestMappingInfo就可以测试自身是否是处理该请求的人选之一了。protected RequestMappingInfo createRequestMappingInfo( RequestMapping requestMapping, @Nullable RequestCondition<?> customCondition) { RequestMappingInfo.Builder builder = RequestMappingInfo .paths(resolveEmbeddedValuesInPatterns(requestMapping.path())) .methods(requestMapping.method()) .params(requestMapping.params()) .headers(requestMapping.headers()) .consumes(requestMapping.consumes()) .produces(requestMapping.produces()) .mappingName(requestMapping.name()); if (customCondition != null) { builder.customCondition(customCondition); } return builder.options(this.config).build(); }4-5、缓存Mapping与HandlerMethod关系最后,registerHandlerMethod(handler, invocableMethod, mapping)调用将缓存HandlerMethod,其中mapping参数是RequestMappingInfo类型的。。内部调用的是MappingRegistry实例的void register(T mapping, Object handler, Method method)方法,其中T是RequestMappingInfo类型。MappingRegistry类维护所有指向Handler Methods的映射,并暴露方法用于查找映射,同时提供并发控制。public void register(T mapping, Object handler, Method method) { this.readWriteLock.writeLock().lock(); try { HandlerMethod handlerMethod = createHandlerMethod(handler, method); …… this.registry.put(mapping, new MappingRegistration<>(mapping, handlerMethod, directUrls, name)); } finally { this.readWriteLock.writeLock().unlock(); } }五、匹配请求来获得HandlerMethodAbstractHandlerMethodMapping类的“Mono<HandlerMethod> getHandlerInternal(ServerWebExchange exchange)”方法,具体实现了根据请求查找HandlerMethod的逻辑。 @Override public Mono<HandlerMethod> getHandlerInternal(ServerWebExchange exchange) { //获取读锁 this.mappingRegistry.acquireReadLock(); try { HandlerMethod handlerMethod; try { //调用其它方法继续查找HandlerMethod handlerMethod = lookupHandlerMethod(exchange); } catch (Exception ex) { return Mono.error(ex); } if (handlerMethod != null) { handlerMethod = handlerMethod.createWithResolvedBean(); } return Mono.justOrEmpty(handlerMethod); } //释放读锁 finally { this.mappingRegistry.releaseReadLock(); } }handlerMethod = lookupHandlerMethod(exchange)调用,继续查找HandlerMethod。我们继续看一下HandlerMethod lookupHandlerMethod(ServerWebExchange exchange)方法的定义。为方便阅读,我把注释也写在了代码里。 protected HandlerMethod lookupHandlerMethod(ServerWebExchange exchange) throws Exception { List<Match> matches = new ArrayList<>(); //查找所有满足请求的Mapping,并放入列表mathes addMatchingMappings(this.mappingRegistry.getMappings().keySet(), matches, exchange); if (!matches.isEmpty()) { //获取比较器comparator Comparator<Match> comparator = new MatchComparator(getMappingComparator(exchange)); //使用比较器将列表matches排序 matches.sort(comparator); //将排在第1位的作为最佳匹配项 Match bestMatch = matches.get(0); if (matches.size() > 1) { //将排在第2位的作为次佳匹配项 Match secondBestMatch = matches.get(1); } handleMatch(bestMatch.mapping, bestMatch.handlerMethod, exchange); return bestMatch.handlerMethod; } else { return handleNoMatch(this.mappingRegistry.getMappings().keySet(), exchange); } }六、总结理解了Spring WebFlux在获取映射关系方面的抽象设计模型后,就很容易读懂代码,进而更加理解框架的具体处理方式,在使用框架时做到“知己知彼”。原文:http://www.yesdata.net/2018/11/27/spring-flux-request-mapping/ ...

November 30, 2018 · 3 min · jiezi

reactor-rabbitmq小试牛刀

序本文主要研究一下如何使用reactor-rabbitmqmaven <dependency> <groupId>io.projectreactor.rabbitmq</groupId> <artifactId>reactor-rabbitmq</artifactId> <version>1.0.0.M2</version> </dependency>rabbitmq参考docker搭建rabbitmq集群当前使用的镜像是bijukunjummen/rabbitmq-server:3.7.0,docker-compose文件配置的账号密码为myuser/mypass访问http://192.168.99.100:15672可以查看界面实例 @Test public void testProducer() throws InterruptedException { int count = 100; ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.useNio(); connectionFactory.setUsername(“myuser”); connectionFactory.setPassword(“mypass”); SenderOptions senderOptions = new SenderOptions() .connectionFactory(connectionFactory) .connectionSupplier(cf -> cf.newConnection( new Address[] {new Address(“192.168.99.100”,5672), new Address(“192.168.99.100”,5673), new Address(“192.168.99.100”,5674)}, “reactive-sender”)) .resourceCreationScheduler(Schedulers.elastic()); Sender sender = ReactorRabbitMq.createSender(senderOptions); Flux<OutboundMessageResult> confirmations = sender.sendWithPublishConfirms(Flux.range(1, count) .map(i -> new OutboundMessage("", QUEUE, (“Message_” + i).getBytes()))); CountDownLatch latch = new CountDownLatch(count); sender.declareQueue(QueueSpecification.queue(QUEUE)) .thenMany(confirmations) .doOnError(e -> LOGGER.error(“Send failed”, e)) .subscribe(r -> { if (r.isAck()) { LOGGER.info(“Message {} sent successfully”, new String(r.getOutboundMessage().getBody())); latch.countDown(); } }); latch.await(10, TimeUnit.SECONDS); sender.close(); } @Test public void testConsumer() throws InterruptedException { int count = 100; CountDownLatch latch = new CountDownLatch(count); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.useNio(); connectionFactory.setUsername(“myuser”); connectionFactory.setPassword(“mypass”); SenderOptions senderOptions = new SenderOptions() .connectionFactory(connectionFactory) .connectionSupplier(cf -> cf.newConnection( new Address[] {new Address(“192.168.99.100”,5672), new Address(“192.168.99.100”,5673), new Address(“192.168.99.100”,5674)}, “reactive-sender”)) .resourceCreationScheduler(Schedulers.elastic()); Sender sender = ReactorRabbitMq.createSender(senderOptions); Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE)); ReceiverOptions receiverOptions = new ReceiverOptions() .connectionFactory(connectionFactory) .connectionSupplier(cf -> cf.newConnection( new Address[] {new Address(“192.168.99.100”,5672), new Address(“192.168.99.100”,5673), new Address(“192.168.99.100”,5674)}, “reactive-receiver”)) .connectionSubscriptionScheduler(Schedulers.elastic()); Receiver receiver = ReactorRabbitMq.createReceiver(receiverOptions); Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE); Disposable disposable = queueDeclaration.thenMany(messages).subscribe(m -> { LOGGER.info(“Received message {}”, new String(m.getBody())); latch.countDown(); }); latch.await(10, TimeUnit.SECONDS); disposable.dispose(); sender.close(); receiver.close(); }由于设置了账号密码,因而需要在ConnectionFactory那里指定账号密码另外由于使用了rabbitmq集群,因而通过connectionSupplier指定要连接的多个rabbitmq地址这里不管是producer还是consumer,都通过queueDeclaration进行操作小结reactor-rabbitmq对rabbitmq的api进行封装,改造为reactive streams模式,提供了Non-blocking Back-pressure以及End-to-end Reactive Pipeline特性。docreactor-rabbitmq-samplesReactor RabbitMQ Reference Guide ...

October 7, 2018 · 1 min · jiezi

reactor-kafka小试牛刀

序本文主要展示一下如何使用reactor-kafkamaven <dependency> <groupId>io.projectreactor.kafka</groupId> <artifactId>reactor-kafka</artifactId> <version>1.0.1.RELEASE</version> </dependency>准备启动zookeepercd zookeeper-3.4.13sh bin/zkServer.sh startZooKeeper JMX enabled by defaultZooKeeper remote JMX Port set to 8999ZooKeeper remote JMX authenticate set to falseZooKeeper remote JMX ssl set to falseZooKeeper remote JMX log4j set to trueUsing config: zookeeper-3.4.13/bin/../conf/zoo.cfg-n Starting zookeeper …STARTED启动kafkacd kafka_2.11-1.1.1sh bin/kafka-server-start.sh config/server.properties创建topiccd kafka_2.11-1.1.1sh bin/kafka-topics.sh –create –topic demotopic –replication-factor 1 –partitions 3 –zookeeper localhost:2181Created topic “demotopic”.实例producer @Test public void testProducer() throws InterruptedException { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.CLIENT_ID_CONFIG, “sample-producer”); props.put(ProducerConfig.ACKS_CONFIG, “all”); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); SenderOptions<Integer, String> senderOptions = SenderOptions.create(props); KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions); SimpleDateFormat dateFormat = new SimpleDateFormat(“HH:mm:ss:SSS z dd MMM yyyy”); CountDownLatch latch = new CountDownLatch(100); sender.<Integer>send(Flux.range(1, 100) .map(i -> SenderRecord.create(new ProducerRecord<>(TOPIC, i, “Message_” + i), i))) .doOnError(e -> log.error(“Send failed”, e)) .subscribe(r -> { RecordMetadata metadata = r.recordMetadata(); System.out.printf(“Message %d sent successfully, topic-partition=%s-%d offset=%d timestamp=%s\n”, r.correlationMetadata(), metadata.topic(), metadata.partition(), metadata.offset(), dateFormat.format(new Date(metadata.timestamp()))); latch.countDown(); }); latch.await(10, TimeUnit.SECONDS); sender.close(); }consumer @Test public void testConsumer() throws InterruptedException { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ConsumerConfig.CLIENT_ID_CONFIG, “sample-consumer”); props.put(ConsumerConfig.GROUP_ID_CONFIG, “sample-group”); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”); ReceiverOptions<Integer, String> receiverOptions = ReceiverOptions.create(props); SimpleDateFormat dateFormat = new SimpleDateFormat(“HH:mm:ss:SSS z dd MMM yyyy”); CountDownLatch latch = new CountDownLatch(100); ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(TOPIC)) .addAssignListener(partitions -> log.debug(“onPartitionsAssigned {}”, partitions)) .addRevokeListener(partitions -> log.debug(“onPartitionsRevoked {}”, partitions)); Flux<ReceiverRecord<Integer, String>> kafkaFlux = KafkaReceiver.create(options).receive(); Disposable disposable = kafkaFlux.subscribe(record -> { ReceiverOffset offset = record.receiverOffset(); System.out.printf(“Received message: topic-partition=%s offset=%d timestamp=%s key=%d value=%s\n”, offset.topicPartition(), offset.offset(), dateFormat.format(new Date(record.timestamp())), record.key(), record.value()); offset.acknowledge(); latch.countDown(); }); latch.await(10, TimeUnit.SECONDS); disposable.dispose(); }小结reactor-kafka对kafka的api进行封装,改造为reactive streams模式,这样用起来更为顺手,熟悉reactor的开发人员可以轻车熟路。docreactor-kafka-samplesReactor Kafka Reference Guide ...

October 5, 2018 · 2 min · jiezi