文章目录

    • [一、前言]
    • [二、消费者生产形式回顾]
    • [三、消费者生产外围点剖析]
    • [四、手动提交offset实战]
      • [4.1、引入maven依赖]
      • [4.2、实现一个简略的手动提交offset消费者demo]
      • [4.3、答疑解惑]
      • [4.4、运行调试]
    • [五、主动提交offset实战]
    • [六、两种提交offset形式的比照]
    • [七、总结]

一、前言

上一篇文章剖析了kafka生产者发送音讯的流程并进行了具体的解释和实战。其中蕴含了新版本的kafka对于同步发送音讯和异步发送音讯的api实现,以及kafka源码里的回调函数和架构外部的失败重试机制等都给出了底层的具体解释及java实战demo。

留神:我所应用的kafka版本为2.4.1,java版本为1.8,本文会对一些新老版本的改变中央加以阐明。

二、消费者生产形式回顾

首先咱们回顾一下kafka生产音讯的形式,kafka是应用pull(拉)模式从broker中读取数据的,而后就有两个疑难须要解答一下了。
疑难一:那为什么不采纳push(推,填鸭式教学)的模式给消费者数据呢?

首先回忆下咱们上学学习不就是各种填鸭式教学吗?不论你三七二十一,就是依照教学进度给你灌输知识,能不能承受是你的事,并美其名曰:优胜略汰!

其实这种push形式在kafka架构里显然是不合理的,比方一个broker有多个消费者,它们的生产速率不同,一昧的push只会给消费者带来拒绝服务以及网络拥塞等危险。而kafka显然不可能去放弃速率低的消费者,因而kafka采纳了pull的模式,能够依据消费者的生产能力以适当的速率生产broker里的音讯。

疑难二:让消费者去被动pull数据就美中不足了吗?

同样联想上学的场景,如果把学习主动权全副交给学生,那有些学生想学的货色老师那里没有怎么办?那他不就陷入了一辈子就在那一直求索,然而别的也啥都学的这个死循环的状态了。

其实让消费者去被动pull数据天然也是有毛病的。采纳pull模式后,如果kafka没有数据,消费者可能会陷入循环中,始终返回空数据,这样超级频繁的返回空数据太耗费资源了。为了解决这个问题,Kafka消费者在生产数据时会传入一个时长参数timeout,如果以后没有数据可供生产,消费者会期待一段时间之后再返回,这段时长即为timeout。

留神:老版本还能够依据条数来判断,消费者期待肯定的条数后返回,不过新版本给勾销了。

三、消费者生产外围点剖析

咱们晓得,数据在kafka中是能够长久化的,因而consumer生产数据的可靠性是不必放心的,也就是说不必放心数据的失落问题,数据的生产是可控的。

疑难:那它是怎么管制的呢?

咱们假如consumer在生产过程中呈现了断电宕机等故障,而后它复原后,须要从故障前的地位持续生产,所以consumer须要实时记录本人生产到了哪个offset,以便故障复原后持续生产。

所以消费者生产数据的外围点在于offset的保护,而在撸代码时,offset的保护又分为手动提交和主动提交,它们是怎么玩的呢?又有哪些优缺点?都利用于哪些场景呢?当初就让咱们一起来见真章!

四、手动提交offset实战

4.1、引入maven依赖

大家能够依据本人的kafka版本在mvn上找到适宜本人的依赖,因为只是做简略的音讯生产,所以只须要和上一篇文章的音讯发送一样,引入kafka-clients依赖即可。我的kafka版本为2.4.1,所以我须要引入的依赖为:

<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>2.4.1</version></dependency>

4.2、实现一个简略的手动提交offset消费者demo

又到了令人兴奋掉头发的撸代码工夫了,老规矩,先介绍一下待会会用到的类:

  • KafkaConsumer:和前文的KafkaProducer一唱一和,用来创立一个消费者对象进行数据生产;
  • ConsumerConfig:获取各种配置参数,如果不去配置,就是用默认的;
  • ConsuemrRecord:每条数据都要封装成一个ConsumerRecord对象才能够进行生产。

