关于kafka:Kafka丢失数据问题优化及重复消费原因分析

5次阅读

共计 5104 个字符,预计需要花费 13 分钟才能阅读完成。

数据失落是一件十分重大的事件事,针对数据失落的问题咱们须要有明确的思路来确定问题所在,针对这段时间的总结,我集体面对 kafka 数据失落问题的解决思路如下:

  1. 是否真正的存在数据失落问题
    比方有很多时候可能是其余共事操作了测试环境,所以首先确保数据没有第三方烦扰。
  2. 理清你的业务流程,数据流向
    数据到底是在什么中央失落的数据,在 Kafka 之前的环节或者 Kafka 之后的流程失落?比方 Kafka 的数据是由 flume 提供的,兴许是 flume 失落了数据,Kafka 天然就没有这一部分数据。
  3. 如何发现有数据失落,又是如何验证的?
    从业务角度思考,例如:教育行业,每年高考后数据量微小,然而却反常的比高考前还少,或者源端数据量和目标端数据量不符。
  4. 定位数据是否在 Kafka 之前就曾经失落还是生产端失落数据的
    Kafka 反对数据的从新回放性能(换个生产 group),清空目标端所有数据,从新生产。
    如果是在生产端失落数据,那么屡次生产后果齐全截然不同的几率很低。
    如果是在写入端失落数据,那么每次后果应该齐全一样(在写入端没有问题的前提下)。
  5. Kafka 环节失落数据
    常见的 Kafka 环节失落数据的起因有:
    如果 auto.commit.enable=true,当 consumer fetch 了一些数据但还没有齐全解决掉的时候,刚好到 commit interval 登程了提交 offset 操作,接着 consumer crash 掉了。这时曾经 fetch 的数据还没有解决实现但曾经被 commit 掉,因而没有机会再次被解决,数据失落。
    网络负载很高或者磁盘很忙写入失败的状况下,没有主动重试重发消息。没有做限速解决,超出了网络带宽限速。kafka 肯定要配置上音讯重试的机制,并且重试的工夫距离肯定要长一些,默认 1 秒钟并不合乎生产环境(网络中断工夫有可能超过 1 秒)。
    如果磁盘坏了,会失落曾经落盘的数据。

    单批数据的长度超过限度会失落数据,报 kafka.common.MessageSizeTooLargeException 异样解决:
    Consumer side:fetch.message.max.bytes- this will determine the largest size of a message that can be fetched by the consumer.
    Broker side:replica.fetch.max.bytes- this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
    Broker side:message.max.bytes- this is the largest size of the message that can be received by the broker from a producer.
    Broker side (per topic):max.message.bytes- this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker’smessage.max.bytes.)6. partition leader 在未实现正本数 follows 的备份时就宕机的状况,即便选举出了新的 leader 然而曾经 push 的数据因为未备份就失落了。
    Kafka 是多正本的,当你配置了同步复制之后。多个正本的数据都在 PageCache 外面,呈现多个正本同时挂掉的概率比 1 个正本挂掉的概率就很小了。(官网举荐是通过副原本保证数据的完整性的)
  6. Kafka 的数据一开始就是存储在 PageCache 上的,定期 flush 到磁盘上的,也就是说,不是每个音讯都被存储在磁盘了,如果呈现断电或者机器故障等,PageCache 上的数据就失落了。
    能够通过 log.flush.interval.messages 和 log.flush.interval.ms 来配置 flush 距离,interval 大丢的数据多些,小会影响性能但在 0.8 版本,能够通过 replica 机制保证数据不丢,代价就是须要更多资源,尤其是磁盘资源,Kafka 以后反对 GZip 和 Snappy 压缩,来缓解这个问题是否应用 replica 取决于在可靠性和资源代价之间的 balance。
    同时 Kafka 也提供了相干的配置参数,来让你在性能与可靠性之间衡量(个别默认):
    当达到上面的音讯数量时,会将数据 flush 到日志文件中。默认 10000
    log.flush.interval.messages=10000 当达到上面的工夫 (ms) 时,执行一次强制的 flush 操作。interval.ms 和 interval.messages 无论哪个达到,都会 flush。默认 3000ms
    log.flush.interval.ms=1000 查看是否须要将日志 flush 的工夫距离
    log.flush.scheduler.interval.ms = 3000Kafka 的优化倡议
    producer 端
    • 设计上保证数据的牢靠安全性,根据分区数做好数据备份,设立正本数等 push 数据的形式:同步异步推送数据:衡量安全性和速度性的要求,抉择相应的同步推送还是异步推送形式,当发现数据有问题时,能够改为同步来查找问题。
    • flush 是 Kafka 的外部机制,Kafka 优先在内存中实现数据的替换, 而后将数据长久化到磁盘 Kafka 首先会把数据缓存(缓存到内存中) 起来再批量 flush。能够通过 log.flush.interval.messages 和 log.flush.interval.ms 来配置 flush 距离。
    • 能够通过 replica 机制保证数据不丢代价就是须要更多资源,尤其是磁盘资源,Kafka 以后反对 GZip 和 Snappy 压缩,来缓解这个问题。是否应用 replica(正本)取决于在可靠性和资源代价之间的 balance(均衡)。
    • broker 到 Consumer kafka 的 consumer 提供两种接口
    high-level 版本曾经封装了对 partition 和 offset 的治理,默认是会定期主动 commit offset,这样可能会丢数据的。
    low-level 版本本人治理 spout 线程和 partition 之间的对应关系和每个 partition 上的已生产的 offset(定期写到 zk)。
    并且只有当这个 offset 被 ack 后,即胜利解决后,才会被更新到 zk,所以根本是能够保证数据不丢的即便 spout 线程 crash(解体),重启后还是能够从 zk 中读到对应的 offset。
    • 异步要思考到 partition leader 在未实现正本数 follows 的备份时就宕机的状况,即便选举出了新的 leader 然而曾经 push 的数据因为未备份就失落了
    不能让内存的缓冲池太满,如果满了内存溢出,也就是说数据写入过快,Kafka 的缓冲池数据落盘速度太慢,这时必定会造成数据失落。
    尽量保障生产者端数据始终处于线程阻塞状态,这样一边写内存一边落盘。
    异步写入的话还能够设置相似 flume 回滚类型的 batch 数,即依照累计的音讯数量,累计的工夫距离,累计的数据大小设置 batch 大小。
    • 设置适合的形式,增大 batch 大小来减小网络 IO 和磁盘 IO 的申请,这是对于 Kafka 效率的思考不过异步写入失落数据的状况还是难以管制,还是得稳固整体集群架构的运行,特地是 zookeeper,当然正对异步数据失落的状况尽量保障 broker 端的稳固运作吧。

