关于java:kafka-系列-32生产者客户端原理分析

3次阅读

共计 8898 个字符,预计需要花费 23 分钟才能阅读完成。

生产者发送音讯的整体流程

音讯追加器 RecordAccumulator

后面几个组件,在 3.1 的文章中,曾经说分明。当初来看 RecordAccumulator 组件

RecordAccumulator 次要用于缓存音讯,以便 Sender 线程可能批量发送音讯。RecordAccumulator 会将音讯放入缓存 BufferPool(实际上就是 ByteBuffer)中。BufferPool 默认最大为 33554432B,即 32MB, 可通过 buffer.memory 进行配置。
当生产者生产音讯的速度大于 sender 线程的发送速度,那么 send 办法就会阻塞。默认阻塞 60000ms,可通过 max.block.ms 配置。

RecordAccumulator 类的几个重要属性

public final class RecordAccumulator {
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    // 缓存空间,默认 32MB,可通过下面说的 buffer.memory 参数进行配置
    private final BufferPool free;
}

TopicPartition 为分区的形象。定义如下所示

public final class TopicPartition implements Serializable {
    private int hash = 0;
    private final int partition;
    private final String topic;
}

主线程发送的音讯,都会被放入 batcher 中, batches 将发往不同 TopicPartition 的音讯,寄存到各自的 ArrayDeque<ProducerBatch> 中。
主线程 append 时,往队尾插入,sender 线程取出时,则往队头取出。

ProducerBatch 批量音讯

ProducerBatch 为批量音讯的形象。
在编写客户端发送音讯时,客户端面向的类则是 ProducerRecordkafka 客户端,在发送音讯时,会将 ProducerRecord 放入 ProducerBatch,使音讯更加紧凑。
如果为每个音讯都单独创立内存空间,那么内存空间的开拓和开释,则将会比拟耗时。因而 ProducerBatch 外部有一个 ByteBufferOutputStream bufferStream(实则为 ByteBuffer), 应用 ByteBuffer 反复利用内存空间。

bufferStream 值的大小为

public final class RecordAccumulator {
    
    // 该值大小,可通过 buffer.memory 配置
    private final BufferPool free;
    
    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
       
    }
}

其中,batchSize 默认 16384B,即 16kb,可通过 batch.size 配置。第 2 个入参的值则为音讯的大小。

须要留神的是,bufferStream 的内存空间是从 free 内存空间中划出的。

下面有说到,ProducerBatch 会应用 ByteBuffer 追加音讯。然而,如果你看代码,你会发现 ProducerBatch 在做音讯的追加时,会将音讯放入 DataOutputStream appendStream。如同跟咱们说的 不一样! 然而实际上,就是利用 ByteBuffer,这里还须要看 appendStream 是如何初始化的!

注:MemoryRecordsBuilder 为 ProducerBatch 中的一个属性

public class MemoryRecordsBuilder {
    private final ByteBufferOutputStream bufferStream;
    private DataOutputStream appendStream;
    
    private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
                                     Header[] headers) throws IOException {ensureOpenForRecordAppend();
        int offsetDelta = (int) (offset - baseOffset);
        long timestampDelta = timestamp - firstTimestamp;
        // 往 appendStream 中追加音讯
        int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
        recordWritten(offset, timestamp, sizeInBytes);
    }
}

MemoryRecordsBuilder 初始化

public class MemoryRecordsBuilder {
    private final ByteBufferOutputStream bufferStream;
    private DataOutputStream appendStream;
    
    public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
                                byte magic,
                                CompressionType compressionType,
                                TimestampType timestampType,
                                long baseOffset,
                                long logAppendTime,
                                long producerId,
                                short producerEpoch,
                                int baseSequence,
                                boolean isTransactional,
                                boolean isControlBatch,
                                int partitionLeaderEpoch,
                                int writeLimit) {
           
        // .. 省略局部代码
        bufferStream.position(initialPosition + batchHeaderSizeInBytes);
        this.bufferStream = bufferStream;
        
        // 应用 bufferStream 包装
        this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
    }
}

能够看到实际上应用的还是 ByteBufferOutputStream bufferStream

Sender 线程

Sender 线程在发送音讯时,会从 RecordAccumulator 中取出音讯,并将放在 RecordAccumulator 中的 Deque<ProducerBatch> 转换成 Map<nodeId, List<ProducerBatch>>,这里的 nodeIdkafka 节点的 id。再发送给 kafka 之前,又会将音讯封装成 Map<nodeId, ClientRequest>

申请在从 Sender 发往 kafka 时,还会被存入 InFlightRequests

