本文章转自:乐字节
文章次要解说:Kafka
获取更多Java相干材料能够关注公众号《乐字节》 发送:999

异步通信原理

观察者模式

  • 观察者模式(Observer),又叫公布-订阅模式(Publish/Subscribe)
  • 定义对象间一种一对多的依赖关系,使得每当一个对象扭转状态,则所有依赖于它的对象都会失去告诉并自动更新。
  • 一个对象(指标对象)的状态产生扭转,所有的依赖对象(观察者对象)都将失去告诉。
  • 现实生活中的利用场景

    • 京东到货告诉

  • 《鸡毛信》

生产者消费者模式

  • 传统模式

    • 生产者间接将消息传递给指定的消费者
    • 耦合性特地高,当生产者或者消费者发生变化,都须要重写业务逻辑
  • 生产者消费者模式

    • 通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不间接通信,而通过阻塞队列来进行通信
  • 数据传递流程

    • 生产者消费者模式,即N个线程进行生产,同时N个线程进行生产,两种角色通过内存缓冲区进行通信,
    • 生产者负责向缓冲区外面增加数据单元
    • 消费者负责从缓冲区外面取出数据单元

      • 个别遵循先进先出的准则

缓冲区

  • 解耦

    • 假如生产者和消费者别离是两个类。如果让生产者间接调用消费者的某个办法,那么生产者对于消费者就会产生依赖
  • 反对并发

    • 生产者间接调用消费者的某个办法过程中函数调用是同步的
    • 万一消费者解决数据很慢,生产者就会白白糟践大好时光
  • 反对忙闲不均

    • 缓冲区还有另一个益处。如果制作数据的速度时快时慢,缓冲区的益处就体现进去了。
    • 当数据制作快的时候,消费者来不及解决,未解决的数据能够临时存在缓冲区中。
    • 等生产者的制作速度慢下来,消费者再缓缓解决掉。

数据单元

  • 关联到业务对象

    • 数据单元必须关联到某种业务对象
  • 完整性

    • 就是在传输过程中,要保障该数据单元的残缺
  • 独立性

    • 就是各个数据单元之间没有相互依赖
    • 某个数据单元传输失败不应该影响曾经实现传输的单元;也不应该影响尚未传输的单元。
  • 颗粒度

    • 数据单元须要关联到某种业务对象。那么数据单元和业务对象应该处于的关系(一对一?一对多)
    • 如果颗粒度过小会减少数据传输的次数
    • 如果颗粒度过大会减少单个数据传输的工夫,影响前期生产

音讯零碎原理

一个音讯零碎负责将数据从一个利用传递到另外一个利用,利用只需关注于数据,无需关注数据在两个或多个利用间是如何传递的。

点对点消息传递

  • 在点对点音讯零碎中,音讯长久化到一个队列中。此时,将有一个或多个消费者生产队列中的数据。然而一条音讯只能被生产一次。
  • 当一个消费者生产了队列中的某条数据之后,该条数据则从音讯队列中删除。
  • 该模式即便有多个消费者同时生产数据,也能保障数据处理的程序。
  • 基于推送模型的音讯零碎,由音讯代理记录生产状态。

    • 音讯代理将音讯推送(push)到消费者后,标记这条音讯为曾经被生产,然而这种形式无奈很好地保障生产的解决语义。

公布订阅消息传递

  • 在公布-订阅音讯零碎中,音讯被长久化到一个topic中。
  • 消费者能够订阅一个或多个topic,消费者能够生产该topic中所有的数据,同一条数据能够被多个消费者生产,数据被生产后不会立马删除。
  • 在公布-订阅音讯零碎中,音讯的生产者称为发布者,消费者称为订阅者。
  • Kafka 采取拉取模型(Poll),由本人管制生产速度,以及生产的进度,消费者能够依照任意的偏移量进行生产。

Kafka简介

  • 官网:http://kafka.apache.org/
  • Kafka是由Apache软件基金会开发的一个开源流解决平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式公布订阅音讯零碎,它能够解决消费者在网站中的所有动作流数据。

设计指标

  • 以工夫复杂度为O(1)的形式提供音讯长久化能力,即便对TB级以上数据也能保障常数工夫的拜访性能。
  • 高吞吐率。即便在十分便宜的商用机器上也能做到单机反对每秒100K条音讯的传输。
  • 反对Kafka Server间的音讯分区,及分布式生产,同时保障每个partition内的音讯程序传输。
  • 同时反对离线数据处理和实时数据处理。
  • 反对在线程度扩大

