文章目录

      • [一、前言]
    • [二、调试常用命令行总结]
    • [三、Producer音讯发送流程详解]
      • [3.1、总体流程]
      • [3.2、分步骤细化流程]
    • [四、异步发送音讯实战]
      • [4.1、引入依赖]
      • [4.2、简略异步发送Demo]
      • [4.3、带回调函数的异步发送Demo]
        • [4.3.1、失败重试机制]
        • [4.3.2、解析回调函数]
        • [4.3.3、实现带回调函数的异步发送Demo]
    • [五、同步发送音讯实战]
      • [5.1、从源码找到端倪]
      • [5.2、同步发送Demo]
    • [六、总结]

一、前言

前些天和大家一起深入分析了kafka架构办法的常识,这部分内容偏差于实践,不过也是大数据开发工程师必须要把握的知识点。

kafka架构篇系列文章:
[深入分析Kafka架构(一):工作流程、存储机制、分区策略]
[深入分析Kafka架构(二):数据可靠性、故障解决]
[深入分析Kafka架构(三):消费者生产形式、三种分区调配策略、offset保护]

因而我感觉十分有必要再辅以代码实现,来加深了解,死记硬背。因而本系列kafka实战篇以实战为主,目标是应用kafka提供的JAVA API来实现音讯的发送,生产,拦挡等操作,用来加深对kafka架构的认知,并会把相干残缺demo共享到github和咱们csdn上,感兴趣的能够拿来应用。

本文为kafka实战系列第一篇,次要进行kafka的音讯发送局部的流程解析及实战开发。

留神:我所应用的kafka版本为2.4.1,java版本为1.8,本文会对一些新老版本的改变中央加以阐明。

二、调试常用命令行总结

其实在日常应用kafka的过程中,很少应用命令行操作,个别命令行操作只是用来调试的时候应用。不过作为回顾,上面列出一些罕用的命令行操作,并对其进行具体解释。

  1. 查看以后服务器中的所有topic

    bin/kafka-topics.sh --zookeeper zookeeper主机名或ip:2181 --list` 
  2. 创立topic

    如下命令能够创立了一个3分区,2正本的topic first:

     bin/kafka-topics.sh --zookeeper zookeeper主机名或ip:2181  --create --replication-factor 2 --partitions 3 --topic first  选项阐明:  --topic 定义topic名 --replication-factor 定义正本数 --partitions 定义分区数
  3. 删除topic

    留神:删除topic的时候须要在server.properties中设置delete.topic.enable=true否则只是标记删除,并没有真正删除。

    bin/kafka-topics.sh --zookeeper zookeeper主机名或ip:2181 --delete --topic first 
  4. 查看某个Topic的详情

    bin/kafka-topics.sh --zookeeper zookeeper主机名或ip:2181 --describe --topic first 
  5. 发送音讯

    留神:–broker-list kafka集群里的broker主机名或ip:9092,如果要发送给多个broker,用逗号宰割就能够了。

    bin/kafka-console-producer.sh --broker-list kafka集群里的broker主机名或ip:9092 --topic first输出完下面的命令胜利后会有">"标记,就能够输出数据了输出数据,须要删除的时候按住ctrl 在点击backspace就能够删除了。>hello world 
  6. 生产音讯

    留神:在kafka0.9.x版本之前,消费者指定的是zookeeper,然而新版本都是指定的kafka集群。

     bin/kafka-console-consumer.sh --bootstrap-server kafka集群里的broker主机名或ip:9092 --from-beginning --topic first--from-beginning:会把主题中以往所有的数据都读取进去。` 
  7. 批改分区数

    留神:分区数只能增多不能缩小(因为分区数缩小后,把删掉的分区的数据调配到残余的分区这个过程过于简单,所以kafka没有设计分区缩小的逻辑。)

     bin/kafka-topics.sh --zookeeper zookeeper主机名或ip:2181 --alter --topic first --partitions 6   

三、Producer音讯发送流程详解

3.1、总体流程

咱们谈到音讯队列就会想到:异步,解耦,消峰。kafka天然也不例外,在新版本里,它的Producer发送音讯采纳的也是异步发送的形式(_之前老版本有同步发送的api,新版本勾销了,然而咱们能够通过骚操作实现同步发送,前面会具体解释_)。在音讯发送的过程中,波及到了两个线程,别离是main线程(又叫主线程)和sender线程,以及一个线程共享变量(能够了解为缓存)RecordAccumulator

总体来说,sender线程是main线程的守护线程,在工作时,main线程负责创立音讯对象并将音讯放在缓存RecordAccumulator,sender线程从缓存RecordAccumulator中拉取音讯而后发送到kafka broker。

