关于java:kafka实战二消息消费实战

35次阅读

共计 8477 个字符,预计需要花费 22 分钟才能阅读完成。

文章目录

    • [一、前言]
    • [二、消费者生产形式回顾]
    • [三、消费者生产外围点剖析]
    • [四、手动提交 offset 实战]
      • [4.1、引入 maven 依赖]
      • [4.2、实现一个简略的手动提交 offset 消费者 demo]
      • [4.3、答疑解惑]
      • [4.4、运行调试]
    • [五、主动提交 offset 实战]
    • [六、两种提交 offset 形式的比照]
    • [七、总结]

一、前言

上一篇文章剖析了 kafka 生产者发送音讯的流程并进行了具体的解释和实战。其中蕴含了 新版本的 kafka 对于同步发送音讯和异步发送音讯的 api 实现 ,以及kafka 源码里的回调函数和架构外部的失败重试机制 等都给出了底层的具体解释及 java 实战 demo。

留神:我所应用的 kafka 版本为 2.4.1,java 版本为 1.8,本文会对一些新老版本的改变中央加以阐明。

二、消费者生产形式回顾

首先咱们回顾一下 kafka 生产音讯的形式,kafka 是应用 pull(拉)模式从 broker 中读取数据的,而后就有两个疑难须要解答一下了。
疑难一:那为什么不采纳 push(推,填鸭式教学)的模式给消费者数据呢?

首先回忆下咱们上学学习不就是各种填鸭式教学吗?不论你三七二十一,就是依照教学进度给你灌输知识,能不能承受是你的事,并美其名曰:优胜略汰!

其实这种 push 形式在 kafka 架构里显然是不合理的,比方一个 broker 有多个消费者,它们的生产速率不同,一昧的 push 只会给消费者带来拒绝服务以及网络拥塞等危险。而 kafka 显然不可能去放弃速率低的消费者,因而 kafka 采纳了 pull 的模式,能够依据消费者的生产能力以适当的速率生产 broker 里的音讯。

疑难二:让消费者去被动 pull 数据就美中不足了吗?

同样联想上学的场景,如果把学习主动权全副交给学生,那有些学生想学的货色老师那里没有怎么办?那他不就陷入了一辈子就在那一直求索,然而别的也啥都学的这个死循环的状态了。

其实让消费者去被动 pull 数据天然也是有毛病的。采纳 pull 模式后,如果 kafka 没有数据,消费者可能会陷入循环中,始终返回空数据,这样超级频繁的返回空数据太耗费资源了。为了解决这个问题,Kafka 消费者在生产数据时会传入一个时长参数 timeout,如果以后没有数据可供生产,消费者会期待一段时间之后再返回,这段时长即为 timeout。

留神:老版本还能够依据条数来判断,消费者期待肯定的条数后返回,不过新版本给勾销了。

三、消费者生产外围点剖析

咱们晓得,数据在 kafka 中是能够长久化的,因而 consumer 生产数据的可靠性是不必放心的,也就是说不必放心数据的失落问题,数据的生产是可控的。

疑难:那它是怎么管制的呢?

咱们假如 consumer 在生产过程中呈现了断电宕机等故障,而后它复原后,须要从故障前的地位持续生产,所以 consumer 须要实时记录本人生产到了哪个offset,以便故障复原后持续生产。

所以消费者生产数据的外围点在于 offset 的保护,而在撸代码时,offset 的保护又分为手动提交和主动提交,它们是怎么玩的呢?又有哪些优缺点?都利用于哪些场景呢?当初就让咱们一起来见真章!

四、手动提交 offset 实战

4.1、引入 maven 依赖

大家能够依据本人的 kafka 版本在 mvn 上找到适宜本人的依赖,因为只是做简略的音讯生产,所以只须要和上一篇文章的音讯发送一样,引入 kafka-clients 依赖即可。我的 kafka 版本为 2.4.1,所以我须要引入的依赖为:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>

4.2、实现一个简略的手动提交 offset 消费者 demo

又到了令人兴奋 掉头发 的撸代码工夫了,老规矩,先介绍一下待会会用到的类:

  • KafkaConsumer:和前文的 KafkaProducer 一唱一和,用来创立一个消费者对象进行数据生产;
  • ConsumerConfig:获取各种配置参数,如果不去配置,就是用默认的;
  • ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord 对象才能够进行生产。

