关于kafka:我花了一周读了Kafka-Producer的源码

45次阅读

共计 11200 个字符,预计需要花费 28 分钟才能阅读完成。

talk is easy,show me the code, 先来看一段创立 producer 的代码

public class KafkaProducerDemo {public static void main(String[] args) {KafkaProducer<String,String> producer = createProducer();

    // 指定 topic,key,value
    ProducerRecord<String,String> record = new ProducerRecord<>("test1","newkey1","newvalue1");

    // 异步发送
    producer.send(record);
    producer.close();

    System.out.println("发送实现");

  }

  public static KafkaProducer<String,String> createProducer() {Properties props = new Properties();

    //bootstrap.servers 必须设置
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.131:9092");

    // key.serializer   必须设置
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // value.serializer  必须设置
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    //client.id
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-0");

    //retries
    props.put(ProducerConfig.RETRIES_CONFIG, 3);

    //acks
    props.put(ProducerConfig.ACKS_CONFIG, "all");

    //max.in.flight.requests.per.connection
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    
    //linger.ms
    props.put(ProducerConfig.LINGER_MS_CONFIG, 100);

    //batch.size
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10240);

    //buffer.memory
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10240);

    return new KafkaProducer<>(props);
  }
}

生产者的 API 应用还是比较简单, 创立一个 ProducerRecord 对象 ( 这个对象蕴含指标主题和要发送的内容, 当然还能够指定键以及分区), 而后调用 send 办法就把音讯发送进来了。

在发送 ProducerRecord 对象时,生产者要先把键和值对象序列化成字节数组,这样能力在网络上进行传输。

在深刻源码之前,我先给出一张源码剖析图给大家(其实应该在结尾的时候给进去), 这样看着图再看源码跟容易些

简要阐明:

  1. new KafkaProducer()后创立一个后盾线程 KafkaThread(理论运行线程是 Sender,KafkaThread 是对 Sender 的封装)扫描 RecordAccumulator 中是否有音讯
  2. 调用 KafkaProducer.send() 发送音讯,理论是将音讯保留到 RecordAccumulator 中, 实际上就是保留到一个 Map 中 (ConcurrentMap<TopicPartition, Deque<ProducerBatch>>), 这条音讯会被记录到同一个记录批次(雷同主题雷同分区算同一个批次) 外面, 这个批次的所有音讯会被发送到雷同的主题和分区上
  3. 后盾的独立线程扫描到 RecordAccumulator 中有音讯后,会将音讯发送到 kafka 集群中(不是一有音讯就发送,而是要看音讯是否 ready)
  4. 如果发送胜利 (音讯胜利写入 kafka), 就返回一个RecordMetaData 对象,它包换了主题和分区信息,以及记录在分区里的偏移量。
  5. 如果写入失败,就会返回一个谬误,生产者在收到谬误之后会尝试从新发送音讯 ( 如果容许的话, 此时会将音讯在保留到 RecordAccumulator 中), 几次之后如果还是失败就返回谬误音讯

源码剖析

后盾线程的创立

KafkaClient client = new NetworkClient(...);
this.sender = new Sender(.,client,...);
String ioThreadName = "kafka-producer-network-thread" + "|" + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

下面的代码就是结构 KafkaProducer 时外围逻辑, 它会结构一个 KafkaClient 负责和 broker 通信, 同时结构一个 Sender 并启动一个异步线程,这个线程会被命名为:kafka-producer-network-thread|${clientId}, 如果你在创立 producer 的时候指定 client.id 的值为 myclient, 那么线程名称就是kafka-producer-network-thread|myclient

发送音讯(缓存音讯)

KafkaProducer<String,String> producer = createProducer();

// 指定 topic,key,value
ProducerRecord<String,String> record = new ProducerRecord<>("test1","newkey1","newvalue1");

// 异步发送, 能够设置回调函数
producer.send(record);
// 同步发送
//producer.send(record).get();

发送音讯有同步发送以及异步发送两种形式,咱们个别不应用同步发送,毕竟太过于耗时,应用异步发送的时候能够指定回调函数,当音讯发送实现的时候 (胜利或者失败) 会通过回调告诉生产者。

