数据失落是一件十分重大的事件事,针对数据失落的问题咱们须要有明确的思路来确定问题所在,针对这段时间的总结,我集体面对kafka数据失落问题的解决思路如下:

  1. 是否真正的存在数据失落问题
    比方有很多时候可能是其余共事操作了测试环境,所以首先确保数据没有第三方烦扰。
  2. 理清你的业务流程,数据流向
    数据到底是在什么中央失落的数据,在Kafka之前的环节或者Kafka之后的流程失落?比方Kafka的数据是由flume提供的,兴许是flume失落了数据,Kafka天然就没有这一部分数据。
  3. 如何发现有数据失落,又是如何验证的?
    从业务角度思考,例如:教育行业,每年高考后数据量微小,然而却反常的比高考前还少,或者源端数据量和目标端数据量不符。
  4. 定位数据是否在Kafka之前就曾经失落还是生产端失落数据的
    Kafka反对数据的从新回放性能(换个生产group),清空目标端所有数据,从新生产。
    如果是在生产端失落数据,那么屡次生产后果齐全截然不同的几率很低。
    如果是在写入端失落数据,那么每次后果应该齐全一样(在写入端没有问题的前提下)。
  5. Kafka环节失落数据
    常见的Kafka环节失落数据的起因有:
    如果auto.commit.enable=true,当consumer fetch了一些数据但还没有齐全解决掉的时候,刚好到commit interval登程了提交offset操作,接着consumer crash掉了。这时曾经fetch的数据还没有解决实现但曾经被commit掉,因而没有机会再次被解决,数据失落。
    网络负载很高或者磁盘很忙写入失败的状况下,没有主动重试重发消息。没有做限速解决,超出了网络带宽限速。kafka肯定要配置上音讯重试的机制,并且重试的工夫距离肯定要长一些,默认1秒钟并不合乎生产环境(网络中断工夫有可能超过1秒)。
    如果磁盘坏了,会失落曾经落盘的数据。

    单批数据的长度超过限度会失落数据,报kafka.common.MessageSizeTooLargeException异样解决:
    Consumer side:fetch.message.max.bytes- this will determine the largest size of a message that can be fetched by the consumer.
    Broker side:replica.fetch.max.bytes- this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
    Broker side:message.max.bytes- this is the largest size of the message that can be received by the broker from a producer.
    Broker side (per topic):max.message.bytes- this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker'smessage.max.bytes.)6. partition leader在未实现正本数follows的备份时就宕机的状况,即便选举出了新的leader然而曾经push的数据因为未备份就失落了。
    Kafka是多正本的,当你配置了同步复制之后。多个正本的数据都在PageCache外面,呈现多个正本同时挂掉的概率比1个正本挂掉的概率就很小了。(官网举荐是通过副原本保证数据的完整性的)
  6. Kafka的数据一开始就是存储在PageCache上的,定期flush到磁盘上的,也就是说,不是每个音讯都被存储在磁盘了,如果呈现断电或者机器故障等,PageCache上的数据就失落了。
    能够通过log.flush.interval.messages和log.flush.interval.ms来配置flush距离,interval大丢的数据多些,小会影响性能但在0.8版本,能够通过replica机制保证数据不丢,代价就是须要更多资源,尤其是磁盘资源,Kafka以后反对GZip和Snappy压缩,来缓解这个问题是否应用replica取决于在可靠性和资源代价之间的balance。
    同时Kafka也提供了相干的配置参数,来让你在性能与可靠性之间衡量(个别默认):
    当达到上面的音讯数量时,会将数据flush到日志文件中。默认10000
    log.flush.interval.messages=10000当达到上面的工夫(ms)时,执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到,都会flush。默认3000ms
    log.flush.interval.ms=1000查看是否须要将日志flush的工夫距离
    log.flush.scheduler.interval.ms = 3000Kafka的优化倡议
    producer端
    • 设计上保证数据的牢靠安全性,根据分区数做好数据备份,设立正本数等push数据的形式:同步异步推送数据:衡量安全性和速度性的要求,抉择相应的同步推送还是异步推送形式,当发现数据有问题时,能够改为同步来查找问题。
    • flush是Kafka的外部机制,Kafka优先在内存中实现数据的替换,而后将数据长久化到磁盘Kafka首先会把数据缓存(缓存到内存中)起来再批量flush。能够通过log.flush.interval.messages和log.flush.interval.ms来配置flush距离。
    • 能够通过replica机制保证数据不丢代价就是须要更多资源,尤其是磁盘资源,Kafka以后反对GZip和Snappy压缩,来缓解这个问题。是否应用replica(正本)取决于在可靠性和资源代价之间的balance(均衡)。
    • broker到Consumer kafka的consumer提供两种接口
    high-level版本曾经封装了对partition和offset的治理,默认是会定期主动commit offset,这样可能会丢数据的。
    low-level版本本人治理spout线程和partition之间的对应关系和每个partition上的已生产的offset(定期写到zk)。
    并且只有当这个offset被ack后,即胜利解决后,才会被更新到zk,所以根本是能够保证数据不丢的即便spout线程crash(解体),重启后还是能够从zk中读到对应的offset。
    • 异步要思考到partition leader在未实现正本数follows的备份时就宕机的状况,即便选举出了新的leader然而曾经push的数据因为未备份就失落了
    不能让内存的缓冲池太满,如果满了内存溢出,也就是说数据写入过快,Kafka的缓冲池数据落盘速度太慢,这时必定会造成数据失落。
    尽量保障生产者端数据始终处于线程阻塞状态,这样一边写内存一边落盘。
    异步写入的话还能够设置相似flume回滚类型的batch数,即依照累计的音讯数量,累计的工夫距离,累计的数据大小设置batch大小。
    • 设置适合的形式,增大batch大小来减小网络IO和磁盘IO的申请,这是对于Kafka效率的思考不过异步写入失落数据的状况还是难以管制,还是得稳固整体集群架构的运行,特地是zookeeper,当然正对异步数据失落的状况尽量保障broker端的稳固运作吧。

