关于java:Disruptor测试结果运算1亿次耗时5503ms吞吐量18171000s于是我扒开了Disruptor高性能的外衣

9次阅读

共计 14722 个字符,预计需要花费 37 分钟才能阅读完成。

hello 小伙伴儿们,昨天搞了一篇 Disruptor 的入门文章,看大家反馈不错,在大家一再催更下,昨天熬夜至上班,终于续写了第二篇 Disruptor 的高性能原理分析的文章,为大家揭开 Disruptor 高性能的神秘外衣。
如果小伙伴,错过了入门 Disruptor 的入门篇的文章,在这里自行查看:

如此狂妄,自称高性能队列的 Disruptor 有啥来头?

能比照测试

为了直观地感触 Disruptor 有多快,设计了一个性能比照测试:Producer 公布 1 亿次事件,从公布第一个事件开始计时,捕获 Consumer 解决完所有事件的耗时。

测试用例在 Producer 如何将事件告诉到 Consumer 的实现形式上,设计了两种不同的实现:

  1. Producer 的事件公布和 Consumer 的事件处理在不同的线程,通过 ArrayBlockingQueue 传递给 Consumer 进行解决;
  2. Producer 的事件公布和 Consumer 的事件处理在不同的线程,通过 Disruptor 传递给 Consumer 进行解决;

3.1 代码实现

3.1.1 计算代码

进行 CAS 累加运算

public class CommonUtils {private static AtomicLong count = new AtomicLong(0);

    public static void calculation() {count.incrementAndGet();
    }

    public static long get() {return count.get();
    }
}
3.1.2 抽象类

进行一亿次 CAS 运算计算耗时

/**
 * 抽象类
 *
 * @param <T>
 */
public abstract class AbstractTask<T> {private static final Logger logger = LoggerFactory.getLogger(AbstractTask.class);
    // 线程池
    private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    // 一亿次测试
    public static long tasksize = 100000000;


    /**
     * 开始调用测试
     */
    public void invok() {
        // 计算以后事件
        long currentTime = System.currentTimeMillis();
        // 获取到监听器
        Runnable monitor = monitor();
        if (null != monitor) {executor.execute(monitor);
        }
        // 启动
        start();

        // 执行工作公布
        Runnable runnable = getTask();
        for (long i = 0; i < tasksize; i++) {runnable.run();
        }

        // 进行工作
        stop();
        // 期待工作公布实现
        try {countDownLatch.await();
        } catch (InterruptedException e) {e.printStackTrace();
        }
        executor.shutdown();
        // 获取处理结果
        T result = getResult();
        // 计算耗时
        long duration = System.currentTimeMillis() - currentTime;
        // 计算吞吐量
        long throughput = (tasksize / duration) * 1000;
        logger.info("每秒吞吐量:[{}];({}/{})", throughput, result, duration);
    }


    /**
     * 获取监听器
     *
     * @return
     */
    public Runnable monitor() {return null;}

    /**
     * 启动工作
     */
    public void start() {}

    /**
     * 实现工作
     */
    public void complete() {countDownLatch.countDown();
    }

    /**
     * 进行工作
     */
    public void stop() {}

    /**
     * 获取须要执行的工作
     *
     * @return
     */
    public abstract Runnable getTask();

    /**
     * 获取运行后果
     *
     * @return
     */
    public abstract T getResult();}
3.1.3 Disruptor 性能测试代码
public class DisruptorTest extends AbstractTask<Long> {
    // 定义随机数生成器
    private static final Random r = new Random();
    // 定义 Disruptor 对象
    private Disruptor disruptor = null;
    // 定义 Disruptor 事件公布对象
    private LongEventProducerWithTranslator translator = null;

