关于算法工程:Disruptor在云音乐特征服务中的应用
作者:章北海咱们的线上特色数据服务DataService,为了解决应用线程池模型导致机器cpu利用率不高,长尾申请提早不线性(p99、p999呈现J型曲线)的问题。在利用Disruptor替换线程池之后获得不错的性能后果。本文次要是简略的介绍一下对Disruptor的集体了解以及落地的后果。 背景Disruptor是一个高性能的解决并发问题的框架,由LMAX(一个英国做金融交易的公司)设计开发用于本人金融交易零碎建设。之后开源被很多出名的开源库应用,例如前段时间暴发破绽的Log4j。 其中Log4j2应用Disruptor来优化多线程模式下的日志落盘性能,Log4j2做了一个测试应用:同步模式(Sync)、Async(ArrayBlockingQueue)、ALL_ASYNC(Disruptor)别离进行压测,失去如下测试论断:https://logging.apache.org/lo... Disruptor模式的吞吐能力是JDK ArrayBlockQueue的12倍,是同步模式的68倍。 响应工夫P99指标Disruptor模式比BlockQueue也更加优良,尤其是开启了Garbage-free等优化参数之后。 通过log4j的例子看来,disruptor能够让你的零碎在达到更高吞吐的同时带来更加稳固且低的响应工夫。 那么为什么disruptor能够带来这些收益而jdk的线程池模式又有什么问题呢? Disruptor介绍LMAX是一个金融交易公司,他们的交易中有大量的生产者消费者模型业务逻辑,很天然他们将生产者产出的数据放到队列中(eg. ArrayBlockingQueue)而后开启多个消费者线程进行并发生产。 而后他们测试了一把数据在队列中传递的性能跟拜访磁盘(RAID、SSD)差不多,当业务逻辑需数据要多个队列在不同的业务Stage之间传递数据时,多个串行的队列开销是不可忍耐的,而后他们开始剖析为什么JDK的队列会有这么重大的性能问题。 BolckQueue的问题为什么应用BlockQueue会有这么激烈的差异,以Java的ArrayBlockingqueue为例。底层实现其实是一个数组,在入队、出队的时候通过重入锁来保障并发状况下的队列数据的线程平安。 /** * ArrayBlockQueue的入队实现 */public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; // 全局锁 lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); }}/** * ArrayBlockQueue的出队实现 */public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); }}/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal();}/** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x;}能够看到ArrayBlockQueue是由一个ReentrantLock在读写时进行互斥爱护,这样做会导致两个问题: ...