关于springboot:Kafka成长记1从HelloWorld开始研究Kafka-Producer源码原理

30次阅读

共计 10237 个字符,预计需要花费 26 分钟才能阅读完成。

成长记不会介绍太多一些 kafka 的基础知识,如果有需要的话,之后会有专门的《小白起步营》。成长记的默认大家对 kafka 的一些概念是熟知的、默认也是会根本 Kafka 的部署的。当然为了关照一些小白,第一次波及的常识我会简略介绍和解释的,相熟的人就当回顾吧。简略的事件反复做有时也是坏事。

Kafka 成长记会间接从三个方面开始摸索,Producer、Broker、Comsumer。过程中,依据场景会应用之前 ZK 和 JDK 成长记介绍源码分析方法。话不多说,让咱们间接开始第一节的内容吧!

咱们之前钻研 ZK 次要是应用的场景法,找到一些外围入口开始剖析的。钻研 Kafka 的源码时候,咱们也能够参考之前的办法。不过这次咱们不间接从 Broker 服务端节点动手,先从 Producer 开始动手钻研。会用到一些新的剖析源码的思维和办法。

要想剖析 Kafka Producer 的源码原理,首先必定得有一个入口或者下手的中央。很多人应用 Kafka 必定都是从一个 Demo 开始的。本人部署一台 Kafka,之后发送下音讯,之后在本人生产一条音讯。

KafkaProducerHelloWorld

所以咱们就从最简略的一个 Kafka Producer 的 Demo 开始,从一个 KafkaProducerHelloWorld 例子开始 Kafka 源码原理的摸索。

HelloWorld 的代码如下:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/**
 * @author fanmao
 */
public class KafkaProducerHelloWorld {public static void main(String[] args) throws Exception {
        // 配置 Kafka 的一些参数
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.30.1:9092");

        // 创立一个 Producer 实例
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 封装一条音讯
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "test-key", "test-value");

        // 同步形式发送音讯,会阻塞在这里,直到发送实现
        // producer.send(record).get();

        // 异步形式发送音讯,不阻塞,设置一个监听回调函数即可
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null) {System.out.println("音讯发送胜利");
                } else {System.out.println("音讯发送异样");
                }
            }
        });

        Thread.sleep(5 * 1000);

        // 退出 producer
        producer.close();}
    
}

下面的代码例子,尽管非常简单,然而也有本人的脉络。

1)创立 KafkaProducer

2)筹备音讯 ProducerRecord

3)发送音讯 producer.send()

简略画个图:

这里多说一点,我之前在 Zookeeper 成长记 5 提到过源码版本的抉择和看源码的形式,这里我就不反复说了。间接将抉择后的后果通知大家,我抉择的是 kafka-0.10.0.1 版本。

所以客户端应用的依赖的 GAV(Group-ArtifactId-Version) 是 org.apache.kafka-kafka-clients-0.10.0.1。POM 如下所示:

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.mfm.learn</groupId>
    <artifactId>learn-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>learn-kafka</name>
    <url>http://maven.apache.org</url>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.72</version>
        </dependency>
    </dependencies>

    <build>
    </build>
</project>

KafkaProducer 的创立

下面 KafkaProducerHelloWorld 脉络既然次要分了三步,那咱们一步一步来看,首先就是 KafkaProducer 的创立。咱们来一起看看它初始化什么货色?

这里问大家一个问题,这种构造方法的源码原理,个别剖析的后果用什么办法会比拟好?

没错,组件图或者源码脉络图剖析最容易了解了。咱们只须要有个大抵印象就行。办法有了,个别又会用什么思维呢?连蒙带猜、看看正文,猜想组件的作用,是不是?

好了让咱们来试试吧!

new KafkaProducer 的代码如下:

    /**
     * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
     * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
     * @param properties   The producer configs
     */
    public KafkaProducer(Properties properties) {this(new ProducerConfig(properties), null, null);
    }

构造函数中调用了一个重载的构造函数,咱们不焦急往下看,先看下正文。大体能够晓得,这个构造函数,入参是能够通过 Properties 设置一些参数,之后必定是讲这个参数转换成了 ProducerConfig 对象进行封装,必定有肯定的转换方法。你还记得 Zookeeper 成长记中是不是也有相似的操作,封装了一个 QuorumPeerConfig 对象。其实剖析多了很多源码,你就逐步有教训了,更好的能轻车熟路的剖析任何一个源码原理了。这才是我想要让大家学会的,而不是它如何解析,封装成配置对象的。

