关于kafka:Kafka-Producer-异步发送消息居然也会阻塞

Kafka 始终以来都以高吞吐量的个性而妇孺皆知,就在上周,在一个性能监控我的项目中,须要应用到 Kafka 传输海量音讯,在这过程中遇到了一个 Kafka Producer 异步发送音讯会被阻塞的问题,导致生产端发送耗时很大。 是的,你没听错,Kafka Producer 异步发送音讯也会产生阻塞景象,那到底是怎么回事呢? 在新版的 Kafka Producer 中,设计了一个音讯缓冲池,客户端发送的音讯都会被存储到缓冲池中,同时 Producer 启动后还会开启一个 Sender 线程,一直地从缓冲池获取音讯并将其发送到 Broker,如下图所示: 这么看来,Kafka 的所有发送,都能够看作是异步发送了,因而在新版的 Kafka Producer 中废除掉异步发送的办法了,仅保留了一个 send 办法,同时返回一个 Futrue 对象,须要同步期待发送后果,就应用 Futrue#get 办法阻塞获取发送后果。而我在我的项目中间接调用 send 办法,为何还会发送阻塞呢? 咱们在构建 Kafka Producer 时,会有一个自定义缓冲池大小的参数 buffer.memory,默认大小为 32M,因而缓冲池的大小是有限度的,咱们无妨想一下,缓冲池内存资源耗尽了会怎么样? Kafka 源码的正文是十分具体的,RecordAccumulator 类是 Kafka Producer 缓冲池的外围类,而 RecordAccumulator 类就有那么一段正文: The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.大略的意思是: ...

September 13, 2020 · 2 min · jiezi

关于kafka:Kafka详细学习资料

Kafka一. 消息中间件的益处1.解耦 容许你独立的扩大或批改两边的处理过程,只有确保它们恪守同样的接口束缚。如果为以能解决这类峰值拜访为规范来投入资源随时待命无疑是微小的节约。应用音讯队列可能使要害组件顶住突发的拜访压力,而不会因为突发的超负荷的申请而齐全解体。 2.异步 很多时候,用户不想也不须要立刻解决音讯。音讯队列提供了异步解决机制,容许用户把一个音讯放入队列,但并不立刻解决它。想向队列中放入多少音讯就放多少,而后在须要的时候再去解决它们。 3.灵活性/削峰 在访问量剧增的状况下,利用依然须要持续发挥作用,然而这样的突发流量并不常见。如果为以能解决这类峰值拜访为规范来投入资源随时待命无疑是微小的节约。应用音讯队列可能使要害组件顶住突发的拜访压力,而不会因为突发的超负荷的申请而齐全解体。 4.可恢复性 零碎的一部分组件生效时,不会影响到整个零碎。音讯队列升高了过程间的耦合度,所以即便一个解决音讯的过程挂掉,退出队列中的音讯依然能够在零碎复原后被解决。 5.缓冲 有助于管制和优化数据流通过零碎的速度,解决生产音讯和生产音讯的处理速度不统一的状况。 二. 音讯队列通信的模式1.点对点模式(一对一,消费者被动拉取数据,音讯收到后音讯革除) 2.公布订阅模式(一对多,消费者生产数据之后不会革除音讯)kafka个别应用的是生产方拉取,会始终轮询,浪费资源。(能够设置一个等待时间,在没有拉取到信息的时候,会期待设置的工夫) 三. KafkaKafka是由Apache软件基金会开发的一个开源流解决平台,由Scala(一品种java语言)和Java编写。Kafka是一种高吞吐量的分布式的基于公布/订阅模式的音讯队列,次要利用于大数据实时处理畛域(spark实时剖析框架)。 1.Kafka的个性高吞吐量、低提早:kafka每秒能够解决几十万条音讯,它的提早最低只有几毫秒,每个topic能够分多个partition, consumer group 对partition进行consume操作。可扩展性:kafka集群反对热扩大持久性、可靠性:音讯被长久化到本地磁盘,并且反对数据备份避免数据失落容错性:容许集群中节点失败(若正本数量为n,则容许n-1个节点失败)高并发:反对数千个客户端同时读写2.Kafka的根本架构 1)Producer :音讯生产者,就是向 kafka broker 发消息的客户端; 2)Consumer :音讯消费者,向 kafka broker 取音讯的客户端; 3)Consumer Group (CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责生产不同分区的数据,一个分区只能由一个组内消费者生产;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。 4)Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker能够包容多个 topic。 5)Topic :能够了解为一个队列,生产者和消费者面向的都是一个 topic; 6)Partition:为了实现扩展性,一个十分大的 topic 能够散布到多个 broker(即服务器)上,一个 topic 能够分为多个 partition,每个 partition 是一个有序的队列; 7)Replica:正本,为保障集群中的某个节点产生故障时,该节点上的 partition 数据不失落,且 kafka 依然可能持续工作,kafka 提供了正本机制,一个 topic 的每个分区都有若干个正本,一个 leader 和若干个 follower。 8)leader:每个分区多个正本的“主”,生产者发送数据的对象,以及消费者生产数据的对象都是 leader。 9)follower:每个分区多个正本中的“从”,实时从 leader 中同步数据,放弃和 leader 数据的同步。leader 产生故障时,某个 follower 会成为新的 follower ...

September 1, 2020 · 10 min · jiezi

关于kafka:Kafka和RocketMQ底层存储之那些你不知道的事

大家好,我是yes。 咱们都晓得 RocketMQ 和 Kafka 音讯都是存在磁盘中的,那为什么音讯存磁盘读写还能够这么快?有没有做了什么优化?都是存磁盘它们两者的实现之间有什么区别么?各自有什么优缺点? 明天咱们就来一探到底。 存储介质-磁盘一般而言消息中间件的音讯都存储在本地文件中,因为从效率来看间接放本地文件是最快的,并且稳定性最高。毕竟要是放相似数据库等第三方存储中的话,就多一个依赖少一份平安,并且还有网络的开销。 那对于将音讯存入磁盘文件来说一个流程的瓶颈就是磁盘的写入和读取。咱们晓得磁盘相对而言读写速度较慢,那通过磁盘作为存储介质如何实现高吞吐呢? 程序读写答案就是程序读写。 首先理解一下页缓存,页缓存是操作系统用来作为磁盘的一种缓存,缩小磁盘的I/O操作。 在写入磁盘的时候其实是写入页缓存中,使得对磁盘的写入变成对内存的写入。写入的页变成脏页,而后操作系统会在适合的时候将脏页写入磁盘中。 在读取的时候如果页缓存命中则间接返回,如果页缓存 miss 则产生缺页中断,从磁盘加载数据至页缓存中,而后返回数据。 并且在读的时候会预读,依据局部性原理当读取的时候会把相邻的磁盘块读入页缓存中。在写入的时候会后写,写入的也是页缓存,这样存着能够将一些小的写入操作合并成大的写入,而后再刷盘。 而且依据磁盘的结构,程序 I/O 的时候,磁头简直不必换道,或者换道的工夫很短。 依据网上的一些测试后果,程序写盘的速度比随机写内存还要快。 当然这样的写入存在数据失落的危险,例如机器忽然断电,那些还未刷盘的脏页就失落了。不过能够调用 fsync 强制刷盘,然而这样对于性能的损耗较大。 因而个别倡议通过多正本机制来保障音讯的牢靠,而不是同步刷盘。 能够看到程序 I/O 适应磁盘的结构,并且还有预读和后写。 RocketMQ 和 Kafka 都是程序写入和近似程序读取。它们都采纳文件追加的形式来写入音讯,只能在日志文件尾部写入新的音讯,老的音讯无奈更改。 mmap-文件内存映射从下面可知拜访磁盘文件会将数据加载到页缓存中,然而页缓存属于内核空间,用户空间拜访不了,因而数据还须要拷贝到用户空间缓冲区。 能够看到数据须要从页缓存再通过一次拷贝程序能力拜访的到,因而还能够通过mmap来做一波优化,利用内存映射文件来防止拷贝。 简略的说文件映射就是将程序虚构页面间接映射到页缓存上,这样就无需有内核态再往用户态的拷贝,而且也防止了反复数据的产生。并且也不用再通过调用read或write办法对文件进行读写,能够通过映射地址加偏移量的形式间接操作。 sendfile-零拷贝既然音讯是存在磁盘中的,那消费者来拉音讯的时候就得从磁盘拿。咱们先来看看个别发送文件的流程是如何的。 简略说下DMA是什么,全称 Direct Memory Access ,它能够独立地间接读写零碎内存,不须要 CPU 染指,像显卡、网卡之类都会用DMA。 能够看到数据其实是冗余的,那咱们来看看mmap之后的发送文件流程是怎么的。 能够看到上下文切换的次数没有变动,然而数据少拷贝一份,这和咱们上文提到的mmap能达到的成果是一样的。 然而数据还是冗余了一份,这不是能够间接把数据从页缓存拷贝到网卡不就好了嘛?sendfile就有这个效用。咱们先来看看Linux2.1版本中的sendfile。 因为就一个零碎调用就满足了发送的需要,相比 read + write 或者 mmap + write 上下文切换必定是少了的,然而如同数据还是有冗余啊。是的,因而 Linux2.4 版本的 sendfile + 带 「扩散-收集(Scatter-gather)」的DMA。实现了真正的无冗余。 ...

August 26, 2020 · 2 min · jiezi

关于kafka:kafka集群安装部署

3、 kafka集群装置部署3.1、具体部署过程1、下载安装包(http://kafka.apache.org) kafka_2.11-1.1.0.tgz2、布局装置目录 /wangyq/install3、上传安装包到node01服务器,并解压 # 通过FTP工具上传安装包到node01服务器的/wangyq/soft门路下,而后进行解压cd /wangyq/soft/tar -zxf kafka_2.11-1.1.0.tgz -C /wangyq/install/4、批改配置文件 在node01上批改kafak对应的配置文件 server.properties 进入到kafka装置目录下有一个config目录,批改配置文件 cd /wangyq/install/kafka_2.11-1.1.0/config vim server.properties#指定kafka对应的broker id ,惟一broker.id=0#指定数据寄存的目录log.dirs=/wangyq/install/kafka_2.11-1.1.0/logs#指定zk地址zookeeper.connect=node01:2181,node02:2181,node03:2181#指定是否能够删除topic ,默认是false 示意不能够删除delete.topic.enable=true#指定broker主机名host.name=node01```5、node01执行以下命令散发kafka装置目录到其余节点 # 由node01节点同步其余正本节点中cd /wangyq/install/scp -r kafka_2.11-1.1.0/ node02:$PWDscp -r kafka_2.11-1.1.0/ node03:$PWD6、批改node02和node03上的配置 node02执行以下命令进行批改配置 cd /wangyq/install/kafka_2.11-1.1.0/config/vi server.properties#指定kafka对应的broker id ,惟一broker.id=1#指定数据寄存的目录log.dirs=/wangyq/install/kafka_2.11-1.1.0/logs#指定zk地址zookeeper.connect=node01:2181,node02:2181,node03:2181#指定是否能够删除topic ,默认是false 示意不能够删除delete.topic.enable=true#指定broker主机名host.name=node02node03执行以下命令进行批改配置 cd /wangyq/install/kafka_2.11-1.1.0/config/vi server.properties#指定kafka对应的broker id ,惟一broker.id=2#指定数据寄存的目录log.dirs=/wangyq/install/kafka_2.11-1.1.0/logs#指定zk地址zookeeper.connect=node01:2181,node02:2181,node03:2181#指定是否能够删除topic ,默认是false 示意不能够删除delete.topic.enable=true#指定broker主机名host.name=node033.1、 kafka集群启动和进行3.1.1、 启动先启动zk集群而后在所有节点执行脚本 cd /wangyq/install/kafka_2.11-1.1.0/nohup bin/kafka-server-start.sh config/server.properties 2>&1 & 一键启动kafka start_kafka.sh #!/bin/shfor host in node01 node02 node03do ssh $host "source /etc/profile;nohup /wangyq/install/kafka_2.11-1.1.0/bin/kafka-server-start.sh /wangyq/install/kafka_2.11-1.1.0/config/server.properties >/dev/null 2>&1 &" echo "$host kafka is running"done3.2.1、 进行所有节点执行敞开kafka脚本 ...

August 26, 2020 · 1 min · jiezi

关于kafka:kafka全面理解

kafka全面了解什么是音讯队列,它的益处是什么?解藕将音讯写入音讯队列,须要音讯的零碎本人从音讯队列中订阅,从而上游零碎不须要做任何批改 例如有上游零碎a,它有3个上游零碎b,c,d,为了使b,c,d能拿到a的数据,a须要在代码中去调用这3个零碎。如果有一天,b不再应用a的数据了或b的接口产生了变动,a还须要批改代码。而应用音讯队列,就只管往队列里发送数据,须要的上游本人去队列里取数据即可。异步不必同步期待上游将数据处理完,将音讯发到音讯队列中即可返回,不妨碍主流程。 例如上游零碎a是主业务,b,c,d是非次要业务,没有必要同步期待3个上游都返回主业务才持续。应用音讯队列能够实现异步,进步吞吐量。削峰上游数据有突发流量,上游可能扛不住,kafka在两头能够起到一个缓冲的作用,把音讯暂存在kafka中,上游服务就能够依照本人的节奏缓缓解决。 kafka概念brokerbroker是kafka实例。 replication每一个partition有多正本,当主节点产生故障时,会抉择一个正本作为主节点。kafka是主写主读的。 topictopic是音讯的分类,一个topic能够供任意多个生产组生产。 partitiontopic的分区,每个topic的数据能够被分成多个partition,能够不在一个机器上,由此来实现kafka的伸缩性。各个partition的数据是不反复的,雷同partition的数据是依照发送程序有序的。任何partition只有一个leader,只有leader是对外提供服务的。leader接管到数据后,follower会不停给他发送申请尝试去拉取最新的数据,拉取到本人本地后,写入磁盘中。每个partition都有多个正本,雷同partition的各个正本散布在不同的broker上。 consumer group/consumer一个consumer group是一个topic的订阅者,一个topic能够被多个consumer group订阅,各个consumer group是互相独立的。一个consumer group外部能够有多个consumer,多个consumer不会生产雷同的partition的音讯。最多无效的consumer数与partition数雷同,如果consumer数多于partition数,那么多进去的consumer不会生产到任何音讯。 rebalance生产组内某个消费者挂掉后,其余消费者主动重新分配订阅topic的partition的过程。rebalance是消费者端实现高可用的重要伎俩。 kafka的个性高吞吐、低提早:kakfa 最大的特点就是收发音讯十分快,kafka 每秒能够解决几十万条音讯,它的最低提早只有几毫秒。高伸缩性: 每个主题(topic) 蕴含多个分区(partition),主题中的分区能够散布在不同的主机(broker)中。持久性、可靠性: Kafka 可能容许数据的长久化存储,音讯被长久化到磁盘,并反对数据备份避免数据失落,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 咱们晓得它的数据可能长久存储。容错性: 容许集群中的节点失败,某个节点宕机,Kafka 集群可能失常工作高并发: 反对数千个客户端同时读写kafka为何快页缓存+程序写入kafka 写数据的时候,十分要害的一点,它是以磁盘程序写的形式来写的。仅仅将数据追加到文件的开端,不是在文件的随机地位来批改数据。写入磁盘文件的时候,能够间接写入这个 OS Cache 里,也就是仅仅写入内存中,接下来由操作系统本人决定什么时候把 OS Cache 里的数据真的刷入磁盘文件中。 零拷贝如果Kafka从磁盘中读取数据发送给上游的消费者,大略过程是: 先看看要读的数据在不在os cache中,如果不在的话就从磁盘文件里读取数据后放入os cache从操作系统的os cache 里拷贝数据到应用程序过程的缓存里从应用程序过程的缓存里拷贝数据到操作系统层面的Socket缓存里从Soket缓存里提取数据后发送到网卡,最初发送进来给上游消费者整个过程有两次没必要的拷贝 从操作系统的cache里拷贝到利用过程的缓存里从应用程序缓存里拷贝回操作系统的Socket缓存里。为了进行这两次拷贝,两头还产生了好几次上下文切换,一会儿是应用程序在执行,一会儿上下文切换到操作系统来执行。 所以这种形式来读取数据是比拟耗费性能的 零拷贝 让操作系统的cache中的数据发送到网卡网卡传出给上游的消费者两头跳过了两次拷贝数据的步骤,Socket缓存中仅仅会拷贝一个描述符过来,不会拷贝数据到Socket缓存 另外:从磁盘读数据的时候,会先看看os cache内存中是否有,如果有的话,其实读数据都是间接读内存的。 如果kafka集群通过良好的调优,大家会发现大量的数据都是间接写入os cache中,而后读数据的时候也是从os cache中读。 相当于是Kafka齐全基于内存提供数据的写和读了,所以这个整体性能会极其的高 消息压缩批量发送参考文章https://juejin.im/post/684490...https://juejin.im/post/684490...https://zhuanlan.zhihu.com/p/...

August 25, 2020 · 1 min · jiezi

关于kafka:Linux-Page-Cache调优在Kafka中的应用

本文首发于 vivo互联网技术 微信公众号  链接: https://mp.weixin.qq.com/s/MaeXn-kmgLUah78brglFkg 作者:Yang Yijun本文次要形容Linux Page Cache优化的背景、Page Cache的基本概念、列举之前针对Kafka的 IO 性能瓶颈采取的一些解决方案、如何进行Page Cache相干参数调整以及性能优化前后成果比照。 一、优化背景当业务快速增长,每天须要解决万亿记录级数据量时。在读写数据方面,Kafka 集群的压力将变得微小,而磁盘 IO 成为了 Kafka 集群最大的性能瓶颈。 当呈现入流量突增或者出流量突增状况,磁盘 IO 继续处于被打满状态,导致无奈解决新的读写申请,甚至造成局部broker节点雪崩而影响集群的稳固。 如下图所示,磁盘 IO 被继续打满: 这重大的影响了集群的稳固,从而影响业务的稳固运行。对此,咱们做出了一些针对性的优化计划: 对Linux操作系统的Page Cache参数进行优化;【本文次要解说内容】对kafka集群用户的出入流量进行限度,防止出入流量突增给磁盘IO带来的压力;【本文对此计划不做解说】按业务对集群进行资源组隔离(集群broker的物理隔离),防止不同业务间因为共享磁盘IO相互影响;【本文对此计划不做解说】对Kafka集群broker节点服务参数进行优化;【本文对此计划不做解说】革新Kafka正本迁徙源码,实现增量并发正本迁徙,缩小正本迁徙给集群broker节点磁盘IO带来的压力;【本文对此计划不做解说】开发一套Kafka集群主动负载平衡服务,定期对集群进行负载平衡;【本文对此计划不做解说】采纳IO性能更好的SSD固态硬盘替换一般的机械硬盘;进行磁盘RAID让broker外部多块磁盘间IO负载更加平衡【本文对此计划不做解说】革新Kafka源码,对Kafka集群单个broker及单个topic进行出入流量限度,实现流量对最细粒度管制;当单个broker流量突增时能够对其进行下限限度,防止节点被异样流量打挂;【本文对此计划不做解说】革新Kafka源码,修复正本迁徙工作启动后不可手动终止的缺点,实现当因迁徙导致负载过高却无奈进行的问题;【本文对此计划不做解说】机房网络带宽的竞争也将间接的影响到follower同步leader的数据,最终将导致follower同步拉取历史数据而减少IO负载,因而须要对网络带宽进行优先级打标,当有竞争时进步Kafka集群的优先级,防止kafka集群的broker和其余大量耗费网络带宽的业务共用机房交换机。【本文对此计划不做解说】以上只是列举了几点次要的优化计划,还有一些其余的内容这里不再赘述。本文咱们次要来解说一下 Linux操作系统的Page Cache参数调优。 二、基本概念1、什么是Page Cache?Page Cache是针对文件系统的缓存,通过将磁盘中的文件数据缓存到内存中,从而缩小磁盘I/O操作进步性能。 对磁盘的数据进行缓存从而进步性能次要是基于两个因素: 磁盘拜访的速度比内存慢好几个数量级(毫秒和纳秒的差距);被拜访过的数据,有很大概率会被再次拜访。文件读写流程如下所示: 2、读Cache当内核发动一个读申请时(例如过程发动read()申请),首先会查看申请的数据是否缓存到了Page Cache中。 如果有,那么间接从内存中读取,不须要拜访磁盘,这被称为cache命中(cache hit); 如果cache中没有申请的数据,即cache未命中(cache miss),就必须从磁盘中读取数据。而后内核将读取的数据缓存到cache中,这样后续的读申请就能够命中cache了。 page能够只缓存一个文件局部的内容,不须要把整个文件都缓存进来。 3、写Cache当内核发动一个写申请时(例如过程发动write()申请),同样是间接往cache中写入,后备存储中的内容不会间接更新(当服务器呈现断电关机时,存在数据失落危险)。 内核会将被写入的page标记为dirty,并将其退出dirty list中。内核会周期性地将dirty list中的page写回到磁盘上,从而使磁盘上的数据和内存中缓存的数据统一。 当满足以下两个条件之一将触发脏数据刷新到磁盘操作: 数据存在的工夫超过了dirty_expire_centisecs(默认300厘秒,即30秒)工夫;脏数据所占内存 > dirty_background_ratio,也就是说当脏数据所占用的内存占总内存的比例超过dirty_background_ratio(默认10,即零碎内存的10%)的时候会触发pdflush刷新脏数据。4、Page Cache缓存查看工具咱们如何查看缓存命中率呢?在这里咱们能够借助一个缓存命中率查看工具 cachestat。 (1)下载安装mkdir /opt/bigdata/app/cachestatcd /opt/bigdata/app/cachestatgit clone --depth 1 https://github.com/brendangregg/perf-tools(2)启动执行 (3)输入内容阐明 5、如何回收Page Cache执行脚本:echo 1 > /proc/sys/vm/drop_caches 这里可能须要期待一会,因为有应用程序正在写数据。 ...

August 24, 2020 · 1 min · jiezi

关于kafka:kafka-系列-41消费者基本介绍

1、消费者食用DEMOProperties prop = new Properties();prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer");prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumerDemo");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);consumer.subscribe(Collections.singleton("test"));while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { String key = record.key(); String value = record.value(); System.err.println(record.toString()); }}2、消费者基本概念kafka 消费者是以 组为根本单位 进行生产的。生产的模型如下 1 个 topic 容许被多个 生产组 生产。再次强调,kafka 生产是以组为单位。 prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer");以上这行代码设置了生产组。 2.1、partition 调配topic 为逻辑上的概念,partition 才是物理上的概念。那么看完这个以上的生产模型图。你可能会很纳闷。当一个组下有多个消费者时,每个消费者是如何生产的? 先阐明:partition 的调配为平均分配 假如一:topic1 上面有 3 个分区。别离如下:p1 - p3。那么 groupA 下的三个消费者生产的对应 partition 为如下 ...

