作者:章北海

咱们的线上特色数据服务DataService,为了解决应用线程池模型导致机器cpu利用率不高,长尾申请提早不线性(p99、p999呈现J型曲线)的问题。在利用Disruptor替换线程池之后获得不错的性能后果。本文次要是简略的介绍一下对Disruptor的集体了解以及落地的后果。

背景

Disruptor是一个高性能的解决并发问题的框架,由LMAX(一个英国做金融交易的公司)设计开发用于本人金融交易零碎建设。之后开源被很多出名的开源库应用,例如前段时间暴发破绽的Log4j。

其中Log4j2应用Disruptor来优化多线程模式下的日志落盘性能,Log4j2做了一个测试应用:同步模式(Sync)、Async(ArrayBlockingQueue)、ALL_ASYNC(Disruptor)别离进行压测,失去如下测试论断:https://logging.apache.org/lo...

Disruptor模式的吞吐能力是JDK ArrayBlockQueue的12倍,是同步模式的68倍。

响应工夫P99指标Disruptor模式比BlockQueue也更加优良,尤其是开启了Garbage-free等优化参数之后。

通过log4j的例子看来,disruptor能够让你的零碎在达到更高吞吐的同时带来更加稳固且低的响应工夫。

那么为什么disruptor能够带来这些收益而jdk的线程池模式又有什么问题呢?

Disruptor介绍

LMAX是一个金融交易公司,他们的交易中有大量的生产者消费者模型业务逻辑,很天然他们将生产者产出的数据放到队列中(eg. ArrayBlockingQueue)而后开启多个消费者线程进行并发生产。

而后他们测试了一把数据在队列中传递的性能跟拜访磁盘(RAID、SSD)差不多,当业务逻辑需数据要多个队列在不同的业务Stage之间传递数据时,多个串行的队列开销是不可忍耐的,而后他们开始剖析为什么JDK的队列会有这么重大的性能问题。

BolckQueue的问题

为什么应用BlockQueue会有这么激烈的差异,以Java的ArrayBlockingqueue为例。底层实现其实是一个数组,在入队、出队的时候通过重入锁来保障并发状况下的队列数据的线程平安。

