关于程序员:通俗易懂一篇文章带你认识Kafka

52次阅读

共计 14324 个字符,预计需要花费 36 分钟才能阅读完成。

本文章转自:乐字节
文章次要解说:Kafka
获取更多 Java 相干材料能够关注公众号《乐字节》发送:999

异步通信原理

观察者模式

  • 观察者模式(Observer),又叫公布 - 订阅模式(Publish/Subscribe)
  • 定义对象间一种一对多的依赖关系,使得每当一个对象扭转状态,则所有依赖于它的对象都会失去告诉并自动更新。
  • 一个对象(指标对象)的状态产生扭转,所有的依赖对象(观察者对象)都将失去告诉。
  • 现实生活中的利用场景

    • 京东到货告诉

  • 《鸡毛信》

生产者消费者模式

  • 传统模式

    • 生产者间接将消息传递给指定的消费者
    • 耦合性特地高,当生产者或者消费者发生变化,都须要重写业务逻辑
  • 生产者消费者模式

    • 通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不间接通信,而通过阻塞队列来进行通信
  • 数据传递流程

    • 生产者消费者模式,即 N 个线程进行生产,同时 N 个线程进行生产,两种角色通过内存缓冲区进行通信,
    • 生产者负责向缓冲区外面增加数据单元
    • 消费者负责从缓冲区外面取出数据单元

      • 个别遵循先进先出的准则

缓冲区

  • 解耦

    • 假如生产者和消费者别离是两个类。如果让生产者间接调用消费者的某个办法,那么生产者对于消费者就会产生依赖
  • 反对并发

    • 生产者间接调用消费者的某个办法过程中函数调用是同步的
    • 万一消费者解决数据很慢,生产者就会白白糟践大好时光
  • 反对忙闲不均

    • 缓冲区还有另一个益处。如果制作数据的速度时快时慢,缓冲区的益处就体现进去了。
    • 当数据制作快的时候,消费者来不及解决,未解决的数据能够临时存在缓冲区中。
    • 等生产者的制作速度慢下来,消费者再缓缓解决掉。

数据单元

  • 关联到业务对象

    • 数据单元必须关联到某种业务对象
  • 完整性

    • 就是在传输过程中,要保障该数据单元的残缺
  • 独立性

    • 就是各个数据单元之间没有相互依赖
    • 某个数据单元传输失败不应该影响曾经实现传输的单元;也不应该影响尚未传输的单元。
  • 颗粒度

    • 数据单元须要关联到某种业务对象。那么数据单元和业务对象应该处于的关系(一对一?一对多)
    • 如果颗粒度过小会减少数据传输的次数
    • 如果颗粒度过大会减少单个数据传输的工夫,影响前期生产

音讯零碎原理

一个音讯零碎负责将数据从一个利用传递到另外一个利用,利用只需关注于数据,无需关注数据在两个或多个利用间是如何传递的。

点对点消息传递

  • 在点对点音讯零碎中,音讯长久化到一个队列中。此时,将有一个或多个消费者生产队列中的数据。然而一条音讯只能被生产一次。
  • 当一个消费者生产了队列中的某条数据之后,该条数据则从音讯队列中删除。
  • 该模式即便有多个消费者同时生产数据,也能保障数据处理的程序。
  • 基于推送模型的音讯零碎,由音讯代理记录生产状态。

    • 音讯代理将音讯推送 (push) 到消费者后,标记这条音讯为曾经被生产,然而这种形式无奈很好地保障生产的解决语义。

公布订阅消息传递

  • 在公布 - 订阅音讯零碎中,音讯被长久化到一个 topic 中。
  • 消费者能够订阅一个或多个 topic,消费者能够生产该 topic 中所有的数据,同一条数据能够被多个消费者生产,数据被生产后不会立马删除。
  • 在公布 - 订阅音讯零碎中,音讯的生产者称为发布者,消费者称为订阅者。
  • Kafka 采取拉取模型(Poll),由本人管制生产速度,以及生产的进度,消费者能够依照任意的偏移量进行生产。

