Kafka消息过长详解

57次阅读

共计 4164 个字符,预计需要花费 11 分钟才能阅读完成。

Kafka 发送消息大小问题

⚠️ 本文实验的 Kafka 版本为 2.11 版本.

消息概述

kafka 中的消息指的就是一条 ProducerRecord, 里面除了携带发送的数据之外, 还包含:

  • topic 发往的 Topic
  • partition 发往的分区
  • headers 头信息
  • key 数据
  • value 数据
  • timestamp-long 时间戳

Producer 生产消息过长

在生产者发送消息的时候, 并不是上面所有的信息都算在发送的消息大小. 详情见下面代码.

上面的代码会将 value 序列化成字节数组, 参与序列化的有 topic,headers,key. 用来验证 value 是否超出长度的是 ensureValidRecordSize(serializedSize); 方法.

ensureValidRecordSize 从两个方面验证, 一个是 maxRequestSize(max.request.size), 另一个是 totalMemorySize(buffer.memory), 只有当 value 的长度同时小于时, 消息才可以正常发送.

private void ensureValidRecordSize(int size) {if (size > this.maxRequestSize)
        throw new RecordTooLargeException("The message is" + size +
                "bytes when serialized which is larger than the maximum request size you have configured with the" +
                ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
                "configuration.");
    if (size > this.totalMemorySize)
        throw new RecordTooLargeException("The message is" + size +
                "bytes when serialized which is larger than the total memory buffer you have configured with the" +
                ProducerConfig.BUFFER_MEMORY_CONFIG +
                "configuration.");
}

单条消息过长或产生如下错误.

这里有个注意的点, 如果只是单纯的发送消息, 没有用 Callback 进行监控或者用 Future 进行获得结果, 在消息过长的情况下, 不会主动发出提示,

使用 Future 接收结果

Future<RecordMetadata> send = kafkaProducer.send(new ProducerRecord<>("topic", "key", "value"));
RecordMetadata recordMetadata = send.get();
System.out.println(recordMetadata);

Future 类中 get()方法, @throws ExecutionException 如果计算抛出异常, 该方法将会抛出该异常.

/**
 * Waits if necessary for the computation to complete, and then
 * retrieves its result.
 *
 * @return the computed result
 * @throws CancellationException if the computation was cancelled
 * @throws ExecutionException if the computation threw an
 * exception
 * @throws InterruptedException if the current thread was interrupted
 * while waiting
 */
V get() throws InterruptedException, ExecutionException;

使用 Callback 进行监控

先看 Kafka 专门为回调写的接口.

// 英文注释省略, 总的来说: 用于异步回调, 当消息发送 server 已经被确认之后, 就会调用该方法
// 该方法中的肯定有一个参数不为 null, 如果没有异常产生, 则 metadata 有数据, 如果有异常则相反
public void onCompletion(RecordMetadata metadata, Exception exception);
kafkaProducer.send(new ProducerRecord<>("topic", "key", "value"), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {exception.printStackTrace();
        }
    }
});

日志 Level=DEBUG

将日志的消息级别设置为 DEBUG, 也会给标准输出输出该警告信息.

Future 和 Callback 总结

通过上面两种比较, 不难发现 Future 是 Java 并发标准库中, 并不是专门为 kafka 而设计, 需要显示捕获异常, 而 Callback 接口是 kafka 提供标准回调措施, 所以应尽可能采用后者.

服务端接收消息限制

在生产者有一个限制消息的参数, 而在服务端也有限制消息的参数, 该参数就是
message.max.bytes, 默认为 1000012B (大约 1MB), 服务端可以接收不到 1MB 的数据.(在新客户端 producer, 消息总是经过分批 group into batch 的数据, 详情见 RecordBatch 接口).

/**
 * A record batch is a container for records. In old versions of the record format (versions 0 and 1),
 * a batch consisted always of a single record if no compression was enabled, but could contain
 * many records otherwise. Newer versions (magic versions 2 and above) will generally contain many records
 * regardless of compression.
 * 在旧版本不开启消息压缩的情况下, 一个 batch 只包含一条数据
 * 在新版本中总是会包含多条消息, 不会去考虑消息是否压缩
 */
public interface RecordBatch extends Iterable<Record>{...}

设置 Broker 端接收消息大小

修改 broker 端的可以接收的消息大小, 需要在 broker 端 server.properties 文件中添加message.max.bytes=100000. 数值可以修改成自己想要的, 单位是 byte.

生产端消息大于 broker 会发生什么

如果生产者设置的消息发送大小为 1MB, 而 broker 端设置的消息大小为 512KB 会发生什么?
答案就是 broker 会拒绝该消息, 生产者会返回一个RecordTooLargeException. 该消息是不会被消费者消费. 提示的信息为: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

消费者消息的限制

消费者也会进行消息限制, 这里介绍有关三个限制消费的参数

  • fetch.max.bytes 服务端消息合集 (多条) 能返回的大小
  • fetch.min.bytes 服务端最小返回消息的大小
  • fetch.max.wait.ms 最多等待时间

如果 fetch.max.wait.ms 设置的时间到达, 即使可以返回的消息总大小没有满足 fetch.min.bytes 设置的值, 也会进行返回.

fetch.max.bytes 设置过小

如果 fetch.max.bytes 设置过小会发生什么? 会是不满足条件的数据一条都不返回吗? 我们可以根据文档来查看一下.

The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress.

英文的大意就是: fetch.max.bytes 表示服务端能返回消息的总大小. 消息是通过分批次返回给消费者. 如果在分区中的第一个消息批次大于这个值, 那么该消息批次依然会返回给消费者, 保证流程运行.

可以得出结论: 消费端的参数只会影响消息读取的大小.

实践 fetch.max.bytes 设置过小

properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1024);
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1);
...
while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(Integer.MAX_VALUE));
    System.out.println(records.count());
}

启动消费者, 添加上面三个参数. 指定消息批次最小最大返回的大小以及允许抓取最长的等待时间. 最后将返回的消息总数输出到标准输出.

实验结果: 因为每次发送的消息都要大于 1024B, 所以消费者每个批次只能返回一条数据. 最终会输出 1 …

正文完
 0