August 20, 2020 · 3 min · jiezi

关于kafka:Kafka260发布性能大幅提升

近日Kafka2.6版本公布,间隔2.5.0公布只过来了不到四个月的工夫。 Kafka 2.6.0蕴含许多重要的新性能。以下是一些重要更改的摘要: 默认状况下,已为Java 11或更高版本启用TLSv1.3性能显着进步,尤其是当代理具备大量分区时扩大Kafka Streams的应用程序更便捷Kafka Streams反对更改时收回新的metrics可提供更好的经营洞察力配置为进行连贯时,Kafka Connect能够主动创立Topic改良了Kafka Connect中接收器连接器的错误报告选项Kafka Connect中的新过滤器和有条件地利用SMT“ client.dns.lookup”配置的默认值当初为“ use_all_dns_ips”。将Zookeeper降级到3.5.8新性能增加KStream#repartition操作使SSL上下文/引擎配置可扩大默认状况下启用TLSv1.3,并禁用某些较旧的协定有条件地利用SMT向流指标增加工作级流动过程比率重构主循环以一次解决一个工作的多个记录改善加强了TransformerSupplier / ProcessorSupplier清理工作治理将“ onAssignment”流与“ partitionsAssigned”工作创立合并公开磁盘读写指标容许消费者明确触发从新均衡将gradle更新为6.0+反对Java 14将默认版本切换到Scala 2.13-改良“ matchingAcls”的性能控制台生产者反对client.id的设置降级指南:如果要从2.1.x之前的版本升级,请参阅以下正文,以理解用于存储使用者偏移量的架构的更改。将inter.broker.protocol.version更改为最新版本后,将无奈降级到2.1之前的版本。 对于滚动降级: 在所有代理上更新server.properties并增加以下属性。CURRENT_KAFKA_VERSION指的是您要降级的版本。CURRENT_MESSAGE_FORMAT_VERSION是指以后应用的音讯格局版本。如果以前笼罩了音讯格局版本,则应保留其以后值。或者,如果要从0.11.0.x之前的版本升级,则应将CURRENT_MESSAGE_FORMAT_VERSION设置为与CURRENT_KAFKA_VERSION相匹配。 inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如2.5,2.4等)log.message.format.version = CURRENT_MESSAGE_FORMAT_VERSION如果要从0.11.0.x或更高版本升级,并且尚未笼罩音讯格局,则只须要笼罩代理间协定版本。 inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如2.5,2.4等)一次降级一个代理:敞开代理,更新代码,而后重新启动。实现此操作后,代理将运行最新版本,并且您能够验证集群的行为和性能是否合乎预期。如果有任何问题,此时依然能够降级。验证集群的行为和性能后,请通过编辑协定版本inter.broker.protocol.version并将其设置为来更改协定版本 2.6。逐个重新启动代理,以使新协定版本失效。代理开始应用最新的协定版本后,将无奈再将群集降级到较旧的版本。如果您已依照上述阐明笼罩了音讯格局版本,则须要再次滚动重启以将其降级到最新版本。一旦所有(或大多数)使用者都降级到0.11.0或更高版本,则在每个代理上将log.message.format.version更改为2.6,而后逐个重新启动它们。请留神,不再保护的较旧的Scala客户端不反对0.11中引入的音讯格局,为防止转换老本必须应用较新的Java客户端。2.6.0留神点Kafka Streams增加了一种新的解决模式(须要Broker 2.5或更高版本),该模式应用齐全一次的保障进步了应用程序的可伸缩性。 缺省状况下,Java 11或更高版本已启用TLSv1.3。如果客户端和服务器均反对TLSv1.3,则将协商该协定,否则将回退至TLSv1.2。 缺省状况下,Java 11或更高版本已启用TLSv1.3。如果客户端和服务器均反对TLSv1.3,则将协商该协定,否则将回退至TLSv1.2。 NotLeaderForPartitionException已弃用,并已替换为NotLeaderOrFollowerException。如果代理不是正本,则获取申请和仅用于领导者或跟随者的其余申请将返回NOT_LEADER_OR_FOLLOWER(6)而不是REPLICA_NOT_AVAILABLE(9),以确保重新分配期间的此临时谬误由所有客户端作为可重试的异样进行解决。 更多Flink,Kafka,Spark等相干技术博文,科技资讯,欢送关注实时流式计算 公众号后盾回复 “电子书” 下载300页Flink实战电子书

August 11, 2020 · 1 min · jiezi

关于kafka:消息队列的消费幂等性如何保证

什么是幂等?任意屡次执行所产生的影响均与一次执行的影响雷同就能够称为幂等 什么是音讯幂等?当呈现消费者对某条音讯反复生产的状况时,反复生产的后果与生产一次的后果是雷同的,并且屡次生产并未对业务零碎产生任何负面影响 为什么咱们要保障幂等性,不保障幂等性,会不会有问题?这个问题其实没法精确答复。答复这个问题的本源得从业务场景上进行剖析。比方失常业务状况下,咱们是不容许同个订单反复领取,这种业务场景咱们就须要确保幂等性。再比方日志记录,这种业务场景,咱们可能就不须要做幂等判断。 因而是否要保障幂等性,得基于业务进行考量 音讯队列的生产幂等性如何保障?没法保障。后面说了要保障幂等性,得基于业务场景进行考量。音讯队列他自身就不是给你用来做业务幂等性用的。如果你要实现业务幂等性,靠音讯队列是没法帮你实现的,你本人得依据本身业务场景,来实现幂等。 罕用的业务幂等性保障办法1、利用数据库的惟一束缚实现幂等比方将订单表中的订单编号设置为惟一索引,创立订单时,依据订单编号就能够保障幂等 2、去重表这个计划实质也是依据数据库的唯一性束缚来实现。其实现大体思路是:首先在去重表上建惟一索引,其次操作时把业务表和去重表放在同个本地事务中,如果呈现重现反复生产,数据库会抛惟一束缚异样,操作就会回滚 3、利用redis的原子性每次操作都间接set到redis外面,而后将redis数据定时同步到数据库中 4、多版本(乐观锁)管制此计划多用于更新的场景下。其实现的大体思路是:给业务数据减少一个版本号属性,每次更新数据前,比拟以后数据的版本号是否和音讯中的版本统一,如果不统一则回绝更新数据,更新数据的同时将版本号+1 5、状态机机制此计划多用于更新且业务场景存在多种状态流转的场景 6、token机制生产者发送每条数据的时候,减少一个全局惟一的id,这个id通常是业务的惟一标识,比方订单编号。在生产端生产时,则验证该id是否被生产过,如果还没生产过,则进行业务解决。解决完结后,在把该id存入redis,同时设置状态为已生产。如果曾经生产过了,则不进行解决。 演示例子应用springboot2加kafka来演示一下应用token机制如何实现生产端幂等 1、application.ymlspring: redis: host: localhost port: 6379 # 连贯超时工夫(毫秒) timeout: 10000 jedis: pool: # 连接池中的最大闲暇连贯 max-idle: 8 # 连接池中的最小闲暇连贯 min-idle: 10 # 连接池最大连接数(应用负值示意没有限度) max-active: 100 # 连接池最大阻塞等待时间(应用负值示意没有限度) max-wait: -1 password: kafka: # 以逗号分隔的地址列表,用于建设与Kafka集群的初始连贯(kafka 默认的端口号为9092) bootstrap-servers: localhost:9092 producer: # 产生谬误后,音讯重发的次数。 retries: 0 #当有多个音讯须要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次能够应用的内存大小,依照字节数计算。 batch-size: 16384 # 设置生产者内存缓冲区的大小。 buffer-memory: 33554432 # 键的序列化形式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化形式 value-serializer: com.github.lybgeek.kafka.serialization.ObjectSerializer # acks=0 : 生产者在胜利写入音讯之前不会期待任何来自服务器的响应。 # acks=1 : 只有集群的领袖节点收到音讯,生产者就会收到一个来自服务器胜利响应。 # acks=all :只有当所有参加复制的节点全副收到音讯时,生产者才会收到一个来自服务器的胜利响应。 acks: 1 consumer: # 主动提交的工夫距离 在spring boot 2.X 版本中这里采纳的是值的类型为Duration 须要合乎特定的格局,如1S,1M,2H,5D auto-commit-interval: 1S # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量有效的状况下该作何解决: # latest(默认值)在偏移量有效的状况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) # earliest :在偏移量有效的状况下,消费者将从起始地位读取分区的记录 auto-offset-reset: earliest # 是否主动提交偏移量,默认值是true,为了避免出现反复数据和数据失落,能够把它设置为false,而后手动提交偏移量 enable-auto-commit: false # 键的反序列化形式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化形式 value-deserializer: com.github.lybgeek.kafka.serialization.ObjectDeserializer listener: # 在侦听器容器中运行的线程数。 concurrency: 1 #listner负责ack,每调用一次,就立刻commit ack-mode: manual_immediate2、实现kafka的自定义序列和反序列注:kakfa默认的序列化和反序列形式是StringSerializer和StringDeserializer。咱们要革新成反对对象的序列化和反序列化 ...

August 9, 2020 · 2 min · jiezi

关于kafka:消息队列的消费幂等性如何保证

什么是幂等?任意屡次执行所产生的影响均与一次执行的影响雷同就能够称为幂等 什么是音讯幂等?当呈现消费者对某条音讯反复生产的状况时,反复生产的后果与生产一次的后果是雷同的,并且屡次生产并未对业务零碎产生任何负面影响 为什么咱们要保障幂等性,不保障幂等性,会不会有问题?这个问题其实没法精确答复。答复这个问题的本源得从业务场景上进行剖析。比方失常业务状况下,咱们是不容许同个订单反复领取,这种业务场景咱们就须要确保幂等性。再比方日志记录,这种业务场景,咱们可能就不须要做幂等判断。 因而是否要保障幂等性,得基于业务进行考量 音讯队列的生产幂等性如何保障?没法保障。后面说了要保障幂等性,得基于业务场景进行考量。音讯队列他自身就不是给你用来做业务幂等性用的。如果你要实现业务幂等性,靠音讯队列是没法帮你实现的,你本人得依据本身业务场景,来实现幂等。 罕用的业务幂等性保障办法1、利用数据库的惟一束缚实现幂等比方将订单表中的订单编号设置为惟一索引,创立订单时,依据订单编号就能够保障幂等 2、去重表这个计划实质也是依据数据库的唯一性束缚来实现。其实现大体思路是:首先在去重表上建惟一索引,其次操作时把业务表和去重表放在同个本地事务中,如果呈现重现反复生产,数据库会抛惟一束缚异样,操作就会回滚 3、利用redis的原子性每次操作都间接set到redis外面,而后将redis数据定时同步到数据库中 4、多版本(乐观锁)管制此计划多用于更新的场景下。其实现的大体思路是:给业务数据减少一个版本号属性,每次更新数据前,比拟以后数据的版本号是否和音讯中的版本统一,如果不统一则回绝更新数据,更新数据的同时将版本号+1 5、状态机机制此计划多用于更新且业务场景存在多种状态流转的场景 6、token机制生产者发送每条数据的时候,减少一个全局惟一的id,这个id通常是业务的惟一标识,比方订单编号。在生产端生产时,则验证该id是否被生产过,如果还没生产过,则进行业务解决。解决完结后,在把该id存入redis,同时设置状态为已生产。如果曾经生产过了,则不进行解决。 演示例子应用springboot2加kafka来演示一下应用token机制如何实现生产端幂等 1、application.ymlspring: redis: host: localhost port: 6379 # 连贯超时工夫(毫秒) timeout: 10000 jedis: pool: # 连接池中的最大闲暇连贯 max-idle: 8 # 连接池中的最小闲暇连贯 min-idle: 10 # 连接池最大连接数(应用负值示意没有限度) max-active: 100 # 连接池最大阻塞等待时间(应用负值示意没有限度) max-wait: -1 password: kafka: # 以逗号分隔的地址列表,用于建设与Kafka集群的初始连贯(kafka 默认的端口号为9092) bootstrap-servers: localhost:9092 producer: # 产生谬误后,音讯重发的次数。 retries: 0 #当有多个音讯须要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次能够应用的内存大小,依照字节数计算。 batch-size: 16384 # 设置生产者内存缓冲区的大小。 buffer-memory: 33554432 # 键的序列化形式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化形式 value-serializer: com.github.lybgeek.kafka.serialization.ObjectSerializer # acks=0 : 生产者在胜利写入音讯之前不会期待任何来自服务器的响应。 # acks=1 : 只有集群的领袖节点收到音讯,生产者就会收到一个来自服务器胜利响应。 # acks=all :只有当所有参加复制的节点全副收到音讯时,生产者才会收到一个来自服务器的胜利响应。 acks: 1 consumer: # 主动提交的工夫距离 在spring boot 2.X 版本中这里采纳的是值的类型为Duration 须要合乎特定的格局,如1S,1M,2H,5D auto-commit-interval: 1S # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量有效的状况下该作何解决: # latest(默认值)在偏移量有效的状况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) # earliest :在偏移量有效的状况下,消费者将从起始地位读取分区的记录 auto-offset-reset: earliest # 是否主动提交偏移量,默认值是true,为了避免出现反复数据和数据失落,能够把它设置为false,而后手动提交偏移量 enable-auto-commit: false # 键的反序列化形式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化形式 value-deserializer: com.github.lybgeek.kafka.serialization.ObjectDeserializer listener: # 在侦听器容器中运行的线程数。 concurrency: 1 #listner负责ack,每调用一次,就立刻commit ack-mode: manual_immediate2、实现kafka的自定义序列和反序列注:kakfa默认的序列化和反序列形式是StringSerializer和StringDeserializer。咱们要革新成反对对象的序列化和反序列化 ...

August 9, 2020 · 2 min · jiezi

关于kafka:消息队列的消费幂等性如何保证

什么是幂等?任意屡次执行所产生的影响均与一次执行的影响雷同就能够称为幂等 什么是音讯幂等?当呈现消费者对某条音讯反复生产的状况时,反复生产的后果与生产一次的后果是雷同的,并且屡次生产并未对业务零碎产生任何负面影响 为什么咱们要保障幂等性,不保障幂等性,会不会有问题?这个问题其实没法精确答复。答复这个问题的本源得从业务场景上进行剖析。比方失常业务状况下,咱们是不容许同个订单反复领取,这种业务场景咱们就须要确保幂等性。再比方日志记录,这种业务场景,咱们可能就不须要做幂等判断。 因而是否要保障幂等性,得基于业务进行考量 音讯队列的生产幂等性如何保障?没法保障。后面说了要保障幂等性,得基于业务场景进行考量。音讯队列他自身就不是给你用来做业务幂等性用的。如果你要实现业务幂等性,靠音讯队列是没法帮你实现的,你本人得依据本身业务场景,来实现幂等。 罕用的业务幂等性保障办法1、利用数据库的惟一束缚实现幂等比方将订单表中的订单编号设置为惟一索引,创立订单时,依据订单编号就能够保障幂等 2、去重表这个计划实质也是依据数据库的唯一性束缚来实现。其实现大体思路是:首先在去重表上建惟一索引,其次操作时把业务表和去重表放在同个本地事务中,如果呈现重现反复生产,数据库会抛惟一束缚异样,操作就会回滚 3、利用redis的原子性每次操作都间接set到redis外面,而后将redis数据定时同步到数据库中 4、多版本(乐观锁)管制此计划多用于更新的场景下。其实现的大体思路是:给业务数据减少一个版本号属性,每次更新数据前,比拟以后数据的版本号是否和音讯中的版本统一,如果不统一则回绝更新数据,更新数据的同时将版本号+1 5、状态机机制此计划多用于更新且业务场景存在多种状态流转的场景 6、token机制生产者发送每条数据的时候,减少一个全局惟一的id,这个id通常是业务的惟一标识,比方订单编号。在生产端生产时,则验证该id是否被生产过,如果还没生产过,则进行业务解决。解决完结后,在把该id存入redis,同时设置状态为已生产。如果曾经生产过了,则不进行解决。 演示例子应用springboot2加kafka来演示一下应用token机制如何实现生产端幂等 1、application.ymlspring: redis: host: localhost port: 6379 # 连贯超时工夫(毫秒) timeout: 10000 jedis: pool: # 连接池中的最大闲暇连贯 max-idle: 8 # 连接池中的最小闲暇连贯 min-idle: 10 # 连接池最大连接数(应用负值示意没有限度) max-active: 100 # 连接池最大阻塞等待时间(应用负值示意没有限度) max-wait: -1 password: kafka: # 以逗号分隔的地址列表,用于建设与Kafka集群的初始连贯(kafka 默认的端口号为9092) bootstrap-servers: localhost:9092 producer: # 产生谬误后,音讯重发的次数。 retries: 0 #当有多个音讯须要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次能够应用的内存大小,依照字节数计算。 batch-size: 16384 # 设置生产者内存缓冲区的大小。 buffer-memory: 33554432 # 键的序列化形式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化形式 value-serializer: com.github.lybgeek.kafka.serialization.ObjectSerializer # acks=0 : 生产者在胜利写入音讯之前不会期待任何来自服务器的响应。 # acks=1 : 只有集群的领袖节点收到音讯,生产者就会收到一个来自服务器胜利响应。 # acks=all :只有当所有参加复制的节点全副收到音讯时,生产者才会收到一个来自服务器的胜利响应。 acks: 1 consumer: # 主动提交的工夫距离 在spring boot 2.X 版本中这里采纳的是值的类型为Duration 须要合乎特定的格局,如1S,1M,2H,5D auto-commit-interval: 1S # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量有效的状况下该作何解决: # latest(默认值)在偏移量有效的状况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) # earliest :在偏移量有效的状况下,消费者将从起始地位读取分区的记录 auto-offset-reset: earliest # 是否主动提交偏移量,默认值是true,为了避免出现反复数据和数据失落,能够把它设置为false,而后手动提交偏移量 enable-auto-commit: false # 键的反序列化形式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化形式 value-deserializer: com.github.lybgeek.kafka.serialization.ObjectDeserializer listener: # 在侦听器容器中运行的线程数。 concurrency: 1 #listner负责ack,每调用一次,就立刻commit ack-mode: manual_immediate2、实现kafka的自定义序列和反序列注:kakfa默认的序列化和反序列形式是StringSerializer和StringDeserializer。咱们要革新成反对对象的序列化和反序列化 ...

