乐趣区

关于云计算:disruptor笔记之五事件消费实战

欢送拜访我的 GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,波及 Java、Docker、Kubernetes、DevOPS 等;

《disruptor 笔记》系列链接

  1. 疾速入门
  2. Disruptor 类剖析
  3. 环形队列的根底操作(不必 Disruptor 类)
  4. 事件生产知识点小结
  5. 事件生产实战
  6. 常见场景
  7. 期待策略
  8. 知识点补充(终篇)

本篇概览

本篇是《disruptor 笔记》的第五篇,前文《disruptor 笔记之四:事件生产知识点小结》从实践上梳理剖析了独立生产和独特生产,留下了三个工作,明天就来成这些工作,即编码实现以下三个场景:

  1. 100 个订单,短信和邮件系统独立生产
  2. 100 个订单,邮件系统的两个邮件服务器独特生产;
  3. 100 个订单,短信零碎独立生产,与此同时,两个邮件服务器独特生产;

源码下载

  • 本篇实战中的残缺源码可在 GitHub 下载到,地址和链接信息如下表所示(https://github.com/zq2599/blo…):
名称 链接 备注
我的项目主页 https://github.com/zq2599/blo… 该我的项目在 GitHub 上的主页
git 仓库地址(https) https://github.com/zq2599/blo… 该我的项目源码的仓库地址,https 协定
git 仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该我的项目源码的仓库地址,ssh 协定
  • 这个 git 我的项目中有多个文件夹,本次实战的源码在 <font color=”blue”>disruptor-tutorials</font> 文件夹下,如下图红框所示:

  • <font color=”blue”>disruptor-tutorials</font> 是个父工程,外面有多个 module,本篇实战的 module 是 <font color=”red”>consume-mode</font>,如下图红框所示:

编写公共代码

  • 为了实现工作,编码实现下面那三个场景,咱们须要先把公共代码写好;
  • 首先是在父工程 <font color=”blue”>disruptor-tutorials</font> 上面新建名为 <font color=”red”>consume-mode</font> 的 module,其 build.gradle 内容如下:
plugins {id 'org.springframework.boot'}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'com.lmax:disruptor'

    testImplementation('org.springframework.boot:spring-boot-starter-test')
}
  • springboot 启动类:
package com.bolingcavalry;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumeModeApplication {public static void main(String[] args) {SpringApplication.run(ConsumeModeApplication.class, args);
    }
}
  • 订单事件定义:
package com.bolingcavalry.service;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@ToString
@NoArgsConstructor
public class OrderEvent {private String value;}
    • 订单事件的工程类,定义事件实例如何创立:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventFactory;

public class OrderEventFactory implements EventFactory<OrderEvent> {

    @Override
    public OrderEvent newInstance() {return new OrderEvent();
    }
}
  • 订单事件生产者类,定义如何将业务信息通过事件公布到环形队列:
package com.bolingcavalry.service;

import com.lmax.disruptor.RingBuffer;