发送音讯实际上是将音讯缓存起来,外围代码如下:

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, 
  serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);

RecordAccumulator的外围数据结构是 ConcurrentMap<TopicPartition, Deque<ProducerBatch>>, 会将雷同主题雷同 Partition 的数据放到一个 Deque(双向队列) 中, 这也是咱们之前提到的同一个记录批次外面的音讯会发送到同一个主题和分区的意思。append()办法的外围源码如下:

// 从 batchs(ConcurrentMap<TopicPartition, Deque<ProducerBatch>>)中
// 依据主题分区获取对应的队列,如果没有则 new ArrayDeque<> 返回
Deque<ProducerBatch> dq = getOrCreateDeque(tp);

// 计算同一个记录批次占用空间大小,batchSize 依据 batch.size 参数决定
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));

// 为同一个 topic,partition 调配 buffer,如果同一个记录批次的内存不足,// 那么会阻塞 maxTimeToBlock(max.block.ms 参数)这么长时间
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {// 创立 MemoryRecordBuilder, 通过 buffer 初始化 appendStream(DataOutputStream)属性
  MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
  ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());

  // 将 key,value 写入到 MemoryRecordsBuilder 中的 appendStream(DataOutputStream)中
  batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());

  // 将须要发送的音讯放入到队列中
  dq.addLast(batch);
}

发送音讯到 Kafka

下面曾经将音讯存储 RecordAccumulator 中去了, 当初看看怎么发送音讯。下面咱们提到了创立 KafkaProducer 的时候会启动一个异步线程去从 RecordAccumulator 中获得音讯而后发送到 Kafka, 发送音讯的外围代码是Sender.java, 它实现了 Runnable 接口并在后盾始终运行解决发送申请并将音讯发送到适合的节点,直到 KafkaProducer 被敞开

/**
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
* requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
*/
public class Sender implements Runnable {public void run() {// 始终运行直到 kafkaProducer.close()办法被调用
    while (running) {run(time.milliseconds());
    }
    
    // 从日志上看是开始解决 KafkaProducer 被敞开后的逻辑
    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // 当非强制敞开的时候,可能还依然有申请并且 accumulator 中还依然存在数据,此时咱们须要将申请解决实现
    while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {run(time.milliseconds());
    }
    if (forceClose) {// 如果是强制敞开, 且还有未发送结束的音讯,则勾销发送并抛出一个异样 new KafkaException("Producer is closed forcefully.")
        this.accumulator.abortIncompleteBatches();}
    ...
  }
}

KafkaProducer 的敞开办法有 2 个,close()以及close(long timeout,TimeUnit timUnit), 其中 timeout 参数的意思是期待生产者实现任何待处理申请的最长工夫,第一种形式的 timeout 为 Long.MAX_VALUE 毫秒, 如果采纳第二种形式敞开,当 timeout= 0 的时候则示意强制敞开, 间接敞开 Sender(设置 running=false)。

run(long)办法中咱们先跳过对 transactionManager 的解决,查看发送音讯的次要流程如下:

// 将记录批次转移到每个节点的生产申请列表中
long pollTimeout = sendProducerData(now);

// 轮询进行音讯发送
client.poll(pollTimeout, now);

首先查看 sendProducerData()办法,它的外围逻辑在 sendProduceRequest() 办法 (处于 Sender.java) 中

for (ProducerBatch batch : batches) {
    TopicPartition tp = batch.topicPartition;

    // 将 ProducerBatch 中 MemoryRecordsBuilder 转换为 MemoryRecords(发送的数据就在这外面)
    MemoryRecords records = batch.records();
    produceRecordsByPartition.put(tp, records);
}

ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
        produceRecordsByPartition, transactionalId);

// 音讯发送实现时的回调
RequestCompletionHandler callback = new RequestCompletionHandler() {public void onComplete(ClientResponse response) {
        // 解决响应音讯
        handleProduceResponse(response, recordsByPartition, time.milliseconds());
    }
};

// 依据参数结构 ClientRequest, 此时须要发送的音讯在 requestBuilder 中
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
        requestTimeoutMs, callback);

// 将 clientRequest 转换成 Send 对象(Send.java, 蕴含了须要发送数据的 buffer),// 给 KafkaChannel 设置该对象,记住这里还没有发送数据
client.send(clientRequest, now);