    /**
     * 启动
     */
    @Override
    public void start() {
        // 定义事件工厂
        EventFactory<LongEvent> eventFactory = new LongEventFactory();
        // RingBuffer 大小,必须是 2 的 N 次方;
        int ringBufferSize = 1024 * 1024;
        // 构建 disruptor 对象
        disruptor = new Disruptor<LongEvent>(eventFactory,
                ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE,
                new YieldingWaitStrategy());
        // 定义事件处理类
        EventHandler<LongEvent> eventHandler = new LongEventHandler();
        // 配置事件处理类
        disruptor.handleEventsWith(eventHandler);
        // 启动 disruptor
        disruptor.start();
        // 创立事件公布对象
        translator = new LongEventProducerWithTranslator(disruptor.getRingBuffer());
    }

    /**
     * 进行工作
     */
    @Override
    public void stop() {disruptor.shutdown();
        System.out.println("运算后果:" + CommonUtils.get());
        // 实现工作
        complete();}

    /**
     * 获取须要执行的工作
     *
     * @return
     */
    @Override
    public Runnable getTask() {return () -> {publishEvent();
        };
    }

    /**
     * 获取运行后果
     *
     * @return
     */
    @Override
    public Long getResult() {return CommonUtils.get();
    }


    /**
     * 公布对象
     */
    private void publishEvent() {
        // 获取要通过事件传递的业务数据
        Long data = r.nextLong();
        // 公布事件
        translator.onData(data);
    }


    public static void main(String[] args) {DisruptorTest disruptorTest = new DisruptorTest();
        disruptorTest.invok();}

}

输入后果

10:45:22.941 [main] INFO com.heima.task.AbstractTask - 每秒吞吐量:[18171000];(100000000/5503)
ArrayBlockingQueue 性能测试代码
public class ArrayBlockingQueueTest extends AbstractTask {private static final Random r = new Random();
    private static final ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue(10000000);


    @Override
    public Runnable monitor() {return () -> {
            try {for (int i = 0; i < tasksize; i++) {
                    // 获取一个元素
                    queue.take();
                    // 执行计算
                    CommonUtils.calculation();}
            } catch (InterruptedException e) {e.printStackTrace();
            }
            complete();};
    }

    public static void main(String[] args) {ArrayBlockingQueueTest test = new ArrayBlockingQueueTest();
        test.invok();}

    @Override
    public Runnable getTask() {return () -> {publish();
        };
    }

    @Override
    public Object getResult() {return CommonUtils.get();
    }

    public void publish() {Long data = r.nextLong();
        try {queue.put(data);
        } catch (InterruptedException e) {e.printStackTrace();
        }
    }
}

输入后果

10:45:46.379 [main] INFO com.heima.task.AbstractTask - 每秒吞吐量:[6192000];(100000000/16148)

3.2 测试比照

测试类 运算次数 耗时(ms) 吞吐量 /s
ArrayBlockingQueue 1 亿次 16148 6192000
Disruptor 1 亿次 5503 18171000

3.3 Disruptor 官网性能测试

Disruptor 论文中讲述了一个试验:

  • 这个测试程序调用了一个函数,该函数会对一个 64 位的计数器循环自增 5 亿次。
  • 机器环境:2.4G 6 核
  • 运算:64 位的计数器累加 5 亿次
Method Time (ms)
单线程 300
单线程应用 CAS 5,700
单线程应用锁 10,000
单线程应用 volatile 4,700
多线程应用 CAS 30,000
多线程应用锁 224,000

4. 高性能原理

  • 引入环形的数组构造:数组元素不会被回收,防止频繁的 GC,
  • 无锁的设计:采纳 CAS 无锁形式,保障线程的安全性
  • 属性填充:通过增加额定的无用信息,防止伪共享问题
  • 元素地位的定位:采纳跟一致性哈希一样的形式,一个索引,进行自增

4.1 伪共享概念

4.1.1 计算机缓存形成

​ 下图是计算的根本构造。L1、L2、L3 别离示意一级缓存、二级缓存、三级缓存,越凑近 CPU 的缓存,速度越快,容量也越小,所以 L1 缓存很小但很快,并且紧靠着在应用它的 CPU 内核;L2 大一些,也慢一些,并且依然只能被一个独自的 CPU 核应用;L3 更大、更慢,并且被单个插槽上的所有 CPU 核共享;最初是主存,由全副插槽上的所有 CPU 核共享。

