乐趣区

关于云计算:disruptor笔记之一快速入门

欢送拜访我的 GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,波及 Java、Docker、Kubernetes、DevOPS 等;

对于 disruptor

disruptor 是 LMAX 公司开发的一个高性能队列,其作用和阻塞队列 (BlockingQueue) 相似,都是在雷同过程内、不同线程间传递数据(例如音讯、事件),另外 disruptor 也有本人的一些特色:

  1. 以播送的模式公布事件,并且消费者之间存在依赖关系;
  2. 为事件提前分配内存;
  3. 无锁算法;

对于 Ring Buffer(环形队列)

  • 提到 disruptor 个别都会提到 Ring Buffer(环形队列)是它的特点,实际上从 3.0 版本之后,环形队列只是用来存储和更新事件数据,在其余更简单的场景下,用户能够通过自定义操作将其替换掉;

  • 简略的说,disruptor 官网认为 Ring Buffe 是外围概念(Core Concepts),但不是特色(key features)

本篇概览

作为《disruptor 笔记》系列的开篇,本篇有两个工作:

  • 创立名为 <font color=”blue”>disruptor-tutorials</font> 的 gradle 工程,作为整个系列的父工程,该系列所有代码都是这个父工程下的 module;
  • 在 <font color=”blue”>disruptor-tutorials</font> 上面新建名为 <font color=”red”>basic-event</font> 的 module,这是个 springboot 利用,作用是应用 disruptor 的基本功能:一个线程公布事件,另一个线程生产事件,也就是对环形队列最根本的操作,如下图:

用 disruptor 实现音讯的公布和生产的套路

  • 咱们提前小结用 disruptor 实现音讯的公布和生产的套路,前面的开发循序渐进即可,括号中是本篇对应的 java 类:
  1. 事件的定义:一个一般的 bean(StringEvent.java)
  2. 事件工厂:定义如何生产事件的内存实例,这个实例刚从内存中创立,还没有任何业务数据(StringEventFactory.java)
  3. 事件处理:封装了生产单个事件的具体逻辑(StringEventHandler.java)
  4. 事件生产者:定义了如何将业务数据设置到还没有业务数据的事件中,就是工厂创立进去的那种(StringEventProducer.java)
  5. 初始化逻辑:创立和启动 disruptor 对象,将事件工厂传给 disruptor,创立事件生产者和事件处理对象,并别离与 disruptor 对象关联;
  6. 业务逻辑:也就是调用事件生产者的 <font color=”blue”>onData</font> 办法公布事件,本文的做法是在单元测试类中公布事件,而后查看生产的事件数和生产的事件数是否统一;
    7

    环境信息

《Disruptor 笔记》系列波及的环境信息如下:

  1. 操作系统:64 位 win10
  2. JDK:1.8.0_281
  3. IDE:IntelliJ IDEA 2021.1.1 (Ultimate Edition)
  4. gradle:6.7.1
  5. springboot:2.3.8.RELEASE
  6. disruptor:3.4.4

