乐趣区

关于java:单机最快的队列Disruptor解析和使用

前言

介绍高性能队列 Disruptor 原理以及应用例子。

Disruptor 是什么?

Disruptor 是外汇和加密货币交易所运营商 LMAX group 建设高性能的金融交易所的后果。用于解决生产者、消费者及其数据存储的设计问题的高性能队列实现。能够对标 JDK 中的 ArrayBlockingQueue。是目前单机且基于内存存储的最高性能的队列实现。见 与 ArrayBlockingQueue 性能比照。

Disruptor 高性能秘诀

应用 CAS 代替锁

锁十分低廉,因为它们在竞争时须要仲裁。这种仲裁是通过到操作系统内核的上下文切换来实现的,该内核将挂起期待锁的线程,直到它被开释。零碎提供的原子操作 CAS(Compare And Swap/Set)是很好的锁代替计划,Disruptor 中同步就是应用的这种。

比方多生产者模式中 com.lmax.disruptor.MultiProducerSequencer 就是用了 Java 里 sun.misc.Unsafe 类基于 CAS 实现的 API。

期待策略 com.lmax.disruptor.BlockingWaitStrategy 应用了基于 CAS 实现的 ReentrantLock。

独占缓存行

为了提高效率 CPU 硬件不会以字节或字为单位挪动内存,而是以缓存行,通常大小为 32-256 字节的缓存行,最常见的缓存行是 64 字节。这意味着,如果两个变量在同一个缓存行中,并且由不同的线程写入,那么它们会呈现与单个变量雷同的写入争用问题。为了取得高性能,如果要最小化争用,那么确保独立但同时写入的变量不共享雷同的缓存行是很重要的。

比方 com.lmax.disruptor.RingBuffer 中属性前后都用未赋值的 long 来独占。com.lmax.disruptor.SingleProducerSequencerPad 也有雷同解决形式。

环形队列

  • 应用有界队列,缩小线程争用

队列相比链表在访问速度上占据劣势,而有界队列相比可动静扩容的无界队列则防止扩容产生的同步问题效率更高。Disruptor 和 JDK 中的 ArrayBlockingQueue 一样应用有界队列。队列长度要设为 2 的 n 次幂,有利于二进制计算。

  • 应用环形数组,防止生产和生产速度差别导致队列头和尾争用

Disruptor 在逻辑上将数组的的头尾看成是相连的,即一个环形数组(RingBuffer)。

  • Sequence

生产和生产都须要保护自增序列值(Sequence),从 0 开始。

生产方只保护一个代表生产的最初一个元素的序号。代表生产的最初一个元素的序号。每次向 Disruptor 公布一个元素都调用 Sequenced.next() 来获取下个地位的写入权。

在单生产者模式(SINGLE)因为不存在并发写入,则不须要解决同步问题。在多生产者模式(MULTI)就须要借助 JDK 中基于 CAS(Compare And Swap/Set)实现的 API 来保障线程平安。

多个消费者各自保护本人的生产序列值(Sequence)保留数组中。

而环形通过与运算(sequence & indexMask)实现的,indexMask 就是环形队列的长度 -1。以环形队列长度 8 为例,第 9 个元素 Sequence 为 8,8 & 7 = 0,刚好又回到了数组第 1 个地位。

见 com.lmax.disruptor.RingBuffer.elementAt(long sequence)

预分配内存

环形队列寄存的是 Event 对象,而且是在 Disruptor 创立的时候调用 EventFactory 创立并一次将队列填满。Event 保留生产者生产的数据,生产也是通过 Event 获取,后续生产则只须要替换掉 Event 中的属性值。这种形式防止了反复创建对象,升高 JVM 的 GC 产频率。

见 com.lmax.disruptor.RingBuffer.fill(EventFactory<E> eventFactory)

消费者 8 种期待策略

当生产速度大于生产速度状况下,消费者执行的期待策略。