public class OrderEventProducer {
    // 存储数据的环形队列
    private final RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}

    public void onData(String content) {
        // ringBuffer 是个队列,其 next 办法返回的是下最初一条记录之后的地位,这是个可用地位
        long sequence = ringBuffer.next();

        try {
            // sequence 地位取出的事件是空事件
            OrderEvent orderEvent = ringBuffer.get(sequence);
            // 空事件增加业务信息
            orderEvent.setValue(content);
        } finally {
            // 公布
            ringBuffer.publish(sequence);
        }
    }
}
  • 生产订单事件的短信服务,实现 EventHandler 接口,所以是用在 <font color=”red”>独立生产</font> 的场景:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class SmsEventHandler implements EventHandler<OrderEvent> {public SmsEventHandler(Consumer<?> consumer) {this.consumer = consumer;}

    // 内部能够传入 Consumer 实现类,每解决一条音讯的时候,consumer 的 accept 办法就会被执行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {log.info("短信服务 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);

        // 这里延时 100ms,模仿生产事件的逻辑的耗时
        Thread.sleep(100);

        // 如果内部传入了 consumer,就要执行一次 accept 办法
        if (null!=consumer) {consumer.accept(null);
        }
    }
}
  • 生产订单事件的邮件服务,实现 EventHandler 接口,所以是用在 <font color=”red”>独立生产</font> 的场景:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class MailEventHandler implements EventHandler<OrderEvent> {public MailEventHandler(Consumer<?> consumer) {this.consumer = consumer;}

    // 内部能够传入 Consumer 实现类,每解决一条音讯的时候,consumer 的 accept 办法就会被执行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {log.info("邮件服务 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);

        // 这里延时 100ms,模仿生产事件的逻辑的耗时
        Thread.sleep(100);

        // 如果内部传入了 consumer,就要执行一次 accept 办法
        if (null!=consumer) {consumer.accept(null);
        }
    }
}
  • 生产订单事件的邮件服务,实现 WorkHandler 接口,所以是用在 <font color=”red”>独特生产</font> 的场景:
package com.bolingcavalry.service;

import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class MailWorkHandler implements WorkHandler<OrderEvent> {public MailWorkHandler(Consumer<?> consumer) {this.consumer = consumer;}

    // 内部能够传入 Consumer 实现类,每解决一条音讯的时候,consumer 的 accept 办法就会被执行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(OrderEvent event) throws Exception {log.info("独特生产模式的邮件服务 : {}", event);

        // 这里延时 100ms,模仿生产事件的逻辑的耗时
        Thread.sleep(100);

        // 如果内部传入了 consumer,就要执行一次 accept 办法
        if (null!=consumer) {consumer.accept(null);
        }
    }
}
  • 最初,将公布和生产事件的逻辑写在一个抽象类里,然而具体如何生产事件并不在此类中实现,而是留给子类,这个抽象类中有几处要留神的中央稍后会提到:
package com.bolingcavalry.service;

