乐趣区

关于后端:Disruptor生产和消费模式详解及高级应用并行模式

小伙伴们大家好,昨天的文章,带着大家扒开了 Disruptor 富丽的外衣,最重要的是咱们晓得了 Disruptor 高性能的起因几个重要的起因,

  • 引入环形的数组构造:数组元素不会被回收,防止频繁的 GC,
  • 无锁的设计:采纳 CAS 无锁形式,保障线程的安全性
  • 属性填充:通过增加额定的无用信息,防止伪共享问题
  • 元素地位的定位:采纳跟一致性哈希一样的形式,一个索引,进行自增

这篇文章就在上篇文章的根底上来点实战利用。钻研下 Disruptor 的生产和生产模式,以及高级利用,至此对于 Disruptor 的系列的文章,也就到此结束了,我曾经尽力了,如果还有什么没能满足大家需要的,以及对于文章的内容大家有任何其余的认识的,也欢送在评论区留言,毕竟自己满腹经纶,期待各位大佬能给小弟指导一二。

前两篇文章在这里哦:

  • 如此狂妄,自称高性能队列的 Disruptor 有啥来头?
  • Disruptor 测试后果运算 1 亿次,耗时 5503ms,吞吐量 18171000/s,于是我扒开了 Disruptor 高性能的外衣

生产和生产模式

依据下面的环形构造,咱们来具体分析一下 Disruptor 的工作原理。

​ Disruptor 不像传统的队列,分为一个队头指针和一个队尾指针,而是只有一个角标(下面的 seq),那么这个是如何保障生产的音讯不会笼罩没有生产掉的音讯呢。

​ 在 Disruptor 中生产者分为单生产者和多生产者,而消费者并没有辨别。

​ 单生产者状况下,就是一般的生产者向 RingBuffer 中搁置数据,消费者获取最大可生产的地位,并进行生产。而多生产者时候,又多出了一个跟 RingBuffer 同样大小的 Buffer,称为 AvailableBuffer。

​ 在多生产者中,每个生产者首先通过 CAS 竞争获取能够写的空间,而后再进行缓缓往里放数据,如果正好这个时候消费者要生产数据,那么每个消费者都须要获取最大可生产的下标,这个下标是在 AvailableBuffer 进行获取失去的最长间断的序列下标。

5.1 单生产者生产数据

生产者单线程写数据的流程比较简单:

  1. 申请写入 m 个元素;
  2. 若是有 m 个元素能够入,则返回最大的序列号。这儿次要判断是否会笼罩未读的元素;
  3. 若是返回的正确,则生产者开始写入元素。

5.2 多生产者生产数据

​ 多个生产者的状况下,会遇到“如何避免多个线程反复写同一个元素”的问题。Disruptor 的解决办法是,每个线程获取不同的一段数组空间进行操作。这个通过 CAS 很容易达到。只须要在调配元素的时候,通过 CAS 判断一下这段空间是否曾经调配进来即可。

​ 然而会遇到一个新问题:如何避免读取的时候,读到还未写的元素。Disruptor 在多个生产者的状况下,引入了一个与 Ring Buffer 大小雷同的 buffer:available Buffer。当某个地位写入胜利的时候,便把 availble Buffer 相应的地位置位,标记为写入胜利。读取的时候,会遍历 available Buffer,来判断元素是否曾经就绪。

5.3.1 生产流程
  1. 申请写入 m 个元素;
  2. 若是有 m 个元素能够写入,则返回最大的序列号。每个生产者会被调配一段独享的空间;
  3. 生产者写入元素,写入元素的同时设置 available Buffer 外面相应的地位,以标记本人哪些地位是曾经写入胜利的。

如下图所示,Writer1 和 Writer2 两个线程写入数组,都申请可写的数组空间。Writer1 被调配了下标 3 到下表 5 的空间,Writer2 被调配了下标 6 到下标 9 的空间。

Writer1 写入下标 3 地位的元素,同时把 available Buffer 相应地位置位,标记曾经写入胜利,往后移一位,开始写下标 4 地位的元素。Writer2 同样的形式。最终都写入实现。