​ 当 CPU 要读取一个数据时,首先从一级缓存中查找,如果没有找到再从二级缓存中查找,如果还是没有就从三级缓存或内存中查找。一般来说,每级缓存的命中率大略都在 80% 左右,也就是说全副数据量的 80% 都能够在一级缓存中找到,只剩下 20% 的总数据量才须要从二级缓存、三级缓存或内存中读取,由此可见一级缓存是整个 CPU 缓存架构中最为重要的局部。

下表是一些缓存未命中的耗费数据:

从 CPU 到 大概须要的 CPU 周期 大概须要的工夫
主存 约 60-80ns
QPI 总线 约 20ns
L3 cache 约 40-45cycles 约 15ns
L2 cache 约 10cycles 约 3ns
L1 cache 约 3 -4cycles 约 1ns
寄存器 1cycle

可见 CPU 读取主存中的数据会比从 L1 中读取慢了近 2 个数量级。

4.1.2 什么是缓存行

​ 为了解决计算机系统中主内存与 CPU 之间运行速度差问题,会在 CPU 与主内存之间 增加一级或者多级高速缓冲存储器(Cache)。这个 Cache 个别是被集成到 CPU 外部的,所以也叫 CPU Cache,如图所示是两级 Cache 构造。

​ Cache 外部是按行存储的,其中每一行称为一个 cache line,由很多个 Cache line 组成的,Cache line 是 cache 和 RAM 替换数据的最小单位,cache 行的大小个别为 2 的幂次数字节,通常为 64 Byte。Cache line 是 Cache 与主内存进行数据交换的单位。

​ 当 CPU 把内存的数据载入 cache 时,会把邻近的共 64 Byte 的数据一起放入同一个 Cache line,因为空间局部性:邻近的数据在未来被拜访的可能性大。

linux 查看缓存行大小

more /sys/devices/system/cpu/cpu1/cache/index0/coherency_line_size
64
4.1.3 什么是共享

​ CPU 缓存是以缓存行(cache line)为单位存储的。缓存行通常是 64 字节,并且它无效地援用主内存中的一块地址。一个 Java 的 long 类型是 8 字节,因而在一个缓存行中能够存 8 个 long 类型的变量。所以,如果你拜访一个 long 数组,当数组中的一个值被加载到缓存中,它会额定加载另外 7 个,以至你能十分快地遍历这个数组。事实上,你能够十分疾速的遍历在间断的内存块中调配的任意数据结构。而如果你在数据结构中的项在内存中不是彼此相邻的(如链表),你将得不到收费缓存加载所带来的劣势,并且在这些数据结构中的每一个项都可能会呈现缓存未命中。下图是一个 CPU 缓存行的示意图:

​ 外表上 X 和 Y 都是被独立线程操作的,而且两操作之间也没有任何关系。只不过它们共享了一个缓存行,但所有竞争抵触都是来源于共享。

4.1.4 什么是伪共享

​ 当 CPU 拜访某一个变量时候,首先会去看 CPU Cache 内是否有该变量,如果有则间接从中获取,否者就去主内存外面获取该变量,而后把该变量所在内存区域的一个 Cache 行大小的内存拷贝到 Cache(cache 行是 Cache 与主内存进行数据交换的单位)。

​ 因为寄存到 Cache 行的的是内存块而不是单个变量,所以可能会把多个变量寄存到了一个 cache 行。当多个线程同时批改一个缓存行外面的多个变量时候,因为同时只能有一个线程操作缓存行,所以相比每个变量放到一个缓存行性能会有所降落,这就是伪共享。

​ 如上图变量 x,y 同时被放到了 CPU 的一级和二级缓存,当线程 1 应用 CPU1 对变量 x 进行更新时候,首先会批改 cpu1 的一级缓存变量 x 所在缓存行,这时候缓存一致性协定会导致 cpu2 中变量 x 对应的缓存行生效,那么线程 2 写入变量 x 的时候就只能去二级缓存去查找,这就毁坏了一级缓存,而一级缓存比二级缓存更快。更坏的状况下如果 cpu 只有一级缓存,那么会导致频繁的间接拜访主内存。

