关于springboot:Kafka成长记5Producer-消息的初步序列化和分区路由源码原理

5次阅读

共计 9467 个字符,预计需要花费 24 分钟才能阅读完成。

Kafka 成长记的前 4 节咱们通过 KafkaProducerHelloWorld 剖析了 Producer 配置解析、组件组成、元数据拉取原理。

但 KafkaProducerHelloWorld 发送音讯的代码并没有剖析完,咱们剖析了如到了如下图所示的地位:

接下来,咱们持续往下剖析,这一节咱们次要剖析下发送音讯的初步序列化和分区路由源码原理。

自定义音讯的初步序列化的形式

在 producer.send() 执行 doSend() 的时候,waitOnMetadata 拉取元数据胜利之后脉络是什么呢?

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
        byte[] serializedKey;
        try {serializedKey = keySerializer.serialize(record.topic(), record.key());
        } catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class "+ record.key().getClass().getName() +" to class "+ producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer");
        }
        byte[] serializedValue;
        try {serializedValue = valueSerializer.serialize(record.topic(), record.value());
        } catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class "+ record.value().getClass().getName() +" to class "+ producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer");
        }
        int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        ensureValidRecordSize(serializedSize);
        tp = new TopicPartition(record.topic(), partition);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();}
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (Exception e) {throw e;}
    // 省略其余各种异样捕捉
}

次要脉络就是:

1)waitOnMetadata 期待元数据拉取

2)keySerializer.serialize 和 valueSerializer.serialize,很显著就是将 Record 序列化成 byte 字节数组

3)通过 partition 进行路由分区,依照肯定路由策略抉择 Topic 下的某个分区

4)accumulator.append 将音讯放入缓冲器中

5)唤醒 Sender 线程的 selector.select() 的阻塞,开始解决内存缓冲器中的数据。

整个脉络如下图:

第二步执行的脉络是应用自定义序列化器,将音讯转换为 byte[] 数组。咱们就来先看下这块的逻辑。

首先第一个问题就是,自定义的音讯序列化器哪里来的?其实是在配置参数中设置的。还记得 KafkaProducerHelloWorld 代码么?

  // KafkaProducerHelloWorld.java
  public static void main(String[] args) throws Exception {Properties props = new Properties();
        props.put("bootstrap.servers", "mengfanmao.org:9092");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "test-key", "test-value");
        producer.send(record).get();
        Thread.sleep(5 * 1000);
        producer.close();}

在之前的 KafkaProducerHelloWorld.java 中,咱们起初并没有设置序列化参数。后果发消息失败,提醒了如下堆栈:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:421)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:55)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
    at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:336)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
    at org.mfm.learn.kafka.KafkaProducerHelloWorld.main(KafkaProducerHelloWorld.java:20)

下面堆栈的信息有没有很相熟?提醒的那些类不正是咱们之前钻研配置解析相干的源码类么?ProducerConfig、AbstractConfig、ConfigDef 切实是太相熟了。

关上源码 ConfigDef,你会发现 ConfigDef 在解析配置文件时,没有序列化配置会使得 new KafkaProducer() 这一步间接抛出异样,音讯发送失败。

到这里你是不是能够稍微体验进去,浏览源码的益处之一了?

接着你补充配置下序列化参数如下:

  // KafkaProducerHelloWorld.java
  public static void main(String[] args) throws Exception {Properties props = new Properties();
        props.put("bootstrap.servers", "mengfanmao.org:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "test-key", "test-value");
        producer.send(record).get();
        Thread.sleep(5 * 1000);
        producer.close();}

音讯发送胜利!咱们补充设置的序列化器是客户端 jar 包中默认提供的 StringSerializer。既然有了音讯序列化器,咱们就来看看它是如何序列化的 key 和 value 的。

咱们将之前第二步外围简化,其实就是如下代码:

//KafkaProudcer.java#doSend
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "test-key", "test-value");
keySerializer.serialize(record.topic(), record.key());
valueSerializer.serialize(record.topic(), record.value());

//StringSerializer.java
public byte[] serialize(String topic, String data) {
    try {if (data == null)
            return null;
        else
            return data.getBytes(encoding);
    } catch (UnsupportedEncodingException e) {throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding" + encoding);
    }
}

能够看到 StringSerializer 的序列化的形式非常简单,就是调用 String 原始的 getBytes() 办法而已。(PS: 第一个参数居然没有应用 …)

序列化真的只是到这里为止了么?必定不是,这个 bytes[] 数组的数据必定最终须要通过网络发送进来的,这里只是算是初步的一次序列化而已。音讯之后最终的序列化,包含具体的格局,咱们之后钻研 Kafka 应用原生 Java NIO 解决粘包和拆包问题时在深入研究。

起码,这里咱们能够失去如下的图了:

音讯基于 Topic 分区路由源码原理

发送音讯时,拉取到元数据、初步序列化音讯为 byte[] 数组。之后就是通过元数据信息进行路由,抉择一个 Topic 对应的 Partition 发送音讯了。在路由抉择发送音讯的分区时,用到了 Metadata 中的 Cluster 元数据,这里带大家回顾下它的构造。

