本文分享如何应用KRaft部署Kafka集群,以及Spring中如何实现Kafka响应式交互。

KRaft

咱们晓得,Kafka应用Zookeeper负责为kafka存储broker,Consumer Group等元数据,并应用Zookeeper实现broker选主等操作。
尽管应用Zookeeper简化了Kafka的工作,但这也使Kafka的部署和运维更简单。

Kafka 2.8.0开始移除了Zookeeper,并应用Kafka內部的仲裁(Quorum)控制器來取代ZooKeeper,官网称这个控制器为 "Kafka Raft metadata mode",即KRaft mode。从此用户能够在不须要Zookeeper的状况下部署Kafka集群,这使Fafka更加简略,轻量级。
应用KRaft模式后,用户只须要专一于保护Kafka集群即可。

留神:因为该性能改变较大,目前Kafka2.8版本提供的KRaft模式是一个测试版本,不举荐在生产环境应用。置信Kafka后续版本很快会提供生产可用的kraft版本。

上面介绍一下如果应用Kafka部署kafka集群。
这里应用3台机器部署3个Kafka节点,应用的Kafka版本为2.8.0。

1.生成ClusterId以及配置文件。
(1)应用kafka-storage.sh生成ClusterId。

$ ./bin/kafka-storage.sh random-uuiddPqzXBF9R62RFACGSg5c-Q

(2)应用ClusterId生成配置文件

$ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.propertiesFormatting /tmp/kraft-combined-logs

留神:只须要在生成一个ClusterId,并应用该ClusterId在所有机器上生成配置文件,即集群中所有节点应用的ClusterId需雷同。

2.批改配置文件
脚本生成的配置文件只能用于单个Kafka节点,如果在部署Kafka集群,须要对配置文件进行一下批改。

(1)批改config/kraft/server.properties(稍后应用该配置启动kafka)

process.roles=broker,controller node.id=1listeners=PLAINTEXT://172.17.0.2:9092,CONTROLLER://172.17.0.2:9093advertised.listeners=PLAINTEXT://172.17.0.2:9092controller.quorum.voters=1@172.17.0.2:9093,2@172.17.0.3:9093,3@172.17.0.4:9093

process.roles指定了该节点角色,有以下取值

  • broker: 这台机器将仅仅当作一个broker
  • controller: 作为Raft quorum的控制器节点
  • broker,controller: 蕴含以上两者的性能

一个集群中不同节点的node.id须要不同。
controller.quorum.voters须要配置集群中所有的controller节点,配置格局为<nodeId>@<ip>:<port>。

(2)
kafka-storage.sh脚本生成的配置,默认将kafka数据寄存在/tmp/kraft-combined-logs/,
咱们还须要/tmp/kraft-combined-logs/meta.properties配置中的node.id,使其与server.properties配置中放弃一起。

node.id=1

3.启动kafka
应用kafka-server-start.sh脚本启动Kafka节点

$ ./bin/kafka-server-start.sh ./config/kraft/server.properties

上面测试一下该kafka集群
1.创立主题

$ ./bin/kafka-topics.sh --create --partitions 3 --replication-factor 3 --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1 

2.生产音讯

$ ./bin/kafka-console-producer.sh --broker-list 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1

3.生产音讯

$ ./bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1 --from-beginning

这部分命令的应用与低版本的Kafka保持一致。

Kafka的性能临时还不欠缺,这是展现一个简略的部署示例。
Kafka文档:https://github.com/apache/kaf...

Spring中能够应用Spring-Kafka、Spring-Cloud-Stream两个框架实现kafka响应式交互。
上面别离看一下这两个框架的应用。

Spring-Kafka

1.增加援用
增加spring-kafka援用

<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId>    <version>2.5.8.RELEASE</version></dependency>

2.筹备配置文件,内容如下

spring.kafka.producer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializerspring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializerspring.kafka.consumer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializerspring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializerspring.kafka.consumer.group-id=warehouse-consumersspring.kafka.consumer.properties.spring.json.trusted.packages=*

别离是生产者和消费者对应的配置,很简略。

3.发送音讯
Spring-Kakfa中能够应用ReactiveKafkaProducerTemplate发送音讯。
首先,咱们须要创立一个ReactiveKafkaProducerTemplate实例。(目前SpringBoot会主动创立KafkaTemplate实例,但不会创立ReactiveKafkaProducerTemplate实例)。

@Configurationpublic class KafkaConfig {    @Autowired    private KafkaProperties properties;    @Bean    public ReactiveKafkaProducerTemplate reactiveKafkaProducerTemplate() {        SenderOptions options = SenderOptions.create(properties.getProducer().buildProperties());        ReactiveKafkaProducerTemplate template = new ReactiveKafkaProducerTemplate(options);        return template;    }}

KafkaProperties实例由SpringBoot主动创立,读取下面配置文件中对应的配置。

接下来,就能够应用ReactiveKafkaProducerTemplate发送音讯了

    @Autowired    private ReactiveKafkaProducerTemplate template;    public static final String WAREHOUSE_TOPIC = "warehouse";    public Mono<Boolean> add(Warehouse warehouse) {        Mono<SenderResult<Void>> resultMono = template.send(WAREHOUSE_TOPIC, warehouse.getId(), warehouse);        return resultMono.flatMap(rs -> {            if(rs.exception() != null) {                logger.error("send kafka error", rs.exception());                return Mono.just(false);            }            return Mono.just(true);        });    }

