Kafka的学习与应用

简介

起初是由LinkedIn公司采纳Scala语言开发的一个分布式、多分区、多正本且基于zookeeper协调的分布式音讯零碎,现已募捐给Apache基金会。它是一种高吞吐量的分布式公布订阅音讯零碎,以可程度扩大和高吞吐率而被宽泛应用。目前越来越多的开源分布式解决零碎如Cloudera、Apache Storm、Spark、Flink等都反对与Kafka集成。

kafka是用Scala写的,一下两种框架都是应用scala语言写的:

  • kafka(音讯队列) : 分布式音讯队列 外部代码常常用来解决并发的问题 用scala能够大大简化其代码。
  • spark(实时的大数据分析框架,和flink性能类似) : 解决多线程场景不便 另外 spark次要用作内存计算 常常要用来实现简单的算法 利用scala这种函数式编程语言 能够大大简化代码

Kafka与zk的关系

zk为kafka提供一下几点作用(须要留神,在0.9版本之前offset存储在ZK0.9版本及之后offset存储在kafka本地外部topic本人保留)

1.Broker注册

Broker是分布式部署并且相互之间互相独立,然而须要有一个注册零碎可能将整个集群中的Broker治理起来,此时就应用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:/brokers/ids

每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创立属于本人的节点,如/brokers/ids/[0...N]。

Kafka应用了全局惟一的数字来指代每个Broker服务器,不同的Broker必须应用不同的Broker ID进行注册,创立完节点后,每个Broker就会将本人的IP地址和端口信息记录到该节点中去。其中,Broker创立的节点类型是长期节点,一旦Broker宕机,则对应的长期节点也会被主动删除。

2.Topic注册

在Kafka中,同一个Topic的音讯会被分成多个分区并将其散布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在保护,由专门的节点来记录,如:/borkers/topics

Kafka中每个Topic都会以/brokers/topics/[topic]的模式被记录,如/brokers/topics/login和/brokers/topics/search等。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册本人的Broker ID并写入针对该Topic的分区总数,如/brokers/topics/login/3->2,这个节点示意Broker ID为3的一个Broker服务器,对于"login"这个Topic的音讯,提供了2个分区进行音讯存储,同样,这个分区节点也是长期节点。

https://www.cnblogs.com/Lee-y...

Kafka的架构图解析

Kafka装置步骤

kafka是依赖zookeeper的,所以在装置前你须要装置并启动zookeeper。

  • 上传文件

这里我应用的kafka版本比拟老,因为个别公司应用的都不会是最新的版本。在linux中的/opt文件夹中新增kafka文件夹,并把安装包上传,能够应用xftp进行上传。

  • 解压

应用命令tar -zxvf kafka_2.11-0.11.0.0.tgz解压kafka的文件,解压后应用mv 命令进行重命名,把解压后的文件夹重命名为kafka,这样不便当前输出。

  • 批改配置

进入kafka文件夹的config目录。能够看到目录构造如下

首先批改 server.properties文件,命令vi server.properties

先在kafka文件夹下新建一个logs的文件夹,上面须要应用

这里次要批改到处,在高版本中曾经没有删除 topic 性能这个配置能够自行疏忽

#broker 的全局惟一编号,不能反复broker.id=0#删除 topic 性能使能delete.topic.enable=true#kafka 运行日志寄存的门路log.dirs=/opt/kafka/logs#zk的地址,这里改成本人的zk地址就好zookeeper.connect=localhost:2181

这里须要留神的点是默认kafka音讯默认是存储168小时也就是7天,这个也是在这个文件中配置的。

而后配置环境变量

应用命令:vi /etc/profile,在文件的最初配置kafka

#KAFKA_HOMEexport KAFKA_HOME=/opt/kafkaexport PATH=$PATH:$KAFKA_HOME/bin

最初如果是集群装置还要散发安装包,这里临时不须要。散发应用xsync kafka/,散发后记得批改brokerid,在集群中须要惟一。

  • 启动

启动的时候要带上咱们的配置文件,且以守护过程的形式启动

bin/kafka-server-start.sh -daemon config/server.properties

如果是集群能够本人写群起脚本。

Kafka 命令行操作

首先进入kafka装置目录的bin目录。

  • 查看以后服务器中的所有 topic

    kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

  • 创立 topic

    kafka-topics.sh --zookeeper 127.0.0.1:12181 --create --replication-factor 3 --partitions 1 --topic first

    选项阐明:
    --topic 定义 topic 名
    --replication-factor 定义正本数
    --partitions 定义分区数

  • 删除 topic

    kafka-topics.sh --zookeeper 127.0.0.1:12181 --delete --topic first

    须要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除。

  • 发送音讯

    kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first

    输出命令后就能够进行发送音讯了

  • 生产音讯(多种形式)

    这是之前老版本的形式,连贯zk,当初不举荐

    kafka-console-consumer.sh --zookeeper 127.0.0.1:12181 --topic first

    新版本的对于offset等数据曾经存储在kafka本地了,所以应用这个命令

    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first

    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic first

    --from-beginning:会把主题中以往所有的数据都读取进去。

  • 查看某个 Topic 的详情

    kafka-topics.sh --zookeeper 127.0.0.1:12181 --describe --topic first

  • 批改分区数

    kafka-topics.sh --zookeeper127.0.0.1:12181 --alter --topic first --partitions 6