关于算法工程:Disruptor在云音乐特征服务中的应用

34次阅读

共计 9751 个字符,预计需要花费 25 分钟才能阅读完成。

作者:章北海

咱们的线上特色数据服务 DataService,为了解决应用线程池模型导致机器 cpu 利用率不高,长尾申请提早不线性(p99、p999 呈现 J 型曲线)的问题。在利用 Disruptor 替换线程池之后获得不错的性能后果。本文次要是简略的介绍一下对 Disruptor 的集体了解以及落地的后果。

背景

Disruptor 是一个高性能的解决并发问题的框架,由 LMAX(一个英国做金融交易的公司)设计开发用于本人金融交易零碎建设。之后开源被很多出名的开源库应用,例如前段时间暴发破绽的 Log4j。

其中 Log4j2 应用 Disruptor 来优化多线程模式下的日志落盘性能,Log4j2 做了一个测试应用:同步模式(Sync)、Async(ArrayBlockingQueue)、ALL_ASYNC(Disruptor)别离进行压测,失去如下测试论断:https://logging.apache.org/lo…

Disruptor 模式的吞吐能力是 JDK ArrayBlockQueue 的 12 倍,是同步模式的 68 倍。

响应工夫 P99 指标 Disruptor 模式比 BlockQueue 也更加优良,尤其是开启了 Garbage-free 等优化参数之后。

通过 log4j 的例子看来,disruptor 能够让你的零碎在达到更高吞吐的同时带来更加稳固且低的响应工夫。

那么为什么 disruptor 能够带来这些收益而 jdk 的线程池模式又有什么问题呢?

Disruptor 介绍

LMAX 是一个金融交易公司,他们的交易中有大量的生产者消费者模型业务逻辑,很天然他们将生产者产出的数据放到队列中(eg. ArrayBlockingQueue)而后开启多个消费者线程进行并发生产。

而后他们测试了一把数据在队列中传递的性能跟拜访磁盘(RAID、SSD)差不多,当业务逻辑需数据要多个队列在不同的业务 Stage 之间传递数据时,多个串行的队列开销是不可忍耐的,而后他们开始剖析为什么 JDK 的队列会有这么重大的性能问题。

BolckQueue 的问题

为什么应用 BlockQueue 会有这么激烈的差异,以 Java 的 ArrayBlockingqueue 为例。底层实现其实是一个数组,在入队、出队的时候通过重入锁来保障并发状况下的队列数据的线程平安。

/**
 * ArrayBlockQueue 的入队实现
 */
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
            // 全局锁
        lock.lockInterruptibly();
        try {while (count == items.length) {if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {lock.unlock();
        }
}

/**
 * ArrayBlockQueue 的出队实现
 */
public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {return (count == 0) ? null : dequeue();} finally {lock.unlock();
        }
}