下面的 client.send()办法最终会定位到 NetworkClient.doSend()办法,所有的申请 (无论是 producer 发送音讯的申请还是获取 metadata 的申请) 都是通过该办法设置对应的 Send 对象。所反对的申请在 ApiKeys.java 中都有定义,这外面能够看到每个申请的 request 以及 response 对应的数据结构。

下面只是设置了发送音讯所须要筹备的内容,当初进入到发送音讯的主流程,发送音讯的外围代码在 Selector.java 的 pollSelectionKeys()办法中,代码如下:

/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
  // 底层理论调用的是 java8 GatheringByteChannel 的 write 办法
  channel.write();}

就这样,咱们的音讯就发送到了 broker 中了, 发送流程剖析结束,这个是完满的状况,然而总会有发送失败的时候(音讯过大或者没有可用的 leader),那么发送失败后重发又是在哪里实现的呢? 还记得下面的回调函数吗?没错,就是在回调函数这里设置的,先来看下回调函数源码

private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {RequestHeader requestHeader = response.requestHeader();

  if (response.wasDisconnected()) {
    // 如果是网络断开则结构 Errors.NETWORK_EXCEPTION 的响应
    for (ProducerBatch batch : batches.values())
        completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L);

  } else if (response.versionMismatch() != null) {

   // 如果是版本不匹配,则结构 Errors.UNSUPPORTED_VERSION 的响应
    for (ProducerBatch batch : batches.values())
        completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now, 0L);

  } else {if (response.hasResponse()) {
        // 如果存在 response 就返回失常的 response
           ...
        }
    } else {

        // 如果 acks=0,那么则结构 Errors.NONE 的响应,因为这种状况只须要发送不须要响应后果
        for (ProducerBatch batch : batches.values()) {completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now, 0L);
        }
    }
  }
}

而在 completeBatch 办法中咱们次要关注失败的逻辑解决,外围源码如下:

private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now, long throttleUntilTimeMs) {
  Errors error = response.error;

  // 如果发送的音讯太大,须要从新进行宰割发送
  if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
        (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {this.accumulator.splitAndReenqueue(batch);
    this.accumulator.deallocate(batch);
    this.sensors.recordBatchSplit();} else if (error != Errors.NONE) {// 产生了谬误,如果此时能够 retry(retry 次数未达到限度以及产生异样是 RetriableException)
    if (canRetry(batch, response)) {if (transactionManager == null) {// 把须要重试的音讯放入队列中,期待重试,理论就是调用 deque.addFirst(batch)
            reenqueueBatch(batch, now);
        } 
    } 
}

Producer 发送音讯的流程曾经剖析结束,当初回过头去看流程图会更加清晰。

更多对于 Kafka 协定的波及能够参考这个链接

分区算法

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
    // 如果 key 为 null, 则应用 Round Robin 算法
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();} else {
        // no partitions are available, give a non-available partition
        return Utils.toPositive(nextValue) % numPartitions;
    }
} else {
    // 依据 key 进行散列
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

Kafka 中对于分区的算法有两种状况

  1. 如果键值为 null, 并且应用了默认的分区器,那么记录键随机地发送到主题内各个可用的分区上。分区器应用轮询 (Round Robin) 算法键音讯平衡地散布到各个分区上。
  2. 如果键不为空,并且应用了默认的分区器,那么 Kafka 会对键进行散列(应用 Kafka 本人的散列算法,即便降级 Java 版本,散列值也不会发生变化),而后依据散列值把音讯映射到特定的分区上。同一个键总是被映射到同一个分区上(如果分区数量产生了变动则不能保障),映射的时候会应用主题所有的分区,而不仅仅是可用分区,所以如果写入数据分区是不可用的,那么就会产生谬误,当然这种状况很少产生。

如果你想要实现自定义分区,那么只须要实现 Partitioner 接口即可。

生产者的配置参数

剖析了 KafkaProducer 的源码之后,咱们会发现很多参数是贯通在整个音讯发送流程,上面列出了一些 KafkaProducer 中用到的配置参数。

  1. acks
    acks 参数指定了必须要有多少个分区正本收到该音讯,producer 才会认为音讯写入是胜利的。有以下三个选项

    • acks=0, 生产者不须要期待服务器的响应,也就是说如果其中呈现了问题,导致服务器没有收到音讯,生产者就无从得悉,音讯也就失落了,过后因为不须要期待响应,所以能够以网络可能反对的最大速度发送音讯,从而达到很高的吞吐量。
    • acks=1, 只须要集群的 leader 收到音讯,生产者就会收到一个来自服务器的胜利响应。如果音讯无奈达到 leader,生产者会收到一个谬误响应,此时 producer 会重发消息。不过如果一个没有收到音讯的节点称为 leader,音讯还是会失落。
    • acks=all, 当所有参加复制的节点全副收到音讯的时候,生产者才会收到一个来自服务器的胜利响应,最平安不过提早比拟高。
  2. buffer.memory

    设置生产者内存缓冲区的大小,如果应用程序发送音讯的速度超过生产者发送到服务器的速度,那么就会导致生产者空间有余,此时 send()办法要么被阻塞,要么抛出异样。取决于如何设置 max.block.ms,示意在抛出异样之前能够阻塞一段时间。

  3. retries

    发送音讯到服务器收到的谬误可能是能够长期的谬误(比方找不到 leader), 这种状况下依据该参数决定生产者重发消息的次数。留神:此时要依据重试次数以及是否是 RetriableException 来决定是否重试。

  4. batch.size

    当有多个音讯须要被发送到同一个分区的时候,生产者会把他们放到同一个批次外面(Deque), 该参数指定了一个批次能够应用的内存大小,依照字节数计算,当批次被填满,批次里的所有音讯会被发送进来。不过生产者并不一定会等到批次被填满才发送,半满甚至只蕴含一个音讯的批次也有可能被发送。

  5. linger.ms

    指定了生产者在发送批次之前期待更多音讯退出批次的工夫。KafkaProducer 会在批次填满或 linger.ms 达到下限时把批次发送进来。把 linger.ms 设置成比 0 大的数,让生产者在发送批次之前期待一会儿,使更多的音讯退出到这个批次,尽管这样会减少提早,过后也会晋升吞吐量。

  6. max.block.ms

    指定了在调用 send()办法或者 partitionsFor()办法获取元数据时生产者的阻塞工夫。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些办法就会阻塞。在阻塞工夫达到 max.block.ms 时, 就会抛出 new TimeoutException(“Failed to allocate memory within the configured max blocking time ” + maxTimeToBlockMs + ” ms.”);

  7. client.id

    任意字符串,用来标识消息来源,咱们的后盾线程就会依据它来起名儿,线程名称是 kafka-producer-network-thread|{client.id}

  8. max.in.flight.requests.per.connection

    该参数指定了生产者在收到服务器响应之前能够发送多少个音讯。它的值越高,就会占用越多的内存,不过也会晋升吞吐量。把它设为 1 能够保障音讯是依照发送的程序写入服务器的,即使产生了重试。

  9. timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

    request.timeout.ms 指定了生产者在发送数据时期待服务器返回响应的工夫,metadata.fetch.timeout.ms 指定了生产者在获取元数据 (比方指标分区的 leader) 时期待服务器返回响应的工夫。如果期待响应超时,那么生产者要么重试发送数据,要么返回一个谬误。timeout.ms 指定了 broker 期待同步正本返回音讯确认的工夫,与 asks 的配置相匹配——如果在指定工夫内没有收到同步正本的确认,那么 broker 就会返回一个谬误。

  10. max.request.size

    该参数用于管制生产者发送的申请大小。broker 对可接管的音讯最大值也有本人的限度(message.max.bytes), 所以两边的配置最好能够匹配,防止生产者发送的音讯被 broker 回绝。

  11. receive.buffer.bytes 和 send.buffer.bytes

    这两个参数别离制订了 TCP socket 接管和发送数据包的缓冲区大小(和 broker 通信还是通过 socket)。如果他们被设置为 -1,就应用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么能够适当增大这些值,因为跨数据中心的网络个别都有比拟高的提早和比拟低的带宽。

关注我不迷路

你的关注是对我最大的激励

正文完
 0