Kafka 简介

  • 官网:http://kafka.apache.org/
  • Kafka 是由 Apache 软件基金会开发的一个开源流解决平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量的分布式公布订阅音讯零碎,它能够解决消费者在网站中的所有动作流数据。

设计指标

  • 以工夫复杂度为 O(1)的形式提供音讯长久化能力,即便对 TB 级以上数据也能保障常数工夫的拜访性能。
  • 高吞吐率。即便在十分便宜的商用机器上也能做到单机反对每秒 100K 条音讯的传输。
  • 反对 Kafka Server 间的音讯分区,及分布式生产,同时保障每个 partition 内的音讯程序传输。
  • 同时反对离线数据处理和实时数据处理。
  • 反对在线程度扩大

Kafka 的长处

  • 解耦:

在我的项目启动之初来预测未来我的项目会碰到什么需要,是极其艰难的。音讯零碎在处理过程两头插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这容许你独立的扩大或批改两边的处理过程,只有确保它们恪守同样的接口束缚。

  • 冗余

有些状况下,解决数据的过程会失败。除非数据被长久化,否则将造成失落。音讯队列把数据进行长久化直到它们曾经被齐全解决,通过这一形式躲避了数据失落危险。许多音讯队列所采纳的 ” 插入 - 获取 - 删除 ” 范式中,在把一个音讯从队列中删除之前,须要你的解决零碎明确的指出该音讯曾经被处理完毕,从而确保你的数据被平安的保留直到你应用结束。

  • 扩展性

因为音讯队列解耦了你的处理过程,所以增大音讯入队和解决的频率是很容易的,只有另外减少处理过程即可。不须要扭转代码、不须要调节参数。扩大就像调大电力按钮一样简略。

  • 灵活性 & 峰值解决能力

在访问量剧增的状况下,利用依然须要持续发挥作用,然而这样的突发流量并不常见;如果为以能解决这类峰值拜访为规范来投入资源随时待命无疑是微小的节约。应用音讯队列可能使要害组件顶住突发的拜访压力,而不会因为突发的超负荷的申请而齐全解体。

  • 可恢复性

零碎的一部分组件生效时,不会影响到整个零碎。音讯队列升高了过程间的耦合度,所以即便一个解决音讯的过程挂掉,退出队列中的音讯依然能够在零碎复原后被解决。

  • 程序保障

在大多应用场景下,数据处理的程序都很重要。大部分音讯队列原本就是排序的,并且能保证数据会依照特定的程序来解决。Kafka 保障一个 Partition 内的音讯的有序性。

  • 缓冲

在任何重要的零碎中,都会有须要不同的解决工夫的元素。例如,加载一张图片比利用过滤器破费更少的工夫。音讯队列通过一个缓冲层来帮忙工作最高效率的执行———写入队列的解决会尽可能的疾速。该缓冲有助于管制和优化数据流通过零碎的速度。

  • 异步通信

很多时候,用户不想也不须要立刻解决音讯。音讯队列提供了异步解决机制,容许用户把一个音讯放入队列,但并不立刻解决它。想向队列中放入多少音讯就放多少,而后在须要的时候再去解决它们。

Kafka 零碎架构

Broker

  • Kafka 集群蕴含一个或多个服务器,服务器节点称为 broker。

Topic

  • 每条公布到 Kafka 集群的音讯都有一个类别,这个类别被称为 Topic。
  • 相似于数据库的表名或者 ES 的 Index
  • 物理上不同 Topic 的音讯离开存储
  • 逻辑上一个 Topic 的音讯尽管保留于一个或多个 broker 上但用户只需指定音讯的 Topic 即可生产或生产数据而不用关怀数据存于何处)
  • 创立流程
  • 1.controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创立,则 controller 会通过 watch 失去该 topic 的 partition/replica 调配。2.controller 从 /brokers/ids 读取以后所有可用的 broker 列表,对于 set_p 中的每一个 partition:2.1 从调配给该 partition 的所有 replica(称为 AR)中任选一个可用的 broker 作为新的 leader,并将 AR 设置为新的 ISR
        2.2 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
    3.controller 通过 RPC 向相干的 broker 发送 LeaderAndISRRequest。
  • 删除流程
  • 1.controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 失去该 topic 的 partition/replica 调配。2. 若 delete.topic.enable=false,完结;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。

