共计 8898 个字符,预计需要花费 23 分钟才能阅读完成。
生产者发送音讯的整体流程
音讯追加器 RecordAccumulator
后面几个组件,在 3.1 的文章中,曾经说分明。当初来看 RecordAccumulator
组件
RecordAccumulator
次要用于缓存音讯,以便 Sender
线程可能批量发送音讯。RecordAccumulator
会将音讯放入缓存 BufferPool
(实际上就是 ByteBuffer
)中。BufferPool
默认最大为 33554432B
,即 32MB
, 可通过 buffer.memory
进行配置。
当生产者生产音讯的速度大于 sender
线程的发送速度,那么 send
办法就会阻塞。默认阻塞 60000ms
,可通过 max.block.ms
配置。
RecordAccumulator
类的几个重要属性
public final class RecordAccumulator {
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
// 缓存空间,默认 32MB,可通过下面说的 buffer.memory 参数进行配置
private final BufferPool free;
}
TopicPartition
为分区的形象。定义如下所示
public final class TopicPartition implements Serializable {
private int hash = 0;
private final int partition;
private final String topic;
}
主线程发送的音讯,都会被放入 batcher
中, batches
将发往不同 TopicPartition
的音讯,寄存到各自的 ArrayDeque<ProducerBatch>
中。
主线程 append
时,往队尾插入,sender
线程取出时,则往队头取出。
ProducerBatch
批量音讯
ProducerBatch
为批量音讯的形象。
在编写客户端发送音讯时,客户端面向的类则是 ProducerRecord
,kafka
客户端,在发送音讯时,会将 ProducerRecord
放入 ProducerBatch
,使音讯更加紧凑。
如果为每个音讯都单独创立内存空间,那么内存空间的开拓和开释,则将会比拟耗时。因而 ProducerBatch
外部有一个 ByteBufferOutputStream bufferStream
(实则为 ByteBuffer
), 应用 ByteBuffer
反复利用内存空间。
bufferStream
值的大小为:
public final class RecordAccumulator {
// 该值大小,可通过 buffer.memory 配置
private final BufferPool free;
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
}
}
其中,batchSize
默认 16384B
,即 16kb
,可通过 batch.size
配置。第 2 个入参的值则为音讯的大小。
须要留神的是,bufferStream
的内存空间是从 free
内存空间中划出的。
下面有说到,ProducerBatch
会应用 ByteBuffer
追加音讯。然而,如果你看代码,你会发现 ProducerBatch
在做音讯的追加时,会将音讯放入 DataOutputStream appendStream
。如同跟咱们说的 不一样! 然而实际上,就是利用 ByteBuffer
,这里还须要看 appendStream
是如何初始化的!
注:MemoryRecordsBuilder 为 ProducerBatch 中的一个属性
public class MemoryRecordsBuilder {
private final ByteBufferOutputStream bufferStream;
private DataOutputStream appendStream;
private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
Header[] headers) throws IOException {ensureOpenForRecordAppend();
int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - firstTimestamp;
// 往 appendStream 中追加音讯
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
recordWritten(offset, timestamp, sizeInBytes);
}
}
MemoryRecordsBuilder
初始化
public class MemoryRecordsBuilder {
private final ByteBufferOutputStream bufferStream;
private DataOutputStream appendStream;
public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
long baseOffset,
long logAppendTime,
long producerId,
short producerEpoch,
int baseSequence,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
int writeLimit) {
// .. 省略局部代码
bufferStream.position(initialPosition + batchHeaderSizeInBytes);
this.bufferStream = bufferStream;
// 应用 bufferStream 包装
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}
}
能够看到实际上应用的还是 ByteBufferOutputStream bufferStream
Sender
线程
Sender
线程在发送音讯时,会从 RecordAccumulator
中取出音讯,并将放在 RecordAccumulator
中的 Deque<ProducerBatch>
转换成 Map<nodeId, List<ProducerBatch>>
,这里的 nodeId
是 kafka
节点的 id
。再发送给 kafka
之前,又会将音讯封装成 Map<nodeId, ClientRequest>
。
申请在从 Sender
发往 kafka
时,还会被存入 InFlightRequests
public class NetworkClient implements KafkaClient {
/* the set of requests currently being sent or awaiting a response */
private final InFlightRequests inFlightRequests;
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {String destination = clientRequest.destination();
RequestHeader header = clientRequest.makeHeader(request.version());
if (log.isDebugEnabled()) {int latestClientVersion = clientRequest.apiKey().latestVersion();
if (header.apiVersion() == latestClientVersion) {log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
clientRequest.correlationId(), destination);
} else {log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);
}
}
Send send = request.toSend(destination, header);
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);
// 将申请放入
this.inFlightRequests.add(inFlightRequest);
selector.send(send);
}
}
InFlightRequests
/**
* The set of requests which have been sent or are being sent but haven't yet received a response
*/
final class InFlightRequests {
private final int maxInFlightRequestsPerConnection;
private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();}
InFlightRequests
的作用是存储曾经发送的,或者发送了,然而未收到响应的申请。InFlightRequests
类中有一个属性 maxInFlightRequestsPerConnection
, 标识一个节点最多能够缓存多少个申请。该默认值为 5
, 可通过 max.in.flight.requests.per.connection
进行配置, 须要留神的是 InFlightRequests
对象是在创立 KafkaProducer
时就会被创立。
requests
参数的 key
为 nodeId
,value
则为缓存的申请。
sender
线程 在发送音讯时,会先判断 InFlightRequests
对应的申请缓存中是否超过了 maxInFlightRequestsPerConnection
的大小
代码入口:Sender.sendProducerData
public class Sender implements Runnable {private long sendProducerData(long now) {
// ... 省略局部代码
while (iter.hasNext()) {Node node = iter.next();
// todo 这里为代码入口
if (!this.client.ready(node, now)) {iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
// ... 省略局部代码
}
}
public class NetworkClient implements KafkaClient {private boolean canSendRequest(String node, long now) {return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
inFlightRequests.canSendMore(node);
}
}
final class InFlightRequests {public boolean canSendMore(String node) {Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
}
从 InFlightRequests
的设计中,能够看到,咱们能够很轻松的就晓得,哪个 kafka
节点的负载是最低。因为只须要判断 requests
中对应 node
汇合的大小即可。
重要参数
acks
用于指定分区中须要有多少个正本收到音讯,生产者才会认为音讯是被写入的 acks
= 1。默认为 1, 只有 leader
正本写入,则被认为曾经写入。如果音讯曾经被写入 leader
正本,且曾经返回给生产者 ok
,然而在 follower
拉取 leader
音讯之前,leader
正本忽然挂掉,那么此时音讯也会失落acks
= 0。发送音讯后,不须要期待服务端的响应,此配置,吞吐量最高。acks
= -1 或者 all。须要期待所有 ISR
中的所有正本都胜利写入音讯之后,才会收到服务端的胜利响应。
须要留神的一点是 acks
入参是 String
,而不是 int
max.request.size
客户端容许发送的音讯最大长度,默认为 1MB
.
retries
、retry.backoff.ms
retries
配置生产者的重试次数,默认为 0
. retry.backoff.ms
配置两次重试的间隔时间
compression.type
指定音讯的压缩形式,默认为 none
。可选配置gzip
,snappy
,lz4
connection.max.idle.ms
指定在多久之后敞开闲置的连贯,默认 540000(ms)
, 即 9 分钟
linger.ms
指定发送 ProducerBatch
之前期待更多的音讯(ProducerRecord
) 退出 ProducerBatch
的工夫,默认为 0
。生产者在 ProducerBatch
填充斥时,或者等待时间超过 linger.ms
发送音讯进来。
receive.buffer.bytes
设置 Socket
接管音讯缓存区的大小,默认 32678B
, 32KB
。如果设置为 -1
, 则示意应用 操作系统的默认值。如果 Procuer
和 kafka
处于不同的机房,能够调大此参数。
send.buffer.bytes
设置 Socket
发送音讯缓冲区大小。默认 131072B
, 即128KB
。如果设置为 -1
,则应用操作系统的默认值
request.timeout.ms
Producer
期待响应的最长工夫,默认 30000ms
。须要留神的是,该参数须要比 replica.lag.time.max.ms
值更大。能够缩小因客户端重试,而造成的音讯反复
buffer.memory
配置音讯追加器,内存大小。默认最大为 33554432B
,即 32MB
batch.size
ProducerBatch
ByteBuffer
。默认 16384B
,即 16kb
max.block.ms
生产者生成音讯过快时,客户端最多阻塞多少工夫。
总结
kafka
将生产者生产音讯,音讯发送给服务端,拆成了 2 个过程。生产音讯交由 主线程, 音讯发送给服务端的工作交由sender
线程。- 通过
RecordAccumulator
的设计,将生产音讯,与发送音讯解耦。 RecordAccumulator
外部存储数据的数据结构是ArrayDeque
. 队尾追加音讯,队头取出音讯- 开发人员编写的
ProducerRecord
,在音讯发送之前会被转为ProducetBatch
。为的是批量发送音讯,进步网络 IO 效率 - 为了防止,每个节点负载过高,
kafka
设计了InFlightRequests
, 将为响应的音讯放入其中 - 从源码角度,
buffer.memory
最好是buffer.memory
整数倍大小。因为ProducerBatch
的ByteBuffer
是从RecordAccumulator
的ByteBuffer
中划出的
与 RocketMQ
区别
RocketMQ
没有将生产音讯与发送音讯解耦。RocketMQ
的音讯发送,分为 同步,异步、单向。其中单向发送与kafka
的acks
= 0 的配置成果一样。然而实际上,还得看RocketMQ broker
的 刷盘配置!kafka
发送失败,默认不重试,RocketMQ
默认重试 2 次。不过RocketMQ
无奈配置 2 次重试的间隔时间.kafka
能够配置重试的间隔时间。RocketMQ
默认音讯最大为4MB
,kafka
默认1MB
RocketMQ
在音讯的发送上,是间接应用Netty
。kafka
则是应用NIO
本人实现通信。(虽说,Netty
也是基于NIO
)- 当然还有很多咯 ….., 因为设计齐全不一样!,理论解决场景也不一样
常识补充
ByteBuffer
ByteBuffer
个别用于网络传输的缓冲区。
先来看下 ByteBuffer
的类继承体系
ByteBuffer
次要的 2 个父类。DirectByteBuffer
、HeapByteBuffer
。一般而言,咱们次要的是应用 HeapByteBuffer
。
ByteBuffer
重要属性
position
以后读取的地位
mark
为某一读过的地位做标记,便于某些时候回退到该地位
limit
读取的完结地位
capacity
buffer
大小
ByteBuffer
根本办法
put()
往 buffer
中写数据,并将 position
往前挪动
flip()
将 position
设置为 0,limit
设置为以后地位
rewind()
将 position
设置为 0, limit
不变
mark()
将 mark
设置为以后 position
值,调用 reset()
, 会将 mark
赋值给 position
clear()
将 position
设置为 0,limit
设置为 capacity
ByteBuffer
食用 DEMO
FileInputStream fis = new FileInputStream("/Users/chenshaoping/text.txt");
FileChannel channel = fis.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = channel.read(buffer);
while (read != -1) {System.out.println(new String(buffer.array(), Charset.defaultCharset()));
buffer.clear();
read = channel.read(buffer);
}
ArrayDeque
ArrayDeque
,是一个双端队列。即能够从队头插入元素,也能够从队尾插入元素
对于双端队列,既能够应用 链表的形式实现,也能够应用数组的形式实现。JDK
中 LinkedList
应用链表实现,ArrayDeque
则应用数组的形式实现
来看 ArrayDeque
的实现。
ArrayDeque 中,有 head
, tail
别离指向 头指针,和尾指针。能够把 ArrayDeque
设想成循环数组
插入
- 当往队尾插入元素时,tail 指针会往前走
- 当往队前插入元素时,head 指针会向后走
删除
- 从队头删除元素 4 ,
head
会往前走
- 从队尾删除元素 3,
tail
会往后走
能够看到,这里通过挪动 head
, tail
指针就能够删除元素了。
扩容
当 tail
、head
都指向都一个地位时,则须要扩容
扩容会将数组的大小裁减为原来的 2 倍,而后从新将 head
指向数组 0
下标, tail
指向数组的最初一个元素地位。
下面的数组,在从新扩容后,会变成上面这个样子
public class ArrayDeque<E> extends AbstractCollection<E>
implements Deque<E>, Cloneable, Serializable
{private void doubleCapacity() {
assert head == tail;
int p = head;
int n = elements.length;
int r = n - p; // number of elements to the right of p
int newCapacity = n << 1;
if (newCapacity < 0)
throw new IllegalStateException("Sorry, deque too big");
Object[] a = new Object[newCapacity];
System.arraycopy(elements, p, a, 0, r);
System.arraycopy(elements, 0, a, r, p);
elements = a;
head = 0;
tail = n;
}
}