乐趣区

高性能的生产者-消费者新选择:无锁的缓存框架 Disruptor

    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.4</version>
    </dependency>

/**
 * Mutable event object carried through the ring buffer.
 *
 * <p>Holds a single {@code long} payload that a producer writes with
 * {@link #setValue(long)} and a consumer reads with {@link #getValue()}.
 */
public class PCData {

    /** Payload of this event; {@code 0} until a value has been set. */
    private long value;

    /**
     * Returns the payload currently stored in this event.
     *
     * @return the stored value
     */
    public long getValue() {
        return this.value;
    }

    /**
     * Overwrites the payload stored in this event.
     *
     * @param value the new payload
     */
    public void setValue(long value) {
        this.value = value;
    }
}

/**
 * {@link EventFactory} that supplies blank {@link PCData} instances.
 */
public class PCDataFactory implements EventFactory<PCData> {

    /**
     * Creates a new, empty event.
     *
     * @return a freshly constructed {@link PCData}
     */
    @Override
    public PCData newInstance() {
        final PCData blank = new PCData();
        return blank;
    }
}

/**
 * Publishes {@code long} values into a {@link RingBuffer} of {@link PCData} events.
 */
public class Producer {

    /** Ring buffer this producer writes into. */
    private final RingBuffer<PCData> ringBuffer;

    /**
     * Creates a producer bound to the given ring buffer.
     *
     * @param ringBuffer the buffer that receives published events
     */
    public Producer(RingBuffer<PCData> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * Claims the next slot, copies the {@code long} found at absolute index 0 of
     * {@code byteBuffer} into that slot's event, and publishes the slot.
     *
     * @param byteBuffer buffer whose first eight bytes hold the value to publish
     */
    public void pushData(ByteBuffer byteBuffer) {
        final long slot = ringBuffer.next();
        try {
            // The slot is claimed before the buffer is read, so a failure while
            // reading still ends with the claimed slot being published below.
            ringBuffer.get(slot).setValue(byteBuffer.getLong(0));
        } finally {
            // Always publish the claimed sequence, even when filling it failed.
            ringBuffer.publish(slot);
        }
    }
}

/**
 * {@link WorkHandler} that prints the square of each {@link PCData} value it receives.
 */
public class Consumer implements WorkHandler<PCData> {

    /**
     * Writes the current thread's name followed by the squared event value to
     * standard output.
     *
     * @param pcData the event to process
     * @throws Exception declared by the handler interface; not thrown explicitly here
     */
    @Override
    public void onEvent(PCData pcData) throws Exception {
        final String threadName = Thread.currentThread().getName();
        final long squared = pcData.getValue() * pcData.getValue();
        System.out.println(threadName + "Event:--" + squared + "--");
    }
}

/**
 * Demo entry point: wires one {@link Producer} to four {@link Consumer} handlers
 * through a {@link Disruptor} and feeds it an increasing counter.
 */
public class App {

    /**
     * Builds and starts the disruptor, then publishes an incrementing value
     * roughly every 100 ms. The loop has no exit condition, so this method
     * only ends if the thread is interrupted or an exception escapes.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        PCDataFactory factory = new PCDataFactory();

        // NOTE(review): the Disruptor documentation requires a power-of-two ring
        // size; confirm before changing this value.
        int bufferSize = 1024;
        Disruptor<PCData> dataDisruptor = new Disruptor<>(
                factory,
                bufferSize,
                executorService,
                ProducerType.MULTI,
                new BlockingWaitStrategy());

        // Four handlers registered together as a single worker pool.
        dataDisruptor.handleEventsWithWorkerPool(
                new Consumer(), new Consumer(), new Consumer(), new Consumer());
        dataDisruptor.start();

        RingBuffer<PCData> ringBuffer = dataDisruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);

        // One reusable 8-byte scratch buffer carries each value to the producer.
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        long counter = 0;
        while (true) {
            byteBuffer.putLong(0, counter);
            producer.pushData(byteBuffer);
            Thread.sleep(100);
            System.out.println("add data" + counter);
            counter++;
        }
    }
}

退出移动版