public class NetworkClient implements KafkaClient {
    /* the set of requests currently being sent or awaiting a response */
    private final InFlightRequests inFlightRequests;
    
    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        if (log.isDebugEnabled()) {int latestClientVersion = clientRequest.apiKey().latestVersion();
            if (header.apiVersion() == latestClientVersion) {log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
                        clientRequest.correlationId(), destination);
            } else {log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
                        header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);
            }
        }
        Send send = request.toSend(destination, header);
        InFlightRequest inFlightRequest = new InFlightRequest(
                clientRequest,
                header,
                isInternalRequest,
                request,
                send,
                now);
        
        // 将申请放入
        this.inFlightRequests.add(inFlightRequest);
        selector.send(send);
    }
}

InFlightRequests

/**
 * The set of requests which have been sent or are being sent but haven't yet received a response
 */
final class InFlightRequests {
    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();}

InFlightRequests 的作用是存储曾经发送的,或者发送了,然而未收到响应的申请。
InFlightRequests 类中有一个属性 maxInFlightRequestsPerConnection, 标识一个节点最多能够缓存多少个申请。该默认值为 5, 可通过 max.in.flight.requests.per.connection 进行配置, 须要留神的是 InFlightRequests 对象是在创立 KafkaProducer 时就会被创立。

requests 参数的 keynodeIdvalue 则为缓存的申请。

sender 线程 在发送音讯时,会先判断 InFlightRequests 对应的申请缓存中是否超过了 maxInFlightRequestsPerConnection 的大小

代码入口:Sender.sendProducerData

public class Sender implements Runnable {private long sendProducerData(long now) {
        // ... 省略局部代码
        while (iter.hasNext()) {Node node = iter.next();
            
            // todo 这里为代码入口
            if (!this.client.ready(node, now)) {iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }
        // ... 省略局部代码
    }
}

public class NetworkClient implements KafkaClient {private boolean canSendRequest(String node, long now) {return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
            inFlightRequests.canSendMore(node);
    }
}

final class InFlightRequests {public boolean canSendMore(String node) {Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
    }
}

InFlightRequests 的设计中,能够看到,咱们能够很轻松的就晓得,哪个 kafka 节点的负载是最低。因为只须要判断 requests 中对应 node 汇合的大小即可。

重要参数

  1. acks

用于指定分区中须要有多少个正本收到音讯,生产者才会认为音讯是被写入的
acks = 1。默认为 1, 只有 leader 正本写入,则被认为曾经写入。如果音讯曾经被写入 leader 正本,且曾经返回给生产者 ok,然而在 follower 拉取 leader 音讯之前,leader 正本忽然挂掉,那么此时音讯也会失落
acks = 0。发送音讯后,不须要期待服务端的响应,此配置,吞吐量最高。
acks = -1 或者 all。须要期待所有 ISR 中的所有正本都胜利写入音讯之后,才会收到服务端的胜利响应。
须要留神的一点是 acks 入参是 String,而不是 int

  1. max.request.size

客户端容许发送的音讯最大长度,默认为 1MB.

  1. retriesretry.backoff.ms

retries 配置生产者的重试次数,默认为 0. retry.backoff.ms 配置两次重试的间隔时间

  1. compression.type

指定音讯的压缩形式,默认为 none。可选配置gzip,snappy,lz4

  1. connection.max.idle.ms

指定在多久之后敞开闲置的连贯,默认 540000(ms), 即 9 分钟

  1. linger.ms

指定发送 ProducerBatch 之前期待更多的音讯(ProducerRecord) 退出 ProducerBatch 的工夫,默认为 0。生产者在 ProducerBatch 填充斥时,或者等待时间超过 linger.ms 发送音讯进来。

  1. receive.buffer.bytes

设置 Socket 接管音讯缓存区的大小,默认 32678B, 32KB。如果设置为 -1, 则示意应用 操作系统的默认值。如果 Procuerkafka 处于不同的机房,能够调大此参数。

  1. send.buffer.bytes

设置 Socket 发送音讯缓冲区大小。默认 131072B, 即128KB。如果设置为 -1,则应用操作系统的默认值

  1. request.timeout.ms

Producer 期待响应的最长工夫,默认 30000ms。须要留神的是,该参数须要比 replica.lag.time.max.ms 值更大。能够缩小因客户端重试,而造成的音讯反复

  1. buffer.memory

配置音讯追加器,内存大小。默认最大为 33554432B,即 32MB

  1. batch.size

ProducerBatch ByteBuffer。默认 16384B,即 16kb

  1. max.block.ms

生产者生成音讯过快时,客户端最多阻塞多少工夫。

总结