那上面就开始写了,这个简略的 demo 一共分为四步:

  1. 第一步:对 Properties 参数进行配置。

    消费者的配置参数十分多,不倡议都去记忆,能够记住几个罕用的,对于不罕用的咱们能够在用到的时候去 ConsumerConfig 源码或者 kafka 官网去查,还是比拟不便的。kafka 官网对于 consumerconfigs 网址:

    http://kafka.apache.org/documentation/#consumerconfigs

    在这个简略的 demo 里,咱们须要用到上面这几个参数:

    Properties 参数

    对应变量

    性能

    bootstrap.servers

    BOOTSTRAP_SERVERS_CONFIG

    指定 kafka 集群

    key.deserializer

    KEY_DESERIALIZER_CLASS_CONFIG

    序列化 key

    value.deserializer

    VALUE_DESERIALIZER_CLASS_CONFIG

    序列化 value

    enable.auto.commit

    ENABLE_AUTO_COMMIT_CONFIG

    主动提交 offset,默认为 true

    group.id

    GROUP_ID_CONFIG

    消费者组 ID,只有 group.id 雷同,就属于同一个消费者组

    晓得了这些变量的含意,代码就进去了:

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "csdn");
    
    

    解释:这里咱们不容许主动提交,所以把 enable.auto.commit 设置为 false,而后给咱们的消费者定一个消费者组,叫做“csdn”。

  2. 第二步:创立 1 个消费者。

    和生产者相似的操作,这里应用传参的形式来创立消费者:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
    
  3. 第三步:订阅 topic 主题。

    当初消费者有了,你要通知它去生产哪个或哪些 topic 才行,能够传入多个 topic:

    consumer.subscribe(Arrays.asList("testKafka"));
    
    
  4. 第四步:循环生产并提交 offset。

    到这里,消费者有了,它也晓得要去生产哪个 topic 了,那接下来写个循环,让它去生产就好了。这部分还有几个知识点,先看代码,而后我再解释:

    while (true) {ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {System.out.println("topic =" + record.topic() + "offset =" + record.offset() + "value =" + record.value());
        }
        consumer.commitAsync();
      //consumer.commitSync();}
    

4.3、答疑解惑

看第四步的代码,外面蕴含了一些知识点,不理解的话可能会有一些疑难,上面对应解释如下:

疑难一:consumer.poll(100),这里的 poll 办法是干啥的?传入的 100 又是啥?有啥用呢?

poll 是 consumer 对象的一个办法,后面第二节的第二个疑难,咱们解释了 pull 并不是美中不足的,如果 broker 里这个 topic 没有数据,你就让这个消费者始终去拉空数据吗?必定是不合理的,所以这里的 100 代表了 timeout,单位是 ms,如果以后没有数据可供生产,消费者会期待 100ms 之后再返回数据。

疑难二:poll 办法拿到的数据是一条还是一批?

这里要留神:它是一批一批去拿,不是一条哦。

疑难三:record 都能拿到哪些信息呢?

能够拿到诸如 offset,partition,topic,key 等信息,具体的信息及类型能够看下图:

疑难四:我看你最初提交 offset 应用了 commitAsync,或者 commitSync,它们是啥?这两个形式有什么区别吗?

commitSync(同步提交)和 commitAsync(异步提交)是手动提交 offset 的两种形式。

两者的相同点是:都会将本次 poll 的一批数据外面最高的偏移量提交;

不同点是:commitSync 会失败重试,始终到提交胜利(如果因为不可复原起因导致,也会提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。

留神:尽管 commitAsync 没有失败重试机制,然而咱们理论工作中还是用它比拟多,因为 commitAsync 尽管可能提交失败,然而失败后下一次从新提交(变相的相当于重试),对数据并没有什么影响,而且异步提交的提早较同步提交较低。

疑难五:代码的最初为什么不和生产者一样进行 close 呢?

其实就像 spark streaming 和 flink streaming 一样,对于须要始终生产的角色,是没有 close 办法的,要么 close 是内嵌在别的办法之内,要么就是不须要 close。比方这里的 kafka 的 consumer,咱们是用来测试根底 demo 框架的,因而不须要 close,让它始终读对应 topic 的数据就好,理论生产中,kafka 和 spark streaming 或者 flink 对接,是会始终进行数据的生产的,也不须要 close。

这些纳闷都解决后,就能够运行代码调试了,上面列出全副代码:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @author ropleData
 * 博客地址:https://blog.csdn.net/qq_26803795
 * 专一大数据畛域,欢送拜访
 */
public class consumerDemo {public static void main(String[] args) {

        //1. 配置参数
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1205");

        //2. 创立 1 个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //3. 订阅主题 topic
        consumer.subscribe(Arrays.asList("testKafka"));

        //5. 调用 poll 输入数据并提交 offset
        while (true) {ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {System.out.println("topic =" + record.topic() + "offset =" + record.offset() + "value =" + record.value());
            }
            consumer.commitAsync();
//            consumer.commitSync();}
    }
}

4.4、运行调试

首先启动咱们的生产代码,而后在 kafka 某个 broker 下启动命令行调试,创立 testKafka 生产者,命令如下:

bin/kafka-console-producer.sh --broker-list 192.168.8.45:9092 --topic testKafka

而后输出局部数据信息:

这时候察看生产端:

能够得出结论,咱们的简略 demo 曾经实现了。

尽管是一个简略的 demo,然而运行调试这里依然可能玩出花来,能够很透测的了解 3 种分区调配策略,也能够比照出异步提交 offset(commitAsync)和同步提交 offset(commitSync)的区别。

对于三种分区调配策略,我的一篇文章曾经举例讲的很分明了,这里就不反复做对应的试验了,强烈建议对照着,多启动几个消费者试验一下数据的生产状况,来印证一下它们的特点,加深印象。
相干文章:深入分析 Kafka(三):消费者生产形式、三种分区调配策略、offset 保护

五、主动提交 offset 实战

其实手动提交和主动提交 offset 的代码差别很小,次要体现在:

  1. 配置参数,改一个,加一个。

    批改:主动提交 offset(enable.auto.commit)为 true;

    新增:主动提交工夫距离(auto.commit.interval.ms)单位 ms,设置主动提交 offset 的工夫距离。

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put("auto.commit.interval.ms", "1000");
    
    
  2. 输入数据后不在写提交 offset 代码。

    //5. 调用 poll 输入数据(不必主动提交 offset)
    while (true) {ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {System.out.println("topic =" + record.topic() + "offset =" + record.offset() + "value =" + record.value());
        }
    }
    
    

具体就不再演示了,前面会剖析主动提交 offset 和手动提交 offset 的区别,残缺代码如下:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @author ropleData
 * 博客地址:https://blog.csdn.net/qq_26803795
 * 专一大数据畛域,欢送拜访
 */
public class consumerAutoCommitDemo {public static void main(String[] args) {

        //1. 配置参数
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put("auto.commit.interval.ms", "1000");


        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1205");

        //2. 创立 1 个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //3. 订阅主题 topic
        consumer.subscribe(Arrays.asList("testKafka"));

        //5. 调用 poll 输入数据(不必主动提交 offset)
        while (true) {ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {System.out.println("topic =" + record.topic() + "offset =" + record.offset() + "value =" + record.value());
            }
        }
    }
}

六、两种提交 offset 形式的比照

个别是在理论生产中罕用手动提交 offset 这种形式,因为手动提交 offset 取到的数据是可控的。

疑难:怎么管制的呢?

如果先生产数据后提交 offset,这时候如果在提交 offset 的时候挂掉了,起初复原后,会反复生产那条 offset 的数据,这样会数据反复,但也就是保障了数据的起码一次性(at least once);

如果先提交 offset 后生产数据,这时候如果在提交 offset 的时候挂掉了,起初复原后,那局部 offset 尽管提交了,但其实是没有生产的,因而就照成了数据的失落,然而不会反复,也就保障了数据的最多一次性(at most once)。

也就是说手动提交 offset 的时候,是通过管制提交 offset 和生产数据的程序来实现的。

反观主动提交 offset,因为是依据工夫来主动提交的,因而是出了问题之后齐全不可控的,因而在理论生产中不常应用。

七、总结

本文对 kafka 是怎么进行数据生产的,以及各个知识点进行了答疑解惑和代码的剖析,应该能够弄明确 kafka 的生产的形式以及相干的难点,同时也解释了一些写手动提交 offset 代码时大家可能有纳闷的中央,并比照了两种手动提交 offset 的形式。咱们还对手动提交 offset 还是主动提交 offset 别离进行了代码的编写以及两者在理论生产中的比照。

残缺的代码已上传,感兴趣的能够下载查看。

github:https://github.com/ropleData/kafkaConsumerDemo

正文完
 0