共计 3865 个字符,预计需要花费 10 分钟才能阅读完成。
一、Disruptor 简介
- Disruptor 是英国外汇交易公司 LMAX 开发的一个低提早高性能 = 无锁的有界循环数组。基于 Disruptor 开发的零碎单线程能撑持每秒 600 万订单,目前曾经开源的并发框架。Log4j2 底层应用的并发框架
-
Disruptor 设计特点
- 环形数据结构:底层应用的是数组而非连表
- 元素地位定位:数组的长度是 2^n,下标是递增的,能够通过位运算疾速定位
- 无锁设计,生产者或消费者须要先申请地位,申请胜利当前能力读写,申请过程中通过 CAS 保障线程平安。
- 用于解决单机多线程之间的数据交换,而非相似于 kafka 的分布式队列。
二、JDK 里的队列解决方案
队里 | 有界性 | 锁 | 底层构造 |
---|---|---|---|
ArrayBlockingQueue | 有界 | 有锁 | 数组 |
LinkedBlockingQueue | 有界 | 有锁 | 链表 |
ConcurrentLinkedQueue | 无界 | 无锁 | 链表 |
在高并发且要求较高的稳定性的零碎场景下,非了避免生产者速度过快,只能选有界队列;同时,为了缩小 Java 的垃圾回收对系统性能的影响尽量抉择“数组”作为队列的底层构造,符合条件只有一个:ArrayBlockingQueue
2.1 ArrayBlockingQueue 的问题
-
加锁:不加锁的性能 > CAS 操作的性能 > 加锁的性能。
2.1.2 伪共享
伪共享:缓存零碎中是以缓存行(cache line)为单位存储的,当多线程批改相互独立的变量时,如果这些变量共享同一个缓存行,就会无心中影响彼此的性能,
- CPU 和主内存之间有好几层缓存,间隔 CPU 越近,缓存空间越小,速度越快。
- CPU 运算时,优先从最近的缓存寻找数据,找不到时再往下层去找。
- 缓存系中以 缓存行 (cache line) 为单位存储, 一个缓存行有 64 字节,能够存储 8 个 long 类型数据。当 cpu 拜访一个 long 类型的数组,当数组中的一个值被加载到缓存中,它会额定加载另外 7 个。当数组的一个值生效,则整个缓存行生效,它将换出其余 7 个值。
-
ArrayBlockingQueue 有三个成员变量:
-
- takeIndex:须要被取走的元素下标
-
- putIndex:可被元素插入的地位的下标
-
- count:队列中元素的数量
-
这三个变量很容易放到一个缓存行中,然而之间批改没有太多的关联。所以每次批改,都会使之前缓存的数据生效,从而不能齐全达到共享的成果。
public class ArrayBlockingQueue<E> {
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
}
- 伪共享解决思路:增大数组元素的距离使得由不同线程存取的元素位于不同的缓存行上,以空间换工夫。
// value1 和 value2 可能会产生伪共享
class ValueNoPadding {
protected volatile long value1 = 0L;
protected volatile long value2 = 0L;
}
// value1 和 value2 两头插入无用值 p1~p14
class ValuePadding {
protected long p1, p2, p3, p4, p5, p6, p7;
protected volatile long value1 = 0L;
protected long p9, p10, p11, p12, p13, p14;
protected volatile long value2 = 0L;
}
三、Disruptor
RingBuffer
- ringBuffer 是一个环,用做在不同线程间传递数据的空间
- ringBuffer 领有一个序号,整个序号是递增的,用于指向下一个可用元素。
- 队列空间在创立时就固定不再扭转,可用升高 GC 的压力
应用示例
-
筹备数据容器
// 数据容器, 寄存生产和生产的数据内容 public class LongEvent {private long value;}
-
筹备数据容器的生产工厂,用于 RingBuffer 初始化时的数据填充
// 数据容器生产工厂 public class LongEventFactory implements EventFactory<LongEvent> {public LongEvent newInstance() {return new LongEvent(); } }
-
筹备消费者
// 消费者 public class LongEventConsumer implements EventHandler<LongEvent> { /** * * @param longEvent * @param sequence 以后的序列 * @param endOfBatch 是否是最初一个数据 * @throws Exception */ @Override public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception {String str = String.format("long event : %s l:%s b:%s", longEvent.getValue(), sequence, endOfBatch); System.out.println(str); } }
-
生产线程、主线程
public class Main {public static void main(String[] args) throws Exception { // 线程工厂 ThreadFactory threadFactory = (r) -> new Thread(r); // disruptor- 创立一个 disruptor // 设置数据容器的工厂类,ringBuffer 的初始化大小, 消费者线程的工厂类 Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(new LongEventFactory(), 8, threadFactory); // disruptor- 设置消费者 disruptor.handleEventsWith(new LongEventConsumer()); disruptor.start(); // 获取 disruptor 的 RingBuffer RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); // 主线程开始生产 for (long l = 0; l <= 8; l++) {long nextIndex = ringBuffer.next(); LongEvent event = ringBuffer.get(nextIndex); event.setValue(l); ringBuffer.publish(nextIndex); Thread.sleep(1000); } } }
实现原理
单生产者生产数据的流程
- 生产者线程申请写入 M 个数据
- disruptor 从以后指针 cursor 程序去找 M 个可写空间,返回找到的可用空间的最大序号
- 通过 CAS 比对返回的序号和申请的序号是否统一,判断是否会笼罩未读的元素,若返回正确,间接写入数据
多生产者生产数据的流程
- 引入一个与 ringBuffer 大小雷同的 buff:availableBuffer 用于记录 ringBuffer 每一个空间的应用状况,若生产者写入数据,则将对应 availableBuffer 地位标记为写入胜利,若消费者读取了数据,则将对应的 availableBuffer 地位标记为闲暇。
- 多个生产者调配空间时,应用 CAS 给每一个线程获取不同的数组空间进行操作。
- 多个消费者在生产数据时,程序的从 availableBuffer 搜寻一段间断可读的空间,并返回该空间的最大序列号,并读取数据,同时将 availableBuffer 的对应的地位进行标记闲暇。
Disruptor 解决伪共享与线程可见性问题
// 数据左右两边插入多余变量隔离真正的变量
class LhsPadding
{protected long p1, p2, p3, p4, p5, p6, p7;}
class Value extends LhsPadding
{protected volatile long value;}
class RhsPadding extends Value
{protected long p9, p10, p11, p12, p13, p14, p15;}
public class Sequence extends RhsPadding
{
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
public Sequence(final long initialValue)
{UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
public long get()
{return value;}
// 应用 UNSAFE 操作间接批改内存值
public void set(final long value)
{UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
}
四、参考文献
- https://tech.meituan.com/2016…
- https://www.cnblogs.com/crazy…
正文完