关于java:单机最快MQDisruptor

48次阅读

共计 3154 个字符,预计需要花费 8 分钟才能阅读完成。

单机最快 MQ—Disruptor

明天来讲讲我所晓得的单机最快的 MQ,它叫 Disruptor

先来介绍一下 Disruptor,从翻译上来看,Disruptor—决裂、瓦解,Disruptor 是国外某个金融、股票交易所开发的,2011 年取得 Duke 奖,为成为单机最快的 MQ,性能及高,无锁 CAS,单机反对高并发

怎么样,心动了没?来来来,让我来带大家学习一下明天的配角—Disruptor

大家能够把 Disruptor 当做是内存里的高效的队列

Disruptor 简介

  • 无锁 (CAS)、高并发,应用环形 Buffer,间接笼罩(不必革除) 旧数据,升高 GC 频繁,实现了基于事件的生产者消费者模型(观察者模式)

    • 为什么说它是观察者模式呢?因为消费者时刻关注着队列里有没有音讯,一旦有新音讯产生,消费者线程就会立即把它生产

环形队列(RingBuffer)

  1. RingBuffer 有一个序号 sequence,指向下一个可用元素,采纳数组实现,没有首尾指针

    • Disruptor 要求你对他设置长度的时候,设置成 2 的 n 次幂,这样有利于二进制的运算


    首先,它是基于数组实现的,遍历起来要比链表要快
    其次不必保护首尾指针,当然他也没有首尾指针,之须要保护一个 sequence 即可

  2. ** 当所有地位都放满了,再放下一个时,就会把 0 号地位笼罩掉

这时就会有小伙伴焦急了,怎么能笼罩掉呢,那我数据不就失落了吗?**

那必定是不会就让他这么轻易滴把这数据笼罩掉滴,当须要笼罩数据时,会执行一个策略,Disruptor 给提供多种策略,说说比拟罕用的

  • BlockingWaitStrategy 策略,常见且默认的期待策略,当这个队列里满了,不执行笼罩,而是在里面阻塞期待
  • SleepingWaitStrategy 策略,看字面意思,用睡眠来期待,期待中循环调用 LockSupport.parkNanos(1)来睡眠
  • YieldingWaitStrategy 策略,循环期待 sequence 减少到适合的值,循环中调用 Thread.yieId(),容许其余筹备好的线程执行

Disruptor 开发步骤

  1. 定义 Event—队列中须要解决的元素
  2. 定义 Event 工厂,用于填充队列
  3. 定义 EventHandler(消费者),解决容器中的元素
// 定义 Event 音讯(事件)类
public class LongEvent{

    private long value;
    private String name;

    @Override
    public String toString() {
        return "LongEvent{" +
                "value=" + value +
                ", name='" + name + '\'' +
                '}';
    }
    public String getName() {return name;}
    public void setName(String name) {this.name = name;}
    public long getValue() {return value;}
    public void setValue(long value) {this.value = value;}
}
// 定义音讯(事件)工厂
public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {return new LongEvent();
    }
}
// 定义音讯(事件)的生产形式
public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {System.out.println(longEvent.getName()+"-----"+longEvent.getValue());
    }
}
// 音讯(事件)生产者
public class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {this.ringBuffer = ringBuffer;}
    public void onData(long val, String name) {long sequence = ringBuffer.next();
        try {LongEvent event = ringBuffer.get(sequence);
            event.setValue(val);
            event.setName(name);
        } finally {ringBuffer.publish(sequence);
        }
    }
}
public static void main(String[] args) {
        //new 一个音讯(事件)工厂
        LongEventFactory factory = new LongEventFactory();
        // 设置环形 Buffer 的 SIZE
        int size = 1024;
        //new Disruptor,参数是音讯(事件)工厂,Buffer 的 Size,线程工厂
        Disruptor<LongEvent> longEventDisruptor = new Disruptor<LongEvent>(factory, size, Executors.defaultThreadFactory());
        // 设置如何生产生产者产出的音讯(事件)longEventDisruptor.handleEventsWith(new LongEventHandler());
        // 启动 -- 环形 Buffer 创立胜利,所有的地位均已创立好 Event 对象
        longEventDisruptor.start();
        // 获取 Disruptor 的环形 Buffer
        RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer();
        //new 音讯(事件)生产者
        LongEventProducer producer = new LongEventProducer(ringBuffer);
        // 循环调用 - 往里增加音讯
        for(long l = 0; l<100; l++) {
            //TODO   调用 producer 的生产音讯(事件)的办法
            producer.onData(l,"MingLog-"+l);
            try {Thread.sleep(100);
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
        // 将音讯(事件)公布进来
        longEventDisruptor.shutdown();}

回过头来看看,为什么 Disruptor 这么快呢?

  1. 底层是数组,循环起来要比链表快
  2. 没有首尾指针,免去了保护两个指针的工夫
  3. start()办法被调用,Disruptor 被初始化,所有可用空间上的 Event 全副被初始化(提前创立好,每次进来在原对象上进行批改,不必从新 new,不必创立新的对象,也就能够升高 GC 的频率),因为是一开始就把所有的 Event 初始化好的,所以 next 获取下一个可用的 Event 时就不须要再去判断该 Event 是否被初始化,缩小了一步判断
  4. Disruptor 的 Size 是 2 的 n 次幂,不便进行二进制位运算,来确定音讯应该放在那个可用区域

好了,Disruptor 解说到这里就完结了,大家有什么想要学习的都能够私信或评论通知我哦 \~ 我会尽全力满足大家滴,我学,你也学,咳咳 \~ 广告看多了

点赞、关注来一波好吗,秋梨膏~

正文完
 0