关于kafka:Kafka-Producer-异步发送消息居然也会阻塞

86次阅读

共计 3242 个字符,预计需要花费 9 分钟才能阅读完成。

Kafka 始终以来都以高吞吐量的个性而妇孺皆知,就在上周,在一个性能监控我的项目中,须要应用到 Kafka 传输海量音讯,在这过程中遇到了一个 Kafka Producer 异步发送音讯会被阻塞的问题,导致生产端发送耗时很大。

是的,你没听错,Kafka Producer 异步发送音讯也会产生阻塞景象,那到底是怎么回事呢?

在新版的 Kafka Producer 中,设计了一个音讯缓冲池,客户端发送的音讯都会被存储到缓冲池中,同时 Producer 启动后还会开启一个 Sender 线程,一直地从缓冲池获取音讯并将其发送到 Broker,如下图所示:

这么看来,Kafka 的所有发送,都能够看作是异步发送了,因而在新版的 Kafka Producer 中废除掉异步发送的办法了,仅保留了一个 send 办法,同时返回一个 Futrue 对象,须要同步期待发送后果,就应用 Futrue#get 办法阻塞获取发送后果。而我在我的项目中间接调用 send 办法,为何还会发送阻塞呢?

咱们在构建 Kafka Producer 时,会有一个自定义缓冲池大小的参数 buffer.memory,默认大小为 32M,因而缓冲池的大小是有限度的,咱们无妨想一下,缓冲池内存资源耗尽了会怎么样?

Kafka 源码的正文是十分具体的,RecordAccumulator 类是 Kafka Producer 缓冲池的外围类,而 RecordAccumulator 类就有那么一段正文:

The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.

大略的意思是:

当缓冲池的内存块用完后,音讯追加调用将会被阻塞,直到有闲暇的内存块。

因为性能监控我的项目每分钟须要发送几百万条音讯,只有 Kafka 集群负载很高或者网络稍有稳定,Sender 线程从缓冲池捞取音讯的速度赶不上客户端发送的速度,就会造成客户端发送被阻塞。

我写个例子让大家直观感受一下被阻塞的景象:

public static void main(String[] args) {Properties properties = new Properties();
  properties.put(ProducerConfig.ACKS_CONFIG, "0");
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
  properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
  properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 1024);
  properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5242880);
  properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
  KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);
  String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
  List<byte[]> bytesList = new ArrayList<>();
  Random random = new Random();
  for (int j = 0; j < 1024; j++) {int i1 = random.nextInt(10);
    if (i1 == 0) {i1 = 1;}
    byte[] bytes = new byte[1024 * i1];
    for (int i = 0; i < bytes.length - 1; i++) {bytes[i] = (byte) str.charAt(random.nextInt(62));
    }
    bytesList.add(bytes);
  }

  while (true) {long start = System.currentTimeMillis();
    producer.send(new ProducerRecord<>("test_topic", bytesList.get(random.nextInt(1023))));
    long end = System.currentTimeMillis() - start;
    if (end > 100) {System.out.println("发送耗时:" + end);
    }
    // Thread.sleep(10);
  }
}

以上例子构建了一个 Kafka Producer 对象,同时应用死循环不断地发送音讯,这时如果把 Thread.sleep(10); 正文掉,则会呈现发送耗时很长的景象:

应用 JProfiler 能够查看到分配内存的中央呈现了阻塞:

跟踪到源码:

发现在 org.apache.kafka.clients.producer.internals.BufferPool#allocate 办法中,如果判断缓冲池没有闲暇的内存了,则会阻塞内存调配,直到有闲暇内存为止。

如果不正文 Thread.sleep(10); 这段代码则不会产生阻塞景象,打断点到阻塞的中央,也不会被 Debug 到,从景象可能得悉,Thread.sleep(10); 使得发送音讯的频率变低了,此时 Sender 线程发送的速度超过了客户端的发送速度,缓冲池始终处于未满状态,因而不会产生阻塞景象。

除了以上缓冲池内存满了会产生阻塞之外,Kafka Produer 其它状况都不会产生阻塞了吗?非也,其实还有一个中央,也会产生阻塞!

Kafka Producer 通常在第一次发送音讯之前,须要获取该主题的元数据 Metadata,Metadata 内容包含了主题相干分区 Leader 所在节点信息、正本所在节点信息、ISR 列表等,Kafka Producer 获取 Metadata 后,便会依据 Metadata 内容将音讯发送到指定的分区 Leader 上,整个获取流程大抵如下:

如上图所示,Kafka Producer 在发送音讯之前,会查看主题的 Metadata 是否须要更新,如果须要更新,则会唤醒 Sender 线程并发送 Metatadata 更新申请,此时 Kafka Producer 主线程则会阻塞期待 Metadata 的更新。

如果 Metadata 始终无奈更新,则会导致客户端始终阻塞在那里。

作者简介

作者张乘辉,善于消息中间件技能,负责公司百万 TPS 级别 Kafka 集群的保护,作者保护的公号「后端进阶」不定期分享 Kafka、RocketMQ 系列不讲概念间接真刀真枪的实战总结以及细节上的源码剖析;同时作者也是阿里开源分布式事务框架 Seata Contributor,因而也会分享对于 Seata 的相干常识;当然公号也会分享 WEB 相干常识比方 Spring 全家桶等。内容不肯定八面玲珑,但肯定让你感触到作者对于技术的谋求是认真的!

公众号:后端进阶

技术博客:https://objcoding.com/

GitHub:https://github.com/objcoding/

正文完
 0