Kafka不像hadoop更致力于解决大量级数据,Kafka的音讯队列更擅长于解决小数据。针对具体业务而言,若是源源不断的push大量的数据(eg:网络爬虫),能够思考消息压缩。然而这也肯定水平上对CPU造成了压力,还是得联合业务数据进行测试抉择broker端
topic设置多分区,分区自适应所在机器,为了让各分区均匀分布在所在的broker中,分区数要大于broker数。分区是Kafka进行并行读写的单位,是晋升kafka速度的要害。

  1. broker能接管音讯的最大字节数的设置肯定要比生产端能生产的最大字节数要小,否则broker就会因为生产端无奈应用这个音讯而挂起
  2. broker可赋值的音讯的最大字节数设置肯定要比能承受的最大字节数大,否则broker就会因为数据量的问题无奈复制正本,导致数据失落

comsumer端
敞开自动更新offset,等到数据被解决后再手动跟新offset。
在生产前做验证前拿取的数据是否是接着上回生产的数据,不正确则return后行解决排错。
一般来说zookeeper只有稳固的状况下记录的offset是没有问题,除非是多个consumer group同时生产一个分区的数据,其中一个先提交了,另一个就失落了。

问题
Kafka的数据一开始就是存储在PageCache上的,定期flush到磁盘上的,也就是说,不是每个音讯都被存储在磁盘了,如果呈现断电或者机器故障等,PageCache上的数据就失落了。这个是总结出的到目前为止没有产生失落数据的状况。
//producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量解决,批量解决音讯越多,压缩性能越好

 props.put("compression.type", "gzip"); //减少提早 props.put("linger.ms", "50"); //这意味着leader须要期待所有备份都胜利写入日志,这种策略会保障只有有一个备份存活就不会失落数据。这是最强的保障。, props.put("acks", "all"); //有限重试,直到你意识到呈现了问题,设置大于0的值将使客户端从新发送任何数据,一旦这些数据发送失败。留神,这些重试与客户端接管到发送谬误时的重试没有什么不同。容许重试将潜在的扭转数据的程序,如果这两个音讯记录都是发送到同一个partition,则第一个音讯失败第二个发送胜利,则第二条音讯会比第一条音讯呈现要早。 props.put("retries ", MAX_VALUE); props.put("reconnect.backoff.ms ", 20000); props.put("retry.backoff.ms", 20000); //敞开unclean leader选举,即不容许非ISR中的正本被选举为leader,以防止数据失落 props.put("unclean.leader.election.enable", false); //敞开主动提交offset props.put("enable.auto.commit", false); 限度客户端在单个连贯上可能发送的未响应申请的个数。设置此值是1示意kafka broker在响应申请之前client不能再向同一个broker发送申请。留神:设置此参数是为了防止音讯乱序 props.put("max.in.flight.requests.per.connection", 1);Kafka反复生产起因 

强行kill线程,导致生产后的数据,offset没有提交,partition就断开连接。比方,通常会遇到生产的数据,解决很耗时,导致超过了Kafka的session timeout工夫(0.10.x版本默认是30秒),那么就会re-blance重均衡,此时有肯定几率offset没提交,会导致重均衡后反复生产。
如果在close之前调用了consumer.unsubscribe()则有可能局部offset没提交,下次重启会反复生产。
Kafka数据反复Kafka设计的时候是设计了(at-least-once)至多一次的逻辑,这样就决定了数据可能是反复的,Kafka采纳基于工夫的SLA(服务水平保障),音讯保留肯定工夫(通常为7天)后会被删除。
Kafka的数据反复个别状况下应该在消费者端,这时log.cleanup.policy = delete应用定期删除机制。