乐趣区

关于云计算:如何往-Kafka-发送大消息

默认状况下,Kafka topic 中每条音讯的默认限度为 1MB。这是因为在 Kafka 中,十分大的音讯被认为是低效和反模式的。然而,有时候你可能须要往 Kafka 中发送大音讯。在本文中咱们将钻研在 Kafka 中解决大音讯的两种办法。

选项 1:应用内部存储

将大音讯(例如视频文件)发送到内部存储,在 Kafka 中只保留这些文件的援用,例如文件的 URL。内部存储能够是云存储(例如 Amazon S3),也能够是网络存储(NAS)或者 HDFS 等本地大型文件存储系统。

选项 2:批改 Kafka 音讯大小限度(实用于大于 1MB 小于 10 MB 的音讯)

这里咱们须要批改 broker, consumer, producer 3 个局部的配置,以容许解决更大的音讯。

Broker 服务端

在 broker 端有两种批改最大音讯大小的形式:

    1. message.max.bytes 动态参数在 broker 级别失效,影响所有的 topic,须要批改 server.properties 文件,并重启 Kafka 集群。
    1. max.message.bytes 动静参数在 topic 级别失效,只影响指定的 topic,批改后立刻失效,无需重启 Kafka 集群。

倡议保留 broker 级别最大音讯大小的默认值(1MB),仅在 topic 级别笼罩此设置。

能够在创立 topic 的时候指定动静配置参数,例如创立一个名叫 large-message 的 topic,指定 max.message.bytes 为 10MB。

kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic large-message \
--config max.message.bytes=10485880

也能够在已创立的 topic 上批改该配置参数。

kafka-configs.sh --bootstrap-server localhost:9092 \
                --alter --entity-type topics \
                --entity-name large-message \
                --add-config max.message.bytes=10485880

查看 large-message topic 的动静配置参数。

kafka-configs.sh --bootstrap-server localhost:9092 \ 
--entity-type topics --entity-name large-message --describe

# 返回后果
Dynamic configs for topic large-message are:

# DYNAMIC_TOPIC_CONFIG 是此时失效的配置 10MB,DEFAULT_CONFIG 是默认配置 1M。max.message.bytes=10485880 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=10485880, DEFAULT_CONFIG:message.max.bytes=1048588}

当初咱们曾经批改了 topic 的最大音讯大小,但这还不够,咱们还须要设置 replica.fetch.max.bytes=10485880(默认也是 1MB),以便大音讯能够失常复制到 broker 的正本中。 该参数是动态配置,只能在 server.properties 配置文件中批改,并且须要重启 Kafka 集群能力失效。

如果没有批改 replica.fetch.max.bytes 参数,当往 leader replica 写入大音讯时,follower replica 会因为无奈复制该音讯产生如下报错。

[2022-06-17 09:15:08,717] ERROR [ReplicaManager broker=1] Error processing fetch with max size 1048576 from replica [2] on partition large-message-0: PartitionData(fetchOffset=410683670, logStartOffset=395947464, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty) (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data/large-message-0/00000000000410481778.log.

Consumer 消费者

在 consumer 端须要批改 max.partition.fetch.bytes 参数的值,以便能够生产大音讯,须要确保该值 大于等于 broker 上配置的 message.max.bytes,否则一旦音讯大于max.partition.fetch.bytes 的值,消费者将无奈拉取到这条音讯,从而导致生产进度卡住。

在 CLI 中能够应用能够应用 --consumer-property 参数进行设置。

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic large-message \
    --from-beginning \
    --consumer-property max.partition.fetch.bytes=10485880

在 Java 代码中能够这样设置。

properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "10485880");

如果你应用 Logstash 作为消费者,能够这样设置。须要留神的是,在 Logstash 中 max_partition_fetch_bytes 参数的类型在不同的版本中是不一样的,例如在 7.7 版本中是 STRING 类型,而在 7.8 版本开始变为 NUMBER 类型。

input {
    kafka {
       bootstrap_servers => "localhost:9092"
       topics => ["large-message"]
       max_partition_fetch_bytes => "10485880"  # 设置最大生产音讯大小
    }
}

Producer 生产者

在 producer 端须要批改 max.request.size 参数的值,以便能够发送大音讯,要确保该值 小于等于 broker 上配置的 message.max.bytes

在 CLI 中能够应用能够应用 --consumer-property 参数进行设置。

kafka-console-producer.sh --bootstrap-server localhost:9092 \
    --topic large-message \
    --producer-property max.request.size=10485880

在 Java 代码中能够这样设置。

properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "10485880");

如果应用 Filebeat 作为生产者,能够这样设置。大于 max_message_bytes 的音讯将会被抛弃,不会发送给 Kafka。

output:
  kafka:
    hosts: ["localhost:9092"]
    topic: large-message
    max_message_bytes: 10485880 # 设置最大生产音讯大小

参考资料

  • [1] How to send Large Messages in Apache Kafka: https://www.conduktor.io/kafk…
  • [2] Kafka input plugin: https://www.elastic.co/guide/…`max_partition_fetch_bytes

欢送关注

退出移动版