Kafka的长处

  • 解耦:

在我的项目启动之初来预测未来我的项目会碰到什么需要,是极其艰难的。音讯零碎在处理过程两头插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这容许你独立的扩大或批改两边的处理过程,只有确保它们恪守同样的接口束缚。

  • 冗余

有些状况下,解决数据的过程会失败。除非数据被长久化,否则将造成失落。音讯队列把数据进行长久化直到它们曾经被齐全解决,通过这一形式躲避了数据失落危险。许多音讯队列所采纳的"插入-获取-删除"范式中,在把一个音讯从队列中删除之前,须要你的解决零碎明确的指出该音讯曾经被处理完毕,从而确保你的数据被平安的保留直到你应用结束。

  • 扩展性

因为音讯队列解耦了你的处理过程,所以增大音讯入队和解决的频率是很容易的,只有另外减少处理过程即可。不须要扭转代码、不须要调节参数。扩大就像调大电力按钮一样简略。

  • 灵活性&峰值解决能力

在访问量剧增的状况下,利用依然须要持续发挥作用,然而这样的突发流量并不常见;如果为以能解决这类峰值拜访为规范来投入资源随时待命无疑是微小的节约。应用音讯队列可能使要害组件顶住突发的拜访压力,而不会因为突发的超负荷的申请而齐全解体。

  • 可恢复性

零碎的一部分组件生效时,不会影响到整个零碎。音讯队列升高了过程间的耦合度,所以即便一个解决音讯的过程挂掉,退出队列中的音讯依然能够在零碎复原后被解决。

  • 程序保障

在大多应用场景下,数据处理的程序都很重要。大部分音讯队列原本就是排序的,并且能保证数据会依照特定的程序来解决。Kafka保障一个Partition内的音讯的有序性。

  • 缓冲

在任何重要的零碎中,都会有须要不同的解决工夫的元素。例如,加载一张图片比利用过滤器破费更少的工夫。音讯队列通过一个缓冲层来帮忙工作最高效率的执行———写入队列的解决会尽可能的疾速。该缓冲有助于管制和优化数据流通过零碎的速度。

  • 异步通信

很多时候,用户不想也不须要立刻解决音讯。音讯队列提供了异步解决机制,容许用户把一个音讯放入队列,但并不立刻解决它。想向队列中放入多少音讯就放多少,而后在须要的时候再去解决它们。

Kafka零碎架构

Broker

  • Kafka 集群蕴含一个或多个服务器,服务器节点称为broker。

Topic

  • 每条公布到Kafka集群的音讯都有一个类别,这个类别被称为Topic。
  • 相似于数据库的表名或者ES的Index
  • 物理上不同Topic的音讯离开存储
  • 逻辑上一个Topic的音讯尽管保留于一个或多个broker上但用户只需指定音讯的Topic即可生产或生产数据而不用关怀数据存于何处)
  • 创立流程
  • 1.controller在ZooKeeper的/brokers/topics节点上注册watcher,当topic被创立,则controller会通过watch失去该topic的partition/replica调配。2.controller从/brokers/ids读取以后所有可用的broker列表,对于set_p中的每一个partition:    2.1从调配给该partition的所有replica(称为AR)中任选一个可用的broker作为新的leader,并将AR设置为新的ISR    2.2将新的leader和ISR写入/brokers/topics/[topic]/partitions/[partition]/state3.controller通过RPC向相干的broker发送LeaderAndISRRequest。
  • 删除流程
  • 1.controller在zooKeeper的/brokers/topics节点上注册watcher,当topic被删除,则controller会通过watch失去该topic的partition/replica调配。2.若delete.topic.enable=false,完结;否则controller注册在/admin/delete_topics上的watch被fire,controller通过回调向对应的broker发送StopReplicaRequest。

Partition

  • topic中的数据宰割为一个或多个partition。
  • 每个topic至多有一个partition,当生产者产生数据的时候,依据调配策略,抉择分区,而后将音讯追加到指定的分区的开端(队列)

    • ## Partation数据路由规定1. 指定了 patition,则间接应用;2. 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
  • 每条音讯都会有一个自增的编号

    • 标识程序
    • 用于标识音讯的偏移量
  • 每个partition中的数据应用多个segment文件存储。
  • partition中的数据是有序的,不同partition间的数据失落了数据的程序。
  • 如果topic有多个partition,生产数据时就不能保证数据的程序。严格保障音讯的生产程序的场景下,须要将partition数目设为1。

    Leader

  • 每个partition有多个正本,其中有且仅有一个作为Leader,Leader是以后负责数据的读写的partition。
  • 1. producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader2. producer 将音讯发送给该 leader3. leader 将音讯写入本地 log4. followers 从 leader pull 音讯,写入本地 log 后 leader 发送 ACK5. leader 收到所有 ISR 中的 replica 的 ACK 后,减少 HW(high watermark,最初 commit 的 offset) 并向 producer 发送 ACK

