关于程序员:Kafka的学习与使用

27次阅读

共计 3032 个字符,预计需要花费 8 分钟才能阅读完成。

Kafka 的学习与应用

简介

起初是由 LinkedIn 公司采纳 Scala 语言开发的一个分布式、多分区、多正本且基于 zookeeper 协调的分布式音讯零碎,现已募捐给 Apache 基金会。它是一种高吞吐量的分布式公布订阅音讯零碎,以可程度扩大和高吞吐率而被宽泛应用。目前越来越多的开源分布式解决零碎如 Cloudera、Apache Storm、Spark、Flink 等都反对与 Kafka 集成。

kafka 是用 Scala 写的,一下两种框架都是应用 scala 语言写的:

  • kafka(音讯队列): 分布式音讯队列 外部代码常常用来解决并发的问题 用 scala 能够大大简化其代码。
  • spark(实时的大数据分析框架,和 flink 性能类似): 解决多线程场景不便 另外 spark 次要用作内存计算 常常要用来实现简单的算法 利用 scala 这种函数式编程语言 能够大大简化代码

Kafka 与 zk 的关系

zk 为 kafka 提供一下几点作用(须要留神,在 0.9 版本之前 offset 存储在 ZK0.9 版本及之后 offset 存储在 kafka 本地外部 topic 本人保留)

1.Broker 注册

Broker 是分布式部署并且相互之间互相独立,然而须要有一个注册零碎可能将整个集群中的 Broker 治理起来 ,此时就应用到了 Zookeeper。在 Zookeeper 上会有一个专门 用来进行 Broker 服务器列表记录 的节点:/brokers/ids

每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到 /brokers/ids 下创立属于本人的节点,如 /brokers/ids/[0…N]。

Kafka 应用了全局惟一的数字来指代每个 Broker 服务器,不同的 Broker 必须应用不同的 Broker ID 进行注册,创立完节点后,每个 Broker 就会将本人的 IP 地址和端口信息记录 到该节点中去。其中,Broker 创立的节点类型是长期节点,一旦 Broker 宕机,则对应的长期节点也会被主动删除。

2.Topic 注册

在 Kafka 中,同一个 Topic 的音讯会被分成多个分区 并将其散布在多个 Broker 上,这些分区信息及与 Broker 的对应关系 也都是由 Zookeeper 在保护,由专门的节点来记录,如:/borkers/topics

Kafka 中每个 Topic 都会以 /brokers/topics/[topic]的模式被记录,如 /brokers/topics/login 和 /brokers/topics/search 等。Broker 服务器启动后,会到对应 Topic 节点(/brokers/topics)上注册本人的 Broker ID 并写入针对该 Topic 的分区总数,如 /brokers/topics/login/3->2,这个节点示意 Broker ID 为 3 的一个 Broker 服务器,对于 ”login” 这个 Topic 的音讯,提供了 2 个分区进行音讯存储,同样,这个分区节点也是长期节点。

https://www.cnblogs.com/Lee-y…

Kafka 的架构图解析

Kafka 装置步骤

kafka 是依赖 zookeeper 的,所以在装置前你须要装置并启动 zookeeper。

  • 上传文件

这里我应用的 kafka 版本比拟老,因为个别公司应用的都不会是最新的版本。在 linux 中的 /opt 文件夹中新增 kafka 文件夹,并把安装包上传,能够应用 xftp 进行上传。

  • 解压

应用命令 tar -zxvf kafka_2.11-0.11.0.0.tgz 解压 kafka 的文件,解压后应用 mv 命令进行重命名,把解压后的文件夹重命名为 kafka,这样不便当前输出。

  • 批改配置

进入 kafka 文件夹的 config 目录。能够看到目录构造如下

首先批改 server.properties文件,命令vi server.properties

先在 kafka 文件夹下新建一个 logs 的文件夹,上面须要应用

这里次要批改到处,在高版本中曾经没有删除 topic 性能这个配置能够自行疏忽

#broker 的全局惟一编号,不能反复
broker.id=0
#删除 topic 性能使能
delete.topic.enable=true
#kafka 运行日志寄存的门路
log.dirs=/opt/kafka/logs
#zk 的地址, 这里改成本人的 zk 地址就好
zookeeper.connect=localhost:2181

这里须要留神的点是默认 kafka 音讯默认是存储 168 小时也就是 7 天,这个也是在这个文件中配置的。

而后配置环境变量

应用命令:vi /etc/profile,在文件的最初配置 kafka

#KAFKA_HOME
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin

最初如果是集群装置还要散发安装包,这里临时不须要。散发应用xsync kafka/, 散发后记得批改 brokerid,在集群中须要惟一。

  • 启动

启动的时候要带上咱们的配置文件,且以守护过程的形式启动

bin/kafka-server-start.sh -daemon config/server.properties

如果是集群能够本人写群起脚本。

Kafka 命令行操作

首先进入 kafka 装置目录的 bin 目录。

  • 查看以后服务器中的所有 topic

    kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

  • 创立 topic

    kafka-topics.sh --zookeeper 127.0.0.1:12181 --create --replication-factor 3 --partitions 1 --topic first

    选项阐明:
    –topic 定义 topic 名
    –replication-factor 定义正本数
    –partitions 定义分区数

  • 删除 topic

    kafka-topics.sh –zookeeper 127.0.0.1:12181 –delete –topic first

    须要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除。

  • 发送音讯

    kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first

    输出命令后就能够进行发送音讯了

  • 生产音讯(多种形式)

    这是之前老版本的形式,连贯 zk,当初不举荐

    kafka-console-consumer.sh --zookeeper 127.0.0.1:12181 --topic first

    新版本的对于 offset 等数据曾经存储在 kafka 本地了,所以应用这个命令

    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first

    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic first

    –from-beginning:会把主题中以往所有的数据都读取进去。

  • 查看某个 Topic 的详情

    kafka-topics.sh --zookeeper 127.0.0.1:12181 --describe --topic first

  • 批改分区数

    kafka-topics.sh --zookeeper127.0.0.1:12181 --alter --topic first --partitions 6

正文完
 0