策略类名 形容
BlockingWaitStrategy(罕用) 应用 ReentrantLock,失败则进入期待队列期待唤醒重试。当吞吐量和低提早不如 CPU 资源重要时应用。
YieldingWaitStrategy(罕用) 尝试 100 次,全失败后调用 Thread.yield() 让出 CPU。该策略将应用 100% 的 CPU,如果其余线程申请 CPU 资源,这种策略更容易让出 CPU 资源。
SleepingWaitStrategy(罕用) 尝试 200 次。前 100 次间接重试,后 100 次每次失败后调用 Thread.yield() 让出 CPU,全失败线程睡眠(默认 100 纳秒)。
BusySpinWaitStrategy 线程始终自旋期待,比拟耗 CPU。最好是将线程绑定到特定的 CPU 外围上应用。
LiteBlockingWaitStrategy 与 BlockingWaitStrategy 相似,区别在减少了原子变量 signalNeeded,如果两个线程同时别离拜访 waitFor() 和 signalAllWhenBlocking(),能够缩小 ReentrantLock 加锁次数。
LiteTimeoutBlockingWaitStrategy 与 LiteBlockingWaitStrategy 相似,区别在于设置了阻塞工夫,超过工夫后抛异样。
TimeoutBlockingWaitStrategy 与 BlockingWaitStrategy 相似,区别在于设置了阻塞工夫,超过工夫后抛异样。
PhasedBackoffWaitStrategy 依据工夫参数和传入的期待策略来决定应用哪种期待策略。当吞吐量和低提早不如 CPU 资源重要时,能够应用此策略。

消费者序列

所有消费者的生产序列(Sequence)都放在一个数组中,见 com.lmax.disruptor.AbstractSequencer,通过 SEQUENCE_UPDATER 来更新对应的序列值。

调用更新的中央在 com.lmax.disruptor.RingBuffer.addGatingSequences(Sequence… gatingSequences)。

生产太慢队列满了怎么办?

生产者线程被阻塞。生产者调用 Sequenced.next() 抢夺写入权的时候须要判断最小的生产序列值进行比拟。如果写入的地位还未生产则会进入循环不断获取最小生产序列值进行比拟。

见包 com.lmax.disruptor 下 SingleProducerSequencer 或 MultiProducerSequencer 中 next(int n) 办法。

Disruptor 开发步骤

  • 创立 Event、EventFactory、EventHandler 和 ExceptionHandler 类

Event 是环形队列(RingBuffer)中的元素,是生产者数据的载体;EventFactory 是定义 Event 创立形式的工厂类;EventHandler 则是 Event 的处理器,定义如何生产 Event 中的数据。

另外有必要定义一个生产异样处理器 ExceptionHandler,它是和 EventHandler 绑定的。当 EventHandler.onEvent() 执行抛出异样时会执行对应的异样回调办法。

  • 实例化 Disruptor

创立 Disruptor 须要指定 5 个参数 eventFactory、ringBufferSize、threadFactory、producerType、waitStrategy。

EventFactory 是下面定义的 Event 工厂类;

ringBufferSize 是环形队列的长度,这个值要是 2 的 N 次方;

threadFactory 是定义消费者线程创立形式的工厂类;

producerType 是指明生产者是一个(SINGLE)还是多个(MULTI)。默认是 MULTI,会应用 CAS(Compare And Swap/Set)保障线程平安。如果指定为 SINGLE,则不应用没必要的 CAS,使单线程解决更高效。

waitStrategy 指明消费者期待生产时的策略。

  • 设置消费者

指明 EventHandler 并绑定 ExceptionHandler。指定多个 EventHandler 时,会为每个 EventHandler 调配一个线程,一个 Event 会被多个并行 EventHandler 解决。

也能够指明多个 WorkHandler,每个 WorkHandler 调配一个线程并行生产队列中的 Event,一个 Event 只会被一个 WorkHandler 解决。

  • 创立 / 实例化 EventTranslator