​ 咱们的缓存都是以缓存行作为一个单位来解决的,所以生效 x 的缓存的同时,也会把 y 生效,反之亦然。

4.1.5 为何会呈现伪共享

​ 伪共享的产生是因为多个变量被放入了一个缓存行,并且多个线程同时去写入缓存行中不同变量。那么为何多个变量会被放入一个缓存行那。其实是因为 Cache 与内存替换数据的单位就是 Cache line,当 CPU 要拜访的变量没有在 Cache 命中时候,依据程序运行的局部性原理会把该变量在内存中大小为 Cache 行的内寄存如缓存行。

long a;
long b;
long c;
long d;

​ 如上代码,申明了四个 long 变量,假如 cache line 的大小为 32 个字节,那么当 cpu 拜访变量 a 时候发现该变量没有在 cache 命中,那么就会去主内存把变量 a 以及内存地址左近的 b,c,d 放入缓存行。也就是地址间断的多个变量才有可能会被放到一个缓存行中,当创立数组时候,数组外面的多个元素就会被放入到同一个缓存行。那么单线程下多个变量放入缓存行对性能有影响?其实失常状况下单线程拜访时候因为数组元素被放入到了一个或者多个 cache 行对代码执行是无利的,因为数据都在缓存中,代码执行会更快。

4.1.6 如何解伪共享

​ 解决伪共享最间接的办法就是填充(padding),例如上面的 VolatileLong,一个 long 占 8 个字节,Java 的对象头占用 8 个字节(32 位零碎)或者 12 字节(64 位零碎,默认开启对象头压缩,不开启占 16 字节)。一个缓存行 64 字节,那么咱们能够填充 6 个 long(6 * 8 = 48 个字节)。

4.1.6.1 不应用字段填充
public class VolatileData {
    // 占用 8 个字节 +48 + 对象头 = 64 字节

    // 须要操作的数据
    volatile long value;

    public VolatileData() {}

    public VolatileData(long defValue) {value = defValue;}

    public long accumulationAdd() {
        // 因为单线程操作不须要加锁
        value++;
        return value;
    }

    public long getValue() {return value;}
}

内存布局

4.6.1.2 填充字段

因为 JDK1.7 当前就主动优化代码会删除无用的代码,在 JDK1.7 当前的版本这些不失效了。

/**
 * 缓存行填充父类
 */
public class DataPadding {
    // 填充 5 个 long 类型字段 8*5 = 40 个字节
    private long p1, p2, p3, p4, p5; //jvm 优化 删除无用代码
    // 须要操作的数据
    volatile long value;
}

内存布局

4.1.6.3 继承的形式
/**
 * 缓存行填充父类
 */
public class DataPadding {
    // 填充 5 个 long 类型字段 8*5 = 40 个字节
    private long p1, p2, p3, p4, p5;
}

继承缓存填充类

/**
 * 继承 DataPadding
 */
public class VolatileData extends DataPadding {
    // 占用 8 个字节 +48 + 对象头 = 64 字节

    public VolatileData() {}

    public VolatileData(long defValue) {value = defValue;}

    public long accumulationAdd() {
        // 因为单线程操作不须要加锁
        value++;
        return value;
    }

    public long getValue() {return value;}
}

内存布局

4.1.6.4 Disruptor 填充形式
class LhsPadding {protected long p1, p2, p3, p4, p5, p6, p7;}

class Value extends LhsPadding {protected volatile long value;}

class RhsPadding extends Value {protected long p9, p10, p11, p12, p13, p14, p15;}

继承填充类

public class VolatileData extends RhsPadding {
    // 占用 8 个字节 +48 + 对象头 = 64 字节

    // 须要操作的数据
    volatile long value;

    public VolatileData() {}

    public VolatileData(long defValue) {value = defValue;}

