关于java:springbootroute十四整合Kafka

49次阅读

共计 3703 个字符,预计需要花费 10 分钟才能阅读完成。

在上一章中 SpringBoot 整合 RabbitMQ,曾经具体介绍了音讯队列的作用,这一种咱们间接来学习 SpringBoot 如何整合 kafka 发送音讯。

kafka 简介

kafka 是用 Scala 和 Java 语言开发的,高吞吐量的分布式消息中间件。高吞吐量使它在大数据畛域具备人造的劣势,被宽泛用来记录日志。

kafka 架构剖析

注 1:图中的红色箭头示意音讯的流动过程,蓝色示意分区备份,绿色示意 kafka 集群注册到 zookeeper。

注 2:在 kafka0.9 版本之前,消费者生产音讯的地位记录在 zookeeper 中,在 0.9 版本之后,生产音讯的地位记录在 kafka 的一个 topic 上。

kafka 名词简介

  1. Producer:音讯生产者
  2. Consumer:音讯消费者
  3. Consumer Group(CG):消费者组,一个 topic 能够有多个 CG,每个 Partition 只会把音讯发送给 GG 中的一个 Consumer
  4. Broker:一台 kafka 服务器就是一个 broker,一个 broker 有多个 topic
  5. Topic:音讯主题,音讯分类,可看作队列
  6. Partition:分区,为了实现扩大,一个大的 topic 可能散布到多个 broker 上,一个 topic 能够分为多个 partition,partition 中的每条音讯都会被调配一个有序的 id(offset),每个 partiton 中的音讯是有序的。
  7. Offset:kafka 的存储文件都是依照 offset.kafka 来命名的,不便查找,第一个 offset 为 0000000000.kafka。
  8. Leader:分区具备被备份,主分区
  9. Follower:从分区

1. 生产者分区策略

  1. 指定分区。
  2. 没有指定分区但有 key 值,将 key 的 hash 值与以后 topic 的分区个数进行取余失去分区。
  3. 如果既没有指定分区又没有指定 key,第一次调用时随机生成一个整数(当前调用每次在这个整数上自增),将这个随机数与该 topic 的分区数取余失去分区。

2. 音讯可靠性问题

采纳 ack 确认机制来保障音讯的可靠性。

kafka 在发送音讯后会同步到其余分区正本,等所有正本都接管到音讯后,kafka 才会发送 ack 进行确认。采纳这种模式的劣势就是当其中一个正本宕机后,则音讯生产者就不会收到 kafka 的 ack。

kafka 采纳 ISR 来解决这个问题。

ISR:Leader 保护的一个和 leader 放弃同步的 follower 汇合。

当 ISR 中的 folower 实现数据同步之后,leader 就会向 follower 发送 ack,如果 follower 长时间未向 leader 同步数据,则该 follower 就会被踢出 ISR,该工夫阀值的设置参数为replica.lag.time.max.ms,默认工夫为 10s,leader 产生故障后,就会从 ISR 中选举新的 leader。

注:本文所讲的 kafka 版本为 0.11,在 0.9 版本以前成为 ISR 还有一个条件,就是同步音讯的条数。

ack 参数配置

0:生产者不期待 broker 的 ack。

1:leader 分区接管到音讯向生产者发送 ack。

-1(all):ISR 中的 leader 和 follower 同步胜利后,向生产者发送 ack。

3. 音讯一致性问题

如果 leader 中有 10 条音讯,向两个 follower 同步数据,follower A 同步了 8 条,follower B 同步了 9 条。这时候 leader 宕机了,follower A 和 follower B 中的音讯是不统一的,剩下两个 follower 就会从新选举出一个 leader。

  • LEO(log end offset):每个正本的最初一个 offset
  • HW(high watermark):所有正本中最小的 offset

为了保证数据的一致性,所有的 follower 会将各自的 log 文件 高出 HW 的局部 截掉,而后再从新的 leader 中同步数据。

4. 音讯重复性问题

在 kafka0.11 版本中引入了一个新个性:幂等性。启用幂等性后,ack 默认为 -1。将生产者中的 enable.idompotence 设置为 true,即启用了幂等性。

开启幂等性的 Producer 在初始化的时候会被调配一个 PID,发往同一 Partition 的音讯会附带 Sequence Number。Broker 端会对 <PID,Partition,SeqNumber> 做缓存,当具备雷同主键的音讯提交时,Broker 只会缓存一条。然而每次重启 PID 就会发生变化,因而只能保障一次会话同一分区的音讯不反复。

5. 消费者组分区调配策略

kafka 有两种调配策略,一种是 RoundRobin,另一种是 Range

RoundRobin是依照消费者组以轮询的形式去给消费者调配分区的形式,前提条件是消费者组中的消费者须要订阅同一个 topic。

Range是 kafka 默认的调配策略,它是通过以后的 topic 依照肯定范畴来调配的,如果有 3 个分区,消费者组有两个消费者,则消费者 A 去生产 1 和 2 分区,消费者 B 去生产 3 分区。

6. 消费者 offset 保护

Kafka 0.9 版本之前,consumer 默认将 offset 保留在 zookeeper 中,0.9 版本开始,offset 保留在 kafka 的一个内置 topic 中,该 topic 为_consumer_offsets

7. 生产者事务

为了实现跨分区会话的事务,须要引入一个全局惟一的 Tracscation ID,并将 Producer 取得的 PID 与之绑定。这样当 Producer 重启后就能够通过正在进行的 Transaction ID 取得原来的 PID。

为了治理 Transcation ID,kafka 引入了一个新的组件 Transcation Coordinator。Producer 就是通过和 Transcation Coordinator 交互取得 Transction ID 对应的工作状态。

Spring Boot 整合 kafka

1. 引入 kafka 依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. 配置 kafka 服务信息

spring:
  kafka:
    # kafka 服务地址
    bootstrap-servers: 47.104.155.182:9092
    producer:
      # 生产者音讯 key 序列化形式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 生产者音讯 value 序列化形式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 消费者组
      group-id: test-consumer-group
      # 消费者音讯 value 反序列化形式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 消费者音讯 value 反序列化形式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3. 消费者

@Component
@Slf4j
@KafkaListener(topics = {"first-topic"},groupId = "test-consumer-group")
public class Consumer {

    @KafkaHandler
    public void receive(String message){log.info("我是消费者,我接管到的音讯是:"+message);
    }
}

4. 生产者

@RestController
public class Producer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @GetMapping("send")
    public void send(){

        String message = "你好,我是 Java 旅途";
        // 第一个参数 topic
        // 第二个参数 音讯
        kafkaTemplate.send("first-topic",message);
    }
}

此是 spring-boot-route 系列的第十四篇文章,这个系列的文章都比较简单,次要目标就是为了帮忙首次接触 Spring Boot 的同学有一个零碎的意识。本文已收录至我的 github,欢送各位小伙伴star

github:https://github.com/binzh303/s…

点关注、不迷路

如果感觉文章不错,欢送 关注 点赞 珍藏,你们的反对是我创作的能源,感激大家。

如果文章写的有问题,请不要悭吝,欢送留言指出,我会及时核查批改。

如果你还想更加深刻的理解我,能够微信搜寻「Java 旅途」进行关注。回复「1024」即可取得学习视频及精美电子书。每天 7:30 准时推送技术文章,让你的下班路不在孤单,而且每月还有送书流动,助你晋升硬实力!

正文完
 0