关于kafka:Reactive-Spring实战-响应式Kafka交互

10次阅读

共计 7504 个字符,预计需要花费 19 分钟才能阅读完成。

本文分享如何应用 KRaft 部署 Kafka 集群,以及 Spring 中如何实现 Kafka 响应式交互。

KRaft

咱们晓得,Kafka 应用 Zookeeper 负责为 kafka 存储 broker,Consumer Group 等元数据,并应用 Zookeeper 实现 broker 选主等操作。
尽管应用 Zookeeper 简化了 Kafka 的工作,但这也使 Kafka 的部署和运维更简单。

Kafka 2.8.0 开始移除了 Zookeeper,并应用 Kafka 內部的仲裁(Quorum)控制器來取代 ZooKeeper,官网称这个控制器为 “Kafka Raft metadata mode”,即 KRaft mode。从此用户能够在不须要 Zookeeper 的状况下部署 Kafka 集群,这使 Fafka 更加简略,轻量级。
应用 KRaft 模式后,用户只须要专一于保护 Kafka 集群即可。

留神:因为该性能改变较大,目前 Kafka2.8 版本提供的 KRaft 模式是一个测试版本,不举荐在生产环境应用。置信 Kafka 后续版本很快会提供生产可用的 kraft 版本。

上面介绍一下如果应用 Kafka 部署 kafka 集群。
这里应用 3 台机器部署 3 个 Kafka 节点,应用的 Kafka 版本为 2.8.0。

1. 生成 ClusterId 以及配置文件。
(1)应用 kafka-storage.sh 生成 ClusterId。

$ ./bin/kafka-storage.sh random-uuid
dPqzXBF9R62RFACGSg5c-Q

(2)应用 ClusterId 生成配置文件

$ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
Formatting /tmp/kraft-combined-logs

留神:只须要在生成一个 ClusterId,并应用该 ClusterId 在所有机器上生成配置文件,即集群中所有节点应用的 ClusterId 需雷同。

2. 批改配置文件
脚本生成的配置文件只能用于单个 Kafka 节点,如果在部署 Kafka 集群,须要对配置文件进行一下批改。

(1)批改 config/kraft/server.properties(稍后应用该配置启动 kafka)

process.roles=broker,controller 
node.id=1
listeners=PLAINTEXT://172.17.0.2:9092,CONTROLLER://172.17.0.2:9093
advertised.listeners=PLAINTEXT://172.17.0.2:9092
controller.quorum.voters=1@172.17.0.2:9093,2@172.17.0.3:9093,3@172.17.0.4:9093

process.roles 指定了该节点角色,有以下取值

  • broker: 这台机器将仅仅当作一个 broker
  • controller: 作为 Raft quorum 的控制器节点
  • broker,controller: 蕴含以上两者的性能

一个集群中不同节点的 node.id 须要不同。
controller.quorum.voters 须要配置集群中所有的 controller 节点,配置格局为 <nodeId>@<ip>:<port>。

(2)
kafka-storage.sh 脚本生成的配置,默认将 kafka 数据寄存在 /tmp/kraft-combined-logs/,
咱们还须要 /tmp/kraft-combined-logs/meta.properties 配置中的 node.id,使其与 server.properties 配置中放弃一起。

node.id=1

3. 启动 kafka
应用 kafka-server-start.sh 脚本启动 Kafka 节点

$ ./bin/kafka-server-start.sh ./config/kraft/server.properties

上面测试一下该 kafka 集群
1. 创立主题

$ ./bin/kafka-topics.sh --create --partitions 3 --replication-factor 3 --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1 

2. 生产音讯

$ ./bin/kafka-console-producer.sh --broker-list 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1

3. 生产音讯

$ ./bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1 --from-beginning

这部分命令的应用与低版本的 Kafka 保持一致。

Kafka 的性能临时还不欠缺,这是展现一个简略的部署示例。
Kafka 文档:https://github.com/apache/kaf…

