乐趣区

关于kafka:聊聊-Kafka-Consumer-源码解析之-ConsumerNetworkClient

一、Consumer 的应用

Consumer 的源码解析次要来看 KafkaConsumer,KafkaConsumer 是 Consumer 接口的实现类。KafkaConsumer 提供了一套封装良好的 API,开发人员能够基于这套 API 轻松实现从 Kafka 服务端拉取音讯的性能,这样开发人员基本不必关怀与 Kafka 服务端之间网络连接的治理、心跳检测、申请超时重试等底层操作,也不用关怀订阅 Topic 的分区数量、分区正本的网络拓扑以及 Consumer Group 的 Rebalance 等 Kafka 具体细节,KafkaConsumer 中还提供了主动提交 offset 的性能,使的开发人员更加关注业务逻辑,进步了开发效率。

上面咱们来看一个 KafkaConsumer 的示例程序:

/**
 * @author: 微信公众号【老周聊架构】*/
public class KafkaConsumerTest {public static void main(String[] args) {Properties props = new Properties();

        // kafka 地址, 列表格局为 host1:port1,host2:port2,...,无需增加所有的集群地址,kafka 会依据提供的地址发现其余的地址 (倡议多提供几个,以防提供的服务器敞开) 必须设置
        props.put("bootstrap.servers", "localhost:9092");
        // key 序列化形式 必须设置
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value 序列化形式 必须设置
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("group.id", "consumer_riemann_test");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 可生产多个 topic, 组成一个 list
        String topic = "riemann_kafka_test";
        consumer.subscribe(Arrays.asList(topic));

        while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
                try {Thread.sleep(100);
                } catch (InterruptedException e) {e.printStackTrace();
                }
            }
        }
    }
}

从示例中能够看出 KafkaConsumer 的外围办法是 poll(),它负责从 Kafka 服务端拉取音讯。外围办法的具体细节我想放在下一篇再细讲,关乎生产侧的客户端与 Kafka 服务端的通信模型。这一篇咱们次要从宏观的角度来分析下 Consumer 生产端的源码。

二、KafkaConsumer 剖析

咱们先来看下 Consumer 接口,该接口定义了 KafkaConsumer 对外的 API,其外围办法能够分为以下六类:

  • subscribe() 办法:订阅指定的 Topic,并为消费者主动调配分区。
  • assign() 办法:用户手动订阅指定的 Topic,并且指定生产的分区,此办法 subscribe() 办法互斥。
  • poll() 办法:负责从服务端获取音讯。
  • commit*() 办法:提交消费者曾经生产实现的 offset。
  • seek*() 办法:指定消费者起始生产的地位。
  • pause()、resume() 办法:暂停、持续 Consumer,暂停后 poll() 办法会返回空。

咱们先来看下 KafkaConsumer 的重要属性以及 UML 结构图。

  • clientId:Consumer 的惟一标识。
  • groupId:消费者组的惟一标识。
  • coordinator:管制着 Consumer 与服务端 GroupCoordinator 之间的通信逻辑,读者能够了解为 Consumer 与服务端 GroupCoordinator 通信的门面。
  • keyDeserializer、valueDeserializer:key 和 value 的反序列化器。
  • fetcher:负责从服务端获取音讯。
  • interceptors:ConsumerInterceptors 汇合,ConsumerInterceptors.onConsumer() 办法能够在音讯通过 poll() 办法返回给用户之前对其进行拦挡或批改;ConsumerInterceptors.onCommit() 办法也能够在服务端返回提交 offset 胜利的响应进行拦挡或批改。
  • client:ConsumerNetworkClient 负责消费者与 Kafka 服务端的网络通信。
  • subscriptions:SubscriptionState 保护了消费者的生产状态。
  • metadata:ConsumerMetadata 记录了整个 Kafka 集群的元信息。
  • currentThread、refcount:别离记录的 KafkaConsumer 的线程 id 和重入次数

三、ConsumerNetworkClient

ConsumerNetworkClient 在 NetworkClient 之上进行了封装,提供了更高级的性能和更易用的 API。