Partition

  • topic 中的数据宰割为一个或多个 partition。
  • 每个 topic 至多有一个 partition, 当生产者产生数据的时候,依据调配策略, 抉择分区, 而后将音讯追加到指定的分区的开端(队列)

    • ## Partation 数据路由规定
      1. 指定了 patition,则间接应用;2. 未指定 patition 但指定 key,通过对 key 的 value 进行 hash 选出一个 patition
  • 每条音讯都会有一个自增的编号

    • 标识程序
    • 用于标识音讯的偏移量
  • 每个 partition 中的数据应用多个 segment 文件存储。
  • partition 中的数据是有序的,不同 partition 间的数据失落了数据的程序。
  • 如果 topic 有多个 partition,生产数据时就不能保证数据的程序。严格保障音讯的生产程序的场景下,须要将 partition 数目设为 1。

    Leader

  • 每个 partition 有多个正本,其中有且仅有一个作为 Leader,Leader 是以后负责数据的读写的 partition。
  • 1. producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader
    2. producer 将音讯发送给该 leader
    3. leader 将音讯写入本地 log
    4. followers 从 leader pull 音讯,写入本地 log 后 leader 发送 ACK
    5. leader 收到所有 ISR 中的 replica 的 ACK 后,减少 HW(high watermark,最初 commit 的 offset)并向 producer 发送 ACK

Follower

  • Follower 追随 Leader,所有写申请都通过 Leader 路由,数据变更会播送给所有 Follower,Follower 与 Leader 保持数据同步。
  • 如果 Leader 生效,则从 Follower 中选举出一个新的 Leader。
  • 当 Follower 挂掉、卡住或者同步太慢,leader 会把这个 follower 从“in sync replicas”(ISR)列表中删除,从新创立一个 Follower。

replication

  • 数据会寄存到 topic 的 partation 中,然而有可能分区会损坏
  • 咱们须要对分区的数据进行备份(备份多少取决于你对数据的器重水平)
  • 咱们将分区的分为 Leader(1)和 Follower(N)

    • Leader 负责写入和读取数据
    • Follower 只负责备份
    • 保障了数据的一致性
  • 备份数设置为 N,示意主 + 备 =N(参考 HDFS)

    • ## Kafka 调配 Replica 的算法如下
      1. 将所有 broker(假如共 n 个 broker)和待调配的 partition 排序
      2. 将第 i 个 partition 调配到第(i mod n)个 broker 上
  • producer

  • 生产者即数据的发布者,该角色将音讯公布到 Kafka 的 topic 中。
  • broker 接管到生产者发送的音讯后,broker 将该音讯 追加 到以后用于追加数据的 segment 文件中。
  • 生产者发送的音讯,存储到一个 partition 中,生产者也能够指定数据存储的 partition。

    consumer

  • 消费者能够从 broker 中读取数据。消费者能够生产多个 topic 中的数据。
  • kafka 提供了两套 consumer API:

    • 1. The high-level Consumer API
  • high-level consumer API 提供了一个从 kafka 生产数据的高层形象,而 SimpleConsumer API 则须要开发人员更多地关注细节。

Consumer Group

  • 每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。
  • 将多个消费者集中到一起去解决某一个 Topic 的数据,能够更快的进步数据的生产能力
  • 整个消费者组共享一组偏移量(避免数据被反复读取),因为一个 Topic 有多个分区

offset 偏移量

  • 能够惟一的标识一条音讯
  • 偏移量决定读取数据的地位,不会有线程平安的问题,消费者通过偏移量来决定下次读取的音讯
  • 音讯被生产之后,并不被马上删除,这样多个业务就能够重复使用 kafka 的音讯
  • 咱们某一个业务也能够通过批改偏移量达到从新读取音讯的目标, 偏移量由用户管制
  • 音讯最终还是会被删除的,默认生命周期为 1 周(7*24 小时)

Zookeeper

  • kafka 通过 zookeeper 来存储集群的 meta 信息。

