欢送拜访我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,波及Java、Docker、Kubernetes、DevOPS等;

《disruptor笔记》系列链接

  1. 疾速入门
  2. Disruptor类剖析
  3. 环形队列的根底操作(不必Disruptor类)
  4. 事件生产知识点小结
  5. 事件生产实战
  6. 常见场景
  7. 期待策略
  8. 知识点补充(终篇)

本篇概览

  • 本文是《disruptor笔记》系列的第三篇,次要工作是编码实现音讯生产和生产,与《disruptor笔记之一:疾速入门》不同的是,本次开发不应用Disruptor类,和Ring Buffer(环形队列)相干的操作都是本人写代码实现;
  • 这种脱离Disruptor类操作Ring Buffer的做法,不适宜用在生产环境,但在学习Disruptor的过程中,这是种高效的学习伎俩,通过本篇实战后,在今后应用Disruptor时,您在开发、调试、优化等各种场景下都能更加得心应手;
  • 简略的音讯生产生产已不能满足咱们的学习激情,明天的实战要挑战以下三个场景:
  • 100个事件,单个消费者生产;
  • 100个事件,三个消费者,每个都单独生产这个100个事件;
  • 100个事件,三个消费者独特生产这个100个事件;

前文回顾

为了实现本篇的实战,前文《disruptor笔记之二:Disruptor类剖析》已做了充沛的钻研剖析,倡议观看,这里简略回顾以下Disruptor类的几个外围性能,这也是咱们编码时要实现的:

  1. 创立环形队列(RingBuffer对象)
  2. 创立SequenceBarrier对象,用于接管ringBuffer中的可生产事件
  3. 创立BatchEventProcessor,负责生产事件
  4. 绑定BatchEventProcessor对象的异样解决类
  5. 调用ringBuffer.addGatingSequences,将消费者的Sequence传给ringBuffer
  6. 启动独立线程,用来执行生产事件的业务逻辑
  7. 实践剖析曾经实现,接下来开始编码;

源码下载

  • 本篇实战中的残缺源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blo...:
名称链接备注
我的项目主页https://github.com/zq2599/blo...该我的项目在GitHub上的主页
git仓库地址(https)https://github.com/zq2599/blo...该我的项目源码的仓库地址,https协定
git仓库地址(ssh)git@github.com:zq2599/blog_demos.git该我的项目源码的仓库地址,ssh协定
  • 这个git我的项目中有多个文件夹,本次实战的源码在<font color="blue">disruptor-tutorials</font>文件夹下,如下图红框所示:

  • <font color="blue">disruptor-tutorials</font>是个父工程,外面有多个module,本篇实战的module是<font color="red">low-level-operate</font>,如下图红框所示:

开发

  • 进入编码阶段,明天的工作是挑战以下三个场景:
  • 100个事件,单个消费者生产;
  • 100个事件,三个消费者,每个都单独生产这个100个事件;
  • 100个事件,三个消费者独特生产这个100个事件;
  • 咱们先把工程建好,而后编写公共代码,例如事件定义、事件工厂等,最初才是每个场景的开发;
  • 在父工程<font color="blue">disruptor-tutorials</font>新增名为<font color="red">low-level-operate</font>的module,其build.gradle如下:
plugins {    id 'org.springframework.boot'}dependencies {    implementation 'org.projectlombok:lombok'    implementation 'org.springframework.boot:spring-boot-starter'    implementation 'org.springframework.boot:spring-boot-starter-web'    implementation 'com.lmax:disruptor'    testImplementation('org.springframework.boot:spring-boot-starter-test')}
  • 而后是springboot启动类:
package com.bolingcavalry;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class LowLevelOperateApplication {    public static void main(String[] args) {        SpringApplication.run(LowLevelOperateApplication.class, args);    }}
  • 事件类,这是事件的定义:
package com.bolingcavalry.service;import lombok.Data;import lombok.NoArgsConstructor;import lombok.ToString;@Data@ToString@NoArgsConstructorpublic class StringEvent {    private String value;}
  • 事件工厂,定义如何在内存中创立事件对象:
package com.bolingcavalry.service;import com.lmax.disruptor.EventFactory;public class StringEventFactory implements EventFactory<StringEvent> {    @Override    public StringEvent newInstance() {        return new StringEvent();    }}
  • 事件生产类,定义如何将业务逻辑的事件转为disruptor事件公布到环形队列,用于生产:
package com.bolingcavalry.service;import com.lmax.disruptor.RingBuffer;public class StringEventProducer {    // 存储数据的环形队列    private final RingBuffer<StringEvent> ringBuffer;    public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {        this.ringBuffer = ringBuffer;    }    public void onData(String content) {        // ringBuffer是个队列,其next办法返回的是下最初一条记录之后的地位,这是个可用地位        long sequence = ringBuffer.next();        try {            // sequence地位取出的事件是空事件            StringEvent stringEvent = ringBuffer.get(sequence);            // 空事件增加业务信息            stringEvent.setValue(content);        } finally {            // 公布            ringBuffer.publish(sequence);        }    }}
  • 事件处理类,收到事件后具体的业务解决逻辑:
package com.bolingcavalry.service;import com.lmax.disruptor.EventHandler;import lombok.Setter;import lombok.extern.slf4j.Slf4j;import java.util.function.Consumer;@Slf4jpublic class StringEventHandler implements EventHandler<StringEvent> {    public StringEventHandler(Consumer<?> consumer) {        this.consumer = consumer;    }    // 内部能够传入Consumer实现类,每解决一条音讯的时候,consumer的accept办法就会被执行一次    private Consumer<?> consumer;    @Override    public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {        log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);        // 这里延时100ms,模仿生产事件的逻辑的耗时        Thread.sleep(100);        // 如果内部传入了consumer,就要执行一次accept办法        if (null!=consumer) {            consumer.accept(null);        }    }}
  • 定义一个接口,内部通过调用接口的办法来生产音讯,再放几个常量在外面前面会用到:
package com.bolingcavalry.service;public interface LowLevelOperateService {    /**     * 消费者数量     */    int CONSUMER_NUM = 3;    /**     * 环形缓冲区大小     */    int BUFFER_SIZE = 16;    /**     * 公布一个事件     * @param value     * @return     */    void publish(String value);    /**     * 返回曾经解决的工作总数     * @return     */    long eventCount();}
  • 以上就是公共代码了,接下来一一实现之前布局的三个场景;
  • 100个事件,单个消费者生产

  • 这是最简略的性能了,实现公布音讯和单个消费者生产的性能,代码如下,有几处要留神的中央稍后提到:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.*;import com.lmax.disruptor.BatchEventProcessor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.SequenceBarrier;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicLong;import java.util.function.Consumer;@Service("oneConsumer")@Slf4jpublic class OneConsumerServiceImpl implements LowLevelOperateService {    private RingBuffer<StringEvent> ringBuffer;    private StringEventProducer producer;    /**     * 统计音讯总数     */    private final AtomicLong eventCount = new AtomicLong();    private ExecutorService executors;    @PostConstruct    private void init() {        // 筹备一个匿名类,传给disruptor的事件处理类,        // 这样每次处理事件时,都会将曾经处理事件的总数打印进去        Consumer<?> eventCountPrinter = new Consumer<Object>() {            @Override            public void accept(Object o) {                long count = eventCount.incrementAndGet();                log.info("receive [{}] event", count);            }        };        // 创立环形队列实例        ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);        // 筹备线程池        executors = Executors.newFixedThreadPool(1);        //创立SequenceBarrier        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();        // 创立事件处理的工作类,外面执行StringEventHandler处理事件        BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(                ringBuffer,                sequenceBarrier,                new StringEventHandler(eventCountPrinter));        // 将消费者的sequence传给环形队列        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());        // 在一个独立线程中取事件并生产        executors.submit(batchEventProcessor);        // 生产者        producer = new StringEventProducer(ringBuffer);    }    @Override    public void publish(String value) {        producer.onData(value);    }    @Override    public long eventCount() {        return eventCount.get();    }}
  • 上述代码有以下几处须要留神:
  1. 本人创立环形队列RingBuffer实例
  2. 本人筹备线程池,外面的线程用来获取和生产音讯
  3. 本人入手创立BatchEventProcessor实例,并把事件处理类传入
  4. 通过ringBuffer创立sequenceBarrier,传给BatchEventProcessor实例应用
  5. 将BatchEventProcessor的sequence传给ringBuffer,确保ringBuffer的生产和生产不会呈现凌乱
  6. 启动线程池,意味着BatchEventProcessor实例在一个独立线程中一直的从ringBuffer中获取事件并生产;
  • 为了验证上述代码是否失常工作,我这里写了个单元测试类,如下所示,逻辑很简略,调用OneConsumerServiceImpl.publish办法一百次,产生一百个事件,再查看OneConsumerServiceImpl记录的生产事件总数是不是等于一百:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.LowLevelOperateService;import lombok.extern.slf4j.Slf4j;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import static org.junit.Assert.assertEquals;@RunWith(SpringRunner.class)@SpringBootTest@Slf4jpublic class LowLeverOperateServiceImplTest {    @Autowired    @Qualifier("oneConsumer")    LowLevelOperateService oneConsumer;    private static final int EVENT_COUNT = 100;    private void testLowLevelOperateService(LowLevelOperateService service, int eventCount, int expectEventCount) throws InterruptedException {        for(int i=0;i<eventCount;i++) {            log.info("publich {}", i);            service.publish(String.valueOf(i));        }        // 异步生产,因而须要延时期待        Thread.sleep(10000);        // 生产的事件总数应该等于公布的事件数        assertEquals(expectEventCount, service.eventCount());    }    @Test    public void testOneConsumer() throws InterruptedException {        log.info("start testOneConsumerService");        testLowLevelOperateService(oneConsumer, EVENT_COUNT, EVENT_COUNT);    }
  • 留神,如果您是间接在IDEA上点击图标来执行单元测试,记得勾选下图红框中选项,否则可能呈现编译失败:

  • 执行上述单元测试类,后果如下图所示,音讯的生产和生产都合乎预期,并且生产逻辑是在独立线程中执行的:

  • 持续挑战下一个场景;

100个事件,三个消费者,每个都单独生产这个100个事件

  • 这个场景在kafka中也有,就是三个消费者的group不同,这样每一条音讯,这两个消费者各自生产一次;
  • 因而,100个事件,3个消费者每人都会独立生产这100个事件,一共生产300次;
  • 代码如下,有几处要留神的中央稍后提到:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.*;import com.lmax.disruptor.BatchEventProcessor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.SequenceBarrier;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import java.util.concurrent.Executor;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicLong;import java.util.function.Consumer;@Service("multiConsumer")@Slf4jpublic class MultiConsumerServiceImpl implements LowLevelOperateService {    private RingBuffer<StringEvent> ringBuffer;    private StringEventProducer producer;    /**     * 统计音讯总数     */    private final AtomicLong eventCount = new AtomicLong();    /**     * 生产一个BatchEventProcessor实例,并且启动独立线程开始获取和生产音讯     * @param executorService     */    private void addProcessor(ExecutorService executorService) {        // 筹备一个匿名类,传给disruptor的事件处理类,        // 这样每次处理事件时,都会将曾经处理事件的总数打印进去        Consumer<?> eventCountPrinter = new Consumer<Object>() {            @Override            public void accept(Object o) {                long count = eventCount.incrementAndGet();                log.info("receive [{}] event", count);            }        };        BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(                ringBuffer,                ringBuffer.newBarrier(),                new StringEventHandler(eventCountPrinter));        // 将以后消费者的sequence实例传给ringBuffer        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());        // 启动独立线程获取和生产事件        executorService.submit(batchEventProcessor);    }    @PostConstruct    private void init() {        ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);        ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);        // 创立多个消费者,并在独立线程中获取和生产事件        for (int i=0;i<CONSUMER_NUM;i++) {            addProcessor(executorService);        }        // 生产者        producer = new StringEventProducer(ringBuffer);    }    @Override    public void publish(String value) {        producer.onData(value);    }    @Override    public long eventCount() {        return eventCount.get();    }}
  • 上述代码和后面的OneConsumerServiceImpl相比差异不大,次要是创立了多个BatchEventProcessor实例,而后别离在线程池中提交;
  • 验证办法仍旧是单元测试,在方才的LowLeverOperateServiceImplTest.java中减少代码即可,留神testLowLevelOperateService的第三个参数是<font color="blue">EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM</font>,示意预期的被生产音讯数为<font color="red">300</font>:
     @Autowired    @Qualifier("multiConsumer")    LowLevelOperateService multiConsumer;    @Test    public void testMultiConsumer() throws InterruptedException {        log.info("start testMultiConsumer");        testLowLevelOperateService(multiConsumer, EVENT_COUNT, EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM);    }
  • 执行单元测试,如下图所示,一共生产了300个事件,并且三个消费者在不动线程:

100个事件,三个消费者独特生产这个100个事件

  • 本篇的最初一个实战是公布100个事件,而后让三个消费者独特生产100个(例如A生产33个,B生产33个,C生产34个);
  • 后面用到的BatchEventProcessor是用来独立生产的,不适宜多个消费者独特生产,这种多个生产独特生产的场景须要借助WorkerPool来实现,这个名字还是很形象的:一个池子外面有很多个工作者,把工作放入这个池子,工作者们每人解决一部分,大家合力将工作实现;
  • 传入WorkerPool的消费者须要实现WorkHandler接口,于是新增一个实现类:
package com.bolingcavalry.service;import com.lmax.disruptor.WorkHandler;import lombok.extern.slf4j.Slf4j;import java.util.function.Consumer;@Slf4jpublic class StringWorkHandler implements WorkHandler<StringEvent> {    public StringWorkHandler(Consumer<?> consumer) {        this.consumer = consumer;    }    // 内部能够传入Consumer实现类,每解决一条音讯的时候,consumer的accept办法就会被执行一次    private Consumer<?> consumer;    @Override    public void onEvent(StringEvent event) throws Exception {        log.info("work handler event : {}", event);        // 这里延时100ms,模仿生产事件的逻辑的耗时        Thread.sleep(100);        // 如果内部传入了consumer,就要执行一次accept办法        if (null!=consumer) {            consumer.accept(null);        }    }}
  • 新增服务类,实现独特生产的逻辑,有几处要留神的中央稍后会提到:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.*;import com.lmax.disruptor.*;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicLong;import java.util.function.Consumer;@Service("workerPoolConsumer")@Slf4jpublic class WorkerPoolConsumerServiceImpl implements LowLevelOperateService {    private RingBuffer<StringEvent> ringBuffer;    private StringEventProducer producer;    /**     * 统计音讯总数     */    private final AtomicLong eventCount = new AtomicLong();    @PostConstruct    private void init() {        ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);        ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);        StringWorkHandler[] handlers = new StringWorkHandler[CONSUMER_NUM];        // 创立多个StringWorkHandler实例,放入一个数组中        for (int i=0;i < CONSUMER_NUM;i++) {            handlers[i] = new StringWorkHandler(o -> {                long count = eventCount.incrementAndGet();                log.info("receive [{}] event", count);            });        }        // 创立WorkerPool实例,将StringWorkHandler实例的数组传进去,代表独特消费者的数量        WorkerPool<StringEvent> workerPool = new WorkerPool<>(ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), handlers);        // 这一句很重要,去掉就会呈现反复生产同一个事件的问题        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());        workerPool.start(executorService);        // 生产者        producer = new StringEventProducer(ringBuffer);    }    @Override    public void publish(String value) {        producer.onData(value);    }    @Override    public long eventCount() {        return eventCount.get();    }}
  • 上述代码中,要留神的有以下两处:
  1. StringWorkHandler数组传入给WorkerPool后,每个StringWorkHandler实例都放入一个新的WorkProcessor实例,WorkProcessor实现了Runnable接口,在执行<font color="blue">workerPool.start</font>时,会将WorkProcessor提交到线程池中;
  2. 和后面的独立生产相比,独特生产最大的特点在于只调用了一次<font color="blue">ringBuffer.addGatingSequences</font>办法,也就是说三个消费者共用一个sequence实例;
  3. 验证办法仍旧是单元测试,在方才的LowLeverOperateServiceImplTest.java中减少代码即可,留神testWorkerPoolConsumer的第三个参数是<font color="blue">EVENT_COUNT</font>,示意预期的被生产音讯数为<font color="red">100</font>:
     @Autowired    @Qualifier("workerPoolConsumer")    LowLevelOperateService workerPoolConsumer;        @Test    public void testWorkerPoolConsumer() throws InterruptedException {        log.info("start testWorkerPoolConsumer");        testLowLevelOperateService(workerPoolConsumer, EVENT_COUNT, EVENT_COUNT);    }
  • 执行单元测试如下图所示,三个消费者一共生产100个事件,且三个消费者在不同线程:

  • 至此,咱们在不必Disruptor类的前提下实现了三种常见场景的音讯生产生产,置信您对Disruptor的底层实现也有了粗浅意识,今后不论是应用还是优化Disruptor,肯定能够更加得心应手;

你不孤独,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

欢送关注公众号:程序员欣宸

微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游Java世界...
https://github.com/zq2599/blog_demos