关于kafka:如何开发一个完善的-Kafka-生产者客户端

3次阅读

共计 3127 个字符,预计需要花费 8 分钟才能阅读完成。

Kafka 起初是 由 LinkedIn 公司采纳 Scala 语言开发的一个多分区、多正本且基于 ZooKeeper 协调的分布式音讯零碎,现已被募捐给 Apache 基金会。目前 Kafka 曾经定位为一个分布式流式解决平台,它以高吞吐、可长久化、可程度扩大、反对流数据处理等多种个性而被宽泛应用。目前越来越多的开源分布式解决零碎如 Cloudera、Storm、Spark、Flink 等都反对与 Kafka 集成。

Kafka 之所以受到越来越多的青眼,与它所“表演”的三大角色是分不开的:

  • 音讯零碎:Kafka 和传统的音讯零碎(也称作消息中间件)都具备零碎解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等性能。与此同时,Kafka 还提供了大多数音讯零碎难以实现的音讯程序性保障及回溯生产的性能。
  • 存储系统:Kafka 把音讯长久化到磁盘,相比于其余基于内存存储的零碎而言,无效地升高了数据失落的危险。也正是得益于 Kafka 的音讯长久化性能和多正本机制,咱们能够把 Kafka 作为长期的数据存储系统来应用,只须要把对应的数据保留策略设置为“永恒”或启用主题的日志压缩性能即可。
  • 流式解决平台:Kafka 不仅为每个风行的流式解决框架提供了牢靠的数据起源,还提供了一个残缺的流式解决类库,比方窗口、连贯、变换和聚合等各类操作。

1|0 基本概念

一个典型的 Kafka 体系架构包含若干 Producer、若干 Broker、若干 Consumer,以及一个 ZooKeeper 集群,如下图所示。其中 ZooKeeper 是 Kafka 用来负责集群元数据的治理、控制器的选举等操作的。Producer 将音讯发送到 Broker,Broker 负责将收到的音讯存储到磁盘中,而 Consumer 负责从 Broker 订阅并生产音讯。

整个 Kafka 体系结构中引入了以下 3 个术语:

  • Producer:生产者,也就是发送音讯的一方。生产者负责创立音讯,而后将其投递到 Kafka 中。
  • Consumer:消费者,也就是接管音讯的一方。消费者连贯到 Kafka 上并接管音讯,进而进行相应的业务逻辑解决。
  • Broker:服务代理节点。对于 Kafka 而言,Broker 能够简略地看作一个独立的 Kafka 服务节点或 Kafka 服务实例。大多数状况下也能够将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个 Kafka 实例。一个或多个 Broker 组成了一个 Kafka 集群。一般而言,咱们更习惯应用首字母小写的 broker 来示意服务代理节点。

在 Kafka 中还有两个特地重要的概念—主题(Topic)与分区(Partition)。Kafka 中的音讯以主题为单位进行归类,生产者负责将音讯发送到特定的主题(发送到 Kafka 集群中的每一条音讯都要指定一个主题),而消费者负责订阅主题并进行生产。

2|0 客户端开发

一个失常的生产逻辑须要具备以下几个步骤:

  1. 配置生产者客户端参数及创立相应的生产者实例。
  2. 构建待发送的音讯。
  3. 发送音讯。
  4. 敞开生产者实例。

其中构建的音讯对象 ProducerRecord,它并不是单纯意义上的音讯,它蕴含了多个属性,本来须要发送的与业务相干的音讯体只是其中的一个 value 属性,比方“Hello, Kafka!”只是 ProducerRecord 对象中的一个属性。ProducerRecord 类的定义如下(只截取成员变量)

其中 topic 和 partition 字段别离代表音讯要发往的主题和分区号。headers 字段是音讯的头部,Kafka 0.11.x 版本才引入这个属性,它大多用来设定一些与利用相干的信息,如无须要也能够不必设置。key 是用来指定音讯的键,它不仅是音讯的附加信息,还能够用来计算分区号进而能够让音讯发往特定的分区。后面提及音讯以主题为单位进行归类,而这个 key 能够让音讯再进行二次归类,同一个 key 的音讯会被划分到同一个分区中。

3|0 必要参数设置

在创立真正的生产者实例前须要配置相应的参数,比方须要连贯的 Kafka 集群地址。参考在下面客户端代码中的 initConfig() 办法,在 Kafka 生产者客户端 KafkaProducer 中有 3 个参数是必填的。

  • bootstrap.servers:该参数用来指定生产者客户端连贯 Kafka 集群所需的 broker 地址清单,具体的内容格局为 host1:port1,host2:port2,能够设置一个或多个地址,两头以逗号隔开,此参数的默认值为“”。留神这里并非须要所有的 broker 地址,因为生产者会从给定的 broker 里查找到其余 broker 的信息。不过倡议至多要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者依然能够连贯到 Kafka 集群上。
  • key.serializer 和 value.serializer:broker 端接管的音讯必须以字节数组(byte[])的模式存在。代码清单 3 - 1 中生产者应用的 KafkaProducer<String, String> 和 ProducerRecord<String, String> 中的泛型 <String, String> 对应的就是音讯中 key 和 value 的类型,生产者客户端应用这种形式能够让代码具备良好的可读性,不过在发往 broker 之前须要将音讯中对应的 key 和 value 做相应的序列化操作来转换成字节数组。key.serializer 和 value.serializer 这两个参数别离用来指定 key 和 value 序列化操作的序列化器,这两个参数无默认值。

在下面的客户端开发中代码中 initConfig() 办法里还设置了一个参数 client.id,这个参数用来设定 KafkaProducer 对应的客户端 id,默认值为“”。如果客户端不设置,则 KafkaProducer 会主动生成一个非空字符串,内容模式如“producer-1”、“producer-2”,即字符串“producer-”与数字的拼接。

KafkaProducer 中的参数泛滥,远非示例 initConfig() 办法中的那样只有 4 个,开发人员能够依据业务利用的理论需要来批改这些参数的默认值,以达到灵便调配的目标。个别状况下,一般开发人员无奈记住所有的参数名称,只能有个大抵的印象。

在理论应用过程中,如“key.serializer”、“max.request.size”、“interceptor.classes”之类的字符串常常因为人为因素而书写谬误。为此,咱们能够间接应用客户端中的 org.apache.kafka.clients.producer.ProducerConfig 类来做肯定水平上的预防措施,每个参数在 ProducerConfig 类中都有对应的名称,以代码清单 3 - 1 中的 initConfig() 办法为例,引入 ProducerConfig 后的批改后果如下:

留神到下面的代码中 key.serializer 和 value.serializer 参数对应类的全限定名比拟长,也比拟容易写错,这里通过 Java 中的技巧来做进一步的改良,相干代码如下:

如此代码便简洁了许多,同时进一步升高了人为出错的可能性。在配置完参数之后,咱们就能够应用它来创立一个生产者实例,示例如下:

KafkaProducer 是线程平安的,能够在多个线程中共享单个 KafkaProducer 实例,也能够将 KafkaProducer 实例进行池化来供其余线程调用。

KafkaProducer 中有多个构造方法,比方在创立 KafkaProducer 实例时并没有设定 key.serializer 和 value.serializer 这两个配置参数,那么就须要在构造方法中增加对应的序列化器,示例如下:

正文完
 0