August 9, 2020 · 2 min · jiezi

关于kafka:你真的了解Flink-Kafka-source吗

Flink 提供了专门的 Kafka 连接器,向 Kafka topic 中读取或者写入数据。Flink Kafka Consumer 集成了 Flink 的 Checkpoint 机制,可提供 exactly-once 的解决语义。为此,Flink 并不齐全依赖于跟踪 Kafka 生产组的偏移量,而是在外部跟踪和查看偏移量。 引言当咱们在应用Spark Streaming、Flink等计算框架进行数据实时处理时,应用Kafka作为一款公布与订阅的音讯零碎成为了标配。Spark Streaming与Flink都提供了绝对应的Kafka Consumer,应用起来十分的不便,只须要设置一下Kafka的参数,而后增加kafka的source就高枕无忧了。如果你真的感觉事件就是如此的so easy,感觉妈妈再也不必放心你的学习了,那就真的是too young too simple sometimes naive了。本文以Flink 的Kafka Source为探讨对象,首先从根本的应用动手,而后深刻源码逐个分析,一并为你拨开Flink Kafka connector的神秘面纱。值得注意的是,本文假设读者具备了Kafka的相干常识,对于Kafka的相干细节问题,不在本文的探讨范畴之内。 Flink Kafka Consumer介绍Flink Kafka Connector有很多个版本,能够依据你的kafka和Flink的版本抉择相应的包(maven artifact id)和类名。本文所波及的Flink版本为1.10,Kafka的版本为2.3.4。Flink所提供的Maven依赖于类名如下表所示: Demo示例增加Maven依赖<!--本文应用的是通用型的connector--><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.10.0</version></dependency>简略代码案例public class KafkaConnector { public static void main(String[] args) throws Exception { StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(); // 开启checkpoint,工夫距离为毫秒 senv.enableCheckpointing(5000L); // 抉择状态后端 senv.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint")); //senv.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint")); Properties props = new Properties(); // kafka broker地址 props.put("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092"); // 仅kafka0.8版本须要配置 props.put("zookeeper.connect", "kms-2:2181,kms-3:2181,kms-4:2181"); // 消费者组 props.put("group.id", "test"); // 主动偏移量提交 props.put("enable.auto.commit", true); // 偏移量提交的工夫距离,毫秒 props.put("auto.commit.interval.ms", 5000); // kafka 音讯的key序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // kafka 音讯的value序列化器 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 指定kafka的消费者从哪里开始生产数据 // 共有三种形式, // #earliest // 当各分区下有已提交的offset时,从提交的offset开始生产; // 无提交的offset时,从头开始生产 // #latest // 当各分区下有已提交的offset时,从提交的offset开始生产; // 无提交的offset时,生产新产生的该分区下的数据 // #none // topic各分区都存在已提交的offset时, // 从offset后开始生产; // 只有有一个分区不存在已提交的offset,则抛出异样 props.put("auto.offset.reset", "latest"); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>( "qfbap_ods.code_city", new SimpleStringSchema(), props); //设置checkpoint后在提交offset,即oncheckpoint模式 // 该值默认为true, consumer.setCommitOffsetsOnCheckpoints(true); // 最早的数据开始生产 // 该模式下,Kafka 中的 committed offset 将被疏忽,不会用作起始地位。 //consumer.setStartFromEarliest(); // 消费者组最近一次提交的偏移量,默认。 // 如果找不到分区的偏移量,那么将会应用配置中的 auto.offset.reset 设置 //consumer.setStartFromGroupOffsets(); // 最新的数据开始生产 // 该模式下,Kafka 中的 committed offset 将被疏忽,不会用作起始地位。 //consumer.setStartFromLatest(); // 指定具体的偏移量工夫戳,毫秒 // 对于每个分区,其工夫戳大于或等于指定工夫戳的记录将用作起始地位。 // 如果一个分区的最新记录早于指定的工夫戳,则只从最新记录读取该分区数据。 // 在这种模式下,Kafka 中的已提交 offset 将被疏忽,不会用作起始地位。 //consumer.setStartFromTimestamp(1585047859000L); // 为每个分区指定偏移量 /*Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>(); specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 0), 23L); specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 1), 31L); specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 2), 43L); consumer1.setStartFromSpecificOffsets(specificStartOffsets);*/ /** * * 请留神:当 Job 从故障中主动复原或应用 savepoint 手动复原时, * 这些起始地位配置办法不会影响生产的起始地位。 * 在复原时,每个 Kafka 分区的起始地位由存储在 savepoint 或 checkpoint 中的 offset 确定 * */ DataStreamSource<String> source = senv.addSource(consumer); // TODO source.print(); senv.execute("test kafka connector"); }}参数配置解读在Demo示例中,给出了具体的配置信息,上面将对下面的参数配置进行逐个剖析。 ...

August 8, 2020 · 9 min · jiezi

关于kafka:Kafka的Controller-Broker是什么

控制器组件(Controller),是 Apache Kafka 的外围组件。它的次要作用是在 Apache ZooKeeper 的帮忙下治理和协调整个 Kafka 集群。集群中任意一台 Broker 都能充当控制器的角色,然而,在运行过程中,只能有一个 Broker 成为控制器,行使其治理和协调的职责。接下来,咱们将探讨Controller原理和外部运行机制。通过本文你能够理解到: 什么是Controller BrokerController Broker是怎么被选举的Controller Broker次要作用是什么Kafka是如何解决脑裂的在分布式系统中,通常须要有一个协调者,该协调者会在分布式系统产生异样时施展非凡的作用。在Kafka中该协调者称之为控制器(Controller),其实该控制器并没有什么非凡之处,它自身也是一个一般的Broker,只不过须要负责一些额定的工作(追踪集群中的其余Broker,并在适合的时候解决新退出的和失败的Broker节点、Rebalance分区、调配新的leader分区等)。值得注意的是:Kafka集群中始终只有一个Controller Broker。 Controller Broker是如何被选出来的上一大节解释了什么是Controller Broker,并且每台 Broker 都有充当控制器的可能性。那么,控制器是如何被选出来的呢?当集群启动后,Kafka 怎么确认控制器位于哪台 Broker 呢? 实际上,Broker 在启动时,会尝试去 ZooKeeper 中创立 /controller 节点。Kafka 以后选举控制器的规定是:第一个胜利创立 /controller 节点的 Broker 会被指定为控制器。 Controller Broker的具体作用是什么Controller Broker的主要职责有很多,次要是一些治理行为,次要包含以下几个方面: 创立、删除主题,减少分区并调配leader分区集群Broker治理(新增 Broker、Broker 被动敞开、Broker 故障)preferred leader选举分区重调配解决集群中下线的Broker当某个Broker节点因为故障来到Kafka群集时,则存在于该Broker的leader分区将不可用(因为客户端仅对leader分区进行读写操作)。为了最大水平地缩小停机工夫,须要疾速找到代替的leader分区。 Controller Broker能够对失败的Broker做出响应,Controller Broker能够从Zookeeper监听(zookeeper watch)中获取告诉信息,ZooKeeper 赋予客户端监控 znode 变更的能力,即所谓的 Watch 告诉性能。一旦 znode 节点被创立、删除,子节点数量发生变化,抑或是 znode 所存的数据自身变更,ZooKeeper 会通过节点变更监听器 (ChangeHandler) 的形式显式告诉客户端。 每个 Broker 启动后,会在zookeeper的 /Brokers/ids 下创立一个长期 znode。当 Broker 宕机或被动敞开后,该 Broker 与 ZooKeeper 的会话完结,这个 znode 会被主动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能晓得有 Broker 敞开或宕机了,从而进行后续的协调操作。 ...

August 8, 2020 · 1 min · jiezi

关于kafka:Kafka生产者ack机制剖析

Kafka有两个很重要的配置参数,acks与min.insync.replicas .其中acks是producer的配置参数,min.insync.replicas是Broker端的配置参数,这两个参数对于生产者不失落数据起到了很大的作用.接下来,本文会以图示的形式解说这两个参数的含意和应用形式。通过本文,你能够理解到: Kafka的分区正本什么是同步正本(In-sync replicas)什么是acks确认机制什么是最小同步正本ack=all与最小同步正本是如何发挥作用的分区正本Kafka的topic是能够分区的,并且能够为分区配置多个正本,改配置能够通过replication.factor参数实现. Kafka中的分区正本包含两种类型:领导者正本(Leader Replica)和追随者正本(Follower Replica),每个分区在创立时都要选举一个正本作为领导者正本,其余的正本主动变为追随者正本. 在 Kafka 中,追随者正本是不对外提供服务的,也就是说,任何一个追随者正本都不能响应消费者和生产者的读写申请. 所有的申请都必须由领导者副原本解决. 换句话说,所有的读写申请都必须发往领导者正本所在的 Broker,由该 Broker 负责解决. 追随者正本不解决客户端申请,它惟一的工作就是从领导者正本异步拉取音讯,并写入到本人的提交日志中,从而实现与领导者正本的同步. Kafka默认的正本因子是3,即每个分区只有1个leader正本和2个follower正本.具体如下图所示: 下面提到生产者客户端仅写入Leader broker,跟随者异步复制数据。因为Kafka是一个分布式系统,必然会存在与 Leader 不能实时同步的危险,所以须要一种办法来判断这些追随者是否跟上了领导者的步调, 即追随者是否同步了最新的数据.换句话说,Kafka 要明确地通知咱们,追随者正本到底在什么条件下才算与 Leader 同步?这就是上面所要说的ISR同步正本机制. 同步正本(In-sync replicas)In-sync replica(ISR)称之为同步正本,ISR中的正本都是与Leader进行同步的正本,所以不在该列表的follower会被认为与Leader是不同步的. 那么,ISR中存在是什么正本呢?首先能够明确的是:Leader正本总是存在于ISR中. 而follower正本是否在ISR中,取决于该follower正本是否与Leader正本放弃了“同步”. 尖叫提醒:对于"follower正本是否与Leader正本放弃了同步"的了解如下:(1)下面所说的同步不是指齐全的同步,即并不是说一旦follower正本同步滞后与Leader正本,就会被踢出ISR列表. (2)Kafka的broker端有一个参数replica.lag.time.max.ms, 该参数示意follower正本滞后与Leader正本的最长工夫距离,默认是10秒. 这就意味着,只有follower正本落后于leader正本的工夫距离不超过10秒,就能够认为该follower正本与leader正本是同步的,所以哪怕以后follower正本落后于Leader正本几条音讯,只有在10秒之内赶上Leader正本,就不会被踢出出局. (3)如果follower正本被踢出ISR列表,等到该正本追上了Leader正本的进度,该正本会被再次退出到ISR列表中,所以ISR是一个动静列表,并不是动态不变的。 如上图所示:Broker3上的partition1正本超过了规定工夫,未与Leader正本同步,所以被踢出ISR列表,此时的ISR为[1,3]. acks确认机制acks参数指定了必须要有多少个分区正本收到音讯,生产者才认为该音讯是写入胜利的,这个参数对于音讯是否失落起着重要作用,该参数的配置具体如下: acks=0,示意生产者在胜利写入音讯之前不会期待任何来自服务器的响应. 换句话说,一旦呈现了问题导致服务器没有收到音讯,那么生产者就无从得悉,音讯也就失落了. 改配置因为不须要等到服务器的响应,所以能够以网络反对的最大速度发送音讯,从而达到很高的吞吐量。 acks=1,示意只有集群的leader分区正本接管到了音讯,就会向生产者发送一个胜利响应的ack,此时生产者接管到ack之后就能够认为该音讯是写入胜利的. 一旦音讯无奈写入leader分区正本(比方网络起因、leader节点解体),生产者会收到一个谬误响应,当生产者接管到该谬误响应之后,为了防止数据失落,会从新发送数据.这种形式的吞吐量取决于应用的是异步发送还是同步发送. 尖叫提醒:如果生产者收到了谬误响应,即使是从新发消息,还是会有可能呈现丢数据的景象. 比方,如果一个没有收到音讯的节点成为了新的Leader,音讯就会失落. acks =all,示意只有所有参加复制的节点(ISR列表的正本)全副收到音讯时,生产者才会接管到来自服务器的响应. 这种模式是最高级别的,也是最平安的,能够确保不止一个Broker接管到了音讯. 该模式的提早会很高. 最小同步正本下面提到,当acks=all时,须要所有的正本都同步了才会发送胜利响应到生产者. 其实这外面存在一个问题:如果Leader正本是惟一的同步正本时会产生什么呢?此时相当于acks=1.所以是不平安的. Kafka的Broker端提供了一个参数min.insync.replicas,该参数管制的是音讯至多被写入到多少个正本才算是"真正写入",该值默认值为1,生产环境设定为一个大于1的值能够晋升音讯的持久性. 因为如果同步正本的数量低于该配置值,则生产者会收到谬误响应,从而确保音讯不失落. Case 1如下图,当min.insync.replicas=2且acks=all时,如果此时ISR列表只有[1,2],3被踢出ISR列表,只须要保障两个正本同步了,生产者就会收到胜利响应. Case 2如下图,当min.insync.replicas=2,如果此时ISR列表只有[1],2和3被踢出ISR列表,那么当acks=all时,则不能胜利写入数;当acks=0或者acks=1能够胜利写入数据. Case 3这种状况是很容易引起误会的,如果acks=all且min.insync.replicas=2,此时ISR列表为[1,2,3],那么还是会等到所有的同步正本都同步了音讯,才会向生产者发送胜利响应的ack.因为min.insync.replicas=2只是一个最低限度,即同步正本少于该配置值,则会抛异样,而acks=all,是须要保障所有的ISR列表的正本都同步了才能够发送胜利响应. 如下图所示: 总结acks=0,生产者在胜利写入音讯之前不会期待任何来自服务器的响应. acks=1,只有集群的leader分区正本接管到了音讯,就会向生产者发送一个胜利响应的ack. ...

August 8, 2020 · 1 min · jiezi

关于kafka:Kafka权威指南

Kafka权威指南 下载地址: https://pan.baidu.com/s/1auSKYE5MlVI2yLWpJ3f1fA 扫码上面二维码关注公众号回复100022 获取分享码 本书目录构造如下: 序 xiii 前言 xv 第 1 章 初识Kafka 1 1.1 公布与订阅音讯零碎 1 1.1.1 如何开始 2 1.1.2 独立的队列零碎 3 1.2 Kafka退场 4 1.2.1 音讯和批次 4 1.2.2 模式 4 1.2.3 主题和分区 5 1.2.4 生产者和消费者 5 1.2.5 broker和集群 6 1.2.6 多集群 7 1.3 为什么抉择Kafka 8 1.3.1 多个生产者 8 1.3.2 多个消费者 8 1.3.3 基于磁盘的数据存储 9 1.3.4 伸缩性 9 1.3.5 高性能 9 1.4 数据生态系统 9 1.5 起源故事 11 1.5.1 LinkedIn的问题 11 1.5.2 Kafka的诞生 12 1.5.3 走向开源 12 1.5.4 命名 13 1.6 开始Kafka之旅 13 第 2 章 装置Kafka 14 2.1 要事后行 14 2.1.1 抉择操作系统 14 2.1.2 装置Java 14 2.1.3 装置Zookeeper 15 2.2 装置Kafka Broker 17 2.3 broker配置 18 2.3.1 惯例配置 18 2.3.2 主题的默认配置 19 2.4 硬件的抉择 23 2.4.1 磁盘吞吐量 23 2.4.2 磁盘容量 23 2.4.3 内存 23 2.4.4 网络 24 2.4.5 CPU 24 2.5 云端的Kafka 24 2.6 Kafka集群 24 2.6.1 须要多少个broker 25 2.6.2 broker 配置 25 2.6.3 操作系统调优 26 2.7 生产环境的注意事项 28 2.7.1 垃圾回收器选项 28 2.7.2 数据中心布局 29 2.7.3 共享Zookeeper 29 2.8 总结 30 第 3 章 Kafka生产者——向Kafka写入数据 31 3.1 生产者概览 32 3.2 创立Kafka生产者 33 3.3 发送音讯到Kafka 34 3.3.1 同步发送音讯 35 3.3.2 异步发送音讯 35 3.4 生产者的配置 36 3.5 序列化器 39 3.5.1 自定义序列化器 39 3.5.2 应用Avro序列化 41 3.5.3 在Kafka里应用Avro 42 3.6 分区 45 3.7 旧版的生产者API 46 3.8 总结 47 第 4 章 Kafka消费者——从Kafka读取数据 48 4.1 KafkaConsumer概念 48 4.1.1 消费者和消费者群组 48 4.1.2 消费者群组和分区再平衡 51 4.2 创立Kafka消费者 52 4.3 订阅主题 53 4.4 轮询 53 4.5 消费者的配置 55 4.6 提交和偏移量 57 4.6.1 主动提交 58 4.6.2 提交以后偏移量 59 4.6.3 异步提交 59 4.6.4 同步和异步组合提交 61 4.6.5 提交特定的偏移量 61 4.7 再平衡监听器 62 4.8 从特定偏移量处开始解决记录 64 4.9 如何退出 66 4.10 反序列化器 67 4.11 独立消费者——为什么以及怎么应用没有群组的消费者 71 4.12 旧版的消费者API 71 4.13 总结 72 第 5 章 深刻Kafka 73 5.1 集群成员关系 73 5.2 控制器 74 5.3 复制 74 5.4 解决申请 76 5.4.1 生产申请 78 5.4.2 获取申请 78 5.4.3 其余申请 80 5.5 物理存储 81 5.5.1 分区调配 81 5.5.2 文件治理 82 5.5.3 文件格式 83 5.5.4 索引 84 5.5.5 清理 84 5.5.6 清理的工作原理 84 5.5.7 被删除的事件 86 5.5.8 何时会清理主题 86 5.9 总结 86 第 6 章 牢靠的数据传递 87 6.1 可靠性保障 87 6.2 复制 88 6.3 broker配置 89 6.3.1 复制系数 89 6.3.2 不齐全的领袖选举 90 6.3.3 起码同步正本 91 6.4 在牢靠的零碎里应用生产者 92 6.4.1 发送确认 92 6.4.2 配置生产者的重试参数 93 6.4.3 额定的错误处理 94 6.5 在牢靠的零碎里应用消费者 94 6.5.1 消费者的可靠性配置 95 6.5.2 显式提交偏移量 95 6.6 验证系统可靠性 97 6.6.1 配置验证 98 6.6.2 利用程序验证 98 6.6.3 在生产环境监控可靠性 99 6.7 总结 100 第 7 章 构建数据管道 101 7.1 构建数据管道时须要思考的问题 102 7.1.1 及时性 102 7.1.2 可靠性 102 7.1.3 高吞吐量和动静吞吐量 103 7.1.4 数据格式 103 7.1.5 转换 104 7.1.6 安全性 104 7.1.7 故障解决能力 104 7.1.8 耦合性和灵活性 105 7.2 如何在Connect API和客户端API之间作出抉择 105 7.3 Kafka Connect 106 7.3.1 运行Connect 106 7.3.2 连接器示例——文件数据源和文件数据池 107 7.3.3 连接器示例——从MySQL到ElasticSearch 109 7.3.4 深刻了解Connect 114 7.4 Connect之外的抉择 116 7.4.1 用于其余数据存储的摄入框架 116 7.4.2 基于图形界面的ETL工具 117 7.4.3 流式解决框架 117 7.5 总结 117 第 8 章 跨集群数据镜像 118 8.1 跨集群镜像的应用场景 118 8.2 多集群架构 119 8.2.1 跨数据中心通信的一些现实情况 119 8.2.2 Hub和Spoke架构 120 8.2.3 双活架构 121 8.2.4 主备架构 123 8.2.5 延展集群 127 8.3 Kafka的MirrorMaker 128 8.3.1 如何配置 129 8.3.2 在生产环境部署MirrorMaker 130 8.3.3 MirrorMaker调优 132 8.4 其余跨集群镜像计划 134 8.4.1 优步的uReplicator 134 8.4.2 Confluent的Replicator 135 8.5 总结 135 第 9 章 治理Kafka 136 9.1 主题操作 136 9.1.1 创立主题 137 9.1.2 减少分区 138 9.1.3 删除主题 138 9.1.4 列出集群里的所有主题 139 9.1.5 列出主题详细信息 139 9.2 消费者群组 140 9.2.1 列出并形容群组 140 9.2.2 删除群组 142 9.2.3 偏移量治理 142 9.3 动静配置变更 143 9.3.1 笼罩主题的默认配置 143 9.3.2 笼罩客户端的默认配置 145 9.3.3 列出被笼罩的配置 145 9.3.4 移除被笼罩的配置 146 9.4 分区治理 146 9.4.1 首选的领袖选举 146 9.4.2 批改分区正本 147 9.4.3 批改复制系数 150 9.4.4 转储日志片段 151 9.4.5 正本验证 152 9.5 生产和生产 153 9.5.1 控制台消费者 153 9.5.2 控制台生产者 155 9.6 客户端ACL 157 9.7 不平安的操作 157 9.7.1 挪动集群控制器 157 9.7.2 勾销分区重调配 157 9.7.3 移除待删除的主题 158 9.7.4 手动删除主题 158 9.8 总结 159 第 10 章 监控Kafka 160 10.1 度量指标根底 160 10.1.1 度量指标在哪里 160 10.1.2 外部或内部度量 161 10.1.3 应用程序衰弱检测 161 10.1.4 度量指标的覆盖面 161 10.2 broker的度量指标 162 10.2.1 非同步分区 162 10.2.2 broker度量指标 166 10.2.3 主题和分区的度量指标 173 10.2.4 Java虚拟机监控 174 10.2.5 操作系统监控 175 10.2.6 日志 176 10.3 客户端监控 177 10.3.1 生产者度量指标 177 10.3.2 消费者度量指标 179 10.3.3 配额 181 10.4 延时监控 182 10.5 端到端监控 183 10.6 总结 183 第 11 章 流式解决 184 11.1 什么是流式解决 185 11.2 流式解决的一些概念 186 11.2.1 工夫 187 11.2.2 状态 188 11.2.3 流和表的二元性 188 11.2.4 工夫窗口 189 11.3 流式解决的设计模式 190 11.3.1 单个事件处理 191 11.3.2 应用本地状态 191 11.3.3 多阶段解决和重分区 193 11.3.4 应用内部查找——流和表的连贯 193 11.3.5 流与流的连贯 195 11.3.6 乱序的事件 195 11.3.7 重新处理 196 11.4 Streams示例 197 11.4.1 字数统计 197 11.4.2 股票市场统计 199 11.4.3 填充点击事件流 201 11.5 Kafka Streams的架构概览 202 11.5.1 构建拓扑 202 11.5.2 对拓扑进行伸缩 203 11.5.3 从故障中存活下来 205 11.6 流式解决应用场景 205 11.7 如何抉择流式解决框架 206 11.8 总结 208 附录A 在其余操作系统上装置Kafka 209 作者介绍 214 封面介绍 214 ...

August 8, 2020 · 4 min · jiezi

关于kafka:Kafka权威指南

Kafka权威指南 下载地址: https://pan.baidu.com/s/1auSKYE5MlVI2yLWpJ3f1fA 扫码上面二维码关注公众号回复100022 获取分享码 本书目录构造如下: 序 xiii 前言 xv 第 1 章 初识Kafka 1 1.1 公布与订阅音讯零碎 1 1.1.1 如何开始 2 1.1.2 独立的队列零碎 3 1.2 Kafka退场 4 1.2.1 音讯和批次 4 1.2.2 模式 4 1.2.3 主题和分区 5 1.2.4 生产者和消费者 5 1.2.5 broker和集群 6 1.2.6 多集群 7 1.3 为什么抉择Kafka 8 1.3.1 多个生产者 8 1.3.2 多个消费者 8 1.3.3 基于磁盘的数据存储 9 1.3.4 伸缩性 9 1.3.5 高性能 9 1.4 数据生态系统 9 1.5 起源故事 11 1.5.1 LinkedIn的问题 11 1.5.2 Kafka的诞生 12 1.5.3 走向开源 12 1.5.4 命名 13 1.6 开始Kafka之旅 13 第 2 章 装置Kafka 14 2.1 要事后行 14 2.1.1 抉择操作系统 14 2.1.2 装置Java 14 2.1.3 装置Zookeeper 15 2.2 装置Kafka Broker 17 2.3 broker配置 18 2.3.1 惯例配置 18 2.3.2 主题的默认配置 19 2.4 硬件的抉择 23 2.4.1 磁盘吞吐量 23 2.4.2 磁盘容量 23 2.4.3 内存 23 2.4.4 网络 24 2.4.5 CPU 24 2.5 云端的Kafka 24 2.6 Kafka集群 24 2.6.1 须要多少个broker 25 2.6.2 broker 配置 25 2.6.3 操作系统调优 26 2.7 生产环境的注意事项 28 2.7.1 垃圾回收器选项 28 2.7.2 数据中心布局 29 2.7.3 共享Zookeeper 29 2.8 总结 30 第 3 章 Kafka生产者——向Kafka写入数据 31 3.1 生产者概览 32 3.2 创立Kafka生产者 33 3.3 发送音讯到Kafka 34 3.3.1 同步发送音讯 35 3.3.2 异步发送音讯 35 3.4 生产者的配置 36 3.5 序列化器 39 3.5.1 自定义序列化器 39 3.5.2 应用Avro序列化 41 3.5.3 在Kafka里应用Avro 42 3.6 分区 45 3.7 旧版的生产者API 46 3.8 总结 47 第 4 章 Kafka消费者——从Kafka读取数据 48 4.1 KafkaConsumer概念 48 4.1.1 消费者和消费者群组 48 4.1.2 消费者群组和分区再平衡 51 4.2 创立Kafka消费者 52 4.3 订阅主题 53 4.4 轮询 53 4.5 消费者的配置 55 4.6 提交和偏移量 57 4.6.1 主动提交 58 4.6.2 提交以后偏移量 59 4.6.3 异步提交 59 4.6.4 同步和异步组合提交 61 4.6.5 提交特定的偏移量 61 4.7 再平衡监听器 62 4.8 从特定偏移量处开始解决记录 64 4.9 如何退出 66 4.10 反序列化器 67 4.11 独立消费者——为什么以及怎么应用没有群组的消费者 71 4.12 旧版的消费者API 71 4.13 总结 72 第 5 章 深刻Kafka 73 5.1 集群成员关系 73 5.2 控制器 74 5.3 复制 74 5.4 解决申请 76 5.4.1 生产申请 78 5.4.2 获取申请 78 5.4.3 其余申请 80 5.5 物理存储 81 5.5.1 分区调配 81 5.5.2 文件治理 82 5.5.3 文件格式 83 5.5.4 索引 84 5.5.5 清理 84 5.5.6 清理的工作原理 84 5.5.7 被删除的事件 86 5.5.8 何时会清理主题 86 5.9 总结 86 第 6 章 牢靠的数据传递 87 6.1 可靠性保障 87 6.2 复制 88 6.3 broker配置 89 6.3.1 复制系数 89 6.3.2 不齐全的领袖选举 90 6.3.3 起码同步正本 91 6.4 在牢靠的零碎里应用生产者 92 6.4.1 发送确认 92 6.4.2 配置生产者的重试参数 93 6.4.3 额定的错误处理 94 6.5 在牢靠的零碎里应用消费者 94 6.5.1 消费者的可靠性配置 95 6.5.2 显式提交偏移量 95 6.6 验证系统可靠性 97 6.6.1 配置验证 98 6.6.2 利用程序验证 98 6.6.3 在生产环境监控可靠性 99 6.7 总结 100 第 7 章 构建数据管道 101 7.1 构建数据管道时须要思考的问题 102 7.1.1 及时性 102 7.1.2 可靠性 102 7.1.3 高吞吐量和动静吞吐量 103 7.1.4 数据格式 103 7.1.5 转换 104 7.1.6 安全性 104 7.1.7 故障解决能力 104 7.1.8 耦合性和灵活性 105 7.2 如何在Connect API和客户端API之间作出抉择 105 7.3 Kafka Connect 106 7.3.1 运行Connect 106 7.3.2 连接器示例——文件数据源和文件数据池 107 7.3.3 连接器示例——从MySQL到ElasticSearch 109 7.3.4 深刻了解Connect 114 7.4 Connect之外的抉择 116 7.4.1 用于其余数据存储的摄入框架 116 7.4.2 基于图形界面的ETL工具 117 7.4.3 流式解决框架 117 7.5 总结 117 第 8 章 跨集群数据镜像 118 8.1 跨集群镜像的应用场景 118 8.2 多集群架构 119 8.2.1 跨数据中心通信的一些现实情况 119 8.2.2 Hub和Spoke架构 120 8.2.3 双活架构 121 8.2.4 主备架构 123 8.2.5 延展集群 127 8.3 Kafka的MirrorMaker 128 8.3.1 如何配置 129 8.3.2 在生产环境部署MirrorMaker 130 8.3.3 MirrorMaker调优 132 8.4 其余跨集群镜像计划 134 8.4.1 优步的uReplicator 134 8.4.2 Confluent的Replicator 135 8.5 总结 135 第 9 章 治理Kafka 136 9.1 主题操作 136 9.1.1 创立主题 137 9.1.2 减少分区 138 9.1.3 删除主题 138 9.1.4 列出集群里的所有主题 139 9.1.5 列出主题详细信息 139 9.2 消费者群组 140 9.2.1 列出并形容群组 140 9.2.2 删除群组 142 9.2.3 偏移量治理 142 9.3 动静配置变更 143 9.3.1 笼罩主题的默认配置 143 9.3.2 笼罩客户端的默认配置 145 9.3.3 列出被笼罩的配置 145 9.3.4 移除被笼罩的配置 146 9.4 分区治理 146 9.4.1 首选的领袖选举 146 9.4.2 批改分区正本 147 9.4.3 批改复制系数 150 9.4.4 转储日志片段 151 9.4.5 正本验证 152 9.5 生产和生产 153 9.5.1 控制台消费者 153 9.5.2 控制台生产者 155 9.6 客户端ACL 157 9.7 不平安的操作 157 9.7.1 挪动集群控制器 157 9.7.2 勾销分区重调配 157 9.7.3 移除待删除的主题 158 9.7.4 手动删除主题 158 9.8 总结 159 第 10 章 监控Kafka 160 10.1 度量指标根底 160 10.1.1 度量指标在哪里 160 10.1.2 外部或内部度量 161 10.1.3 应用程序衰弱检测 161 10.1.4 度量指标的覆盖面 161 10.2 broker的度量指标 162 10.2.1 非同步分区 162 10.2.2 broker度量指标 166 10.2.3 主题和分区的度量指标 173 10.2.4 Java虚拟机监控 174 10.2.5 操作系统监控 175 10.2.6 日志 176 10.3 客户端监控 177 10.3.1 生产者度量指标 177 10.3.2 消费者度量指标 179 10.3.3 配额 181 10.4 延时监控 182 10.5 端到端监控 183 10.6 总结 183 第 11 章 流式解决 184 11.1 什么是流式解决 185 11.2 流式解决的一些概念 186 11.2.1 工夫 187 11.2.2 状态 188 11.2.3 流和表的二元性 188 11.2.4 工夫窗口 189 11.3 流式解决的设计模式 190 11.3.1 单个事件处理 191 11.3.2 应用本地状态 191 11.3.3 多阶段解决和重分区 193 11.3.4 应用内部查找——流和表的连贯 193 11.3.5 流与流的连贯 195 11.3.6 乱序的事件 195 11.3.7 重新处理 196 11.4 Streams示例 197 11.4.1 字数统计 197 11.4.2 股票市场统计 199 11.4.3 填充点击事件流 201 11.5 Kafka Streams的架构概览 202 11.5.1 构建拓扑 202 11.5.2 对拓扑进行伸缩 203 11.5.3 从故障中存活下来 205 11.6 流式解决应用场景 205 11.7 如何抉择流式解决框架 206 11.8 总结 208 附录A 在其余操作系统上装置Kafka 209 作者介绍 214 封面介绍 214 ...

August 8, 2020 · 4 min · jiezi

关于kafka:Kafka-Python的生产者和消费者

Kafka Python的生产者和消费者在本教程中,咱们将应用Python构建Kafka Producer和Consumer。除此之外,咱们还将学习如何在Kafka中设置配置以及如何应用组和偏移量概念。 建设对于本教程,咱们应该在计算机上安装python。另外,咱们须要拜访在咱们的设施或某些服务器上运行的Apache Kafka。您能够查看如何在Windows上装置Apache Kafka。除此之外,咱们须要python的_kafka_ 库来运行咱们的代码。要解决此问题,请在零碎上运行以下命令 pip install kafka 卡夫卡生产者=== 让咱们开始创立本人的Kafka Producer。咱们必须从kafka库导入KafkaProducer。咱们还须要将Kafka服务器的代理列表提供给Producer,以便它能够连贯到Kafka服务器。咱们还须要提供要向其公布音讯的主题名称。这是创立生产者所需的最小配置。 from kafka import KafkaProducerbootstrap_servers = ['localhost:9092']topicName = 'myTopic'producer = KafkaProducer(bootstrap_servers = bootstrap_servers)producer = KafkaProducer()咱们能够应用以下代码开始向该主题发送音讯。 ack = producer.send(topicName, b'Hello World!!!!!!!!')metadata = ack.get()print(metadata.topic)print(metadata.partition)下面的代码将音讯发送到Kafka服务器中名为“ myTopic”的主题。然而,如果该主题尚未呈现在Kafka服务器中怎么办?在这种状况下,Kafka会应用该名称创立一个新主题并向其公布音讯。不便吗?然而您应该记住要查看主题名称中是否存在拼写错误。 如果要为Producer设置更多属性或更改其序列化格局,则能够应用以下代码行。 producer = KafkaProducer(bootstrap_servers = bootstrap_servers, retries = 5,value_serializer=lambda m: json.dumps(m).encode('ascii'))卡夫卡消费者实现创立Producer的工作后,当初让咱们开始应用python构建Consumer,看看这是否同样容易。导入KafkaConsumer后,咱们须要设置提供疏导服务器ID和主题名称,以与Kafka服务器建设连贯。 from kafka import KafkaConsumerimport sysbootstrap_servers = ['localhost:9092']topicName = 'myTopic'consumer = KafkaConsumer (topicName, group_id = 'group1',bootstrap_servers = bootstrap_servers,auto_offset_reset = 'earliest')如咱们所见,咱们须要设置哪个组消费者属于。另外,咱们须要指定偏移量,此使用者应该从该偏移量读取主题中的音讯。在上述情况下,咱们最早指定了auto_offset_reset,这意味着此使用者将从主题的结尾开始读取音讯。 之后,咱们能够开始浏览主题中的音讯。与每条音讯一起,咱们还取得了一些其余信息,例如音讯所属的分区,在该分区中的偏移量和键。 ...

August 4, 2020 · 1 min · jiezi

关于kafka:Kafka的原理介绍及实践

文|孙超 网易智慧企业资深后端开发工程师 官网定义 依据官网的介绍,kafka是一个提供对立的、高吞吐、低提早的,用来解决实时数据的流式平台,它具备以下三个性: 流式记录的公布和订阅:相似于音讯零碎。存储:在一个分布式、容错的集群中平安长久化地存储流式数据。解决:编写流解决应用程序,对实时事件进行响应。kafka个别用在两大类利用中: 建设实时流数据管道,在零碎或利用之间实时地传输数据。构建对数据流进行转换和解决的实时流应用程序。在邮箱服务中,咱们次要将kafka作为音讯零碎,用于零碎内部消息的传输。为什么要采纳kafka呢?让咱们先从kafka的设计原理说起。 概念与存储机制 kafka中是以Topic机制来对音讯进行分类的,同一类音讯属于同一个Topic,你能够将每个Topic看成是一个音讯队列。生产者将音讯发送到相应的Topic,而消费者通过从Topic拉取音讯来生产,没错,在kafka中是要求消费者被动拉取音讯生产的,它并不会被动推送音讯,这是它的一个特点,为什么会这样设计呢?咱们前面再说,先来看一下Topic的构造: Partition分区,每个topic能够有多个分区,这是kafka为了进步并发量而设计的一种机制:一个topic下的多个分区能够并发接管音讯,同样的也能供消费者并发拉取音讯,即分区之间互不烦扰,这样的话,有多少个分区就能够有多大的并发量。所以,如果要更精确的打比方,一个分区就是一个音讯队列,只不过这些音讯队列同属于一种音讯分类。 在kafka服务器,分区是以目录模式存在的,每个分区目录中,kafka会按配置大小或配置周期将分区拆分成多个段文件(LogSegment), 每个段由三局部组成: 磁盘文件:*.log位移索引文件:*.index工夫索引文件:*.timeindex其中*.log用于存储音讯自身的数据内容,*.index存储音讯在文件中的地位(包含音讯的逻辑offset和物理存储offset),*.timeindex存储音讯创立工夫和对应逻辑地址的映射关系。 段文件结构图如下 : 将分区拆分成多个段是为了管制存储的文件大小,如果整个分区只保留为一个文件,那随着分区里音讯的增多,文件也将越来越大,最初不可管制。而如果每个音讯都保留为一个文件,那文件数量又将变得微小,同样容易失去管制。所以kafka采纳段这种形式,管制了每个文件的大小,也不便管制所有文件的数量。同时,这些文件因为大小适中,能够很不便地通过操作系统mmap机制映射到内存中,进步写入和读取效率。这个设计的另一个益处是:当零碎要革除过期数据时,能够间接将过期的段文件删除,十分简洁。 然而这里也会有一个问题:如果每个音讯都要在index文件中保留地位信息,那么index文件也很容易变得很大,这样又会削弱上文所说的益处。所以在kafka中,index设计为稠密索引来升高index的文件大小,这样,index文件存储的理论内容为:该段音讯在音讯队列中的绝对offset和在log文件中的物理偏移量映射的稠密记录。 那么多少条音讯会在index中保留一条记录呢?这个能够通过系统配置来进行设置。索引记录固定为8个字节大小,别离为4个字节的绝对offset(音讯在partition中全局offset减去该segment的起始offset),4个字节的音讯具体存储文件的物理偏移量。 index文件结构图如下: Kafka不会在消费者拉取完音讯后马上就清理音讯,而是会保留段文件一段时间,直到其过期再标记为可清理,由后台程序定期进行清理。这种机制使得消费者能够反复生产音讯,满足更灵便的需要。 查问机制 下面说过,kafka尽管作为音讯零碎,然而生产音讯并不是通过推送而是通过拉取来生产的,client须要通过offset和size参数被动去查问音讯。 kafka收到客户端申请后,对音讯的寻址会通过上面几个步骤: 查找具体的Log Segment,kafka将段信息缓存在跳跃表中,所以这个步骤将从跳跃表中获取段信息。依据offset在index文件中进行定位,找到匹配范畴的偏移量position,此时失去的是一个近似起始文件偏移量。从Log文件的position地位处开始往后寻找,直到找到offset处的音讯。kafka读取示意图: RabbitMQ vs kafka 介绍了kafka的实现原理,咱们再来比照一下同样作为音讯队列服务的RabbitMQ。MQ的利用也很宽泛,性能多而全,那么和MQ相比,kafka有哪些劣势呢?为什么咱们会应用kafka而摈弃了RabbitMQ呢? RabbitMQ流程图: RabbitMQ消费者只能从队列头部按序进行生产,音讯一旦被生产,就会被打上删除标记,紧接着生产下一条音讯,没方法进行回溯操作,这样的话一个消费者生产完音讯,另一个消费者就别想再生产了。而Kafka提供动静指定生产位点,可能灵便地进行回溯生产操作,只有该音讯还在生命周期内能够反复拉取,并且不同消费者能够互不烦扰的生产同一个音讯队列,这就比RabbitMQ灵便多了。 kafka生产位点示意图: RabbitMQ如果要满足多个消费者生产同一个音讯队列,也能够借助exchange路由能力,然而这样会将音讯复制到多个队列,每个消费者须要绑定一个本人的队列进行生产。如果有几百个消费者,那么队列复制几百倍,引起mq的音讯水位猛涨,容易失控。而kafka就没这个问题,不论多少个消费者都只须要一个队列就能满足,每个消费者都能够残缺地不互相烦扰地生产队列中的所有音讯。 当然,RabbitMQ也有其长处,它提供的exchange,binding, queue等形象实体,提供弱小的路由关系(rounte key and bindkey)和音讯过滤能力。作为传统音讯零碎提供了细粒度的音讯控制能力。而Kafka次要是面向高流量,大吞吐的批处理零碎,在路由形象方面化繁为简,重点关注零碎的高吞吐,所以应用上更为简洁。 kafka还有传统解决方案无奈满足的高伸缩能力等劣势,这里就不一一介绍了。 Kafka在邮件系统data bus中的使用 正因为kafka有着以上介绍的能力和劣势,咱们的邮箱服务中采纳了它作为音讯零碎,其中一个利用就是邮件系统的data bus。 data bus介绍 邮件系统用户收发信流程随同着大量的业务逻辑和子系统调用,如果将这些流程都强附丽在骨干枝上,将会对系统造成较大的压力,整个业务流程也将变得复杂而迟缓。所以通过数据总线将主次流程进行解耦,加重收发信主流程的复杂度,使其能够以更快的速度实现,放慢零碎响应工夫。主流程产生事件源,通过kafka的传输,触发多个主要流程,主要流程能够并发在零碎后盾实现,并且能够轻易的扩大多种多样的主要流程。 下图以简化后的信流程为例: Kafka在data bus中的使用 邮件系统在实现收发信流程后,会生成当次流程相干的零碎事件,比方新邮件事件。data bus将这些事件写入到kafka集群的相应topic中,上游的一系列子系统对topic进行生产。 每个不同的流程会对应不必的topic,以辨别不同类别的事件,比方新进邮件,邮件已读,邮件删除等。每个topic能够依据各自的音讯吞吐量和并发需要划分成多个partition,比方新进邮件量大能够划分成256个分区,邮件删除量小则能够划分32个分区。每个事件按什么机制来调配到相应的分区呢?一般来说能够按邮筒来划分,同一个邮筒的事件进入同一个partition,这样就保障了同一邮筒产生的事件的程序。不同事件的时效性可能有不同,所以其须要保留的工夫也能够不同,能够依据业务的需要来设置topic的保留时长。因为事件全副写入到kafka中,后台任务能够任意生产,所以能够灵便地减少不同的业务流程。如下图所示,利用生产能力能借助Kafka集群实现弹性扩容 总    结 kafka在邮件系统中的利用给咱们带来的益处: ...

July 24, 2020 · 1 min · jiezi

关于kafka:Kafka的原理介绍及实践

文|孙超 网易智慧企业资深后端开发工程师 官网定义 依据官网的介绍,kafka是一个提供对立的、高吞吐、低提早的,用来解决实时数据的流式平台,它具备以下三个性: 流式记录的公布和订阅:相似于音讯零碎。存储:在一个分布式、容错的集群中平安长久化地存储流式数据。解决:编写流解决应用程序,对实时事件进行响应。kafka个别用在两大类利用中: 建设实时流数据管道,在零碎或利用之间实时地传输数据。构建对数据流进行转换和解决的实时流应用程序。在邮箱服务中,咱们次要将kafka作为音讯零碎,用于零碎内部消息的传输。为什么要采纳kafka呢?让咱们先从kafka的设计原理说起。 概念与存储机制 kafka中是以Topic机制来对音讯进行分类的,同一类音讯属于同一个Topic,你能够将每个Topic看成是一个音讯队列。生产者将音讯发送到相应的Topic,而消费者通过从Topic拉取音讯来生产,没错,在kafka中是要求消费者被动拉取音讯生产的,它并不会被动推送音讯,这是它的一个特点,为什么会这样设计呢?咱们前面再说,先来看一下Topic的构造: Partition分区,每个topic能够有多个分区,这是kafka为了进步并发量而设计的一种机制:一个topic下的多个分区能够并发接管音讯,同样的也能供消费者并发拉取音讯,即分区之间互不烦扰,这样的话,有多少个分区就能够有多大的并发量。所以,如果要更精确的打比方,一个分区就是一个音讯队列,只不过这些音讯队列同属于一种音讯分类。 在kafka服务器,分区是以目录模式存在的,每个分区目录中,kafka会按配置大小或配置周期将分区拆分成多个段文件(LogSegment), 每个段由三局部组成: 磁盘文件:*.log位移索引文件:*.index工夫索引文件:*.timeindex其中*.log用于存储音讯自身的数据内容,*.index存储音讯在文件中的地位(包含音讯的逻辑offset和物理存储offset),*.timeindex存储音讯创立工夫和对应逻辑地址的映射关系。 段文件结构图如下 : 将分区拆分成多个段是为了管制存储的文件大小,如果整个分区只保留为一个文件,那随着分区里音讯的增多,文件也将越来越大,最初不可管制。而如果每个音讯都保留为一个文件,那文件数量又将变得微小,同样容易失去管制。所以kafka采纳段这种形式,管制了每个文件的大小,也不便管制所有文件的数量。同时,这些文件因为大小适中,能够很不便地通过操作系统mmap机制映射到内存中,进步写入和读取效率。这个设计的另一个益处是:当零碎要革除过期数据时,能够间接将过期的段文件删除,十分简洁。 然而这里也会有一个问题:如果每个音讯都要在index文件中保留地位信息,那么index文件也很容易变得很大,这样又会削弱上文所说的益处。所以在kafka中,index设计为稠密索引来升高index的文件大小,这样,index文件存储的理论内容为:该段音讯在音讯队列中的绝对offset和在log文件中的物理偏移量映射的稠密记录。 那么多少条音讯会在index中保留一条记录呢?这个能够通过系统配置来进行设置。索引记录固定为8个字节大小,别离为4个字节的绝对offset(音讯在partition中全局offset减去该segment的起始offset),4个字节的音讯具体存储文件的物理偏移量。 index文件结构图如下: Kafka不会在消费者拉取完音讯后马上就清理音讯,而是会保留段文件一段时间,直到其过期再标记为可清理,由后台程序定期进行清理。这种机制使得消费者能够反复生产音讯,满足更灵便的需要。 查问机制 下面说过,kafka尽管作为音讯零碎,然而生产音讯并不是通过推送而是通过拉取来生产的,client须要通过offset和size参数被动去查问音讯。 kafka收到客户端申请后,对音讯的寻址会通过上面几个步骤: 查找具体的Log Segment,kafka将段信息缓存在跳跃表中,所以这个步骤将从跳跃表中获取段信息。依据offset在index文件中进行定位,找到匹配范畴的偏移量position,此时失去的是一个近似起始文件偏移量。从Log文件的position地位处开始往后寻找,直到找到offset处的音讯。kafka读取示意图: RabbitMQ vs kafka 介绍了kafka的实现原理,咱们再来比照一下同样作为音讯队列服务的RabbitMQ。MQ的利用也很宽泛,性能多而全,那么和MQ相比,kafka有哪些劣势呢?为什么咱们会应用kafka而摈弃了RabbitMQ呢? RabbitMQ流程图: RabbitMQ消费者只能从队列头部按序进行生产,音讯一旦被生产,就会被打上删除标记,紧接着生产下一条音讯,没方法进行回溯操作,这样的话一个消费者生产完音讯,另一个消费者就别想再生产了。而Kafka提供动静指定生产位点,可能灵便地进行回溯生产操作,只有该音讯还在生命周期内能够反复拉取,并且不同消费者能够互不烦扰的生产同一个音讯队列,这就比RabbitMQ灵便多了。 kafka生产位点示意图: RabbitMQ如果要满足多个消费者生产同一个音讯队列,也能够借助exchange路由能力,然而这样会将音讯复制到多个队列,每个消费者须要绑定一个本人的队列进行生产。如果有几百个消费者,那么队列复制几百倍,引起mq的音讯水位猛涨,容易失控。而kafka就没这个问题,不论多少个消费者都只须要一个队列就能满足,每个消费者都能够残缺地不互相烦扰地生产队列中的所有音讯。 当然,RabbitMQ也有其长处,它提供的exchange,binding, queue等形象实体,提供弱小的路由关系(rounte key and bindkey)和音讯过滤能力。作为传统音讯零碎提供了细粒度的音讯控制能力。而Kafka次要是面向高流量,大吞吐的批处理零碎,在路由形象方面化繁为简,重点关注零碎的高吞吐,所以应用上更为简洁。 kafka还有传统解决方案无奈满足的高伸缩能力等劣势,这里就不一一介绍了。 Kafka在邮件系统data bus中的使用 正因为kafka有着以上介绍的能力和劣势,咱们的邮箱服务中采纳了它作为音讯零碎,其中一个利用就是邮件系统的data bus。 data bus介绍 邮件系统用户收发信流程随同着大量的业务逻辑和子系统调用,如果将这些流程都强附丽在骨干枝上,将会对系统造成较大的压力,整个业务流程也将变得复杂而迟缓。所以通过数据总线将主次流程进行解耦,加重收发信主流程的复杂度,使其能够以更快的速度实现,放慢零碎响应工夫。主流程产生事件源,通过kafka的传输,触发多个主要流程,主要流程能够并发在零碎后盾实现,并且能够轻易的扩大多种多样的主要流程。 下图以简化后的信流程为例: Kafka在data bus中的使用 邮件系统在实现收发信流程后,会生成当次流程相干的零碎事件,比方新邮件事件。data bus将这些事件写入到kafka集群的相应topic中,上游的一系列子系统对topic进行生产。 每个不同的流程会对应不必的topic,以辨别不同类别的事件,比方新进邮件,邮件已读,邮件删除等。每个topic能够依据各自的音讯吞吐量和并发需要划分成多个partition,比方新进邮件量大能够划分成256个分区,邮件删除量小则能够划分32个分区。每个事件按什么机制来调配到相应的分区呢?一般来说能够按邮筒来划分,同一个邮筒的事件进入同一个partition,这样就保障了同一邮筒产生的事件的程序。不同事件的时效性可能有不同,所以其须要保留的工夫也能够不同,能够依据业务的需要来设置topic的保留时长。因为事件全副写入到kafka中,后台任务能够任意生产,所以能够灵便地减少不同的业务流程。如下图所示,利用生产能力能借助Kafka集群实现弹性扩容 总    结 kafka在邮件系统中的利用给咱们带来的益处: ...

July 24, 2020 · 1 min · jiezi

关于kafka:快速搭建kafka集群3台

默认三台机器曾经装置java,设置好ssh免密连贯。1.下载kafkahttp://kafka.apache.org/downl...2.发送到集群解压到指定目录/usr/local下 tar -zxvf kafka_2.11-0.10.0.1.tar.gz -C /usr/local3.进入kafka_2.11-0.10.0.1目录,创立文件夹zk_kfk_data(自取),并在该目录下创立myid文件,内容在三个集群中不同,别离是1,2,3 cd kafka_2.11-0.10.0.1mkdir zk_kfk_datavi myid4.创立目录 mkdir logsmkdir kafka-logs-15.批改/config/zookeeper.properties文件 cd configvi zookeeper.properties6.批改server.properties vi server.properties7.把kafka整个文件夹分发给两个子节点 scp -r /usr/local/kafka_2.11-0.10.1.1 hadoop@centos2:/usr/localscp -r /usr/local/kafka_2.11-0.10.1.1 hadoop@centos3:/usr/local8.批改centos2和centos3的myid ssh centos2cd /usr/local/kafka_2.11-0.10.1.1/zk_kfk_datavi myid ssh centos3cd /usr/local/kafka_2.11-0.10.1.1/zk_kfk_datavi myid9.批改centos2和centos3的server.properties ssh centos2cd /usr/local/kafka_2.11-0.10.1.1/configvi server.properties ssh centos3cd /usr/local/kafka_2.11-0.10.1.1/configvi server.properties装置实现!运行测试:(默认在/usr/local/kafka_2.11-0.10.1.1目录下执行)10.三台集群别离启动zk: ./bin/zookeeper-server-start.sh config/zookeeper.properties & 11.启动kafka集群 nohup ./bin/kafka-server-start.sh config/server.properties &>> kafka.log &12.创立topic: ./bin/kafka-topics.sh --create --zookeeper centos1:2181,centos2:2181,centos3:2181  --replication-factor 1 --partitions 1 --topic test13.查看topic: ./bin/kafka-topics.sh --list --zookeeper localhost:218114.发送数据: ./bin/kafka-console-producer.sh --broker-list centos1:9092,centos2:9092,centos3:9092 --topic test15.生产: ...

July 22, 2020 · 1 min · jiezi

关于kafka:查看kafka基本信息命令

将miner-profit topic正本由一个减少到3个replication.json { "version": 1, "partitions": [ { "topic": "miner-profit", "partition": 0, "replicas": [ 1, 2, 3 ] }, { "topic": "miner-profit", "partition": 1, "replicas": [ 1, 2, 3 ] }, { "topic": "miner-profit", "partition": 2, "replicas": [ 1, 2, 3 ] } ]}执行命令 ../kafka1/bin/kafka-reassign-partitions.sh --zookeeper dev1.xiayu.com:2181,dev2.xiayu.com:2182,dev3.xiayu.com:2183 --reassignment-json-file replication.json --execute查看所有topic./kafka1/bin/kafka-topics.sh --list --zookeeper dev1.xiayu.com:2181,dev2.xiayu.com:2182,dev3.xiayu.com:2183创立topic./kafka1/bin/kafka-topics.sh --zookeeper dev1.xiayu.com:2181,dev2.xiayu.com:2182,dev3.xiayu.com:2183 --create --replication-factor 2 --partitions 2 --topic user-login删除topic./kafka1/bin/kafka-topics.sh --zookeeper dev1.xiayu.com:2181,dev2.xiayu.com:2182,dev3.xiayu.com:2183 --delete --topic user-regisger减少topic分区数目./kafka1/bin/kafka-topics.sh --alter --zookeeper dev1.xiayu.com:2181,dev2.xiayu.com:2182,dev3.xiayu.com:2183 --topic miner-profit --partitions 3 查看topic分区信息./kafka1/bin/kafka-topics.sh --describe --zookeeper dev1.xiayu.com:2181,dev2.xiayu.com:2182,dev3.xiayu.com:2183 --topic miner-profit查看一个topic中的所有音讯./kafka1/bin/kafka-console-consumer.sh --bootstrap-server dev1.xiayu.com:9092,dev2.xiayu.com:9093,dev3.xiayu.com:9094 --topic miner-profit --from-beginning查看所有消费者组./kafka1/bin/kafka-consumer-groups.sh --list --bootstrap-server dev1.xiayu.com:9092,dev2.xiayu.com:9093,dev3.xiayu.com:9094查看消费者组生产状况./kafka1/bin/kafka-consumer-groups.sh --bootstrap-server dev1.xiayu.com:9092,dev2.xiayu.com:9093,dev3.xiayu.com:9094 --group xiayu-consumer-group --describecmak 启动./cmak-3.0.0.5/bin/cmak -Dconfig.file=./cmak-3.0.0.5/conf/application.conf -Dhttp.port=8333

July 21, 2020 · 1 min · jiezi

关于kafka:kafka-系列-2搭建与实践

前言入手实际往往比看看更重要???? 单机版 Docker 搭建version: '2' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka depends_on: [ zookeeper ] ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: kafka KAFKA_ADVERTISED_PORT: 9092 KAFKA_CREATE_TOPICS: "test:1:1" KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"注意事项: 如果想要 java 客户端可能失常连贯上 kafka, 须要配置宿主机的 hostsudo vim /etc/hosts172.20.10.6 kafka如何应用 kafka 自带的 kafka-console-producer 测试发送音讯?kafka-console-producer.sh --bootstrap-server kafka:9092 --topic test集群版 + kafka managerkafka 集群 docker-composeversion: '2'services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka1: restart: always image: wurstmeister/kafka depends_on: [ zookeeper ] ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka1:9092" KAFKA_LISTENERS: "PLAINTEXT://kafka1:9092" KAFKA_PORT: 9092 kafka2: restart: always image: wurstmeister/kafka depends_on: [ zookeeper ] ports: - "9093:9093" environment: KAFKA_BROKER_ID: 2 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka2:9093" KAFKA_LISTENERS: "PLAINTEXT://kafka2:9093" KAFKA_PORT: 9093 kafka3: restart: always image: wurstmeister/kafka depends_on: [ zookeeper ] ports: - "9094:9094" environment: KAFKA_BROKER_ID: 3 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka3:9094" KAFKA_LISTENERS: "PLAINTEXT://kafka3:9094" KAFKA_PORT: 9094注意事项: ...

July 19, 2020 · 2 min · jiezi

关于kafka:Kafka扩容之分区扩容

分区扩容举例,主题“user_order”目前是1个分区,这里将该主题分区减少到6个,察看批改后果kafka-topics.sh --partitions 6 --alter --zookeeper dn1:2181,dn2:2181,dn3:2181 --topic user_order这里只是批改分区数,然而数据还没有迁徙过来 应用kafka提供的工具kafka-reassign-partitions.sh来迁徙数据。迁徙数据须要分三步做: 生成迁徙打算先手动生成一个topic.json,内容如下。这里topic能够是一个列 {"topics": [{ "topic": "user_order"}],"version": 1}执行如下语句 --topics-to-move-json-file ./bin/kafka-reassign-partitions.sh --zookeeper dn1:2181,dn2:2181,dn3:2181 --topics-to-move-json-file topic.json --broker-list "0,1,2,3,4" --generate这句命令的意思是,将topic.json里的topic迁徙到broker-list列表里列的broker上,会失去一个迁徙执行打算 Current partition replica assignment { "version": 1, "partitions": [....]}Proposed partition reassignment configuration { "version": 1, "partitions": [.....]}新建一个文件reassignment.json,保留上边这些信息。其中Current partition replica assignment指以后的分区状况,Proposed partition reassignment configuration是打算的分区状况 数据迁徙执行如下命令 ./bin/kafka-reassign-partitions.sh --zookeeper dn1:2181,dn2:2181,dn3:2181 --reassignment-json-file reassignment.json --execute验证 ./bin/kafka-reassign-partitions.sh --zookeeper dn1:2181,dn2:2181,dn3:2181 --reassignment-json-file reassignment.json --verify

July 18, 2020 · 1 min · jiezi

基础科普一文带你了争Kafka基本原理

起源:阿凡卢,www.cnblogs.com/luxiaoxun/p/5492646.html简介 Apache Kafka是分布式公布-订阅音讯零碎。它最后由LinkedIn公司开发,之后成为Apache我的项目的一部分。Kafka是一种疾速、可扩大的、设计外在就是分布式的,分区的和可复制的提交日志服务。 Kafka架构 它的架构包含以下组件: 话题(Topic):是特定类型的音讯流。音讯是字节的无效负载(Payload),话题是音讯的分类名或种子(Feed)名。生产者(Producer):是可能公布音讯到话题的任何对象。服务代理(Broker):已公布的音讯保留在一组服务器中,它们被称为代理(Broker)或Kafka集群。消费者(Consumer):能够订阅一个或多个话题,并从Broker拉数据,从而生产这些已公布的音讯。 Kafka存储策略 1)kafka以topic来进行音讯治理,每个topic蕴含多个partition,每个partition对应一个逻辑log,有多个segment组成。 2)每个segment中存储多条音讯(见下图),音讯id由其逻辑地位决定,即从音讯id可间接定位到音讯的存储地位,防止id到地位的额定映射。 3)每个part在内存中对应一个index,记录每个segment中的第一条音讯偏移。 4)发布者发到某个topic的音讯会被平均的散布到多个partition上(或依据用户指定的路由规定进行散布),broker收到公布音讯往对应partition的最初一个segment上增加该音讯,当某个segment上的音讯条数达到配置值或音讯公布工夫超过阈值时,segment上的音讯会被flush到磁盘,只有flush到磁盘上的音讯订阅者能力订阅到,segment达到肯定的大小后将不会再往该segment写数据,broker会创立新的segment。 Kafka删除策略 1)N天前的删除。 2)保留最近的MGB数据。 Kafka broker 与其它音讯零碎不同,Kafka broker是无状态的。这意味着消费者必须保护已生产的状态信息。这些信息由消费者本人保护,broker齐全不论(有offset managerbroker治理)。 从代理删除音讯变得很辣手,因为代理并不知道消费者是否曾经应用了该音讯。Kafka创新性地解决了这个问题,它将一个简略的基于工夫的SLA利用于保留策略。当音讯在代理中超过肯定工夫后,将会被主动删除。这种翻新设计有很大的益处,消费者能够成心倒回到老的偏移量再次生产数据。这违反了队列的常见约定,但被证实是许多消费者的基本特征。以下摘抄自kafka官网文档: Kafka Design 指标 1) 高吞吐量来反对高容量的事件流解决 2) 反对从离线零碎加载数据 3) 低提早的音讯零碎 长久化 1) 依赖文件系统,长久化到本地 2) 数据长久化到log 效率 1) 解决”small IO problem“: 应用”message set“组合音讯。 server应用”chunks of messages“写到log。 consumer一次获取大的音讯块。 2)解决”byte copying“: 在producer、broker和consumer之间应用对立的binary message format。 应用零碎的pagecache。 应用sendfile传输log,防止拷贝。 端到端的批量压缩(End-to-end Batch Compression) Kafka反对GZIP和Snappy压缩协定。 The Producer 负载平衡 1)producer能够自定义发送到哪个partition的路由规定。默认路由规定:hash(key)%numPartitions,如果key为null则随机抉择一个partition。 2)自定义路由:如果key是一个user id,能够把同一个user的音讯发送到同一个partition,这时consumer就能够从同一个partition读取同一个user的音讯。 异步批量发送 批量发送:配置不多于固定音讯数目一起发送并且等待时间小于一个固定提早的数据。 ...