Kafka 环境搭建

  • 基于 Zookeeper 搭建并开启

    • 验证 ZK 的可用性
    • 【123】zkServer.sh start
  • 配置 Kafka

    • 基本操作

      • 上传解压拷贝
    • 批改配置文件

      • ## vim server.properties
        20 broker.id=0
        25 port=9092
        58 log.dirs=/var/bdp/kafka-logs
    • 批改环境变量

      • ## vim /etc/profile
        export KAFKA_HOME=/opt/lzj/kafka_2.11-0.8.2.1
        export PATH=$KAFKA_HOME/bin:$PATH
        ## 配置文件失效
    • 将文件目录拷贝到其余机器

      • [1]scp -r kafka_2.11-0.8.2.1 root@node02:`pwd`
        [1]scp -r kafka_2.11-0.8.2.1 root@node03:`pwd`
        [1]scp /etc/profile root@node02:/etc/profile
        [1]scp /etc/profile root@node03:/etc/profile
    • 批改其余机器上的配置

      • ## vim server.properties
        [2]broker.id=1
    • 启动集群

      • kafka-server-start.sh /opt/lzj/kafka_2.11-0.8.2.1/config/server.properties
    • 常见命令

      • // 创立主题
        kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --create --replication-factor 2 --partitions 3 --topic userlog
        kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 2 --partitions 6 --topic studentlog
  
  kafka-topics.sh --zookeeper node01:2181 --delete --replication-factor 2 --partitions 6 --topic baidu
  
  // 查看所有主题
  kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --list
  
  // 查看主题
  kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --describe --topic userlog
  
  // 创立生产者
  kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic userlog
  
  // 创立消费者
  kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --from-beginning --topic userlog
  ```

Kafka 数据检索机制

  • topic 在物理层面以 partition 为分组,一个 topic 能够分成若干个 partition
  • partition 还能够细分为 Segment,一个 partition 物理上由多个 Segment 组成

    • segment 的参数有两个:

      • log.segment.bytes:单个 segment 可包容的最大数据量,默认为 1GB
      • log.segment.ms:Kafka 在 commit 一个未写满的 segment 前,所期待的工夫(默认为 7 天)
  • LogSegment 文件由两局部组成,别离为“.index”文件和“.log”文件,别离示意为 Segment 索引文件和数据文件。

    • partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最初一条音讯的 offset 值
    • 数值大小为 64 位,20 位数字字符长度,没有数字用 0 填充
    • 第一个 segment
      00000000000000000000.index 
      00000000000000000000.log    
      第二个 segment,文件命名以第一个 segment 的最初一条音讯的 offset 组成
      00000000000000170410.index 
      00000000000000170410.log 
      第三个 segment,文件命名以上一个 segment 的最初一条音讯的 offset 组成
      00000000000000239430.index 
  • 音讯都具备固定的物理构造,包含:offset(8 Bytes)、音讯体的大小 (4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes) 等等字段,能够确定一条音讯的大小,即读取到哪里截止。
  • 数据的安全性

    producer delivery guarantee

  • 0. At least one 音讯绝不会丢,但可能会反复传输
    1. At most once 音讯可能会丢,但绝不会反复传输
    2. Exactly once 每条音讯必定会被传输一次且仅传输一次
  • Producers 能够抉择是否为数据的写入接管 ack,有以下几种 ack 的选项:request.required.acks

    • acks=0:

      • Producer 在 ISR 中的 Leader 已胜利收到的数据并失去确认后发送下一条 Message。
    • acks=1:

      • 这意味着 Producer 无需期待来自 Broker 的确认而持续发送下一批音讯。
    • acks=all:

      • Producer 须要期待 ISR 中的所有 Follower 都确认接管到数据后才算一次发送实现,可靠性最高。

ISR 机制

  • 关键词

    • AR : Assigned Replicas 用来标识正本的选集
    • OSR:out -sync Replicas 来到同步队列的正本
    • ISR:in -sync Replicas 退出同步队列的正本
    • ISR = Leader + 没有落后太多的正本;AR = OSR+ ISR。
  • 咱们备份数据就是避免数据失落,当主节点挂掉时,能够启用备份节点

    • producer–push–>leader
    • leader–pull–>follower
    • Follower 每距离肯定工夫去 Leader 拉取数据,来保证数据的同步
  • ISR(in-syncReplica)

    • 当主节点挂点,并不是去 Follower 抉择主,而是从 ISR 中抉择主
    • 判断规范

      • 超过 10 秒钟没有同步数据

        • replica.lag.time.max.ms=10000
      • 主副节点差 4000 条数据

        • rerplica.lag.max.messages=4000
    • 脏节点选举

      • kafka 采纳一种降级措施来解决:
      • 选举第一个复原的 node 作为 leader 提供服务,以它的数据为基准,这个措施被称为脏 leader 选举

Broker 数据存储机制

  • 无论音讯是否被生产,kafka 都会保留所有音讯。有两种策略能够删除旧数据:
  • 1. 基于工夫:log.retention.hours=168
    2. 基于大小:log.retention.bytes=1073741824

    consumer delivery guarantee

  • 如果将 consumer 设置为 autocommit,consumer 一旦读到数据立刻主动 commit。如果只探讨这一读取音讯的过程,那 Kafka 确保了 Exactly once。
  • 读完音讯先 commit 再解决音讯。

    • 如果 consumer 在 commit 后还没来得及解决音讯就 crash 了,下次从新开始工作后就无奈读到刚刚已提交而未解决的音讯
    • 这就对应于 At most once
  • 读完音讯先解决再 commit。

    • 如果在解决完音讯之后 commit 之前 consumer crash 了,下次从新开始工作时还会解决刚刚未 commit 的音讯,实际上该音讯曾经被解决过了。
    • 这就对应于 At least once。
  • 如果肯定要做到 Exactly once,就须要协调 offset 和实际操作的输入。

    • 经典的做法是引入两阶段提交。
  • Kafka 默认保障 At least once,并且容许通过设置 producer 异步提交来实现 At most once

    数据的生产

  • partiton_num=2,启动一个 consumer 过程订阅这个 topic,对应的,stream_num 设为 2,也就是说启两个线程并行处理 message。
  • 如果 auto.commit.enable=true,

    • 当 consumer fetch 了一些数据但还没有齐全解决掉的时候,
    • 刚好到 commit interval 登程了提交 offset 操作,接着 consumer crash 掉了。
    • 这时曾经 fetch 的数据还没有解决实现但曾经被 commit 掉,因而没有机会再次被解决,数据失落。
  • 如果 auto.commit.enable=false,

    • 假如 consumer 的两个 fetcher 各自拿了一条数据,并且由两个线程同时解决,
    • 这时线程 t1 解决完 partition1 的数据,手动提交 offset,这里须要着重阐明的是,当手动执行 commit 的时候,
    • 实际上是对这个 consumer 过程所占有的所有 partition 进行 commit,kafka 临时还没有提供更细粒度的 commit 形式,
    • 也就是说,即便 t2 没有解决完 partition2 的数据,offset 也被 t1 提交掉了。如果这时 consumer crash 掉,t2 正在解决的这条数据就失落了。
  • 办法 1:(将多线程问题转成单线程)

    • 手动 commit offset,并针对 partition_num 启同样数目的 consumer 过程,这样就能保障一个 consumer 过程占有一个 partition,commit offset 的时候不会影响别的 partition 的 offset。但这个办法比拟局限,因为 partition 和 consumer 过程的数目必须严格对应
  • 办法 2:(参考 HDFS 数据写入流程)

    • 手动 commit offset,另外在 consumer 端再将所有 fetch 到的数据缓存到 queue 里,当把 queue 里所有的数据处理完之后,再批量提交 offset,这样就能保障只有解决完的数据才被 commit。

    JavaAPI

    生产者

  • 创立一线程反复的向 kafka 输出数据

    • 创立生产者线程类

      public class Hello01Producer extends Thread {
      // 创立 Kafka 的生产者
      private Producer<String, String> producer;

      /**

      • 创立结构器
        */

      public Hello01Producer(String pname) {

       // 设置线程的名字
       super.setName(pname);
       // 创立配置文件列表
       Properties properties = new Properties();
       // kafka 地址,多个地址用逗号宰割
       properties.put("metadata.broker.list", "192.168.58.161:9092,192.168.58.162:9092,192.168.58.163:9092");
       // 设置写出数据的格局
       properties.put("serializer.class", StringEncoder.class.getName());
       // 写出的应答形式
       properties.put("acks", 1);
       // 批量写出
       properties.put("batch.size", 16384);
       // 创立生产者对象
       producer = new Producer<String, String>(new kafka.producer.ProducerConfig(properties));

      }

      @Override
      public void run() {

       // 初始化一个计数器
       int count = 0;
      
       System.out.println("Hello01Producer.run-- 开始发送数据");
       // 迭代發送音讯
       while (count < 100000) {String key = String.valueOf(++count);
           String value = Thread.currentThread().getName() + "--" + count;
           // 封装音讯对象
           KeyedMessage<String, String> message = new KeyedMessage<>("userlog", key, value);
           // 发送音讯到服务器
           producer.send(message);
           // 打印消息
           System.out.println("Producer.run--" + key + "--" + value);
           // 每个 1 秒发送 1 条
           try {Thread.sleep(100);
           } catch (InterruptedException e) {e.printStackTrace();
           }
      
       }
      

      }

      public static void main(String[] args) {

       Hello01Producer producer = new Hello01Producer("上海尚学堂");
       producer.start();

      }
      }

    消费者

  • 创立一线程反复的向 kafka 生产数据

    // 创立消费者对象
    private ConsumerConnector consumer;
    
    /**
     * 创立结构器
     */
    public Hello01Consumer(String cname) {super.setName(cname);
        // 读取配置文件
        Properties properties = new Properties();
        //ZK 地址
        properties.put("zookeeper.connect", "192.168.58.161:2181,192.168.58.162:2181,192.168.58.163:2181");
        // 消费者所在组的名称
        properties.put("group.id", "shsxt-bigdata");
        //ZK 超时工夫
        properties.put("zookeeper.session.timeout.ms", "400");
        // 当消费者第一次生产时,从最低的偏移量开始生产
        properties.put("auto.offset.reset", "smallest");
        // 主动提交偏移量
        properties.put("auto.commit.enable", "true");
        // 消费者主动提交偏移量的工夫距离
        properties.put("auto.commit.interval.ms", "1000");
        // 创立消费者对象
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }
    
    @Override
    public void run() {
        // 形容读取哪个 topic,须要几个线程读
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("userlog", 1);
        // 消费者给句配置信息开始读取音讯流
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        // 每个线程对应于一个 KafkaStream
        List<KafkaStream<byte[], byte[]>> list = consumerMap.get("userlog");
        // 获取 kafkastream 流
        KafkaStream stream0 = list.get(0);
        ConsumerIterator<byte[], byte[]> it = stream0.iterator();
        // 开始迭代并获取数据
        while (it.hasNext()) {
            // 获取一条音讯
            MessageAndMetadata<byte[], byte[]> value = it.next();
            int partition = value.partition();
            long offset = value.offset();
            String data = new String(value.message());
            System.out.println("开始" + data + "partition:" + partition + "offset:" + offset);
        }
    }
    
    public static void main(String[] args) {Hello01Consumer consumer01 = new Hello01Consumer("李毅");
        consumer01.start();}
    

    }

    反复生产和数据的失落

  • 有可能一个消费者取出了一条数据(offset=88),然而还没有解决实现,然而消费者被敞开了

    • 如果下次还能从 88 重新处理就属于完满状况
    • 如果下次数据从 86 开始,就属于数据的反复生产
    • 如果下次数据从 89 开始,就是与数据的失落
    • // 消费者主动提交偏移量的工夫距离 props.put("auto.commit.interval.ms", "1010");
      提交距离》单条执行工夫(反复)

Kafka 优化

Partition 数目

  • 一般来说,每个 partition 能解决的吞吐为几 MB/s(仍须要基于依据本地环境测试后获取精确指标),减少更多的 partitions 意味着:

    • 更高的并行度与吞吐
    • 能够扩大更多的(同一个 consumer group 中的)consumers
    • 若是集群中有较多的 brokers,则可更大程度上利用闲置的 brokers
    • 然而会造成 Zookeeper 的更多选举
    • 也会在 Kafka 中关上更多的文件
  • 调整准则

    • 一般来说,若是集群较小(小于 6 个 brokers),则配置 2 x broker 数的 partition 数。在这里次要思考的是之后的扩大。若是集群扩大了一倍(例如 12 个),则不必放心会有 partition 有余的景象产生
    • 一般来说,若是集群较大(大于 12 个),则配置 1 x broker 数的 partition 数。因为这里不须要再思考集群的扩大状况,与 broker 数雷同的 partition 数曾经足够应酬惯例场景。若有必要,则再手动调整
    • 思考最高峰吞吐须要的并行 consumer 数,调整 partition 的数目。若是利用场景须要有 20 个(同一个 consumer group 中的)consumer 并行生产,则据此设置为 20 个 partition
    • 思考 producer 所需的吞吐,调整 partition 数目(如果 producer 的吞吐十分高,或是在接下来两年内都比拟高,则减少 partition 的数目)

Replication factor

  • 此参数决定的是 records 复制的数目,倡议至多 设置为 2,个别是 3,最高设置为 4。
  • 更高的 replication factor(假如数目为 N)意味着:

    • 零碎更稳固(容许 N - 1 个 broker 宕机)
    • 更多的正本(如果 acks=all,则会造成较高的延时)
    • 零碎磁盘的使用率会更高(个别若是 RF 为 3,则绝对于 RF 为 2 时,会占据更多 50% 的磁盘空间)
  • 调整准则:

    • 以 3 为起始(当然至多须要有 3 个 brokers,同时也不倡议一个 Kafka 集群中节点数少于 3 个节点)
    • 如果 replication 性能成为了瓶颈或是一个 issue,则倡议应用一个性能更好的 broker,而不是升高 RF 的数目
    • 永远不要在生产环境中设置 RF 为 1

批量写入

  • 为了大幅度提高 producer 写入吞吐量,须要定期批量写文件
  • 每当 producer 写入 10000 条音讯时,刷数据到磁盘
    log.flush.interval.messages=10000
    
    每距离 1 秒钟工夫,刷数据到磁盘
    log.flush.interval.ms=1000

    Flume+Kafka 集成

  • 搭建 Flume 并编写配置文件

    • vim /opt/lzj/flume-1.6.0/options/f2k.conf
    • #flume-ng agent -n a1 -f /opt/lzj/flume-1.6.0/options/f2k.conf - flume.root.logger=INFO,console
      
      # Name the components on this agent
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      # Describe/configure the source
      a1.sources.r1.type = exec
      a1.sources.r1.command = tail -F /var/bdp/baidu.ping
      
      # Describe the sink
      a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
      a1.sinks.k1.topic = baidu
      a1.sinks.k1.brokerList = node01:9092,node02:9092,node03:9092
      a1.sinks.k1.requiredAcks = 1
      a1.sinks.k1.batchSize = 10
      a1.sinks.k1.channel = c1
      
      # Use a channel which buffers events in memory
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000000
      a1.channels.c1.transactionCapacity = 10000
      
      # Bind the source and sink to the channel
      a1.sources.r1.channels = c1
  • 启动 Zookeeper,Kafka, 而后创立 Topic(baidu), 开启消费者

    • 【123】zkServer.sh start
      【123】kafka-server-start.sh /opt/lzj/kafka_2.11/config/server.properties
      【1】kafka-topics.sh –zookeeper node01:2181 –create –replication-factor 3 –partitions 3 –topic baidu
      【1】kafka-console-consumer.sh –zookeeper node01:2181 –from-beginning –topic baidu
  • 开启 Flume

    • 【1】flume-ng agent -n a1 -f /opt/lzj/apache-flume-1.6.0-bin/options/f2k.conf -Dflume.root.logger=INFO,console
  • 开始 ping 百度的脚本

    • ping www.baidu.com >> /var/bdp/baidu.ping 2>&1 &

感激大家的认同与反对,小编会继续转发《乐字节》优质文章

正文完
 0