咱们接着剖析,那么接下里就是两条路了,看下 重载的构造方法或者是 ProducerConfig 是如何解析的。如下:

Kafka Producer 生产者的配置如何解析的?

这一节,咱们就先来看看 ProducerConfig 是如何解析配置文件的。new ProducerConfig()的代码如下:

/*
 * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS THESE ARE PART OF THE PUBLIC API AND
 * CHANGE WILL BREAK USER CODE.
 * 留神:请勿更改任何配置字符串或它们的 JAVA 变量名,因为它们是公共 API 的一部分,更改将毁坏用户代码。*/
private static final ConfigDef CONFIG;

ProducerConfig(Map<?, ?> props) {super(CONFIG, props);
}

这个构造函数的脉络,调用了一个 super,居然有一个父类。看起来比 Zookeeper 的配置解析封装的多一些,不是简略的一个 QuorumPeerConfig。

而且有一个 动态变量 ConfigDef CONFIG。你必定想晓得它是个什么货色。

咱们能够看下 ConfigDef 这个类的源码脉络,看看能不能看进去什么:

看着就是有一堆 define 办法、validate 办法。要害几个变量,比方一个 Map configKeys 啥的。如同感觉是放 key-value 配置的

比方 key=bootstrap.servers , value192.168.30.:9092 的。

切实猜不到,咱们能够再看看 ConfigDef 这个类的正文。

/**
/**
 * This class is used for specifying the set of expected configurations. For each configuration, you can specify
 * the name, the type, the default value, the documentation, the group information, the order in the group,
 * the width of the configuration value and the name suitable for display in the UI.
 * 此类用于指定冀望的配置集。对于每种配置,您能够指定名称,类型,默认值,文档,组信息,组中的程序,配置值的宽度和适宜在 UI 中显示的名称。*
 * You can provide special validation logic used for single configuration validation by overriding {@link Validator}.
 * 您能够通过笼罩 {@link Validator} 来提供用于单个配置验证的非凡验证逻辑。*
 * Moreover, you can specify the dependents of a configuration. The valid values and visibility of a configuration
 * may change according to the values of other configurations. You can override {@link Recommender} to get valid
 * values and set visibility of a configuration given the current configuration values.
 * 此外,您能够指定配置的隶属。配置的有效值和可见性可能会依据其余配置的值而扭转。您能够笼罩 {@link Recommender} 来取得有效值,* 并在给定以后配置值的状况下设置配置的可见性。* 省略其余...
 
 * This class can be used standalone or in combination with {@link AbstractConfig} which provides some additional
 * functionality for accessing configs.
 * 此类能够独自应用,也能够与 {@link AbstractConfig} 联合应用,从而提供一些附加性能拜访配置的性能。*/

通过下面的话,你应该就不难看出它的性能了。简略的说就是封装了 key-value 的配置,能够设置和校验 key-value,能够独自应用用于拜访配置

晓得了这个动态变量的作用后,你点击到 ProducerConfig 的 super,进入父类的构造函数:

public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
    /* check that all the keys are really strings */
    for (Object key : originals.keySet())
        if (!(key instanceof String))
            throw new ConfigException(key.toString(), originals.get(key), "Key must be a string.");
    this.originals = (Map<String, ?>) originals;
    this.values = definition.parse(this.originals);
    this.used = Collections.synchronizedSet(new HashSet<String>());
    if (doLog)
        logAll();}

下面的代码外围脉络就一句话definition.parse(this.originals); 也就是执行了 ConfigDef 的 parrse 办法。

到这里,你想都不必想,这个办法就是转换 Properties 为 ProducerConfig 配置的办法了。如下图所示:

那么接下来简略看下 parse 办法吧,代码如下:

private final Map<String, ConfigKey> configKeys = new HashMap<>();

public Map<String, Object> parse(Map<?, ?> props) {
        // Check all configurations are defined
        List<String> undefinedConfigKeys = undefinedDependentConfigs();
        if (!undefinedConfigKeys.isEmpty()) {String joined = Utils.join(undefinedConfigKeys, ",");
            throw new ConfigException("Some configurations in are referred in the dependents, but not defined:" + joined);
        }
        // parse all known keys
        Map<String, Object> values = new HashMap<>();
        for (ConfigKey key : configKeys.values()) {
            Object value;
            // props map contains setting - assign ConfigKey value
            if (props.containsKey(key.name)) {value = parseType(key.name, props.get(key.name), key.type);
                // props map doesn't contain setting, the key is required because no default value specified - its an error
            } else if (key.defaultValue == NO_DEFAULT_VALUE) {throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
            } else {
                // otherwise assign setting its default value
                value = key.defaultValue;
            }
            if (key.validator != null) {key.validator.ensureValid(key.name, value);
            }
            values.put(key.name, value);
        }
        return values;
 }