5.3.2 CAS 检测空间占用

避免不同生产者对同一段空间写入的代码,如下所示:

​ 通过 do/while 循环的条件 cursor.compareAndSet(current, next),来判断每次申请的空间是否曾经被其余生产者占据。如果曾经被占据,该函数会返回失败,While 循环从新执行,申请写入空间。

public long tryNext(int n) throws InsufficientCapacityException
{if (n < 1)
    {throw new IllegalArgumentException("n must be > 0");
    }
 
    long current;
    long next;
 
    do
    {current = cursor.get();
        next = current + n;
 
        if (!hasAvailableCapacity(gatingSequences, n, current))
        {throw InsufficientCapacityException.INSTANCE;}
    }
    while (!cursor.compareAndSet(current, next));
 
    return next;
}

5.4 多生产者生产数据

绿色代表曾经写 OK 的数据

​ 假如三个生产者在写中,还没有置位 AvailableBuffer,那么消费者可获取的生产下标只能获取到 6,而后等生产者都写 OK 后,告诉到消费者,消费者持续反复下面的步骤。

5.4.1 生产流程
  1. 申请读取到序号 n;
  2. 若 writer cursor >= n,这时依然无奈确定间断可读的最大下标。从 reader cursor 开始读取 available Buffer,始终查到第一个不可用的元素,而后返回最大间断可读元素的地位;
  3. 消费者读取元素

如下图所示,读线程读到下标为 2 的元素,三个线程 Writer1/Writer2/Writer3 正在向 RingBuffer 相应地位写数据,写线程被调配到的最大元素下标是 11。

读线程申请读取到下标从 3 到 11 的元素,判断 writer cursor>=11。而后开始读取 availableBuffer,从 3 开始,往后读取,发现下标为 7 的元素没有生产胜利,于是 WaitFor(11)返回 6。

而后,消费者读取下标从 3 到 6 共计 4 个元素。

6. 高级应用

6.1 并行模式

6.1.1 繁多写者模式

​ 在并发零碎中进步性能最好的形式之一就是繁多写者准则,对 Disruptor 也是实用的。如果在你的代码中仅仅有一个事件生产者,那么能够设置为繁多生产者模式来进步零碎的性能。

public class singleProductorLongEventMain {public static void main(String[] args) throws Exception { 
        //.....// Construct the Disruptor with a SingleProducerSequencer 
 
        Disruptor<LongEvent> disruptor = new Disruptor(factory, 
                bufferSize, 
                ProducerType.SINGLE, // 繁多写者模式, 
                executor);//..... 
    } 
} 
6.1.2 串行生产

比方:当初触发一个注册 Event,须要有一个 Handler 来存储信息,一个 Hanlder 来发邮件等等。

/**
  * 串行顺次执行
  * <br/>
  * p --> c11 --> c21
  * @param disruptor
  */
 public static void serial(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler()).then(new C21EventHandler());
     disruptor.start();}
6.1.3 菱形形式执行
 public static void diamond(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler(),new C12EventHandler()).then(new C21EventHandler());
     disruptor.start();}
6.1.4 链式并行计算
 public static void chain(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler()).then(new C12EventHandler());
     disruptor.handleEventsWith(new C21EventHandler()).then(new C22EventHandler());
     disruptor.start();}
6.1.5 互相隔离模式
 public static void parallelWithPool(Disruptor<LongEvent> disruptor){disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler());
     disruptor.handleEventsWithWorkerPool(new C21EventHandler(),new C21EventHandler());
     disruptor.start();}
6.1.6 航道模式
/**
  * 串行顺次执行, 同时 C11,C21 别离有 2 个实例
   * <br/>
   * p --> c11 --> c21
   * @param disruptor
   */
  public static void serialWithPool(Disruptor<LongEvent> disruptor){disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler()).then(new C21EventHandler(),new C21EventHandler());
      disruptor.start();}

本文参加了思否技术征文,欢送正在浏览的你也退出。

如果本文对您有帮忙,欢送 关注 点赞`,您的反对是我保持创作的能源。

转载请注明出处!

退出移动版