共计 13529 个字符,预计需要花费 34 分钟才能阅读完成。
欢送拜访我的 GitHub
https://github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,波及 Java、Docker、Kubernetes、DevOPS 等;
《disruptor 笔记》系列链接
- 疾速入门
- Disruptor 类剖析
- 环形队列的根底操作 (不必 Disruptor 类)
- 事件生产知识点小结
- 事件生产实战
- 常见场景
- 期待策略
- 知识点补充 (终篇)
本篇概览
- 本文是《disruptor 笔记》系列的第三篇,次要工作是编码实现音讯生产和生产,与《disruptor 笔记之一:疾速入门》不同的是,本次开发不应用 Disruptor 类,和 Ring Buffer(环形队列) 相干的操作都是本人写代码实现;
- 这种脱离 Disruptor 类操作 Ring Buffer 的做法,不适宜用在生产环境,但在学习 Disruptor 的过程中,这是种高效的学习伎俩,通过本篇实战后,在今后应用 Disruptor 时,您在开发、调试、优化等各种场景下都能更加得心应手;
- 简略的音讯生产生产已不能满足咱们的学习激情,明天的实战要挑战以下三个场景:
- 100 个事件,单个消费者生产;
- 100 个事件,三个消费者,每个都单独生产这个 100 个事件;
- 100 个事件,三个消费者独特生产这个 100 个事件;
前文回顾
为了实现本篇的实战,前文《disruptor 笔记之二:Disruptor 类剖析》已做了充沛的钻研剖析,倡议观看,这里简略回顾以下 Disruptor 类的几个外围性能,这也是咱们编码时要实现的:
- 创立环形队列(RingBuffer 对象)
- 创立 SequenceBarrier 对象,用于接管 ringBuffer 中的可生产事件
- 创立 BatchEventProcessor,负责生产事件
- 绑定 BatchEventProcessor 对象的异样解决类
- 调用 ringBuffer.addGatingSequences,将消费者的 Sequence 传给 ringBuffer
- 启动独立线程,用来执行生产事件的业务逻辑
- 实践剖析曾经实现,接下来开始编码;
源码下载
- 本篇实战中的残缺源码可在 GitHub 下载到,地址和链接信息如下表所示 (https://github.com/zq2599/blo…:
名称 | 链接 | 备注 |
---|---|---|
我的项目主页 | https://github.com/zq2599/blo… | 该我的项目在 GitHub 上的主页 |
git 仓库地址 (https) | https://github.com/zq2599/blo… | 该我的项目源码的仓库地址,https 协定 |
git 仓库地址 (ssh) | git@github.com:zq2599/blog_demos.git | 该我的项目源码的仓库地址,ssh 协定 |
- 这个 git 我的项目中有多个文件夹,本次实战的源码在 <font color=”blue”>disruptor-tutorials</font> 文件夹下,如下图红框所示:
- <font color=”blue”>disruptor-tutorials</font> 是个父工程,外面有多个 module,本篇实战的 module 是 <font color=”red”>low-level-operate</font>,如下图红框所示:
开发
- 进入编码阶段,明天的工作是挑战以下三个场景:
- 100 个事件,单个消费者生产;
- 100 个事件,三个消费者,每个都单独生产这个 100 个事件;
- 100 个事件,三个消费者独特生产这个 100 个事件;
- 咱们先把工程建好,而后编写公共代码,例如事件定义、事件工厂等,最初才是每个场景的开发;
- 在父工程 <font color=”blue”>disruptor-tutorials</font> 新增名为 <font color=”red”>low-level-operate</font> 的 module,其 build.gradle 如下:
plugins {id 'org.springframework.boot'} | |
dependencies { | |
implementation 'org.projectlombok:lombok' | |
implementation 'org.springframework.boot:spring-boot-starter' | |
implementation 'org.springframework.boot:spring-boot-starter-web' | |
implementation 'com.lmax:disruptor' | |
testImplementation('org.springframework.boot:spring-boot-starter-test') | |
} |
- 而后是 springboot 启动类:
package com.bolingcavalry; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
@SpringBootApplication | |
public class LowLevelOperateApplication {public static void main(String[] args) {SpringApplication.run(LowLevelOperateApplication.class, args); | |
} | |
} |
- 事件类,这是事件的定义:
package com.bolingcavalry.service; | |
import lombok.Data; | |
import lombok.NoArgsConstructor; | |
import lombok.ToString; | |
@Data | |
@ToString | |
@NoArgsConstructor | |
public class StringEvent {private String value;} |
- 事件工厂,定义如何在内存中创立事件对象:
package com.bolingcavalry.service; | |
import com.lmax.disruptor.EventFactory; | |
public class StringEventFactory implements EventFactory<StringEvent> { | |
@Override | |
public StringEvent newInstance() {return new StringEvent(); | |
} | |
} |
- 事件生产类,定义如何将业务逻辑的事件转为 disruptor 事件公布到环形队列,用于生产:
package com.bolingcavalry.service; | |
import com.lmax.disruptor.RingBuffer; | |
public class StringEventProducer { | |
// 存储数据的环形队列 | |
private final RingBuffer<StringEvent> ringBuffer; | |
public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {this.ringBuffer = ringBuffer;} | |
public void onData(String content) { | |
// ringBuffer 是个队列,其 next 办法返回的是下最初一条记录之后的地位,这是个可用地位 | |
long sequence = ringBuffer.next(); | |
try { | |
// sequence 地位取出的事件是空事件 | |
StringEvent stringEvent = ringBuffer.get(sequence); | |
// 空事件增加业务信息 | |
stringEvent.setValue(content); | |
} finally { | |
// 公布 | |
ringBuffer.publish(sequence); | |
} | |
} | |
} |
- 事件处理类,收到事件后具体的业务解决逻辑:
package com.bolingcavalry.service; | |
import com.lmax.disruptor.EventHandler; | |
import lombok.Setter; | |
import lombok.extern.slf4j.Slf4j; | |
import java.util.function.Consumer; | |
@Slf4j | |
public class StringEventHandler implements EventHandler<StringEvent> {public StringEventHandler(Consumer<?> consumer) {this.consumer = consumer;} | |
// 内部能够传入 Consumer 实现类,每解决一条音讯的时候,consumer 的 accept 办法就会被执行一次 | |
private Consumer<?> consumer; | |
@Override | |
public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event); | |
// 这里延时 100ms,模仿生产事件的逻辑的耗时 | |
Thread.sleep(100); | |
// 如果内部传入了 consumer,就要执行一次 accept 办法 | |
if (null!=consumer) {consumer.accept(null); | |
} | |
} | |
} |
- 定义一个接口,内部通过调用接口的办法来生产音讯,再放几个常量在外面前面会用到:
package com.bolingcavalry.service; | |
public interface LowLevelOperateService { | |
/** | |
* 消费者数量 | |
*/ | |
int CONSUMER_NUM = 3; | |
/** | |
* 环形缓冲区大小 | |
*/ | |
int BUFFER_SIZE = 16; | |
/** | |
* 公布一个事件 | |
* @param value | |
* @return | |
*/ | |
void publish(String value); | |
/** | |
* 返回曾经解决的工作总数 | |
* @return | |
*/ | |
long eventCount();} |
- 以上就是公共代码了,接下来一一实现之前布局的三个场景;
-
100 个事件,单个消费者生产
- 这是最简略的性能了,实现公布音讯和单个消费者生产的性能,代码如下,有几处要留神的中央稍后提到:
package com.bolingcavalry.service.impl; | |
import com.bolingcavalry.service.*; | |
import com.lmax.disruptor.BatchEventProcessor; | |
import com.lmax.disruptor.RingBuffer; | |
import com.lmax.disruptor.SequenceBarrier; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.stereotype.Service; | |
import javax.annotation.PostConstruct; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.function.Consumer; | |
@Service("oneConsumer") | |
@Slf4j | |
public class OneConsumerServiceImpl implements LowLevelOperateService { | |
private RingBuffer<StringEvent> ringBuffer; | |
private StringEventProducer producer; | |
/** | |
* 统计音讯总数 | |
*/ | |
private final AtomicLong eventCount = new AtomicLong(); | |
private ExecutorService executors; | |
@PostConstruct | |
private void init() { | |
// 筹备一个匿名类,传给 disruptor 的事件处理类,// 这样每次处理事件时,都会将曾经处理事件的总数打印进去 | |
Consumer<?> eventCountPrinter = new Consumer<Object>() { | |
@Override | |
public void accept(Object o) {long count = eventCount.incrementAndGet(); | |
log.info("receive [{}] event", count); | |
} | |
}; | |
// 创立环形队列实例 | |
ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE); | |
// 筹备线程池 | |
executors = Executors.newFixedThreadPool(1); | |
// 创立 SequenceBarrier | |
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); | |
// 创立事件处理的工作类,外面执行 StringEventHandler 处理事件 | |
BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>( | |
ringBuffer, | |
sequenceBarrier, | |
new StringEventHandler(eventCountPrinter)); | |
// 将消费者的 sequence 传给环形队列 | |
ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); | |
// 在一个独立线程中取事件并生产 | |
executors.submit(batchEventProcessor); | |
// 生产者 | |
producer = new StringEventProducer(ringBuffer); | |
} | |
@Override | |
public void publish(String value) {producer.onData(value); | |
} | |
@Override | |
public long eventCount() {return eventCount.get(); | |
} | |
} |
- 上述代码有以下几处须要留神:
- 本人创立环形队列 RingBuffer 实例
- 本人筹备线程池,外面的线程用来获取和生产音讯
- 本人入手创立 BatchEventProcessor 实例,并把事件处理类传入
- 通过 ringBuffer 创立 sequenceBarrier,传给 BatchEventProcessor 实例应用
- 将 BatchEventProcessor 的 sequence 传给 ringBuffer,确保 ringBuffer 的生产和生产不会呈现凌乱
- 启动线程池,意味着 BatchEventProcessor 实例在一个独立线程中一直的从 ringBuffer 中获取事件并生产;
- 为了验证上述代码是否失常工作,我这里写了个单元测试类,如下所示,逻辑很简略,调用 OneConsumerServiceImpl.publish 办法一百次,产生一百个事件,再查看 OneConsumerServiceImpl 记录的生产事件总数是不是等于一百:
package com.bolingcavalry.service.impl; | |
import com.bolingcavalry.service.LowLevelOperateService; | |
import lombok.extern.slf4j.Slf4j; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Qualifier; | |
import org.springframework.boot.test.context.SpringBootTest; | |
import org.springframework.test.context.junit4.SpringRunner; | |
import static org.junit.Assert.assertEquals; | |
@RunWith(SpringRunner.class) | |
@SpringBootTest | |
@Slf4j | |
public class LowLeverOperateServiceImplTest { | |
@Autowired | |
@Qualifier("oneConsumer") | |
LowLevelOperateService oneConsumer; | |
private static final int EVENT_COUNT = 100; | |
private void testLowLevelOperateService(LowLevelOperateService service, int eventCount, int expectEventCount) throws InterruptedException {for(int i=0;i<eventCount;i++) {log.info("publich {}", i); | |
service.publish(String.valueOf(i)); | |
} | |
// 异步生产,因而须要延时期待 | |
Thread.sleep(10000); | |
// 生产的事件总数应该等于公布的事件数 | |
assertEquals(expectEventCount, service.eventCount()); | |
} | |
@Test | |
public void testOneConsumer() throws InterruptedException {log.info("start testOneConsumerService"); | |
testLowLevelOperateService(oneConsumer, EVENT_COUNT, EVENT_COUNT); | |
} |
- 留神,如果您是间接在 IDEA 上点击图标来执行单元测试,记得勾选下图红框中选项,否则可能呈现编译失败:
- 执行上述单元测试类,后果如下图所示,音讯的生产和生产都合乎预期,并且生产逻辑是在独立线程中执行的:
- 持续挑战下一个场景;
100 个事件,三个消费者,每个都单独生产这个 100 个事件
- 这个场景在 kafka 中也有,就是三个消费者的 group 不同,这样每一条音讯,这两个消费者各自生产一次;
- 因而,100 个事件,3 个消费者每人都会独立生产这 100 个事件,一共生产 300 次;
- 代码如下,有几处要留神的中央稍后提到:
package com.bolingcavalry.service.impl; | |
import com.bolingcavalry.service.*; | |
import com.lmax.disruptor.BatchEventProcessor; | |
import com.lmax.disruptor.RingBuffer; | |
import com.lmax.disruptor.SequenceBarrier; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.stereotype.Service; | |
import javax.annotation.PostConstruct; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.function.Consumer; | |
@Service("multiConsumer") | |
@Slf4j | |
public class MultiConsumerServiceImpl implements LowLevelOperateService { | |
private RingBuffer<StringEvent> ringBuffer; | |
private StringEventProducer producer; | |
/** | |
* 统计音讯总数 | |
*/ | |
private final AtomicLong eventCount = new AtomicLong(); | |
/** | |
* 生产一个 BatchEventProcessor 实例,并且启动独立线程开始获取和生产音讯 | |
* @param executorService | |
*/ | |
private void addProcessor(ExecutorService executorService) { | |
// 筹备一个匿名类,传给 disruptor 的事件处理类,// 这样每次处理事件时,都会将曾经处理事件的总数打印进去 | |
Consumer<?> eventCountPrinter = new Consumer<Object>() { | |
@Override | |
public void accept(Object o) {long count = eventCount.incrementAndGet(); | |
log.info("receive [{}] event", count); | |
} | |
}; | |
BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>( | |
ringBuffer, | |
ringBuffer.newBarrier(), | |
new StringEventHandler(eventCountPrinter)); | |
// 将以后消费者的 sequence 实例传给 ringBuffer | |
ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); | |
// 启动独立线程获取和生产事件 | |
executorService.submit(batchEventProcessor); | |
} | |
@PostConstruct | |
private void init() {ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE); | |
ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM); | |
// 创立多个消费者,并在独立线程中获取和生产事件 | |
for (int i=0;i<CONSUMER_NUM;i++) {addProcessor(executorService); | |
} | |
// 生产者 | |
producer = new StringEventProducer(ringBuffer); | |
} | |
@Override | |
public void publish(String value) {producer.onData(value); | |
} | |
@Override | |
public long eventCount() {return eventCount.get(); | |
} | |
} |
- 上述代码和后面的 OneConsumerServiceImpl 相比差异不大,次要是创立了多个 BatchEventProcessor 实例,而后别离在线程池中提交;
- 验证办法仍旧是单元测试,在方才的 LowLeverOperateServiceImplTest.java 中减少代码即可,留神 testLowLevelOperateService 的第三个参数是 <font color=”blue”>EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM</font>,示意预期的被生产音讯数为 <font color=”red”>300</font>:
@Autowired | |
@Qualifier("multiConsumer") | |
LowLevelOperateService multiConsumer; | |
@Test | |
public void testMultiConsumer() throws InterruptedException {log.info("start testMultiConsumer"); | |
testLowLevelOperateService(multiConsumer, EVENT_COUNT, EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM); | |
} |
- 执行单元测试,如下图所示,一共生产了 300 个事件,并且三个消费者在不动线程:
100 个事件,三个消费者独特生产这个 100 个事件
- 本篇的最初一个实战是公布 100 个事件,而后让三个消费者独特生产 100 个(例如 A 生产 33 个,B 生产 33 个,C 生产 34 个);
- 后面用到的 BatchEventProcessor 是用来独立生产的,不适宜多个消费者独特生产,这种多个生产独特生产的场景须要借助 WorkerPool 来实现,这个名字还是很形象的:一个池子外面有很多个工作者,把工作放入这个池子,工作者们每人解决一部分,大家合力将工作实现;
- 传入 WorkerPool 的消费者须要实现 WorkHandler 接口,于是新增一个实现类:
package com.bolingcavalry.service; | |
import com.lmax.disruptor.WorkHandler; | |
import lombok.extern.slf4j.Slf4j; | |
import java.util.function.Consumer; | |
@Slf4j | |
public class StringWorkHandler implements WorkHandler<StringEvent> {public StringWorkHandler(Consumer<?> consumer) {this.consumer = consumer;} | |
// 内部能够传入 Consumer 实现类,每解决一条音讯的时候,consumer 的 accept 办法就会被执行一次 | |
private Consumer<?> consumer; | |
@Override | |
public void onEvent(StringEvent event) throws Exception {log.info("work handler event : {}", event); | |
// 这里延时 100ms,模仿生产事件的逻辑的耗时 | |
Thread.sleep(100); | |
// 如果内部传入了 consumer,就要执行一次 accept 办法 | |
if (null!=consumer) {consumer.accept(null); | |
} | |
} | |
} |
- 新增服务类,实现独特生产的逻辑,有几处要留神的中央稍后会提到:
package com.bolingcavalry.service.impl; | |
import com.bolingcavalry.service.*; | |
import com.lmax.disruptor.*; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.stereotype.Service; | |
import javax.annotation.PostConstruct; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.function.Consumer; | |
@Service("workerPoolConsumer") | |
@Slf4j | |
public class WorkerPoolConsumerServiceImpl implements LowLevelOperateService { | |
private RingBuffer<StringEvent> ringBuffer; | |
private StringEventProducer producer; | |
/** | |
* 统计音讯总数 | |
*/ | |
private final AtomicLong eventCount = new AtomicLong(); | |
@PostConstruct | |
private void init() {ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE); | |
ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM); | |
StringWorkHandler[] handlers = new StringWorkHandler[CONSUMER_NUM]; | |
// 创立多个 StringWorkHandler 实例,放入一个数组中 | |
for (int i=0;i < CONSUMER_NUM;i++) {handlers[i] = new StringWorkHandler(o -> {long count = eventCount.incrementAndGet(); | |
log.info("receive [{}] event", count); | |
}); | |
} | |
// 创立 WorkerPool 实例,将 StringWorkHandler 实例的数组传进去,代表独特消费者的数量 | |
WorkerPool<StringEvent> workerPool = new WorkerPool<>(ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), handlers); | |
// 这一句很重要,去掉就会呈现反复生产同一个事件的问题 | |
ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); | |
workerPool.start(executorService); | |
// 生产者 | |
producer = new StringEventProducer(ringBuffer); | |
} | |
@Override | |
public void publish(String value) {producer.onData(value); | |
} | |
@Override | |
public long eventCount() {return eventCount.get(); | |
} | |
} |
- 上述代码中,要留神的有以下两处:
- StringWorkHandler 数组传入给 WorkerPool 后,每个 StringWorkHandler 实例都放入一个新的 WorkProcessor 实例,WorkProcessor 实现了 Runnable 接口,在执行 <font color=”blue”>workerPool.start</font> 时,会将 WorkProcessor 提交到线程池中;
- 和后面的独立生产相比,独特生产最大的特点在于只调用了一次 <font color=”blue”>ringBuffer.addGatingSequences</font> 办法,也就是说三个消费者共用一个 sequence 实例;
- 验证办法仍旧是单元测试,在方才的 LowLeverOperateServiceImplTest.java 中减少代码即可,留神 testWorkerPoolConsumer 的第三个参数是 <font color=”blue”>EVENT_COUNT</font>,示意预期的被生产音讯数为 <font color=”red”>100</font>:
@Autowired | |
@Qualifier("workerPoolConsumer") | |
LowLevelOperateService workerPoolConsumer; | |
@Test | |
public void testWorkerPoolConsumer() throws InterruptedException {log.info("start testWorkerPoolConsumer"); | |
testLowLevelOperateService(workerPoolConsumer, EVENT_COUNT, EVENT_COUNT); | |
} |
- 执行单元测试如下图所示,三个消费者一共生产 100 个事件,且三个消费者在不同线程:
- 至此,咱们在不必 Disruptor 类的前提下实现了三种常见场景的音讯生产生产,置信您对 Disruptor 的底层实现也有了粗浅意识,今后不论是应用还是优化 Disruptor,肯定能够更加得心应手;
你不孤独,欣宸原创一路相伴
- Java 系列
- Spring 系列
- Docker 系列
- kubernetes 系列
- 数据库 + 中间件系列
- DevOps 系列
欢送关注公众号:程序员欣宸
微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游 Java 世界 …
https://github.com/zq2599/blog_demos
正文完