共计 5840 个字符,预计需要花费 15 分钟才能阅读完成。
kafka 真实环境部署规划
1. 操作系统选型
因为 kafka 服务端代码是 Scala 语言开发的,因此属于 JVM 系的大数据框架,目前部署最多的 3 类操作系统主要由 Linux,OS X 和 Windows, 但是部署在 Linux 数量最多,为什么呢?因为 I / O 模型的使用和数据网络传输效率两点。
- 第一:Kafka 新版本的 Clients 在设计底层网络库时采用了 Java 的 Select 模型,而在 Linux 实现机制是 epoll, 感兴趣的读者可以查询一下 epoll 和 select 的区别,明确一点就是:kafka 跑在 Linux 上效率更高,因为 epoll 取消了轮询机制,换成了回调机制,当底层连接 socket 数较多时,可以避免 CPU 的时间浪费。
- 第二:网络传输效率上。kafka 需要通过网络和磁盘进行数据传输,而大部分操作系统都是通过 Java 的 FileChannel.transferTo 方法实现,而 Linux 操作系统则会调用 sendFile 系统调用,也即零拷贝(Zero Copy 技术),避免了数据在内核地址空间和用户程序空间进行重复拷贝。
2. 磁盘类型规划
- 机械磁盘(HDD)一般机械磁盘寻道时间是毫秒级的,若有大量随机 I /O,则将会出现指数级的延迟,但是 kafka 是顺序读写的,因此对于机械磁盘的性能也是不弱的,所以,基于成本问题可以考虑。
- 固态硬盘(SSD)读写速度可观,没有成本问题可以考虑。
- JBOD (Just Bunch Of Disks) 经济实惠的方案,对数据安全级别不是非常非常高的情况下可以采用,建议用户在 Broker 服务器上设置多个日志路径,每个路径挂载在不同磁盘上,可以极大提升并发的日志写入速度。
- RAID 磁盘阵列 常见的 RAID 是 RAID10,或者称为(RAID 1+0)这种磁盘阵列结合了磁盘镜像和磁盘带化技术来保护数据,因为使用了磁盘镜像技术,使用率只有 50%,注意,LinkedIn 公司采用的就是 RAID 作为存储来提供服务的。那么弊端在什么地方呢?如果 Kafka 副本数量设置为 3,那么实际上数据将存在 6 倍的冗余数据,利用率实在太低。因此,LinkedIn 正在计划更改方案为 JBOD.
3. 磁盘容量规划
我们公司物联网平台每天大约能够产生一亿条消息,假设副本 replica 设置为 2(其实我们设置为 3),数据留存时间为 1 周,平均每条上报事件消息为 1K 左右,那么每天产生的消息总量为:1 亿 乘 2 乘 1K 除以 1000 除以 1000 =200G 磁盘。预留 10% 的磁盘空间,为 210G。一周大约为 1.5T。采用压缩,平均压缩比为 0.5,整体磁盘容量为 0.75T。关联因素主要有:
- 新增消息数
- 副本数
- 是否启用压缩
- 消息大小
- 消息保留时间
4. 内存容量规划
kafka 对于内存的使用,并不过多依赖 JVM 内存, 而是更多的依赖操作系统的页缓存,consumer 若命中页缓存,则不用消耗物理 I / O 操作。一般情况下,java 堆内存的使用属于朝生夕灭的,很快会被 GC, 一般情况下,不会超过 6G,对于 16G 内存的机器,文件系统 page cache 可以达到 10-14GB。
- 怎么设计 page cache,可以设置为单个日志段文件大小,若日志段为 10G, 那么页缓存应该至少设计为 10G 以上。
- 堆内存最好不要超过 6G。
5. CPU 选择规划
kafka 不属于计算密集型系统,因此 CPU 核数够多就可以,而不必追求时钟频率,因此核数选择最好大于 8。
6. 网络带宽决定 Broker 数量
带宽主要有 1Gb/s 和 10 Gb/s。我们可以称为千兆位网络和万兆位网络。举例如下:我们的物联网系统一天每小时都要处理 1Tb 的数据,我们选择 1Gb/ b 带宽,那么需要选择多少机器呢?
- 假设网络带宽 kafka 专用,且分配给 kafka 服务器 70% 带宽,那么单台 Borker 带宽就是 710Mb/s,但是万一出现突发流量问题,很容易把网卡打满,因此在降低 1 /3, 也即 240Mb/s。因为 1 小时处理 1TTB 数据,每秒需要处理 292MB,1MB=8Mb,也就是 2336Mb 数据,那么一小时处理 1TB 数据至少需要 2336/240=10 台 Broker 数据。冗余设计,最终可以定为 20 台机器。
作者:凯新的技术社区
链接:https://juejin.im/post/5bd464…
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
1. kafka 生产者吞吐量测试指标
kafka-producer-perf-test : 是 kafka 提供的测试 Producer 性能脚本,通过脚本,可以计算出 Producer 在一段时间内的平均延时和吞吐量。
1.1 kafka-producer-perf-test
在 kafka 安装目录下面执行如下命令, 生产环境中尽量让脚本运行较长的时间,才会有意义:
bin/kafka-producer-perf-test.sh –topic test –num-records 500000 –record-size 200 –througthput -1 –producer-props bootstrap.servers=bd-master:9092,bd-slave1=9092,bd-slave3=9092 acks=1
1.2 测试结果分析如下:
500000 records sent ,41963 records/sec (8.00 MB/sec),2362.85 ms/avg latency ,3513.00 ms max latency ,2792ms 50h ,3144ms 95th ,3364 ms 99h,3503ms 99.9th
看到上面的结果肯定蒙了,看我细细讲来:kafka 的平均吞吐量是 8.00 MB/sec,即占用 64Mb/ s 左右的带宽,平均每一秒发送 41963 条消息。平均延时为 2362.85 ms,最大延时为 3513.00 ms,95% 的消息发送需要 3144ms,99% 的消息发送需要 3364ms,99.9% 的消息发送需要 3503ms。
2. kafka 消费者吞吐量指标说明:
2.1 kafka-consumer-perfs
我们总共测试 500 万条数据量 bin/kafka-consumer-perfs-test.sh –broker-list bd-master:9092,bd-slave1=9092,bd-slave3=9092 –message-size 200 –messages 500000 –topic test
2.2 得到如下结果:
2018-10-28 9:39:02 95.4188 92.2313 500271 484289 看到上面的结果肯定蒙了,看我细细讲来:该环境下,1s 内总共消费了 95.4188MB 消息,吞吐量为 92.2313MB/s, 也即 736Mb/s。
作者:凯新的技术社区
链接:https://juejin.im/post/5bd50b…
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
1. Producer 核心工作流程
- Producer 首先使用用户主线程将待发送的消息封装进一个 ProducerRecord 类实例中。
- 进行序列化后,发送给 Partioner,由 Partioner 确定目标分区后,发送到 Producer 程序中的一块内存缓冲区中。
- Producer 的另一个工作线程(即 Sender 线程),则负责实时地从该缓冲区中提取出准备好的消息封装到一个批次的内,统一发送给对应的 broker 中。
2. producer 主要参数设置
2.1 producer 参数 acks 设置(无数据丢失)
在消息被认为是“已提交”之前,producer 需要 leader 确认的 produce 请求的应答数。该参数用于控制消息的持久性,目前提供了 3 个取值:
acks = 0: 表示 produce 请求立即返回,不需要等待 leader 的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。
acks = -1: 表示分区 leader 必须等待消息被成功写入到所有的 ISR 副本 (同步副本) 中才认为 produce 请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
acks = 1: 表示 leader 副本必须应答此 produce 请求并写入消息到本地日志,之后 produce 请求被认为成功。如果此时 leader 副本应答请求之后挂掉了,消息会丢失。这是个这种的方案,提供了不错的持久性保证和吞吐。
商业环境推荐:
如果要较高的持久性要求以及无数据丢失的需求,设置 acks = -1。其他情况下设置 acks = 1
2.2 producer 参数 buffer.memory 设置(吞吐量)
该参数用于指定 Producer 端用于缓存消息的缓冲区大小,单位为字节,默认值为:33554432 合计为 32M。kafka 采用的是异步发送的消息架构,prducer 启动时会首先创建一块内存缓冲区用于保存待发送的消息,然后由一个专属线程负责从缓冲区读取消息进行真正的发送。
商业环境推荐:
- 消息持续发送过程中,当缓冲区被填满后,producer 立即进入阻塞状态直到空闲内存被释放出来,这段时间不能超过 max.blocks.ms 设置的值,一旦超过,producer 则会抛出 TimeoutException 异常,因为 Producer 是线程安全的,若一直报 TimeoutException,需要考虑调高 buffer.memory 了。
- 用户在使用多个线程共享 kafka producer 时,很容易把 buffer.memory 打满。
2.3 producer 参数 compression.type 设置(lZ4)
producer 压缩器,目前支持 none(不压缩),gzip,snappy 和 lz4。
商业环境推荐:
基于公司物联网平台,试验过目前 lz4 的效果最好。当然 2016 年 8 月,FaceBook 开源了 Ztandard。官网测试:Ztandard 压缩率为 2。8,snappy 为 2.091,LZ4 为 2.101。
2.4 producer 参数 retries 设置(注意消息乱序,EOS)
producer 重试的次数设置。重试时 producer 会重新发送之前由于瞬时原因出现失败的消息。瞬时失败的原因可能包括:元数据信息失效、副本数量不足、超时、位移越界或未知分区等。倘若设置了 retries > 0,那么这些情况下 producer 会尝试重试。
商业环境推荐:
- producer 还有个参数:max.in.flight.requests.per.connection。如果设置该参数大约 1,那么设置 retries 就有可能造成发送消息的乱序。
- 版本为 0.11.1.0 的 kafka 已经支持 ” 精确到一次的语义”,因此消息的重试不会造成消息的重复发送。
2.5 producer 参数 batch.size 设置(吞吐量和延时性能)
producer 都是按照 batch 进行发送的,因此 batch 大小的选择对于 producer 性能至关重要。producer 会把发往同一分区的多条消息封装进一个 batch 中,当 batch 满了后,producer 才会把消息发送出去。但是也不一定等到满了,这和另外一个参数 linger.ms 有关。默认值为 16K,合计为 16384.
商业环境推荐:
- batch 越小,producer 的吞吐量越低,越大,吞吐量越大。
2.6 producer 参数 linger.ms 设置(吞吐量和延时性能)
producer 是按照 batch 进行发送的,但是还要看 linger.ms 的值,默认是 0,表示不做停留。这种情况下,可能有的 batch 中没有包含足够多的 produce 请求就被发送出去了,造成了大量的小 batch,给网络 IO 带来的极大的压力。
商业环境推荐:
- 为了减少了网络 IO,提升了整体的 TPS。假设设置 linger.ms=5,表示 producer 请求可能会延时 5ms 才会被发送。
2.7 producer 参数 max.in.flight.requests.per.connection 设置(吞吐量和延时性能)
producer 的 IO 线程在单个 Socket 连接上能够发送未应答 produce 请求的最大数量。增加此值应该可以增加 IO 线程的吞吐量,从而整体上提升 producer 的性能。不过就像之前说的如果开启了重试机制,那么设置该参数大于 1 的话有可能造成消息的乱序。
商业环境推荐:
- 默认值 5 是一个比较好的起始点, 如果发现 producer 的瓶颈在 IO 线程,同时各个 broker 端负载不高,那么可以尝试适当增加该值.
- 过大增加该参数会造成 producer 的整体内存负担,同时还可能造成不必要的锁竞争反而会降低 TPS
作者:凯新的技术社区
链接:https://juejin.im/post/5bd51b…
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
调整 partiton
调整 partition 可以直接执行如下命令:
./kafka-topics.sh --alter --topic topicName --zookeeper $ZK_HOST_NODE --partitions partitionNum
注意替换 topicName、$ZK_HOST_NODE 和 partitionNum 三个参数。
调整 replica factor
调整 replica-factor 需要先创建一个 json 描述文件 replica.json,大致如下:
{
"version": 1,
"partitions": [
{
"topic": "topicName",
"partition": 0,
"replicas": [,]
},
{
"topic": "topicName",
"partition": 1,
"replicas": [,]
},
{
"topic": "topicName",
"partition": 2,
"replicas": [,]
}
]
}
在描述文件中说明分区和副本所在 broker 的 Id 的映射。
而后在 replica.json 所在的位置执行如下命令:
$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper $ZK_HOST_NODE --reassignment-json-file replica.json --execute
另外,kafka-manager 是个好东西,可以直接在界面上完成 partiton 数目的调整。可惜不能调整 replica-factor。
转载于:https://www.cnblogs.com/bigbe…