EventTranslator 定义生产者数据转换为 Event 的形式,不同数量参数有不同的接口用来实现。

  • 最初用 Disruptor.publishEvent() 来公布元素指明 EventTranslator 和参数

例子程序

  • 先引入 Maven 依赖
<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.4.4</version>
</dependency>
  • Event
/**
 * 事件
 *
 * @param <T> 公布的数据类型
 */
public class MyEvent<T> {

    private T data;

    public T getData() {return data;}

    public MyEvent<T> setData(T data) {
        this.data = data;
        return this;
    }
}
  • EventFactory
import com.lmax.disruptor.EventFactory;

/**
 * 创立事件的工厂
 *
 * @param <T> 公布的数据类型
 */
public class MyEventFactory<T> implements EventFactory<MyEvent<T>> {

    @Override
    public MyEvent<T> newInstance() {return new MyEvent<>();
    }
}
  • EventHandler
import com.lmax.disruptor.EventHandler;

/**
 * 事件生产办法
 *
 * @param <T> 公布的数据类型
 */
public class MyEventHandler<T> implements EventHandler<MyEvent<T>> {

    @Override
    public void onEvent(MyEvent<T> tMyEvent, long l, boolean b) throws Exception {System.out.println(Thread.currentThread().getName() + "MyEventHandler 生产:" + tMyEvent.getData());
    }
}
  • ExceptionHandler
import com.lmax.disruptor.ExceptionHandler;

/**
 * 消费者异样处理器
 *
 * @param <T> 公布的数据类型
 */
public class MyExceptionHandler<T> implements ExceptionHandler<MyEvent<T>> {

    @Override
    public void handleEventException(Throwable ex, long sequence, MyEvent<T> event) {System.out.println("handleEventException");
    }

    @Override
    public void handleOnStartException(Throwable ex) {System.out.println("handleOnStartException");
    }

    @Override
    public void handleOnShutdownException(Throwable ex) {System.out.println("handleOnShutdownException");
    }
}

单消费者

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

/**
 * 单消费者
 */
public class SingleConsumerSample {public static void main(String[] args) {
        // 环形数组长度,必须是 2 的 n 次幂
        int ringBufferSize = 1024;
        // 创立事件(Event)对象的工厂
        MyEventFactory<String> eventFactory = new MyEventFactory<>();
        // 创立消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 期待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);

        // 指定一个处理器
        MyEventHandler<String> eventHandler = new MyEventHandler<>();
        disruptor.handleEventsWith(eventHandler);
        // 处理器异样处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.setDefaultExceptionHandler(exceptionHandler);

        disruptor.start();

        // 通过事件转换器(EventTranslator)来指明如何将公布的数据转换到事件对象(Event)中
        // 这里是一个参数的转换器,另外还有两个(EventTranslatorTwoArg)、三个(EventTranslatorThreeArg)// 和多个(EventTranslatorVararg)参数的转换器能够应用,参数类型能够不一样
        EventTranslatorOneArg<MyEvent<String>, String> eventTranslatorOneArg =
                new EventTranslatorOneArg<MyEvent<String>, String>() {
                    @Override
                    public void translateTo(MyEvent<String> event, long sequence, String arg0) {event.setData(arg0);
                    }
                };

        // 公布
        for (int i = 0; i < 10; i++) {disruptor.publishEvent(eventTranslatorOneArg, "One arg" + i);
        }

        disruptor.shutdown();}
}

单消费者 Lambda 写法

这种只是投合 Java8 Lambda 语法个性,代码更简洁。

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

public class LambdaSample {public static void main(String[] args) {
        // 环形数组长度,必须是 2 的 n 次幂
        int ringBufferSize = 1024;
        // 创立消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 期待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(MyEvent::new, ringBufferSize, threadFactory, SINGLE, waitStrategy);

