欢送拜访我的 GitHub
https://github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,波及 Java、Docker、Kubernetes、DevOPS 等;
《disruptor 笔记》系列链接
- 疾速入门
- Disruptor 类剖析
- 环形队列的根底操作(不必 Disruptor 类)
- 事件生产知识点小结
- 事件生产实战
- 常见场景
- 期待策略
- 知识点补充(终篇)
本篇概览
本篇是《disruptor 笔记》的第五篇,前文《disruptor 笔记之四:事件生产知识点小结》从实践上梳理剖析了独立生产和独特生产,留下了三个工作,明天就来成这些工作,即编码实现以下三个场景:
- 100 个订单,短信和邮件系统独立生产
- 100 个订单,邮件系统的两个邮件服务器独特生产;
- 100 个订单,短信零碎独立生产,与此同时,两个邮件服务器独特生产;
源码下载
- 本篇实战中的残缺源码可在 GitHub 下载到,地址和链接信息如下表所示(https://github.com/zq2599/blo…):
名称 | 链接 | 备注 |
---|---|---|
我的项目主页 | https://github.com/zq2599/blo… | 该我的项目在 GitHub 上的主页 |
git 仓库地址(https) | https://github.com/zq2599/blo… | 该我的项目源码的仓库地址,https 协定 |
git 仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该我的项目源码的仓库地址,ssh 协定 |
- 这个 git 我的项目中有多个文件夹,本次实战的源码在 <font color=”blue”>disruptor-tutorials</font> 文件夹下,如下图红框所示:
- <font color=”blue”>disruptor-tutorials</font> 是个父工程,外面有多个 module,本篇实战的 module 是 <font color=”red”>consume-mode</font>,如下图红框所示:
编写公共代码
- 为了实现工作,编码实现下面那三个场景,咱们须要先把公共代码写好;
- 首先是在父工程 <font color=”blue”>disruptor-tutorials</font> 上面新建名为 <font color=”red”>consume-mode</font> 的 module,其 build.gradle 内容如下:
plugins {id 'org.springframework.boot'}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'com.lmax:disruptor'
testImplementation('org.springframework.boot:spring-boot-starter-test')
}
- springboot 启动类:
package com.bolingcavalry;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ConsumeModeApplication {public static void main(String[] args) {SpringApplication.run(ConsumeModeApplication.class, args);
}
}
- 订单事件定义:
package com.bolingcavalry.service;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@ToString
@NoArgsConstructor
public class OrderEvent {private String value;}
-
- 订单事件的工程类,定义事件实例如何创立:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventFactory;
public class OrderEventFactory implements EventFactory<OrderEvent> {
@Override
public OrderEvent newInstance() {return new OrderEvent();
}
}
- 订单事件生产者类,定义如何将业务信息通过事件公布到环形队列:
package com.bolingcavalry.service;
import com.lmax.disruptor.RingBuffer;
public class OrderEventProducer {
// 存储数据的环形队列
private final RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}
public void onData(String content) {
// ringBuffer 是个队列,其 next 办法返回的是下最初一条记录之后的地位,这是个可用地位
long sequence = ringBuffer.next();
try {
// sequence 地位取出的事件是空事件
OrderEvent orderEvent = ringBuffer.get(sequence);
// 空事件增加业务信息
orderEvent.setValue(content);
} finally {
// 公布
ringBuffer.publish(sequence);
}
}
}
- 生产订单事件的短信服务,实现 EventHandler 接口,所以是用在 <font color=”red”>独立生产</font> 的场景:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
@Slf4j
public class SmsEventHandler implements EventHandler<OrderEvent> {public SmsEventHandler(Consumer<?> consumer) {this.consumer = consumer;}
// 内部能够传入 Consumer 实现类,每解决一条音讯的时候,consumer 的 accept 办法就会被执行一次
private Consumer<?> consumer;
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {log.info("短信服务 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
// 这里延时 100ms,模仿生产事件的逻辑的耗时
Thread.sleep(100);
// 如果内部传入了 consumer,就要执行一次 accept 办法
if (null!=consumer) {consumer.accept(null);
}
}
}
- 生产订单事件的邮件服务,实现 EventHandler 接口,所以是用在 <font color=”red”>独立生产</font> 的场景:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
@Slf4j
public class MailEventHandler implements EventHandler<OrderEvent> {public MailEventHandler(Consumer<?> consumer) {this.consumer = consumer;}
// 内部能够传入 Consumer 实现类,每解决一条音讯的时候,consumer 的 accept 办法就会被执行一次
private Consumer<?> consumer;
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {log.info("邮件服务 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
// 这里延时 100ms,模仿生产事件的逻辑的耗时
Thread.sleep(100);
// 如果内部传入了 consumer,就要执行一次 accept 办法
if (null!=consumer) {consumer.accept(null);
}
}
}
- 生产订单事件的邮件服务,实现 WorkHandler 接口,所以是用在 <font color=”red”>独特生产</font> 的场景:
package com.bolingcavalry.service;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
@Slf4j
public class MailWorkHandler implements WorkHandler<OrderEvent> {public MailWorkHandler(Consumer<?> consumer) {this.consumer = consumer;}
// 内部能够传入 Consumer 实现类,每解决一条音讯的时候,consumer 的 accept 办法就会被执行一次
private Consumer<?> consumer;
@Override
public void onEvent(OrderEvent event) throws Exception {log.info("独特生产模式的邮件服务 : {}", event);
// 这里延时 100ms,模仿生产事件的逻辑的耗时
Thread.sleep(100);
// 如果内部传入了 consumer,就要执行一次 accept 办法
if (null!=consumer) {consumer.accept(null);
}
}
}
- 最初,将公布和生产事件的逻辑写在一个抽象类里,然而具体如何生产事件并不在此类中实现,而是留给子类,这个抽象类中有几处要留神的中央稍后会提到:
package com.bolingcavalry.service;
import com.lmax.disruptor.dsl.Disruptor;
import lombok.Setter;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import javax.annotation.PostConstruct;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
public abstract class ConsumeModeService {
/**
* 独立消费者数量
*/
public static final int INDEPENDENT_CONSUMER_NUM = 2;
/**
* 环形缓冲区大小
*/
protected int BUFFER_SIZE = 16;
protected Disruptor<OrderEvent> disruptor;
@Setter
private OrderEventProducer producer;
/**
* 统计音讯总数
*/
protected final AtomicLong eventCount = new AtomicLong();
/**
* 这是辅助测试用的,* 测试的时候,实现事件公布后,测试主线程就用这个 countDownLatch 开始期待,* 在生产到指定的数量 (countDownLatchGate) 后,生产线程执行 countDownLatch 的 countDown 办法,* 这样测试主线程就能够完结期待了
*/
private CountDownLatch countDownLatch;
/**
* 这是辅助测试用的,* 测试的时候,实现事件公布后,测试主线程就用这个 countDownLatch 开始期待,* 在生产到指定的数量 (countDownLatchGate) 后,生产线程执行 countDownLatch 的 countDown 办法,* 这样测试主线程就能够完结期待了
*/
private int countDownLatchGate;
/**
* 筹备一个匿名类,传给 disruptor 的事件处理类,* 这样每次处理事件时,都会将曾经处理事件的总数打印进去
*/
protected Consumer<?> eventCountPrinter = new Consumer<Object>() {
@Override
public void accept(Object o) {long count = eventCount.incrementAndGet();
/**
* 这是辅助测试用的,* 测试的时候,实现事件公布后,测试主线程就用这个 countDownLatch 开始期待,* 在生产到指定的数量 (countDownLatchGate) 后,生产线程执行 countDownLatch 的 countDown 办法,* 这样测试主线程就能够完结期待了
*/
if (null!=countDownLatch && count>=countDownLatchGate) {countDownLatch.countDown();
}
}
};
/**
* 公布一个事件
* @param value
* @return
*/
public void publish(String value) {producer.onData(value);
}
/**
* 返回曾经解决的工作总数
* @return
*/
public long eventCount() {return eventCount.get();
}
/**
* 这是辅助测试用的,* 测试的时候,实现事件公布后,测试主线程就用这个 countDownLatch 开始期待,* 在生产到指定的数量 (countDownLatchGate) 后,生产线程执行 countDownLatch 的 countDown 办法,* 这样测试主线程就能够完结期待了
* @param countDownLatch
* @param countDownLatchGate
*/
public void setCountDown(CountDownLatch countDownLatch, int countDownLatchGate) {
this.countDownLatch = countDownLatch;
this.countDownLatchGate = countDownLatchGate;
}
/**
* 留给子类实现具体的事件生产逻辑
*/
protected abstract void disruptorOperate();
@PostConstruct
private void init() {
// 实例化
disruptor = new Disruptor<>(new OrderEventFactory(),
BUFFER_SIZE,
new CustomizableThreadFactory("event-handler-"));
// 留给子类实现具体的事件生产逻辑
disruptorOperate();
// 启动
disruptor.start();
// 生产者
setProducer(new OrderEventProducer(disruptor.getRingBuffer()));
}
}
- 上述代码,有以下几处须要留神:
- init 办法是 spring bean 实例化后要执行的办法,这外面实例化 Disruptor,还启动了生产线程,并且实例化了事件生产者,具体的事件生产逻辑,由子类在 disruptorOperate 办法中实现;
- eventCountPrinter 是个匿名类实例,传给事件生产的 handler 后,每生产一个事件都会执行一次 eventCountPrinter.accept 办法,这样就把生产事件的总数精确的保留在 eventCount 变量中了;
- countDownLatch 和 countDownLatchGate 是为了辅助单元测试而筹备的,测试的时候,实现事件公布后,测试主线程就用这个 countDownLatch 开始期待,在生产到指定的数量 (countDownLatchGate) 后,生产线程执行 countDownLatch 的 countDown 办法,这样测试主线程就能够完结期待了
- 至此,专用代码就写完了,可见形象父类曾经做好了大部分事件,咱们的子类能够聚焦事件生产的逻辑编排了,开始挨个实现那三个场景;
100 个订单,短信和邮件系统独立生产
- 两个消费者独立生产的逻辑非常简单,就一行代码,调用 <font color=”blue”>handleEventsWith</font> 办法把所有消费者实例传进去,就完事了:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service;
@Service("independentModeService")
public class IndependentModeServiceImpl extends ConsumeModeService {
@Override
protected void disruptorOperate() {
// 调用 handleEventsWith,示意创立的多个消费者,每个都是独立生产的
// 这里创立两个消费者,一个是短信的,一个是邮件的
disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter), new MailEventHandler(eventCountPrinter));
}
}
- 单元测试代码如下,要留神的中央是公布完 <font color=”red”>100</font> 事件后,调用 <font color=”blue”>countDownLatch.await()</font> 办法开始期待,直到消费者线程调用 <font color=”blue”>countDownLatch.countDown()</font> 办法解除期待,还有就是预期的生产音讯总数等于 <font color=”red”>200</font>:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class ConsumeModeServiceTest {
@Autowired
@Qualifier("independentModeService")
ConsumeModeService independentModeService;
/**
* 测试时生产的音讯数量
*/
private static final int EVENT_COUNT = 100;
private void testConsumeModeService(ConsumeModeService service, int eventCount, int expectEventCount) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);
// 通知 service,等生产到 expectEventCount 个音讯时,就执行 countDownLatch.countDown 办法
service.setCountDown(countDownLatch, expectEventCount);
for(int i=0;i<eventCount;i++) {log.info("publich {}", i);
service.publish(String.valueOf(i));
}
// 以后线程开始期待,后面的 service.setCountDown 办法曾经通知过 service,// 等生产到 expectEventCount 个音讯时,就执行 countDownLatch.countDown 办法
// 千万留神,要调用 await 办法,而不是 wait 办法!countDownLatch.await();
// 生产的事件总数应该等于公布的事件数
assertEquals(expectEventCount, service.eventCount());
}
@Test
public void testIndependentModeService() throws InterruptedException {log.info("start testIndependentModeService");
testConsumeModeService(independentModeService,
EVENT_COUNT,
EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM);
}
}
- 单元测试执行后果如下,合乎预期:
100 个订单,邮件系统的两个邮件服务器独特生产
- 两个消费者独特生产的代码也很简略,调用 handleEventsWithWorkerPool 办法即可,把独特生产的 MailWorkHandler 实例作为参数传入:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;
@Service("shareModeService")
public class ShareModeServiceImpl extends ConsumeModeService {
@Override
protected void disruptorOperate() {
// mailWorkHandler1 模仿一号邮件服务器
MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter);
// mailWorkHandler2 模仿一号邮件服务器
MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter);
// 调用 handleEventsWithWorkerPool,示意创立的多个消费者以独特生产的模式生产
disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2);
}
}
- 单元测试是在 ConsumeModeServiceTest.java 中增加如下代码,留神因为是独特生产,因而预期的生产事件数等于音讯数,都是 100:
@Autowired
@Qualifier("shareModeService")
ConsumeModeService shareModeService;
@Test
public void testShareModeService() throws InterruptedException {log.info("start testShareModeService");
testConsumeModeService(shareModeService, EVENT_COUNT, EVENT_COUNT);
}
- 执行单元测试,后果如下图:
100 个订单,短信零碎独立生产,与此同时,两个邮件服务器独特生产
- 最初一个场景,仍旧很简略,handleEventsWith 调用一次,再调用一次 handleEventsWithWorkerPool 即可:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailWorkHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service;
@Service("independentAndShareModeService")
public class IndependentAndShareModeServiceImpl extends ConsumeModeService {
@Override
protected void disruptorOperate() {
// 调用 handleEventsWith,示意创立的多个消费者,每个都是独立生产的
// 这里创立一个消费者,短信服务
disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter));
// mailWorkHandler1 模仿一号邮件服务器
MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter);
// mailWorkHandler2 模仿一号邮件服务器
MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter);
// 调用 handleEventsWithWorkerPool,示意创立的多个消费者以独特生产的模式生产
disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2);
}
}
- 单元测试是在 ConsumeModeServiceTest.java 中增加如下代码,预期的生产事件数应该是 200,因为整体上是两个独立生产,只不过其中的一个外部有两个消费者独特生产:
@Autowired
@Qualifier("independentAndShareModeService")
ConsumeModeService independentAndShareModeService;
@Test
public void independentAndShareModeService() throws InterruptedException {log.info("start independentAndShareModeService");
testConsumeModeService(independentAndShareModeService,
EVENT_COUNT,
EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM);
}
- 单元测试后果如下,合乎预期:
- 至此,独立生产和独特生产的实战就实现了,借助 disruptor,三个常见场景都能够轻松实现,如果您正在做这些场景的开发,心愿本文能给您一些参考;
你不孤独,欣宸原创一路相伴
- Java 系列
- Spring 系列
- Docker 系列
- kubernetes 系列
- 数据库 + 中间件系列
- DevOps 系列
欢送关注公众号:程序员欣宸
微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游 Java 世界 …
https://github.com/zq2599/blog_demos