Java中有哪些队列
ArrayBlockingQueue 应用ReentrantLock
LinkedBlockingQueue 应用ReentrantLock
ConcurrentLinkedQueue 应用CAS
等等
咱们分明应用锁的性能比拟低,尽量应用无锁设计。接下来就咱们来意识下Disruptor。
Disruptor简略应用
github地址:github.com/LMAX-Exchan…
先简略介绍下:
Disruptor它是一个开源的并发框架,并取得2011 Duke’s程序框架创新奖【Oracle】,可能在无锁的状况下实现网络的Queue并发操作。英国外汇交易公司LMAX开发的一个高性能队列,号称单线程能撑持每秒600万订单~
日志框架Log4j2 异步模式采纳了Disruptor来解决
局限呢,他就是个内存队列,也就是说无奈撑持分布式场景。
简略应用
数据传输对象
@Data
public class EventData {
private Long value;
}
复制代码
消费者
public class EventConsumer implements WorkHandler<EventData> {
/** * 生产回调 * @param eventData * @throws Exception */@Overridepublic void onEvent(EventData eventData) throws Exception { Thread.sleep(5000); System.out.println(Thread.currentThread() + ", eventData:" + eventData.getValue());}
}
复制代码
生产者
public class EventProducer {
private final RingBuffer<EventData> ringBuffer;public EventProducer(RingBuffer<EventData> ringBuffer) { this.ringBuffer = ringBuffer;}public void sendData(Long v){ // cas展位 long next = ringBuffer.next(); try { EventData eventData = ringBuffer.get(next); eventData.setValue(v); } finally { // 告诉期待的消费者 System.out.println("EventProducer send success, sequence:"+next); ringBuffer.publish(next); }}
}
复制代码
测试类
public class DisruptorTest {
public static void main(String[] args) { // 2的n次方 int bufferSize = 8; Disruptor<EventData> disruptor = new Disruptor<EventData>( () -> new EventData(), // 事件工厂 bufferSize, // 环形数组大小 Executors.defaultThreadFactory(), // 线程池工厂 ProducerType.MULTI, // 反对多事件发布者 new BlockingWaitStrategy()); // 期待策略 // 设置消费者 disruptor.handleEventsWithWorkerPool( new EventConsumer(), new EventConsumer(), new EventConsumer(), new EventConsumer()); disruptor.start(); RingBuffer<EventData> ringBuffer = disruptor.getRingBuffer(); EventProducer eventProducer = new EventProducer(ringBuffer); long i = 0; for(;;){ i++; eventProducer.sendData(i); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } }}
}
复制代码
外围组件
基于下面简略例子来看的确很简略,Disruptor帮咱们封装好了生产生产模型的实现,接下来咱们来看下他是基于哪些外围组件来撑持起一个高性能无锁队列呢?
RingBuffer: 环形数组,底层应用数组entries,在初始化时填充数组,防止一直新建对象带来的开销。后续只会对entries做更新操作
Sequencer: 外围管家
定义生产同步的实现:SingleProducerSequencer单生产、MultiProducerSequencer多生产
以后写的进度Sequence cursor
所有消费者进度的数组Sequence[] gatingSequences
MultiProducerSequencer可用区availableBuffer【利用空间换取查问效率】
Sequence: 自身就是一个序号器用来标识解决进度,也能够当做是一个atomicInteger; 还有另外一个特点,为了解决伪共享问题而引入的:缓存行填充。这个在前面介绍。
workProcessor: 解决Event的循环,在循环中获取Disruptor的事件,而后把事件调配给各个handler
EventHandler: 负责业务逻辑的handler,本人实现。
WaitStrategy: 消费者 如何期待 事件的策略,定义了如下策略
leepingWaitStrategy:自旋 + yield + sleep
BlockingWaitStrategy:加锁,适宜CPU资源缓和(不须要切换线程),零碎吞吐量无要求的
YieldingWaitStrategy:自旋 + yield + 自旋
BusySpinWaitStrategy:自旋,缩小线程之前切换
PhasedBackoffWaitStrategy:自旋 + yield + 自定义策略
带着问题来解析代码?
1、多生产者如何保障音讯生产不会互相笼罩。【如何达到互斥成果】
每个线程获取不同的一段数组空间,而后通过CAS判断这段空间是否曾经调配进来。
接下来咱们看下多生产类MultiProducerSequencer中next办法【获取生产序号】
// 消费者上一次生产的最小序号 // 后续第二点会讲到
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 以后进度的序号
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 所有消费者的序号 //后续第二点会讲到
protected volatile Sequence[] gatingSequences = new Sequence[0];
public long next(int n)
{ if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; do { // 以后进度的序号,Sequence的value具备可见性,保障多线程间线程之间能感知到可申请的最新值 current = cursor.get(); // 要申请的序号空间:最大序列号 next = current + n; long wrapPoint = next - bufferSize; // 消费者最小序列号 long cachedGatingSequence = gatingSequenceCache.get(); // 大于一圈 || 最小生产序列号>以后进度 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { long gatingSequence = Util.getMinimumSequence(gatingSequences, current); // 阐明大于1圈,并没有多余空间能够申请 if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } // 更新最小值到Sequence的value中 gatingSequenceCache.set(gatingSequence); } // CAS胜利后更新以后Sequence的value else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next;}
复制代码
2、生产者向序号器申请写的序号,如序号正在被生产,Sequencer是如何晓得哪些序号是能够被写入的呢?【未生产则被笼罩如何解决】
从gatingSequences中获得最小的序号,生产者最多能写到这个序号的后一位。艰深来讲就是申请的序号不能大于最小消费者序号一圈【申请到最大序列号-buffersize 要小于/等于 最小生产的序列号】的时候, 能力申请到以后写的序号
public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
{
return createWorkerPool(new Sequence[0], workHandlers);
}
EventHandlerGroup<T> createWorkerPool(
final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
{
final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);consumerRepository.add(workerPool, sequenceBarrier);final Sequence[] workerSequences = workerPool.getWorkerSequences();updateGatingSequencesForNextInChain(barrierSequences, workerSequences);return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
}
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
if (processorSequences.length > 0){ // 消费者启动后就会将所有消费者寄存入AbstractSequencer中gatingSequences ringBuffer.addGatingSequences(processorSequences); for (final Sequence barrierSequence : barrierSequences) { ringBuffer.removeGatingSequence(barrierSequence); } consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);}
}
复制代码
3、在多生产者状况下,生产者是申请到一段可写入的序号,而后再写入这些序号中,那么消费者是如何感知哪些序号是能够被生产的呢?【借问提1图阐明】
这个前提是多生产者状况下,第一点咱们说过每个线程获取不同的一段数组空间,那么当初单单通过序号曾经不够用了,MultiProducerSequencer应用了int 数组 【availableBuffer】来标识以后序号是否可用。当生产者胜利生产事件后会将availableBuffer中以后序列号置为1标识能够读取。
如此消费者能够读取的的最大序号就是咱们availableBuffer中第一个不可用序号-1。
初始化availableBuffer流程
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);// 初始化可用数组availableBuffer = new int[bufferSize];indexMask = bufferSize - 1;indexShift = Util.log2(bufferSize);initialiseAvailableBuffer();
}
// 初始化默认availableBuffer为-1
private void initialiseAvailableBuffer()
{
for (int i = availableBuffer.length - 1; i != 0; i--){ setAvailableBufferValue(i, -1);}setAvailableBufferValue(0, -1);
}
// 生产者胜利生产事件将可用区数组置为1
public void publish(final long sequence)
{
setAvailable(sequence);waitStrategy.signalAllWhenBlocking();
}
private void setAvailableBufferValue(int index, int flag)
{
long bufferAddress = (index * SCALE) + BASE;UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
复制代码
消费者生产流程
WorkProcessor类中生产run办法
public void run()
{ boolean processedSequence = true; long cachedAvailableSequence = Long.MIN_VALUE; long nextSequence = sequence.get(); T event = null; while (true) { try { // 先通过cas获取生产事件的占有权 if (processedSequence) { processedSequence = false; do { nextSequence = workSequence.get() + 1L; sequence.set(nextSequence - 1L); } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence)); } // 数据就绪,能够生产 if (cachedAvailableSequence >= nextSequence) { event = ringBuffer.get(nextSequence); // 触发回调函数 workHandler.onEvent(event); processedSequence = true; } else { // 获取能够被读取的下标 cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); } } // ....省略 } notifyShutdown(); running.set(false);}public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException{ checkAlert(); // 这个值获取的current write 下标,能够认为全局生产下标。此处与每一段的write1和write2下标辨别开 long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } // 通过availableBuffer筛选出第一个不可用序号 -1 return sequencer.getHighestPublishedSequence(sequence, availableSequence);}public long getHighestPublishedSequence(long lowerBound, long availableSequence){ // 从current read下标开始, 循环至 current write,如果碰到availableBuffer 为-1 间接返回 for (long sequence = lowerBound; sequence <= availableSequence; sequence++) { if (!isAvailable(sequence)) { return sequence - 1; } } return availableSequence;}
复制代码
解决伪共享问题
什么是伪共享问题呢?
为了进步CPU的速度,Cpu有高速缓存Cache,该缓存最小单位为缓存行CacheLine,他是从主内存复制的Cache的最小单位,通常是64字节。一个Java的long类型是8字节,因而在一个缓存行中能够存8个long类型的变量。如果你拜访一个long数组,当数组中的一个值被加载到缓存中,它会额定加载另外7个。因而你能十分快地遍历这个数组。
关注公众号:码猿技术专栏 每天定时推送更多精彩内容
伪共享问题是指,当多个线程共享某份数据时,线程1可能拉到线程2的数据在其cache line中,此时线程1批改数据,线程2取其数据时就要从新从内存中拉取,两个线程相互影响,导致数据尽管在cache line中,每次却要去内存中拉取。
Disruptor是如何解决的呢?
在value前后对立都退出7个Long类型进行填充,线程拉取时,不论如何都会占满整个缓存
回顾总结:Disuptor为何能称之为高性能的无锁队列框架呢?
缓存行填充,防止缓存频繁生效。【java8中也引入@sun.misc.Contended注解来防止伪共享】
无锁竞争:通过CAS 【二阶段提交】
环形数组:数据都是笼罩,防止GC
底层更多的应用位运算来晋升效率