本文分享如何应用KRaft部署Kafka集群,以及Spring中如何实现Kafka响应式交互。
KRaft
咱们晓得,Kafka应用Zookeeper负责为kafka存储broker,Consumer Group等元数据,并应用Zookeeper实现broker选主等操作。
尽管应用Zookeeper简化了Kafka的工作,但这也使Kafka的部署和运维更简单。
Kafka 2.8.0开始移除了Zookeeper,并应用Kafka內部的仲裁(Quorum)控制器來取代ZooKeeper,官网称这个控制器为 "Kafka Raft metadata mode",即KRaft mode。从此用户能够在不须要Zookeeper的状况下部署Kafka集群,这使Fafka更加简略,轻量级。
应用KRaft模式后,用户只须要专一于保护Kafka集群即可。
留神:因为该性能改变较大,目前Kafka2.8版本提供的KRaft模式是一个测试版本,不举荐在生产环境应用。置信Kafka后续版本很快会提供生产可用的kraft版本。
上面介绍一下如果应用Kafka部署kafka集群。
这里应用3台机器部署3个Kafka节点,应用的Kafka版本为2.8.0。
1.生成ClusterId以及配置文件。
(1)应用kafka-storage.sh生成ClusterId。
$ ./bin/kafka-storage.sh random-uuiddPqzXBF9R62RFACGSg5c-Q
(2)应用ClusterId生成配置文件
$ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.propertiesFormatting /tmp/kraft-combined-logs
留神:只须要在生成一个ClusterId,并应用该ClusterId在所有机器上生成配置文件,即集群中所有节点应用的ClusterId需雷同。
2.批改配置文件
脚本生成的配置文件只能用于单个Kafka节点,如果在部署Kafka集群,须要对配置文件进行一下批改。
(1)批改config/kraft/server.properties(稍后应用该配置启动kafka)
process.roles=broker,controller node.id=1listeners=PLAINTEXT://172.17.0.2:9092,CONTROLLER://172.17.0.2:9093advertised.listeners=PLAINTEXT://172.17.0.2:9092controller.quorum.voters=1@172.17.0.2:9093,2@172.17.0.3:9093,3@172.17.0.4:9093
process.roles指定了该节点角色,有以下取值
- broker: 这台机器将仅仅当作一个broker
- controller: 作为Raft quorum的控制器节点
- broker,controller: 蕴含以上两者的性能
一个集群中不同节点的node.id须要不同。
controller.quorum.voters须要配置集群中所有的controller节点,配置格局为<nodeId>@<ip>:<port>。
(2)
kafka-storage.sh脚本生成的配置,默认将kafka数据寄存在/tmp/kraft-combined-logs/,
咱们还须要/tmp/kraft-combined-logs/meta.properties配置中的node.id,使其与server.properties配置中放弃一起。
node.id=1
3.启动kafka
应用kafka-server-start.sh脚本启动Kafka节点
$ ./bin/kafka-server-start.sh ./config/kraft/server.properties
上面测试一下该kafka集群
1.创立主题
$ ./bin/kafka-topics.sh --create --partitions 3 --replication-factor 3 --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1
2.生产音讯
$ ./bin/kafka-console-producer.sh --broker-list 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1
3.生产音讯
$ ./bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1 --from-beginning
这部分命令的应用与低版本的Kafka保持一致。
Kafka的性能临时还不欠缺,这是展现一个简略的部署示例。
Kafka文档:https://github.com/apache/kaf...
Spring中能够应用Spring-Kafka、Spring-Cloud-Stream两个框架实现kafka响应式交互。
上面别离看一下这两个框架的应用。
Spring-Kafka
1.增加援用
增加spring-kafka援用
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.5.8.RELEASE</version></dependency>
2.筹备配置文件,内容如下
spring.kafka.producer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializerspring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializerspring.kafka.consumer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializerspring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializerspring.kafka.consumer.group-id=warehouse-consumersspring.kafka.consumer.properties.spring.json.trusted.packages=*
别离是生产者和消费者对应的配置,很简略。
3.发送音讯
Spring-Kakfa中能够应用ReactiveKafkaProducerTemplate发送音讯。
首先,咱们须要创立一个ReactiveKafkaProducerTemplate实例。(目前SpringBoot会主动创立KafkaTemplate实例,但不会创立ReactiveKafkaProducerTemplate实例)。
@Configurationpublic class KafkaConfig { @Autowired private KafkaProperties properties; @Bean public ReactiveKafkaProducerTemplate reactiveKafkaProducerTemplate() { SenderOptions options = SenderOptions.create(properties.getProducer().buildProperties()); ReactiveKafkaProducerTemplate template = new ReactiveKafkaProducerTemplate(options); return template; }}
KafkaProperties实例由SpringBoot主动创立,读取下面配置文件中对应的配置。
接下来,就能够应用ReactiveKafkaProducerTemplate发送音讯了
@Autowired private ReactiveKafkaProducerTemplate template; public static final String WAREHOUSE_TOPIC = "warehouse"; public Mono<Boolean> add(Warehouse warehouse) { Mono<SenderResult<Void>> resultMono = template.send(WAREHOUSE_TOPIC, warehouse.getId(), warehouse); return resultMono.flatMap(rs -> { if(rs.exception() != null) { logger.error("send kafka error", rs.exception()); return Mono.just(false); } return Mono.just(true); }); }
ReactiveKafkaProducerTemplate#send办法返回一个Mono(这是Spring Reactor中的外围对象),Mono中携带了SenderResult,SenderResult中的RecordMetadata、exception存储该记录的元数据(包含offset、timestamp等信息)以及发送操作的异样。
4.生产音讯
Spring-Kafka应用ReactiveKafkaConsumerTemplate生产音讯。
@Servicepublic class WarehouseConsumer { @Autowired private KafkaProperties properties; @PostConstruct public void consumer() { ReceiverOptions<Long, Warehouse> options = ReceiverOptions.create(properties.getConsumer().buildProperties()); options = options.subscription(Collections.singleton(WarehouseService.WAREHOUSE_TOPIC)); new ReactiveKafkaConsumerTemplate(options) .receiveAutoAck() .subscribe(record -> { logger.info("Warehouse Record:" + record); }); }}
这里与之前应用@KafkaListener注解实现的音讯监听者不同,不过也非常简单,分为两个步骤:
(1)ReceiverOptions#subscription办法将ReceiverOptions关联到kafka主题
(2)创立ReactiveKafkaConsumerTemplate,并注册subscribe的回调函数生产音讯。
提醒:receiveAutoAck办法会主动提交生产组offset。
Spring-Cloud-Stream
Spring-Cloud-Stream是Spring提供的用于构建音讯驱动微服务的框架。
它为不同的消息中间件产品提供一种灵便的,对立的编程模型,能够屏蔽底层不同音讯组件的差别,目前反对RabbitMQ、Kafka、RocketMQ等音讯组件。
这里简略展现Spring-Cloud-Stream中实现Kafka响应式交互的示例,不深刻介绍Spring-Cloud-Stream的利用。
1.引入spring-cloud-starter-stream-kafka的援用
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
2.增加配置
spring.cloud.stream.kafka.binder.brokers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092spring.cloud.stream.bindings.warehouse2-out-0.contentType=application/jsonspring.cloud.stream.bindings.warehouse2-out-0.destination=warehouse2# 音讯格局spring.cloud.stream.bindings.warehouse3-in-0.contentType=application/json# 音讯目的地,能够了解为Kafka主题spring.cloud.stream.bindings.warehouse3-in-0.destination=warehouse2# 定义消费者生产组,能够了解为Kafka生产组spring.cloud.stream.bindings.warehouse3-in-0.group=warehouse2-consumers# 映射办法名spring.cloud.function.definition=warehouse2;warehouse3
Spring-Cloud-Stream 3.1版本之后,@EnableBinding、@Output等StreamApi注解都标记为废除,并提供了一种更简洁的函数式编程模型。
该版本后,用户不须要应用注解,只有在配置文件中指定须要绑定的办法,Spring-Cloud-Stream会为用户将这些办法与底层音讯组件绑定,用户能够间接调用这些办法发送音讯,或者接管到音讯时Spring-Cloud-Stream会调用这些办法生产音讯。
通过以下格局定义输出、输入函数的相干属性:
输入(发送音讯):<functionName> + -out- + <index>
输出(生产音讯):<functionName> + -in- + <index>
对于典型的单个输出/输入函数,index始终为0,因而它仅与具备多个输出和输入参数的函数相干。
Spring-Cloud-Stream反对具备多个输出(函数参数)/输入(函数返回值)的函数。
spring.cloud.function.definition配置指定须要绑定的办法名,不增加该配置,Spring-Cloud-Stream会主动尝试绑定返回类型为Supplier/Function/Consumer的办法,然而应用该配置能够防止Spring-Cloud-Stream绑定混同。
3.发送音讯
用户能够编写一个返回类型为Supplier的办法,并定时发送音讯
@PollableBean public Supplier<Flux<Warehouse>> warehouse2() { Warehouse warehouse = new Warehouse(); warehouse.setId(333L); warehouse.setName("天下第一仓"); warehouse.setLabel("一级仓"); logger.info("Supplier Add : {}", warehouse); return () -> Flux.just(warehouse); }
定义该办法后,Spring-Cloud-Stream每秒调用一次该办法,生成Warehouse实例,并发送到Kafka。
(这里办法名warehouse3曾经配置在spring.cloud.function.definition中。)
通常场景下,利用并不需要定时发送音讯,而是由业务场景触发发送音讯操作, 如Rest接口,
这时能够应用StreamBridge接口
@Autowired private StreamBridge streamBridge; public boolean add2(Warehouse warehouse) { return streamBridge.send("warehouse2-out-0", warehouse); }
临时未发现StreamBridge如何实现响应式交互。
4.生产音讯
利用要生产音讯,只须要定义一个返回类型为Function/Consumer的办法即可。如下
@Bean public Function<Flux<Warehouse>, Mono<Void>> warehouse3() { Logger logger = LoggerFactory.getLogger("WarehouseFunction"); return flux -> flux.doOnNext(data -> { logger.info("Warehouse Data: {}", data); }).then(); }
留神:办法名与<functionName> + -out- + <index>
/<functionName> + -in- + <index>
、
spring.cloud.function.definition中的配置须要保持一致,免得出错。
SpringCloudStream文档:https://docs.spring.io/spring...
文章残缺代码:https://gitee.com/binecy/bin-...
如果您感觉本文不错,欢送关注我的微信公众号,系列文章继续更新中。您的关注是我保持的能源!