对于RecordAccumulator,咱们还须要晓得:

  1. 在音讯追加到RecordAccumulator时会对音讯进行分类,发往同一分区的音讯会被装在同一个Deque中,Deque寄存的是ProducerBatch示意一组音讯。换句话说就是RecordAccumulator会依照分区进行队列保护;
  2. 队列中寄存的是发往该分区的音讯组,追加音讯时候从队列的尾部追加;
  3. RecordAccumulator的大小默认32M,能够通过buffer.memory配置指定;
  4. 如果内存空间用完了,追加音讯将产生阻塞直到有空间可用为止,默认最大阻塞60s,能够通过数max.block.ms配置。

整体流程图能够看成上面这样:

其实晓得了下面这些,就能够依据api写一些简略的kafka发送音讯的代码了。因为在api里,main线程和sender线程都被封装的很好,很多事件是咱们不须要去关怀的。

不过如果你和我一样,好奇main线程和sender线程具体都做了什么?那就接着看上面这部分细化的的流程总结,不感兴趣的话就能够间接跳到下一大节(异步发送Demo)开始撸代码了。

3.2、分步骤细化流程

首先main线程的流程:

  1. 封装音讯对象为ProducerRecord并调用send办法;
  2. 进入producer拦截器(拦截器能够自定义);
  3. 更新kafka集群数据;
  4. 进行序列化,将音讯对象序列化成byte数组;
  5. 应用分区器计算分区;
  6. 将音讯追加到线程共享变量RecordAccumulator。

sender线程在KafkaProduer实例化完结开启,前面就是sender线程干的活了:

  1. sender线程将音讯从RecordAccumulator中取出解决音讯格局;
  2. 构建发送的申请对象Request;
  3. 将申请交给Selector,并将申请寄存在申请队列;
  4. 收到响应就移除申请队列的申请,调用每个音讯上的回调函数。

四、异步发送音讯实战

其实只有把握了Producer音讯发送的总体流程,就能够依据api写根本demo了。上面咱们层层递进来实现异步发送demo。

4.1、引入依赖

这里以maven依赖为例,大家能够依据本人的kafka版本在mvn上找到适宜本人的依赖,因为只是做简略的音讯发送,所以只须要引入kafka-clients依赖即可。我的kafka版本为2.4.1,所以我须要引入的依赖为:

<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>2.4.1</version></dependency>

倡议:这里倡议大家引入依赖后,花点工夫钻研一下KafkaProducer源码,至多看看外面构造函数和办法的正文也好,对kafka的学习很有帮忙!

4.2、简略异步发送Demo

这里咱们须要用到如下三个类:

  • KafkaProducer:须要创立一个生产者对象,用来发送数据;
  • ProducerConfig:获取所需的一系列配置参数;
  • ProducerRecord:每条数据都要封装成一个ProducerRecord对象才能够发送。

上面就开始撸代码,一共分为4步:

  1. KafkaProducer有5个构造方法来初始化,这里咱们采纳第3种传递properties的形式来初始化,所以第一步须要创立properties。

    properties是k,v构造的,品种十分多,不必刻意去记它,平时记住几个罕用的,而后额定的用到的时候在官网或者ProducerConfig源码外面去查就能够了。
    producerconfigs官网查问地址: http://kafka.apache.org/documentation/#producerconfigs。

    这里咱们应用到的参数设置如下:

    Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");//kafka集群,broker-listprops.put(ProducerConfig.ACKS_CONFIG, "all");//all相当于-1props.put(ProducerConfig.RETRIES_CONFIG, 1);//重试次数props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//批次大小props.put(ProducerConfig.LINGER_MS_CONFIG, 1);//等待时间props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);//RecordAccumulator缓冲区大小props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    在应用的时候,把下面kafka-1改为你本人的kafka主机名或者ip就能够了。

  2. 有了properties之后就能够创立一个生产者对象了,把后面的props传入KafkaProducer就能够了;

    Producer<String, String> producer = new KafkaProducer<>(props);
  3. 应用后面创立的producer对象的send办法来发送数据即可,这里作为demo,咱们用for循环发送100条0-99的数据;

    for (int i = 0; i < 100; i++) {           producer.send(new ProducerRecord<String, String>("testKafka1", Integer.toString(i), Integer.toString(i)));        } 
  4. 发送完记得敞开生产者。

    producer.close(); 

到这里一个简略的异步producer Demo就写完了,残缺代码如下:

import org.apache.kafka.clients.producer.*;import java.util.Properties;import java.util.concurrent.ExecutionException;public class AsyncProducerDemo {    public static void main(String[] args) throws ExecutionException, InterruptedException {        // 0.配置一系列参数        Properties props = new Properties();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");//kafka集群,broker-list        props.put(ProducerConfig.ACKS_CONFIG, "all");        props.put(ProducerConfig.RETRIES_CONFIG, 1);//重试次数        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//批次大小        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);//等待时间        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);//RecordAccumulator缓冲区大小        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");        // 1.创立一个生产者对象        Producer<String, String> producer = new KafkaProducer<>(props);        // 2.调用send办法        for (int i = 0; i < 100; i++) {            producer.send(new ProducerRecord<String, String>("testKafka1", Integer.toString(i), Integer.toString(i)));        }        // 3.敞开生产者        producer.close();    }}

而后在kafka集群某个broker下,输出消费者命令行来验证代码:

bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic testKafka1 --from-beginning

执行代码,而后会发现命令行胜利输入数据,证实demo实现:

反思:如果发送失败了怎么办?生产者对象调用send办法会返回什么?反对回调函数吗?

首先答复第一个问题,如果发送失败了,kafka外部有本人的一套机制,这是咱们代码不可控的,它会主动进行从新发送,咱们只能管制失败后的重试次数。
到底这套失败从新发送的机制是怎么的,以及后两个问题,咱们下一节带回调函数的异步发送Demo来进行具体介绍。

4.3、带回调函数的异步发送Demo

4.3.1、失败重试机制


其实kafka的producer发送音讯的流程如上图所示,它外部保护了一套失败重试机制,能够看到具体的音讯失败重试是kafka外部主动实现的,咱们只能管制失败重试的次数

回调函数会在producer收到ack时异步调用,该办法有两个参数,别离是RecordMetadata和Exception,如果Exception为null,阐明音讯发送胜利,如果Exception不为null,阐明音讯发送失败。

4.3.2、解析回调函数

通过查看KafkaProducer源码,能够发现有两个办法:

其中有一个反对回调函数Callback,这里的回调函数会在producer收到ack时异步调用,该办法有两个参数,别离是RecordMetadata和Exception,如果Exception为null,阐明音讯发送胜利,如果Exception不为null,阐明音讯发送失败。

留神:除了发现callback,还能够发现send办法返回的是Future对象,这里先有个印象,在前面讲同步demo的时候会具体解释。

4.3.3、实现带回调函数的异步发送Demo

咱们只须要批改send办法并重写onCompletion就能够了,别的都不必更改,代码如下:

for (int i = 0; i < 100; i++) {      producer.send(new ProducerRecord<String, String>("testKafka1", Integer.toString(i), Integer.toString(i)), new Callback() {          //回调函数在Producer收到ack时异步调用          @Override          public void onCompletion(RecordMetadata metadata, Exception exception) {              if (exception == null) {                  System.out.println("音讯发送胜利->" + metadata.offset());              } else {                  exception.printStackTrace();              }          }      });  }

在java1.8的状况下,还能够应用lambda表达式来优化这段代码:

for (int i = 0; i < 100; i++) {     //回调函数在Producer收到ack时异步调用     producer.send(new ProducerRecord<String, String>("testKafka1", Integer.toString(i), Integer.toString(i)), (metadata, exception) -> {         if (exception == null) {             System.out.println("音讯发送胜利->" + metadata.offset());         } else {             exception.printStackTrace();         }     }); }

执行代码,会发现合乎代码预期后果,带回调函数的异步发送Demo实现:

五、同步发送音讯实战

5.1、从源码找到端倪

通过后面查看KafkaProducer源码,咱们能够发现两个send办法都是返回Future对象,而Future有一个get办法,并且这个get办法是阻塞的。尽管send是异步的,然而只有咱们每次send后都调用它的返回对象的get办法,那就能够实现同步发送音讯的目标了,不得不说,kafka的源码设计十分棒,能学到很多货色。

5.2、同步发送Demo

咱们只须要批改异步发送demo的调用send办法这里就能够实现同步发送了,具体来说,就是调用send办法返回的get()办法就能够了。不过为了调试不便,咱们拿到get()返回的原数据RecordMetadata对象,而后输入这条数据的offset来验证后果。

其实能够通过get()返回的RecordMetadata对象拿到很多原数据的信息:

批改的代码如下,为了便于调试,咱们输入每条信息的offset:

for (int i = 0; i < 100; i++) {    RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("testKafka1", Integer.toString(i), Integer.toString(i))).get();    System.out.println("offset = "+metadata.offset());}

运行代码,查看后果,合乎咱们的预期,同步发送音讯demo实现:

六、总结

本文对kafka生产者发送音讯的流程进行了具体的解释和实战,其中蕴含了新版本的kafka对于同步发送音讯和异步发送音讯的api实现,以及kafka源码里的回调函数和架构外部的失败重试机制等都给出了底层的具体解释及实战demo。
残缺的代码已上传,感兴趣的能够下载查看。

github:https://github.com/ropleData/kafkaProducerDemo