咱们先来看下 ConsumerNetworkClient 的重要属性以及 UML 结构图。


  • client:NetworkClient 对象。
  • unsent:缓冲队列。UnsentRequests 对象,该对象外部保护了一个 unsent 属性,该属性是 ConcurrentMap<Node, ConcurrentLinkedQueue<ClientRequest>>,key 是 Node 节点,value 是 ConcurrentLinkedQueue<ClientRequest>
  • metadata:用于治理 Kafka 集群元数据。
  • retryBackoffMs:在尝试重试对给定主题分区的失败申请之前期待的工夫量,这防止了在某些故障状况下在严密循环中反复发送申请。对应 retry.backoff.ms 配置,默认 100 ms。
  • maxPollTimeoutMs:应用 Kafka 的组管理工具时,消费者协调器的心跳之间的预期工夫。心跳用于确保消费者的会话放弃沉闷,并在新消费者退出或来到组时促成从新均衡。该值必须设置为低于 session.timeout.ms,但通常不应设置为高于该值的 1/3。它能够调整得更低,以管制失常从新均衡的预期工夫。对应 heartbeat.interval.ms 配置,默认 3000 ms。构造函数中,maxPollTimeoutMs 取的是 maxPollTimeoutMs 与 MAX_POLL_TIMEOUT_MS 的最小值,MAX_POLL_TIMEOUT_MS 默认为 5000 ms。
  • requestTimeoutMs:配置管制客户端期待申请响应的最长工夫。如果在超时之前没有收到响应,客户端将在必要时从新发送申请,或者如果重试用尽,则申请失败。对应 request.timeout.ms 配置,默认 305000 ms。
  • wakeupDisabled:由调用 KafkaConsumer 对象的消费者线程之外的其它线程设置,示意要中断 KafkaConsumer 线程。
  • lock:咱们不须要高吞吐量,所以应用偏心锁来尽量避免饥饿。
  • pendingCompletion:当申请实现时,它们在调用之前被转移到这个队列。目标是防止在持有此对象的监视器时调用它们,这可能会为死锁打开门。
  • pendingDisconnects:断开与协调器连贯节点的队列。
  • wakeup:这个标记容许客户端被平安唤醒而无需期待下面的锁。为了同时启用它,防止须要获取下面的锁是原子的。

ConsumerNetworkClient 的外围办法是 poll() 办法,poll() 办法有很多重载办法,最终会调用 poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) 办法,这三个参数含意是:timer 示意定时器限度此办法能够阻塞多长时间;pollCondition 示意可空阻塞条件;disableWakeup 示意如果 true 禁用触发唤醒。

咱们来简略回顾下 ConsumerNetworkClient 的性能:

3.1 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#trySend

循环解决 unsent 中缓存的申请,对每个 Node 节点,循环遍历其 ClientRequest 链表,每次循环都调用 NetworkClient.ready() 办法检测消费者与此节点之间的连贯,以及发送申请的条件。若符合条件,则调用 NetworkClient.send() 办法将申请放入 InFlightRequest 中期待响应,也放入 KafkaChannel 中的 send 字段期待发送,并将音讯从列表中删除。代码如下:

long trySend(long now) {
    long pollDelayMs = maxPollTimeoutMs;

    // send any requests that can be sent now
    // 遍历 unsent 汇合
    for (Node node : unsent.nodes()) {Iterator<ClientRequest> iterator = unsent.requestIterator(node);
        if (iterator.hasNext())
            pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));

        while (iterator.hasNext()) {ClientRequest request = iterator.next();
            // 调用 NetworkClient.ready()查看是否能够发送申请
            if (client.ready(node, now)) {
                // 调用 NetworkClient.send()办法,期待发送申请。client.send(request, now);
                // 从 unsent 汇合中删除此申请
                iterator.remove();} else {
                // try next node when current node is not ready
                break;
            }
        }
    }
    return pollDelayMs;
}

3.2 计算超时工夫

如果没有申请在进行中,则阻塞工夫不要超过重试退却工夫。

