共计 8165 个字符,预计需要花费 21 分钟才能阅读完成。
引言
Kafka 是一款很棒的消息系统,今天我们就来深入了解一下它的实现细节,首先关注 Producer 这一方。
要使用 kafka 首先要实例化一个 KafkaProducer,需要有 brokerIP、序列化器等必要 Properties 以及 acks(0、1、n)、compression、retries、batch.size 等非必要 Properties,通过这个简单的接口可以控制 Producer 大部分行为,实例化后就可以调用 send 方法发送消息了。
核心实现是这个方法:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);//①
return doSend(interceptedRecord, callback);//②
}
通过不同的模式可以实现发送即忘(忽略返回结果)、同步发送(获取返回的 future 对象,回调函数置为 null)、异步发送(设置回调函数)三种消息模式。
我们来看看消息类 ProducerRecord 有哪些属性:
private final String topic;// 主题
private final Integer partition;// 分区
private final Headers headers;// 头
private final K key;// 键
private final V value;// 值
private final Long timestamp;// 时间戳
它有多个构造函数,可以适应不同的消息类型:比如有无分区、有无 key 等。
①中 ProducerInterceptors(有 0 ~ 无穷多个,形成一个拦截链)对 ProducerRecord 进行拦截处理(比如打上时间戳,进行审计与统计等操作)
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
ProducerRecord<K, V> interceptRecord = record;
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {interceptRecord = interceptor.onSend(interceptRecord);
} catch (Exception e) {
// 不抛出异常,继续执行下一个拦截器
if (record != null)
log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
else
log.warn("Error executing interceptor onSend callback", e);
}
}
return interceptRecord;
}
如果用户有定义就进行处理并返回处理后的 ProducerRecord,否则直接返回本身。
然后②中 doSend 真正发送消息,并且是异步的(源码太长只保留关键):
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// 序列化 key 和 value
byte[] serializedKey;
try {serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) { }
byte[] serializedValue;
try {serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) { }
// 计算分区获得主题与分区
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
// 回调与事务处理省略。Header[] headers = record.headers().toArray();
// 消息追加到 RecordAccumulator 中
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
// 该批次满了或者创建了新的批次就要唤醒 IO 线程发送该批次了,也就是 sender 的 wakeup 方法
if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();}
return result.future;
} catch (Exception e) {
// 拦截异常并抛出
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
下面是计算分区的方法:
private int partition(ProducerRecord<K, V> record,
byte[] serializedKey, byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();
// 消息有分区就直接使用,否则就使用分区器计算
return partition != null ?
partition :
partitioner.partition(record.topic(), record.key(), serializedKey,
record.value(), serializedValue, cluster);
}
默认的分区器 DefaultPartitioner 实现方式是如果 partition 存在就直接使用,否则根据 key 计算 partition,如果 key 也不存在就使用 round robin 算法分配 partition。
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
*/
public class DefaultPartitioner implements Partitioner {private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {//key 为空
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);// 可用的分区
if (availablePartitions.size() > 0) {// 有分区,取模就行
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();} else {// 无分区,return Utils.toPositive(nextValue) % numPartitions;
}
} else {// key 不为空,计算 key 的 hash 并取模获得分区
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {counter = currentCounter;}
}
return counter.getAndIncrement();// 返回并加一,在取模的配合下就是 round robin}
}
以上就是发送消息的逻辑处理,接下来我们再看看消息发送的物理处理。
Sender(是一个 Runnable,被包含在一个 IO 线程 ioThread 中,该线程不断从 RecordAccumulator 队列中的读取消息并通过 Selector 将数据发送给 Broker)的 wakeup 方法,实际上是 KafkaClient 接口的 wakeup 方法,由 NetworkClient 类实现,采用了 NIO,也就是 java.nio.channels.Selector.wakeup() 方法实现。
Sender 的 run 中主要逻辑是不停执行准备消息和等待消息:
long pollTimeout = sendProducerData(now);//③
client.poll(pollTimeout, now);//④
③完成消息设置并保存到信道中,然后监听感兴趣的 key,由 KafkaChannel 实现。
public void setSend(Send send) {if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is" + id);
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
// transportLayer 的一种实现中的相关方法
public void addInterestOps(int ops) {key.interestOps(key.interestOps() | ops);
}
④主要是 Selector 的 poll,其 select 被 wakeup 唤醒:
public void poll(long timeout) throws IOException {
/* check ready keys */
long startSelect = time.nanoseconds();
int numReadyKeys = select(timeout);//wakeup 使其停止阻塞
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
// Poll from channels that have buffered data (but nothing more from the underlying socket)
if (dataInBuffers) {keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
Set<SelectionKey> toPoll = keysWithBufferedRead;
keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
pollSelectionKeys(toPoll, false, endSelect);
}
// Poll from channels where the underlying socket has more data
pollSelectionKeys(readyKeys, false, endSelect);
// Clear all selected keys so that they are included in the ready count for the next select
readyKeys.clear();
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
immediatelyConnectedKeys.clear();} else {madeReadProgressLastPoll = true; //no work is also "progress"}
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
}
其中 pollSelectionKeys 方法会调用如下方法完成消息发送:
public Send write() throws IOException {
Send result = null;
if (send != null && send(send)) {
result = send;
send = null;
}
return result;
}
private boolean send(Send send) throws IOException {send.writeTo(transportLayer);
if (send.completed())
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
return send.completed();}
Send 是一次数据发包,一般由 ByteBufferSend 或者 MultiRecordsSend 实现,其 writeTo 调用 transportLayer 的 write 方法,一般由 PlaintextTransportLayer 或者 SslTransportLayer 实现,区分是否使用 ssl:
public long writeTo(GatheringByteChannel channel) throws IOException {long written = channel.write(buffers);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
remaining -= written;
pending = TransportLayers.hasPendingWrites(channel);
return written;
}
public int write(ByteBuffer src) throws IOException {return socketChannel.write(src);
}
到此就把 Producer 的业务相关逻辑处理和非业务相关的网络 2 方面的主要流程梳理清楚了。其他额外的功能是通过一些配置保证的。
比如顺序保证就是 max.in.flight.requests.per.connection,InFlightRequests 的 doSend 会进行判断(由 NetworkClient 的 canSendRequest 调用),只要该参数设为 1 即可保证当前包未确认就不能发送下一个包从而实现有序性
public boolean canSendMore(String node) {Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
再比如可靠性,通过设置 acks,Sender 中 sendProduceRequest 的 clientRequest 加入了回调函数:
RequestCompletionHandler callback = new RequestCompletionHandler() {public void onComplete(ClientResponse response) {handleProduceResponse(response, recordsByPartition, time.milliseconds());// 调用 completeBatch
}
};
/**
* 完成或者重试投递,这里如果 acks 不对就会重试
*
* @param batch The record batch
* @param response The produce response
* @param correlationId The correlation id for the request
* @param now The current POSIX timestamp in milliseconds
*/
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
long now, long throttleUntilTimeMs) { }
public class ProduceResponse extends AbstractResponse {
/**
* Possible error code:
* INVALID_REQUIRED_ACKS (21)
*/
}
kafka 源码一层一层包装很多,错综复杂,如有错误请大家不吝赐教。