/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E x) {// assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();}

/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {// assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

能够看到 ArrayBlockQueue 是由一个 ReentrantLock 在读写时进行互斥爱护,这样做会导致两个问题:

  1. 数据的出队、入队会互斥,不论是什么特点的利用都会频繁的引起锁碰撞。
  2. ReentrantLock 自身每次加锁可能会引起多个 cas 操作,而每个 Cas 锁操作的代价没有设想中的那么小。

    1. 锁状态变更触发 cas 操作。
    2. 锁竞争失败之后进入竞争队列会触发 cas。
    3. 当持有锁线程开释之后通过 Condition 同步,唤醒竞争线程之后,唤醒线程出队还会导致 Cas 操作。

为了验证这个猜测 LMAX 又跑了一个测试,验证各种 Lock 的开销到底有多大。

他们的测试 Case 是将一个 int64 始终累加一亿次,区别只是应用单个线程、单个线程加锁(synchronize、cas)、还是多个线程加锁(synchronize、cas)。

失去的测试后果如下:https://lmax-exchange.github….,The%20Cost%20of%20Locks,-Locks%20provide%20mutual

  1. 当单个线程无锁执行时只须要 300ms 就能够实现。
  2. 当单个线程加锁(理论没有竞争)执行时,就须要 10s。
  3. 单个线程应用 CAS 执行时比互斥锁体现好一点。
  4. 当线程越多,不论是互斥锁还是 CAS 测试 Case 执行的耗时越来越大。
  5. volatile 修饰符跟 CAS 体现在数量级上差不多。
Method Time (ms)
Single thread 300
Single thread with lock 10,000
Two threads with lock 224,000
Single thread with CAS 5,700
Two threads with CAS 30,000
Single thread with volatile write 4,700

这样看起来锁还有 CAS 操作的开销比设想中的高很多,那么具体为什么会有这么大的性能开销。

在并发环境中(在 Java 生态)锁的实现有两种:synchronize、cas,上面别离剖析两种锁的开销。

Synchronize 开销

jdk 对于 synchronize 的介绍:https://wiki.openjdk.java.net…

在 java 中互斥锁就体现在 synchronize 关键字润饰的代码块中(synchronize 在锁降级中会应用 Mutex 实现的,对于 Linux 就是 pthread_mutex_t)。

  1. 内核仲裁

    当 synchronize 关键字润饰的代码块被多个线程竞争时就须要进行用户态、内核态切换,须要零碎内核仲裁竞争资源的归属,这种切换的代价是十分低廉的(保留和复原一些寄存器、内存数据等过程上下文数据)。

  2. 缓存净化

    当初 CPU 都有多个外围,因为外围的计算能力远远高于内存的 IO 能力。为了和谐解决外围跟内存的速度差别,引入了 cpu 缓存。当外围执行运算时如果须要内存数据先从 L1 缓存中获取、如果没命中就从 L2 缓存获取如果始终没命中就从主从中 load。

    当产生线程上下文切换,切换走的线程就会让出 CPU 让另外的线程去执行他的逻辑,而他刚刚从主存中 load 进来的数据就会被新的线程净化。下次他竞争胜利,还是须要再次从主从中 load 数据,竞争会加剧缓存净化进一步影响零碎性能。

    从 CPU 到 大概须要的 CPU 周期 大概须要的工夫
    主存 约 60-80ns
    QPI 总线传输 (between sockets, not drawn) 约 20ns
    L3 cache 约 40-45 cycles 约 15ns
    L2 cache 约 10 cycles 约 3ns
    L1 cache 约 3 -4 cycles 约 1ns
    寄存器 1 cycle
  3. 伪共享

    互斥锁还会引发自身没有加锁的变量被迫互斥的问题。

    CPU 缓存治理的根本然而是缓存行,当 cpu 须要从主存中 load 数据时会依照缓存行的大小将对应地位的内存块一起 load 进去。当 cpu 批改内存中的数据时,也是间接批改缓存中的数据,有缓存一致性协定保障将缓存中的变动刷到内存中。

    看上面这个例子:

    class Eg {
      private int a;
      private int b;
      
      public void synchronize incr_a(){a++;}
      
      public void incr_b(){b++;}
    }

    这个对象中 a、b 两个字段很大概率被调配到相邻的内存中,当 cpu 触发缓存 load 时这块内存很可能会被一起加载到同一个缓存行。

    当一个线程调用 incr_a 的同时另外一个线程调用 incr_b 办法时,因为 incr_a 被互斥锁爱护导致持有 a、b 两个变量的缓存行也被互斥锁爱护起来,这样尽管 incr_b 没有显示的互斥锁但实际上也被锁住了,这个景象被成为伪共享。

  4. 额定的 CAS 开销

    在 Synchronize 在外部保护了 count 计数、对象头中有持有线程的 id 等变量,当线程屡次进入竞争块时须要通过 CAS 操作去更改 count 计数、对象头中的线程 id,所以 synchronize 自身还会有 cas 的开销。

CAS 的开销

CAS 是古代处理器反对的一个原子指令(例如: lock cmpxchg x86),具体的含意是当变更的变量原始值合乎冀望就间接更新,不合乎冀望就失败。

在 Java 中各种 AutoXX 类就是对 CAS 指令的封装,其中 java 的重入锁(ReentrantLock)的实现原理就是一个 CAS 操作。

对应 Cas 自身的开销问题这里能够思考这样的一个例子:

假如位于两个外围的两个线程同时 CAS 一个变量 a,当线程 1CAS 胜利时将数据变更写入到缓存 A 中。那么这个时候线程 2 怎么可能感知到变量 a 当初的值曾经产生了变更,本次 CAS 操作须要失败呢。

这里就须要缓存一致性协定来进行保障,须要在 CAS 变量的变更前后插入内存屏障来保障变量在多个外围中的可见性,这也是 java volatile 关键字的实在含意。

所以在最开始的那个例子中,cas 操作跟 volatile 变量的性能体现差不多的起因,就是两者都须要进行缓存同步。

这里咱们须要意识到,cas 操作尽管比互斥锁性能更好然而也不是齐全没有开销的。当大量的 cas 操作失败重试导致大量的缓存生效有时候会引发更为严重的问题。

具体的缓存一致性以及内存屏障的细节能够参考这个文章:http://www.rdrop.com/users/pa…

Disruptor 的优化

LMAX 在大量的测试跟深入分析之后,正视锁的开销,依照他们的业务形象出了一套通用的能够做到无锁的并发解决框架。

组件阐明

在具体介绍 Disruptor 之前先简略的对 Disruptor 的外围形象进行阐明。

形象组件 阐明
Ring Buffer 环形队列,用于寄存生产者、消费者之间流转的事件数据
Sequence 一个自增序列,用于生产者、消费者之间寄存能够被(公布、生产)的队列游标,能够简略认为是一个 AutomicLong 的自定义实现。
Sequencer 持有生产者、消费者的 Sequence,是用于协调两边的并发问题,是 Disruptor 的外围组件。
Sequence Barrier 由 Sequencer 创立用于消费者能够跟踪到上游生产者的状况,获取可生产的事件。
Wait Strategy 用于消费者期待可生产事件时的策略,有很多实现策略。
Event 业务事件
Event Processor 业务事件生产程序,能够认为是物理线程的形象
Event Handler 实在的业务解决逻辑,每个 Processor 持有一个
Producer 生产者

数据生产

数据的生产非常简单,有两种状况:

  1. 单生产者

    单生产者的状况下,向 RingBuffer 中生产数据是没有任何竞争的,惟一须要留神的点是须要关注消费者的生产能力,不要笼罩了最慢的消费者未生产的数据。

    为了达到这个目标,须要通过 Sequencer 来察看最慢的消费者的生产进度,代码如下,能够看到只有一次 volatile 操作全程不会有任何锁:

    // 申请 n 个可用于公布的 slot
    public long next(int n)
        {if (n < 1)
            {throw new IllegalArgumentException("n must be > 0");
            }
    
            long nextValue = this.nextValue;
    
            long nextSequence = nextValue + n;
            long wrapPoint = nextSequence - bufferSize;
            long cachedGatingSequence = this.cachedValue;
    
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
            {        
                // 一次 volatile 操作
                cursor.setVolatile(nextValue);  // StoreLoad fence
    
                long minSequence;
                while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))){
                    // 当最慢的消费者进度低于以后须要申请的 slot 时,尝试唤醒消费者(唤醒策略不同体现不同,很多策略基本不会阻塞会始终 spin)waitStrategy.signalAllWhenBlocking();
                     // park 1 纳秒持续尝试
                    LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
                }
                     // 申请胜利
                this.cachedValue = minSequence;
            }
    
            this.nextValue = nextSequence;
    
            return nextSequence;
     }
  1. 多生产者

    多生产者比较复杂的点是生产者线程之前有写竞争,须要 CAS 来进行协调。也就是生产者的 Seq 须要额定进行一个 CAS 操作、全程无锁,申请代码如下:

    // 申请 n 个可公布的 slot
    public long next(int n)
        {if (n < 1)
            {throw new IllegalArgumentException("n must be > 0");
            }
    
            long current;
            long next;
    
            do
            {current = cursor.get();
                next = current + n;
    
                long wrapPoint = next - bufferSize;
                long cachedGatingSequence = gatingSequenceCache.get();
    
                if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
                {long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
    
                    if (wrapPoint > gatingSequence){
                      // 当最慢的消费者进度低于以后须要申请的 slot 时,尝试唤醒消费者(唤醒策略不同体现不同,很多策略基本不会阻塞会始终 spin)waitStrategy.signalAllWhenBlocking();
                        LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                        continue;
                    }
    
                    gatingSequenceCache.set(gatingSequence);
                }
                else if (cursor.compareAndSet(current, next)){
                     // 通过自旋 + cas 去协调多个生产者的
                    break;
                }
            }
            while (true);
    
            return next;
        }

    尽管相比单生产者仅仅多了一个 CAS 操作,然而 Disruptr 的外围作者始终强调为了更高的吞吐以及跟稳固的提早,单生产者的设计准则是十分有必要的,否则随着吞吐的升高长尾的申请会呈现不线性的提早增长。

    具体作者的文章见:https://mechanical-sympathy.b…