Spring 中能够应用 Spring-Kafka、Spring-Cloud-Stream 两个框架实现 kafka 响应式交互。
上面别离看一下这两个框架的应用。

Spring-Kafka

1. 增加援用
增加 spring-kafka 援用

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.8.RELEASE</version>
</dependency>

2. 筹备配置文件,内容如下

spring.kafka.producer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.consumer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=warehouse-consumers
spring.kafka.consumer.properties.spring.json.trusted.packages=*

别离是生产者和消费者对应的配置,很简略。

3. 发送音讯
Spring-Kakfa 中能够应用 ReactiveKafkaProducerTemplate 发送音讯。
首先,咱们须要创立一个 ReactiveKafkaProducerTemplate 实例。(目前 SpringBoot 会主动创立 KafkaTemplate 实例,但不会创立 ReactiveKafkaProducerTemplate 实例)。

@Configuration
public class KafkaConfig {
    @Autowired
    private KafkaProperties properties;

    @Bean
    public ReactiveKafkaProducerTemplate reactiveKafkaProducerTemplate() {SenderOptions options = SenderOptions.create(properties.getProducer().buildProperties());
        ReactiveKafkaProducerTemplate template = new ReactiveKafkaProducerTemplate(options);
        return template;
    }
}

KafkaProperties 实例由 SpringBoot 主动创立,读取下面配置文件中对应的配置。

接下来,就能够应用 ReactiveKafkaProducerTemplate 发送音讯了

    @Autowired
    private ReactiveKafkaProducerTemplate template;

    public static final String WAREHOUSE_TOPIC = "warehouse";
    public Mono<Boolean> add(Warehouse warehouse) {Mono<SenderResult<Void>> resultMono = template.send(WAREHOUSE_TOPIC, warehouse.getId(), warehouse);
        return resultMono.flatMap(rs -> {if(rs.exception() != null) {logger.error("send kafka error", rs.exception());
                return Mono.just(false);
            }
            return Mono.just(true);
        });
    }

ReactiveKafkaProducerTemplate#send 办法返回一个 Mono(这是 Spring Reactor 中的外围对象),Mono 中携带了 SenderResult,SenderResult 中的 RecordMetadata、exception 存储该记录的元数据(包含 offset、timestamp 等信息)以及发送操作的异样。

4. 生产音讯
Spring-Kafka 应用 ReactiveKafkaConsumerTemplate 生产音讯。

@Service
public class WarehouseConsumer {
    @Autowired
    private KafkaProperties properties;

    @PostConstruct
    public void consumer() {ReceiverOptions<Long, Warehouse> options = ReceiverOptions.create(properties.getConsumer().buildProperties());
        options = options.subscription(Collections.singleton(WarehouseService.WAREHOUSE_TOPIC));
        new ReactiveKafkaConsumerTemplate(options)
                .receiveAutoAck()
                .subscribe(record -> {logger.info("Warehouse Record:" + record);
                });
    }
}

这里与之前应用 @KafkaListener 注解实现的音讯监听者不同,不过也非常简单,分为两个步骤:
(1)ReceiverOptions#subscription 办法将 ReceiverOptions 关联到 kafka 主题
(2)创立 ReactiveKafkaConsumerTemplate,并注册 subscribe 的回调函数生产音讯。
提醒:receiveAutoAck 办法会主动提交生产组 offset。

Spring-Cloud-Stream

Spring-Cloud-Stream 是 Spring 提供的用于构建音讯驱动微服务的框架。
它为不同的消息中间件产品提供一种灵便的,对立的编程模型,能够屏蔽底层不同音讯组件的差别,目前反对 RabbitMQ、Kafka、RocketMQ 等音讯组件。

这里简略展现 Spring-Cloud-Stream 中实现 Kafka 响应式交互的示例,不深刻介绍 Spring-Cloud-Stream 的利用。

