欢送拜访我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,波及Java、Docker、Kubernetes、DevOPS等;

对于disruptor

disruptor是LMAX公司开发的一个高性能队列,其作用和阻塞队列(BlockingQueue)相似,都是在雷同过程内、不同线程间传递数据(例如音讯、事件),另外disruptor也有本人的一些特色:

  1. 以播送的模式公布事件,并且消费者之间存在依赖关系;
  2. 为事件提前分配内存;
  3. 无锁算法;

对于Ring Buffer(环形队列)

  • 提到disruptor个别都会提到Ring Buffer(环形队列)是它的特点,实际上从3.0版本之后,环形队列只是用来存储和更新事件数据,在其余更简单的场景下,用户能够通过自定义操作将其替换掉;

  • 简略的说,disruptor官网认为Ring Buffe是外围概念(Core Concepts),但不是特色( key features)

本篇概览

作为《disruptor笔记》系列的开篇,本篇有两个工作:

  • 创立名为<font color="blue">disruptor-tutorials</font>的gradle工程,作为整个系列的父工程,该系列所有代码都是这个父工程下的module;
  • 在<font color="blue">disruptor-tutorials</font>上面新建名为<font color="red">basic-event</font>的module,这是个springboot利用,作用是应用disruptor的基本功能:一个线程公布事件,另一个线程生产事件,也就是对环形队列最根本的操作,如下图:

用disruptor实现音讯的公布和生产的套路

  • 咱们提前小结用disruptor实现音讯的公布和生产的套路,前面的开发循序渐进即可,括号中是本篇对应的java类:
  1. 事件的定义:一个一般的bean(StringEvent.java)
  2. 事件工厂:定义如何生产事件的内存实例,这个实例刚从内存中创立,还没有任何业务数据(StringEventFactory.java)
  3. 事件处理:封装了生产单个事件的具体逻辑(StringEventHandler.java)
  4. 事件生产者:定义了如何将业务数据设置到还没有业务数据的事件中,就是工厂创立进去的那种(StringEventProducer.java)
  5. 初始化逻辑:创立和启动disruptor对象,将事件工厂传给disruptor,创立事件生产者和事件处理对象,并别离与disruptor对象关联;
  6. 业务逻辑:也就是调用事件生产者的<font color="blue">onData</font>办法公布事件,本文的做法是在单元测试类中公布事件,而后查看生产的事件数和生产的事件数是否统一;
    7

    环境信息

《Disruptor笔记》系列波及的环境信息如下:

  1. 操作系统:64位win10
  2. JDK:1.8.0_281
  3. IDE:IntelliJ IDEA 2021.1.1 (Ultimate Edition)
  4. gradle:6.7.1
  5. springboot:2.3.8.RELEASE
  6. disruptor:3.4.4

源码下载

  • 本篇实战中的残缺源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blo...:
名称链接备注
我的项目主页https://github.com/zq2599/blo...该我的项目在GitHub上的主页
git仓库地址(https)https://github.com/zq2599/blo...该我的项目源码的仓库地址,https协定
git仓库地址(ssh)git@github.com:zq2599/blog_demos.git该我的项目源码的仓库地址,ssh协定
  • 这个git我的项目中有多个文件夹,本次实战的源码在<font color="blue">disruptor-tutorials</font>文件夹下,如下图红框所示:

创立父工程

  • 因为是系列文章,所以这里做个父工程来治理所有依赖库和插件,新建名为<font color="blue">disruptor-tutorials</font>的gradle工程,build.gradle如下:
import java.time.OffsetDateTimeimport java.time.format.DateTimeFormatterbuildscript {    repositories {        maven {            url 'https://plugins.gradle.org/m2/'        }        // 如果有私服就在此配置,如果没有请正文掉        maven {            url 'http://192.168.50.43:8081/repository/aliyun-proxy/'        }        // 阿里云        maven {            url 'http://maven.aliyun.com/nexus/content/groups/public/'        }        mavenCentral()    }    ext {        // 我的项目版本        projectVersion = '1.0-SNAPSHOT'        // sprignboot版本 https://github.com/spring-projects/spring-boot/releases        springBootVersion = '2.3.8.RELEASE'    }}plugins {    id 'java'    id 'java-library'    id 'org.springframework.boot' version "${springBootVersion}" apply false    id 'io.spring.dependency-management' version '1.0.11.RELEASE'    id 'net.nemerosa.versioning' version '2.14.0'    id 'io.franzbecker.gradle-lombok' version '4.0.0' apply false    id 'com.github.ben-manes.versions' version '0.36.0' // gradle dependencyUpdates}// If you attempt to build without the `--scan` parameter in `gradle 6.0+` it will cause a build error that it can't find// a buildScan property to change. This avoids that problem.if (hasProperty('buildScan')) {    buildScan {        termsOfServiceUrl = 'https://gradle.com/terms-of-service'        termsOfServiceAgree = 'yes'    }}wrapper {    gradleVersion = '6.7.1'}def buildTimeAndDate = OffsetDateTime.now()ext {    // 构建时获得以后日期和工夫    buildDate = DateTimeFormatter.ISO_LOCAL_DATE.format(buildTimeAndDate)    buildTime = DateTimeFormatter.ofPattern('HH:mm:ss.SSSZ').format(buildTimeAndDate)    buildRevision = versioning.info.commit}allprojects {    apply plugin: 'java'    apply plugin: 'idea'    apply plugin: 'eclipse'    apply plugin: 'io.spring.dependency-management'    apply plugin: 'io.franzbecker.gradle-lombok'    compileJava {        sourceCompatibility = JavaVersion.VERSION_1_8        targetCompatibility = JavaVersion.VERSION_1_8        options.encoding = 'UTF-8'    }    compileJava.options*.compilerArgs = [            '-Xlint:all', '-Xlint:-processing'    ]    // Copy LICENSE    tasks.withType(Jar) {        from(project.rootDir) {            include 'LICENSE'            into 'META-INF'        }    }    // 写入到MANIFEST.MF中的内容    jar {        manifest {            attributes(                    'Created-By': "${System.properties['java.version']} (${System.properties['java.vendor']} ${System.properties['java.vm.version']})".toString(),                    'Built-By': 'travis',                    'Build-Date': buildDate,                    'Build-Time': buildTime,                    'Built-OS': "${System.properties['os.name']}",                    'Build-Revision': buildRevision,                    'Specification-Title': project.name,                    'Specification-Version': projectVersion,                    'Specification-Vendor': 'Will Zhao',                    'Implementation-Title': project.name,                    'Implementation-Version': projectVersion,                    'Implementation-Vendor': 'Will Zhao'            )        }    }    repositories {        mavenCentral()        // 如果有私服就在此配置,如果没有请正文掉        maven {            url 'http://192.168.50.43:8081/repository/aliyun-proxy/'        }        // 阿里云        maven {            url 'http://maven.aliyun.com/nexus/content/groups/public/'        }        jcenter()    }    buildscript {        repositories {            maven { url 'https://plugins.gradle.org/m2/' }        }    }}allprojects { project ->    buildscript {        dependencyManagement {            imports {                mavenBom "org.springframework.boot:spring-boot-starter-parent:${springBootVersion}"                mavenBom "org.junit:junit-bom:5.7.0"            }            dependencies {                dependency 'org.projectlombok:lombok:1.16.16'                dependency 'org.apache.commons:commons-lang3:3.11'                dependency 'commons-collections:commons-collections:3.2.2'                dependency 'com.lmax:disruptor:3.4.4'            }        }        ext {            springFrameworkVersion = dependencyManagement.importedProperties['spring-framework.version']        }    }}group = 'bolingcavalry'version = projectVersion
  • 接下来编写音讯公布和生产的代码;

新建module

  • 后面新建了整个《Disruptor笔记》系列的父工程,当初新建名为<font color="blue">basic-event</font>的module,其build.gradle内容如下:
plugins {    id 'org.springframework.boot'}dependencies {    implementation 'org.projectlombok:lombok'    implementation 'org.springframework.boot:spring-boot-starter'    implementation 'org.springframework.boot:spring-boot-starter-web'    implementation 'com.lmax:disruptor'    testImplementation('org.springframework.boot:spring-boot-starter-test')}
  • 这个module是个springboot利用,启动类如下:
package com.bolingcavalry;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class BasicEventApplication {    public static void main(String[] args) {        SpringApplication.run(BasicEventApplication.class, args);    }}
  • 接下来依照后面总结的套路行事;

事件的定义

  • 事件定义类StringEvent.java,可见就是个普普通通的java bean:
package com.bolingcavalry.service;import lombok.Data;import lombok.NoArgsConstructor;import lombok.ToString;@Data@ToString@NoArgsConstructorpublic class StringEvent {    private String value;}

事件工厂

  • 事件工厂的作用,是让disruptor晓得如何在内存中创立一个事件实例,不过,该实例和业务还没有任何关系,本篇的事件工厂如下,可见就是创立StringEvent实例,并没有特地的操作:
package com.bolingcavalry.service;import com.lmax.disruptor.EventFactory;public class StringEventFactory implements EventFactory<StringEvent> {    @Override    public StringEvent newInstance() {        return new StringEvent();    }}

事件处理

  • 工夫解决类的作用是定义一个事件如何被生产,外面是具体的业务代码,每个事件都会执行此类的onEvent办法;
  • 本篇的事件处理类做的事件是打印事件内容,再用sleep耗费100毫秒,而后再调用内部传入的Consumer实现类的accept办法:
package com.bolingcavalry.service;import com.lmax.disruptor.EventHandler;import lombok.Setter;import lombok.extern.slf4j.Slf4j;import java.util.function.Consumer;@Slf4jpublic class StringEventHandler implements EventHandler<StringEvent> {    public StringEventHandler(Consumer<?> consumer) {        this.consumer = consumer;    }    // 内部能够传入Consumer实现类,每解决一条音讯的时候,consumer的accept办法就会被执行一次    private Consumer<?> consumer;    @Override    public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {        log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);                // 这里延时100ms,模仿生产事件的逻辑的耗时        Thread.sleep(100);        // 如果内部传入了consumer,就要执行一次accept办法        if (null!=consumer) {            consumer.accept(null);        }    }}

事件生产者

  • 每当业务要生产一个事件时,就会调用<font color="blue">事件生产者</font>的onData办法,将业务数据作为入参传进来,此时生产者会从环形队列中取出一个事件实例(就是后面的事件工厂创立的),把业务数据传给这个实例,再把实例正式公布进来:
package com.bolingcavalry.service;import com.lmax.disruptor.RingBuffer;public class StringEventProducer {    // 存储数据的环形队列    private final RingBuffer<StringEvent> ringBuffer;    public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {        this.ringBuffer = ringBuffer;    }    public void onData(String content) {        // ringBuffer是个队列,其next办法返回的是下最初一条记录之后的地位,这是个可用地位        long sequence = ringBuffer.next();        try {            // sequence地位取出的事件是空事件            StringEvent stringEvent = ringBuffer.get(sequence);            // 空事件增加业务信息            stringEvent.setValue(content);        } finally {            // 公布            ringBuffer.publish(sequence);        }    }}

初始化逻辑

  • 开发一个spring bean,这外面有disruptor的初始化逻辑,有几处须要关注的中央稍后会说到:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.*;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.util.DaemonThreadFactory;import lombok.extern.slf4j.Slf4j;import org.springframework.scheduling.concurrent.CustomizableThreadFactory;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import java.time.LocalDateTime;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import java.util.concurrent.ThreadFactory;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicLong;import java.util.function.Consumer;@Service@Slf4jpublic class BasicEventServiceImpl implements BasicEventService {    private static final int BUFFER_SIZE = 16;    private Disruptor<StringEvent> disruptor;    private StringEventProducer producer;    /**     * 统计音讯总数     */    private final AtomicLong eventCount = new AtomicLong();    @PostConstruct    private void init() {        Executor executor = Executors.newCachedThreadPool();        // 实例化        disruptor = new Disruptor<>(new StringEventFactory(),                BUFFER_SIZE,                new CustomizableThreadFactory("event-handler-"));        // 筹备一个匿名类,传给disruptor的事件处理类,        // 这样每次处理事件时,都会将曾经处理事件的总数打印进去        Consumer<?> eventCountPrinter = new Consumer<Object>() {            @Override            public void accept(Object o) {                long count = eventCount.incrementAndGet();                log.info("receive [{}] event", count);            }        };        // 指定解决类        disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter));        // 启动        disruptor.start();        // 生产者        producer = new StringEventProducer(disruptor.getRingBuffer());    }    @Override    public void publish(String value) {        producer.onData(value);    }    @Override    public long eventCount() {        return eventCount.get();    }}
  • 上述代码有以下几点须要留神:
  1. publish办法给内部调用,用于公布一个事件;
  2. eventCountPrinter是Consumer的实现类,被传给了StringEventHandler,这样StringEventHandler生产音讯的时候,eventCount就会减少,也就记下了曾经解决的事件总数;
  3. Disruptor的构造方法中,BUFFER_SIZE示意环形队列的大小,这里成心设置为16,这样能够轻易的将环形队列填满,此时再公布事件会不会导致环形队列上的数据被笼罩呢?稍后咱们能够测一下;
  4. 记得调用start办法;

web接口

再写一个web接口类,这样就能够通过浏览器验证后面的代码了:

package com.bolingcavalry.controller;import com.bolingcavalry.service.BasicEventService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.http.HttpStatus;import org.springframework.web.bind.annotation.*;import java.time.LocalDateTime;@RestControllerpublic class BasicEventController {    @Autowired    BasicEventService basicEventService;    @RequestMapping(value = "/{value}", method = RequestMethod.GET)    public String publish(@PathVariable("value") String value) {        basicEventService.publish(value);        return "success, " + LocalDateTime.now().toString();    }}

业务逻辑

  • 当初生产事件的接口已筹备好,生产事件的代码也实现了,接下来就是如何调用生产事件的接口来验证生产和生产是否失常,这里我抉择应用单元测试来验证;
  • 在<font color="blue">disruptor-tutorials\basic-event\src\test\java</font>目录下新增测试类BasicEventServiceImplTest.java,测试逻辑是公布了一百个事件,再验证生产事件的数量是否也等于一百:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.BasicEventService;import lombok.extern.slf4j.Slf4j;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import static org.junit.Assert.assertEquals;@RunWith(SpringRunner.class)@SpringBootTest@Slf4jpublic class BasicEventServiceImplTest {    @Autowired    BasicEventService basicEventService;    @Test    public void publish() throws InterruptedException {        log.info("start publich test");        int count = 100;        for(int i=0;i<count;i++) {            log.info("publich {}", i);            basicEventService.publish(String.valueOf(i));        }        // 异步生产,因而须要延时期待        Thread.sleep(1000);        // 生产的事件总数应该等于公布的事件数        assertEquals(count, basicEventService.eventCount());    }}
  • 编码实现后,点击下图红框1中的按钮运行单元测试,后果如红框2所示,测试通过:

  • 聪慧的您可能会产生纳闷:环形数组大小只有16,生产一个事件耗时很长(100毫秒),那么环形数组中的事件还未生产完时如果还在公布事件会产生什么呢?新事件会笼罩未生产的事件吗?显然不会,因为测试后果是通过的,那么disruptor是怎么做到的呢?其实从日志上能够看出一些端倪,下图是测试过程中日志的开端局部,红框显示,始终到测试快完结,公布事件的线程还在执行公布操作,这就意味着:如果生产速度过慢导致环形队列里放不进新的事件时,公布事件的线程就会阻塞,晓得环形队列中能够放入事件为止:

  • 至此,disrupor的入门操作就实现了,咱们曾经体验过根本的公布和生产性能,接下来的文章咱们会持续深刻学习其余更弱小的性能;

你不孤独,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

欢送关注公众号:程序员欣宸

微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游Java世界...
https://github.com/zq2599/blog_demos