Kafka 不像 hadoop 更致力于解决大量级数据,Kafka 的音讯队列更擅长于解决小数据。针对具体业务而言,若是源源不断的 push 大量的数据(eg:网络爬虫),能够思考消息压缩。然而这也肯定水平上对 CPU 造成了压力, 还是得联合业务数据进行测试抉择 broker 端
topic 设置多分区,分区自适应所在机器,为了让各分区均匀分布在所在的 broker 中,分区数要大于 broker 数。分区是 Kafka 进行并行读写的单位,是晋升 kafka 速度的要害。

  1. broker 能接管音讯的最大字节数的设置肯定要比生产端能生产的最大字节数要小,否则 broker 就会因为生产端无奈应用这个音讯而挂起
  2. broker 可赋值的音讯的最大字节数设置肯定要比能承受的最大字节数大,否则 broker 就会因为数据量的问题无奈复制正本,导致数据失落

comsumer 端
敞开自动更新 offset,等到数据被解决后再手动跟新 offset。
在生产前做验证前拿取的数据是否是接着上回生产的数据,不正确则 return 后行解决排错。
一般来说 zookeeper 只有稳固的状况下记录的 offset 是没有问题,除非是多个 consumer group 同时生产一个分区的数据,其中一个先提交了,另一个就失落了。

问题
Kafka 的数据一开始就是存储在 PageCache 上的,定期 flush 到磁盘上的,也就是说,不是每个音讯都被存储在磁盘了,如果呈现断电或者机器故障等,PageCache 上的数据就失落了。这个是总结出的到目前为止没有产生失落数据的状况。
//producer 用于压缩数据的压缩类型。默认是无压缩。正确的选项值是 none、gzip、snappy。压缩最好用于批量解决,批量解决音讯越多,压缩性能越好

 props.put("compression.type", "gzip");
 // 减少提早
 props.put("linger.ms", "50");
 // 这意味着 leader 须要期待所有备份都胜利写入日志,这种策略会保障只有有一个备份存活就不会失落数据。这是最强的保障。,props.put("acks", "all");
 // 有限重试,直到你意识到呈现了问题,设置大于 0 的值将使客户端从新发送任何数据,一旦这些数据发送失败。留神,这些重试与客户端接管到发送谬误时的重试没有什么不同。容许重试将潜在的扭转数据的程序,如果这两个音讯记录都是发送到同一个 partition,则第一个音讯失败第二个发送胜利,则第二条音讯会比第一条音讯呈现要早。props.put("retries", MAX_VALUE);
 props.put("reconnect.backoff.ms", 20000);
 props.put("retry.backoff.ms", 20000);


 // 敞开 unclean leader 选举,即不容许非 ISR 中的正本被选举为 leader,以防止数据失落
 props.put("unclean.leader.election.enable", false);
 // 敞开主动提交 offset
 props.put("enable.auto.commit", false);
 限度客户端在单个连贯上可能发送的未响应申请的个数。设置此值是 1 示意 kafka broker 在响应申请之前 client 不能再向同一个 broker 发送申请。留神:设置此参数是为了防止音讯乱序
 props.put("max.in.flight.requests.per.connection", 1);Kafka 反复生产起因 

强行 kill 线程,导致生产后的数据,offset 没有提交,partition 就断开连接。比方,通常会遇到生产的数据,解决很耗时,导致超过了 Kafka 的 session timeout 工夫(0.10.x 版本默认是 30 秒),那么就会 re-blance 重均衡,此时有肯定几率 offset 没提交,会导致重均衡后反复生产。
如果在 close 之前调用了 consumer.unsubscribe()则有可能局部 offset 没提交,下次重启会反复生产。
Kafka 数据反复 Kafka 设计的时候是设计了 (at-least-once) 至多一次的逻辑,这样就决定了数据可能是反复的,Kafka 采纳基于工夫的 SLA(服务水平保障),音讯保留肯定工夫(通常为 7 天)后会被删除。
Kafka 的数据反复个别状况下应该在消费者端,这时 log.cleanup.policy = delete 应用定期删除机制。

正文完
 0