  1. kafka 将生产者生产音讯,音讯发送给服务端,拆成了 2 个过程。生产音讯交由 主线程, 音讯发送给服务端的工作交由 sender 线程。
  2. 通过 RecordAccumulator 的设计,将生产音讯,与发送音讯解耦。
  3. RecordAccumulator 外部存储数据的数据结构是 ArrayDeque. 队尾追加音讯,队头取出音讯
  4. 开发人员编写的 ProducerRecord,在音讯发送之前会被转为 ProducetBatch。为的是批量发送音讯,进步网络 IO 效率
  5. 为了防止,每个节点负载过高,kafka 设计了 InFlightRequests, 将为响应的音讯放入其中
  6. 从源码角度,buffer.memory 最好是 buffer.memory 整数倍大小。因为 ProducerBatchByteBuffer 是从 RecordAccumulatorByteBuffer 中划出的

RocketMQ 区别

  1. RocketMQ 没有将生产音讯与发送音讯解耦。
  2. RocketMQ 的音讯发送,分为 同步,异步、单向。其中单向发送与 kafkaacks = 0 的配置成果一样。然而实际上,还得看 RocketMQ broker 刷盘配置
  3. kafka 发送失败,默认不重试,RocketMQ 默认重试 2 次。不过 RocketMQ 无奈配置 2 次重试的间隔时间. kafka 能够配置重试的间隔时间。
  4. RocketMQ 默认音讯最大为 4MB, kafka 默认 1MB
  5. RocketMQ 在音讯的发送上,是间接应用 Nettykafka 则是应用 NIO 本人实现通信。(虽说,Netty 也是基于 NIO
  6. 当然还有很多咯 ….., 因为设计齐全不一样!,理论解决场景也不一样

常识补充

ByteBuffer

ByteBuffer 个别用于网络传输的缓冲区。

先来看下 ByteBuffer 的类继承体系

ByteBuffer 次要的 2 个父类。DirectByteBufferHeapByteBuffer。一般而言,咱们次要的是应用 HeapByteBuffer

ByteBuffer 重要属性
  1. position

以后读取的地位

  1. mark

为某一读过的地位做标记,便于某些时候回退到该地位

  1. limit

读取的完结地位

  1. capacity

buffer 大小

ByteBuffer 根本办法
  1. put()

buffer 中写数据,并将 position 往前挪动

  1. flip()

position 设置为 0,limit 设置为以后地位

  1. rewind()

position 设置为 0, limit 不变

  1. mark()

mark 设置为以后 position 值,调用 reset(), 会将 mark 赋值给 position

  1. clear()

position 设置为 0,limit 设置为 capacity

ByteBuffer 食用 DEMO
FileInputStream fis = new FileInputStream("/Users/chenshaoping/text.txt");
FileChannel channel = fis.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);

int read = channel.read(buffer);
while (read != -1) {System.out.println(new String(buffer.array(), Charset.defaultCharset()));
    buffer.clear();
    read = channel.read(buffer);
}

ArrayDeque

ArrayDeque,是一个双端队列。即能够从队头插入元素,也能够从队尾插入元素
对于双端队列,既能够应用 链表的形式实现,也能够应用数组的形式实现。
JDKLinkedList 应用链表实现,ArrayDeque 则应用数组的形式实现

来看 ArrayDeque 的实现。
ArrayDeque 中,有 head, tail 别离指向 头指针,和尾指针。能够把 ArrayDeque 设想成循环数组

插入
  1. 当往队尾插入元素时,tail 指针会往前走

  1. 当往队前插入元素时,head 指针会向后走

删除
  1. 从队头删除元素 4 , head 会往前走

  1. 从队尾删除元素 3,tail 会往后走

能够看到,这里通过挪动 head, tail 指针就能够删除元素了。

扩容

tailhead 都指向都一个地位时,则须要扩容

扩容会将数组的大小裁减为原来的 2 倍,而后从新将 head 指向数组 0 下标, tail 指向数组的最初一个元素地位。

下面的数组,在从新扩容后,会变成上面这个样子

public class ArrayDeque<E> extends AbstractCollection<E>
                           implements Deque<E>, Cloneable, Serializable
{private void doubleCapacity() {
            assert head == tail;
            int p = head;
            int n = elements.length;
            int r = n - p; // number of elements to the right of p
            int newCapacity = n << 1;
            if (newCapacity < 0)
                throw new IllegalStateException("Sorry, deque too big");
            Object[] a = new Object[newCapacity];
            System.arraycopy(elements, p, a, 0, r);
            System.arraycopy(elements, 0, a, r, p);
            elements = a;
            head = 0;
            tail = n;
    }
}
正文完
 0