那上面就开始写了,这个简略的demo一共分为四步:

  1. 第一步:对Properties参数进行配置。

    消费者的配置参数十分多,不倡议都去记忆,能够记住几个罕用的,对于不罕用的咱们能够在用到的时候去ConsumerConfig源码或者kafka官网去查,还是比拟不便的。kafka官网对于consumerconfigs网址:

    http://kafka.apache.org/documentation/#consumerconfigs

    在这个简略的demo里,咱们须要用到上面这几个参数:

    Properties参数

    对应变量

    性能

    bootstrap.servers

    BOOTSTRAP_SERVERS_CONFIG

    指定kafka集群

    key.deserializer

    KEY_DESERIALIZER_CLASS_CONFIG

    序列化key

    value.deserializer

    VALUE_DESERIALIZER_CLASS_CONFIG

    序列化value

    enable.auto.commit

    ENABLE_AUTO_COMMIT_CONFIG

    主动提交offset,默认为true

    group.id

    GROUP_ID_CONFIG

    消费者组ID,只有group.id雷同,就属于同一个消费者组

    晓得了这些变量的含意,代码就进去了:

    Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");props.put(ConsumerConfig.GROUP_ID_CONFIG, "csdn");

    解释:这里咱们不容许主动提交,所以把enable.auto.commit设置为false,而后给咱们的消费者定一个消费者组,叫做“csdn”。

  2. 第二步:创立1个消费者。

    和生产者相似的操作,这里应用传参的形式来创立消费者:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  3. 第三步:订阅topic主题。

    当初消费者有了,你要通知它去生产哪个或哪些topic才行,能够传入多个topic:

    consumer.subscribe(Arrays.asList("testKafka"));
  4. 第四步:循环生产并提交offset。

    到这里,消费者有了,它也晓得要去生产哪个topic了,那接下来写个循环,让它去生产就好了。这部分还有几个知识点,先看代码,而后我再解释:

    while (true) {    ConsumerRecords<String, String> records = consumer.poll(100);    for (ConsumerRecord<String, String> record : records) {        System.out.println("topic = " + record.topic() + " offset = " + record.offset() + " value = " + record.value());    }    consumer.commitAsync();  //consumer.commitSync();}

4.3、答疑解惑

看第四步的代码,外面蕴含了一些知识点,不理解的话可能会有一些疑难,上面对应解释如下:

疑难一:consumer.poll(100),这里的poll办法是干啥的?传入的100又是啥?有啥用呢?

poll是consumer对象的一个办法,后面第二节的第二个疑难,咱们解释了pull并不是美中不足的,如果broker里这个topic没有数据,你就让这个消费者始终去拉空数据吗?必定是不合理的,所以这里的100代表了timeout,单位是ms,如果以后没有数据可供生产,消费者会期待100ms之后再返回数据。

疑难二:poll办法拿到的数据是一条还是一批?

这里要留神:它是一批一批去拿,不是一条哦。

疑难三:record都能拿到哪些信息呢?

能够拿到诸如offset,partition,topic,key等信息,具体的信息及类型能够看下图:

疑难四:我看你最初提交offset应用了commitAsync,或者commitSync,它们是啥?这两个形式有什么区别吗?

commitSync(同步提交)和commitAsync(异步提交)是手动提交offset的两种形式。

两者的相同点是:都会将本次poll的一批数据外面最高的偏移量提交;

不同点是:commitSync会失败重试,始终到提交胜利(如果因为不可复原起因导致,也会提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

留神:尽管commitAsync没有失败重试机制,然而咱们理论工作中还是用它比拟多,因为commitAsync尽管可能提交失败,然而失败后下一次从新提交(变相的相当于重试),对数据并没有什么影响,而且异步提交的提早较同步提交较低。

疑难五:代码的最初为什么不和生产者一样进行close呢?

其实就像spark streaming和flink streaming一样,对于须要始终生产的角色,是没有close办法的,要么close是内嵌在别的办法之内,要么就是不须要close。比方这里的kafka的consumer,咱们是用来测试根底demo框架的,因而不须要close,让它始终读对应topic的数据就好,理论生产中,kafka和spark streaming或者flink对接,是会始终进行数据的生产的,也不须要close。

这些纳闷都解决后,就能够运行代码调试了,上面列出全副代码:

import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Arrays;import java.util.Properties;/** * @author ropleData * 博客地址:https://blog.csdn.net/qq_26803795 * 专一大数据畛域,欢送拜访 */public class consumerDemo {    public static void main(String[] args) {        //1.配置参数        Properties props = new Properties();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1205");        //2.创立1个消费者        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);        //3.订阅主题topic        consumer.subscribe(Arrays.asList("testKafka"));        //5.调用poll输入数据并提交offset        while (true) {            ConsumerRecords<String, String> records = consumer.poll(100);            for (ConsumerRecord<String, String> record : records) {                System.out.println("topic = " + record.topic() + " offset = " + record.offset() + " value = " + record.value());            }            consumer.commitAsync();//            consumer.commitSync();        }    }}

4.4、运行调试

首先启动咱们的生产代码,而后在kafka某个broker下启动命令行调试,创立testKafka生产者,命令如下:

bin/kafka-console-producer.sh --broker-list 192.168.8.45:9092 --topic testKafka

而后输出局部数据信息:

这时候察看生产端:

能够得出结论,咱们的简略demo曾经实现了。

尽管是一个简略的demo,然而运行调试这里依然可能玩出花来,能够很透测的了解3种分区调配策略,也能够比照出异步提交offset(commitAsync)和同步提交offset(commitSync)的区别。

对于三种分区调配策略,我的一篇文章曾经举例讲的很分明了,这里就不反复做对应的试验了,强烈建议对照着,多启动几个消费者试验一下数据的生产状况,来印证一下它们的特点,加深印象。
相干文章:深入分析Kafka(三):消费者生产形式、三种分区调配策略、offset保护

五、主动提交offset实战

其实手动提交和主动提交offset的代码差别很小,次要体现在:

  1. 配置参数,改一个,加一个。

    批改:主动提交offset(enable.auto.commit)为true;

    新增:主动提交工夫距离(auto.commit.interval.ms)单位ms,设置主动提交offset的工夫距离。

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");props.put("auto.commit.interval.ms", "1000");
  2. 输入数据后不在写提交offset代码。

    //5.调用poll输入数据(不必主动提交offset)while (true) {    ConsumerRecords<String, String> records = consumer.poll(100);    for (ConsumerRecord<String, String> record : records) {        System.out.println("topic = " + record.topic() + " offset = " + record.offset() + " value = " + record.value());    }}

具体就不再演示了,前面会剖析主动提交offset和手动提交offset的区别,残缺代码如下:

import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Arrays;import java.util.Properties;/** * @author ropleData * 博客地址:https://blog.csdn.net/qq_26803795 * 专一大数据畛域,欢送拜访 */public class consumerAutoCommitDemo {    public static void main(String[] args) {        //1.配置参数        Properties props = new Properties();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");        props.put("auto.commit.interval.ms", "1000");        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1205");        //2.创立1个消费者        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);        //3.订阅主题topic        consumer.subscribe(Arrays.asList("testKafka"));        //5.调用poll输入数据(不必主动提交offset)        while (true) {            ConsumerRecords<String, String> records = consumer.poll(100);            for (ConsumerRecord<String, String> record : records) {                System.out.println("topic = " + record.topic() + " offset = " + record.offset() + " value = " + record.value());            }        }    }}

六、两种提交offset形式的比照

个别是在理论生产中罕用手动提交offset这种形式,因为手动提交offset取到的数据是可控的。

疑难:怎么管制的呢?

如果先生产数据后提交offset,这时候如果在提交offset的时候挂掉了,起初复原后,会反复生产那条offset的数据,这样会数据反复,但也就是保障了数据的起码一次性(at least once);

如果先提交offset后生产数据,这时候如果在提交offset的时候挂掉了,起初复原后,那局部offset尽管提交了,但其实是没有生产的,因而就照成了数据的失落,然而不会反复,也就保障了数据的最多一次性(at most once)。

也就是说手动提交offset的时候,是通过管制提交offset和生产数据的程序来实现的。

反观主动提交offset,因为是依据工夫来主动提交的,因而是出了问题之后齐全不可控的,因而在理论生产中不常应用。

七、总结

本文对kafka是怎么进行数据生产的,以及各个知识点进行了答疑解惑和代码的剖析,应该能够弄明确kafka的生产的形式以及相干的难点,同时也解释了一些写手动提交offset代码时大家可能有纳闷的中央,并比照了两种手动提交offset的形式。咱们还对手动提交offset还是主动提交offset别离进行了代码的编写以及两者在理论生产中的比照。

残缺的代码已上传,感兴趣的能够下载查看。

github:https://github.com/ropleData/kafkaConsumerDemo