July 13, 2020 · 1 min · jiezi

kafka-系列-1基本概念

前言思考的过程往往比间接失去论断更加重要 kafka 利用场景利用监控网站用户行为追踪流数据持久性日志基本概念在说基本概念前,先看一下 kafka 的零碎架构 Broker 一般而言,一台机器就是一个 broker,当然 1 台机器上能够部署多个 brokerProducer 音讯的生产者Consumer 音讯的消费者Consumer Group 消费者组,组内能够有多个消费者,共享同一个 groupid。生产组内的消费者,个别状况下为同一个消费者部署多个实例。Topic topic 在 kafka 中是一个逻辑上的概念,用于将 partition 分类。1 个 topic 有多个 partition。生产者将音讯发送到指定的 topic 中,消费者从指定的 topic 进行生产。partition 一个可追加的日志存储文件。kafka 的分区能够散布在不同的 broker 上多正本机制 kafka 为 partition 引入了多正本机制,能够通过减少正本数量来晋升容灾能力。同一分区中的不同正本保留雷同的音讯。正本之间个别是 一主多从。如下图,每个分区有 3 个正本 AR 集群中的正本,统称 AR(Assigned Replication),AR = ISR + OSRISR 与 leader 正本放弃肯定同步的正本,称为 ISR(in-sync-replication)。音讯需先发送到 leader 正本,follower 能力从 leader 正本中拉取音讯OSR 与 leader 正本同步滞后过多的正本,称为 OSR(out-sync-replication)。leader 正本 leader 正本负责保护 follower 正本的状态,当 ISR 正本中滞后 leader 正本过多,会被移除到 OSR 正本中。当 OSR 正本跟上了 leader 正本,会被挪动到 ISR 正本。只有在 ISR 汇合的正本,才有机会选举 leaderHW、LEO HW(Hight Watermark)俗称高水位,标识一个特定的偏移量,消费者只能拉取该偏移量之前的音讯 LEO(Low End Offset)日志最初偏移量。标识日志文件中待写入音讯的偏移量关系图图 2: HW、LEO 的关系图 3 - 图4: leader 正本、follower、HW、LEO 关系 音讯写入之后,LEO 变成5,follower,会从 leader 中拉取音讯,进行同步 当 ISR 汇合的正本都写入 3 后,HW 就会变成 4,示意 0-3 的音讯为可生产 当 ISR 汇合都写入 3 、4 之后,HW、LEO 值都变成 5总结 kafka 音讯的写入,是可靠性和可用性的衡量。当 ISR 正本均写入音讯时,不必期待 OSR 正本也写入音讯。这里防止了 OSR 滞后太多,从而导致不可用性,且音讯被写入到多个正本,也保障了音讯的可靠性。 当 leader 写入完音讯立刻挂掉后,ISR 正本因未能同步到音讯,从而导致音讯失落。与 RocketMQ 区别partition 在 RocketMQ 中为 队列。生产模型也简直统一,基于生产组进行生产RocketMQ 有自带的注册核心,无需 zookeeper。kafka partition 有多正本机制,RocketMQ 队列没有多正本机制kafka 多正本机制有丢音讯问题,RocketMQ 则没有从设计上来看,RocketMQ 与 kafka 的解决的利用场景不一样。RocketMQ 重视音讯的可靠性,而 kafka 在这一方面比拟弱,kafka 更重视零碎吞吐量。因而 kafka 不适宜要求音讯不能丢的场景。

