欢送拜访我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,波及Java、Docker、Kubernetes、DevOPS等;

《disruptor笔记》系列链接

  1. 疾速入门
  2. Disruptor类剖析
  3. 环形队列的根底操作(不必Disruptor类)
  4. 事件生产知识点小结
  5. 事件生产实战
  6. 常见场景
  7. 期待策略
  8. 知识点补充(终篇)

本篇概览

本篇是《disruptor笔记》的第五篇,前文《disruptor笔记之四:事件生产知识点小结》从实践上梳理剖析了独立生产和独特生产,留下了三个工作,明天就来成这些工作,即编码实现以下三个场景:

  1. 100个订单,短信和邮件系统独立生产
  2. 100个订单,邮件系统的两个邮件服务器独特生产;
  3. 100个订单,短信零碎独立生产,与此同时,两个邮件服务器独特生产;

源码下载

  • 本篇实战中的残缺源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blo...):
名称链接备注
我的项目主页https://github.com/zq2599/blo...该我的项目在GitHub上的主页
git仓库地址(https)https://github.com/zq2599/blo...该我的项目源码的仓库地址,https协定
git仓库地址(ssh)git@github.com:zq2599/blog_demos.git该我的项目源码的仓库地址,ssh协定
  • 这个git我的项目中有多个文件夹,本次实战的源码在<font color="blue">disruptor-tutorials</font>文件夹下,如下图红框所示:

  • <font color="blue">disruptor-tutorials</font>是个父工程,外面有多个module,本篇实战的module是<font color="red">consume-mode</font>,如下图红框所示:

编写公共代码

  • 为了实现工作,编码实现下面那三个场景,咱们须要先把公共代码写好;
  • 首先是在父工程<font color="blue">disruptor-tutorials</font>上面新建名为<font color="red">consume-mode</font>的module,其build.gradle内容如下:
plugins {    id 'org.springframework.boot'}dependencies {    implementation 'org.projectlombok:lombok'    implementation 'org.springframework.boot:spring-boot-starter'    implementation 'org.springframework.boot:spring-boot-starter-web'    implementation 'com.lmax:disruptor'    testImplementation('org.springframework.boot:spring-boot-starter-test')}
  • springboot启动类:
package com.bolingcavalry;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class ConsumeModeApplication {    public static void main(String[] args) {        SpringApplication.run(ConsumeModeApplication.class, args);    }}
  • 订单事件定义:
package com.bolingcavalry.service;import lombok.Data;import lombok.NoArgsConstructor;import lombok.ToString;@Data@ToString@NoArgsConstructorpublic class OrderEvent {    private String value;}
    • 订单事件的工程类,定义事件实例如何创立:
package com.bolingcavalry.service;import com.lmax.disruptor.EventFactory;public class OrderEventFactory implements EventFactory<OrderEvent> {    @Override    public OrderEvent newInstance() {        return new OrderEvent();    }}
  • 订单事件生产者类,定义如何将业务信息通过事件公布到环形队列:
package com.bolingcavalry.service;import com.lmax.disruptor.RingBuffer;public class OrderEventProducer {    // 存储数据的环形队列    private final RingBuffer<OrderEvent> ringBuffer;    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {        this.ringBuffer = ringBuffer;    }    public void onData(String content) {        // ringBuffer是个队列,其next办法返回的是下最初一条记录之后的地位,这是个可用地位        long sequence = ringBuffer.next();        try {            // sequence地位取出的事件是空事件            OrderEvent orderEvent = ringBuffer.get(sequence);            // 空事件增加业务信息            orderEvent.setValue(content);        } finally {            // 公布            ringBuffer.publish(sequence);        }    }}
  • 生产订单事件的短信服务,实现EventHandler接口,所以是用在<font color="red">独立生产</font>的场景:
package com.bolingcavalry.service;import com.lmax.disruptor.EventHandler;import lombok.extern.slf4j.Slf4j;import java.util.function.Consumer;@Slf4jpublic class SmsEventHandler implements EventHandler<OrderEvent> {    public SmsEventHandler(Consumer<?> consumer) {        this.consumer = consumer;    }    // 内部能够传入Consumer实现类,每解决一条音讯的时候,consumer的accept办法就会被执行一次    private Consumer<?> consumer;    @Override    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {        log.info("短信服务 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);        // 这里延时100ms,模仿生产事件的逻辑的耗时        Thread.sleep(100);        // 如果内部传入了consumer,就要执行一次accept办法        if (null!=consumer) {            consumer.accept(null);        }    }}
  • 生产订单事件的邮件服务,实现EventHandler接口,所以是用在<font color="red">独立生产</font>的场景:
package com.bolingcavalry.service;import com.lmax.disruptor.EventHandler;import lombok.extern.slf4j.Slf4j;import java.util.function.Consumer;@Slf4jpublic class MailEventHandler implements EventHandler<OrderEvent> {    public MailEventHandler(Consumer<?> consumer) {        this.consumer = consumer;    }    // 内部能够传入Consumer实现类,每解决一条音讯的时候,consumer的accept办法就会被执行一次    private Consumer<?> consumer;    @Override    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {        log.info("邮件服务 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);        // 这里延时100ms,模仿生产事件的逻辑的耗时        Thread.sleep(100);        // 如果内部传入了consumer,就要执行一次accept办法        if (null!=consumer) {            consumer.accept(null);        }    }}
  • 生产订单事件的邮件服务,实现WorkHandler接口,所以是用在<font color="red">独特生产</font>的场景:
package com.bolingcavalry.service;import com.lmax.disruptor.WorkHandler;import lombok.extern.slf4j.Slf4j;import java.util.function.Consumer;@Slf4jpublic class MailWorkHandler implements WorkHandler<OrderEvent> {    public MailWorkHandler(Consumer<?> consumer) {        this.consumer = consumer;    }    // 内部能够传入Consumer实现类,每解决一条音讯的时候,consumer的accept办法就会被执行一次    private Consumer<?> consumer;    @Override    public void onEvent(OrderEvent event) throws Exception {        log.info("独特生产模式的邮件服务 : {}", event);        // 这里延时100ms,模仿生产事件的逻辑的耗时        Thread.sleep(100);        // 如果内部传入了consumer,就要执行一次accept办法        if (null!=consumer) {            consumer.accept(null);        }    }}
  • 最初,将公布和生产事件的逻辑写在一个抽象类里,然而具体如何生产事件并不在此类中实现,而是留给子类,这个抽象类中有几处要留神的中央稍后会提到:
package com.bolingcavalry.service;import com.lmax.disruptor.dsl.Disruptor;import lombok.Setter;import org.springframework.scheduling.concurrent.CustomizableThreadFactory;import javax.annotation.PostConstruct;import java.util.concurrent.CountDownLatch;import java.util.concurrent.atomic.AtomicLong;import java.util.function.Consumer;public abstract class ConsumeModeService {    /**     * 独立消费者数量     */    public static final int INDEPENDENT_CONSUMER_NUM = 2;    /**     * 环形缓冲区大小     */    protected int BUFFER_SIZE = 16;    protected Disruptor<OrderEvent> disruptor;    @Setter    private OrderEventProducer producer;    /**     * 统计音讯总数     */    protected final AtomicLong eventCount = new AtomicLong();    /**     * 这是辅助测试用的,     * 测试的时候,实现事件公布后,测试主线程就用这个countDownLatch开始期待,     * 在生产到指定的数量(countDownLatchGate)后,生产线程执行countDownLatch的countDown办法,     * 这样测试主线程就能够完结期待了     */    private CountDownLatch countDownLatch;    /**     * 这是辅助测试用的,     * 测试的时候,实现事件公布后,测试主线程就用这个countDownLatch开始期待,     * 在生产到指定的数量(countDownLatchGate)后,生产线程执行countDownLatch的countDown办法,     * 这样测试主线程就能够完结期待了     */    private int countDownLatchGate;    /**     * 筹备一个匿名类,传给disruptor的事件处理类,     * 这样每次处理事件时,都会将曾经处理事件的总数打印进去     */    protected Consumer<?> eventCountPrinter = new Consumer<Object>() {        @Override        public void accept(Object o) {            long count = eventCount.incrementAndGet();            /**             * 这是辅助测试用的,             * 测试的时候,实现事件公布后,测试主线程就用这个countDownLatch开始期待,             * 在生产到指定的数量(countDownLatchGate)后,生产线程执行countDownLatch的countDown办法,             * 这样测试主线程就能够完结期待了             */            if (null!=countDownLatch && count>=countDownLatchGate) {                countDownLatch.countDown();            }        }    };    /**     * 公布一个事件     * @param value     * @return     */    public void publish(String value) {        producer.onData(value);    }    /**     * 返回曾经解决的工作总数     * @return     */    public long eventCount() {        return eventCount.get();    }    /**     * 这是辅助测试用的,     * 测试的时候,实现事件公布后,测试主线程就用这个countDownLatch开始期待,     * 在生产到指定的数量(countDownLatchGate)后,生产线程执行countDownLatch的countDown办法,     * 这样测试主线程就能够完结期待了     * @param countDownLatch     * @param countDownLatchGate     */    public void setCountDown(CountDownLatch countDownLatch, int countDownLatchGate) {        this.countDownLatch = countDownLatch;        this.countDownLatchGate = countDownLatchGate;    }    /**     * 留给子类实现具体的事件生产逻辑     */    protected abstract void disruptorOperate();    @PostConstruct    private void init() {        // 实例化        disruptor = new Disruptor<>(new OrderEventFactory(),                BUFFER_SIZE,                new CustomizableThreadFactory("event-handler-"));        // 留给子类实现具体的事件生产逻辑        disruptorOperate();        // 启动        disruptor.start();        // 生产者        setProducer(new OrderEventProducer(disruptor.getRingBuffer()));    }}
  • 上述代码,有以下几处须要留神:
  • init办法是spring bean实例化后要执行的办法,这外面实例化Disruptor,还启动了生产线程,并且实例化了事件生产者,具体的事件生产逻辑,由子类在disruptorOperate办法中实现;
  • eventCountPrinter是个匿名类实例,传给事件生产的handler后,每生产一个事件都会执行一次eventCountPrinter.accept办法,这样就把生产事件的总数精确的保留在eventCount变量中了;
  • countDownLatch和countDownLatchGate是为了辅助单元测试而筹备的,测试的时候,实现事件公布后,测试主线程就用这个countDownLatch开始期待,在生产到指定的数量(countDownLatchGate)后,生产线程执行countDownLatch的countDown办法,这样测试主线程就能够完结期待了
  • 至此,专用代码就写完了,可见形象父类曾经做好了大部分事件,咱们的子类能够聚焦事件生产的逻辑编排了,开始挨个实现那三个场景;

100个订单,短信和邮件系统独立生产

  • 两个消费者独立生产的逻辑非常简单,就一行代码,调用<font color="blue">handleEventsWith</font>办法把所有消费者实例传进去,就完事了:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;import com.bolingcavalry.service.MailEventHandler;import com.bolingcavalry.service.SmsEventHandler;import org.springframework.stereotype.Service;@Service("independentModeService")public class IndependentModeServiceImpl extends ConsumeModeService {    @Override    protected void disruptorOperate() {        // 调用handleEventsWith,示意创立的多个消费者,每个都是独立生产的        // 这里创立两个消费者,一个是短信的,一个是邮件的        disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter), new MailEventHandler(eventCountPrinter));    }}
  • 单元测试代码如下,要留神的中央是公布完<font color="red">100</font>事件后,调用<font color="blue">countDownLatch.await()</font>办法开始期待,直到消费者线程调用<font color="blue">countDownLatch.countDown()</font>办法解除期待,还有就是预期的生产音讯总数等于<font color="red">200</font>:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;import lombok.extern.slf4j.Slf4j;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import java.util.concurrent.CountDownLatch;import static org.junit.Assert.assertEquals;@RunWith(SpringRunner.class)@SpringBootTest@Slf4jpublic class ConsumeModeServiceTest {    @Autowired    @Qualifier("independentModeService")    ConsumeModeService independentModeService;    /**     * 测试时生产的音讯数量     */    private static final int EVENT_COUNT = 100;    private void testConsumeModeService(ConsumeModeService service, int eventCount, int expectEventCount) throws InterruptedException {        CountDownLatch countDownLatch = new CountDownLatch(1);        // 通知service,等生产到expectEventCount个音讯时,就执行countDownLatch.countDown办法        service.setCountDown(countDownLatch, expectEventCount);        for(int i=0;i<eventCount;i++) {            log.info("publich {}", i);            service.publish(String.valueOf(i));        }        // 以后线程开始期待,后面的service.setCountDown办法曾经通知过service,        // 等生产到expectEventCount个音讯时,就执行countDownLatch.countDown办法        // 千万留神,要调用await办法,而不是wait办法!        countDownLatch.await();        // 生产的事件总数应该等于公布的事件数        assertEquals(expectEventCount, service.eventCount());    }    @Test    public void testIndependentModeService() throws InterruptedException {        log.info("start testIndependentModeService");        testConsumeModeService(independentModeService,                EVENT_COUNT,                EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM);    }}
  • 单元测试执行后果如下,合乎预期:

100个订单,邮件系统的两个邮件服务器独特生产

  • 两个消费者独特生产的代码也很简略,调用handleEventsWithWorkerPool办法即可,把独特生产的MailWorkHandler实例作为参数传入:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;import com.bolingcavalry.service.MailWorkHandler;import org.springframework.stereotype.Service;@Service("shareModeService")public class ShareModeServiceImpl extends ConsumeModeService {    @Override    protected void disruptorOperate() {        // mailWorkHandler1模仿一号邮件服务器        MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter);        // mailWorkHandler2模仿一号邮件服务器        MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter);        // 调用handleEventsWithWorkerPool,示意创立的多个消费者以独特生产的模式生产        disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2);    }}
  • 单元测试是在ConsumeModeServiceTest.java中增加如下代码,留神因为是独特生产,因而预期的生产事件数等于音讯数,都是100:
    @Autowired    @Qualifier("shareModeService")    ConsumeModeService shareModeService;    @Test    public void testShareModeService() throws InterruptedException {        log.info("start testShareModeService");        testConsumeModeService(shareModeService, EVENT_COUNT, EVENT_COUNT);    }
  • 执行单元测试,后果如下图:

100个订单,短信零碎独立生产,与此同时,两个邮件服务器独特生产

  • 最初一个场景,仍旧很简略,handleEventsWith调用一次,再调用一次handleEventsWithWorkerPool即可:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;import com.bolingcavalry.service.MailWorkHandler;import com.bolingcavalry.service.SmsEventHandler;import org.springframework.stereotype.Service;@Service("independentAndShareModeService")public class IndependentAndShareModeServiceImpl extends ConsumeModeService {    @Override    protected void disruptorOperate() {        // 调用handleEventsWith,示意创立的多个消费者,每个都是独立生产的        // 这里创立一个消费者,短信服务        disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter));        // mailWorkHandler1模仿一号邮件服务器        MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter);        // mailWorkHandler2模仿一号邮件服务器        MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter);        // 调用handleEventsWithWorkerPool,示意创立的多个消费者以独特生产的模式生产        disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2);    }}
  • 单元测试是在ConsumeModeServiceTest.java中增加如下代码,预期的生产事件数应该是200,因为整体上是两个独立生产,只不过其中的一个外部有两个消费者独特生产:
    @Autowired    @Qualifier("independentAndShareModeService")    ConsumeModeService independentAndShareModeService;    @Test    public void independentAndShareModeService() throws InterruptedException {        log.info("start independentAndShareModeService");        testConsumeModeService(independentAndShareModeService,                EVENT_COUNT,                EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM);    }
  • 单元测试后果如下,合乎预期:

  • 至此,独立生产和独特生产的实战就实现了,借助disruptor,三个常见场景都能够轻松实现,如果您正在做这些场景的开发,心愿本文能给您一些参考;

你不孤独,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

欢送关注公众号:程序员欣宸

微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游Java世界...
https://github.com/zq2599/blog_demos