欢送拜访我的GitHubhttps://github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,波及Java、Docker、Kubernetes、DevOPS等;
对于disruptordisruptor是LMAX公司开发的一个高性能队列,其作用和阻塞队列(BlockingQueue)相似,都是在雷同过程内、不同线程间传递数据(例如音讯、事件),另外disruptor也有本人的一些特色:
以播送的模式公布事件,并且消费者之间存在依赖关系;为事件提前分配内存;无锁算法;对于Ring Buffer(环形队列)提到disruptor个别都会提到Ring Buffer(环形队列)是它的特点,实际上从3.0版本之后,环形队列只是用来存储和更新事件数据,在其余更简单的场景下,用户能够通过自定义操作将其替换掉;
简略的说,disruptor官网认为Ring Buffe是外围概念(Core Concepts),但不是特色( key features)本篇概览作为《disruptor笔记》系列的开篇,本篇有两个工作:
创立名为<font color="blue">disruptor-tutorials</font>的gradle工程,作为整个系列的父工程,该系列所有代码都是这个父工程下的module;在<font color="blue">disruptor-tutorials</font>上面新建名为<font color="red">basic-event</font>的module,这是个springboot利用,作用是应用disruptor的基本功能:一个线程公布事件,另一个线程生产事件,也就是对环形队列最根本的操作,如下图:
用disruptor实现音讯的公布和生产的套路咱们提前小结用disruptor实现音讯的公布和生产的套路,前面的开发循序渐进即可,括号中是本篇对应的java类:事件的定义:一个一般的bean(StringEvent.java)事件工厂:定义如何生产事件的内存实例,这个实例刚从内存中创立,还没有任何业务数据(StringEventFactory.java)事件处理:封装了生产单个事件的具体逻辑(StringEventHandler.java)事件生产者:定义了如何将业务数据设置到还没有业务数据的事件中,就是工厂创立进去的那种(StringEventProducer.java)初始化逻辑:创立和启动disruptor对象,将事件工厂传给disruptor,创立事件生产者和事件处理对象,并别离与disruptor对象关联;业务逻辑:也就是调用事件生产者的<font color="blue">onData</font>办法公布事件,本文的做法是在单元测试类中公布事件,而后查看生产的事件数和生产的事件数是否统一;7
环境信息《Disruptor笔记》系列波及的环境信息如下:
操作系统:64位win10JDK:1.8.0_281IDE:IntelliJ IDEA 2021.1.1 (Ultimate Edition)gradle:6.7.1springboot:2.3.8.RELEASEdisruptor:3.4.4源码下载本篇实战中的残缺源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blo...:名称链接备注我的项目主页https://github.com/zq2599/blo...该我的项目在GitHub上的主页git仓库地址(https)https://github.com/zq2599/blo...该我的项目源码的仓库地址,https协定git仓库地址(ssh)git@github.com:zq2599/blog_demos.git该我的项目源码的仓库地址,ssh协定这个git我的项目中有多个文件夹,本次实战的源码在<font color="blue">disruptor-tutorials</font>文件夹下,如下图红框所示:
创立父工程因为是系列文章,所以这里做个父工程来治理所有依赖库和插件,新建名为<font color="blue">disruptor-tutorials</font>的gradle工程,build.gradle如下:import java.time.OffsetDateTimeimport java.time.format.DateTimeFormatterbuildscript { repositories { maven { url 'https://plugins.gradle.org/m2/' } // 如果有私服就在此配置,如果没有请正文掉 maven { url 'http://192.168.50.43:8081/repository/aliyun-proxy/' } // 阿里云 maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' } mavenCentral() } ext { // 我的项目版本 projectVersion = '1.0-SNAPSHOT' // sprignboot版本 https://github.com/spring-projects/spring-boot/releases springBootVersion = '2.3.8.RELEASE' }}plugins { id 'java' id 'java-library' id 'org.springframework.boot' version "${springBootVersion}" apply false id 'io.spring.dependency-management' version '1.0.11.RELEASE' id 'net.nemerosa.versioning' version '2.14.0' id 'io.franzbecker.gradle-lombok' version '4.0.0' apply false id 'com.github.ben-manes.versions' version '0.36.0' // gradle dependencyUpdates}// If you attempt to build without the `--scan` parameter in `gradle 6.0+` it will cause a build error that it can't find// a buildScan property to change. This avoids that problem.if (hasProperty('buildScan')) { buildScan { termsOfServiceUrl = 'https://gradle.com/terms-of-service' termsOfServiceAgree = 'yes' }}wrapper { gradleVersion = '6.7.1'}def buildTimeAndDate = OffsetDateTime.now()ext { // 构建时获得以后日期和工夫 buildDate = DateTimeFormatter.ISO_LOCAL_DATE.format(buildTimeAndDate) buildTime = DateTimeFormatter.ofPattern('HH:mm:ss.SSSZ').format(buildTimeAndDate) buildRevision = versioning.info.commit}allprojects { apply plugin: 'java' apply plugin: 'idea' apply plugin: 'eclipse' apply plugin: 'io.spring.dependency-management' apply plugin: 'io.franzbecker.gradle-lombok' compileJava { sourceCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = JavaVersion.VERSION_1_8 options.encoding = 'UTF-8' } compileJava.options*.compilerArgs = [ '-Xlint:all', '-Xlint:-processing' ] // Copy LICENSE tasks.withType(Jar) { from(project.rootDir) { include 'LICENSE' into 'META-INF' } } // 写入到MANIFEST.MF中的内容 jar { manifest { attributes( 'Created-By': "${System.properties['java.version']} (${System.properties['java.vendor']} ${System.properties['java.vm.version']})".toString(), 'Built-By': 'travis', 'Build-Date': buildDate, 'Build-Time': buildTime, 'Built-OS': "${System.properties['os.name']}", 'Build-Revision': buildRevision, 'Specification-Title': project.name, 'Specification-Version': projectVersion, 'Specification-Vendor': 'Will Zhao', 'Implementation-Title': project.name, 'Implementation-Version': projectVersion, 'Implementation-Vendor': 'Will Zhao' ) } } repositories { mavenCentral() // 如果有私服就在此配置,如果没有请正文掉 maven { url 'http://192.168.50.43:8081/repository/aliyun-proxy/' } // 阿里云 maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' } jcenter() } buildscript { repositories { maven { url 'https://plugins.gradle.org/m2/' } } }}allprojects { project -> buildscript { dependencyManagement { imports { mavenBom "org.springframework.boot:spring-boot-starter-parent:${springBootVersion}" mavenBom "org.junit:junit-bom:5.7.0" } dependencies { dependency 'org.projectlombok:lombok:1.16.16' dependency 'org.apache.commons:commons-lang3:3.11' dependency 'commons-collections:commons-collections:3.2.2' dependency 'com.lmax:disruptor:3.4.4' } } ext { springFrameworkVersion = dependencyManagement.importedProperties['spring-framework.version'] } }}group = 'bolingcavalry'version = projectVersion接下来编写音讯公布和生产的代码;新建module后面新建了整个《Disruptor笔记》系列的父工程,当初新建名为<font color="blue">basic-event</font>的module,其build.gradle内容如下:plugins { id 'org.springframework.boot'}dependencies { implementation 'org.projectlombok:lombok' implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'com.lmax:disruptor' testImplementation('org.springframework.boot:spring-boot-starter-test')}这个module是个springboot利用,启动类如下:package com.bolingcavalry;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class BasicEventApplication { public static void main(String[] args) { SpringApplication.run(BasicEventApplication.class, args); }}接下来依照后面总结的套路行事;事件的定义事件定义类StringEvent.java,可见就是个普普通通的java bean:package com.bolingcavalry.service;import lombok.Data;import lombok.NoArgsConstructor;import lombok.ToString;@Data@ToString@NoArgsConstructorpublic class StringEvent { private String value;}事件工厂事件工厂的作用,是让disruptor晓得如何在内存中创立一个事件实例,不过,该实例和业务还没有任何关系,本篇的事件工厂如下,可见就是创立StringEvent实例,并没有特地的操作:package com.bolingcavalry.service;import com.lmax.disruptor.EventFactory;public class StringEventFactory implements EventFactory<StringEvent> { @Override public StringEvent newInstance() { return new StringEvent(); }}事件处理工夫解决类的作用是定义一个事件如何被生产,外面是具体的业务代码,每个事件都会执行此类的onEvent办法;本篇的事件处理类做的事件是打印事件内容,再用sleep耗费100毫秒,而后再调用内部传入的Consumer实现类的accept办法:package com.bolingcavalry.service;import com.lmax.disruptor.EventHandler;import lombok.Setter;import lombok.extern.slf4j.Slf4j;import java.util.function.Consumer;@Slf4jpublic class StringEventHandler implements EventHandler<StringEvent> { public StringEventHandler(Consumer<?> consumer) { this.consumer = consumer; } // 内部能够传入Consumer实现类,每解决一条音讯的时候,consumer的accept办法就会被执行一次 private Consumer<?> consumer; @Override public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception { log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event); // 这里延时100ms,模仿生产事件的逻辑的耗时 Thread.sleep(100); // 如果内部传入了consumer,就要执行一次accept办法 if (null!=consumer) { consumer.accept(null); } }}事件生产者每当业务要生产一个事件时,就会调用<font color="blue">事件生产者</font>的onData办法,将业务数据作为入参传进来,此时生产者会从环形队列中取出一个事件实例(就是后面的事件工厂创立的),把业务数据传给这个实例,再把实例正式公布进来:package com.bolingcavalry.service;import com.lmax.disruptor.RingBuffer;public class StringEventProducer { // 存储数据的环形队列 private final RingBuffer<StringEvent> ringBuffer; public StringEventProducer(RingBuffer<StringEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(String content) { // ringBuffer是个队列,其next办法返回的是下最初一条记录之后的地位,这是个可用地位 long sequence = ringBuffer.next(); try { // sequence地位取出的事件是空事件 StringEvent stringEvent = ringBuffer.get(sequence); // 空事件增加业务信息 stringEvent.setValue(content); } finally { // 公布 ringBuffer.publish(sequence); } }}初始化逻辑开发一个spring bean,这外面有disruptor的初始化逻辑,有几处须要关注的中央稍后会说到:package com.bolingcavalry.service.impl;import com.bolingcavalry.service.*;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.util.DaemonThreadFactory;import lombok.extern.slf4j.Slf4j;import org.springframework.scheduling.concurrent.CustomizableThreadFactory;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import java.time.LocalDateTime;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import java.util.concurrent.ThreadFactory;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicLong;import java.util.function.Consumer;@Service@Slf4jpublic class BasicEventServiceImpl implements BasicEventService { private static final int BUFFER_SIZE = 16; private Disruptor<StringEvent> disruptor; private StringEventProducer producer; /** * 统计音讯总数 */ private final AtomicLong eventCount = new AtomicLong(); @PostConstruct private void init() { Executor executor = Executors.newCachedThreadPool(); // 实例化 disruptor = new Disruptor<>(new StringEventFactory(), BUFFER_SIZE, new CustomizableThreadFactory("event-handler-")); // 筹备一个匿名类,传给disruptor的事件处理类, // 这样每次处理事件时,都会将曾经处理事件的总数打印进去 Consumer<?> eventCountPrinter = new Consumer<Object>() { @Override public void accept(Object o) { long count = eventCount.incrementAndGet(); log.info("receive [{}] event", count); } }; // 指定解决类 disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter)); // 启动 disruptor.start(); // 生产者 producer = new StringEventProducer(disruptor.getRingBuffer()); } @Override public void publish(String value) { producer.onData(value); } @Override public long eventCount() { return eventCount.get(); }}上述代码有以下几点须要留神:publish办法给内部调用,用于公布一个事件;eventCountPrinter是Consumer的实现类,被传给了StringEventHandler,这样StringEventHandler生产音讯的时候,eventCount就会减少,也就记下了曾经解决的事件总数;Disruptor的构造方法中,BUFFER_SIZE示意环形队列的大小,这里成心设置为16,这样能够轻易的将环形队列填满,此时再公布事件会不会导致环形队列上的数据被笼罩呢?稍后咱们能够测一下;记得调用start办法;web接口再写一个web接口类,这样就能够通过浏览器验证后面的代码了:
...