July 12, 2020 · 1 min · jiezi

kafka真实环境部署规划转载

kafka真实环境部署规划1. 操作系统选型因为kafka服务端代码是Scala语言开发的,因此属于JVM系的大数据框架,目前部署最多的3类操作系统主要由Linux ,OS X 和Windows,但是部署在Linux数量最多,为什么呢?因为I/O模型的使用和数据网络传输效率两点。 第一:Kafka新版本的Clients在设计底层网络库时采用了Java的Select模型,而在Linux实现机制是epoll,感兴趣的读者可以查询一下epoll和select的区别,明确一点就是:kafka跑在Linux上效率更高,因为epoll取消了轮询机制,换成了回调机制,当底层连接socket数较多时,可以避免CPU的时间浪费。第二:网络传输效率上。kafka需要通过网络和磁盘进行数据传输,而大部分操作系统都是通过Java的FileChannel.transferTo方法实现,而Linux操作系统则会调用sendFile系统调用,也即零拷贝(Zero Copy 技术),避免了数据在内核地址空间和用户程序空间进行重复拷贝。2. 磁盘类型规划机械磁盘(HDD) 一般机械磁盘寻道时间是毫秒级的,若有大量随机I/O,则将会出现指数级的延迟,但是kafka是顺序读写的,因此对于机械磁盘的性能也是不弱的,所以,基于成本问题可以考虑。固态硬盘(SSD) 读写速度可观,没有成本问题可以考虑。JBOD (Just Bunch Of Disks ) 经济实惠的方案,对数据安全级别不是非常非常高的情况下可以采用,建议用户在Broker服务器上设置多个日志路径,每个路径挂载在不同磁盘上,可以极大提升并发的日志写入速度。RAID 磁盘阵列 常见的RAID是RAID10,或者称为(RAID 1+0) 这种磁盘阵列结合了磁盘镜像和磁盘带化技术来保护数据,因为使用了磁盘镜像技术,使用率只有50%,注意,LinkedIn公司采用的就是RAID作为存储来提供服务的。那么弊端在什么地方呢?如果Kafka副本数量设置为3,那么实际上数据将存在6倍的冗余数据,利用率实在太低。因此,LinkedIn正在计划更改方案为JBOD.3. 磁盘容量规划我们公司物联网平台每天大约能够产生一亿条消息,假设副本replica设置为2 (其实我们设置为3),数据留存时间为1周,平均每条上报事件消息为1K左右,那么每天产生的消息总量为:1亿 乘 2 乘 1K 除以 1000 除以 1000 =200G磁盘。预留10%的磁盘空间,为210G。一周大约为1.5T。采用压缩,平均压缩比为0.5,整体磁盘容量为0.75T。 关联因素主要有: 新增消息数副本数是否启用压缩消息大小消息保留时间4. 内存容量规划kafka对于内存的使用,并不过多依赖JVM 内存,而是更多的依赖操作系统的页缓存,consumer若命中页缓存,则不用消耗物理I/O操作。一般情况下,java堆内存的使用属于朝生夕灭的,很快会被GC,一般情况下,不会超过6G,对于16G内存的机器,文件系统page cache 可以达到10-14GB。 怎么设计page cache,可以设置为单个日志段文件大小,若日志段为10G,那么页缓存应该至少设计为10G以上。堆内存最好不要超过6G。5. CPU选择规划kafka不属于计算密集型系统,因此CPU核数够多就可以,而不必追求时钟频率,因此核数选择最好大于8。 6. 网络带宽决定Broker数量带宽主要有1Gb/s 和10 Gb/s 。我们可以称为千兆位网络和万兆位网络。举例如下: 我们的物联网系统一天每小时都要处理1Tb的数据,我们选择1Gb/b带宽,那么需要选择多少机器呢? 假设网络带宽kafka专用,且分配给kafka服务器70%带宽,那么单台Borker带宽就是710Mb/s,但是万一出现突发流量问题,很容易把网卡打满,因此在降低1/3,也即240Mb/s。因为1小时处理1TTB数据,每秒需要处理292MB,1MB=8Mb,也就是2336Mb数据,那么一小时处理1TB数据至少需要2336/240=10台Broker数据。冗余设计,最终可以定为20台机器。作者:凯新的技术社区 链接:https://juejin.im/post/5bd464... 来源:掘金 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 1. kafka生产者吞吐量测试指标kafka-producer-perf-test :是kafka提供的测试Producer性能脚本,通过脚本,可以计算出Producer在一段时间内的平均延时和吞吐量。 1.1 kafka-producer-perf-test在kafka安装目录下面执行如下命令,生产环境中尽量让脚本运行较长的时间,才会有意义: bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --record-size 200 --througthput -1 --producer-props bootstrap.servers=bd-master:9092,bd-slave1=9092,bd-slave3=9092 acks=1 ...

