## Preface

This article demonstrates how to use reactor-kafka.

## Maven

```xml
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.0.1.RELEASE</version>
</dependency>
```

## Preparation

### Start ZooKeeper

```shell
cd zookeeper-3.4.13
sh bin/zkServer.sh start
```

```
ZooKeeper JMX enabled by default
ZooKeeper remote JMX Port set to 8999
ZooKeeper remote JMX authenticate set to false
ZooKeeper remote JMX ssl set to false
ZooKeeper remote JMX log4j set to true
Using config: zookeeper-3.4.13/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
```

### Start Kafka

```shell
cd kafka_2.11-1.1.1
sh bin/kafka-server-start.sh config/server.properties
```

### Create the topic

```shell
cd kafka_2.11-1.1.1
sh bin/kafka-topics.sh --create --topic demotopic --replication-factor 1 --partitions 3 --zookeeper localhost:2181
```

```
Created topic "demotopic".
```

## Examples

### Producer

```java
@Test
public void testProducer() throws InterruptedException {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    SenderOptions<Integer, String> senderOptions = SenderOptions.create(props);

    KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);
    SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss:SSS z dd MMM yyyy");
    CountDownLatch latch = new CountDownLatch(100);
    sender.<Integer>send(Flux.range(1, 100)
            .map(i -> SenderRecord.create(new ProducerRecord<>(TOPIC, i, "Message_" + i), i)))
            .doOnError(e -> log.error("Send failed", e))
            .subscribe(r -> {
                RecordMetadata metadata = r.recordMetadata();
                System.out.printf("Message %d sent successfully, topic-partition=%s-%d offset=%d timestamp=%s\n",
                        r.correlationMetadata(),
                        metadata.topic(),
                        metadata.partition(),
                        metadata.offset(),
                        dateFormat.format(new Date(metadata.timestamp())));
                latch.countDown();
            });
    latch.await(10, TimeUnit.SECONDS);
    sender.close();
}
```

### Consumer

```java
@Test
public void testConsumer() throws InterruptedException {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-consumer");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    ReceiverOptions<Integer, String> receiverOptions = ReceiverOptions.create(props);

    SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss:SSS z dd MMM yyyy");
    CountDownLatch latch = new CountDownLatch(100);
    ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(TOPIC))
            .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
            .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
    Flux<ReceiverRecord<Integer, String>> kafkaFlux = KafkaReceiver.create(options).receive();
    Disposable disposable = kafkaFlux.subscribe(record -> {
        ReceiverOffset offset = record.receiverOffset();
        System.out.printf("Received message: topic-partition=%s offset=%d timestamp=%s key=%d value=%s\n",
                offset.topicPartition(),
                offset.offset(),
                dateFormat.format(new Date(record.timestamp())),
                record.key(),
                record.value());
        offset.acknowledge();
        latch.countDown();
    });
    latch.await(10, TimeUnit.SECONDS);
    disposable.dispose();
}
```

## Summary

reactor-kafka wraps the Kafka client API in a Reactive Streams style, which makes it much more convenient to use; developers already familiar with Reactor will feel right at home.

## Doc

- reactor-kafka-samples
- Reactor Kafka Reference Guide
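Both tests rely on the same idiom: `subscribe()` returns immediately and the pipeline runs asynchronously, so a `CountDownLatch` sized to the expected message count blocks the test thread until every callback has fired (or a 10-second timeout elapses) before `sender.close()` / `disposable.dispose()` tears the pipeline down. The idiom can be sketched with only the JDK; the class name and the thread pool below are illustrative stand-ins for reactor-kafka's asynchronous subscribe callbacks, not part of its API:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int messageCount = 100;
        // one count per expected message, mirroring new CountDownLatch(100) in the tests
        CountDownLatch latch = new CountDownLatch(messageCount);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < messageCount; i++) {
            // stand-in for the per-record subscribe() callback ending in latch.countDown()
            pool.submit(latch::countDown);
        }
        // block until all events arrive, or give up after the timeout;
        // returns false on timeout rather than throwing
        boolean completed = latch.await(10, TimeUnit.SECONDS);
        pool.shutdown();
        System.out.println("all messages handled: " + completed);
    }
}
```

If the latch were omitted, the test method would return (and close the sender or dispose the subscription) before most sends or receives had completed, which is why `await` sits between `subscribe()` and the cleanup calls.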
...