kafka生产者的消息发送机制

5次阅读

共计 14235 个字符,预计需要花费 36 分钟才能阅读完成。

开篇一张图,读者更幸福,不多说上架构图。

这个架构图我们在前面一篇文章《kafka 生产者的蓄水池机制》里面介绍过,上一篇我们是介绍了这个图里面的消息收集过程(我们成为“蓄水池”机制),这里我们就介绍它的另外一部分,消息的发送机制。

1.1、Sender 运行过程

所有的消息发送,都是从 Sender 线程开始,它是一个守护线程,所以我们首先就需要来看一下 Sender 的 run 方法,最外层的 run 方式是一个主循环不断调用具体逻辑运行方法 run,我们看下它的具体逻辑处理 run 方法:

 void run(long now) {
        // 生产者事务管理相关处理,本章节不做具体分析,后面专门章节再做分析,大家先了解一下
        if (transactionManager != null) {
            try {if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                    // Check if the previous run expired batches which requires a reset of the producer state.
                    transactionManager.resetProducerId();

                if (!transactionManager.isTransactional()) {
                    // this is an idempotent producer, so make sure we have a producer id
                    maybeWaitForProducerId();} else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for "+"some previously sent messages and can no longer retry them. It isn't safe to continue."));
                } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                    // as long as there are outstanding transactional requests, we simply wait for them to return
                    client.poll(retryBackoffMs, now);
                    return;
                }

                // do not continue sending if the transaction manager is in a failed state or if there
                // is no producer id (for the idempotent case).
                if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {RuntimeException lastError = transactionManager.lastError();
                    if (lastError != null)
                        maybeAbortBatches(lastError);
                    client.poll(retryBackoffMs, now);
                    return;
                } else if (transactionManager.hasAbortableError()) {accumulator.abortUndrainedBatches(transactionManager.lastError());
                }
            } catch (AuthenticationException e) {
                // This is already logged as error, but propagated here to perform any clean ups.
                log.trace("Authentication exception while processing transactional request: {}", e);
                transactionManager.authenticationFailed(e);
            }
        }
        // 实际的数据发送请求,并处理服务端响应
        long pollTimeout = sendProducerData(now);
        client.poll(pollTimeout, now);
    }

接下来我们从两个层面来看,一个是消息发送,一个是消息返回响应处理。

1.2、消息的发送

先看下 sendProducerData 的具体逻辑:

private long sendProducerData(long now) {
        // 获取集群信息
        Cluster cluster = metadata.fetch();

        // 获取那些可以发送消息的分区列表信息
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // 如果这些分区没有对应的 leader,就需要强制对 metadata 信息进行更新
        if (!result.unknownLeaderTopics.isEmpty()) {
            // 没有 leader 的场景例如 leader 选举,或者 topic 已失效,这些都需要将 topic 重新加入,发送到服务端请求更新,因为现在还需要往这些 topic 发送消息
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();}

        // 遍历所有获取到的网络节点,基于网络连接状态来检测这些节点是否可用,如果不可用则剔除
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {Node node = iter.next();
            // 节点连接状态检查,如果允许连接,则重新创建连接
            if (!this.client.ready(node, now)) {
                // 未准备好的节点删除
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // 获取所有待发送的批量消息以及其对应的 leader 节点集合
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                this.maxRequestSize, now);
                
        // 如果需要保证消息的强顺序性,则缓存对应 topic 分区对象,防止同一时间往同一个 topic 分区发送多条处于未完成状态的消息
        if (guaranteeMessageOrder) {
            // 将每个 batch 的分区对象信息加入到 mute 集合,采取 Set 实现,重复的 topicpartition 信息不会被加入
            for (List<ProducerBatch> batchList : batches.values()) {for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        // 获取本地过期的消息,返回 TimeoutException,并释放空间
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
        // 过期的 batch 消息处理
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
            }
        }
        // 更新度量信息
        sensors.updateProduceRequestMetrics(batches);

        // 设置 pollTimeout,如果存在待发送的消息,则设置 pollTimeout 等于 0,这样可以立即发送请求,从而能够缩短剩余消息的缓存时间,避免堆积
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (!result.readyNodes.isEmpty()) {log.trace("Nodes with data ready to send: {}", result.readyNodes);
            pollTimeout = 0;
        }
        // 调用 NetWorkClient 将消息发送到服务端
        sendProduceRequests(batches, now);

        return pollTimeout;
}

归纳起来 sendProducerData 的核心流程如下:
1. 通过 accumulator.ready 方法获取可发送的分区列表信息
2. 调用 client.ready 对获取到的所有网络节点进行连通性检测
3. 通过.accumulator.drain 获取所有待发送的批量消息以及其对应的 leader 节点集合
4. 在需要保障分区消息的强顺序性的场景下调用 accumulator.mutePartition 将分区信息添加到 mute 集合
5. 调用 sendProduceRequests 发送生产消息请求

下面逐个流程讲解:
通过 accumulator.ready 方法获取可发送的分区列表信息:

 public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        // 可接受消息的节点集合
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        // 记录未找到 leader 副本的 Topic 信息集合
        Set<String> unknownLeaderTopics = new HashSet<>();
        
        // 是否有线程在等待 BufferPool 分配空间
        boolean exhausted = this.free.queued() > 0;
        // 遍历待发送的 batch 里面的每个分区信息,对其 leader 执行判定
        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {TopicPartition part = entry.getKey();
            Deque<ProducerBatch> deque = entry.getValue();
            
            // 获取当前 topic 分区 leader 副本所在的节点
            Node leader = cluster.leaderFor(part);
            synchronized (deque) {if (leader == null && !deque.isEmpty()) {
                    // 该分区下的 leader 未知,但是存在往该分区发送的消息,需要记录下,在后面的流程当发现有未知 leader 需要强制向服务端发送 metadata 的信息更新请求
                    unknownLeaderTopics.add(part.topic());
                } 
                // 所有可发送的节点需要不能在 mute 集合里面,保障消息有序性,当 mute 里面还有消息未发送完成不能继续追加发送
                else if (!readyNodes.contains(leader) && !muted.contains(part)) {ProducerBatch batch = deque.peekFirst();
                    if (batch != null) {long waitedTimeMs = batch.waitedTimeMs(nowMs);
                        // 是否处于重试操作判断
                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        boolean full = deque.size() > 1 || batch.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        // 标记当前 leader 是否可发送
                        boolean sendable = full // 1. 队列中有多个 RecordBatch,或第一个 RecordBatch 已满
                        || expired // 2. 当前等待重试的时间过长
                        || exhausted // 3. 有其他线程在等待 BufferPoll 分配空间,即本地消息缓存已满
                        || closed // 4. producer 已经关闭
                        || flushInProgress();// 5. 有线程正在等待 flush 操作完成
                        if (sendable && !backingOff) {
                        // 满足可发送状态,并且没有处于重试操作的状态下,将当前 leader 加入可发送节点
                            readyNodes.add(leader);
                        } else {long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                            // 更新下次执行 ready 判定的时间间隔
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
        // 返回检查结果
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

调用 client.ready 对获取到的所有网络节点进行连通性检测:

 public boolean ready(Node node, long now) {if (node.isEmpty())
            throw new IllegalArgumentException("Cannot connect to empty node" + node);
        //connectionStates 已就绪,直接返回可连接
        if (isReady(node, now))
            return true;
        
        // 连接状态显示可连接
        if (connectionStates.canConnect(node.idString(), now))
            // 则调用 selector 初始化该连接
            initiateConnect(node, now);

        return false;
}

通过.accumulator.drain 获取所有待发送的批量消息以及其对应的 leader 节点集合:

 public Map<Integer, List<ProducerBatch>> drain(Cluster cluster,
                                                   Set<Node> nodes,
                                                   int maxSize,
                                                   long now) {if (nodes.isEmpty())
            return Collections.emptyMap();
            
        // 返回的 nodeid 对应的发送消息 batch 信息
        Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
        // 遍历每一个可连通的 node
        for (Node node : nodes) {
            int size = 0;
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
            List<ProducerBatch> ready = new ArrayList<>();
            /* drainIndex 用于记录上次发送停止的位置,本次继续从当前位置开始发送,* 如果每次都是从 0 位置开始,可能会导致排在后面的分区饿死,这是一个简单的负载均衡策略
            */
            int start = drainIndex = drainIndex % parts.size();
            do {PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                // 如果是需要保障消息的强顺序性,则不能将消息添加进目标分区,否则会导致消息乱序
                if (!muted.contains(tp)) {
                    // 获取当前分区对应的 RecordBatch 集合
                    Deque<ProducerBatch> deque = getDeque(tp);
                    if (deque != null) {synchronized (deque) {ProducerBatch first = deque.peekFirst();
                            if (first != null) {
                                // 当前第一个 batch 是否处于重试状态或者已重试过
                                boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
                                // 没有重试过,或者重试已超时
                                if (!backoff) {if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                                         // 单次消息数据量已达到上限,结束循环,一般对应一个请求的大小,防止请求消息过大
                                        break;
                                    } 
                                    // 处理处于重试状态下的消息
                                    else {
                                        // 省略在重试状态下的事务处理流程
                                        // 遍历每个节点,节点的起始位置也以一个轮训方式来遍历,并且每个队列里面的 batch 也都是只取第一个,每个队列轮训着取,所有这些操作都是为了对消息发送的均衡处理,保障消息公平发送
                                        ProducerBatch batch = deque.pollFirst();
                                        //close 代表着消息 batch 通道被关闭,只能读取,无法写入
                                        batch.close();
                                        size += batch.records().sizeInBytes();
                                        ready.add(batch);
                                        batch.drained(now);
                                    }
                                }
                            }
                        }
                    }
                }
                // 更新本次 drainIndex
                this.drainIndex = (this.drainIndex + 1) % parts.size();} while (start != drainIndex);
            batches.put(node.id(), ready);
        }
        return batches;
}

调用 accumulator.mutePartition 将分区信息添加到 mute 集合,这个过程比较简单就是将遍历待发送的 batch 消息,如果设置了保障消息时序强一致性,那就将这个分区信息保存在 mute 集合之中,每次发送消息之前都会去检查这个队列是否包含已有的分区,如果有则本次不做发送,每发送完成之后都会调用 mute 集合去除所在的分区信息,以便可以放入下一个消息进行发送。

调用 sendProduceRequests 发送生产消息请求:

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {if (batches.isEmpty())
            return;

        Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
        final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

        // 遍历所有 batch 消息,找到最小的版本号信息
        byte minUsedMagic = apiVersions.maxUsableProduceMagic();
        for (ProducerBatch batch : batches) {if (batch.magic() < minUsedMagic)
                minUsedMagic = batch.magic();}

        // 遍历 RecordBatch 集合,整理成 produceRecordsByPartition 和 recordsByPartition
        for (ProducerBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            MemoryRecords records = batch.records();

            // 进行消息的向下兼容转换操作,例如分区消息的迁移,从一个高版本迁移到低版本,就需要额外重新构造 MemoryRecords
            if (!records.hasMatchingMagic(minUsedMagic))
                records = batch.records().downConvert(minUsedMagic, 0, time).records();
            produceRecordsByPartition.put(tp, records);
            recordsByPartition.put(tp, batch);
        }

        String transactionalId = null;
        if (transactionManager != null && transactionManager.isTransactional()) {transactionalId = transactionManager.transactionalId();
        }
        
        // 创建 ProduceRequest 请求构造器,produceRecordsByPartition 用于构造请求器
        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
                produceRecordsByPartition, transactionalId);
        // 创建回调对象,用于处理响应,recordsByPartition 用于响应回调处理
        RequestCompletionHandler callback = new RequestCompletionHandler() {public void onComplete(ClientResponse response) {handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };

        String nodeId = Integer.toString(destination);
        // 创建 ClientRequest 请求对象,如果 acks 不等于 0 则表示会等待服务端的响应
        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
        // 调用 NetWorkClient 发送消息
        client.send(clientRequest, now);
        log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

接下来就需要了解一下 NetWorkClient 的发送流程,它的发送最终都是调用 doSend 函数完成:

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        // 获取目标节点 id
        String nodeId = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        // 省略日志信息打印
        Send send = request.toSend(nodeId, header);
        // 新建 InFlightRequest,并将请求添加进去
        InFlightRequest inFlightRequest = new InFlightRequest(
                header,
                clientRequest.createdTimeMs(),
                clientRequest.destination(),
                clientRequest.callback(),
                clientRequest.expectResponse(),
                isInternalRequest,
                request,
                send,
                now);
        this.inFlightRequests.add(inFlightRequest);
        // 网络消息发送
        selector.send(inFlightRequest.send);
}

至此,我们消息发送讲解完成,接下来讲解一下消息的响应拉取过程。

1.3、消息的响应拉取

消息的响应拉取是从 NetworkClient 的 poll 方法开始的,它的逻辑解析如下:

 public List<ClientResponse> poll(long timeout, long now) {ensureActive();

        if (!abortedSends.isEmpty()) {
            // 当连接断开,或者版本不支持,需要优先处理这些响应
            List<ClientResponse> responses = new ArrayList<>();
            handleAbortedSends(responses);
            completeResponses(responses);
            return responses;
        }
        //metada 信息的响应处理
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            // 该 poll 过程处理所有的网络连接、断开连接,初始化新的发送以及处理过程总的发送和接收请求,接收的信息最终会放到 completedReceives 中
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {log.error("Unexpected error during I/O", e);
        }

        // 处理所有的完成操作及响应
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleInitiateApiVersionRequests(updatedNow);
        handleTimedOutRequests(responses, updatedNow);
        completeResponses(responses);

        return responses;
}

响应操作的核心处理几个函数就是 handle* 的几个函数,我们分别介绍一下:
handleCompletedSends 该方法遍历所有发送完成的对象,对于那些不希望接收响应的请求,创建本地响应队列并添加进去:

private void handleCompletedSends(List<ClientResponse> responses, long now) {
        // 遍历所有的发送完成的 send 对象
        for (Send send : this.selector.completedSends()) {
            // 找出最近一次在 inFlightRequests 的发送请求信息
            InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
            // 对于发送成功,但是不期望服务端响应的请求,创建本地响应队列并将其添加进去
            if (!request.expectResponse) {
                //inFlightRequests 在发送的时候添加,接收完成后去除
                this.inFlightRequests.completeLastSent(send.destination());
                // 添加到本地响应队列中
                responses.add(request.completed(null, now));
            }
        }
    }

handleCompletedReceives 该方法获取服务端响应,并依据响应分类处理,分别是 metadata、apiversion

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        // 从 completedReceives 中遍历所有的接收信息,completedReceives 中的信息是在上一层的 selector.poll 中添加进去的
        for (NetworkReceive receive : this.selector.completedReceives()) {
            // 获取返回响应的节点 ID
            String source = receive.source();
            // 从 inFlightRequests 集合中获取缓存的 request 对象
            InFlightRequest req = inFlightRequests.completeNext(source);
            // 解析响应信息
            Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
                throttleTimeSensor, now);
           // 省略日志
            AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
            if (req.isInternalRequest && body instanceof MetadataResponse)
                // 处理 metadata 的更新响应信息
                metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
            else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
                 // 如果是更新 API 版本的响应,则更新本地缓存的目标节点支持的 API 版本信息
                handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
            else
                // 添加到本地响应队列
                responses.add(req.completed(body, now));
        }
    }

handleDisconnections 该方法会最终调用 Selector#disconnected 方法获取断开连接的节点 ID 集合,并更新相应节点的连接状态为 DISCONNECTED,同时会清空本地缓存的与该节点相关的数据,最终创建一个 disconnected 类型的 ClientResponse 对象添加到结果集合中。如果这一步确实发现了已断开的连接,则标记需要更新本地缓存的节点元数据信息。

handleConnections 该方法会调用 Selector#connected 方法获取连接正常的节点 ID 集合,如果当前节点是第一次建立连接,则需要获取节点支持的 API 版本信息,方法会将当前节点的连接状态设置为 CHECKING_API_VERSIONS,并将节点 ID 添加到 NetworkClient#nodesNeedingApiVersionsFetch 集合中,对于其它节点,则更新相应连接状态为 READY。

handleInitiateApiVersionRequests 该方法用于处理 NetworkClient#handleConnections 方法中标记的需要获取支持的 API 版本信息的节点,即记录到 NetworkClient#nodesNeedingApiVersionsFetch 集合中的节点。方法会遍历处理集合中的节点,并在判断目标节点允许接收请求的情况下,构建 ApiVersionsRequest 请求以获取目标节点支持的 API 版本信息,该请求会被包装成 ClientRequest 对象,并在下次 Selector#poll 操作时一并送出。

handleTimedOutRequests 该方法会遍历缓存在 inFlightRequests 中已经超时的相关请求对应的节点集合,针对此类节点将其视作断开连接进行处理。方法会创建一个 disconnected 类型的 ClientResponse 对象添加到结果集合中,并标记需要更新本地缓存的集群元数据信息。

最后一个是 completeResponses,它的流程很简单,触发生产者的回调函数,通知服务端的响应信息:

  private void completeResponses(List<ClientResponse> responses) {for (ClientResponse response : responses) {
            try {
                // 遍历之前所有阶段的 handle* 处理过程中添加的 response,并回调其 callback 方法,这样生产者就收到服务端响应信息了
                response.onComplete();} catch (Exception e) {log.error("Uncaught error in request completion:", e);
            }
        }
    }

正文完
 0