一、Disruptor简介

  1. Disruptor是英国外汇交易公司LMAX开发的一个低提早高性能=无锁的有界循环数组。基于Disruptor开发的零碎单线程能撑持每秒600万订单,目前曾经开源的并发框架。Log4j2底层应用的并发框架
  2. Disruptor设计特点

    1. 环形数据结构:底层应用的是数组而非连表
    2. 元素地位定位:数组的长度是2^n,下标是递增的,能够通过位运算疾速定位
    3. 无锁设计,生产者或消费者须要先申请地位,申请胜利当前能力读写,申请过程中通过CAS保障线程平安。
  3. 用于解决单机多线程之间的数据交换,而非相似于kafka的分布式队列。

二、JDK里的队列解决方案

队里有界性底层构造
ArrayBlockingQueue有界有锁数组
LinkedBlockingQueue有界有锁链表
ConcurrentLinkedQueue无界无锁链表

在高并发且要求较高的稳定性的零碎场景下,非了避免生产者速度过快,只能选有界队列;同时,为了缩小Java的垃圾回收对系统性能的影响尽量抉择“数组”作为队列的底层构造,符合条件只有一个:ArrayBlockingQueue

2.1 ArrayBlockingQueue的问题

  1. 加锁:不加锁的性能 > CAS操作的性能 > 加锁的性能。

    2.1.2 伪共享

    伪共享:缓存零碎中是以缓存行(cache line)为单位存储的,当多线程批改相互独立的变量时,如果这些变量共享同一个缓存行,就会无心中影响彼此的性能,
  2. CPU 和主内存之间有好几层缓存,间隔CPU越近,缓存空间越小,速度越快。
  3. CPU运算时,优先从最近的缓存寻找数据,找不到时再往下层去找。

  1. 缓存系中以 缓存行(cache line) 为单位存储,一个缓存行有64字节,能够存储8个long类型数据。当cpu拜访一个long类型的数组,当数组中的一个值被加载到缓存中,它会额定加载另外7个。当数组的一个值生效,则整个缓存行生效,它将换出其余7个值。
  2. ArrayBlockingQueue有三个成员变量:

      • takeIndex:须要被取走的元素下标
      • putIndex:可被元素插入的地位的下标
      • count:队列中元素的数量

这三个变量很容易放到一个缓存行中,然而之间批改没有太多的关联。所以每次批改,都会使之前缓存的数据生效,从而不能齐全达到共享的成果。

public class ArrayBlockingQueue<E> {    /** The queued items */    final Object[] items;    /** items index for next take, poll, peek or remove */    int takeIndex;    /** items index for next put, offer, or add */    int putIndex;    /** Number of elements in the queue */    int count;}
  1. 伪共享解决思路:增大数组元素的距离使得由不同线程存取的元素位于不同的缓存行上,以空间换工夫。
// value1和value2可能会产生伪共享class ValueNoPadding {    protected volatile long value1 = 0L;    protected volatile long value2 = 0L;}// value1和value2两头插入无用值 p1~p14 class ValuePadding {    protected long p1, p2, p3, p4, p5, p6, p7;    protected volatile long value1 = 0L;    protected long p9, p10, p11, p12, p13, p14;    protected volatile long value2 = 0L;}

三、Disruptor

RingBuffer

  1. ringBuffer是一个环,用做在不同线程间传递数据的空间
  2. ringBuffer领有一个序号,整个序号是递增的,用于指向下一个可用元素。
  3. 队列空间在创立时就固定不再扭转,可用升高GC的压力

应用示例

  1. 筹备数据容器

    // 数据容器,寄存生产和生产的数据内容public class LongEvent { private long value;}
  2. 筹备数据容器的生产工厂,用于RingBuffer初始化时的数据填充

    // 数据容器生产工厂public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() {    return new LongEvent(); }}
  3. 筹备消费者

    //消费者public class LongEventConsumer implements EventHandler<LongEvent> { /**  *  * @param longEvent  * @param sequence 以后的序列  * @param endOfBatch 是否是最初一个数据  * @throws Exception  */ @Override public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception {     String str = String.format("long event : %s l:%s b:%s", longEvent.getValue(), sequence, endOfBatch);     System.out.println(str); }}
  4. 生产线程、主线程

    public class Main { public static void main(String[] args) throws Exception {          // 线程工厂     ThreadFactory threadFactory = (r) -> new Thread(r);     // disruptor-创立一个disruptor     // 设置数据容器的工厂类,ringBuffer的初始化大小,消费者线程的工厂类     Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(new LongEventFactory(), 8, threadFactory);     // disruptor-设置消费者     disruptor.handleEventsWith(new LongEventConsumer());     disruptor.start();          // 获取disruptor的RingBuffer     RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();     // 主线程开始生产     for (long l = 0; l <= 8; l++) {         long nextIndex = ringBuffer.next();                  LongEvent event = ringBuffer.get(nextIndex);         event.setValue(l);         ringBuffer.publish(nextIndex);                  Thread.sleep(1000);     } }}

实现原理

单生产者生产数据的流程

  1. 生产者线程申请写入M个数据
  2. disruptor从以后指针cursor程序去找M个可写空间,返回找到的可用空间的最大序号
  3. 通过CAS比对返回的序号和申请的序号是否统一,判断是否会笼罩未读的元素,若返回正确,间接写入数据

多生产者生产数据的流程

  1. 引入一个与ringBuffer大小雷同的buff:availableBuffer用于记录ringBuffer每一个空间的应用状况,若生产者写入数据,则将对应availableBuffer地位标记为写入胜利,若消费者读取了数据,则将对应的availableBuffer地位标记为闲暇。
  2. 多个生产者调配空间时,应用CAS给每一个线程获取不同的数组空间进行操作。
  3. 多个消费者在生产数据时,程序的从availableBuffer搜寻一段间断可读的空间,并返回该空间的最大序列号,并读取数据,同时将availableBuffer的对应的地位进行标记闲暇。

Disruptor 解决伪共享与线程可见性问题

// 数据左右两边插入多余变量隔离真正的变量class LhsPadding{    protected long p1, p2, p3, p4, p5, p6, p7;}class Value extends LhsPadding{    protected volatile long value;}class RhsPadding extends Value{    protected long p9, p10, p11, p12, p13, p14, p15;}
public class Sequence extends RhsPadding{    static final long INITIAL_VALUE = -1L;    private static final Unsafe UNSAFE;    private static final long VALUE_OFFSET;     public Sequence(final long initialValue)    {        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);    }    public long get()    {        return value;    }    // 应用UNSAFE操作间接批改内存值    public void set(final long value)    {        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);    }}

四、参考文献

  1. https://tech.meituan.com/2016...
  2. https://www.cnblogs.com/crazy...