源码下载

  • 本篇实战中的残缺源码可在 GitHub 下载到,地址和链接信息如下表所示(https://github.com/zq2599/blo…:
名称 链接 备注
我的项目主页 https://github.com/zq2599/blo… 该我的项目在 GitHub 上的主页
git 仓库地址(https) https://github.com/zq2599/blo… 该我的项目源码的仓库地址,https 协定
git 仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该我的项目源码的仓库地址,ssh 协定
  • 这个 git 我的项目中有多个文件夹,本次实战的源码在 <font color=”blue”>disruptor-tutorials</font> 文件夹下,如下图红框所示:

创立父工程

  • 因为是系列文章,所以这里做个父工程来治理所有依赖库和插件,新建名为 <font color=”blue”>disruptor-tutorials</font> 的 gradle 工程,build.gradle 如下:
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter

buildscript {
    repositories {
        maven {url 'https://plugins.gradle.org/m2/'}
        // 如果有私服就在此配置,如果没有请正文掉
        maven {url 'http://192.168.50.43:8081/repository/aliyun-proxy/'}
        // 阿里云
        maven {url 'http://maven.aliyun.com/nexus/content/groups/public/'}

        mavenCentral()}
    ext {
        // 我的项目版本
        projectVersion = '1.0-SNAPSHOT'

        // sprignboot 版本 https://github.com/spring-projects/spring-boot/releases
        springBootVersion = '2.3.8.RELEASE'
    }
}

plugins {
    id 'java'
    id 'java-library'
    id 'org.springframework.boot' version "${springBootVersion}" apply false
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'net.nemerosa.versioning' version '2.14.0'
    id 'io.franzbecker.gradle-lombok' version '4.0.0' apply false
    id 'com.github.ben-manes.versions' version '0.36.0' // gradle dependencyUpdates
}

// If you attempt to build without the `--scan` parameter in `gradle 6.0+` it will cause a build error that it can't find
// a buildScan property to change. This avoids that problem.
if (hasProperty('buildScan')) {
    buildScan {
        termsOfServiceUrl = 'https://gradle.com/terms-of-service'
        termsOfServiceAgree = 'yes'
    }
}

wrapper {gradleVersion = '6.7.1'}

def buildTimeAndDate = OffsetDateTime.now()

ext {
    // 构建时获得以后日期和工夫
    buildDate = DateTimeFormatter.ISO_LOCAL_DATE.format(buildTimeAndDate)
    buildTime = DateTimeFormatter.ofPattern('HH:mm:ss.SSSZ').format(buildTimeAndDate)
    buildRevision = versioning.info.commit
}

allprojects {
    apply plugin: 'java'
    apply plugin: 'idea'
    apply plugin: 'eclipse'
    apply plugin: 'io.spring.dependency-management'
    apply plugin: 'io.franzbecker.gradle-lombok'

    compileJava {
        sourceCompatibility = JavaVersion.VERSION_1_8
        targetCompatibility = JavaVersion.VERSION_1_8
        options.encoding = 'UTF-8'
    }

    compileJava.options*.compilerArgs = ['-Xlint:all', '-Xlint:-processing']

    // Copy LICENSE
    tasks.withType(Jar) {from(project.rootDir) {
            include 'LICENSE'
            into 'META-INF'
        }
    }

    // 写入到 MANIFEST.MF 中的内容
    jar {
        manifest {
            attributes('Created-By': "${System.properties['java.version']} (${System.properties['java.vendor']} ${System.properties['java.vm.version']})".toString(),
                    'Built-By': 'travis',
                    'Build-Date': buildDate,
                    'Build-Time': buildTime,
                    'Built-OS': "${System.properties['os.name']}",
                    'Build-Revision': buildRevision,
                    'Specification-Title': project.name,
                    'Specification-Version': projectVersion,
                    'Specification-Vendor': 'Will Zhao',
                    'Implementation-Title': project.name,
                    'Implementation-Version': projectVersion,
                    'Implementation-Vendor': 'Will Zhao'
            )
        }
    }

    repositories {mavenCentral()

        // 如果有私服就在此配置,如果没有请正文掉
        maven {url 'http://192.168.50.43:8081/repository/aliyun-proxy/'}

        // 阿里云
        maven {url 'http://maven.aliyun.com/nexus/content/groups/public/'}

        jcenter()}

    buildscript {
        repositories {maven { url 'https://plugins.gradle.org/m2/'}
        }
    }
}

allprojects { project ->
    buildscript {
        dependencyManagement {
            imports {mavenBom "org.springframework.boot:spring-boot-starter-parent:${springBootVersion}"
                mavenBom "org.junit:junit-bom:5.7.0"
            }

            dependencies {
                dependency 'org.projectlombok:lombok:1.16.16'
                dependency 'org.apache.commons:commons-lang3:3.11'
                dependency 'commons-collections:commons-collections:3.2.2'
                dependency 'com.lmax:disruptor:3.4.4'
            }
        }

        ext {springFrameworkVersion = dependencyManagement.importedProperties['spring-framework.version']
        }
    }
}

group = 'bolingcavalry'
version = projectVersion
  • 接下来编写音讯公布和生产的代码;

新建 module

  • 后面新建了整个《Disruptor 笔记》系列的父工程,当初新建名为 <font color=”blue”>basic-event</font> 的 module,其 build.gradle 内容如下:
plugins {id 'org.springframework.boot'}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'com.lmax:disruptor'

    testImplementation('org.springframework.boot:spring-boot-starter-test')
}
  • 这个 module 是个 springboot 利用,启动类如下:
package com.bolingcavalry;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BasicEventApplication {public static void main(String[] args) {SpringApplication.run(BasicEventApplication.class, args);
    }
}
  • 接下来依照后面总结的套路行事;

事件的定义

  • 事件定义类 StringEvent.java,可见就是个普普通通的 java bean:
package com.bolingcavalry.service;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@ToString
@NoArgsConstructor
public class StringEvent {private String value;}

事件工厂

  • 事件工厂的作用,是让 disruptor 晓得如何在内存中创立一个事件实例,不过,该实例和业务还没有任何关系,本篇的事件工厂如下,可见就是创立 StringEvent 实例,并没有特地的操作:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventFactory;

public class StringEventFactory implements EventFactory<StringEvent> {

    @Override
    public StringEvent newInstance() {return new StringEvent();
    }
}

事件处理

  • 工夫解决类的作用是定义一个事件如何被生产,外面是具体的业务代码,每个事件都会执行此类的 onEvent 办法;
  • 本篇的事件处理类做的事件是打印事件内容,再用 sleep 耗费 100 毫秒,而后再调用内部传入的 Consumer 实现类的 accept 办法:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventHandler;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class StringEventHandler implements EventHandler<StringEvent> {public StringEventHandler(Consumer<?> consumer) {this.consumer = consumer;}

    // 内部能够传入 Consumer 实现类,每解决一条音讯的时候,consumer 的 accept 办法就会被执行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
        
        // 这里延时 100ms,模仿生产事件的逻辑的耗时
        Thread.sleep(100);

        // 如果内部传入了 consumer,就要执行一次 accept 办法
        if (null!=consumer) {consumer.accept(null);
        }
    }
}

事件生产者

  • 每当业务要生产一个事件时,就会调用 <font color=”blue”> 事件生产者 </font> 的 onData 办法,将业务数据作为入参传进来,此时生产者会从环形队列中取出一个事件实例(就是后面的事件工厂创立的),把业务数据传给这个实例,再把实例正式公布进来:
package com.bolingcavalry.service;

import com.lmax.disruptor.RingBuffer;

public class StringEventProducer {
    // 存储数据的环形队列
    private final RingBuffer<StringEvent> ringBuffer;

    public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {this.ringBuffer = ringBuffer;}

    public void onData(String content) {
        // ringBuffer 是个队列,其 next 办法返回的是下最初一条记录之后的地位,这是个可用地位
        long sequence = ringBuffer.next();

        try {
            // sequence 地位取出的事件是空事件
            StringEvent stringEvent = ringBuffer.get(sequence);
            // 空事件增加业务信息
            stringEvent.setValue(content);
        } finally {
            // 公布
            ringBuffer.publish(sequence);
        }
    }
}

初始化逻辑

  • 开发一个 spring bean,这外面有 disruptor 的初始化逻辑,有几处须要关注的中央稍后会说到:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@Service
@Slf4j
public class BasicEventServiceImpl implements BasicEventService {

    private static final int BUFFER_SIZE = 16;

    private Disruptor<StringEvent> disruptor;

    private StringEventProducer producer;

    /**
     * 统计音讯总数
     */
    private final AtomicLong eventCount = new AtomicLong();

