乐趣区

关于java:如此狂妄自称高性能队列的Disruptor有啥来头

并发框架 Disruptor

1. Disruptor 概述

1.1 背景

​ Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能队列,研发的初衷是解决内存队列的提早问题(在性能测试中发现居然与 I / O 操作处于同样的数量级),基于 Disruptor 开发的零碎单线程能撑持每秒 600 万订单,2010 年在 QCon 演讲后,取得了业界关注,2011 年,企业应用软件专家 Martin Fowler 专门撰写长文介绍。同年它还取得了 Oracle 官网的 Duke 大奖。

​ 目前,包含 Apache Storm、Camel、Log4j 2 在内的很多出名我的项目都利用了 Disruptor 以获取高性能。

​ 须要特地指出的是,这里所说的队列是零碎外部的内存队列,而不是 Kafka 这样的分布式队列。

有界无锁 高并发队列

1.2 什么是 Disruptor

​ Disruptor 是用于一个 JVM 中多个线程之间的音讯队列,作用与 ArrayBlockingQueue 有相似之处,然而 Disruptor 从性能、性能都远好于 ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,能够思考应用 Disruptor 作为 ArrayBlockingQueue 的替代者。

​ 官网也对 Disruptor 和 ArrayBlockingQueue 的性能在不同的利用场景下做了比照,目测性能只有有 5~10 倍左右的晋升。

1.3 为什么应用 Disruptor

​ 传统阻塞的队列应用锁保障线程平安,而锁通过操作系统内核上下文切换实现,会暂停线程去期待锁,直到锁开释。

​ 执行这样的上下文切换,会失落之前保留的数据和指令。因为消费者和生产者之间的速度差别,队列总是靠近满或者空的状态,这种状态会导致高水平的写入争用。

1.3.1 传统队列问题

首先这里说的队列也仅限于 Java 外部的音讯队列

队列 有界性 构造 队列类型
ArrayBlockingQueue 有界 加锁 数组 阻塞
LinkedBlockingQueue 可选 加锁 链表 阻塞
ConcurrentLinkedQueue 无界 无锁 链表 非阻塞
LinkedTransferQueue 无界 无锁 链表 阻塞
PriorityBlockingQueue 无界 加锁 阻塞
DelayQueue 无界 加锁 阻塞
1.3.2 Disruptor 利用场景

参考应用到 disruptor 的一些框架.

1.3.2.1 log4j2

​ Log4j2 异步日志应用到了 disruptor, 日志个别是有缓冲区, 满了才写到文件, 增量追加文件联合 NIO 等应该也比拟快,所以无论是 EventHandler 还是 WorkHandler 解决应该提早比拟小的,写的文件也不多,所以场景是比拟适合的。

1.3.2.2 Jstorm

​ 在流解决中不同线程中数据交换,数据计算可能蛮多内存中计算,流计算快进快出,disruptor 应该不错的抉择。

1.3.2.3 百度 uid-generator

​ 局部应用 Ring buffer 和去伪共享等思路缓存已生成的 uid, 应该也局部参考了 disruptor 吧。

1.4 Disruptor 的外围概念

先从理解 Disruptor 的外围概念开始,来理解它是如何运作的。上面介绍的概念模型,既是畛域对象,也是映射到代码实现上的外围对象。

1.4.1 Ring Buffer

Disruptor 中的数据结构,用于存储生产者生产的数据

​ 如其名,环形的缓冲区。已经 RingBuffer 是 Disruptor 中的最次要的对象,但从 3.0 版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行替换的数据(事件)进行存储和更新。在一些更高级的利用场景中,Ring Buffer 能够由用户的自定义实现来齐全代替。

1.4.2 Sequence

序号,在 Disruptor 框架中,任何中央都有序号

