talk is easy,show me the code,先来看一段创立producer的代码

public class KafkaProducerDemo {  public static void main(String[] args) {    KafkaProducer<String,String> producer = createProducer();    //指定topic,key,value    ProducerRecord<String,String> record = new ProducerRecord<>("test1","newkey1","newvalue1");    //异步发送    producer.send(record);    producer.close();    System.out.println("发送实现");  }  public static KafkaProducer<String,String> createProducer() {    Properties props = new Properties();    //bootstrap.servers 必须设置    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.131:9092");    // key.serializer   必须设置    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());    // value.serializer  必须设置    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());    //client.id    props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-0");    //retries    props.put(ProducerConfig.RETRIES_CONFIG, 3);    //acks    props.put(ProducerConfig.ACKS_CONFIG, "all");    //max.in.flight.requests.per.connection    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);        //linger.ms    props.put(ProducerConfig.LINGER_MS_CONFIG, 100);    //batch.size    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10240);    //buffer.memory    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10240);    return new KafkaProducer<>(props);  }}

生产者的API应用还是比较简单,创立一个ProducerRecord对象(这个对象蕴含指标主题和要发送的内容,当然还能够指定键以及分区),而后调用send办法就把音讯发送进来了。

在发送ProducerRecord对象时,生产者要先把键和值对象序列化成字节数组,这样能力在网络上进行传输。

在深刻源码之前,我先给出一张源码剖析图给大家(其实应该在结尾的时候给进去),这样看着图再看源码跟容易些

简要阐明:

  1. new KafkaProducer()后创立一个后盾线程KafkaThread(理论运行线程是Sender,KafkaThread是对Sender的封装)扫描RecordAccumulator中是否有音讯
  2. 调用KafkaProducer.send()发送音讯,理论是将音讯保留到RecordAccumulator中,实际上就是保留到一个Map中(ConcurrentMap<TopicPartition, Deque<ProducerBatch>>),这条音讯会被记录到同一个记录批次(雷同主题雷同分区算同一个批次)外面,这个批次的所有音讯会被发送到雷同的主题和分区上
  3. 后盾的独立线程扫描到RecordAccumulator中有音讯后,会将音讯发送到kafka集群中(不是一有音讯就发送,而是要看音讯是否ready)
  4. 如果发送胜利(音讯胜利写入kafka),就返回一个RecordMetaData对象,它包换了主题和分区信息,以及记录在分区里的偏移量。
  5. 如果写入失败,就会返回一个谬误,生产者在收到谬误之后会尝试从新发送音讯(如果容许的话,此时会将音讯在保留到RecordAccumulator中),几次之后如果还是失败就返回谬误音讯

源码剖析

后盾线程的创立

KafkaClient client = new NetworkClient(...);this.sender = new Sender(.,client,...);String ioThreadName = "kafka-producer-network-thread" + " | " + clientId;this.ioThread = new KafkaThread(ioThreadName, this.sender, true);this.ioThread.start();

下面的代码就是结构KafkaProducer时外围逻辑,它会结构一个KafkaClient负责和broker通信,同时结构一个Sender并启动一个异步线程,这个线程会被命名为:kafka-producer-network-thread|${clientId},如果你在创立producer的时候指定client.id的值为myclient,那么线程名称就是kafka-producer-network-thread|myclient

发送音讯(缓存音讯)

KafkaProducer<String,String> producer = createProducer();//指定topic,key,valueProducerRecord<String,String> record = new ProducerRecord<>("test1","newkey1","newvalue1");//异步发送,能够设置回调函数producer.send(record);//同步发送//producer.send(record).get();

发送音讯有同步发送以及异步发送两种形式,咱们个别不应用同步发送,毕竟太过于耗时,应用异步发送的时候能够指定回调函数,当音讯发送实现的时候(胜利或者失败)会通过回调告诉生产者。

发送音讯实际上是将音讯缓存起来,外围代码如下:

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp,   serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);

RecordAccumulator的外围数据结构是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,会将雷同主题雷同Partition的数据放到一个Deque(双向队列)中,这也是咱们之前提到的同一个记录批次外面的音讯会发送到同一个主题和分区的意思。append()办法的外围源码如下:

//从batchs(ConcurrentMap<TopicPartition, Deque<ProducerBatch>>)中//依据主题分区获取对应的队列,如果没有则new ArrayDeque<>返回Deque<ProducerBatch> dq = getOrCreateDeque(tp);//计算同一个记录批次占用空间大小,batchSize依据batch.size参数决定int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(    maxUsableMagic, compression, key, value, headers));//为同一个topic,partition调配buffer,如果同一个记录批次的内存不足,//那么会阻塞maxTimeToBlock(max.block.ms参数)这么长时间ByteBuffer buffer = free.allocate(size, maxTimeToBlock);synchronized (dq) {  //创立MemoryRecordBuilder,通过buffer初始化appendStream(DataOutputStream)属性  MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);  ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());  //将key,value写入到MemoryRecordsBuilder中的appendStream(DataOutputStream)中  batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());  //将须要发送的音讯放入到队列中  dq.addLast(batch);}

发送音讯到Kafka

下面曾经将音讯存储RecordAccumulator中去了,当初看看怎么发送音讯。下面咱们提到了创立KafkaProducer的时候会启动一个异步线程去从RecordAccumulator中获得音讯而后发送到Kafka,发送音讯的外围代码是Sender.java,它实现了Runnable接口并在后盾始终运行解决发送申请并将音讯发送到适合的节点,直到KafkaProducer被敞开

/*** The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata* requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.*/public class Sender implements Runnable {  public void run() {    // 始终运行直到kafkaProducer.close()办法被调用    while (running) {       run(time.milliseconds());    }        //从日志上看是开始解决KafkaProducer被敞开后的逻辑    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");    //当非强制敞开的时候,可能还依然有申请并且accumulator中还依然存在数据,此时咱们须要将申请解决实现    while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {       run(time.milliseconds());    }    if (forceClose) {        //如果是强制敞开,且还有未发送结束的音讯,则勾销发送并抛出一个异样new KafkaException("Producer is closed forcefully.")        this.accumulator.abortIncompleteBatches();    }    ...  }}

KafkaProducer的敞开办法有2个,close()以及close(long timeout,TimeUnit timUnit),其中timeout参数的意思是期待生产者实现任何待处理申请的最长工夫,第一种形式的timeout为Long.MAX_VALUE毫秒,如果采纳第二种形式敞开,当timeout=0的时候则示意强制敞开,间接敞开Sender(设置running=false)。

run(long)办法中咱们先跳过对transactionManager的解决,查看发送音讯的次要流程如下:

//将记录批次转移到每个节点的生产申请列表中long pollTimeout = sendProducerData(now);//轮询进行音讯发送client.poll(pollTimeout, now);

首先查看sendProducerData()办法,它的外围逻辑在sendProduceRequest()办法(处于Sender.java)中

for (ProducerBatch batch : batches) {    TopicPartition tp = batch.topicPartition;    //将ProducerBatch中MemoryRecordsBuilder转换为MemoryRecords(发送的数据就在这外面)    MemoryRecords records = batch.records();    produceRecordsByPartition.put(tp, records);}ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,        produceRecordsByPartition, transactionalId);//音讯发送实现时的回调RequestCompletionHandler callback = new RequestCompletionHandler() {    public void onComplete(ClientResponse response) {        //解决响应音讯        handleProduceResponse(response, recordsByPartition, time.milliseconds());    }};//依据参数结构ClientRequest,此时须要发送的音讯在requestBuilder中ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,        requestTimeoutMs, callback);//将clientRequest转换成Send对象(Send.java,蕴含了须要发送数据的buffer),//给KafkaChannel设置该对象,记住这里还没有发送数据client.send(clientRequest, now);

下面的client.send()办法最终会定位到NetworkClient.doSend()办法,所有的申请(无论是producer发送音讯的申请还是获取metadata的申请)都是通过该办法设置对应的Send对象。所反对的申请在ApiKeys.java中都有定义,这外面能够看到每个申请的request以及response对应的数据结构。

下面只是设置了发送音讯所须要筹备的内容,当初进入到发送音讯的主流程,发送音讯的外围代码在Selector.java的pollSelectionKeys()办法中,代码如下:

/* if channel is ready write to any sockets that have space in their buffer and for which we have data */if (channel.ready() && key.isWritable()) {  //底层理论调用的是java8 GatheringByteChannel的write办法  channel.write();}

就这样,咱们的音讯就发送到了broker中了,发送流程剖析结束,这个是完满的状况,然而总会有发送失败的时候(音讯过大或者没有可用的leader),那么发送失败后重发又是在哪里实现的呢?还记得下面的回调函数吗?没错,就是在回调函数这里设置的,先来看下回调函数源码

private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {  RequestHeader requestHeader = response.requestHeader();  if (response.wasDisconnected()) {    //如果是网络断开则结构Errors.NETWORK_EXCEPTION的响应    for (ProducerBatch batch : batches.values())        completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L);  } else if (response.versionMismatch() != null) {   //如果是版本不匹配,则结构Errors.UNSUPPORTED_VERSION的响应    for (ProducerBatch batch : batches.values())        completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now, 0L);  } else {        if (response.hasResponse()) {        //如果存在response就返回失常的response           ...        }    } else {        //如果acks=0,那么则结构Errors.NONE的响应,因为这种状况只须要发送不须要响应后果        for (ProducerBatch batch : batches.values()) {            completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now, 0L);        }    }  }}

而在completeBatch办法中咱们次要关注失败的逻辑解决,外围源码如下:

private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,                           long now, long throttleUntilTimeMs) {  Errors error = response.error;  //如果发送的音讯太大,须要从新进行宰割发送  if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&        (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {    this.accumulator.splitAndReenqueue(batch);    this.accumulator.deallocate(batch);    this.sensors.recordBatchSplit();  } else if (error != Errors.NONE) {    //产生了谬误,如果此时能够retry(retry次数未达到限度以及产生异样是RetriableException)    if (canRetry(batch, response)) {        if (transactionManager == null) {            //把须要重试的音讯放入队列中,期待重试,理论就是调用deque.addFirst(batch)            reenqueueBatch(batch, now);        }     } }

Producer发送音讯的流程曾经剖析结束,当初回过头去看流程图会更加清晰。

更多对于Kafka协定的波及能够参考这个链接

分区算法

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {    //如果key为null,则应用Round Robin算法    int nextValue = nextValue(topic);    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);    if (availablePartitions.size() > 0) {        int part = Utils.toPositive(nextValue) % availablePartitions.size();        return availablePartitions.get(part).partition();    } else {        // no partitions are available, give a non-available partition        return Utils.toPositive(nextValue) % numPartitions;    }} else {    // 依据key进行散列    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}

Kafka中对于分区的算法有两种状况

  1. 如果键值为null,并且应用了默认的分区器,那么记录键随机地发送到主题内各个可用的分区上。分区器应用轮询(Round Robin)算法键音讯平衡地散布到各个分区上。
  2. 如果键不为空,并且应用了默认的分区器,那么Kafka会对键进行散列(应用Kafka本人的散列算法,即便降级Java版本,散列值也不会发生变化),而后依据散列值把音讯映射到特定的分区上。同一个键总是被映射到同一个分区上(如果分区数量产生了变动则不能保障),映射的时候会应用主题所有的分区,而不仅仅是可用分区,所以如果写入数据分区是不可用的,那么就会产生谬误,当然这种状况很少产生。

如果你想要实现自定义分区,那么只须要实现Partitioner接口即可。

生产者的配置参数

剖析了KafkaProducer的源码之后,咱们会发现很多参数是贯通在整个音讯发送流程,上面列出了一些KafkaProducer中用到的配置参数。

  1. acks
    acks参数指定了必须要有多少个分区正本收到该音讯,producer才会认为音讯写入是胜利的。有以下三个选项

    • acks=0,生产者不须要期待服务器的响应,也就是说如果其中呈现了问题,导致服务器没有收到音讯,生产者就无从得悉,音讯也就失落了,过后因为不须要期待响应,所以能够以网络可能反对的最大速度发送音讯,从而达到很高的吞吐量。
    • acks=1, 只须要集群的leader收到音讯,生产者就会收到一个来自服务器的胜利响应。如果音讯无奈达到leader,生产者会收到一个谬误响应,此时producer会重发消息。不过如果一个没有收到音讯的节点称为leader,音讯还是会失落。
    • acks=all,当所有参加复制的节点全副收到音讯的时候,生产者才会收到一个来自服务器的胜利响应,最平安不过提早比拟高。
  2. buffer.memory

    设置生产者内存缓冲区的大小,如果应用程序发送音讯的速度超过生产者发送到服务器的速度,那么就会导致生产者空间有余,此时send()办法要么被阻塞,要么抛出异样。取决于如何设置max.block.ms,示意在抛出异样之前能够阻塞一段时间。

  3. retries

    发送音讯到服务器收到的谬误可能是能够长期的谬误(比方找不到leader),这种状况下依据该参数决定生产者重发消息的次数。留神:此时要依据重试次数以及是否是RetriableException来决定是否重试。

  4. batch.size

    当有多个音讯须要被发送到同一个分区的时候,生产者会把他们放到同一个批次外面(Deque),该参数指定了一个批次能够应用的内存大小,依照字节数计算,当批次被填满,批次里的所有音讯会被发送进来。不过生产者并不一定会等到批次被填满才发送,半满甚至只蕴含一个音讯的批次也有可能被发送。

  5. linger.ms

    指定了生产者在发送批次之前期待更多音讯退出批次的工夫。KafkaProducer会在批次填满或linger.ms达到下限时把批次发送进来。把linger.ms设置成比0大的数,让生产者在发送批次之前期待一会儿,使更多的音讯退出到这个批次,尽管这样会减少提早,过后也会晋升吞吐量。

  6. max.block.ms

    指定了在调用send()办法或者partitionsFor()办法获取元数据时生产者的阻塞工夫。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些办法就会阻塞。在阻塞工夫达到max.block.ms时,就会抛出new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");

  7. client.id

    任意字符串,用来标识消息来源,咱们的后盾线程就会依据它来起名儿,线程名称是kafka-producer-network-thread|{client.id}

  8. max.in.flight.requests.per.connection

    该参数指定了生产者在收到服务器响应之前能够发送多少个音讯。它的值越高,就会占用越多的内存,不过也会晋升吞吐量。把它设为1能够保障音讯是依照发送的程序写入服务器的,即使产生了重试。

  9. timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms

    request.timeout.ms指定了生产者在发送数据时期待服务器返回响应的工夫,metadata.fetch.timeout.ms指定了生产者在获取元数据(比方指标分区的leader)时期待服务器返回响应的工夫。如果期待响应超时,那么生产者要么重试发送数据,要么返回一个谬误。timeout.ms指定了broker期待同步正本返回音讯确认的工夫,与asks的配置相匹配——如果在指定工夫内没有收到同步正本的确认,那么broker就会返回一个谬误。

  10. max.request.size

    该参数用于管制生产者发送的申请大小。broker对可接管的音讯最大值也有本人的限度(message.max.bytes),所以两边的配置最好能够匹配,防止生产者发送的音讯被broker回绝。

  11. receive.buffer.bytes和send.buffer.bytes

    这两个参数别离制订了TCP socket接管和发送数据包的缓冲区大小(和broker通信还是通过socket)。如果他们被设置为-1,就应用操作系统的默认值。如果生产者或消费者与broker处于不同的数据中心,那么能够适当增大这些值,因为跨数据中心的网络个别都有比拟高的提早和比拟低的带宽。

关注我不迷路

你的关注是对我最大的激励