<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
public class PCData {
private long value;
public long getValue() {return value;}
public void setValue(long value) {this.value = value;}
}
public class PCDataFactory implements EventFactory<PCData> {
@Override
public PCData newInstance() {return new PCData();
}
}
public class Producer {
private final RingBuffer<PCData> ringBuffer;
public Producer(RingBuffer<PCData> ringBuffer) {this.ringBuffer = ringBuffer;}
public void pushData(ByteBuffer byteBuffer){long sequence = ringBuffer.next();
try{PCData event = ringBuffer.get(sequence);
event.setValue(byteBuffer.getLong(0));
}finally {ringBuffer.publish(sequence);
}
}
}
public class Consumer implements WorkHandler<PCData> {
@Override
public void onEvent(PCData pcData) throws Exception {System.out.println(Thread.currentThread().getName()+"Event:--"+pcData.getValue()*pcData.getValue()+"--");
}
}
public class App {
public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();
PCDataFactory factory = new PCDataFactory();
int bufferSize = 1024;
Disruptor<PCData> dataDisruptor = new Disruptor<PCData>(factory,bufferSize,executorService,
ProducerType.MULTI,new BlockingWaitStrategy());
dataDisruptor.handleEventsWithWorkerPool(new Consumer(),
new Consumer(),
new Consumer(),
new Consumer());
dataDisruptor.start();
RingBuffer<PCData> ringBuffer = dataDisruptor.getRingBuffer();
Producer producer = new Producer(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for(long l=0;true;l++){byteBuffer.putLong(0,l);
producer.pushData(byteBuffer);
Thread.sleep(100);
System.out.println("add data"+l);
}
}
}