3.3 org.apache.kafka.clients.NetworkClient#poll

  • 判断是否须要更新 metadata 元数据
  • 调用 Selector.poll() 进行 socket 相干的 IO 操作
  • 解决实现后的操作(解决一系列 handle*() 办法解决申请响应、连贯断开、超时等状况,并调用每个申请的回调函数)

3.4 调用 checkDisconnects() 办法检测连贯状态

调用 checkDisconnects() 办法检测连贯状态。检测消费者与每个 Node 之间的连贯状态,当检测到连贯断开的 Node 时,会将其在 unsent 汇合中对应的全副 ClientRequest 对象革除掉,之后调用这些 ClientRequest 的回调函数。

private void checkDisconnects(long now) {
    // any disconnects affecting requests that have already been transmitted will be handled
    // by NetworkClient, so we just need to check whether connections for any of the unsent
    // requests have been disconnected; if they have, then we complete the corresponding future
    // and set the disconnect flag in the ClientResponse
    for (Node node : unsent.nodes()) {
        // 检测消费者与每个 Node 之间的连贯状态
        if (client.connectionFailed(node)) {
            // Remove entry before invoking request callback to avoid callbacks handling
            // coordinator failures traversing the unsent list again.
            // 在调用申请回调之前删除条目以防止回调解决再次遍历未发送列表的协调器故障。Collection<ClientRequest> requests = unsent.remove(node);
            for (ClientRequest request : requests) {RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
                AuthenticationException authenticationException = client.authenticationException(node);
                // 调用 ClientRequest 的回调函数
                handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()),
                        request.callback(), request.destination(), request.createdTimeMs(), now, true,
                        null, authenticationException, null));
            }
        }
    }
}

3.5 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#maybeTriggerWakeup

查看 wakeupDisabled 和 wakeup,查看是否有其它线程中断。如果有中断请求,则抛出 WakeupException 异样,中断以后 ConsumerNetworkClient.poll() 办法。

public void maybeTriggerWakeup() {
    // 通过 wakeupDisabled 检测是否在执行不可中断的办法,通过 wakeup 检测是否有中断请求。if (!wakeupDisabled.get() && wakeup.get()) {log.debug("Raising WakeupException in response to user wakeup");
        // 重置中断标记
        wakeup.set(false);
        throw new WakeupException();}
}

3.6 再次调用 trySend() 办法

再次调用 trySend() 办法。在步骤 2.1.3 中调用了 NetworkClient.poll() 办法,在其中可能曾经将 KafkaChannel.send 字段上的申请发送进来了,也可能曾经新建了与某些 Node 的网络连接,所以这里再次尝试调用 trySend() 办法。

3.7 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#failExpiredRequests

解决 unsent 中超时申请。它会循环遍历整个 unsent 汇合,检测每个 ClientRequest 是否超时,将过期申请退出到 expiredRequests 汇合,并将其从 unsent 汇合中删除。调用超时 ClientRequest 的回调函数 onFailure()。

private void failExpiredRequests(long now) {
    // clear all expired unsent requests and fail their corresponding futures
    // 革除所有过期的未发送申请并使其相应的 futures 失败
    Collection<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now);
    for (ClientRequest request : expiredRequests) {RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
        // 调用回调函数
        handler.onFailure(new TimeoutException("Failed to send request after" + request.requestTimeoutMs() + "ms."));
    }
}

private Collection<ClientRequest> removeExpiredRequests(long now) {List<ClientRequest> expiredRequests = new ArrayList<>();
    for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values()) {Iterator<ClientRequest> requestIterator = requests.iterator();
        while (requestIterator.hasNext()) {ClientRequest request = requestIterator.next();
            // 查看是否超时
            long elapsedMs = Math.max(0, now - request.createdTimeMs());
            if (elapsedMs > request.requestTimeoutMs()) {
                // 将过期申请退出到 expiredRequests 汇合
                expiredRequests.add(request);
                requestIterator.remove();} else
                break;
        }
    }
    return expiredRequests;
}

四、RequestFutureCompletionHandler

说 RequestFutureCompletionHandler 之前,咱们先来看下 ConsumerNetworkClient.send() 办法。外面的逻辑会将待发送的申请封装成 ClientRequest,而后保留到 unsent 汇合中期待发送,代码如下:

