关于java:kafka实战一Producer消息发送实战

38次阅读

共计 8240 个字符,预计需要花费 21 分钟才能阅读完成。

文章目录

      • [一、前言]
    • [二、调试常用命令行总结]
    • [三、Producer 音讯发送流程详解]
      • [3.1、总体流程]
      • [3.2、分步骤细化流程]
    • [四、异步发送音讯实战]
      • [4.1、引入依赖]
      • [4.2、简略异步发送 Demo]
      • [4.3、带回调函数的异步发送 Demo]
        • [4.3.1、失败重试机制]
        • [4.3.2、解析回调函数]
        • [4.3.3、实现带回调函数的异步发送 Demo]
    • [五、同步发送音讯实战]
      • [5.1、从源码找到端倪]
      • [5.2、同步发送 Demo]
    • [六、总结]

一、前言

前些天和大家一起深入分析了 kafka 架构办法的常识,这部分内容偏差于实践,不过也是大数据开发工程师必须要把握的知识点。

kafka 架构篇系列文章:
[深入分析 Kafka 架构(一):工作流程、存储机制、分区策略]
[深入分析 Kafka 架构(二):数据可靠性、故障解决]
[深入分析 Kafka 架构(三):消费者生产形式、三种分区调配策略、offset 保护]

因而我感觉十分有必要再辅以代码实现,来加深了解,死记硬背。因而本系列 kafka 实战篇以实战为主,目标是应用 kafka 提供的 JAVA API 来实现音讯的发送,生产,拦挡等操作,用来加深对 kafka 架构的认知,并会把相干残缺 demo 共享到 github 和咱们 csdn 上,感兴趣的能够拿来应用。

本文为 kafka 实战系列第一篇,次要进行 kafka 的音讯发送局部的流程解析及实战开发。

留神:我所应用的 kafka 版本为 2.4.1,java 版本为 1.8,本文会对一些新老版本的改变中央加以阐明。

二、调试常用命令行总结

其实在日常应用 kafka 的过程中,很少应用命令行操作,个别命令行操作只是用来调试的时候应用。不过作为回顾,上面列出一些罕用的命令行操作,并对其进行具体解释。

  1. 查看以后服务器中的所有 topic

    bin/kafka-topics.sh --zookeeper zookeeper 主机名或 ip:2181 --list` 
    
    
  2. 创立 topic

    如下命令能够创立了一个 3 分区,2 正本的 topic first:

    
     bin/kafka-topics.sh --zookeeper zookeeper 主机名或 ip:2181 
     --create --replication-factor 2 --partitions 3 --topic first
     
     选项阐明:--topic 定义 topic 名
     --replication-factor 定义正本数
     --partitions 定义分区数
  3. 删除 topic

    留神:删除 topic 的时候须要在 server.properties 中设置 delete.topic.enable=true 否则只是标记删除,并没有真正删除。

    bin/kafka-topics.sh --zookeeper zookeeper 主机名或 ip:2181 
    --delete --topic first 
    
    
    
  4. 查看某个 Topic 的详情

    bin/kafka-topics.sh --zookeeper zookeeper 主机名或 ip:2181 
    --describe --topic first 
    
    
    
  5. 发送音讯

    留神:–broker-list kafka 集群里的 broker 主机名或 ip:9092,如果要发送给多个 broker,用逗号宰割就能够了。

    bin/kafka-console-producer.sh 
    --broker-list kafka 集群里的 broker 主机名或 ip:9092 --topic first
    输出完下面的命令胜利后会有 ">" 标记,就能够输出数据了
    输出数据,须要删除的时候按住 ctrl 在点击 backspace 就能够删除了。>hello world 
    
    
    
    
  6. 生产音讯

    留神:在 kafka0.9.x 版本之前,消费者指定的是 zookeeper,然而新版本都是指定的 kafka 集群。

     bin/kafka-console-consumer.sh 
    --bootstrap-server kafka 集群里的 broker 主机名或 ip:9092 --from-beginning --topic first
    
    --from-beginning:会把主题中以往所有的数据都读取进去。` 
    
    
    
  7. 批改分区数

    留神:分区数只能增多不能缩小(因为分区数缩小后,把删掉的分区的数据调配到残余的分区这个过程过于简单,所以 kafka 没有设计分区缩小的逻辑。)

    
     bin/kafka-topics.sh --zookeeper zookeeper 主机名或 ip:2181 --alter --topic first --partitions 6 
      
    
    
    
    

三、Producer 音讯发送流程详解

3.1、总体流程

咱们谈到音讯队列就会想到:异步,解耦,消峰。kafka 天然也不例外,在新版本里,它的 Producer 发送音讯采纳的也是异步发送的形式(_之前老版本有同步发送的 api,新版本勾销了,然而咱们能够通过骚操作实现同步发送,前面会具体解释_)。在音讯发送的过程中,波及到了两个线程,别离是 main 线程(又叫主线程)和sender 线程,以及一个 线程共享变量(能够了解为缓存)RecordAccumulator