    public long accumulationAdd() {
        // 因为单线程操作不须要加锁
        value++;
        return value;
    }

    public long getValue() {return value;}
}

内存布局

4.1.6.5 @Contended 注解
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
public @interface Contended {String value() default "";
}

注解填充类

@Contended
public class VolatileData  {
    // 占用 8 个字节 +48 + 对象头 = 64 字节

    // 须要操作的数据
    volatile long value;
    
    public VolatileData() {}

    public VolatileData(long defValue) {value = defValue;}

    public long accumulationAdd() {
        // 因为单线程操作不须要加锁
        value++;
        return value;
    }

    public long getValue() {return value;}
}

内存布局

注意事项

​ 在 Java8 中提供了 @sun.misc.Contended 来防止伪共享时,在运行时须要设置 JVM 启动参数 -XX:-RestrictContended 否则可能不失效。

4.1.7 性能比照
4.1.7.1 测试代码

应用和不应用缓存行填充的比照

/**
 * 缓存行测试
 */
public class CacheLineTest {
    /**
     * 通过缓存行填充的变量
     */
    private VolatileData volatileData1 = new VolatileData(0);
    private VolatileData volatileData2 = new VolatileData(0);
    private VolatileData volatileData3 = new VolatileData(0);
    private VolatileData volatileData4 = new VolatileData(0);
    private VolatileData volatileData5 = new VolatileData(0);
    private VolatileData volatileData6 = new VolatileData(0);
    private VolatileData volatileData7 = new VolatileData(0);

    /**
     * 循环次数
     */
    private final long size = 100000000;

    /**
     * 进行累加操作
     */
    public void accumulationX(VolatileData volatileData) {
        // 计算耗时
        long currentTime = System.currentTimeMillis();
        long value = 0;
        // 循环累加
        for (int i = 0; i < size; i++) {
            // 应用缓存行填充的形式
            value = volatileData.accumulationAdd();}
        // 打印
        System.out.println(value);
        // 打印耗时
        System.out.println("耗时:" + (System.currentTimeMillis() - currentTime));
    }


    public static void main(String[] args) {
        // 创建对象
        CacheLineTest cacheRowTest = new CacheLineTest();
        // 创立线程池
        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        // 启动三个线程个调用他们各自的办法
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData1));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData2));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData3));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData4));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData5));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData6));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData7));
        executorService.shutdown();}
}
4.1.7.2 测试数据

同样的构造他们之间差了 将近 50 倍的速度差距

对象 NoPadding(MS) DataPadding(MS) RhsPadding(MS) Contended(MS)
volatileData1 3751 1323 1307 1291
volatileData2 3790 1383 1311 1314
volatileData3 7551 1400 1311 1333
volatileData4 7669 1407 1317 1356
volatileData5 8577 1447 1327 1361
volatileData6 8705 1479 1339 1375
volatileData6 8741 1512 1368 1389
4.1.8 Disruptor 解决伪共享

​ 在 Disruptor 中有一个重要的类 Sequence,该类包装了一个 volatile 润饰的 long 类型数据 value,无论是 Disruptor 中的基于数组实现的缓冲区 RingBuffer,还是生产者,消费者,都有各自独立的 Sequence,RingBuffer 缓冲区中,Sequence 标示着写入进度,例如每次生产者要写入数据进缓冲区时,都要调用 RingBuffer.next()来取得下一个可应用的绝对地位。对于生产者和消费者来说,Sequence 标示着它们的事件序号,来看看 Sequence 类的源码:

class LhsPadding {protected long p1, p2, p3, p4, p5, p6, p7;}

class Value extends LhsPadding {protected volatile long value;}

class RhsPadding extends Value {protected long p9, p10, p11, p12, p13, p14, p15;}

public class Sequence extends RhsPadding {
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;
    static {UNSAFE = Util.getUnsafe();
        try {VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        } catch(final Exception e) {throw new RuntimeException(e);
        }
    }
    


    public Sequence() {this(INITIAL_VALUE);
    }