        // 指定一个处理器
        EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch) ->
                System.out.println(Thread.currentThread().getName() + "MyEventHandler 生产:" + event.getData());
        disruptor.handleEventsWith(eventHandler);
        // 处理器异样处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.setDefaultExceptionHandler(exceptionHandler);

        disruptor.start();

        // 通过事件转换器(EventTranslator)来指明如何将公布的数据转换到事件对象(Event)中
        // 一个参数的转换器
        disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg");
        // 两个参数的转换器
        disruptor.publishEvent((event, sequence, pA, pB) -> event.setData(pA + pB), "Two arg", 1);
        // 三个参数的转换器
        disruptor.publishEvent((event, sequence, pA, pB, pC) -> event.setData(pA + pB + pC)
                , "Three arg", 1, false);
        // 多个参数的转换器
        disruptor.getRingBuffer().publishEvent((event, sequence, params) -> {List<String> paramList = Arrays.stream(params).map(Object::toString).collect(Collectors.toList());
            event.setData("Var arg" + String.join(",", paramList));
        }, "param1", "param2", "param3");

        disruptor.shutdown();}
}

多消费者反复生产元素

要害只在于指定多个 EventHandler,并且 EventHandler 还能够别离绑定不同的 ExceptionHandler。

每个 EventHandler 调配一个线程,一个 Event 会被每个 EventHandler 解决,适宜两个不同的业务都须要解决同一个元素的状况,相似播送模式。

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

/**
 * 一个元素多个消费者反复生产
 */
public class RepetitionConsumerSample {public static void main(String[] args) {
        // 环形数组长度,必须是 2 的 n 次幂
        int ringBufferSize = 1024;
        // 创立事件(Event)对象的工厂
        MyEventFactory<String> eventFactory = new MyEventFactory<>();
        // 创立消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 期待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);


        // 这里指定了 2 个消费者,那就会产生 2 个生产线程,一个事件会被生产 2 次
        EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch) ->
                System.out.println(Thread.currentThread().getName() + "MyEventHandler 生产:" + event.getData());
        EventHandler<MyEvent<String>> eventHandler2 = (event, sequence, endOfBatch) ->
                System.out.println(Thread.currentThread().getName() + "MyEventHandler——2 生产:" + event.getData());
        disruptor.handleEventsWith(eventHandler, eventHandler2);
        // 别离指定异样处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.handleExceptionsFor(eventHandler).with(exceptionHandler);
        disruptor.handleExceptionsFor(eventHandler2).with(exceptionHandler);

        disruptor.start();

        for (int i = 0; i < 10; i++) {disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg" + i);
        }

        disruptor.shutdown();}
}

多消费者

要害只在于定义 WorkHandler,而后实例化多个来生产。

每个 WorkHandler 调配一个线程,一个元素只会被一个 WorkHandler 解决。

import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

public class MultiConsumerSample {public static void main(String[] args) {
        // 环形数组长度,必须是 2 的 n 次幂
        int ringBufferSize = 1024;
        // 创立事件(Event)对象的工厂
        MyEventFactory<String> eventFactory = new MyEventFactory<>();
        // 创立消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 期待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);

        // 处理器异样处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.setDefaultExceptionHandler(exceptionHandler);

        // 设置 2 个消费者,2 个线程,一个 Event 只被一个消费者生产
        WorkHandler<MyEvent<String>> workHandler = tMyEvent ->
                System.out.println(Thread.currentThread().getName() + "WorkHandler 生产:" + tMyEvent.getData());
        disruptor.handleEventsWithWorkerPool(workHandler, workHandler2);

        disruptor.start();

        for (int i = 0; i < 10; i++) {disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg" + i);
        }

        disruptor.shutdown();}
}

参考链接

Disruptor 主页

Disruptor 技术文档

GitHub Disruptor

GitHub Disruptor Getting Started

Maven Repository Disruptor Framework

LMAX 官网

退出移动版