Kafka 发送消息大小问题

⚠️ 本文实验的 Kafka 版本为 2.11 版本.


kafka 中的消息指的就是一条 ProducerRecord, 里面除了携带发送的数据之外, 还包含:

  • topic 发往的 Topic
  • partition 发往的分区
  • headers 头信息
  • key 数据
  • value 数据
  • timestamp-long 时间戳

Producer 生产消息过长

在生产者发送消息的时候, 并不是上面所有的信息都算在发送的消息大小. 详情见下面代码.

上面的代码会将 value 序列化成字节数组, 参与序列化的有 topic,headers,key. 用来验证 value 是否超出长度的是 ensureValidRecordSize(serializedSize); 方法.

ensureValidRecordSize 从两个方面验证, 一个是 maxRequestSize(max.request.size), 另一个是 totalMemorySize(buffer.memory), 只有当 value 的长度同时小于时, 消息才可以正常发送.

这里有个注意的点, 如果只是单纯的发送消息, 没有用 Callback 进行监控或者用 Future 进行获得结果, 在消息过长的情况下, 不会主动发出提示,

使用 Future 接收结果

Future<RecordMetadata> send = kafkaProducer.send(new ProducerRecord<>("topic", "key", "value"));
RecordMetadata recordMetadata = send.get();

Future 类中 get()方法, @throws ExecutionException 如果计算抛出异常, 该方法将会抛出该异常.

使用 Callback 进行监控

先看 Kafka 专门为回调写的接口.

// 英文注释省略, 总的来说: 用于异步回调, 当消息发送 server 已经被确认之后, 就会调用该方法
// 该方法中的肯定有一个参数不为 null, 如果没有异常产生, 则 metadata 有数据, 如果有异常则相反
public void onCompletion(RecordMetadata metadata, Exception exception);
kafkaProducer.send(new ProducerRecord<>("topic", "key", "value"), new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {exception.printStackTrace();

日志 Level=DEBUG

将日志的消息级别设置为 DEBUG, 也会给标准输出输出该警告信息.

Future 和 Callback 总结

通过上面两种比较, 不难发现 Future 是 Java 并发标准库中, 并不是专门为 kafka 而设计, 需要显示捕获异常, 而 Callback 接口是 kafka 提供标准回调措施, 所以应尽可能采用后者.


在生产者有一个限制消息的参数, 而在服务端也有限制消息的参数, 该参数就是
message.max.bytes, 默认为 1000012B (大约 1MB), 服务端可以接收不到 1MB 的数据.(在新客户端 producer, 消息总是经过分批 group into batch 的数据, 详情见 RecordBatch 接口).

设置 Broker 端接收消息大小

修改 broker 端的可以接收的消息大小, 需要在 broker 端 server.properties 文件中添加message.max.bytes=100000. 数值可以修改成自己想要的, 单位是 byte.

生产端消息大于 broker 会发生什么

如果生产者设置的消息发送大小为 1MB, 而 broker 端设置的消息大小为 512KB 会发生什么?
答案就是 broker 会拒绝该消息, 生产者会返回一个RecordTooLargeException. 该消息是不会被消费者消费. 提示的信息为: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.


消费者也会进行消息限制, 这里介绍有关三个限制消费的参数

  • fetch.max.bytes 服务端消息合集 (多条) 能返回的大小
  • fetch.min.bytes 服务端最小返回消息的大小
  • fetch.max.wait.ms 最多等待时间

如果 fetch.max.wait.ms 设置的时间到达, 即使可以返回的消息总大小没有满足 fetch.min.bytes 设置的值, 也会进行返回.

fetch.max.bytes 设置过小

如果 fetch.max.bytes 设置过小会发生什么? 会是不满足条件的数据一条都不返回吗? 我们可以根据文档来查看一下.

The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress.

英文的大意就是: fetch.max.bytes 表示服务端能返回消息的总大小. 消息是通过分批次返回给消费者. 如果在分区中的第一个消息批次大于这个值, 那么该消息批次依然会返回给消费者, 保证流程运行.

可以得出结论: 消费端的参数只会影响消息读取的大小.

实践 fetch.max.bytes 设置过小

properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1024);
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1);
while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(Integer.MAX_VALUE));

启动消费者, 添加上面三个参数. 指定消息批次最小最大返回的大小以及允许抓取最长的等待时间. 最后将返回的消息总数输出到标准输出.

实验结果: 因为每次发送的消息都要大于 1024B, 所以消费者每个批次只能返回一条数据. 最终会输出 1 …