ReactiveKafkaProducerTemplate#send办法返回一个Mono(这是Spring Reactor中的外围对象),Mono中携带了SenderResult,SenderResult中的RecordMetadata、exception存储该记录的元数据(包含offset、timestamp等信息)以及发送操作的异样。

4.生产音讯
Spring-Kafka应用ReactiveKafkaConsumerTemplate生产音讯。

@Servicepublic class WarehouseConsumer {    @Autowired    private KafkaProperties properties;    @PostConstruct    public void consumer() {        ReceiverOptions<Long, Warehouse> options = ReceiverOptions.create(properties.getConsumer().buildProperties());        options = options.subscription(Collections.singleton(WarehouseService.WAREHOUSE_TOPIC));        new ReactiveKafkaConsumerTemplate(options)                .receiveAutoAck()                .subscribe(record -> {                    logger.info("Warehouse Record:" + record);                });    }}

这里与之前应用@KafkaListener注解实现的音讯监听者不同,不过也非常简单,分为两个步骤:
(1)ReceiverOptions#subscription办法将ReceiverOptions关联到kafka主题
(2)创立ReactiveKafkaConsumerTemplate,并注册subscribe的回调函数生产音讯。
提醒:receiveAutoAck办法会主动提交生产组offset。

Spring-Cloud-Stream

Spring-Cloud-Stream是Spring提供的用于构建音讯驱动微服务的框架。
它为不同的消息中间件产品提供一种灵便的,对立的编程模型,能够屏蔽底层不同音讯组件的差别,目前反对RabbitMQ、Kafka、RocketMQ等音讯组件。

这里简略展现Spring-Cloud-Stream中实现Kafka响应式交互的示例,不深刻介绍Spring-Cloud-Stream的利用。

1.引入spring-cloud-starter-stream-kafka的援用

    <dependency>      <groupId>org.springframework.cloud</groupId>      <artifactId>spring-cloud-starter-stream-kafka</artifactId>    </dependency>

2.增加配置

spring.cloud.stream.kafka.binder.brokers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092spring.cloud.stream.bindings.warehouse2-out-0.contentType=application/jsonspring.cloud.stream.bindings.warehouse2-out-0.destination=warehouse2# 音讯格局spring.cloud.stream.bindings.warehouse3-in-0.contentType=application/json# 音讯目的地,能够了解为Kafka主题spring.cloud.stream.bindings.warehouse3-in-0.destination=warehouse2# 定义消费者生产组,能够了解为Kafka生产组spring.cloud.stream.bindings.warehouse3-in-0.group=warehouse2-consumers# 映射办法名spring.cloud.function.definition=warehouse2;warehouse3

Spring-Cloud-Stream 3.1版本之后,@EnableBinding、@Output等StreamApi注解都标记为废除,并提供了一种更简洁的函数式编程模型。
该版本后,用户不须要应用注解,只有在配置文件中指定须要绑定的办法,Spring-Cloud-Stream会为用户将这些办法与底层音讯组件绑定,用户能够间接调用这些办法发送音讯,或者接管到音讯时Spring-Cloud-Stream会调用这些办法生产音讯。

通过以下格局定义输出、输入函数的相干属性:
输入(发送音讯):<functionName> + -out- + <index>
输出(生产音讯):<functionName> + -in- + <index>
对于典型的单个输出/输入函数,index始终为0,因而它仅与具备多个输出和输入参数的函数相干。
Spring-Cloud-Stream反对具备多个输出(函数参数)/输入(函数返回值)的函数。

spring.cloud.function.definition配置指定须要绑定的办法名,不增加该配置,Spring-Cloud-Stream会主动尝试绑定返回类型为Supplier/Function/Consumer的办法,然而应用该配置能够防止Spring-Cloud-Stream绑定混同。

3.发送音讯
用户能够编写一个返回类型为Supplier的办法,并定时发送音讯

    @PollableBean    public Supplier<Flux<Warehouse>> warehouse2() {        Warehouse warehouse = new Warehouse();        warehouse.setId(333L);        warehouse.setName("天下第一仓");        warehouse.setLabel("一级仓");        logger.info("Supplier Add : {}", warehouse);        return () -> Flux.just(warehouse);    }

定义该办法后,Spring-Cloud-Stream每秒调用一次该办法,生成Warehouse实例,并发送到Kafka。
(这里办法名warehouse3曾经配置在spring.cloud.function.definition中。)

通常场景下,利用并不需要定时发送音讯,而是由业务场景触发发送音讯操作, 如Rest接口,
这时能够应用StreamBridge接口

    @Autowired    private StreamBridge streamBridge;    public boolean add2(Warehouse warehouse) {        return streamBridge.send("warehouse2-out-0", warehouse);    }

临时未发现StreamBridge如何实现响应式交互。

4.生产音讯
利用要生产音讯,只须要定义一个返回类型为Function/Consumer的办法即可。如下

    @Bean    public Function<Flux<Warehouse>, Mono<Void>> warehouse3() {        Logger logger = LoggerFactory.getLogger("WarehouseFunction");        return flux -> flux.doOnNext(data -> {            logger.info("Warehouse Data: {}", data);        }).then();    }

留神:办法名与<functionName> + -out- + <index>/<functionName> + -in- + <index>
spring.cloud.function.definition中的配置须要保持一致,免得出错。

SpringCloudStream文档:https://docs.spring.io/spring...

文章残缺代码:https://gitee.com/binecy/bin-...

如果您感觉本文不错,欢送关注我的微信公众号,系列文章继续更新中。您的关注是我保持的能源!