    public Sequence(final long initialValue) {UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

}

​ 从第 1 到 11 行能够看到,真正应用到的变量 value,它的前后空间都由 8 个 long 型的变量填补了,对于一个大小为 64 字节的缓存行,它刚好被填补满(一个 long 型变量 value,8 个字节加上前 / 后个 7long 型变量填补,7*8=56,56+8=64 字节)。这样做每次把变量 value 读进高速缓存中时,都能把缓存行填充斥(对于大小为 64 个字节的缓存行来说,如果缓存行大小大于 64 个字节,那么还是会呈现伪共享问题),保障每次解决数据时都不会与其余变量发生冲突。

4.2 无锁的设计

4.2.1 锁机制存在的问题
  • 在多线程竞争下,加锁、开释锁会导致比拟多的上下文切换和调度延时,引起性能问题,而且在上下文切换的时候,cpu 之前缓存的指令和数据都将生效,对性能有很大的损失,用户态的锁尽管防止了这些问题,然而其实它们只是在没有实在的竞争时才无效。
  • 一个线程持有锁会导致其它所有须要此锁的线程挂起直至该锁开释。
  • 如果一个优先级高的线程期待一个优先级低的线程开释锁会导致导致优先级反转(Priority Inversion),引起性能危险。
4.2.2 CAS 无锁算法

​ 实现无锁(lock-free)的非阻塞算法有多种实现办法,其中 CAS(比拟与替换,Compare and swap)是一种有名的无锁算法。CAS 的语义是“我认为 V 的值应该为 A,如果是,那么将 V 的值更新为 B,否则不批改并通知 V 的值理论为多少”,CAS 是一种 乐观锁 技术,当多个线程尝试应用 CAS 同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并能够再次尝试。CAS 有 3 个操作数,内存值 V,旧的预期值 A,要批改的新值 B。当且仅当预期值 A 和内存值 V 雷同时,将内存值 V 批改为 B,否则什么都不做。

​ 这是一个 CPU 级别的指令,在我的意识中,它的工作形式有点像乐观锁——CPU 去更新一个值,但如果想改的值不再是原来的值,操作就失败,因为很显著,有其它操作先扭转了这个值。

留神,这能够是 CPU 的两个不同的外围,但不会是两个独立的 CPU。

​ CAS 操作比锁耗费资源少的多,因为它们不关涉操作系统,它们间接在 CPU 上操作。但它们并非没有代价——在下面的试验中,单线程无锁耗时 300ms,单线程有锁耗时 10000ms,单线程应用 CAS 耗时 5700ms。所以它比应用锁耗时少,但比不须要思考竞争的单线程耗时多。

4.2.3 传统队列问题

队列的底层数据结构个别分成三种:数组、链表和堆。其中,堆这里是为了实现带有优先级个性的队列,暂且不思考。

队列 有界性 数据结构
ArrayBlockingQueue bounded 加锁 arraylist
LinkedBlockingQueue optionally-bounded 加锁 linkedlist
ConcurrentLinkedQueue unbounded 无锁 linkedlist
LinkedTransferQueue unbounded 无锁 linkedlist
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap

​ 在稳定性和性能要求特地高的零碎中,为了避免生产者速度过快,导致内存溢出,只能抉择有界队列;

​ 同时,为了缩小 Java 的垃圾回收对系统性能的影响,会尽量抉择 array/heap 格局的数据结构。这样筛选下来,符合条件的队列就只有 ArrayBlockingQueue,然而 ArrayBlockingQueue 是通过加锁的形式保障线程平安,而且 ArrayBlockingQueue 还存在伪共享问题,这两个问题重大影响了性能。

4.2.3.1 Disruptor 的无锁设计

​ 多线程环境下,多个生产者通过 do/while 循环的条件 CAS,来判断每次申请的空间是否曾经被其余生产者占据。如果曾经被占据,该函数会返回失败,While 循环从新执行,申请写入空间。

do
{current = cursor.get();
    next = current + n;

    if (!hasAvailableCapacity(gatingSequences, n, current))
    {throw InsufficientCapacityException.INSTANCE;}
}
while (!cursor.compareAndSet(current, next));
//next 类比于 ArrayBlockQueue 的数组索引 index
return next;

4.3 环形数组构造

环形数组构造是整个 Disruptor 的外围所在。

4.3.1 什么是环形数组

​ RingBuffer 是一个环 (首尾相连的环),用做在不同上下文(线程) 间传递数据的 buffer,RingBuffer 领有一个序号,这个序号指向数组中下一个可用元素。

4.3.2 为什么应用环形数组

为了防止垃圾回收,采纳数组而非链表。同时,数组对处理器的缓存机制更加敌对

​ 首先因为是数组,所以要比链表快,而且依据咱们对下面缓存行的解释晓得,数组中的一个元素加载,相邻的数组元素也是会被预加载的,因而在这样的构造中,cpu 无需时不时去主存加载数组中的下一个元素。

​ 而且,你能够为数组事后分配内存,使得数组对象始终存在(除非程序终止)。这就意味着不须要花大量的工夫用于垃圾回收。

​ 此外,不像链表那样,须要为每一个增加到其下面的对象发明节点对象—对应的,当删除节点时,须要执行相应的内存清理操作。环形数组中的元素采纳笼罩形式,防止了 jvm 的 GC。

​ 其次构造作为环形,数组的大小为 2 的 n 次方,这样元素定位能够通过位运算效率会更高,这个跟一致性哈希中的环形策略有点像。在 disruptor 中,这个牛逼的环形构造就是 RingBuffer,既然是数组,那么就有大小,而且这个大小必须是 2 的 n 次方,构造如下:

​ 其实质只是一个一般的数组,只是当搁置数据填充斥队列(即达到 2^n- 1 地位)之后,再填充数据,就会从 0 开始,笼罩之前的数据,于是就相当于一个环。

4.4 元素地位定位

​ 数组长度2^n,通过位运算,放慢定位的速度。下标采取递增的模式。不必放心 index 溢出的问题。index 是 long 类型,即便 100 万 QPS 的处理速度,也须要 30 万年能力用完。

4.5 期待策略

​ 定义 Consumer 如何进行期待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能体现)依据理论运行环境的 CPU 的硬件特点抉择失当的策略,并配合特定的 JVM 的配置参数,可能实现不同的性能晋升。