import com.lmax.disruptor.dsl.Disruptor;
import lombok.Setter;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import javax.annotation.PostConstruct;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public abstract class ConsumeModeService {
    /**
     * 独立消费者数量
     */
    public static final int INDEPENDENT_CONSUMER_NUM = 2;

    /**
     * 环形缓冲区大小
     */
    protected int BUFFER_SIZE = 16;

    protected Disruptor<OrderEvent> disruptor;

    @Setter
    private OrderEventProducer producer;

    /**
     * 统计音讯总数
     */
    protected final AtomicLong eventCount = new AtomicLong();

    /**
     * 这是辅助测试用的,* 测试的时候,实现事件公布后,测试主线程就用这个 countDownLatch 开始期待,* 在生产到指定的数量 (countDownLatchGate) 后,生产线程执行 countDownLatch 的 countDown 办法,* 这样测试主线程就能够完结期待了
     */
    private CountDownLatch countDownLatch;

    /**
     * 这是辅助测试用的,* 测试的时候,实现事件公布后,测试主线程就用这个 countDownLatch 开始期待,* 在生产到指定的数量 (countDownLatchGate) 后,生产线程执行 countDownLatch 的 countDown 办法,* 这样测试主线程就能够完结期待了
     */
    private int countDownLatchGate;

    /**
     * 筹备一个匿名类,传给 disruptor 的事件处理类,* 这样每次处理事件时,都会将曾经处理事件的总数打印进去
     */
    protected Consumer<?> eventCountPrinter = new Consumer<Object>() {
        @Override
        public void accept(Object o) {long count = eventCount.incrementAndGet();

            /**
             * 这是辅助测试用的,* 测试的时候,实现事件公布后,测试主线程就用这个 countDownLatch 开始期待,* 在生产到指定的数量 (countDownLatchGate) 后,生产线程执行 countDownLatch 的 countDown 办法,* 这样测试主线程就能够完结期待了
             */
            if (null!=countDownLatch && count>=countDownLatchGate) {countDownLatch.countDown();
            }
        }
    };

    /**
     * 公布一个事件
     * @param value
     * @return
     */
    public void publish(String value) {producer.onData(value);
    }

    /**
     * 返回曾经解决的工作总数
     * @return
     */
    public long eventCount() {return eventCount.get();
    }

    /**
     * 这是辅助测试用的,* 测试的时候,实现事件公布后,测试主线程就用这个 countDownLatch 开始期待,* 在生产到指定的数量 (countDownLatchGate) 后,生产线程执行 countDownLatch 的 countDown 办法,* 这样测试主线程就能够完结期待了
     * @param countDownLatch
     * @param countDownLatchGate
     */
    public void setCountDown(CountDownLatch countDownLatch, int countDownLatchGate) {
        this.countDownLatch = countDownLatch;
        this.countDownLatchGate = countDownLatchGate;
    }

    /**
     * 留给子类实现具体的事件生产逻辑
     */
    protected abstract void disruptorOperate();

    @PostConstruct
    private void init() {
        // 实例化
        disruptor = new Disruptor<>(new OrderEventFactory(),
                BUFFER_SIZE,
                new CustomizableThreadFactory("event-handler-"));

        // 留给子类实现具体的事件生产逻辑
        disruptorOperate();

        // 启动
        disruptor.start();

        // 生产者
        setProducer(new OrderEventProducer(disruptor.getRingBuffer()));
    }
}
  • 上述代码,有以下几处须要留神:
  • init 办法是 spring bean 实例化后要执行的办法,这外面实例化 Disruptor,还启动了生产线程,并且实例化了事件生产者,具体的事件生产逻辑,由子类在 disruptorOperate 办法中实现;
  • eventCountPrinter 是个匿名类实例,传给事件生产的 handler 后,每生产一个事件都会执行一次 eventCountPrinter.accept 办法,这样就把生产事件的总数精确的保留在 eventCount 变量中了;
  • countDownLatch 和 countDownLatchGate 是为了辅助单元测试而筹备的,测试的时候,实现事件公布后,测试主线程就用这个 countDownLatch 开始期待,在生产到指定的数量 (countDownLatchGate) 后,生产线程执行 countDownLatch 的 countDown 办法,这样测试主线程就能够完结期待了
  • 至此,专用代码就写完了,可见形象父类曾经做好了大部分事件,咱们的子类能够聚焦事件生产的逻辑编排了,开始挨个实现那三个场景;

100 个订单,短信和邮件系统独立生产

  • 两个消费者独立生产的逻辑非常简单,就一行代码,调用 <font color=”blue”>handleEventsWith</font> 办法把所有消费者实例传进去,就完事了:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service;

@Service("independentModeService")
public class IndependentModeServiceImpl extends ConsumeModeService {

    @Override
    protected void disruptorOperate() {
        // 调用 handleEventsWith,示意创立的多个消费者,每个都是独立生产的
        // 这里创立两个消费者,一个是短信的,一个是邮件的
        disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter), new MailEventHandler(eventCountPrinter));
    }
}
  • 单元测试代码如下,要留神的中央是公布完 <font color=”red”>100</font> 事件后,调用 <font color=”blue”>countDownLatch.await()</font> 办法开始期待,直到消费者线程调用 <font color=”blue”>countDownLatch.countDown()</font> 办法解除期待,还有就是预期的生产音讯总数等于 <font color=”red”>200</font>:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class ConsumeModeServiceTest {

    @Autowired
    @Qualifier("independentModeService")
    ConsumeModeService independentModeService;

    /**
     * 测试时生产的音讯数量
     */
    private static final int EVENT_COUNT = 100;

    private void testConsumeModeService(ConsumeModeService service, int eventCount, int expectEventCount) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);

        // 通知 service,等生产到 expectEventCount 个音讯时,就执行 countDownLatch.countDown 办法
        service.setCountDown(countDownLatch, expectEventCount);

        for(int i=0;i<eventCount;i++) {log.info("publich {}", i);
            service.publish(String.valueOf(i));
        }

        // 以后线程开始期待,后面的 service.setCountDown 办法曾经通知过 service,// 等生产到 expectEventCount 个音讯时,就执行 countDownLatch.countDown 办法
        // 千万留神,要调用 await 办法,而不是 wait 办法!countDownLatch.await();

        // 生产的事件总数应该等于公布的事件数
        assertEquals(expectEventCount, service.eventCount());
    }

    @Test
    public void testIndependentModeService() throws InterruptedException {log.info("start testIndependentModeService");
        testConsumeModeService(independentModeService,
                EVENT_COUNT,
                EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM);
    }
}
  • 单元测试执行后果如下,合乎预期:

100 个订单,邮件系统的两个邮件服务器独特生产

  • 两个消费者独特生产的代码也很简略,调用 handleEventsWithWorkerPool 办法即可,把独特生产的 MailWorkHandler 实例作为参数传入:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;

@Service("shareModeService")
public class ShareModeServiceImpl extends ConsumeModeService {
    @Override
    protected void disruptorOperate() {
        // mailWorkHandler1 模仿一号邮件服务器
        MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter);

        // mailWorkHandler2 模仿一号邮件服务器
        MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter);

        // 调用 handleEventsWithWorkerPool,示意创立的多个消费者以独特生产的模式生产
        disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2);
    }
}
  • 单元测试是在 ConsumeModeServiceTest.java 中增加如下代码,留神因为是独特生产,因而预期的生产事件数等于音讯数,都是 100:
    @Autowired
    @Qualifier("shareModeService")
    ConsumeModeService shareModeService;

    @Test
    public void testShareModeService() throws InterruptedException {log.info("start testShareModeService");
        testConsumeModeService(shareModeService, EVENT_COUNT, EVENT_COUNT);
    }
  • 执行单元测试,后果如下图:

100 个订单,短信零碎独立生产,与此同时,两个邮件服务器独特生产

  • 最初一个场景,仍旧很简略,handleEventsWith 调用一次,再调用一次 handleEventsWithWorkerPool 即可:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailWorkHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service;

@Service("independentAndShareModeService")
public class IndependentAndShareModeServiceImpl extends ConsumeModeService {
    @Override
    protected void disruptorOperate() {
        // 调用 handleEventsWith,示意创立的多个消费者,每个都是独立生产的
        // 这里创立一个消费者,短信服务
        disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter));

        // mailWorkHandler1 模仿一号邮件服务器
        MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter);

        // mailWorkHandler2 模仿一号邮件服务器
        MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter);

        // 调用 handleEventsWithWorkerPool,示意创立的多个消费者以独特生产的模式生产
        disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2);
    }
}
  • 单元测试是在 ConsumeModeServiceTest.java 中增加如下代码,预期的生产事件数应该是 200,因为整体上是两个独立生产,只不过其中的一个外部有两个消费者独特生产:
    @Autowired
    @Qualifier("independentAndShareModeService")
    ConsumeModeService independentAndShareModeService;

    @Test
    public void independentAndShareModeService() throws InterruptedException {log.info("start independentAndShareModeService");
        testConsumeModeService(independentAndShareModeService,
                EVENT_COUNT,
                EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM);
    }
  • 单元测试后果如下,合乎预期:

  • 至此,独立生产和独特生产的实战就实现了,借助 disruptor,三个常见场景都能够轻松实现,如果您正在做这些场景的开发,心愿本文能给您一些参考;

你不孤独,欣宸原创一路相伴

  1. Java 系列
  2. Spring 系列
  3. Docker 系列
  4. kubernetes 系列
  5. 数据库 + 中间件系列
  6. DevOps 系列

欢送关注公众号:程序员欣宸

微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游 Java 世界 …
https://github.com/zq2599/blog_demos

退出移动版