数据生产

不论是单生产者、还是多生产者数据的生产都是不受影响的。Disruptor 反对开启多个 Processor(也就是线程),每个 Processor 应用相似 while true 的模式拉取可生产的事件进行解决。

这样的跟线程池模式的益处是防止线程创立、销毁、上下文切换代理的性能损失(缓存净化……)。

对于多个消费者之间的竞争关系通过 Sequence Barrier 这个形象组件进行协调,代码见下,能够看到除期待策略可能有策略是锁实现、其余步骤全程无锁。

while (true){
            try
            {
                // if previous sequence was processed - fetch the next sequence and set
                // that we have successfully processed the previous sequence
                // typically, this will be true
                // this prevents the sequence getting too far forward if an exception
                // is thrown from the WorkHandler
                if (processedSequence)
                {
                    processedSequence = false;
                    do
                    {nextSequence = workSequence.get() + 1L;
                        // 一次 Store/Store barrier  
                        sequence.set(nextSequence - 1L);
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                    // 通过自旋 + cas 协调消费者进度
                }

                if (cachedAvailableSequence >= nextSequence){
                    // 批量申请 slot 进度高于以后进度,间接生产
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    processedSequence = true;
                }
                else{
                    // 无音讯可生产是更具不同的策略进行期待(能够阻塞、能够自旋、能够阻塞 + 超时……)cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
      catch (final TimeoutException e){notifyTimeout(sequence.get());
            }
     catch (final AlertException ex){if (!running.get())
                {break;}
            }
     catch (final Throwable ex){
                // handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
     }
}

能够看到最外围的生产者、消费者并发协调实现是 waitStrategy,框架自身反对多种 waitStrategy。

名称 措施 实用场景
BlockingWaitStrategy synchronized CPU 资源紧缺,吞吐量和提早并不重要的场景
BusySpinWaitStrategy 自旋(while true) 通过一直重试,缩小切换线程导致的零碎调用,而升高提早。举荐在线程绑定到固定的 CPU 的场景下应用
PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 CPU 资源紧缺,吞吐量和提早并不重要的场景
SleepingWaitStrategy 自旋 + parkNanos 性能和 CPU 资源之间有很好的折中。提早不平均
TimeoutBlockingWaitStrategy synchronized + 有超时限度 CPU 资源紧缺,吞吐量和提早并不重要的场景
YieldingWaitStrategy 自旋 + yield 性能和 CPU 资源之间有很好的折中。提早比拟平均

对于以上的多种策略其实能够分为两类:

  1. 能够焚烧 CPU 性能,以极限高吞吐、低提早为指标的

    1. YieldingWaitStrategy,一直的自旋 Yield
    2. BusySpinWaitStrategy,一直的 while true
    3. PhasedBackoffWaitStrategy,能够反对自定义的策略
  2. 对极限性能要求不高

    1. SleepingWaitStrategy,对主线程影响很小例如 Log4j 实现
    2. BlockingWaitStrategy
    3. TimeoutBlockingWaitStrategy

其余优化

  1. 伪共享解决

    后面提到的伪共享导致的误锁以及被误杀的 cpu 缓存问题,也有简略的解决办法。

    个别的 Cache Line 大小在 64 字节左右,而后 Disruptor 在十分重要的字段前后加了很多额定的无用字段。能够让这一个字段占满一整个缓存行,这样就能够防止未共享导致的误杀。

  1. 内存预调配

    在 Disruptor 中事件对象在 ringBuffer 中反对预调配,在新事件到来的时候能够将要害的信息复制到预调配的构造上。防止大量事件对象代来的 GC 问题。

  1. 批量申请 Slot

    多生产、多消费者存在竞争的时候能够批量的申请多个可生产、可公布的 slot,进一步缩小竞争带来的 CAS 开销。

理论利用

在咱们的特色服务零碎中应用 Disruptor 代替原先 jdk 的线程池,获得了十分不错的性能后果。

测试阐明

  1. 压测机器配置

    配置项 配置值
    机器 物理机
    零碎 CentOS Linux release 7.3.1611 (Core)
    内存 256G 内存
    cpu 40 核
  2. 测试 Case

    1. 通过异步客户端拜访特色服务随机查问若干特色,特色存储在(Redis、Tair、Hbase)三种内部存储中。
    2. 特色服务在物理机上部署单个节点。
    3. 测试线程池、Disruptor 两个解决队列的吞吐能力、响应提早散布。

测试后果

压测流量还是从 5w/ s 开始逐步提高压力直到 10w/s

  1. 响应工夫

    在同样吞吐的状况下,disruptor 比线程池模式更加问题,长尾响应更少。

  2. 超时率:

    超时率也是开启 Disruptor 之后更加稳固

参考资料

  1. Disruptor 用户手册:https://lmax-exchange.github….
  2. DIsruptor technical paper:https://lmax-exchange.github….
  3. 但生产者模式阐述:https://mechanical-sympathy.b…
  4. Why Memory Bairriers:http://www.rdrop.com/users/pa…
  5. Log4j2 Asynchronous Loggers for Low-Latency Logging: https://logging.apache.org/lo…

本文公布自网易云音乐技术团队,文章未经受权禁止任何模式的转载。咱们长年招收各类技术岗位,如果你筹备换工作,又恰好喜爱云音乐,那就退出咱们 grp.music-fe(at)corp.netease.com!

正文完
 0