July 3, 2020 · 2 min · jiezi

监测kafka-lag值的shell脚本

自己手写了一个监测kafka lag值的shell脚本。之前是用python写的,感觉比较麻烦,这里写了一个shell版的,大家可以直接拿来使用。cd /usr/share/kafka/kafka_2.11-2.4.1/ || exit 1lag=$(./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my_group 2>/dev/null|grep -v GROUP|awk 'NR>1{num+=$6}END{print num}')echo "$lag"if [ "$lag" -gt 10 ];then echo "lag值过大" #或者mail或者send_ding_msg,自行设置fi下面是执行情况。 最后可以将这个脚本添加到crontab定时任务,我目前是每10分钟执行一次,还没有遇到消息堆积的情况。

June 28, 2020 · 1 min · jiezi

Kafka控制器选举原理

1. Kafka控制器介绍 在Kafka集群中会有一个或多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本(一个分区会有多个副本,其中只有leader副本对外提供读写服务)出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当为某个Topic增加分区数量时,由控制器负责分区的重新分配。 分区集合介绍:分区中的所有副本统称为AR(Assigned Replicas)。所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas)。与leader副本同步滞后过多的副本(不包括leader副本)组成OSR(Out-of-Sync Replicas)。 leader副本负责维护和跟踪ISR集合中所有follower副本的滞后状态,当follower副本落后太多或失效时,leader副本会把它从ISR集合中剔除。当OSR集合中有follower副本“追上”了leader副本,那么leader副本会把它从OSR集合转移至ISR集合。默认情况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader。 2. Kafka控制器选举原理 Kafka中的控制器选举工作依赖于Zookeeper,成功竞选成为控制器的broker会在Zookeeper中创建/controller临时(Ephemeral)节点,此临时节点的内容参考如下: {"version":1,"brokerid":0,"timestamp":"1593330804078"} 其中version与Kafka版本相关,对同一个Kafka版本来说为固定值。brokerid表示成为控制器的broker的id编号,timestamp表示竞选成为控制器时的时间戳(精确到毫秒)。 在任意时刻,集群中有且只有一个控制器。每个broker启动的时候会去尝试读取/controller节点的brokerid的值,如果读取到的brokerid的值不为-1,表示已经有其他broker节点成功竞选为控制器,所以当前broker就会放弃竞选;如果Zookeeper中不存在/controller节点,或者这个节点的数据异常,那么就会尝试去创建/controller节点。当前broker去创建节点的时候,也有可能有其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。 Zookeeper中还有一个与控制器有关的/controller_epoch节点,这个节点是持久(Persistent)节点,节点中存放的是一个整型的controller_epoch值。controller_epoch值用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器纪元”。 controller_epoch的初始值为1,即集群中的第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1。每个和控制器交互的请求都会携带controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器发送的请求,那么这个请求会被认定为无效的请求。如果请求的controller_epoch值大于内存中的controller_epoch值,那么说明已经有新的控制器当选了(也就是说接收到这种请求的broker已经不再是控制器了)。由此可见,Kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性。 具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下: 监听分区的变化。监听主题的变化。监听broker相关的变化。从Zookeeper中读取获取当前所有与主题、分区及broker有关的信息并进行相应的管理。启动并管理分区状态机和副本状态机。更新集群的元数据信息。 当/controller节点的数据发生变化时,每个broker都会更新自身内存中保存的activeControllerId。如果broker在数据变更前是控制器,在数据变更后自身的brokerid值与新的activeControllerId值不一致,那么就需要“退位”,关闭相应的资源,比如关闭状态机、注销相应的监听器等。有可能控制器由于异常而下线,造成/controller这个临时节点被自动删除;也有可能是其他原因将此节点删除了。 当/controller节点被删除时,每个broker都会进行选举,如果broker在节点被删除前是控制器,那么在选举前还需要有一个“退位”的动作。如果有特殊需要,则可以手动删除/controller节点来触发新一轮的选举。当然关闭控制器所对应的broker,以及手动向/controller节点写入新的brokerid的所对应的数据,同样可以触发新一轮的选举。 3. 总结 Kafka控制器选择的流程并不复杂,但是考虑的各种边界条件还是比较周到的,程序的健壮性比较好。有时候我们需要根据业务场景去设计或者调整某种分布式集群,Kafka控制器的选举也可以是一个很好的借鉴,尤其是/controller_epoch的设计,考虑到了控制器可能会因为某种原因过时,保证了控制器的唯一性。 4. 参考文献《深入理解Kafka:核心设计与实践原理》

June 28, 2020 · 1 min · jiezi

快速搭建-Kafka-集群

版本JDK 14ZookeeperKafka安装 Zookeeper 和 KafkaKafka 依赖 Zookeeper,所以我们需要在安装 Kafka 之前先拥有 Zookeeper。准备如下的 docker-compose.yaml 文件,将文件中的主机地址 192.168.1.100 替换成你自己的环境中的主机地址即可。 version: "3"services: zookeeper: image: zookeeper build: context: ./ container_name: zookeeper ports: - 2181:2181 volumes: - ./data/zookeeper/data:/data - ./data/zookeeper/datalog:/datalog - ./data/zookeeper/logs:/logs restart: always kafka_node_0: depends_on: - zookeeper build: context: ./ container_name: kafka-node-0 image: wurstmeister/kafka environment: KAFKA_BROKER_ID: 0 KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_NUM_PARTITIONS: 3 KAFKA_DEFAULT_REPLICATION_FACTOR: 2 ports: - 9092:9092 volumes: - ./data/kafka/node_0:/kafka restart: unless-stopped kafka_node_1: depends_on: - kafka_node_0 build: context: ./ container_name: kafka-node-1 image: wurstmeister/kafka environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9093 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093 KAFKA_NUM_PARTITIONS: 3 KAFKA_DEFAULT_REPLICATION_FACTOR: 2 ports: - 9093:9093 volumes: - ./data/kafka/node_1:/kafka restart: unless-stopped kafka_node_2: depends_on: - kafka_node_1 build: context: ./ container_name: kafka-node-2 image: wurstmeister/kafka environment: KAFKA_BROKER_ID: 2 KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9094 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094 KAFKA_NUM_PARTITIONS: 3 KAFKA_DEFAULT_REPLICATION_FACTOR: 2 ports: - 9094:9094 volumes: - ./data/kafka/node_2:/kafka restart: unless-stopped输入 docker-compose up -d 运行脚本文件进行集群构建。等待一会儿,得到如下结果即为成功。 ...

June 21, 2020 · 2 min · jiezi

不要被kafka的异步模式欺骗了

啥是异步模式kafka的生产者可以选择使用异步方式发送数据,所谓异步方式,就是我们调用 send() 方法,并指定一个回调函数, 服务器在返回响应时调用该函数。 kafka在客户端里暴露了两个send方法,我们可以自己选择同步或者异步模式。我们来看一个kafka的生产者发送示例,有个直观的感受。这个示例是一个同步的模式。 ProducerRecord<String, String> record = new ProducerRecord<>(“Kafka”, “Kafka_Products”, “测试”);//Topic Key Valuetry{Future future = producer.send(record);future.get();//获取执行结果} catch(Exception e) {e.printStackTrace();}我们从源码层面来继续看下。 首先kafka定义了一个接口, 然后KafkaProducer实现了这两个方法,我们看下异步方法的实现逻辑。 可以看到最终是调用doSend方法,调用的时候传入一个回调。这个回调就是监听方法的执行结果的。 异步模式也会阻塞的很多人会认为,既然是异步模式,不管结果是成功还是失败,肯定方法调用会马上返回的。那我只能告诉你,不好意思,不一定是这样。我自己就曾经踩过这个坑。 我们当时有个业务流程需要在执行完成后发送kakfa消息给某个业务方,为了尽量减少影响我这个主流程的执行时间,采用了异步方式发送kafka消息。在使用中,因为配错了kafka的TOPIC信息,发现流程阻塞发送消息这里长达6秒(kafka默认的发送超时时间)。 究竟为啥异步方式还会阻塞呢?我们继续看源码。 不管是同步模式还是异步模式,最终都会调用到doSend方法,注意看上图中的waitOnMetadata方法,我上面说的阻塞的情况就是阻塞在这个方法里。那我们继续看这个方法。 通过代码中的注释我们大概能了解这个方法的功能,不过我这里还是要解释下。(防止有人看不懂英文,哈哈) waitOnMetadata获取当前的集群元数据信息,如果缓存有,并且分区没有超过指定分区范围则缓存返回,否则触发更新,等待新的metadata。这个等待的操作在下面这行代码: metadata.awaitUpdate(version, remainingWaitMs);然后就继续跟喽, 这个方法很好理解,就是一直在等一个条件,这个条件达到了就返回,否则一直等待超时退出。而这个条件就是当前的版本号要大于上个版本号。 那么谁来更新版本号呢?就是我们前面提到的sender线程。当我们的topic配置错误的时候导致metadata一直无法更新,然后一直等到超时。 破案了! 总结kafka的异步模式可以让我们在业务场景中发送消息时即刻返回,不必等待发送的结果。但是当metadata取不到时,发送的过程还是需要等待一直超时的。 程序员是一个尤其需要不断学习的工种,平时养成阅读源码的习惯,不光能避免踩一些坑,还能在遇到问题是快递定位到问题的根源。

June 13, 2020 · 1 min · jiezi

数据源管理-Kafka集群环境搭建消息存储机制详解

本文源码:GitHub·点这里 || GitEE·点这里 一、Kafka集群环境1、环境版本版本:kafka2.11,zookeeper3.4注意:这里zookeeper3.4也是基于集群模式部署。 2、解压重命名tar -zxvf kafka_2.11-0.11.0.0.tgzmv kafka_2.11-0.11.0.0 kafka2.11创建日志目录 [root@en-master kafka2.11]# mkdir logs注意:以上操作需要同步到集群下其他服务上。 3、添加环境变量vim /etc/profileexport KAFKA_HOME=/opt/kafka2.11export PATH=$PATH:$KAFKA_HOME/binsource /etc/profile4、修改核心配置[root@en-master /opt/kafka2.11/config]# vim server.properties-- 核心修改如下# 唯一编号broker.id=0# 开启topic删除delete.topic.enable=true# 日志地址log.dirs=/opt/kafka2.11/logs# zk集群zookeeper.connect=zk01:2181,zk02:2181,zk03:2181注意:broker.id安装集群服务个数编排即可,集群下不能重复。 5、启动kafka集群# 启动命令[root@node02 kafka2.11]# bin/kafka-server-start.sh -daemon config/server.properties# 停止命令[root@node02 kafka2.11]# bin/kafka-server-stop.sh# 进程查看[root@node02 kafka2.11]# jps注意:这里默认启动了zookeeper集群服务,并且集群下的kafka分别启动。 6、基础管理命令创建topic bin/kafka-topics.sh --zookeeper zk01:2181 \--create --replication-factor 3 --partitions 1 --topic one-topic参数说明: replication-factor 定义副本个数partitions 定义分区个数topic:定义topic名称查看topic列表 bin/kafka-topics.sh --zookeeper zk01:2181 --list修改topic分区 bin/kafka-topics.sh --zookeeper zk01:2181 --alter --topic one-topic --partitions 5查看topic bin/kafka-topics.sh --zookeeper zk01:2181 \--describe --topic one-topic发送消息 ...

June 11, 2020 · 2 min · jiezi

译为什么-Kafka-这么快