Follower

  • Follower追随Leader,所有写申请都通过Leader路由,数据变更会播送给所有Follower,Follower与Leader保持数据同步。
  • 如果Leader生效,则从Follower中选举出一个新的Leader。
  • 当Follower挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,从新创立一个Follower。

replication

  • 数据会寄存到topic的partation中,然而有可能分区会损坏
  • 咱们须要对分区的数据进行备份(备份多少取决于你对数据的器重水平)
  • 咱们将分区的分为Leader(1)和Follower(N)

    • Leader负责写入和读取数据
    • Follower只负责备份
    • 保障了数据的一致性
  • 备份数设置为N,示意主+备=N(参考HDFS)

    • ## Kafka 调配 Replica 的算法如下1. 将所有 broker(假如共 n 个 broker)和待调配的 partition 排序2. 将第 i 个 partition 调配到第(i mod n)个 broker 上
  • producer

  • 生产者即数据的发布者,该角色将音讯公布到Kafka的topic中。
  • broker接管到生产者发送的音讯后,broker将该音讯追加到以后用于追加数据的segment文件中。
  • 生产者发送的音讯,存储到一个partition中,生产者也能够指定数据存储的partition。

    consumer

  • 消费者能够从broker中读取数据。消费者能够生产多个topic中的数据。
  • kafka 提供了两套 consumer API:

    • 1. The high-level Consumer API
  • high-level consumer API 提供了一个从 kafka 生产数据的高层形象,而 SimpleConsumer API 则须要开发人员更多地关注细节。

Consumer Group

  • 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
  • 将多个消费者集中到一起去解决某一个Topic的数据,能够更快的进步数据的生产能力
  • 整个消费者组共享一组偏移量(避免数据被反复读取),因为一个Topic有多个分区

offset偏移量

  • 能够惟一的标识一条音讯
  • 偏移量决定读取数据的地位,不会有线程平安的问题,消费者通过偏移量来决定下次读取的音讯
  • 音讯被生产之后,并不被马上删除,这样多个业务就能够重复使用kafka的音讯
  • 咱们某一个业务也能够通过批改偏移量达到从新读取音讯的目标,偏移量由用户管制
  • 音讯最终还是会被删除的,默认生命周期为1周(7*24小时)

Zookeeper

  • kafka 通过 zookeeper 来存储集群的 meta 信息。

Kafka环境搭建

  • 基于Zookeeper搭建并开启

    • 验证ZK的可用性
    • 【123】zkServer.sh start
  • 配置Kafka

    • 基本操作

      • 上传解压拷贝
    • 批改配置文件

      • ## vim server.properties20 broker.id=025 port=909258 log.dirs=/var/bdp/kafka-logs
    • 批改环境变量

      • ## vim /etc/profileexport KAFKA_HOME=/opt/lzj/kafka_2.11-0.8.2.1export PATH=$KAFKA_HOME/bin:$PATH## 配置文件失效
    • 将文件目录拷贝到其余机器

      • [1]scp -r kafka_2.11-0.8.2.1 root@node02:`pwd`[1]scp -r kafka_2.11-0.8.2.1 root@node03:`pwd`[1]scp /etc/profile root@node02:/etc/profile[1]scp /etc/profile root@node03:/etc/profile
    • 批改其余机器上的配置

      • ## vim server.properties[2]broker.id=1
    • 启动集群

      • kafka-server-start.sh /opt/lzj/kafka_2.11-0.8.2.1/config/server.properties
    • 常见命令

      • //创立主题kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --create --replication-factor 2 --partitions 3 --topic userlogkafka-topics.sh --zookeeper node01:2181 --create --replication-factor 2 --partitions 6 --topic studentlog
    kafka-topics.sh --zookeeper node01:2181 --delete --replication-factor 2 --partitions 6 --topic baidu    //查看所有主题  kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --list    //查看主题  kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --describe --topic userlog    //创立生产者  kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic userlog    //创立消费者  kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --from-beginning --topic userlog  ```

