欢送拜访我的GitHub
https://github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,波及Java、Docker、Kubernetes、DevOPS等;
《disruptor笔记》系列链接
- 疾速入门
- Disruptor类剖析
- 环形队列的根底操作(不必Disruptor类)
- 事件生产知识点小结
- 事件生产实战
- 常见场景
- 期待策略
- 知识点补充(终篇)
本篇概览
本篇是《disruptor笔记》的第五篇,前文《disruptor笔记之四:事件生产知识点小结》从实践上梳理剖析了独立生产和独特生产,留下了三个工作,明天就来成这些工作,即编码实现以下三个场景:
- 100个订单,短信和邮件系统独立生产
- 100个订单,邮件系统的两个邮件服务器独特生产;
- 100个订单,短信零碎独立生产,与此同时,两个邮件服务器独特生产;
源码下载
- 本篇实战中的残缺源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blo...):
名称 | 链接 | 备注 |
---|---|---|
我的项目主页 | https://github.com/zq2599/blo... | 该我的项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blo... | 该我的项目源码的仓库地址,https协定 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该我的项目源码的仓库地址,ssh协定 |
- 这个git我的项目中有多个文件夹,本次实战的源码在<font color="blue">disruptor-tutorials</font>文件夹下,如下图红框所示:
- <font color="blue">disruptor-tutorials</font>是个父工程,外面有多个module,本篇实战的module是<font color="red">consume-mode</font>,如下图红框所示:
编写公共代码
- 为了实现工作,编码实现下面那三个场景,咱们须要先把公共代码写好;
- 首先是在父工程<font color="blue">disruptor-tutorials</font>上面新建名为<font color="red">consume-mode</font>的module,其build.gradle内容如下:
plugins { id 'org.springframework.boot'}dependencies { implementation 'org.projectlombok:lombok' implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'com.lmax:disruptor' testImplementation('org.springframework.boot:spring-boot-starter-test')}
- springboot启动类:
package com.bolingcavalry;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class ConsumeModeApplication { public static void main(String[] args) { SpringApplication.run(ConsumeModeApplication.class, args); }}
- 订单事件定义:
package com.bolingcavalry.service;import lombok.Data;import lombok.NoArgsConstructor;import lombok.ToString;@Data@ToString@NoArgsConstructorpublic class OrderEvent { private String value;}
- 订单事件的工程类,定义事件实例如何创立:
package com.bolingcavalry.service;import com.lmax.disruptor.EventFactory;public class OrderEventFactory implements EventFactory<OrderEvent> { @Override public OrderEvent newInstance() { return new OrderEvent(); }}
- 订单事件生产者类,定义如何将业务信息通过事件公布到环形队列:
package com.bolingcavalry.service;import com.lmax.disruptor.RingBuffer;public class OrderEventProducer { // 存储数据的环形队列 private final RingBuffer<OrderEvent> ringBuffer; public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(String content) { // ringBuffer是个队列,其next办法返回的是下最初一条记录之后的地位,这是个可用地位 long sequence = ringBuffer.next(); try { // sequence地位取出的事件是空事件 OrderEvent orderEvent = ringBuffer.get(sequence); // 空事件增加业务信息 orderEvent.setValue(content); } finally { // 公布 ringBuffer.publish(sequence); } }}
- 生产订单事件的短信服务,实现EventHandler接口,所以是用在<font color="red">独立生产</font>的场景:
package com.bolingcavalry.service;import com.lmax.disruptor.EventHandler;import lombok.extern.slf4j.Slf4j;import java.util.function.Consumer;@Slf4jpublic class SmsEventHandler implements EventHandler<OrderEvent> { public SmsEventHandler(Consumer<?> consumer) { this.consumer = consumer; } // 内部能够传入Consumer实现类,每解决一条音讯的时候,consumer的accept办法就会被执行一次 private Consumer<?> consumer; @Override public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception { log.info("短信服务 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event); // 这里延时100ms,模仿生产事件的逻辑的耗时 Thread.sleep(100); // 如果内部传入了consumer,就要执行一次accept办法 if (null!=consumer) { consumer.accept(null); } }}
- 生产订单事件的邮件服务,实现EventHandler接口,所以是用在<font color="red">独立生产</font>的场景:
package com.bolingcavalry.service;import com.lmax.disruptor.EventHandler;import lombok.extern.slf4j.Slf4j;import java.util.function.Consumer;@Slf4jpublic class MailEventHandler implements EventHandler<OrderEvent> { public MailEventHandler(Consumer<?> consumer) { this.consumer = consumer; } // 内部能够传入Consumer实现类,每解决一条音讯的时候,consumer的accept办法就会被执行一次 private Consumer<?> consumer; @Override public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception { log.info("邮件服务 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event); // 这里延时100ms,模仿生产事件的逻辑的耗时 Thread.sleep(100); // 如果内部传入了consumer,就要执行一次accept办法 if (null!=consumer) { consumer.accept(null); } }}
- 生产订单事件的邮件服务,实现WorkHandler接口,所以是用在<font color="red">独特生产</font>的场景:
package com.bolingcavalry.service;import com.lmax.disruptor.WorkHandler;import lombok.extern.slf4j.Slf4j;import java.util.function.Consumer;@Slf4jpublic class MailWorkHandler implements WorkHandler<OrderEvent> { public MailWorkHandler(Consumer<?> consumer) { this.consumer = consumer; } // 内部能够传入Consumer实现类,每解决一条音讯的时候,consumer的accept办法就会被执行一次 private Consumer<?> consumer; @Override public void onEvent(OrderEvent event) throws Exception { log.info("独特生产模式的邮件服务 : {}", event); // 这里延时100ms,模仿生产事件的逻辑的耗时 Thread.sleep(100); // 如果内部传入了consumer,就要执行一次accept办法 if (null!=consumer) { consumer.accept(null); } }}
- 最初,将公布和生产事件的逻辑写在一个抽象类里,然而具体如何生产事件并不在此类中实现,而是留给子类,这个抽象类中有几处要留神的中央稍后会提到:
package com.bolingcavalry.service;import com.lmax.disruptor.dsl.Disruptor;import lombok.Setter;import org.springframework.scheduling.concurrent.CustomizableThreadFactory;import javax.annotation.PostConstruct;import java.util.concurrent.CountDownLatch;import java.util.concurrent.atomic.AtomicLong;import java.util.function.Consumer;public abstract class ConsumeModeService { /** * 独立消费者数量 */ public static final int INDEPENDENT_CONSUMER_NUM = 2; /** * 环形缓冲区大小 */ protected int BUFFER_SIZE = 16; protected Disruptor<OrderEvent> disruptor; @Setter private OrderEventProducer producer; /** * 统计音讯总数 */ protected final AtomicLong eventCount = new AtomicLong(); /** * 这是辅助测试用的, * 测试的时候,实现事件公布后,测试主线程就用这个countDownLatch开始期待, * 在生产到指定的数量(countDownLatchGate)后,生产线程执行countDownLatch的countDown办法, * 这样测试主线程就能够完结期待了 */ private CountDownLatch countDownLatch; /** * 这是辅助测试用的, * 测试的时候,实现事件公布后,测试主线程就用这个countDownLatch开始期待, * 在生产到指定的数量(countDownLatchGate)后,生产线程执行countDownLatch的countDown办法, * 这样测试主线程就能够完结期待了 */ private int countDownLatchGate; /** * 筹备一个匿名类,传给disruptor的事件处理类, * 这样每次处理事件时,都会将曾经处理事件的总数打印进去 */ protected Consumer<?> eventCountPrinter = new Consumer<Object>() { @Override public void accept(Object o) { long count = eventCount.incrementAndGet(); /** * 这是辅助测试用的, * 测试的时候,实现事件公布后,测试主线程就用这个countDownLatch开始期待, * 在生产到指定的数量(countDownLatchGate)后,生产线程执行countDownLatch的countDown办法, * 这样测试主线程就能够完结期待了 */ if (null!=countDownLatch && count>=countDownLatchGate) { countDownLatch.countDown(); } } }; /** * 公布一个事件 * @param value * @return */ public void publish(String value) { producer.onData(value); } /** * 返回曾经解决的工作总数 * @return */ public long eventCount() { return eventCount.get(); } /** * 这是辅助测试用的, * 测试的时候,实现事件公布后,测试主线程就用这个countDownLatch开始期待, * 在生产到指定的数量(countDownLatchGate)后,生产线程执行countDownLatch的countDown办法, * 这样测试主线程就能够完结期待了 * @param countDownLatch * @param countDownLatchGate */ public void setCountDown(CountDownLatch countDownLatch, int countDownLatchGate) { this.countDownLatch = countDownLatch; this.countDownLatchGate = countDownLatchGate; } /** * 留给子类实现具体的事件生产逻辑 */ protected abstract void disruptorOperate(); @PostConstruct private void init() { // 实例化 disruptor = new Disruptor<>(new OrderEventFactory(), BUFFER_SIZE, new CustomizableThreadFactory("event-handler-")); // 留给子类实现具体的事件生产逻辑 disruptorOperate(); // 启动 disruptor.start(); // 生产者 setProducer(new OrderEventProducer(disruptor.getRingBuffer())); }}
- 上述代码,有以下几处须要留神:
- init办法是spring bean实例化后要执行的办法,这外面实例化Disruptor,还启动了生产线程,并且实例化了事件生产者,具体的事件生产逻辑,由子类在disruptorOperate办法中实现;
- eventCountPrinter是个匿名类实例,传给事件生产的handler后,每生产一个事件都会执行一次eventCountPrinter.accept办法,这样就把生产事件的总数精确的保留在eventCount变量中了;
- countDownLatch和countDownLatchGate是为了辅助单元测试而筹备的,测试的时候,实现事件公布后,测试主线程就用这个countDownLatch开始期待,在生产到指定的数量(countDownLatchGate)后,生产线程执行countDownLatch的countDown办法,这样测试主线程就能够完结期待了
- 至此,专用代码就写完了,可见形象父类曾经做好了大部分事件,咱们的子类能够聚焦事件生产的逻辑编排了,开始挨个实现那三个场景;
100个订单,短信和邮件系统独立生产
- 两个消费者独立生产的逻辑非常简单,就一行代码,调用<font color="blue">handleEventsWith</font>办法把所有消费者实例传进去,就完事了:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;import com.bolingcavalry.service.MailEventHandler;import com.bolingcavalry.service.SmsEventHandler;import org.springframework.stereotype.Service;@Service("independentModeService")public class IndependentModeServiceImpl extends ConsumeModeService { @Override protected void disruptorOperate() { // 调用handleEventsWith,示意创立的多个消费者,每个都是独立生产的 // 这里创立两个消费者,一个是短信的,一个是邮件的 disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter), new MailEventHandler(eventCountPrinter)); }}
- 单元测试代码如下,要留神的中央是公布完<font color="red">100</font>事件后,调用<font color="blue">countDownLatch.await()</font>办法开始期待,直到消费者线程调用<font color="blue">countDownLatch.countDown()</font>办法解除期待,还有就是预期的生产音讯总数等于<font color="red">200</font>:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;import lombok.extern.slf4j.Slf4j;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import java.util.concurrent.CountDownLatch;import static org.junit.Assert.assertEquals;@RunWith(SpringRunner.class)@SpringBootTest@Slf4jpublic class ConsumeModeServiceTest { @Autowired @Qualifier("independentModeService") ConsumeModeService independentModeService; /** * 测试时生产的音讯数量 */ private static final int EVENT_COUNT = 100; private void testConsumeModeService(ConsumeModeService service, int eventCount, int expectEventCount) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); // 通知service,等生产到expectEventCount个音讯时,就执行countDownLatch.countDown办法 service.setCountDown(countDownLatch, expectEventCount); for(int i=0;i<eventCount;i++) { log.info("publich {}", i); service.publish(String.valueOf(i)); } // 以后线程开始期待,后面的service.setCountDown办法曾经通知过service, // 等生产到expectEventCount个音讯时,就执行countDownLatch.countDown办法 // 千万留神,要调用await办法,而不是wait办法! countDownLatch.await(); // 生产的事件总数应该等于公布的事件数 assertEquals(expectEventCount, service.eventCount()); } @Test public void testIndependentModeService() throws InterruptedException { log.info("start testIndependentModeService"); testConsumeModeService(independentModeService, EVENT_COUNT, EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM); }}
- 单元测试执行后果如下,合乎预期:
100个订单,邮件系统的两个邮件服务器独特生产
- 两个消费者独特生产的代码也很简略,调用handleEventsWithWorkerPool办法即可,把独特生产的MailWorkHandler实例作为参数传入:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;import com.bolingcavalry.service.MailWorkHandler;import org.springframework.stereotype.Service;@Service("shareModeService")public class ShareModeServiceImpl extends ConsumeModeService { @Override protected void disruptorOperate() { // mailWorkHandler1模仿一号邮件服务器 MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter); // mailWorkHandler2模仿一号邮件服务器 MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter); // 调用handleEventsWithWorkerPool,示意创立的多个消费者以独特生产的模式生产 disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2); }}
- 单元测试是在ConsumeModeServiceTest.java中增加如下代码,留神因为是独特生产,因而预期的生产事件数等于音讯数,都是100:
@Autowired @Qualifier("shareModeService") ConsumeModeService shareModeService; @Test public void testShareModeService() throws InterruptedException { log.info("start testShareModeService"); testConsumeModeService(shareModeService, EVENT_COUNT, EVENT_COUNT); }
- 执行单元测试,后果如下图:
100个订单,短信零碎独立生产,与此同时,两个邮件服务器独特生产
- 最初一个场景,仍旧很简略,handleEventsWith调用一次,再调用一次handleEventsWithWorkerPool即可:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;import com.bolingcavalry.service.MailWorkHandler;import com.bolingcavalry.service.SmsEventHandler;import org.springframework.stereotype.Service;@Service("independentAndShareModeService")public class IndependentAndShareModeServiceImpl extends ConsumeModeService { @Override protected void disruptorOperate() { // 调用handleEventsWith,示意创立的多个消费者,每个都是独立生产的 // 这里创立一个消费者,短信服务 disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter)); // mailWorkHandler1模仿一号邮件服务器 MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter); // mailWorkHandler2模仿一号邮件服务器 MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter); // 调用handleEventsWithWorkerPool,示意创立的多个消费者以独特生产的模式生产 disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2); }}
- 单元测试是在ConsumeModeServiceTest.java中增加如下代码,预期的生产事件数应该是200,因为整体上是两个独立生产,只不过其中的一个外部有两个消费者独特生产:
@Autowired @Qualifier("independentAndShareModeService") ConsumeModeService independentAndShareModeService; @Test public void independentAndShareModeService() throws InterruptedException { log.info("start independentAndShareModeService"); testConsumeModeService(independentAndShareModeService, EVENT_COUNT, EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM); }
- 单元测试后果如下,合乎预期:
- 至此,独立生产和独特生产的实战就实现了,借助disruptor,三个常见场景都能够轻松实现,如果您正在做这些场景的开发,心愿本文能给您一些参考;
你不孤独,欣宸原创一路相伴
- Java系列
- Spring系列
- Docker系列
- kubernetes系列
- 数据库+中间件系列
- DevOps系列
欢送关注公众号:程序员欣宸
微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游Java世界...
https://github.com/zq2599/blog_demos