博客原文https://taohuawu.club/why-kaf... 为什么 Kafka 如此地快探究是哪些精妙的设计决策使得 Kafka 成为了现如今的性能强者。软件体系结构在过去的几年间发生了巨大的变化。单体应用程序或甚至几个粗粒度的服务共享一个公共数据存储的理念,在全世界的软件从业者的头脑中早已不复存在了。自主微服务、事件驱动架构和职责分离 (CQRS) 模式是构建以业务为中心的现代应用程序的主要工具。除此之外,设备连接物联网、移动和可穿戴设备的普及,正在对系统在接近实时的情况下必须处理的事件数量造成越来越大的压力。 我们首先要接受一个共识:术语『快』是一个多义的、复杂的甚至是模糊不清的词。延迟、吞吐量和抖动,这些指标会影响人们对这个术语的理解。它还具有内在的上下文关系:行业和应用领域本身就设置了关于性能的规范和期望。某个东西是否『快』很大程度上取决于一个人的参照系。 Apache Kafka 以延迟和抖动为代价对吞吐量进行了优化,同时保留了其他必须的功能特性,比如持久化、严格的日志记录顺序和至少交付一次的语义。当有人说 "Kafka 很快",并且假定他们至少是有资格说这话的,那么我们可以认为他们指的是 Kafka 在短时间内安全地积累和分发大量日志记录的能力。 从历史上看,Kafka 诞生于 LinkedIn 的业务需求:高效地移动大量的消息,每小时的数据量达数 TB 。因为时间的可变性,单个消息的传播延迟被认为是次要的。毕竟,LinkedIn 不是从事高频交易的金融机构,也不是需要在确定的时限内完成指定操作的工业控制系统。Kafka 可用于实现近实时(或称为软实时)的系统。 注意:对于不熟悉这个术语的人,这里必须说明一下,实时并不等同于快速,它仅仅意味着 "可预测"。具体点说,实时意味着完成一个指定操作所需的硬性时间上限,或称为截止时间。如果系统作为一个整体不能每次都满足这个时限(内完成操作),它就不能被归类为实时。能够在具有小概率超时容错性的时限范围内完成操作的系统被称为近实时系统。就吞吐量而言,实时系统通常比近实时或非实时的系统要慢。Kafka 的高性能主要得益于两个要素,这两个要素需要分开来讨论。第一个与客户端 (Client) 和 代理 (Broker) 实现上的底层效率有关。第二个则来自于流数据处理的机会性并行。 Broker 性能日志结构的持久性Kafka 利用了一种分段式的、只追加 (Append-Only) 的日志,基本上把自身的读写操作限制为顺序 I/O,也就使得它在各种存储介质上能有很快的速度。一直以来,有一种广泛的误解认为磁盘很慢。实际上,存储介质 (特别是旋转式的机械硬盘) 的性能很大程度依赖于访问模式。在一个 7200 转/分钟的 SATA 机械硬盘上,随机 I/O 的性能比顺序 I/O 低了大概 3 到 4 个数量级。此外,一般来说现代的操作系统都会提供预读和延迟写技术:以大数据块的倍数预先载入数据,以及合并多个小的逻辑写操作成一个大的物理写操作。正因为如此,顺序 I/O 和随机 I/O 之间的性能差距在 flash 和其他固态非易失性存储介质中仍然很明显,尽管它远没有旋转式的存储介质那么明显。 日志记录批处理顺序 I/O 在大多数的存储介质上都非常快,几乎可以和网络 I/O 的峰值性能相媲美。在实践中,这意味着一个设计良好的日志结构的持久层将可以紧随网络流量的速度。事实上,Kafka 的瓶颈通常是网络而非磁盘。因此,除了由操作系统提供的底层批处理能力之外,Kafka 的 Clients 和 Brokers 会把多条读写的日志记录合并成一个批次,然后才通过网络发送出去。日志记录的批处理通过使用更大的包以及提高带宽效率来摊薄网络往返的开销。 ...

June 9, 2020 · 3 min · jiezi

聊一聊高并发高可用那些事-Kafka篇

目录 为什么需要消息队列1.异步 :一个下单流程,你需要扣积分,扣优惠卷,发短信等,有些耗时又不需要立即处理的事,可以丢到队列里异步处理。 2.削峰 :按平常的流量,服务器刚好可以正常负载。偶尔推出一个优惠活动时,请求量极速上升。由于服务器 Redis,MySQL 承受能力不一样,如果请求全部接收,服务器负载不了会导致宕机。加机器嘛,需要去调整配置,活动结束后用不到了,即麻烦又浪费。这时可以将请求放到队列里,按照服务器的能力去消费。 3.解耦 :一个订单流程,需要扣积分,优惠券,发短信等调用多个接口,出现问题时不好排查。像发短信有很多地方需要用到, 如果哪天修改了短信接口参数,用到的地方都得修改。这时可以将要发送的内容放到队列里,起一个服务去消费, 统一发送短信。 高吞吐、高可用 MQ 对比分析看了几个招聘网站,提到较多的消息队列有:RabbitMQ、RocketMQ、Kafka 以及 Redis 的消息队列和发布订阅模式。 Redis 队列是用 List 数据结构模拟的,指定一端 Push,另一端 Pop,一条消息只能被一个程序所消费。如果要一对多消费的,可以用 Redis 的发布订阅模式。Redis 发布订阅是实时消费的,服务端不会保存生产的消息,也不会记录客户端消费到哪一条。在消费的时候如果客户端宕机了,消息就会丢失。这时就需要用到高级的消息队列,如 RocketMQ、Kafka 等。 ZeroMQ 只有点对点模式和 Redis 发布订阅模式差不多,如果不是对性能要求极高,我会用其它队列代替,毕竟关解决开发环境所需的依赖库就够折腾的。 RabbitMQ 多语言支持比较完善,特性的支持也比较齐全,但是吞吐量相对小些,而且基于 Erlang 语言开发,不利于二次开发和维护。 RocketMQ 和 Kafka 性能差不多,基于 Topic 的订阅模式。RocketMQ 支持分布式事务,但在集群下主从不能自动切换,导致了一些小问题。RocketMQ 使用的集群是 Master-Slave ,在 Master 没有宕机时,Slave 作为灾备,空闲着机器。而 Kafka 采用的是 Leader-Slave 无状态集群,每台服务器既是 Master 也是 Slave。 Kafka 相关概念在高可用环境中,Kafka 需要部署多台,避免 Kafka 宕机后,服务无法访问。Kafka集群中每一台 Kafka 机器就是一个 Broker。Kafka 主题名称和 Leader 的选举等操作需要依赖 ZooKeeper。 ...

June 7, 2020 · 2 min · jiezi

为什么使用消息队列消息队列有什么优点和缺点

面试题为什么使用消息队列?消息队列有什么优点和缺点?Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么区别,以及适合哪些场景?面试官心理分析其实面试官主要是想看看: 第一,你知不知道你们系统里为什么要用消息队列这个东西?不少候选人,说自己项目里用了 Redis、MQ,但是其实他并不知道自己为什么要用这个东西。其实说白了,就是为了用而用,或者是别人设计的架构,他从头到尾都没思考过。没有对自己的架构问过为什么的人,一定是平时没有思考的人,面试官对这类候选人印象通常很不好。因为面试官担心你进了团队之后只会木头木脑的干呆活儿,不会自己思考。 第二,你既然用了消息队列这个东西,你知不知道用了有什么好处&坏处?你要是没考虑过这个,那你盲目弄个 MQ 进系统里,后面出了问题你是不是就自己溜了给公司留坑?你要是没考虑过引入一个技术可能存在的弊端和风险,面试官把这类候选人招进来了,基本可能就是挖坑型选手。就怕你干 1 年挖一堆坑,自己跳槽了,给公司留下无穷后患。 第三,既然你用了 MQ,可能是某一种 MQ,那么你当时做没做过调研?你别傻乎乎的自己拍脑袋看个人喜好就瞎用了一个 MQ,比如 Kafka,甚至都从没调研过业界流行的 MQ 到底有哪几种。每一个 MQ 的优点和缺点是什么。每一个 MQ 没有绝对的好坏,但是就是看用在哪个场景可以扬长避短,利用其优势,规避其劣势。如果是一个不考虑技术选型的候选人招进了团队,leader 交给他一个任务,去设计个什么系统,他在里面用一些技术,可能都没考虑过选型,最后选的技术可能并不一定合适,一样是留坑。 面试题剖析为什么使用消息队列其实就是问问你消息队列都有哪些使用场景,然后你项目里具体是什么场景,说说你在这个场景里用消息队列是什么? 面试官问你这个问题,期望的一个回答是说,你们公司有个什么业务场景,这个业务场景有个什么技术挑战,如果不用 MQ 可能会很麻烦,但是你现在用了 MQ 之后带给了你很多的好处。 先说一下消息队列常见的使用场景吧,其实场景有很多,但是比较核心的有 3 个:解耦、异步、削峰。 解耦看这么个场景。A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃...... 在这个场景中,A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?头发都白了啊! 如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。 总结:通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦了。 面试技巧:你需要去考虑一下你负责的系统中是否有类似的场景,就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦,也是可以的,你就需要去考虑在你的项目里,是不是可以运用这个 MQ 去进行系统的解耦。在简历中体现出来这块东西,用 MQ 作解耦。 ...

June 4, 2020 · 2 min · jiezi

Kafka-Streams未来可期

核心知识预热TIPS1.资料来源说明书以及内部构造2.学习技术就是不断解惑的过程,就kafka stream自问:是个什么技术,能干什么,怎么使用..Kafka Streams是一个数据输入和数据输出都保存在kafka集群的程序和微服务构建的客户端类库,那么就不需要专门去搭建计算集群,方便快捷;Kafka Streams提供两种方法来定义流处理拓扑。Kafka Streams DSL提供了最通用的可直接使用的数据转换操作(比如map);低阶的处理器API则允许开发者定义和连接到自定义的处理器或者和state store进行交互。也就是说前者是高阶API,封装好了的,通用场景使用且能快速开发;后者是低阶API,更接近底层,开发难度大但是能更好地适配程序和业务。Kafka Streams同样支持状态统计、窗口函数、eventTime和exactly-once语义等实时场景;前置概念conceptdescstream processing application多个处理器形成的拓扑结构,包含有一定处理逻辑的应用程序processor topology流处理器拓扑,是processor+...+processor的形式,source和sink是特殊的processorSource Processor源头处理器,即上游没有其他的流处理器,从kafka的topic中消费数据产生数据流输送到下游Sink Processor结果处理器,即下游没有其他的流处理器,将上游的数据输送到指定的kafka topicTime联想flink的时间语义,例如某某time1手机端购买某商品,产生了日志数据,然后time2这个日志数据被实时采集到Kafka持久化到topic,然后进入流式处理框架,在time3正式被计算,那么time123分别称为:event time,ingestion time,processing timestates保存和查询数据状态的功能,可以定义流处理应用外的程序进行只读访问processing guarantees消费是否丢失和是否重复的级别,比如exactly-once,at-least-once,at-most-once拓扑kafka stream的拓扑其实就是一个个processor连接起来的流程图,其中source和sink是比较特殊的processor,分别没有上游和下游处理器。拓扑创建方式是在创建下游processor的时候指定上游的processor名称进行连接 // DSL转换算子生成新KStream是调用void addGraphNode(final StreamsGraphNode parent,final StreamsGraphNode child) {}// 直接通过builder添加processorpublic synchronized Topology addProcessor(final String name,final ProcessorSupplier supplier,final String... parentNames) {} 使用使用上核心都是四个步骤: 创建流处理应用配置参数;构造流处理拓扑结构;创建流处理客户端实例;开始执行流处理程序;使用DSL编写单词统计测试代码 /* 1.props */ Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");//可作为consumer的group id props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//kafka的地址,多个逗号分隔 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());// 序列化和反序列化,在读取和写出流的时候、在读取和写出state的时候都会用到 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); /* 2.topology */ final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input");//source processor,传入参数可定义key,value的序列化方式,以及时间提取器等 source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))//KString<String,String> .groupBy((key, value) -> value)// KGroupedStream<String,String> .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))//KTable<String,String> .toStream()//KStream<String,Long> .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));//sink processor,指定输出key,value的数据类型 final Topology topology = builder.build(); /* 3.KafkaStreams实例 */ final KafkaStreams streams = new KafkaStreams(topology, props); // CountDownLatch用await()阻塞当前线程,countDown()记录完成线程的数量 // 当getCount()=0的时候继续执行await后续的代码 final CountDownLatch latch = new CountDownLatch(1); System.out.println(topology.describe());// 打印流处理拓扑 // 钩子函数 Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { // 4.执行 streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0);测试数据# 生产者打印生产数据langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input>hello hello hello hello>kafka kafka kafka kafka# 消费者打印消费数据langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic streams-wordcount-output \--from-beginning \--formatter kafka.tools.DefaultMessageFormatter \--property print.key=true \--property print.value=true \--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializerhello 4kafka 4打印拓扑这里可以看到有点类似于宽依赖的时候,拓扑会划分,中间会生成streams-wordcount-counts-store-repartition主题保存中间结果。 ...

June 3, 2020 · 3 min · jiezi

Kafka内核高水位与Leader-Epoch

前言你可能听说过高水位,但不一定听说过Leader Epoch。前者是Kafka中非常重要的概念。而后者是0.11版本中新推出的。主要是为了弥补前者水位机制的一些缺陷。 1.高水位1.1 什么是高水位Kafka的水位不是时间戳,更与时间无关。它是和位置信息绑定的,具体来说,它是用消息位移来表征的。1.2 高水位的作用定义消息可见性,用来标识分区下的哪些消息是可以被消费者消费的帮助kafka完成副本同步1.3 已提交消息和未提交消息 在分区高水位以下的消息就被认为是已提交消息,反之就是未提交消息消费者只能消费已提交消息,即位移值小于8的消息。这里不存在kafka的事务,因为事务机制会影响消息者所能看到的消息的范围,他不只是简单依赖高水位来判断,是依赖于一个名为LSO的位移值来判断事务性消费者的可见性位移值等于高水位的消息也属于为未提交消息。即高水位的消息也是不能被消费者消费的LEO表示副本写入下一条消息的位移值。同一个副本对象,起高水位值不会超过LEO1.4 高水位更新机制Kafka中所有副本对象都保存一组高水位值和LEO值,但Leader副本中还保留着其他Follower副本的LEO值。 Kafka副本机制在运行过程中,会更新Broker1上Follower副本的高水位和LEO值,同时也会更新Broker0上Leader副本的高水位和LEO以及Follow副本的LEO,但不会更新其HW。Leader副本处理生产者请求得逻辑如下: 1.写入消息到本次磁盘2.更新分区高水位 获取Leader副本所在Broker端保存得所有远程副本LEO值(LEO-1,LEO-2,......,LEO-n)获取Leader副本高水位值:currentHW更新currentHW=max{currentHW,min(LEO-1,LEO-2,.....,LEO-n)}处理Follower副本拉取消息的逻辑如下:1.读取磁盘(或页缓存)中的数据。2.使用Follower副本发送请求中位移值更新远程副本LEO的值。3.更新分区高水位值(具体步骤与处理生产者请求的步骤相同) Follower副本从Leader拉取消息的处理逻辑如下: 1.写入消息到本地磁盘2.更新LEO值3.更新高水位 获取Leader发送的高水位值:currentHW获取步骤2中更新的LEO值:currentLEO更新高水位为min(currnetHW,currentLEO)1.5 副本同步机制解析当生产者发送一条消息时,Leader和Follower副本对应的高水位是怎么被更新的呢? 首先是初始状态,remoteLEO指的是远程副本的LEO值。在初始状态时,所有值都是0. 当生产者给主题分区发送一条消息后,状态变更为: 此时,Leader副本成功讲消息写入了本地磁盘,故LEO值被更新为1 Follower再次尝试从Leader拉取消息。有消息拉去后,状态进一步变更: 这时,Follower副本也成功地更新LEO为1.此时,Leader和Follower副本的LEO都是1,但各自的高水位依然是0,还没有被更新。他们需要在下一轮的拉取中被更新 在新一轮的拉去请求中,由于位移值是0的消息已经拉取成功,因此Follower副本这次请求拉去的位移值为1的消息。Leader副本接收此请求后,更新远程副本LEO为1,然后更新Leader高水位为1,然后才会将更新过的高水位值1发送给Follower副本。Follower副本接收到以后,也将自己的高水位值更新为1.至此,一个完整的消息同步周期就结束了。 2. Leader Epoch依托高水位,Kafka既界定了消息的对外可见性,又实现了异步的副本同步机制。 但,在上文分析过程中,Follower副本的高水位更新是需要额外一轮的拉取请求才能实现的。若有多个副本的情况下,则需要多轮的拉取请求。也就是说,Leader副本高水位更新和Follower副本高水位更新在时间上是存在错配的。而这种错配往往是数据丢失,数据不一致问题现象的根源。因此kafka社区在0.11版本中引入了Leader Epoch。 2.1 Leader Epoch的组成Epoch。一个单调递增的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的Leader被认为是过期的Leader,不能再行使Leader的权力。起始位移(Start Offset)。Leader副本在该Epoch上写入的首条消息的位移。Leader Epoch<0,0>和<1,100>。第一个Epoch指的是0版本,位移0开始保存消息,一共保存100条消息。之后Leader发生了变更,版本号增加到1,新版本起始位移为100.Kafka Broker会在内存中为每个分区都缓存Leader Epoch数据,同时它还会定期的将这信息持久化一个checkpoint文件中。当Leader副本写入消息到磁盘时,Broker会尝试更新这部分缓存,如果该Leader是首次写入消息,那么Broker会向缓存中增加一个Leader Epoch条目,否则就不做更新。 2.2 Leader Epoch使用Leader Epoch是怎样防止数据丢失的呢? 单纯依赖高水位是怎么造成数据丢失的。开始时,副本A和副本B都处于正常状态,A是Leader副本,B是Follower副本。当生产者使用ack=1(默认)往Leader副本A中发送两条消息。且A全部写入成功,此时Kafka会通知生产者说这两条消息写入成功。现在假设A,B都写入了这两条消息,而且Leader副本的高水位也已经更新了,但Follower副本高水位还未更新。因为Follower端高水位的更新与Leader端有时间错配。假如现在副本B所在Broker宕机了,那么当它重启回来后,副本B就会执行日志截断操作,将LEO值调整为之前的高水位值,也就是1.所以副本B当中位移值为1的消息就丢失了。副本B中只保留了位移值0的消息。 当执行完截断操作之后,副本B开始从A中拉取消息,执行正常的消息同步。假如此时副本A所在的Broker也宕机了。那么kafka只能让副本B成为新的Leader,然后副本A重启回来之后,也需要执行日志截断操作,即调整高水位为与B相同的值,也就是1。这样操作之后,位移值为1的那条消息就永远丢失了。 Leader Epoch机制如何规避这种数据丢失现象呢? 延续上文场景,引用了Leader Epoch机制之后,Follower副本B重启回来后,需要向A发送一个特殊的请求去获取Leader的LEO值,该例子中为2。当知道Leader LEO为2时,B发现该LEO值不必自己的LEO值小,而且缓存中也没有保存任何起始位移值>2的Epoch条目,因此B无需执行日志截断操作。这是对高水位机制的一次明显改进,即不是依赖于高水位判断是否进行日志截断操作。现在,副本A宕机了,B成立新Leader。同样的,在A重启回来后,执行与B逻辑相同的判断,也不需要执行日志截断操作,所以位移值为1的那条消息就全部得以保存。后面当生产者程序向 B 写入新消息时,副本 B 所在的 Broker 缓存中,会生成新的 Leader Epoch 条目:[Epoch=1, Offset=2]。之后,副本 B 会使用这个条目帮助判断后续是否执行日志截断操作。这样,kafka就规避掉了数据丢失的场景。

June 3, 2020 · 1 min · jiezi

一文读懂Kafka副本机制