Kafka数据检索机制

  • topic在物理层面以partition为分组,一个topic能够分成若干个partition
  • partition还能够细分为Segment,一个partition物理上由多个Segment组成

    • segment 的参数有两个:

      • log.segment.bytes:单个segment可包容的最大数据量,默认为1GB
      • log.segment.ms:Kafka在commit一个未写满的segment前,所期待的工夫(默认为7天)
  • LogSegment 文件由两局部组成,别离为“.index”文件和“.log”文件,别离示意为 Segment 索引文件和数据文件。

    • partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最初一条音讯的offset值
    • 数值大小为64位,20位数字字符长度,没有数字用0填充
    • 第一个segment00000000000000000000.index 00000000000000000000.log    第二个segment,文件命名以第一个segment的最初一条音讯的offset组成00000000000000170410.index 00000000000000170410.log 第三个segment,文件命名以上一个segment的最初一条音讯的offset组成00000000000000239430.index 
  • 音讯都具备固定的物理构造,包含:offset(8 Bytes)、音讯体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,能够确定一条音讯的大小,即读取到哪里截止。
  • 数据的安全性

    producer delivery guarantee

  • 0. At least one 音讯绝不会丢,但可能会反复传输1. At most once 音讯可能会丢,但绝不会反复传输2. Exactly once 每条音讯必定会被传输一次且仅传输一次
  • Producers能够抉择是否为数据的写入接管ack,有以下几种ack的选项:request.required.acks

    • acks=0:

      • Producer 在 ISR 中的 Leader 已胜利收到的数据并失去确认后发送下一条 Message。
    • acks=1:

      • 这意味着 Producer 无需期待来自 Broker 的确认而持续发送下一批音讯。
    • acks=all:

      • Producer 须要期待 ISR 中的所有 Follower 都确认接管到数据后才算一次发送实现,可靠性最高。

ISR机制

  • 关键词

    • AR : Assigned Replicas 用来标识正本的选集
    • OSR : out -sync Replicas 来到同步队列的正本
    • ISR : in -sync Replicas 退出同步队列的正本
    • ISR = Leader + 没有落后太多的正本;AR = OSR+ ISR。
  • 咱们备份数据就是避免数据失落,当主节点挂掉时,能够启用备份节点

    • producer--push-->leader
    • leader--pull-->follower
    • Follower每距离肯定工夫去Leader拉取数据,来保证数据的同步
  • ISR(in-syncReplica)

    • 当主节点挂点,并不是去Follower抉择主,而是从ISR中抉择主
    • 判断规范

      • 超过10秒钟没有同步数据

        • replica.lag.time.max.ms=10000
      • 主副节点差4000条数据

        • rerplica.lag.max.messages=4000
    • 脏节点选举

      • kafka采纳一种降级措施来解决:
      • 选举第一个复原的node作为leader提供服务,以它的数据为基准,这个措施被称为脏leader选举