总体来说,sender 线程是 main 线程的守护线程,在工作时,main 线程负责创立音讯对象并将音讯放在缓存 RecordAccumulator,sender 线程从缓存 RecordAccumulator 中拉取音讯而后发送到 kafka broker。

对于 RecordAccumulator,咱们还须要晓得:

  1. 在音讯追加到 RecordAccumulator 时会对音讯进行分类,发往同一分区的音讯会被装在同一个 Deque 中,Deque 寄存的是 ProducerBatch 示意一组音讯。换句话说就是 RecordAccumulator 会依照分区进行队列保护;
  2. 队列中寄存的是发往该分区的音讯组,追加音讯时候从队列的尾部追加;
  3. RecordAccumulator 的大小默认 32M,能够通过 buffer.memory 配置指定;
  4. 如果内存空间用完了,追加音讯将产生阻塞直到有空间可用为止,默认最大阻塞 60s, 能够通过数 max.block.ms 配置。

整体流程图能够看成上面这样:

其实晓得了下面这些,就能够依据 api 写一些简略的 kafka 发送音讯的代码了。因为在 api 里,main 线程和 sender 线程都被封装的很好,很多事件是咱们不须要去关怀的。

不过如果你和我一样,好奇 main 线程和 sender 线程具体都做了什么?那就接着看上面这部分细化的的流程总结,不感兴趣的话就能够间接跳到下一大节(异步发送 Demo)开始撸代码了。

3.2、分步骤细化流程

首先 main 线程的流程:

  1. 封装音讯对象为 ProducerRecord 并调用 send 办法;
  2. 进入 produce r 拦截器(拦截器能够自定义);
  3. 更新 kafka 集群数据;
  4. 进行 序列化,将音讯对象序列化成 byte 数组;
  5. 应用 分区器 计算分区;
  6. 将音讯追加到线程共享变量 RecordAccumulator。

sender 线程在 KafkaProduer 实例化完结开启,前面就是 sender 线程干的活了:

  1. sender 线程将音讯从 RecordAccumulator 中取出解决音讯格局;
  2. 构建发送的申请对象 Request;
  3. 将申请交给 Selector, 并将申请寄存在申请队列;
  4. 收到响应就移除申请队列的申请,调用每个音讯上的回调函数。

四、异步发送音讯实战

其实只有把握了 Producer 音讯发送的总体流程,就能够依据 api 写根本 demo 了。上面咱们层层递进来实现异步发送 demo。

4.1、引入依赖

这里以 maven 依赖为例,大家能够依据本人的 kafka 版本在 mvn 上找到适宜本人的依赖,因为只是做简略的音讯发送,所以只须要引入 kafka-clients 依赖即可。我的 kafka 版本为 2.4.1,所以我须要引入的依赖为:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>


倡议:这里倡议大家引入依赖后,花点工夫钻研一下 KafkaProducer 源码,至多看看外面构造函数和办法的正文也好,对 kafka 的学习很有帮忙!

4.2、简略异步发送 Demo

这里咱们须要用到如下三个类:

  • KafkaProducer:须要创立一个生产者对象,用来发送数据;
  • ProducerConfig:获取所需的一系列配置参数;
  • ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象才能够发送。

上面就开始撸代码, 一共分为 4 步:

  1. KafkaProducer有 5 个构造方法来初始化,这里咱们采纳第 3 种传递 properties 的形式来初始化,所以第一步须要创立 properties。

    properties 是 k,v 构造的,品种十分多,不必刻意去记它,平时记住几个罕用的,而后额定的用到的时候在官网或者 ProducerConfig 源码外面去查就能够了。
    producerconfigs 官网查问地址:http://kafka.apache.org/documentation/#producerconfigs。

    这里咱们应用到的参数设置如下:

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");//kafka 集群,broker-list
    props.put(ProducerConfig.ACKS_CONFIG, "all");//all 相当于 -1
    props.put(ProducerConfig.RETRIES_CONFIG, 1);// 重试次数
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// 批次大小
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 等待时间
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);//RecordAccumulator 缓冲区大小
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
    
    

    在应用的时候,把下面 kafka- 1 改为你本人的 kafka 主机名或者 ip 就能够了。

  2. 有了 properties 之后就能够创立一个生产者对象了,把后面的 props 传入 KafkaProducer 就能够了;

    Producer<String, String> producer = new KafkaProducer<>(props);
    
  3. 应用后面创立的 producer 对象的 send 办法来发送数据即可,这里作为 demo,咱们用 for 循环发送 100 条 0 -99 的数据;

    for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<String, String>("testKafka1", Integer.toString(i), Integer.toString(i)));
            } 
    
    
    
    
  4. 发送完记得敞开生产者。

    producer.close(); 
    
    
    
    