public RequestFuture<ClientResponse> send(Node node,
                                          AbstractRequest.Builder<?> requestBuilder,
                                          int requestTimeoutMs) {long now = time.milliseconds();
    RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
    ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
            requestTimeoutMs, completionHandler);
    // 创立 clientRequest 对象,并保留到 unsent 汇合中。unsent.put(node, clientRequest);

    // wakeup the client in case it is blocking in poll so that we can send the queued request
    // 唤醒客户端以防它在轮询中阻塞,以便咱们能够发送排队的申请。client.wakeup();
    return completionHandler.future;
}

咱们重点来关注一下 ConsumerNetworkClient 中应用的回调对象——RequestFutureCompletionHandler。其继承关系如下:


从 RequestFutureCompletionHandler 继承关系图咱们能够晓得,它不仅实现了 RequestCompletionHandler 接口,还组合了 RequestFuture 类,RequestFuture 是一个泛型类,其外围字段与办法如下:

  • listeners:RequestFutureListener 队列,用来监听申请实现的状况。RequestFutureListener 接口有 onSuccess() 和 onFailure () 两个办法,对应于申请失常实现和出现异常两种状况。
  • isDone():示意以后申请是否曾经实现,不论失常实现还是出现异常,此字段都会被设置为 true。
  • value():记录申请失常实现时收到的响应,与 exception() 办法互斥。此字段非空示意失常实现,反之示意出现异常。
  • exception():记录导致申请异样实现的异样类,与 value() 互斥。此字段非空则示意出现异常,反之则示意失常实现。

咱们之所以要剖析源码,是因为源码中有很多设计模式能够借鉴,利用到你本人的工作中。RequestFuture 中有两处典型的设计模式的应用,咱们来看一下:

  • compose() 办法:应用了适配器模式。
  • chain() 办法:应用了责任链模式。

4.1 RequestFuture.compose()

/**
 * 适配器
 * Adapt from a request future of one type to another.
 *
 * @param <F> Type to adapt from
 * @param <T> Type to adapt to
 */
public abstract class RequestFutureAdapter<F, T> {public abstract void onSuccess(F value, RequestFuture<T> future);

    public void onFailure(RuntimeException e, RequestFuture<T> future) {future.raise(e);
    }
}

/**
 * RequestFuture<T> 适配成 RequestFuture<S>
 * Convert from a request future of one type to another type
 * @param adapter The adapter which does the conversion
 * @param <S> The type of the future adapted to
 * @return The new future
 */
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
    // 适配之后的后果
    final RequestFuture<S> adapted = new RequestFuture<>();
    // 在以后 RequestFuture 上增加监听器
    addListener(new RequestFutureListener<T>() {
        @Override
        public void onSuccess(T value) {adapter.onSuccess(value, adapted);
        }

        @Override
        public void onFailure(RuntimeException e) {adapter.onFailure(e, adapted);
        }
    });
    return adapted;
}

应用 compose() 办法进行适配后,回调时的调用过程,也能够认为是申请实现的事件流传流程。当调用 RequestFuture<T> 对象的 complete() 或 raise() 办法时,会调用 RequestFutureListener<T> 的 onSuccess() 或 onFailure() 办法,而后调用 RequestFutureAdapter<T, S> 的对应办法,最终调用 RequestFuture<S> 对象的对应办法。

4.2 RequestFuture.chain()

chain() 办法与 compose() 办法相似,也是通过 RequestFutureListener 在多个 RequestFuture 之间传递事件。代码如下:

public void chain(final RequestFuture<T> future) {
    // 增加监听器
    addListener(new RequestFutureListener<T>() {
        @Override
        public void onSuccess(T value) {
            // 通过监听器将 value 传递给下一个 RequestFuture 对象
            future.complete(value);
        }

        @Override
        public void onFailure(RuntimeException e) {
            // 通过监听器将异样传递给下一个 RequestFuture 对象
            future.raise(e);
        }
    });
}

好了,ConsumerNetworkClient 的源码剖析告一段落了,心愿文章对你有帮忙,咱们下期再见。

退出移动版