hello 小伙伴儿们,昨天搞了一篇Disruptor的入门文章,看大家反馈不错,在大家一再催更下,昨天熬夜至上班,终于续写了第二篇Disruptor的高性能原理分析的文章,为大家揭开Disruptor高性能的神秘外衣。
如果小伙伴,错过了入门Disruptor的入门篇的文章,在这里自行查看:
如此狂妄,自称高性能队列的Disruptor有啥来头?
能比照测试
为了直观地感触 Disruptor 有多快,设计了一个性能比照测试:Producer 公布 1 亿次事件,从公布第一个事件开始计时,捕获 Consumer 解决完所有事件的耗时。
测试用例在 Producer 如何将事件告诉到 Consumer 的实现形式上,设计了两种不同的实现:
- Producer 的事件公布和 Consumer 的事件处理在不同的线程,通过 ArrayBlockingQueue 传递给 Consumer 进行解决;
- Producer 的事件公布和 Consumer 的事件处理在不同的线程,通过 Disruptor 传递给 Consumer 进行解决;
3.1 代码实现
3.1.1 计算代码
进行CAS累加运算
public class CommonUtils {
private static AtomicLong count = new AtomicLong(0);
public static void calculation() {
count.incrementAndGet();
}
public static long get() {
return count.get();
}
}
3.1.2 抽象类
进行一亿次 CAS运算计算耗时
/**
* 抽象类
*
* @param <T>
*/
public abstract class AbstractTask<T> {
private static final Logger logger = LoggerFactory.getLogger(AbstractTask.class);
//线程池
private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
//一亿次测试
public static long tasksize = 100000000;
/**
* 开始调用测试
*/
public void invok() {
//计算以后事件
long currentTime = System.currentTimeMillis();
//获取到监听器
Runnable monitor = monitor();
if (null != monitor) {
executor.execute(monitor);
}
//启动
start();
//执行工作公布
Runnable runnable = getTask();
for (long i = 0; i < tasksize; i++) {
runnable.run();
}
//进行工作
stop();
//期待工作公布实现
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdown();
//获取处理结果
T result = getResult();
//计算耗时
long duration = System.currentTimeMillis() - currentTime;
//计算吞吐量
long throughput = (tasksize / duration) * 1000;
logger.info("每秒吞吐量:[{}];({}/{})", throughput, result, duration);
}
/**
* 获取监听器
*
* @return
*/
public Runnable monitor() {
return null;
}
/**
* 启动工作
*/
public void start() {
}
/**
* 实现工作
*/
public void complete() {
countDownLatch.countDown();
}
/**
* 进行工作
*/
public void stop() {
}
/**
* 获取须要执行的工作
*
* @return
*/
public abstract Runnable getTask();
/**
* 获取运行后果
*
* @return
*/
public abstract T getResult();
}
3.1.3 Disruptor性能测试代码
public class DisruptorTest extends AbstractTask<Long> {
//定义随机数生成器
private static final Random r = new Random();
//定义Disruptor对象
private Disruptor disruptor = null;
//定义Disruptor事件公布对象
private LongEventProducerWithTranslator translator = null;
/**
* 启动
*/
@Override
public void start() {
//定义事件工厂
EventFactory<LongEvent> eventFactory = new LongEventFactory();
// RingBuffer 大小,必须是 2 的 N 次方;
int ringBufferSize = 1024 * 1024;
//构建disruptor对象
disruptor = new Disruptor<LongEvent>(eventFactory,
ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE,
new YieldingWaitStrategy());
//定义事件处理类
EventHandler<LongEvent> eventHandler = new LongEventHandler();
//配置事件处理类
disruptor.handleEventsWith(eventHandler);
//启动disruptor
disruptor.start();
//创立事件公布对象
translator = new LongEventProducerWithTranslator(disruptor.getRingBuffer());
}
/**
* 进行工作
*/
@Override
public void stop() {
disruptor.shutdown();
System.out.println("运算后果:" + CommonUtils.get());
//实现工作
complete();
}
/**
* 获取须要执行的工作
*
* @return
*/
@Override
public Runnable getTask() {
return () -> {
publishEvent();
};
}
/**
* 获取运行后果
*
* @return
*/
@Override
public Long getResult() {
return CommonUtils.get();
}
/**
* 公布对象
*/
private void publishEvent() {
//获取要通过事件传递的业务数据
Long data = r.nextLong();
// 公布事件
translator.onData(data);
}
public static void main(String[] args) {
DisruptorTest disruptorTest = new DisruptorTest();
disruptorTest.invok();
}
}
输入后果
10:45:22.941 [main] INFO com.heima.task.AbstractTask - 每秒吞吐量:[18171000];(100000000/5503)
ArrayBlockingQueue性能测试代码
public class ArrayBlockingQueueTest extends AbstractTask {
private static final Random r = new Random();
private static final ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue(10000000);
@Override
public Runnable monitor() {
return () -> {
try {
for (int i = 0; i < tasksize; i++) {
//获取一个元素
queue.take();
//执行计算
CommonUtils.calculation();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
complete();
};
}
public static void main(String[] args) {
ArrayBlockingQueueTest test = new ArrayBlockingQueueTest();
test.invok();
}
@Override
public Runnable getTask() {
return () -> {
publish();
};
}
@Override
public Object getResult() {
return CommonUtils.get();
}
public void publish() {
Long data = r.nextLong();
try {
queue.put(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输入后果
10:45:46.379 [main] INFO com.heima.task.AbstractTask - 每秒吞吐量:[6192000];(100000000/16148)
3.2 测试比照
测试类 | 运算次数 | 耗时(ms) | 吞吐量/s |
---|---|---|---|
ArrayBlockingQueue | 1亿次 | 16148 | 6192000 |
Disruptor | 1亿次 | 5503 | 18171000 |
3.3 Disruptor官网性能测试
Disruptor论文中讲述了一个试验:
- 这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
- 机器环境:2.4G 6核
- 运算: 64位的计数器累加5亿次
Method | Time (ms) |
---|---|
单线程 | 300 |
单线程应用 CAS | 5,700 |
单线程应用锁 | 10,000 |
单线程应用volatile | 4,700 |
多线程应用 CAS | 30,000 |
多线程应用锁 | 224,000 |
4. 高性能原理
- 引入环形的数组构造:数组元素不会被回收,防止频繁的GC,
- 无锁的设计:采纳CAS无锁形式,保障线程的安全性
- 属性填充:通过增加额定的无用信息,防止伪共享问题
- 元素地位的定位:采纳跟一致性哈希一样的形式,一个索引,进行自增
4.1 伪共享概念
4.1.1 计算机缓存形成
下图是计算的根本构造。L1、L2、L3别离示意一级缓存、二级缓存、三级缓存,越凑近CPU的缓存,速度越快,容量也越小,所以L1缓存很小但很快,并且紧靠着在应用它的CPU内核;L2大一些,也慢一些,并且依然只能被一个独自的CPU核应用;L3更大、更慢,并且被单个插槽上的所有CPU核共享;最初是主存,由全副插槽上的所有CPU核共享。
当CPU要读取一个数据时,首先从一级缓存中查找,如果没有找到再从二级缓存中查找,如果还是没有就从三级缓存或内存中查找。一般来说,每级缓存的命中率大略都在80%左右,也就是说全副数据量的80%都能够在一级缓存中找到,只剩下20%的总数据量才须要从二级缓存、三级缓存或内存中读取,由此可见一级缓存是整个CPU缓存架构中最为重要的局部。
下表是一些缓存未命中的耗费数据:
从CPU到 | 大概须要的CPU周期 | 大概须要的工夫 |
---|---|---|
主存 | 约60-80ns | |
QPI总线 | 约20ns | |
L3 cache | 约40-45cycles | 约15ns |
L2 cache | 约10cycles | 约3ns |
L1 cache | 约3-4cycles | 约1ns |
寄存器 | 1cycle |
可见CPU读取主存中的数据会比从L1中读取慢了近2个数量级。
4.1.2 什么是缓存行
为了解决计算机系统中主内存与 CPU 之间运行速度差问题,会在 CPU 与主内存之间 增加一级或者多级高速缓冲存储器( Cache)。这个 Cache 个别是被集成到 CPU 外部的, 所以也叫 CPU Cache,如图所示是两级 Cache 构造。
Cache外部是按行存储的,其中每一行称为一个cache line,由很多个 Cache line 组成的,Cache line 是 cache 和 RAM 替换数据的最小单位,cache行的大小个别为2的幂次数字节,通常为 64 Byte。Cache line是Cache与主内存进行数据交换的单位。
当 CPU 把内存的数据载入 cache 时,会把邻近的共 64 Byte 的数据一起放入同一个Cache line,因为空间局部性:邻近的数据在未来被拜访的可能性大。
linux 查看缓存行大小
more /sys/devices/system/cpu/cpu1/cache/index0/coherency_line_size
64
4.1.3 什么是共享
CPU缓存是以缓存行(cache line)为单位存储的。缓存行通常是 64 字节,并且它无效地援用主内存中的一块地址。一个 Java 的 long 类型是 8 字节,因而在一个缓存行中能够存 8 个 long 类型的变量。所以,如果你拜访一个 long 数组,当数组中的一个值被加载到缓存中,它会额定加载另外 7 个,以至你能十分快地遍历这个数组。事实上,你能够十分疾速的遍历在间断的内存块中调配的任意数据结构。而如果你在数据结构中的项在内存中不是彼此相邻的(如链表),你将得不到收费缓存加载所带来的劣势,并且在这些数据结构中的每一个项都可能会呈现缓存未命中。下图是一个CPU缓存行的示意图:
外表上 X 和 Y 都是被独立线程操作的,而且两操作之间也没有任何关系。只不过它们共享了一个缓存行,但所有竞争抵触都是来源于共享。
4.1.4 什么是伪共享
当CPU拜访某一个变量时候,首先会去看CPU Cache内是否有该变量,如果有则间接从中获取,否者就去主内存外面获取该变量,而后把该变量所在内存区域的一个Cache行大小的内存拷贝到Cache(cache行是Cache与主内存进行数据交换的单位)。
因为寄存到Cache行的的是内存块而不是单个变量,所以可能会把多个变量寄存到了一个cache行。当多个线程同时批改一个缓存行外面的多个变量时候,因为同时只能有一个线程操作缓存行,所以相比每个变量放到一个缓存行性能会有所降落,这就是伪共享。
如上图变量x,y同时被放到了CPU的一级和二级缓存,当线程1应用CPU1对变量x进行更新时候,首先会批改cpu1的一级缓存变量x所在缓存行,这时候缓存一致性协定会导致cpu2中变量x对应的缓存行生效,那么线程2写入变量x的时候就只能去二级缓存去查找,这就毁坏了一级缓存,而一级缓存比二级缓存更快。更坏的状况下如果cpu只有一级缓存,那么会导致频繁的间接拜访主内存。
咱们的缓存都是以缓存行作为一个单位来解决的,所以生效x的缓存的同时,也会把y生效,反之亦然。
4.1.5 为何会呈现伪共享
伪共享的产生是因为多个变量被放入了一个缓存行,并且多个线程同时去写入缓存行中不同变量。那么为何多个变量会被放入一个缓存行那。其实是因为Cache与内存替换数据的单位就是Cache line,当CPU要拜访的变量没有在Cache命中时候,依据程序运行的局部性原理会把该变量在内存中大小为Cache行的内寄存如缓存行。
long a;
long b;
long c;
long d;
如上代码,申明了四个long变量,假如cache line的大小为32个字节,那么当cpu拜访变量a时候发现该变量没有在cache命中,那么就会去主内存把变量a以及内存地址左近的b,c,d放入缓存行。也就是地址间断的多个变量才有可能会被放到一个缓存行中,当创立数组时候,数组外面的多个元素就会被放入到同一个缓存行。那么单线程下多个变量放入缓存行对性能有影响?其实失常状况下单线程拜访时候因为数组元素被放入到了一个或者多个cache行对代码执行是无利的,因为数据都在缓存中,代码执行会更快。
4.1.6 如何解伪共享
解决伪共享最间接的办法就是填充(padding),例如上面的VolatileLong,一个long占8个字节,Java的对象头占用8个字节(32位零碎)或者12字节(64位零碎,默认开启对象头压缩,不开启占16字节)。一个缓存行64字节,那么咱们能够填充6个long(6 * 8 = 48 个字节)。
4.1.6.1 不应用字段填充
public class VolatileData {
// 占用 8个字节 +48 + 对象头 = 64字节
//须要操作的数据
volatile long value;
public VolatileData() {
}
public VolatileData(long defValue) {
value = defValue;
}
public long accumulationAdd() {
//因为单线程操作不须要加锁
value++;
return value;
}
public long getValue() {
return value;
}
}
内存布局
4.6.1.2 填充字段
因为JDK1.7当前就主动优化代码会删除无用的代码,在JDK1.7当前的版本这些不失效了。
/**
* 缓存行填充父类
*/
public class DataPadding {
//填充 5个long类型字段 8*5 = 40 个字节
private long p1, p2, p3, p4, p5; //jvm 优化 删除无用代码
//须要操作的数据
volatile long value;
}
内存布局
4.1.6.3 继承的形式
/**
* 缓存行填充父类
*/
public class DataPadding {
//填充 5个long类型字段 8*5 = 40 个字节
private long p1, p2, p3, p4, p5;
}
继承缓存填充类
/**
* 继承DataPadding
*/
public class VolatileData extends DataPadding {
// 占用 8个字节 +48 + 对象头 = 64字节
public VolatileData() {
}
public VolatileData(long defValue) {
value = defValue;
}
public long accumulationAdd() {
//因为单线程操作不须要加锁
value++;
return value;
}
public long getValue() {
return value;
}
}
内存布局
4.1.6.4 Disruptor填充形式
class LhsPadding {
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding {
protected volatile long value;
}
class RhsPadding extends Value {
protected long p9, p10, p11, p12, p13, p14, p15;
}
继承填充类
public class VolatileData extends RhsPadding {
// 占用 8个字节 +48 + 对象头 = 64字节
//须要操作的数据
volatile long value;
public VolatileData() {
}
public VolatileData(long defValue) {
value = defValue;
}
public long accumulationAdd() {
//因为单线程操作不须要加锁
value++;
return value;
}
public long getValue() {
return value;
}
}
内存布局
4.1.6.5 @Contended注解
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
public @interface Contended {
String value() default "";
}
注解填充类
@Contended
public class VolatileData {
// 占用 8个字节 +48 + 对象头 = 64字节
//须要操作的数据
volatile long value;
public VolatileData() {
}
public VolatileData(long defValue) {
value = defValue;
}
public long accumulationAdd() {
//因为单线程操作不须要加锁
value++;
return value;
}
public long getValue() {
return value;
}
}
内存布局
注意事项
在Java8中提供了@sun.misc.Contended来防止伪共享时,在运行时须要设置JVM启动参数-XX:-RestrictContended否则可能不失效。
4.1.7 性能比照
4.1.7.1 测试代码
应用和不应用缓存行填充的比照
/**
* 缓存行测试
*/
public class CacheLineTest {
/**
* 通过缓存行填充的变量
*/
private VolatileData volatileData1 = new VolatileData(0);
private VolatileData volatileData2 = new VolatileData(0);
private VolatileData volatileData3 = new VolatileData(0);
private VolatileData volatileData4 = new VolatileData(0);
private VolatileData volatileData5 = new VolatileData(0);
private VolatileData volatileData6 = new VolatileData(0);
private VolatileData volatileData7 = new VolatileData(0);
/**
* 循环次数
*/
private final long size = 100000000;
/**
* 进行累加操作
*/
public void accumulationX(VolatileData volatileData) {
//计算耗时
long currentTime = System.currentTimeMillis();
long value = 0;
//循环累加
for (int i = 0; i < size; i++) {
//应用缓存行填充的形式
value = volatileData.accumulationAdd();
}
//打印
System.out.println(value);
//打印耗时
System.out.println("耗时:" + (System.currentTimeMillis() - currentTime));
}
public static void main(String[] args) {
//创建对象
CacheLineTest cacheRowTest = new CacheLineTest();
//创立线程池
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
//启动三个线程个调用他们各自的办法
executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData1));
executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData2));
executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData3));
executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData4));
executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData5));
executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData6));
executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData7));
executorService.shutdown();
}
}
4.1.7.2 测试数据
同样的构造他们之间差了 将近 50倍的速度差距
对象 | NoPadding(MS) | DataPadding(MS) | RhsPadding(MS) | Contended(MS) |
---|---|---|---|---|
volatileData1 | 3751 | 1323 | 1307 | 1291 |
volatileData2 | 3790 | 1383 | 1311 | 1314 |
volatileData3 | 7551 | 1400 | 1311 | 1333 |
volatileData4 | 7669 | 1407 | 1317 | 1356 |
volatileData5 | 8577 | 1447 | 1327 | 1361 |
volatileData6 | 8705 | 1479 | 1339 | 1375 |
volatileData6 | 8741 | 1512 | 1368 | 1389 |
4.1.8 Disruptor解决伪共享
在Disruptor中有一个重要的类Sequence,该类包装了一个volatile润饰的long类型数据value,无论是Disruptor中的基于数组实现的缓冲区RingBuffer,还是生产者,消费者,都有各自独立的Sequence,RingBuffer缓冲区中,Sequence标示着写入进度,例如每次生产者要写入数据进缓冲区时,都要调用RingBuffer.next()来取得下一个可应用的绝对地位。对于生产者和消费者来说,Sequence标示着它们的事件序号,来看看Sequence类的源码:
class LhsPadding {
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding {
protected volatile long value;
}
class RhsPadding extends Value {
protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding {
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
static {
UNSAFE = Util.getUnsafe();
try {
VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
} catch(final Exception e) {
throw new RuntimeException(e);
}
}
public Sequence() {
this(INITIAL_VALUE);
}
public Sequence(final long initialValue) {
UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
}
从第1到11行能够看到,真正应用到的变量value,它的前后空间都由8个long型的变量填补了,对于一个大小为64字节的缓存行,它刚好被填补满(一个long型变量value,8个字节加上前/后个7long型变量填补,7*8=56,56+8=64字节)。这样做每次把变量value读进高速缓存中时,都能把缓存行填充斥(对于大小为64个字节的缓存行来说,如果缓存行大小大于64个字节,那么还是会呈现伪共享问题),保障每次解决数据时都不会与其余变量发生冲突。
4.2 无锁的设计
4.2.1 锁机制存在的问题
- 在多线程竞争下,加锁、开释锁会导致比拟多的上下文切换和调度延时,引起性能问题,而且在上下文切换的时候,cpu之前缓存的指令和数据都将生效,对性能有很大的损失,用户态的锁尽管防止了这些问题,然而其实它们只是在没有实在的竞争时才无效。
- 一个线程持有锁会导致其它所有须要此锁的线程挂起直至该锁开释。
- 如果一个优先级高的线程期待一个优先级低的线程开释锁会导致导致优先级反转(Priority Inversion),引起性能危险。
4.2.2 CAS无锁算法
实现无锁(lock-free)的非阻塞算法有多种实现办法,其中 CAS(比拟与替换,Compare and swap) 是一种有名的无锁算法。CAS的语义是“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不批改并通知V的值理论为多少”,CAS是一种 乐观锁 技术,当多个线程尝试应用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并能够再次尝试。CAS有3个操作数,内存值V,旧的预期值A,要批改的新值B。当且仅当预期值A和内存值V雷同时,将内存值V批改为B,否则什么都不做。
这是一个CPU级别的指令,在我的意识中,它的工作形式有点像乐观锁——CPU去更新一个值,但如果想改的值不再是原来的值,操作就失败,因为很显著,有其它操作先扭转了这个值。
留神,这能够是CPU的两个不同的外围,但不会是两个独立的CPU。
CAS操作比锁耗费资源少的多,因为它们不关涉操作系统,它们间接在CPU上操作。但它们并非没有代价——在下面的试验中,单线程无锁耗时300ms,单线程有锁耗时10000ms,单线程应用CAS耗时5700ms。所以它比应用锁耗时少,但比不须要思考竞争的单线程耗时多。
4.2.3 传统队列问题
队列的底层数据结构个别分成三种:数组、链表和堆。其中,堆这里是为了实现带有优先级个性的队列,暂且不思考。
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加锁 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加锁 | linkedlist |
ConcurrentLinkedQueue | unbounded | 无锁 | linkedlist |
LinkedTransferQueue | unbounded | 无锁 | linkedlist |
PriorityBlockingQueue | unbounded | 加锁 | heap |
DelayQueue | unbounded | 加锁 | heap |
在稳定性和性能要求特地高的零碎中,为了避免生产者速度过快,导致内存溢出,只能抉择有界队列;
同时,为了缩小Java的垃圾回收对系统性能的影响,会尽量抉择array/heap格局的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue,然而ArrayBlockingQueue是通过加锁的形式保障线程平安,而且ArrayBlockingQueue还存在伪共享问题,这两个问题重大影响了性能。
4.2.3.1 Disruptor的无锁设计
多线程环境下,多个生产者通过do/while循环的条件CAS,来判断每次申请的空间是否曾经被其余生产者占据。如果曾经被占据,该函数会返回失败,While循环从新执行,申请写入空间。
do
{
current = cursor.get();
next = current + n;
if (!hasAvailableCapacity(gatingSequences, n, current))
{
throw InsufficientCapacityException.INSTANCE;
}
}
while (!cursor.compareAndSet(current, next));
//next 类比于ArrayBlockQueue的数组索引index
return next;
4.3 环形数组构造
环形数组构造是整个Disruptor的外围所在。
4.3.1 什么是环形数组
RingBuffer 是一个环(首尾相连的环),用做在不同上下文(线程)间传递数据的buffer,RingBuffer 领有一个序号,这个序号指向数组中下一个可用元素。
4.3.2 为什么应用环形数组
为了防止垃圾回收,采纳数组而非链表。同时,数组对处理器的缓存机制更加敌对
首先因为是数组,所以要比链表快,而且依据咱们对下面缓存行的解释晓得,数组中的一个元素加载,相邻的数组元素也是会被预加载的,因而在这样的构造中,cpu无需时不时去主存加载数组中的下一个元素。
而且,你能够为数组事后分配内存,使得数组对象始终存在(除非程序终止)。这就意味着不须要花大量的工夫用于垃圾回收。
此外,不像链表那样,须要为每一个增加到其下面的对象发明节点对象—对应的,当删除节点时,须要执行相应的内存清理操作。环形数组中的元素采纳笼罩形式,防止了jvm的GC。
其次构造作为环形,数组的大小为2的n次方,这样元素定位能够通过位运算效率会更高,这个跟一致性哈希中的环形策略有点像。在disruptor中,这个牛逼的环形构造就是RingBuffer,既然是数组,那么就有大小,而且这个大小必须是2的n次方,构造如下:
其实质只是一个一般的数组,只是当搁置数据填充斥队列(即达到2^n-1地位)之后,再填充数据,就会从0开始,笼罩之前的数据,于是就相当于一个环。
4.4 元素地位定位
数组长度2^n
,通过位运算,放慢定位的速度。下标采取递增的模式。不必放心index溢出的问题。index是long类型,即便100万QPS的处理速度,也须要30万年能力用完。
4.5 期待策略
定义 Consumer 如何进行期待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能体现)依据理论运行环境的 CPU 的硬件特点抉择失当的策略,并配合特定的 JVM 的配置参数,可能实现不同的性能晋升。
4.5.1 BlockingWaitStrategy
Disruptor的默认策略是BlockingWaitStrategy,在BlockingWaitStrategy外部是应用锁和condition来控制线程的唤醒
BlockingWaitStrategy是最低效的策略,但其对CPU的耗费最小并且在各种不同部署环境中能提供更加统一的性能体现。
4.5.2 SleepingWaitStrategy
SleepingWaitStrategy 的性能体现跟 BlockingWaitStrategy 差不多,对 CPU 的耗费也相似,但其对生产者线程的影响最小,通过应用LockSupport.parkNanos(1)
来实现循环期待,适宜用于异步日志相似的场景;
4.5.3 YieldingWaitStrategy
YieldingWaitStrategy是能够应用在低提早零碎的策略之一,YieldingWaitStrategy将自旋以期待序列减少到适当的值。在循环体内,将调用Thread.yield()
以容许其余排队的线程运行。在要求极高性能且事件处理线数小于 CPU 逻辑外围数的场景中,举荐应用此策略;
4.5.4 BusySpinWaitStrategy
性能最好,适宜用于低提早的零碎。在要求极高性能且事件处理线程数小于CPU逻辑外围数的场景中,举荐应用此策略;
4.5.5 PhasedBackoffWaitStrategy
自旋 + yield + 自定义策略,CPU资源紧缺,吞吐量和提早并不 的场景。
本文参加了思否技术征文,欢送正在浏览的你也退出。
如果本文对您有帮忙,欢送
关注
和点赞
`,您的反对是我保持创作的能源。转载请注明出处!
发表回复