​ 生产者生产的数据放在 RingBuffer 中的哪个地位,消费者应该生产哪个地位的数据,RingBuffer 中的某个地位的数据是什么,这些都是由这个序号来决定的。这个序号能够简略的了解为一个 AtomicLong 类型的变量。其应用了 padding 的办法去打消缓存的伪共享问题。

1.4.3 Sequencer

序号生成器,这个类次要是用来协调生产者的

​ 在生产者生产数据的时候,Sequencer 会产生一个可用的序号(Sequence),而后生产者就就晓得数据放在环形队列的那个地位了。

​ Sequencer 是 Disruptor 的真正外围,此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer,它们定义在生产者和消费者之间疾速、正确地传递数据的并发算法。

1.4.4 Sequence Barrier

序号屏障

​ 咱们都晓得,消费者在生产数据的时候,须要晓得生产哪个地位的数据。消费者总不能自己想取哪个数据生产,就取哪个数据生产吧。这个 SequencerBarrier 起到的就是这样一个“栅栏”般的阻隔作用。你消费者想生产数据,得,我通知你一个序号(Sequence),你去生产那个地位上的数据。要是没有数据,就好好等着吧

1.4.5 Wait Strategy

Wait Strategy 决定了一个消费者怎么期待生产者将事件(Event)放入 Disruptor 中。

​ 构想一种这样的情景:生产者生产的十分慢,而消费者生产的十分快。那么必然会呈现数据不够的状况,这个时候消费者怎么进行期待呢?WaitStrategy 就是为了解决问题而诞生的。

1.4.6 Event

​ 从生产者到消费者传递的数据叫做 Event。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

1.4.7 EventHandler

​ Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。

1.4.8 Producer

​ 即生产者,只是泛指调用 Disruptor 公布事件的用户代码,Disruptor 没有定义特定接口或类型。

1.5 Disruptor 个性

​ Disruptor 其实就像一个队列一样,用于在不同的线程之间迁徙数据,然而 Disruptor 也实现了一些其余队列没有的个性,如:

  • 同一个“事件”能够有多个消费者,消费者之间既能够并行处理,也能够相互依赖造成解决的先后秩序(造成一个依赖图);
  • 预调配用于存储事件内容的内存空间;
  • 针对极高的性能指标而实现的极度优化和无锁的设计;

2. Disruptor 入门

咱们应用一个简略的例子来体验一下 Disruptor,生产者会传递一个 long 类型的值到消费者,消费者承受到这个值后会打印出这个值。

2.1 增加依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

2.2 Disruptor API

Disruptor 的 API 非常简略,次要有以下几个步骤

2.2.1 定义事件

首先创立一个 LongEvent 类,这个类将会被放入环形队列中作为音讯内容。

事件 (Event) 就是通过 Disruptor 进行替换的数据类型。

public class LongEvent {
    private long value;

    public void set(long value) {this.value = value;}

    public long getValue() {return value;}
}
2.2.2 定义事件工厂

为了应用 Disruptor 的内存预调配 event,咱们须要定义一个 EventFactory

​ 事件工厂 (Event Factory) 定义了如何实例化后面第 1 步中定义的事件(Event),须要实现接口 com.lmax.disruptor.EventFactory\<T\>。

Disruptor 通过 EventFactory 在 RingBuffer 中预创立 Event 的实例。

​ 一个 Event 实例实际上被用作一个“数据槽”,发布者公布前,先从 RingBuffer 取得一个 Event 的实例,而后往 Event 实例中填充数据,之后再公布到 RingBuffer 中,之后由 Consumer 取得该 Event 实例并从中读取数据。

public class LongEventFactory implements EventFactory<LongEvent> {public LongEvent newInstance() {return new LongEvent();
    }
}
2.2.3 定义事件处理的具体实现

为了让消费者解决这些事件,所以咱们这里定义一个事件处理器,负责打印 event

通过实现接口 com.lmax.disruptor.EventHandler\<T\> 定义事件处理的具体实现。