Cluster 类的元数据内存构造回顾

List<Node>:Kafka Broker 节点, 次要是 Broker 的 ip、端口。

Map nodesById,key 是 broker 的 id,value 是 Broker 的信息 Node

Map partitionsByTopic: 每个 topic 有哪些分区,key 是 topic 名称,value 是分区信息列表

Map availablePartitionsByTopic,每个 topic 有哪些以后可用的分区,key 是 topic 名称,value 是分区信息列表

Map partitionsByNode, 每个 broker 上放了哪些分区,key 是 broker 的 id,value 是分区信息列表

unautorhizedTopics: 没有被受权拜访的 Topic 的列表,如果你的客户端没有被受权拜访某个 Topic,音讯队列的权限管制用的很少,这个简直能够疏忽。

你能够断点,看下数据,如下所示:

对集群元数据,你能够发现,依据不同的需要、应用和场景,采纳不同的数据结构来进行寄存,kafka Producer 设计了不同的数据结构,其实很多时候咱们是能够学习用相似这种思路写代码的。

回顾了元数据之后,客户端必定能够依据元数据信息进行路由了。那么是如何路由的呢?代码如下:

// KakfaProducer.java
private final Partitioner partitioner;
//#doSend()
int partitionpartition = partition(record, serializedKey, serializedValue, metadata.fetch());
//#partition()
private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();
    if (partition != null) {List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
        int lastPartition = partitions.size() - 1;
        // they have given us a partition, use it
        if (partition < 0 || partition > lastPartition) {throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
        }
        return partition;
    }
    return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,
                                      cluster);
}

这段办法脉络很简略,次要就是依据 record 是否指定分区 partition 决定:

1)如果发送的音讯 record 指定了分区,应用元数据信息 Cluster 校验后,路由后的分区就是指定的分区编号。
2)如果发送的音讯 record 没有指定分区,应用一个 Partitioner 组件 partition 办法路由决定分区编号。

如下图:

上一节咱们说过 ProducerRecord 的工夫戳和分区是可选的,默认都是 null。也就是说,默认会走到 Partitioner 组件 partition 这个分支。

可是问题就来了。Partitioner 这个是什么时候初始化的?

因为 partitioner 这个是 KafkaProducer 的一个成员变量,你能够搜寻下它。你会发现:

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // 省略其余代码...
    this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
    // 省略其余代码...
}

原来是在构造函数时候初始化的,它其实就是通过配置解析失去的。并且有一个默认值 DefaultPartitioner

晓得了这个之后,咱们来看看默认的话是如何路发送的音讯呢?

//DefaultPartitioner.java
private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {int nextValue = counter.getAndIncrement();
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();} else {
            // no partitions are available, give a non-available partition
            return DefaultPartitioner.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

这个办法脉络次要是:

1)从元数据 Cluster 中的 map 获取 topic 下对应的所有分区和分区数量

2)发送音讯如果没有指定 key,则从一个随机数开始,每次通过 AtomicInteger 递增 +1,对分区数量或者可用分区大小取模,取得对应的分区编号

3)发送音讯如果指定 key,会对 key 对应的字节数组执行一个算法 murmur2,失去一个 int 数字,之后对分区数量取模,取得对应分区编号

整个过程如下图所示:

通过下面的路由策略,你能够发现,kafka 发送的音讯,哪怕只是指定了 topic 都是能够的。不须要指定 key 和 partition。不过这样可能会导致音讯乱序。

至于如何保障 kafka 发送音讯的程序性,除了指定分区和 key 外,其实还须要其余的配置,比方 InFlightRequest 的 size 默认是 5,须要设置为 1,否则重试的时候也会导致音讯乱序,这些咱们前面会剖析到的。

小结

明天咱们次要摸索了音讯的初步序列化形式、音讯的路由策略。咱们简略小结下:

1)Kafka 音讯的初步序列化必须通过配置参数指定,个别应用 StringSerializer,不指定会导致发送音讯失败

2)Kafka 发送的音讯,Topic 必须指定,而 Topic 下的 key 和 partition 可选。

默认的分区路由的策略,反对三种,指定分区,指定分区 key,或者不指定分区 key

a. 同时指定或者只指定 partition,因为 parttition 路由的优先级高于 key,会依据指定的 parttition 编号间接路由音讯。

b. 如果只是指定 key, 会对 key 对应的字节数组执行一个算法 murmur2,失去一个 int 数字,之后对分区数量取模,取得对应分区编号

c. 如果都不指定,则从一个随机数开始,每次通过 AtomicInteger 递增 +1, 对分区数量或者可用分区大小取模,取得对应的分区编号

这一节的常识比拟轻松,不晓得大家把握的怎么样了。随着对 KafkaProducer 的剖析,咱们曾经,缓缓揭开了它神秘的面纱了。前面两节咱们一起来剖析下发送音讯的内存缓冲器的原理,如何分配内存区域,队列机制 +batch 机制如何将音讯批量发送进来。在之后再剖析下,Kakfa 如何解决 Java 原生 NIO 中的拆包和粘包的问题。根本 Producer 的源码原理就钻研的差不多了。

咱们下一节再见!

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0