单机最快MQ—Disruptor
明天来讲讲我所晓得的单机最快的MQ,它叫Disruptor
先来介绍一下Disruptor,从翻译上来看,Disruptor—决裂、瓦解,Disruptor是国外某个金融、股票交易所开发的,2011年取得Duke奖,为成为单机最快的MQ,性能及高,无锁CAS,单机反对高并发
怎么样,心动了没?来来来,让我来带大家学习一下明天的配角—Disruptor
大家能够把Disruptor当做是内存里的高效的队列
Disruptor简介
无锁(CAS)、高并发,应用环形Buffer,间接笼罩(不必革除)旧数据,升高GC频繁,实现了基于事件的生产者消费者模型(观察者模式)
- 为什么说它是观察者模式呢?因为消费者时刻关注着队列里有没有音讯,一旦有新音讯产生,消费者线程就会立即把它生产
环形队列(RingBuffer)
RingBuffer有一个序号sequence,指向下一个可用元素,采纳数组实现,没有首尾指针
- Disruptor要求你对他设置长度的时候,设置成2的n次幂,这样有利于二进制的运算
首先,它是基于数组实现的,遍历起来要比链表要快
其次不必保护首尾指针,当然他也没有首尾指针,之须要保护一个sequence即可- **当所有地位都放满了,再放下一个时,就会把0号地位笼罩掉
这时就会有小伙伴焦急了,怎么能笼罩掉呢,那我数据不就失落了吗?**
那必定是不会就让他这么轻易滴把这数据笼罩掉滴,当须要笼罩数据时,会执行一个策略,Disruptor给提供多种策略,说说比拟罕用的
- BlockingWaitStrategy策略,常见且默认的期待策略,当这个队列里满了,不执行笼罩,而是在里面阻塞期待
- SleepingWaitStrategy策略,看字面意思,用睡眠来期待,期待中循环调用LockSupport.parkNanos(1)来睡眠
- YieldingWaitStrategy策略,循环期待sequence减少到适合的值,循环中调用Thread.yieId(),容许其余筹备好的线程执行
Disruptor开发步骤
- 定义Event—队列中须要解决的元素
- 定义Event工厂,用于填充队列
- 定义EventHandler(消费者),解决容器中的元素
//定义Event音讯(事件)类public class LongEvent{ private long value; private String name; @Override public String toString() { return "LongEvent{" + "value=" + value + ", name='" + name + '\'' + '}'; } public String getName() { return name; } public void setName(String name) { this.name = name; } public long getValue() { return value; } public void setValue(long value) { this.value = value; }}
//定义音讯(事件)工厂public class LongEventFactory implements EventFactory<LongEvent> { @Override public LongEvent newInstance() { return new LongEvent(); }}
//定义音讯(事件)的生产形式public class LongEventHandler implements EventHandler<LongEvent> { @Override public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { System.out.println(longEvent.getName()+"-----"+longEvent.getValue()); }}
//音讯(事件)生产者public class LongEventProducer { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(long val, String name) { long sequence = ringBuffer.next(); try { LongEvent event = ringBuffer.get(sequence); event.setValue(val); event.setName(name); } finally { ringBuffer.publish(sequence); } }}
public static void main(String[] args) { //new一个音讯(事件)工厂 LongEventFactory factory = new LongEventFactory(); //设置环形Buffer的SIZE int size = 1024; //new Disruptor,参数是音讯(事件)工厂,Buffer的Size,线程工厂 Disruptor<LongEvent> longEventDisruptor = new Disruptor<LongEvent>(factory, size, Executors.defaultThreadFactory()); //设置如何生产生产者产出的音讯(事件) longEventDisruptor.handleEventsWith(new LongEventHandler()); //启动--环形Buffer创立胜利,所有的地位均已创立好Event对象 longEventDisruptor.start(); //获取Disruptor的环形Buffer RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer(); //new 音讯(事件)生产者 LongEventProducer producer = new LongEventProducer(ringBuffer); //循环调用-往里增加音讯 for(long l = 0; l<100; l++) { //TODO 调用producer的生产音讯(事件)的办法 producer.onData(l,"MingLog-"+l); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } //将音讯(事件)公布进来 longEventDisruptor.shutdown(); }
回过头来看看,为什么Disruptor这么快呢?
- 底层是数组,循环起来要比链表快
- 没有首尾指针,免去了保护两个指针的工夫
- start()办法被调用,Disruptor被初始化,所有可用空间上的Event全副被初始化(提前创立好,每次进来在原对象上进行批改,不必从新new,不必创立新的对象,也就能够升高GC的频率),因为是一开始就把所有的Event初始化好的,所以next获取下一个可用的Event时就不须要再去判断该Event是否被初始化,缩小了一步判断
- Disruptor的Size是2的n次幂,不便进行二进制位运算,来确定音讯应该放在那个可用区域
好了,Disruptor解说到这里就完结了,大家有什么想要学习的都能够私信或评论通知我哦\~ 我会尽全力满足大家滴,我学,你也学,咳咳\~广告看多了
点赞、关注来一波好吗,秋梨膏~