4.5.1 BlockingWaitStrategy

​ Disruptor 的默认策略是 BlockingWaitStrategy,在 BlockingWaitStrategy 外部是应用锁和 condition 来控制线程的唤醒

​ BlockingWaitStrategy 是最低效的策略,但其对 CPU 的耗费最小并且在各种不同部署环境中能提供更加统一的性能体现。

4.5.2 SleepingWaitStrategy

​ SleepingWaitStrategy 的性能体现跟 BlockingWaitStrategy 差不多,对 CPU 的耗费也相似,但其对生产者线程的影响最小,通过应用 LockSupport.parkNanos(1) 来实现循环期待,适宜用于异步日志相似的场景;

4.5.3 YieldingWaitStrategy

​ YieldingWaitStrategy 是能够应用在低提早零碎的策略之一,YieldingWaitStrategy 将自旋以期待序列减少到适当的值。在循环体内,将调用 Thread.yield() 以容许其余排队的线程运行。在要求极高性能且事件处理线数小于 CPU 逻辑外围数的场景中,举荐应用此策略;

4.5.4 BusySpinWaitStrategy

​ 性能最好,适宜用于低提早的零碎。在要求极高性能且事件处理线程数小于 CPU 逻辑外围数的场景中,举荐应用此策略;

4.5.5 PhasedBackoffWaitStrategy

​ 自旋 + yield + 自定义策略,CPU 资源紧缺,吞吐量和提早并不 的场景。

本文参加了思否技术征文,欢送正在浏览的你也退出。

如果本文对您有帮忙,欢送 关注 点赞`,您的反对是我保持创作的能源。

转载请注明出处!

正文完
 0