前言副本机制就是备份机制,指的是在分布式集群机器中保存着相同的数据备份。 那么副本机制的好处的是什么呢? 提供数据冗余(主要作用)提供高伸缩性改善数据局部性总之: 副本机制是kafka确保系统高可用和高持久的重要基石。 1.副本所谓副本,本质上就是一个只能追加写消息的提交日志。这些日志被相同的分散保存在不同的Broker上。在实际生产上,每台Broker都可能保存有各个主题下不同分区的不同副本。因此单个Broker上存有成百上千个副本现象是非常正常的。 1.1 副本角色既然多个Broker中保存分区下的多个副本,那么是如何保证副本当中的数据都是一致的呢? 针对这个问题,kafka的解决方案就是领导者副本机制 领导者的副本机制工作原理 在kafka中,副本分成两类:领导者副本和追随者副本。每个分区在创建时都要选举一个副本,成为领导者副本,其余的副本自动称为追随者副本。kafka中,追随者副本是不会对外提供服务的,所有的请求都必须由领导者副本来处理。它唯一的任务就是从领导者副本异步拉去消息,并写入到自己提交日志中,从而实现与领导者副本的同步。当领导者副本挂掉了,或者说所在Broker宕机了,kafka可以通过Zookeeper提供的监控功能能够实时感知到,并开启新一轮领导者选举,从追随者副本中选一个作为新的领导者。老Leader副本重启回来后,只能作为追随者副本加入到集群中。一定注意上面第二点,追随者副本是不会对外提供服务的。这也是kafka没能提供读操作横向扩展的根本原因,而且它也不像mysql副本一样有”抗读“的作用,帮助领导者减轻压力。那么这种副本机制设计究竟有什么好处呢?1.2 副本机制的好处1.方便实现“Read-your-writes” 顾名思义,就是当你使用生产者api向kafka成功写入消息后,就马上使用消费者api去读取刚才的消息。举个例子,就是你刚发完一条微博,肯定是希望立马能够看到的。这就是Read-your-writes场景了。如果追随者副本对外提供服务的话,由于副本同步是异步的,因此有可能发生追随者副本还没有及时从领导者副本中拉取最新消息,从而使客户端看不到最新的消息。2.方便实现单调读 什么是单调读。单调读就是消费者在多次读消息时候,不会看到一条消息一会儿存在一会儿不存在。例如:如果允许追随者副本提供读服务,那么假设当前有两个追随者副本F1,F2。生产者往领导者中发送了消息后,F1,F2开始异步拉取消息。若F1拉取成功了,而F2还未拉取成功。此时消费者第一次消费F1副本获取最新消息,第二次消费的时候消费到了F2副本。就获取不到该条消息了。这就不是单调读一致性。所以都由Leader副本来处理请求的话,就能实现单调读。1.3 In-sync Replicas(ISR)上文提及到的追随者副本不对外提供服务,只是定期的异步拉取消息。既然是异步的,那么就存在着不可能与Leader实时同步的风险。所以kafka应该告诉我们,追随者副本到底在什么条件之下才算与Leader同步。 基于这个想法,kafka引入了ISR,副本集合。ISR中的副本都是与Leader同步的副本,相反,不在ISR中的追随者副本被认为是与Leader不同步的。那么进入ISR到底需要满足什么条件才能进入呢。 首先需要明确一点。ISR不只是追随者副本集合,它必然包括Leader副本。甚至在某些情况下,ISR只有Leader这一个副本。 图中有3个副本:1个领导者副本,2个追随者副本。领导者副本写入了10条消息,F1同步了6条,F3同步了3条。那么哪个追随者副本与Leader不同步呢?事实上,这两个副本都有可能与Leader副本不同步,但也可能同步。它实际上不是依靠与消息条数来进行判断的。而是根据Broker端参数replica.lag.time.max.ms参数值。这个参数的含义就是Follower副本能够落后Leader副本的最长时间间隔,当前默认值是10秒。这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。若是同步过程的速度持续慢于Leadr副本的写入速度,那么在replica.lag.time.max.ms时间后,kafka就会自动收缩ISR集合,将改副本提出集合。值得注意的是,若改副本后面慢慢追上了Leader的进度。那么它是可以被重新放入ISR集合中的。这也表明ISR是一个动态调整的集合,而非静态不变的。 Unclean 领导者选举既然ISR可以动态调整,那么就会出现ISR为空的情况。ISR为空的情况就代表Leader副本也挂掉了。那么kafka就需要重新选举新的Leader。那么该怎么选举Leader呢? kafka把所有不在ISR的存活副本都成为非同步副本。通常来说,非同步副本落后Leader太多,因此,如果选择这些副本为新的Leader,就可能出现数据的丢失。在kafka,选举Leader这种过程被成为Unclean。由Broker端参数unclean.leader.election.enable控制是否允许Unclean领导者选举。开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。可以根据你的实际业务场景决定是否开启 Unclean 领导者选举。不过并不建议开启它,毕竟我们还可以通过其他的方式来提升高可用性。如果为了这点儿高可用性的改善,牺牲了数据一致性,那就非常不值当了。

June 2, 2020 · 1 min · jiezi

kafka学习笔记

一、什么是Kafka?Apache kafka 是一个开源的分布式消息队列,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目,该项目的目的是为处理实时数据提供一个高吞吐量、低等待的平台。 二、kafka架构 1)Producer:消息生产者,向kafka broker发消息的客户端 2)Consumer:消息消费者,向kafka broker取消息的客户端 3)Topic:对消息进行归类 4)Consumer Group(GC):这是kafka 用来实现一个topic 消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic 可以有多个CG。topic 的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion 只会把消息发给该CG 中的一个consumer。如果需要实现广播,只要每个consumer 有一个独立的CG 就可以了。要实现单播只要所有的consumer 在同一个CG。用CG 还可以将consumer 进行自由的分组而不需要多次发送消息到不同的topic 5)Broker:一台kafka服务器就是一个Broker,一个集群由多个Broker组成。一个Broker可以容纳多个Topic 6)Partition:为了实现扩展性,一个非常大的topic 可以分布到多个broker(即服务器)上,一个topic 可以分为多个partition,每个partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的id(offset)。kafka 只保证按一个partition 中的顺序将消息发给 consumer,不保证一个topic 的整体(多个partition 间)的顺序 7)Offset:消息偏移量,方便查找。 三、Producer向Kafka写消息的流程 1)producer 先从zookeeper 的 "/brokers/.../state"节点找到该partition的leader 2)producer 将消息发送给该leader 3)leader 将消息写入本地log 4)followers 从leader pull消息,写入本地log 后向leader发送ACK 5)leader收到所有ISR中的replication 的ACK后,增加HW(high watermark,最后commit的offset)并向producer 发送ACK 四、kafka的分区数和消费者个数的关系topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。即分区数决定了同组消费者个数的上限,所以,如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区。 Kafka提供的两种Consumer消费Partition的分配策略: range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。具体参考:https://www.jianshu.com/p/dbbca800f607 五、kafka如何实现高可用?Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。 ...

May 28, 2020 · 2 min · jiezi

安装Zookeeper和Kafka集群

配置/etc/hosts文件vi /etc/hosts# 添加192.168.200.110 master192.168.200.111 slave1192.168.200.112 slave2安装Zookeeper集群下载Zookeeper安装包下载地址:https://www.apache.org/dyn/cl... 创建对应的ZK数据和日志目录# 创建ZK的数据目录,同时需要创建myid指定这个节点的IDmkdir -p /software/zookeeper/zkdata/vi /software/zookeeper/zkdata/myid# 创建ZK的日志目录mkdir /software/zookeeper/zklogs/修改zoo.cfg文件先复制一份: cp zoo_sample.cfg zoo.cfg开始修改: cat zoo.cfg# The number of milliseconds of each ticktickTime=2000# The number of ticks that the initial # synchronization phase can takeinitLimit=10# The number of ticks that can pass between # sending a request and getting an acknowledgementsyncLimit=5# the directory where the snapshot is stored.# do not use /tmp for storage, /tmp here is just # example sakes.#dataDir=/tmp/zookeeper# the port at which the clients will connectclientPort=2181# the maximum number of client connections.# increase this if you need to handle more clients#maxClientCnxns=60## Be sure to read the maintenance section of the # administrator guide before turning on autopurge.## http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance## The number of snapshots to retain in dataDir#autopurge.snapRetainCount=3# Purge task interval in hours# Set to "0" to disable auto purge feature#autopurge.purgeInterval=1# ZK的数据目录,myid也在里面dataDir=/software/zookeeper/zkdata# ZK的日志目录dataLogDir=/software/zookeeper/zklogs# 集群模式的各个节点,如果是自身的话,需要配置为0.0.0.0,而不是master/IP地址,不然可能会出现zk之间无法连接通信的情况。(如果您提供了公共IP,则侦听器将无法连接到端口,您必须为当前节点指定0.0.0.0)server.1=0.0.0.0:2888:3888server.2=slave1:2888:3888server.3=slave2:2888:3888启动/停止ZK/查看状态/重启zkServer.sh startzkServer.sh stopzkServer.sh statuszkServer.sh restartZK的相关命令操作# 连接ZK[root@master conf]# zkCli.sh -server 127.0.0.1:2181# 显示根目录下文件[zk: 127.0.0.1:2181(CONNECTED) 0] ls /[admin, brokers, cluster, config, consumers, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]# 显示根目录下文件,并能看到更新次数等数据[zk: 127.0.0.1:2181(CONNECTED) 1] ls2 /'ls2' has been deprecated. Please use 'ls [-s] path' instead.[cluster, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]cZxid = 0x0ctime = Thu Jan 01 08:00:00 CST 1970mZxid = 0x0mtime = Thu Jan 01 08:00:00 CST 1970pZxid = 0x400000003cversion = 10dataVersion = 0aclVersion = 0ephemeralOwner = 0x0dataLength = 0numChildren = 10# 创建文件,并设置初始化内容[zk: 127.0.0.1:2181(CONNECTED) 3] create /lzhpo-test1 "hello,lzhpo-test1~"Created /lzhpo-test1# 查看文件内容[zk: 127.0.0.1:2181(CONNECTED) 6] get /lzhpo-test1hello,lzhpo-test1~# 修改文件内容[zk: 127.0.0.1:2181(CONNECTED) 7] set /lzhpo-test1 "test1"[zk: 127.0.0.1:2181(CONNECTED) 8] get /lzhpo-test1test1# 删除文件[zk: 127.0.0.1:2181(CONNECTED) 9] delete /lzhpo-test1[zk: 127.0.0.1:2181(CONNECTED) 10] ls /[admin, brokers, cluster, config, consumers, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]剩下的,请看官方文档:https://zookeeper.apache.org/... ...

May 27, 2020 · 7 min · jiezi

消息队列二三事

最近在看kafka的代码,就免不了想看看消息队列的一些要点:服务质量(QOS)、性能、扩展性等等,下面一一探索这些概念,并谈谈在特定的消息队列如kafka或者mosquito中是如何具体实现这些概念的。服务质量服务语义服务质量一般可以分为三个级别,下面说明它们不同语义。At most once至多一次,消息可能丢失,但绝不会重复传输。生产者:完全依赖底层TCP/IP的传输可靠性,不做特殊处理,所谓“发送即忘”。kafka中设置acks=0。消费者:先保存消费进度,再处理消息。kafka中设置消费者自动提交偏移量并设置较短的提交时间间隔。At least once至少一次,消息绝不会丢,但是可能会重复。生产者:要做消息防丢失的保证。kafka中设置acks=1 或 all并设置retries>0。消费者:先处理消息,再保存消费进度。kafka中设置消费者自动提交偏移量并设置很长的提交时间间隔,或者直接关闭自动提交偏移量,处理消息后手动调用同步模式的偏移量提交。Exactly once精确一次,每条消息肯定会被传输一次且仅一次。这个级别光靠消息队列本身并不好保证,有可能要依赖外部组件。生产者:要做消息防丢失的保证。kafka中设置acks=1 或 all并设置retries>0。mosquito中通过四步握手与DUP、MessageID等标识来实现单次语义。消费者:要做消息防重复的保证,有多种方案,如:在保存消费进度和处理消息这两个操作中引入两阶段提交协议;让消息幂等;让消费处理与进度保存处于一个事务中来保证原子性。kafka中关闭自动提交偏移量,并设置自定义的再平衡监听器,监听到分区发生变化时从外部组件读取或者存储偏移量,保证自己或者其他消费者在更换分区时能读到最新的偏移量从而避免重复。总之就是结合ConsumerRebalanceListener、seek和一个外部系统(如支持事务的数据库)共同来实现单次语义。此外,kafka还提供了GUID以便用户自行实现去重。kafka 0.11版本通过3个大的改动支持EOS:1.幂等的producer;2. 支持事务;3. 支持EOS的流式处理(保证读-处理-写全链路的EOS)。这三个级别可靠性依次增加,但是延迟和带宽占用也会增加,所以实际情况中,要依据业务类型做出权衡。可靠性上面的三个语义不仅需要生产者和消费者的配合实现,还要broker本身的可靠性来进行保证。可靠性就是只要broker向producer发出确认,就一定要保证这个消息可以被consumer获取。kafka 中一个topic有多个partition,每个partition又有多个replica,所有replica中有一个leader,ISR是一定要同步leader后才能返回提交成功的replica集,OSR内的replica尽力的去同步leader,可能数据版本会落后。在kafka工作的过程中,如果某个replica同步速度慢于replica.lag.time.max.ms指定的阈值,则被踢出ISR存入OSR,如果后续速度恢复可以回到ISR中。可以配置min.insync.replicas指定ISR中的replica最小数量,默认该值为1。LEO是分区的最新数据的offset,当数据写入leader后,LEO就立即执行该最新数据,相当于最新数据标识位。HW是当写入的数据被同步到所有的ISR中的副本后,数据才认为已提交,HW更新到该位置,HW之前的数据才可以被消费者访问,保证没有同步完成的数据不会被消费者访问到,相当于所有副本同步数据标识位。每个partition的所有replica需要进行leader选举(依赖ZooKeeper)。在leader宕机后,只能从ISR列表中选取新的leader,无论ISR中哪个副本被选为新的leader,它都知道HW之前的数据,可以保证在切换了leader后,消费者可以继续看到HW之前已经提交的数据。当ISR中所有replica都宕机该partition就不可用了,可以设置unclean.leader.election.enable=true,该选项使得kafka选择任何一个活的replica成为leader然后继续工作,此replica可能不在ISR中,就可能导致数据丢失。所以实际使用中需要进行可用性与可靠性的权衡。kafka建议数据可靠存储不依赖于数据强制刷盘(会影响整体性能),而是依赖于replica。顺序消费顺序消费是指消费者处理消息的顺序与生产者投放消息的顺序一致。主要可能破坏顺序的场景是生产者投放两条消息AB,然后A失败重投递导致消费者拿到的消息是BA。kafka中能保证分区内部消息的有序性,其做法是设置max.in.flight.requests.per.connection=1,也就是说生产者在未得到broker对消息A的确认情况下是不会发送消息B的,这样就能保证broker存储的消息有序,自然消费者请求到的消息也是有序的。但是我们明显能感觉到这会降低吞吐量,因为消息不能并行投递了,而且会阻塞等待,也没法发挥 batch 的威力。如果想要整个topic有序,那就只能一个topic一个partition了,一个consumer group也就只有一个consumer了。这样就违背了kafka高吞吐的初衷。重复消费重复消费是指一个消息被消费者重复消费了。 这个问题也是上面第三个语义需要解决的。一般的消息系统如kafka或者类似的rocketmq都不能也不提倡在系统内部解决,而是配合第三方组件,让用户自己去解决。究其原因还是解决问题的成本与解决问题后获得的价值不匹配,所以干脆不解决,就像操作系统对待死锁一样,采取“鸵鸟政策”。但是kafka 0.11还是处理了这个问题,见发行说明,维护者是想让用户无可挑剔嘛 [笑cry]。性能衡量一个消息系统的性能有许多方面,最常见的就是下面几个指标。连接数是指系统在同一时刻能支持多少个生产者或者消费者的连接总数。连接数和broker采用的网络IO模型直接相关,常见模型有:单线程、连接每线程、Reactor、Proactor等。单线程一时刻只能处理一个连接,连接每线程受制于server的线程数量,Reactor是目前主流的高性能网络IO模型,Proactor由于操作系统对真异步的支持不太行所以尚未流行。kafka的broker采用了类似于Netty的Reactor模型:1(1个Acceptor线程)+N(N个Processor线程)+M(M个Work线程)。其中Acceptor负责监听新的连接请求,同时注册OPACCEPT事件,将新的连接按照RoundRobin的方式交给某个Processor线程处理。每个Processor都有一个NIO selector,向 Acceptor分配的 SocketChannel 注册 OPREAD、OPWRITE事件,对socket进行读写。N由num.networker.threads决定。Worker负责具体的业务逻辑如:从requestQueue中读取请求、数据存储到磁盘、把响应放进responseQueue中等等。M的大小由num.io.threads决定。Reactor模型一般基于IO多路复用(如select,epoll),是非阻塞的,所以少量的线程能处理大量的连接。如果大量的连接都是idle的,那么Reactor使用epoll的效率是杠杠的,如果大量的连接都是活跃的,此时如果没有Proactor的支持就最好把epoll换成select或者poll。具体做法是-Djava.nio.channels.spi.SelectorProvider把sun.nio.ch包下面的EPollSelectorProvider换成PollSelectorProvider。QPS是指系统每秒能处理的请求数量。QPS通常可以体现吞吐量(该术语很广,可以用TPS/QPS、PV、UV、业务数/小时等单位体现)的大小。kafka中由于可以采用 batch 的方式(还可以压缩),所以每秒钟可以处理的请求很多(因为减少了解析量、网络往复次数、磁盘IO次数等)。另一方面,kafka每一个topic都有多个partition,所以同一个topic下可以并行(注意不是并发哟)服务多个生产者和消费者,这也提高了吞吐量。平均响应时间平均响应时间是指每个请求获得响应需要的等待时间。kafka中处理请求的瓶颈(也就是最影响响应时间的因素)最有可能出现在哪些地方呢?网络? 有可能,但是这个因素总体而言不是kafka能控制的,kafka可以对消息进行编码压缩并批量提交,减少带宽占用;磁盘? 很有可能,所以kafka从分利用OS的pagecache,并且对磁盘采用顺序写,这样能大大提升磁盘的写入速度。同时kafka还使用了零拷贝技术,把普通的拷贝过程:disk->read buffer->app buffer->socket buffer->NIC buffer 中,内核buffer到用户buffer的拷贝过程省略了,加快了处理速度。此外还有文件分段技术,每个partition都分为多个segment,避免了大文件操作的同时提高了并行度。CPU? 不大可能,因为消息队列的使用并不涉及大量的计算,常见消耗有线程切换、编解码、压缩解压、内存拷贝等,这些在大数据处理中一般不是瓶颈。并发数是指系统同时能处理的请求数量数。一般而言,QPS = 并发数/平均响应时间 或者说 并发数 = QPS*平均响应时间。 这个参数一般只能估计或者计算,没法直接测。顾名思义,机器性能越好当然并发数越高咯。此外注意用上多线程技术并且提高代码的并行度、优化IO模型、减少减少内存分配和释放等手段都是可以提高并发数的。扩展性消息系统的可扩展性是指要为系统组件添加的新的成员的时候比较容易。kafka中扩展性的基石就是topic采用的partition机制。第一,Kafka允许Partition在cluster中的Broker之间移动,以此来解决数据倾斜问题。第二,支持自定义的Partition算法,比如你可以将同一个Key的所有消息都路由到同一个Partition上去(来获得顺序)。第三,partition的所有replica通过ZooKeeper来进行集群管理,可以动态增减副本。第四,partition也支持动态增减。对于producer,不存在扩展问题,只要broker还够你连接就行。对于consumer,一个consumer group中的consumer可以增减,但是最好不要超过一个topic的partition数量,因为多余的consumer并不能提升处理速度,一个partition在同一时刻只能被一个consumer group中的一个consumer消费代码上的可扩展性就属于设计模式的领域了,这里不谈。参考《kafka技术内幕》Kafka的存储机制以及可靠性Kafka 0.11.0.0 是如何实现 Exactly-once 语义的查看原文,来自mageekchiu。总结不到位的地方请不吝赐教。

August 30, 2018 · 1 min · jiezi

Kafka源码系列之源码分析zookeeper在kafka的作用

以kafka0.8.2.2源码为例给大家进行讲解的。纯属个人爱好,希望大家对不足之处批评指正。__一,zookeeper在分布式集群的作用____1,数据发布与订阅(配置中心)__发布与订阅模型,即所谓的配置中心,顾名思义就是讲发布者将数据发布到zk节点上,共订阅者动态获取数据,实现配置的集中式管理和动态更新。例如,全局的配置信息,服务服务框架的地址列表就非常适合使用。__2,负载均衡__即软件负载均衡。最典型的是消息中间件的生产、消费者负载均衡。 ...

May 30, 2018 · 1 min · jiezi

kafka [B cannot be cast to java.lang.String

自己写了一个Kafka发送消息的demo,但是发送消息的时候,却报了kafka [B cannot be cast to java.lang.String的错误,后来找到了解决办法原来是因为在定义config文件的时候,针对 serializer.class部分,错误的当成了StringEncoder,其实修改成默认的encoder就行了 props.put("serializer.class", "kafka.serializer.DefaultEncoder");

October 24, 2017 · 1 min · jiezi