到这里一个简略的异步 producer Demo 就写完了,残缺代码如下:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class AsyncProducerDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 0. 配置一系列参数
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");//kafka 集群,broker-list
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 1);// 重试次数
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// 批次大小
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 等待时间
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);//RecordAccumulator 缓冲区大小
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 1. 创立一个生产者对象
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 2. 调用 send 办法
        for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<String, String>("testKafka1", Integer.toString(i), Integer.toString(i)));
        }
        // 3. 敞开生产者
        producer.close();}
}

而后在 kafka 集群某个 broker 下,输出消费者命令行来验证代码:

bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic testKafka1 --from-beginning

执行代码,而后会发现命令行胜利输入数据,证实 demo 实现:

反思:如果发送失败了怎么办?生产者对象调用 send 办法会返回什么?反对回调函数吗?

首先答复第一个问题,如果发送失败了,kafka 外部有本人的一套机制,这是咱们代码不可控的,它会主动进行从新发送,咱们只能管制失败后的重试次数。
到底这套失败从新发送的机制是怎么的,以及后两个问题,咱们下一节 带回调函数的异步发送 Demo来进行具体介绍。

4.3、带回调函数的异步发送 Demo

4.3.1、失败重试机制


其实 kafka 的 producer 发送音讯的流程如上图所示,它外部保护了一套失败重试机制,能够看到具体的音讯 失败重试是 kafka 外部主动实现的,咱们只能管制失败重试的次数

回调函数会在 producer 收到 ack 时异步调用,该办法有两个参数,别离是 RecordMetadata 和 Exception,如果 Exception 为 null,阐明音讯发送胜利,如果 Exception 不为 null,阐明音讯发送失败。

4.3.2、解析回调函数

通过查看 KafkaProducer 源码,能够发现有两个办法:

其中有一个反对回调函数 Callback,这里的回调函数会在 producer 收到 ack 时异步调用,该办法有两个参数,别离是 RecordMetadata 和 Exception,如果 Exception 为 null,阐明音讯发送胜利,如果 Exception 不为 null,阐明音讯发送失败。

留神:除了发现 callback,还能够发现 send 办法返回的是 Future 对象,这里先有个印象,在前面讲同步 demo 的时候会具体解释。

4.3.3、实现带回调函数的异步发送 Demo

咱们只须要批改 send 办法并重写 onCompletion 就能够了,别的都不必更改,代码如下:

for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<String, String>("testKafka1", Integer.toString(i), Integer.toString(i)), new Callback() {

          // 回调函数在 Producer 收到 ack 时异步调用
          @Override
          public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("音讯发送胜利 ->" + metadata.offset());
              } else {exception.printStackTrace();
              }
          }
      });

  }

在 java1.8 的状况下,还能够应用 lambda 表达式来优化这段代码:

for (int i = 0; i < 100; i++) {
     // 回调函数在 Producer 收到 ack 时异步调用
     producer.send(new ProducerRecord<String, String>("testKafka1", Integer.toString(i), Integer.toString(i)), (metadata, exception) -> {if (exception == null) {System.out.println("音讯发送胜利 ->" + metadata.offset());
         } else {exception.printStackTrace();
         }
     });

 }

执行代码,会发现合乎代码预期后果,带回调函数的异步发送 Demo 实现:

五、同步发送音讯实战

5.1、从源码找到端倪

通过后面查看 KafkaProducer 源码,咱们能够发现两个 send 办法都是返回 Future 对象,而 Future 有一个 get 办法,并且这个 get 办法是阻塞的。尽管 send 是异步的,然而只有咱们每次 send 后都调用它的返回对象的 get 办法,那就能够实现同步发送音讯的目标了,不得不说,kafka 的源码设计十分棒,能学到很多货色。

5.2、同步发送 Demo

咱们只须要批改异步发送 demo 的调用 send 办法这里就能够实现同步发送了,具体来说,就是调用 send 办法返回的 get()办法就能够了。不过为了调试不便,咱们拿到 get()返回的原数据 RecordMetadata 对象,而后输入这条数据的 offset 来验证后果。

其实能够通过 get()返回的 RecordMetadata 对象拿到很多原数据的信息:

批改的代码如下,为了便于调试,咱们输入每条信息的 offset:

for (int i = 0; i < 100; i++) {RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("testKafka1", Integer.toString(i), Integer.toString(i))).get();
    System.out.println("offset ="+metadata.offset());
}

运行代码,查看后果,合乎咱们的预期,同步发送音讯 demo 实现:

六、总结

本文对 kafka 生产者发送音讯的流程进行了具体的解释和实战,其中蕴含了新版本的 kafka 对于同步发送音讯和异步发送音讯的 api 实现,以及 kafka 源码里的回调函数和架构外部的失败重试机制等都给出了底层的具体解释及实战 demo。
残缺的代码已上传,感兴趣的能够下载查看。

github:https://github.com/ropleData/kafkaProducerDemo

正文完
 0