Broker数据存储机制

  • 无论音讯是否被生产,kafka 都会保留所有音讯。有两种策略能够删除旧数据:
  • 1. 基于工夫:log.retention.hours=1682. 基于大小:log.retention.bytes=1073741824

    consumer delivery guarantee

  • 如果将 consumer 设置为 autocommit,consumer 一旦读到数据立刻主动 commit。如果只探讨这一读取音讯的过程,那 Kafka 确保了 Exactly once。
  • 读完音讯先 commit 再解决音讯。

    • 如果 consumer 在 commit 后还没来得及解决音讯就 crash 了,下次从新开始工作后就无奈读到刚刚已提交而未解决的音讯
    • 这就对应于 At most once
  • 读完音讯先解决再 commit。

    • 如果在解决完音讯之后 commit 之前 consumer crash 了,下次从新开始工作时还会解决刚刚未 commit 的音讯,实际上该音讯曾经被解决过了。
    • 这就对应于 At least once。
  • 如果肯定要做到 Exactly once,就须要协调 offset 和实际操作的输入。

    • 经典的做法是引入两阶段提交。
  • Kafka 默认保障 At least once,并且容许通过设置 producer 异步提交来实现 At most once

    数据的生产

  • partiton_num=2,启动一个consumer过程订阅这个topic,对应的,stream_num设为2,也就是说启两个线程并行处理message。
  • 如果auto.commit.enable=true,

    • 当consumer fetch了一些数据但还没有齐全解决掉的时候,
    • 刚好到commit interval登程了提交offset操作,接着consumer crash掉了。
    • 这时曾经fetch的数据还没有解决实现但曾经被commit掉,因而没有机会再次被解决,数据失落。
  • 如果auto.commit.enable=false,

    • 假如consumer的两个fetcher各自拿了一条数据,并且由两个线程同时解决,
    • 这时线程t1解决完partition1的数据,手动提交offset,这里须要着重阐明的是,当手动执行commit的时候,
    • 实际上是对这个consumer过程所占有的所有partition进行commit,kafka临时还没有提供更细粒度的commit形式,
    • 也就是说,即便t2没有解决完partition2的数据,offset也被t1提交掉了。如果这时consumer crash掉,t2正在解决的这条数据就失落了。
  • 办法1:(将多线程问题转成单线程)

    • 手动commit offset,并针对partition_num启同样数目的consumer过程,这样就能保障一个consumer过程占有一个partition,commit offset的时候不会影响别的partition的offset。但这个办法比拟局限,因为partition和consumer过程的数目必须严格对应
  • 办法2:(参考HDFS数据写入流程)

    • 手动commit offset,另外在consumer端再将所有fetch到的数据缓存到queue里,当把queue里所有的数据处理完之后,再批量提交offset,这样就能保障只有解决完的数据才被commit。

    JavaAPI

    生产者

  • 创立一线程反复的向kafka输出数据

    • 创立生产者线程类

      public class Hello01Producer extends Thread {
      //创立Kafka的生产者
      private Producer<String, String> producer;

      /**

      • 创立结构器
        */

      public Hello01Producer(String pname) {

       //设置线程的名字 super.setName(pname); //创立配置文件列表 Properties properties = new Properties(); // kafka地址,多个地址用逗号宰割 properties.put("metadata.broker.list", "192.168.58.161:9092,192.168.58.162:9092,192.168.58.163:9092"); //设置写出数据的格局 properties.put("serializer.class", StringEncoder.class.getName()); //写出的应答形式 properties.put("acks", 1); //批量写出 properties.put("batch.size", 16384); //创立生产者对象 producer = new Producer<String, String>(new kafka.producer.ProducerConfig(properties));

      }

      @Override
      public void run() {

       //初始化一个计数器 int count = 0; System.out.println("Hello01Producer.run--开始发送数据"); //迭代發送音讯 while (count < 100000) {     String key = String.valueOf(++count);     String value = Thread.currentThread().getName() + "--" + count;     //封装音讯对象     KeyedMessage<String, String> message = new KeyedMessage<>("userlog", key, value);     //发送音讯到服务器     producer.send(message);     //打印消息     System.out.println("Producer.run--" + key + "--" + value);     //每个1秒发送1条     try {         Thread.sleep(100);     } catch (InterruptedException e) {         e.printStackTrace();     } }

      }

      public static void main(String[] args) {

       Hello01Producer producer = new Hello01Producer("上海尚学堂"); producer.start();

      }
      }

    消费者

  • 创立一线程反复的向kafka生产数据

    //创立消费者对象private ConsumerConnector consumer;/** * 创立结构器 */public Hello01Consumer(String cname) {    super.setName(cname);    //读取配置文件    Properties properties = new Properties();    //ZK地址    properties.put("zookeeper.connect", "192.168.58.161:2181,192.168.58.162:2181,192.168.58.163:2181");    //消费者所在组的名称    properties.put("group.id", "shsxt-bigdata");    //ZK超时工夫    properties.put("zookeeper.session.timeout.ms", "400");    //当消费者第一次生产时,从最低的偏移量开始生产    properties.put("auto.offset.reset", "smallest");    //主动提交偏移量    properties.put("auto.commit.enable", "true");    //消费者主动提交偏移量的工夫距离    properties.put("auto.commit.interval.ms", "1000");    //创立消费者对象    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));}@Overridepublic void run() {    // 形容读取哪个topic,须要几个线程读    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();    topicCountMap.put("userlog", 1);    //消费者给句配置信息开始读取音讯流    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);    // 每个线程对应于一个KafkaStream    List<KafkaStream<byte[], byte[]>> list = consumerMap.get("userlog");    // 获取kafkastream流    KafkaStream stream0 = list.get(0);    ConsumerIterator<byte[], byte[]> it = stream0.iterator();    //开始迭代并获取数据    while (it.hasNext()) {        // 获取一条音讯        MessageAndMetadata<byte[], byte[]> value = it.next();        int partition = value.partition();        long offset = value.offset();        String data = new String(value.message());        System.out.println("开始" + data + " partition:" + partition + " offset:" + offset);    }}public static void main(String[] args) {    Hello01Consumer consumer01 = new Hello01Consumer("李毅");    consumer01.start();}

    }

    反复生产和数据的失落

  • 有可能一个消费者取出了一条数据(offset=88),然而还没有解决实现,然而消费者被敞开了

    • 如果下次还能从88重新处理就属于完满状况
    • 如果下次数据从86开始,就属于数据的反复生产
    • 如果下次数据从89开始,就是与数据的失落
    • //消费者主动提交偏移量的工夫距离props.put("auto.commit.interval.ms", "1010");提交距离》单条执行工夫  (反复)