    @PostConstruct
    private void init() {Executor executor = Executors.newCachedThreadPool();

        // 实例化
        disruptor = new Disruptor<>(new StringEventFactory(),
                BUFFER_SIZE,
                new CustomizableThreadFactory("event-handler-"));

        // 筹备一个匿名类,传给 disruptor 的事件处理类,// 这样每次处理事件时,都会将曾经处理事件的总数打印进去
        Consumer<?> eventCountPrinter = new Consumer<Object>() {
            @Override
            public void accept(Object o) {long count = eventCount.incrementAndGet();
                log.info("receive [{}] event", count);
            }
        };

        // 指定解决类
        disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter));

        // 启动
        disruptor.start();

        // 生产者
        producer = new StringEventProducer(disruptor.getRingBuffer());
    }

    @Override
    public void publish(String value) {producer.onData(value);
    }

    @Override
    public long eventCount() {return eventCount.get();
    }
}
  • 上述代码有以下几点须要留神:
  1. publish 办法给内部调用,用于公布一个事件;
  2. eventCountPrinter 是 Consumer 的实现类,被传给了 StringEventHandler,这样 StringEventHandler 生产音讯的时候,eventCount 就会减少,也就记下了曾经解决的事件总数;
  3. Disruptor 的构造方法中,BUFFER_SIZE 示意环形队列的大小,这里成心设置为 16,这样能够轻易的将环形队列填满,此时再公布事件会不会导致环形队列上的数据被笼罩呢?稍后咱们能够测一下;
  4. 记得调用 start 办法;

web 接口

再写一个 web 接口类,这样就能够通过浏览器验证后面的代码了:

package com.bolingcavalry.controller;

import com.bolingcavalry.service.BasicEventService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;

import java.time.LocalDateTime;

@RestController
public class BasicEventController {

    @Autowired
    BasicEventService basicEventService;

    @RequestMapping(value = "/{value}", method = RequestMethod.GET)
    public String publish(@PathVariable("value") String value) {basicEventService.publish(value);
        return "success," + LocalDateTime.now().toString();
    }
}

业务逻辑

  • 当初生产事件的接口已筹备好,生产事件的代码也实现了,接下来就是如何调用生产事件的接口来验证生产和生产是否失常,这里我抉择应用单元测试来验证;
  • 在 <font color=”blue”>disruptor-tutorials\basic-event\src\test\java</font> 目录下新增测试类 BasicEventServiceImplTest.java,测试逻辑是公布了一百个事件,再验证生产事件的数量是否也等于一百:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.BasicEventService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertEquals;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class BasicEventServiceImplTest {

    @Autowired
    BasicEventService basicEventService;

    @Test
    public void publish() throws InterruptedException {log.info("start publich test");

        int count = 100;

        for(int i=0;i<count;i++) {log.info("publich {}", i);
            basicEventService.publish(String.valueOf(i));
        }

        // 异步生产,因而须要延时期待
        Thread.sleep(1000);
        // 生产的事件总数应该等于公布的事件数
        assertEquals(count, basicEventService.eventCount());
    }
}
  • 编码实现后,点击下图红框 1 中的按钮运行单元测试,后果如红框 2 所示,测试通过:

  • 聪慧的您可能会产生纳闷:环形数组大小只有 16,生产一个事件耗时很长(100 毫秒),那么环形数组中的事件还未生产完时如果还在公布事件会产生什么呢?新事件会笼罩未生产的事件吗?显然不会,因为测试后果是通过的,那么 disruptor 是怎么做到的呢?其实从日志上能够看出一些端倪,下图是测试过程中日志的开端局部,红框显示,始终到测试快完结,公布事件的线程还在执行公布操作,这就意味着:如果生产速度过慢导致环形队列里放不进新的事件时,公布事件的线程就会阻塞,晓得环形队列中能够放入事件为止:

  • 至此,disrupor 的入门操作就实现了,咱们曾经体验过根本的公布和生产性能,接下来的文章咱们会持续深刻学习其余更弱小的性能;

你不孤独,欣宸原创一路相伴

  1. Java 系列
  2. Spring 系列
  3. Docker 系列
  4. kubernetes 系列
  5. 数据库 + 中间件系列
  6. DevOps 系列

欢送关注公众号:程序员欣宸

微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游 Java 世界 …
https://github.com/zq2599/blog_demos

退出移动版