这段代码间接看上去有点懵,没关系,还是间接看的外围脉络。

外围脉络是一个 for 循环,次要遍历了 Map<String, ConfigKey> configKey 这个 map,外围逻辑如下:

1)首先通过 parseType 确认 value 的类型, 之后依据 ConfigKey 定义的配置名称,也就是 key

2)最初将筹备好的 key-value 配置,放入 Map<String, Object> values 中返回给了 AbstractConfig

这里咱们就晓得了最终咱们配置的 Producer 参数,就会放入到 AbstractConfig 的一个 Map<String,Object> 中, 而且 Object 阐明配置的 value 是辨别整数、字符串之类的。比方

Properties props = new Properties();
props.put("bootstrap.servers", "192.168.30.:9092");

就会如下图所示:

其实就是解析的 Properties 的整个过程了。你会发现其实也没有多简单,就是略微比 Zookeeper 封装的简单点。

不过如果你仔细的话,这里就有一个问题了,下面 parase 办法的 for 循环,循环的 Map<String, ConfigKey> configKey 是什么时候初始化的呢?

咱们能够倒回去看看。

private static final ConfigDef CONFIG;

ProducerConfig(Map<?, ?> props) {super(CONFIG, props);
}

还记得调用父类办法前,这个 ConfigDef 是子类传递给父类的。这个变量又是一个动态的,要想初始化,必定是有一段动态初始化代码在 ProducerConfig 中的。你能够找到如下的代码:

    /** <code>retries</code> */
    public static final String RETRIES_CONFIG = "retries";

    private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
                                              + "Note that this retry is no different than if the client resent the record upon receiving the error."
                                              + "Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
                                              + "ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
                                              + "succeeds, then the records in the second batch may appear first.";
static {CONFIG = new ConfigDef()
            .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
            .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
            .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
            .define(ACKS_CONFIG,
                    Type.STRING,
                    "1",
                    in("all", "-1", "0", "1"),
                    Importance.HIGH,
                    ACKS_DOC)
            .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
            .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
            .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
            .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
            .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
            .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, 
            // 省略其余 define
            .withClientSslSupport()
            .withClientSaslSupport();}

这个静态方法的其实就是 调用了 define 办法,初始化了 Producer 各个配置名称、默认值还有文档阐明。最终封装成一个 map,value 是 ConfigKey, 初始化了 ConfigDef。

private final Map<String, ConfigKey> configKeys = new HashMap<>();
public static class ConfigKey {
    public final String name;
    public final Type type;
    public final String documentation;
    public final Object defaultValue;
    public final Validator validator;
    public final Importance importance;
    public final String group;
    public final int orderInGroup;
    public final Width width;
    public final String displayName;
    public final List<String> dependents;
    public final Recommender recommender;
}

这个过程尽管没什么,然而重点就来了,默认值 也就说 KafkaProducer 的配置,默认值都是在这里初始化的。如果你想晓得 Producer 的默认值,就可以看这里了。

这些参数之前公众号的《Kafka 入门系列》中都有具体的介绍,我这里介绍了预计你也记不住。之后咱们剖析源码的时候你在缓缓了解吧。上面我摘录了一些外围配置,供大家回顾下:

Producer 外围参数:

metadata.max.age.ms 默认每隔 5 分钟 会刷新下元数据

max.request.size 每个申请的最大大小(1mb)

buffer.memory 缓冲区的内存大小(32mb)

max.block.ms 缓冲区填满之后或元数据拉取最大阻塞工夫(60s)

request.timeout.ms 申请超时工夫(30s)

batch.size 每个 batch 的大小默认(16kb)

linger.ms 默认为 0,不提早发送。

能够配置为 10ms,10ms 内还没有凑成 1 个 batch 发送进来,必须立刻发送进来

……

小结

好了明天咱们就先剖析到这里,下一节咱们持续剖析 Producer 的创立,通过组件图和流程图的形式看看配置解析之后,执行的重载构造函数又做了那些事件呢?

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0