/** * ArrayBlockQueue的入队实现 */public boolean offer(E e, long timeout, TimeUnit unit)        throws InterruptedException {        checkNotNull(e);        long nanos = unit.toNanos(timeout);        final ReentrantLock lock = this.lock;            // 全局锁        lock.lockInterruptibly();        try {            while (count == items.length) {                if (nanos <= 0)                    return false;                nanos = notFull.awaitNanos(nanos);            }            enqueue(e);            return true;        } finally {            lock.unlock();        }}/** * ArrayBlockQueue的出队实现 */public E poll() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            return (count == 0) ? null : dequeue();        } finally {            lock.unlock();        }}/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */private void enqueue(E x) {    // assert lock.getHoldCount() == 1;    // assert items[putIndex] == null;    final Object[] items = this.items;    items[putIndex] = x;    if (++putIndex == items.length)        putIndex = 0;    count++;    notEmpty.signal();}/** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */private E dequeue() {    // assert lock.getHoldCount() == 1;    // assert items[takeIndex] != null;    final Object[] items = this.items;    @SuppressWarnings("unchecked")    E x = (E) items[takeIndex];    items[takeIndex] = null;    if (++takeIndex == items.length)        takeIndex = 0;    count--;    if (itrs != null)        itrs.elementDequeued();    notFull.signal();    return x;}

能够看到ArrayBlockQueue是由一个ReentrantLock在读写时进行互斥爱护,这样做会导致两个问题:

  1. 数据的出队、入队会互斥,不论是什么特点的利用都会频繁的引起锁碰撞。
  2. ReentrantLock自身每次加锁可能会引起多个cas操作,而每个Cas锁操作的代价没有设想中的那么小。

    1. 锁状态变更触发cas操作。
    2. 锁竞争失败之后进入竞争队列会触发cas。
    3. 当持有锁线程开释之后通过Condition同步,唤醒竞争线程之后,唤醒线程出队还会导致Cas操作。

为了验证这个猜测LMAX又跑了一个测试,验证各种Lock的开销到底有多大。

他们的测试Case是将一个int64始终累加一亿次,区别只是应用单个线程、单个线程加锁(synchronize、cas)、还是多个线程加锁(synchronize、cas)。

失去的测试后果如下:https://lmax-exchange.github....,The%20Cost%20of%20Locks,-Locks%20provide%20mutual

  1. 当单个线程无锁执行时只须要300ms就能够实现。
  2. 当单个线程加锁(理论没有竞争)执行时,就须要10s。
  3. 单个线程应用CAS执行时比互斥锁体现好一点。
  4. 当线程越多,不论是互斥锁还是CAS测试Case执行的耗时越来越大。
  5. volatile修饰符跟CAS体现在数量级上差不多。
MethodTime (ms)
Single thread300
Single thread with lock10,000
Two threads with lock224,000
Single thread with CAS5,700
Two threads with CAS30,000
Single thread with volatile write4,700

这样看起来锁还有CAS操作的开销比设想中的高很多,那么具体为什么会有这么大的性能开销。

在并发环境中(在Java生态)锁的实现有两种:synchronize、cas,上面别离剖析两种锁的开销。

Synchronize开销

jdk对于synchronize的介绍:https://wiki.openjdk.java.net...

在java中互斥锁就体现在synchronize关键字润饰的代码块中(synchronize在锁降级中会应用Mutex实现的,对于Linux就是pthread_mutex_t)。

  1. 内核仲裁

    当synchronize关键字润饰的代码块被多个线程竞争时就须要进行用户态、内核态切换,须要零碎内核仲裁竞争资源的归属,这种切换的代价是十分低廉的(保留和复原一些寄存器、内存数据等过程上下文数据)。

  2. 缓存净化

    当初CPU都有多个外围,因为外围的计算能力远远高于内存的IO能力。为了和谐解决外围跟内存的速度差别,引入了cpu缓存。当外围执行运算时如果须要内存数据先从L1缓存中获取、如果没命中就从L2缓存获取如果始终没命中就从主从中load。

    当产生线程上下文切换,切换走的线程就会让出CPU让另外的线程去执行他的逻辑,而他刚刚从主存中load进来的数据就会被新的线程净化。下次他竞争胜利,还是须要再次从主从中load数据,竞争会加剧缓存净化进一步影响零碎性能。

    从CPU到大概须要的CPU周期大概须要的工夫
    主存-约60-80ns
    QPI 总线传输(between sockets, not drawn)-约20ns
    L3 cache约40-45 cycles约15ns
    L2 cache约10 cycles约3ns
    L1 cache约3-4 cycles约1ns
    寄存器1 cycle-
  3. 伪共享

    互斥锁还会引发自身没有加锁的变量被迫互斥的问题。

    CPU缓存治理的根本然而是缓存行,当cpu须要从主存中load数据时会依照缓存行的大小将对应地位的内存块一起load进去。当cpu批改内存中的数据时,也是间接批改缓存中的数据,有缓存一致性协定保障将缓存中的变动刷到内存中。

    看上面这个例子:

    class Eg {  private int a;  private int b;    public void synchronize incr_a(){    a++;  }    public void incr_b(){    b++;  }}

    这个对象中a、b两个字段很大概率被调配到相邻的内存中,当cpu触发缓存load时这块内存很可能会被一起加载到同一个缓存行。

    当一个线程调用incr_a的同时另外一个线程调用incr_b办法时,因为incr_a被互斥锁爱护导致持有a、b两个变量的缓存行也被互斥锁爱护起来,这样尽管incr_b没有显示的互斥锁但实际上也被锁住了,这个景象被成为伪共享。

  4. 额定的CAS开销

    在Synchronize在外部保护了count计数、对象头中有持有线程的id等变量,当线程屡次进入竞争块时须要通过CAS操作去更改count计数、对象头中的线程id,所以synchronize自身还会有cas的开销。

CAS的开销

CAS是古代处理器反对的一个原子指令(例如: lock cmpxchg x86),具体的含意是当变更的变量原始值合乎冀望就间接更新,不合乎冀望就失败。

在Java中各种AutoXX类就是对CAS指令的封装,其中java的重入锁(ReentrantLock)的实现原理就是一个CAS操作。

对应Cas自身的开销问题这里能够思考这样的一个例子:

假如位于两个外围的两个线程同时CAS一个变量a,当线程1CAS胜利时将数据变更写入到缓存A中。那么这个时候线程2怎么可能感知到变量a当初的值曾经产生了变更,本次CAS操作须要失败呢。

这里就须要缓存一致性协定来进行保障,须要在CAS变量的变更前后插入内存屏障来保障变量在多个外围中的可见性,这也是java volatile关键字的实在含意。

所以在最开始的那个例子中,cas操作跟volatile变量的性能体现差不多的起因,就是两者都须要进行缓存同步。

这里咱们须要意识到,cas操作尽管比互斥锁性能更好然而也不是齐全没有开销的。当大量的cas操作失败重试导致大量的缓存生效有时候会引发更为严重的问题。

具体的缓存一致性以及内存屏障的细节能够参考这个文章:http://www.rdrop.com/users/pa...

Disruptor的优化

LMAX在大量的测试跟深入分析之后,正视锁的开销,依照他们的业务形象出了一套通用的能够做到无锁的并发解决框架。

组件阐明

在具体介绍Disruptor之前先简略的对Disruptor的外围形象进行阐明。

形象组件阐明
Ring Buffer环形队列,用于寄存生产者、消费者之间流转的事件数据
Sequence一个自增序列,用于生产者、消费者之间寄存能够被(公布、生产)的队列游标,能够简略认为是一个AutomicLong的自定义实现。
Sequencer持有生产者、消费者的Sequence,是用于协调两边的并发问题,是Disruptor的外围组件。
Sequence Barrier由Sequencer创立用于消费者能够跟踪到上游生产者的状况,获取可生产的事件。
Wait Strategy用于消费者期待可生产事件时的策略,有很多实现策略。
Event业务事件
Event Processor业务事件生产程序,能够认为是物理线程的形象
Event Handler实在的业务解决逻辑,每个Processor持有一个
Producer生产者

数据生产

数据的生产非常简单,有两种状况:

  1. 单生产者

    单生产者的状况下,向RingBuffer中生产数据是没有任何竞争的,惟一须要留神的点是须要关注消费者的生产能力,不要笼罩了最慢的消费者未生产的数据。

    为了达到这个目标,须要通过Sequencer来察看最慢的消费者的生产进度,代码如下,能够看到只有一次volatile操作全程不会有任何锁:

    // 申请n个可用于公布的slotpublic long next(int n)    {        if (n < 1)        {            throw new IllegalArgumentException("n must be > 0");        }        long nextValue = this.nextValue;        long nextSequence = nextValue + n;        long wrapPoint = nextSequence - bufferSize;        long cachedGatingSequence = this.cachedValue;        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)        {                    // 一次 volatile 操作            cursor.setVolatile(nextValue);  // StoreLoad fence            long minSequence;            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))){                // 当最慢的消费者进度低于以后须要申请的slot时,尝试唤醒消费者(唤醒策略不同体现不同,很多策略基本不会阻塞会始终spin)                 waitStrategy.signalAllWhenBlocking();                 // park 1 纳秒持续尝试                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?            }                 // 申请胜利            this.cachedValue = minSequence;        }        this.nextValue = nextSequence;        return nextSequence; }
  1. 多生产者

    多生产者比较复杂的点是生产者线程之前有写竞争,须要CAS来进行协调。也就是生产者的Seq须要额定进行一个CAS操作、全程无锁,申请代码如下:

    // 申请n个可公布的slotpublic long next(int n)    {        if (n < 1)        {            throw new IllegalArgumentException("n must be > 0");        }        long current;        long next;        do        {            current = cursor.get();            next = current + n;            long wrapPoint = next - bufferSize;            long cachedGatingSequence = gatingSequenceCache.get();            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)            {                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);                if (wrapPoint > gatingSequence){                  // 当最慢的消费者进度低于以后须要申请的slot时,尝试唤醒消费者(唤醒策略不同体现不同,很多策略基本不会阻塞会始终spin)                     waitStrategy.signalAllWhenBlocking();                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?                    continue;                }                gatingSequenceCache.set(gatingSequence);            }            else if (cursor.compareAndSet(current, next)){                 // 通过自旋 + cas去协调多个生产者的                break;            }        }        while (true);        return next;    }

    尽管相比单生产者仅仅多了一个CAS操作,然而Disruptr的外围作者始终强调为了更高的吞吐以及跟稳固的提早,单生产者的设计准则是十分有必要的,否则随着吞吐的升高长尾的申请会呈现不线性的提早增长。

    具体作者的文章见:https://mechanical-sympathy.b...

数据生产

不论是单生产者、还是多生产者数据的生产都是不受影响的。Disruptor反对开启多个Processor(也就是线程),每个Processor应用相似while true的模式拉取可生产的事件进行解决。

这样的跟线程池模式的益处是防止线程创立、销毁、上下文切换代理的性能损失(缓存净化……)。

对于多个消费者之间的竞争关系通过Sequence Barrier这个形象组件进行协调,代码见下,能够看到除期待策略可能有策略是锁实现、其余步骤全程无锁。

while (true){            try            {                // if previous sequence was processed - fetch the next sequence and set                // that we have successfully processed the previous sequence                // typically, this will be true                // this prevents the sequence getting too far forward if an exception                // is thrown from the WorkHandler                if (processedSequence)                {                    processedSequence = false;                    do                    {                        nextSequence = workSequence.get() + 1L;                        // 一次 Store/Store barrier                          sequence.set(nextSequence - 1L);                    }                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));                    // 通过自旋 + cas协调消费者进度                }                if (cachedAvailableSequence >= nextSequence){                    // 批量申请slot进度高于以后进度,间接生产                    event = ringBuffer.get(nextSequence);                    workHandler.onEvent(event);                    processedSequence = true;                }                else{                    // 无音讯可生产是更具不同的策略进行期待(能够阻塞、能够自旋、能够阻塞+超时……)                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);                }            }      catch (final TimeoutException e){                notifyTimeout(sequence.get());            }     catch (final AlertException ex){                if (!running.get())                {                    break;                }            }     catch (final Throwable ex){                // handle, mark as processed, unless the exception handler threw an exception                exceptionHandler.handleEventException(ex, nextSequence, event);                processedSequence = true;     }}

能够看到最外围的生产者、消费者并发协调实现是waitStrategy,框架自身反对多种waitStrategy。

名称措施实用场景
BlockingWaitStrategysynchronizedCPU资源紧缺,吞吐量和提早并不重要的场景
BusySpinWaitStrategy自旋(while true)通过一直重试,缩小切换线程导致的零碎调用,而升高提早。举荐在线程绑定到固定的CPU的场景下应用
PhasedBackoffWaitStrategy自旋 + yield + 自定义策略CPU资源紧缺,吞吐量和提早并不重要的场景
SleepingWaitStrategy自旋 + parkNanos性能和CPU资源之间有很好的折中。提早不平均
TimeoutBlockingWaitStrategysynchronized + 有超时限度CPU资源紧缺,吞吐量和提早并不重要的场景
YieldingWaitStrategy自旋 + yield性能和CPU资源之间有很好的折中。提早比拟平均

对于以上的多种策略其实能够分为两类:

  1. 能够焚烧CPU性能,以极限高吞吐、低提早为指标的

    1. YieldingWaitStrategy,一直的自旋Yield
    2. BusySpinWaitStrategy,一直的while true
    3. PhasedBackoffWaitStrategy,能够反对自定义的策略
  2. 对极限性能要求不高

    1. SleepingWaitStrategy,对主线程影响很小例如Log4j实现
    2. BlockingWaitStrategy
    3. TimeoutBlockingWaitStrategy

其余优化

  1. 伪共享解决

    后面提到的伪共享导致的误锁以及被误杀的cpu缓存问题,也有简略的解决办法。

    个别的Cache Line大小在64字节左右,而后Disruptor在十分重要的字段前后加了很多额定的无用字段。能够让这一个字段占满一整个缓存行,这样就能够防止未共享导致的误杀。

  1. 内存预调配

    在Disruptor中事件对象在ringBuffer中反对预调配,在新事件到来的时候能够将要害的信息复制到预调配的构造上。防止大量事件对象代来的GC问题。

  1. 批量申请Slot

    多生产、多消费者存在竞争的时候能够批量的申请多个可生产、可公布的slot,进一步缩小竞争带来的CAS开销。

理论利用

在咱们的特色服务零碎中应用Disruptor代替原先jdk的线程池,获得了十分不错的性能后果。

测试阐明

  1. 压测机器配置

    配置项配置值
    机器物理机
    零碎CentOS Linux release 7.3.1611 (Core)
    内存256G内存
    cpu40核
  2. 测试Case

    1. 通过异步客户端拜访特色服务随机查问若干特色,特色存储在(Redis、Tair、Hbase)三种内部存储中。
    2. 特色服务在物理机上部署单个节点。
    3. 测试线程池、Disruptor两个解决队列的吞吐能力、响应提早散布。

测试后果

压测流量还是从5w/s开始逐步提高压力直到10w/s

  1. 响应工夫

    在同样吞吐的状况下,disruptor比线程池模式更加问题,长尾响应更少。

  2. 超时率:

    超时率也是开启Disruptor之后更加稳固

参考资料

  1. Disruptor用户手册:https://lmax-exchange.github....
  2. DIsruptor technical paper :https://lmax-exchange.github....
  3. 但生产者模式阐述:https://mechanical-sympathy.b...
  4. Why Memory Bairriers:http://www.rdrop.com/users/pa...
  5. Log4j2 Asynchronous Loggers for Low-Latency Logging: https://logging.apache.org/lo...
本文公布自网易云音乐技术团队,文章未经受权禁止任何模式的转载。咱们长年招收各类技术岗位,如果你筹备换工作,又恰好喜爱云音乐,那就退出咱们 grp.music-fe(at)corp.netease.com!