public class LongEventHandler implements EventHandler<LongEvent> {public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {//CommonUtils.accumulation();
        System.out.println("consumer:" + Thread.currentThread().getName() + "Event: value=" + event.getValue() + ",sequence=" + sequence);
    }
}
2.2.4 指定期待策略

Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于形象 Consumer 如何期待新事件,这是策略模式的利用

WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
2.2.5 启动 Disruptor

留神 ringBufferSize 的大小必须是 2 的 N 次方

// 指定事件工厂
LongEventFactory factory = new LongEventFactory();

// 指定 ring buffer 字节大小, 必须是 2 的 N 次方
int bufferSize = 1024;

// 单线程模式,获取额定的性能
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory,
                                                          bufferSize, Executors.defaultThreadFactory(),
                                                          ProducerType.SINGLE,
                                                          new YieldingWaitStrategy());
// 设置事件业务处理器 --- 消费者
disruptor.handleEventsWith(new LongEventHandler());

// 启动 disruptor 线程
disruptor.start();
2.2.6 应用 Translators 公布事件

在 Disruptor 的 3.0 版本中,因为退出了丰盛的 Lambda 格调的 API,能够用来帮组开发人员简化流程。所以在 3.0 版本后首选应用 Event Publisher/Event Translator 来公布事件。

public class LongEventProducerWithTranslator {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {this.ringBuffer = ringBuffer;}

    private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, Long>() {public void translateTo(LongEvent event, long sequence, Long data) {event.set(data);
                }
            };

    public void onData(Long data) {ringBuffer.publishEvent(TRANSLATOR, data);
    }
}
2.2.7 敞开 Disruptor
disruptor.shutdown();// 敞开 disruptor,办法会梗塞,直至所有的事件都失去解决

2.3 代码整合

2.3.1 LongEventMain

消费者 - 生产者启动类, 其依附结构 Disruptor 对象,调用 start()办法实现启动线程。Disruptor 须要 ringbuffer 环,消费者数据处理工厂,WaitStrategy 等

  • ByteBuffer 类字节 buffer,用于包装音讯。
  • ProducerType.SINGLE 为单线程,能够进步性能
public class LongEventMain {public static void main(String[] args) {
        // 指定事件工厂
        LongEventFactory factory = new LongEventFactory();

        // 指定 ring buffer 字节大小, 必须是 2 的 N 次方
        int bufferSize = 1024;

        // 单线程模式,获取额定的性能
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory,
                bufferSize, Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new YieldingWaitStrategy());

        // 设置事件业务处理器 --- 消费者
        disruptor.handleEventsWith(new LongEventHandler());

        // 启动 disruptor 线程
        disruptor.start();
        // 获取 ring buffer 环,用于接取生产者生产的事件
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        // 为 ring buffer 指定事件生产者
        LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        // 循环遍历
        for (int i = 0; i < 100; i++) {
            // 获取一个随机数
            long value = (long) ((Math.random() * 1000000) + 1);
            // 公布数据
            producer.onData(value);
        }
        // 进行 disruptor 线程
        disruptor.shutdown();}
}
2.3.2 运行测试

测试后果

consumer:pool-1-thread-1 Event: value=579797,sequence=0
consumer:pool-1-thread-1 Event: value=974942,sequence=1
consumer:pool-1-thread-1 Event: value=978977,sequence=2
consumer:pool-1-thread-1 Event: value=398080,sequence=3
consumer:pool-1-thread-1 Event: value=867251,sequence=4
consumer:pool-1-thread-1 Event: value=796707,sequence=5
consumer:pool-1-thread-1 Event: value=786555,sequence=6
consumer:pool-1-thread-1 Event: value=182193,sequence=7
.....

Event: value = 为消费者接管到的数据,sequence 为数据在 ringbuffer 环的地位。

如果本文对您有帮忙,欢送 关注 点赞`,您的反对是我保持创作的能源。

转载请注明出处!

退出移动版