Kafka优化

Partition 数目

  • 一般来说,每个partition 能解决的吞吐为几MB/s(仍须要基于依据本地环境测试后获取精确指标),减少更多的partitions意味着:

    • 更高的并行度与吞吐
    • 能够扩大更多的(同一个consumer group中的)consumers
    • 若是集群中有较多的brokers,则可更大程度上利用闲置的brokers
    • 然而会造成Zookeeper的更多选举
    • 也会在Kafka中关上更多的文件
  • 调整准则

    • 一般来说,若是集群较小(小于6个brokers),则配置2 x broker数的partition数。在这里次要思考的是之后的扩大。若是集群扩大了一倍(例如12个),则不必放心会有partition有余的景象产生
    • 一般来说,若是集群较大(大于12个),则配置1 x broker 数的partition数。因为这里不须要再思考集群的扩大状况,与broker数雷同的partition数曾经足够应酬惯例场景。若有必要,则再手动调整
    • 思考最高峰吞吐须要的并行consumer数,调整partition的数目。若是利用场景须要有20个(同一个consumer group中的)consumer并行生产,则据此设置为20个partition
    • 思考producer所需的吞吐,调整partition数目(如果producer的吞吐十分高,或是在接下来两年内都比拟高,则减少partition的数目)

Replication factor

  • 此参数决定的是records复制的数目,倡议至多 设置为2,个别是3,最高设置为4。
  • 更高的replication factor(假如数目为N)意味着:

    • 零碎更稳固(容许N-1个broker宕机)
    • 更多的正本(如果acks=all,则会造成较高的延时)
    • 零碎磁盘的使用率会更高(个别若是RF为3,则绝对于RF为2时,会占据更多50% 的磁盘空间)
  • 调整准则:

    • 以3为起始(当然至多须要有3个brokers,同时也不倡议一个Kafka 集群中节点数少于3个节点)
    • 如果replication 性能成为了瓶颈或是一个issue,则倡议应用一个性能更好的broker,而不是升高RF的数目
    • 永远不要在生产环境中设置RF为1

批量写入

  • 为了大幅度提高producer写入吞吐量,须要定期批量写文件
  • 每当producer写入10000条音讯时,刷数据到磁盘log.flush.interval.messages=10000每距离1秒钟工夫,刷数据到磁盘log.flush.interval.ms=1000

    Flume+Kafka集成

  • 搭建Flume并编写配置文件

    • vim /opt/lzj/flume-1.6.0/options/f2k.conf
    • #flume-ng agent -n a1 -f /opt/lzj/flume-1.6.0/options/f2k.conf - flume.root.logger=INFO,console# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = execa1.sources.r1.command = tail -F /var/bdp/baidu.ping# Describe the sinka1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.topic = baidua1.sinks.k1.brokerList = node01:9092,node02:9092,node03:9092a1.sinks.k1.requiredAcks = 1a1.sinks.k1.batchSize = 10a1.sinks.k1.channel = c1# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000000a1.channels.c1.transactionCapacity = 10000# Bind the source and sink to the channela1.sources.r1.channels = c1
  • 启动Zookeeper,Kafka,而后创立Topic(baidu),开启消费者

    • 【123】zkServer.sh start
      【123】kafka-server-start.sh /opt/lzj/kafka_2.11/config/server.properties
      【1】kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 3 --partitions 3 --topic baidu
      【1】kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic baidu
  • 开启Flume

    • 【1】flume-ng agent -n a1 -f /opt/lzj/apache-flume-1.6.0-bin/options/f2k.conf -Dflume.root.logger=INFO,console
  • 开始ping百度的脚本

    • ping www.baidu.com >> /var/bdp/baidu.ping 2>&1 &

感激大家的认同与反对,小编会继续转发《乐字节》优质文章