1. 引入 spring-cloud-starter-stream-kafka 的援用

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>

2. 增加配置

spring.cloud.stream.kafka.binder.brokers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
spring.cloud.stream.bindings.warehouse2-out-0.contentType=application/json
spring.cloud.stream.bindings.warehouse2-out-0.destination=warehouse2
# 音讯格局
spring.cloud.stream.bindings.warehouse3-in-0.contentType=application/json
# 音讯目的地,能够了解为 Kafka 主题
spring.cloud.stream.bindings.warehouse3-in-0.destination=warehouse2
# 定义消费者生产组,能够了解为 Kafka 生产组
spring.cloud.stream.bindings.warehouse3-in-0.group=warehouse2-consumers
# 映射办法名
spring.cloud.function.definition=warehouse2;warehouse3

Spring-Cloud-Stream 3.1 版本之后,@EnableBinding、@Output 等 StreamApi 注解都标记为废除,并提供了一种更简洁的函数式编程模型。
该版本后,用户不须要应用注解,只有在配置文件中指定须要绑定的办法,Spring-Cloud-Stream 会为用户将这些办法与底层音讯组件绑定,用户能够间接调用这些办法发送音讯,或者接管到音讯时 Spring-Cloud-Stream 会调用这些办法生产音讯。

通过以下格局定义输出、输入函数的相干属性:
输入(发送音讯):<functionName> + -out- + <index>
输出(生产音讯):<functionName> + -in- + <index>
对于典型的单个输出 / 输入函数,index 始终为 0,因而它仅与具备多个输出和输入参数的函数相干。
Spring-Cloud-Stream 反对具备多个输出(函数参数)/ 输入(函数返回值)的函数。

spring.cloud.function.definition 配置指定须要绑定的办法名,不增加该配置,Spring-Cloud-Stream 会主动尝试绑定返回类型为 Supplier/Function/Consumer 的办法,然而应用该配置能够防止 Spring-Cloud-Stream 绑定混同。

3. 发送音讯
用户能够编写一个返回类型为 Supplier 的办法,并定时发送音讯

    @PollableBean
    public Supplier<Flux<Warehouse>> warehouse2() {Warehouse warehouse = new Warehouse();
        warehouse.setId(333L);
        warehouse.setName("天下第一仓");
        warehouse.setLabel("一级仓");

        logger.info("Supplier Add : {}", warehouse);
        return () -> Flux.just(warehouse);
    }

定义该办法后,Spring-Cloud-Stream 每秒调用一次该办法,生成 Warehouse 实例,并发送到 Kafka。
(这里办法名 warehouse3 曾经配置在 spring.cloud.function.definition 中。)

通常场景下,利用并不需要定时发送音讯,而是由业务场景触发发送音讯操作,如 Rest 接口,
这时能够应用 StreamBridge 接口

    @Autowired
    private StreamBridge streamBridge;

    public boolean add2(Warehouse warehouse) {return streamBridge.send("warehouse2-out-0", warehouse);
    }

临时未发现 StreamBridge 如何实现响应式交互。

4. 生产音讯
利用要生产音讯,只须要定义一个返回类型为 Function/Consumer 的办法即可。如下

    @Bean
    public Function<Flux<Warehouse>, Mono<Void>> warehouse3() {Logger logger = LoggerFactory.getLogger("WarehouseFunction");
        return flux -> flux.doOnNext(data -> {logger.info("Warehouse Data: {}", data);
        }).then();}

留神:办法名与<functionName> + -out- + <index>/<functionName> + -in- + <index>
spring.cloud.function.definition 中的配置须要保持一致,免得出错。

SpringCloudStream 文档:https://docs.spring.io/spring…

文章残缺代码:https://gitee.com/binecy/bin-…

如果您感觉本文不错,欢送关注我的微信公众号,系列文章继续更新中。您的关注是我保持的能源!

正文完
 0