共计 1470 个字符,预计需要花费 4 分钟才能阅读完成。
kafka 也有生产者和消费者的概念,生产者把音讯发给 kafka,消费者从 kafka 拿音讯进行生产,这个跟之前的 activemq 和 rabbitmq 相似。
咱们从生产者发消息开始,探讨 kafka 的整个流程。这里的版本是0.10.1.0
。
音讯对象
音讯对象 ProducerRecord 蕴含了 topic、partition、key、value、timestamp,其中 topic 和 value 在构建 ProducerRecord 时是必须的。
topic 代表了一个汇合的名称,当音讯发往 kafka 的时候,kafka 须要把这个音讯寄存起来的,此时就须要一个汇合的名称,把同一个汇合的音讯,寄存在一起,这个就类比于 mysql 的表,同一个表的数据,都是寄存在一个表中。
value 是音讯的主体,比方咱们往名称为 topic1 的 topic 发送一个 hello 的音讯时,这个 value 就等于 hello。
partition 代表着分区号,kafka 是分布式系统的,也就是说,一个 topic 的数据,会寄存在多个服务器中,所以就有很多分区,设置 partition 能够指定音讯发完哪个分区。
key 作为音讯的键,能够依据 key 对这个音讯进行分区。
timestamp 指音讯的工夫戳,既能够示意音讯的工夫戳,也能够示意音讯追加到日志文件的工夫。
咱们假设一个音讯体为 hello 的音讯发送给 topic1。
KafkaProducer
音讯构建好后,咱们就须要把音讯收回去,不然构建干嘛,对吧。
负责发消息的是 KafkaProducer,每个 KafkaProducer 都有一个 clientId,如果没有设置,就会默认为 producer-
+ 数字,这个数字是AtomicInteger
类型的,所以是线程平安的,没有设置的话,clientId 就是producer-1
、producer-2
。因为 KafkaProducer 也是线程平安的,所以客户端构建一个 KafkaProducer 用来发送音讯,也是能够的。
发送音讯的时候,咱们是须要晓得服务器(即 broker)地址的,所以实例化 KafkaProducer 的时候,bootstrap.servers
也是须要赋值的,如果有多个的话,用逗号隔开,格局为host1:port1,host2:port2
,这里咱们假如服务器地址为hadoop1:9092
。
拦截器
拦截器并不是必须的,比方对音讯做一些定制化的操作,对 topic 和 value 批改等或者定制回调函数,不过失常状况,个别是不必的。
序列化
在网络传输中,broker 是以以字节数组(byte[])的模式进行接管的,所以在发送的时候,须要事后把音讯的 key 和 value 进行序列化。
在初始化 KafkaProducer 的时候,设置 key.serializer
和value.serialize
来制订 key 和 value 的序列化器。
分区器
Kafka 是一个分布式系统,每个 topic 都对应着多个 broker,所以咱们在发送音讯之前,须要对音讯调配他所在的分区。
音讯累加器
Kafka 并不是一条一条的音讯进行发送,而且保留在音讯累加器这个缓存中,而后再批次发送,这样能够缩小网络传输的资源耗费,进而晋升性能。
Sender 线程
音讯保留在内存后,Sender 线程就会把符合条件的音讯依照批次进行发送。除了发送音讯,元数据的加载也是通过 Sender 线程来解决的。
Sneder 线程发送音讯以及接管音讯,都是基于 java NIO 的 Selector。通过 Selector 把音讯收回去,并通过 Selector 接管音讯。
前面咱们缓缓对本章的内容进行开展。