关于linux:超详细-kafka-入门最佳实践

42次阅读

共计 15624 个字符,预计需要花费 40 分钟才能阅读完成。

意识 kafka

kafka 简介

Kafka 是一个分布式流媒体平台,kafka 官网:http://kafka.apache.org/

  • (1)流媒体平台有三个要害性能:
  • 公布和订阅记录流,相似于音讯队列或企业消息传递零碎。
  • 以容错的长久形式存储 记录流。
  • 记录产生时解决流。
  • (2)Kafka 通常用于两大类利用:
  • 构建可在 零碎或应用程序之间 牢靠获取数据的实时流数据管道
  • 构建转换或响应数据流的实时流应用程序
  • (3)首先是几个概念:
  • Kafka 作为一个集群运行在一个或多个可跨多个 数据中心的服务器 上。
  • Kafka 集群以称为 topics 主题 的类别存储记录流。
  • 每条记录都蕴含 一个键,一个值和一个工夫戳
  • (4)Kafka 有四个外围 API:
  • Producer API(生产者 API)容许应用程序公布记录流至一个或多个 kafka 的 topics(主题)。
  • Consumer API(消费者 API)容许应用程序订阅一个或多个 topics(主题),并解决所产生的对他们记录的数据流。
  • Streams API(流 API)容许应用程序充当流处理器,从一个或多个 topics(主题)耗费的输出流,并产生一个输入流至一个或多个输入的 topics(主题),无效地变换所述输出流,以输入流。
  • Connector API(连接器 API)容许构建和运行 kafka topics(主题)连贯到现有的应用程序或数据系统中重用生产者或消费者。例如,关系数据库的连接器可能捕捉对表的每个更改。

    在 Kafka 中,客户端和服务器之间的通信是通过简略,高性能,语言无关的 TCP 协定实现的。此协定已版本化并放弃与旧版本的向后兼容性。Kafka 提供 Java 客户端,但客户端有多种语言版本。

1.2 Topics 主题 和 partitions 分区

咱们首先深刻理解 Kafka 为记录流提供的外围形象 – 主题 topics

一个 Topic 能够认为是一类音讯,每个 topic 将被分成多个 partition(区), 每个 partition 在存储层面是 append log 文件

主题是公布记录的类别或订阅源名称。Kafka 的主题总是多用户; 也就是说,一个主题能够有零个,一个或多个消费者订阅写入它的数据。

对于每个主题,Kafka 群集都保护一个如下所示的分区日志:

每个分区都是一个有序的,不可变的记录序列,一直附加到结构化的提交日志中。分区中的记录每个都调配了一个称为偏移的程序 ID 号,它惟一地标识分区中的每个记录。

Kafka 集群长久保留所有已公布的记录 – 无论是否已应用 – 应用可配置的保留期。例如,如果保留策略设置为两天,则在公布记录后的两天内,它可供使用,之后将被抛弃以开释空间。Kafka 的性能在数据大小方面实际上是恒定的,因而长时间存储数据不是问题。实际上,基于每个消费者保留的惟一元数据是该消费者在日志中的偏移或地位。这种偏移由消费者管制:通常消费者在读取记录时会线性地进步其偏移量,但事实上,因为该地位由消费者管制,因而它能够依照本人喜爱的任何程序生产记录。例如,消费者能够重置为较旧的偏移量来重新处理过来的数据,或者跳到最近的记录并从“当初”开始生产。

这些性能组合意味着 Kafka 消费者 consumers 十分 cheap – 他们能够来来往往对集群或其余消费者没有太大影响。例如,您能够应用咱们的命令行工具“tail”任何主题的内容,而无需更改任何现有使用者所耗费的内容。

日志中的分区有多种用处。首先,它们容许日志扩大到超出适宜单个服务器的大小。每个独自的分区必须适宜托管它的服务器,但主题可能有许多分区,因而它能够解决任意数量的数据。其次,它们充当了并行性的单位 – 更多的是它。

1.3 Distribution 调配

一个 Topic 的多个 partitions, 被散布在 kafka 集群中的多个 server 上; 每个 server(kafka 实例)负责 partitions 中音讯的读写操作; 此外 kafka 还能够配置 partitions 须要备份的个数(replicas), 每个 partition 将会被备份到多台机器上, 以进步可用性.

基于 replicated 计划, 那么就意味着须要对多个备份进行调度; 每个 partition 都有一个 server 为 ”leader”;leader 负责所有的读写操作, 如果 leader 生效, 那么将会有其余 follower 来接管(成为新的 leader);follower 只是枯燥的和 leader 跟进, 同步音讯即可.. 由此可见作为 leader 的 server 承载了全副的申请压力, 因而从集群的整体思考, 有多少个 partitions 就意味着有多少个 ”leader”,kafka 会将 ”leader” 平衡的扩散在每个实例上, 来确保整体的性能稳固。

1.4 Producers 生产者 和 Consumers 消费者

1.4.1 Producers 生产者

Producers 将数据公布到指定的 topics 主题。同时 Producer 也能决定将此音讯归属于哪个 partition; 比方基于 ”round-robin” 形式或者通过其余的一些算法等。

1.4.2 Consumers

  • 实质上 kafka 只反对 Topic. 每个 consumer 属于一个 consumer group; 反过来说, 每个 group 中能够有多个 consumer. 发送到 Topic 的音讯, 只会被订阅此 Topic 的每个 group 中的一个 consumer 生产。
  • 如果所有使用者实例具备雷同的使用者组,则记录将无效地在使用者实例上进行负载平衡。
  • 如果所有消费者实例具备不同的消费者组,则每个记录将播送到所有消费者过程。剖析:两个服务器 Kafka 群集,托管四个分区(P0-P3),蕴含两个使用者组。消费者组 A 有两个消费者实例,B 组有四个消费者实例。

在 Kafka 中实现生产 consumption 的形式是通过在消费者实例上划分日志中的分区,以便每个实例在任何工夫点都是调配的“偏心份额”的独占消费者。保护组中成员资格的过程由 Kafka 协定动静解决。如果新实例退出该组,他们将从该组的其余成员接管一些分区; 如果实例死亡,其分区将分发给其余实例。

Kafka 仅提供分区内记录的总订单,而不是主题中不同分区之间的记录。对于大多数应用程序而言,按分区排序与按键分区数据的能力相结合就足够了。然而,如果您须要对记录进行总订单,则能够应用仅蕴含一个分区的主题来实现,但这将意味着每个使用者组只有一个使用者过程。

1.5 Consumers kafka 确保

  • 发送到 partitions 中的音讯将会依照它接管的程序追加到日志中。也就是说,如果记录 M1 由与记录 M2 雷同的生成者发送,并且首先发送 M1,则 M1 将具备比 M2 更低的偏移并且在日志中更早呈现。
  • 消费者实例依照它们存储在日志中的程序查看记录。对于消费者而言, 它们生产音讯的程序和日志中音讯程序统一。
  • 如果 Topic 的 ”replicationfactor” 为 N, 那么容许 N - 1 个 kafka 实例生效,咱们将容忍最多 N - 1 个服务器故障,而不会失落任何提交到日志的记录。

1.6 kafka 作为音讯零碎

Kafka 的流概念与传统的企业邮件系统相比如何?

(1)传统音讯零碎

音讯传统上有两种模型:queuing 排队 and publish-subscribe 公布 – 订阅。在队列中,消费者池能够从服务器读取并且每个记录转到其中一个; 在公布 – 订阅中,记录被播送给所有消费者。这两种模型中的每一种都有长处和毛病。排队的劣势在于它容许您在多个消费者实例上划分数据处理,从而能够扩大您的解决。可怜的是,一旦一个过程读取它曾经隐没的数据,队列就不是多用户。公布 – 订阅容许您将数据播送到多个过程,但因为每条音讯都发送给每个订阅者,因而无奈进行扩大解决。

卡夫卡的消费者群体概念概括了这两个概念。与队列一样,使用者组容许您将解决划分为一组过程(使用者组的成员)。与公布 – 订阅一样,Kafka 容许您向多个消费者组播送音讯。

(2)kafka 的劣势

Kafka 模型的劣势在于每个主题都具备这些属性 – 它能够扩大解决并且也是多用户 – 不须要抉择其中一个。

与传统的音讯零碎相比,Kafka 具备更强的订购保障。

传统队列在服务器上按程序保留记录,如果多个消费者从队列中耗费,则服务器依照存储程序散发记录。然而,尽管服务器按程序散发记录,然而记录是异步传递给消费者的,因而它们可能会在不同的消费者处呈现故障。这实际上意味着在存在并行耗费的状况下失落记录的程序。消息传递零碎通常通过具备“独占消费者”概念来解决这个问题,该概念只容许一个过程从队列中耗费,但当然这意味着解决中没有并行性。

kafka 做得更好。通过在主题中具备并行性概念 – 分区 –,Kafka 可能在消费者流程池中提供订购保障和负载平衡。这是通过将主题中的分区调配给使用者组中的使用者来实现的,以便每个分区仅由该组中的一个使用者应用。通过这样做,咱们确保使用者是该分区的惟一读者并按程序应用数据。因为有许多分区,这依然能够均衡许多消费者实例的负载。但请留神,消费者组中的消费者实例不能超过分区。

1.7 kafka 作为存储系统

  • 任何容许公布与生产音讯拆散的音讯的音讯队列实际上充当了正在进行的音讯的存储系统。Kafka 的不同之处在于它是一个十分好的存储系统。
  • 写入 Kafka 的数据将写入磁盘并进行复制以实现容错。Kafka 容许生产者期待确认,以便在齐全复制之前写入不被认为是残缺的,并且即便写入的服务器失败也保障写入依然存在。
  • 磁盘构造 Kafka 很好地应用了规模 – 无论服务器上有 50 KB 还是 50 TB 的持久数据,Kafka 都会执行雷同的操作。
  • 因为认真对待存储并容许客户端管制其读取地位,您能够将 Kafka 视为一种专用于高性能,低提早提交日志存储,复制和流传的专用分布式文件系统。

1.8 kafka 用于流解决

  • 仅仅读取,写入和存储数据流是不够的,目标是实现流的实时处理。
  • 在 Kafka 中,流处理器是指从输出主题获取间断数据流,对此输出执行某些解决以及生成间断数据流以输入主题的任何内容。
  • 例如,批发应用程序可能会接管销售和发货的输出流,并输入从新排序流和依据此数据计算的价格调整。
  • 能够应用生产者和消费者 API 间接进行简略解决。然而,对于更简单的转换,Kafka 提供了齐全集成的 Streams API。这容许构建执行非平庸解决的应用程序,这些应用程序能够计算流的聚合或将流连贯在一起。
  • 此工具有助于解决此类应用程序面临的难题:解决无序数据,在代码更改时重新处理输出,执行有状态计算等。
  • 流 API 构建在 Kafka 提供的外围原语上:它应用生产者和消费者 API 进行输出,应用 Kafka 进行有状态存储,并在流处理器实例之间应用雷同的组机制来实现容错。

2、kafka 应用场景

2.1 音讯 Messaging

Kafka 能够代替更传统的音讯代理。音讯代理的应用有多种起因(将解决与数据生成器拆散,缓冲未解决的音讯等)。与大多数消息传递零碎相比,Kafka 具备更好的吞吐量,内置分区,复制和容错性能,这使其成为大规模音讯解决应用程序的现实解决方案。

依据教训,消息传递的应用通常绝对较低,但可能须要较低的端到端提早,并且通常取决于 Kafka 提供的弱小的耐用性保障。

在这个畛域,Kafka 可与传统的消息传递零碎(如 ActiveMQ 或 RabbitMQ)相媲美。

2.2 网站流动跟踪

Kafka 的原始用例是可能将用户流动跟踪管道重建为一组实时公布 – 订阅源。这意味着站点流动(页面查看,搜寻或用户可能采取的其余操作)将公布到核心主题,每个流动类型蕴含一个主题。这些源可用于订购一系列用例,包含实时处理,实时监控以及加载到 Hadoop 或离线数据仓库零碎以进行脱机解决和报告。

流动跟踪通常十分高,因为为每个用户页面视图生成了许多流动音讯。

2.3 度量 Metrics

Kafka 通常用于经营监控数据。这波及从分布式应用程序聚合统计信息以生成操作数据的集中式提要。

2.4 日志聚合

许多人应用 Kafka 作为日志聚合解决方案的替代品。日志聚合通常从服务器收集物理日志文件,并将它们放在地方地位(可能是文件服务器或 HDFS)进行解决。Kafka 形象出文件的细节,并将日志或事件数据作为音讯流更清晰地形象进去。这容许更低提早的解决并更容易反对多个数据源和分布式数据耗费。与 Scribe 或 Flume 等以日志为核心的零碎相比,Kafka 提供了同样杰出的性能,因为复制而具备更强的耐用性保障,以及更低的端到端提早。

2.5 流解决

许多 Kafka 用户在解决由多个阶段组成的管道时解决数据,其中原始输出数据从 Kafka 主题中生产,而后聚合,丰盛或以其余形式转换为新主题以供进一步生产或后续解决。

例如,用于举荐新闻文章的解决管道能够从 RSS 订阅源抓取文章内容并将其公布到“文章”主题; 进一步解决可能会对此内容进行规范化或反复数据删除,并将已清理的文章内容公布到新主题; 最终解决阶段可能会尝试向用户举荐此内容。此类解决管道基于各个主题创立实时数据流的图形。从 0.10.0.0 开始,这是一个轻量级但功能强大的流解决库,名为 Kafka Streams 在 Apache Kafka 中可用于执行如上所述的此类数据处理。除了 Kafka Streams 之外,其余开源流解决工具包含 Apache Storm 和 Apache Samza。

2.6 Event Sourcing

Event Sourcing 是一种利用程序设计格调,其中状态更改记录为按工夫排序的记录序列。Kafka 对十分大的存储日志数据的反对使其成为以这种格调构建的应用程序的杰出后端。

2.7 提交日志

Kafka 能够作为分布式系统的一种内部提交日志。该日志有助于在节点之间复制数据,并充当故障节点复原其数据的从新同步机制。Kafka 中的日志压缩性能有助于反对此用法。在这种用法中,Kafka 相似于 Apache BookKeeper 我的项目。

3、kafka 装置

3.1 下载安装

到官网 http://kafka.apache.org/downl…。

注:因为 Kafka 控制台脚本对于基于 Unix 和 Windows 的平台是不同的,因而在 Windows 平台上应用 binwindows 而不是 bin/ 将脚本扩展名更改为.bat。

[root@along ~]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
[root@along ~]# tar -C /data/ -xvf kafka_2.11-2.1.0.tgz
[root@along ~]# cd /data/kafka_2.11-2.1.0/

3.2 配置启动 zookeeper

kafka 失常运行,必须配置 zookeeper,否则无论是 kafka 集群还是客户端的生存者和消费者都无奈失常的工作的;所以须要配置启动 zookeeper 服务。

(1)zookeeper 须要 java 环境

[root@along ~]# yum -y install java-1.8.0

(2)这里 kafka 下载包曾经包含 zookeeper 服务,所以只需批改配置文件,启动即可。

如果须要下载指定 zookeeper 版本;能够独自去 zookeeper 官网 http://mirrors.shu.edu.cn/apa…。

[root@along ~]# cd /data/kafka_2.11-2.1.0/
[root@along kafka_2.11-2.1.0]# grep "^[^#]" config/zookeeper.properties
dataDir=/tmp/zookeeper   #数据存储目录
clientPort=2181   #zookeeper 端口
maxClientCnxns=0

注:可自行添加批改 zookeeper 配置

3.3 配置 kafka

(1)批改配置文件

[root@along kafka_2.11-2.1.0]# grep "^[^#]" config/server.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

注:可依据本人需要批改配置文件

 broker.id:# 惟一标识 ID
 listeners=PLAINTEXT://localhost:9092:#kafka 服务监听地址和端口
 log.dirs:# 日志存储目录
 zookeeper.connect:# 指定 zookeeper 服务

(2)配置环境变量

[root@along ~]# vim /etc/profile.d/kafka.sh
export KAFKA_HOME="/data/kafka_2.11-2.1.0"
export PATH="${KAFKA_HOME}/bin:$PATH"
[root@along ~]# source /etc/profile.d/kafka.sh

(3)配置服务启动脚本

[root@along ~]# vim /etc/init.d/kafka
#!/bin/sh
#
# chkconfig: 345 99 01
# description: Kafka
#
# File : Kafka
#
# Description: Starts and stops the Kafka server
#
 
source /etc/rc.d/init.d/functions
 
KAFKA_HOME=/data/kafka_2.11-2.1.0
KAFKA_USER=root
export LOG_DIR=/tmp/kafka-logs
 
[-e /etc/sysconfig/kafka] && . /etc/sysconfig/kafka
 
# See how we were called.
case "$1" in
 
  start)
    echo -n "Starting Kafka:"
    /sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &"
    echo "done."
    exit 0
    ;;
 
  stop)
    echo -n "Stopping Kafka:"
    /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk'{print $2}'| xargs kill -9"
    echo "done."
    exit 0
    ;;
  hardstop)
    echo -n "Stopping (hard) Kafka:"
    /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk'{print $2}'| xargs kill -9"
    echo "done."
    exit 0
    ;;
 
  status)
    c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`
    if ["$c_pid" = ""] ; then
      echo "Stopped"
      exit 3
    else
      echo "Running $c_pid"
      exit 0
    fi
    ;;
 
  restart)
    stop
    start
    ;;
 
  *)
    echo "Usage: kafka {start|stop|hardstop|status|restart}"
    exit 1
    ;;
 
esac

3.4 启动 kafka 服务

(1)后盾启动 zookeeper 服务

[root@along ~]# nohup zookeeper-server-start.sh /data/kafka_2.11-2.1.0/config/zookeeper.properties &

(2)启动 kafka 服务

[root@along ~]# service kafka start
Starting kafka (via systemctl):                            [OK]
[root@along ~]# service kafka status
Running 86018
[root@along ~]# ss -nutl
Netid State      Recv-Q Send-Q     Local Address:Port                    Peer Address:Port                              
tcp   LISTEN     0      50                    :::9092                              :::*                 
tcp   LISTEN     0      50                    :::2181                              :::*

4、kafka 应用简略入门

4.1 创立主题 topics

创立一个名为“along”的主题,它只蕴含一个分区,只有一个正本:

[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic along
Created topic "along".

如果咱们运行 list topic 命令,咱们当初能够看到该主题:

[root@along ~]# kafka-topics.sh --list --zookeeper localhost:2181
along 

4.2 发送一些音讯

Kafka 附带一个命令行客户端,它将从文件或规范输出中获取输出,并将其作为音讯发送到 Kafka 集群。默认状况下,每行将作为独自的音讯发送。

运行生产者,而后在控制台中键入一些音讯以发送到服务器。

[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic along
>This is a message
>This is another message

4.3 启动消费者

Kafka 还有一个命令行使用者,它会将音讯转储到规范输入。

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic along --from-beginning
This is a message
This is another message

所有命令行工具都有其余选项; 运行不带参数的命令将显示更具体地记录它们的应用信息。

5、设置多代理 kafka 群集

到目前为止,咱们始终在与一个 broker 运行,但这并不好玩。对于 Kafka,单个代理只是一个大小为 1 的集群,因而除了启动一些代理实例之外没有太多变动。然而为了感触它,让咱们将咱们的集群扩大到三个节点(依然在咱们的本地机器上)。

5.1 筹备配置文件

[root@along kafka_2.11-2.1.0]# cd /data/kafka_2.11-2.1.0/
[root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-1.properties
[root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-2.properties
[root@along kafka_2.11-2.1.0]# vim config/server-1.properties
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
[root@along kafka_2.11-2.1.0]# vim config/server-2.properties
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

注:该 broker.id 属性是群集中每个节点的惟一且永恒的名称。咱们必须笼罩端口和日志目录,因为咱们在同一台机器上运行这些,并且咱们心愿让所有代理尝试在同一端口上注册或笼罩彼此的数据。

5.2 开启集群另 2 个 kafka 服务

[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-1.properties &
[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-2.properties &
[root@along ~]# ss -nutl
Netid State      Recv-Q Send-Q     Local Address:Port                    Peer Address:Port                          
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9092                              :::*                 
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9093                              :::*                                
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9094                              :::*

5.3 在集群中进行操作

(1)当初创立一个复制因子为 3 的新主题 my-replicated-topic

[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".

(2)在一个集群中,运行“describe topics”命令查看哪个 broker 正在做什么

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
#正文:第一行给出了所有分区的摘要,每个附加行提供无关一个分区的信息。因为咱们只有一个分区用于此主题,因而只有一行。#“leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机抉择的分区局部的领导者。#“replicas”是复制此分区日志的节点列表,无论它们是否为领导者,或者即便它们以后处于活动状态。#“isr”是“同步”复制品的汇合。这是正本列表的子集,该列表以后处于沉闷状态并且曾经被领导者捕捉。#请留神,Leader: 2,在我的示例中,节点 2 是该主题的惟一分区的 Leader。

(3)能够在咱们创立的原始主题上运行雷同的命令,以查看它的地位

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic along
Topic:along PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: along    Partition: 0    Leader: 0   Replicas: 0 Isr: 0

(4)向咱们的新主题公布一些音讯:

[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>my test message 1
>my test message 2
>^C

(5)当初让咱们应用这些音讯:

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2

5.4 测试集群的容错性

(1)当初让咱们测试一下容错性。Broker 2 充当 leader 所以让咱们杀了它:

[root@along ~]# ps aux | grep server-2.properties |awk '{print $2}'
106737
[root@along ~]# kill -9 106737
[root@along ~]# ss -nutl
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9092                              :::*                       
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9093                              :::*

(2)leader 已切换到其中一个隶属节点,节点 2 不再位于同步正本集中:

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 0   Replicas: 2,0,1 Isr: 0,1

(3)即便最后承受写入的 leader 曾经失败,这些音讯仍可供生产:

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2

6、应用 Kafka Connect 导入 / 导出数据

从控制台写入数据并将其写回控制台是一个不便的终点,但有时候可能心愿应用其余起源的数据或将数据从 Kafka 导出到其余零碎。对于许多零碎,您能够应用 Kafka Connect 导入或导出数据,而不是编写自定义集成代码。

Kafka Connect 是 Kafka 附带的工具,用于向 Kafka 导入和导出数据。它是一个可扩大的工具,运行连接器,实现与内部零碎交互的自定义逻辑。在本疾速入门中,咱们将理解如何应用简略的连接器运行 Kafka Connect,这些连接器将数据从文件导入 Kafka 主题并将数据从 Kafka 主题导出到文件。

(1)首先创立一些种子数据进行测试:

[root@along ~]# echo -e "foonbar" > test.txt
或者在 Windows 上:> echo foo> test.txt
> echo bar>> test.txt

(2)接下来,启动两个以独立模式运行的连接器,这意味着它们在单个本地专用过程中运行。提供三个配置文件作为参数。

第一个始终是 Kafka Connect 流程的配置,蕴含常见配置,例如要连贯的 Kafka 代理和数据的序列化格局。

其余配置文件均指定要创立的连接器。这些文件包含惟一的连接器名称,要实例化的连接器类以及连接器所需的任何其余配置。

[root@along ~]# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[2019-01-16 16:16:31,884] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)
[2019-01-16 16:16:31,903] INFO WorkerInfo values:
... ...
#注:Kafka 附带的这些示例配置文件应用您之前启动的默认本地群集配置并创立两个连接器:第一个是源连接器,它从输出文件读取行并生成每个 Kafka 主题,第二个是宿连接器从 Kafka 主题读取音讯并将每个音讯生成为输入文件中的一行。

(3)验证是否导入胜利(另起终端)

在启动过程中,您将看到许多日志音讯,包含一些批示正在实例化连接器的日志音讯。

① 一旦 Kafka Connect 过程启动,源连接器应该开始从 test.txt 主题读取行并将其生成到主题 connect-test,并且接收器连接器应该开始从主题读取音讯 connect-test 并将它们写入文件 test.sink.txt。咱们能够通过查看输入文件的内容来验证数据是否已通过整个管道传递:

[root@along ~]# cat test.sink.txt
foo
bar

② 请留神,数据存储在 Kafka 主题中 connect-test,因而咱们还能够运行控制台使用者来查看主题中的数据(或应用自定义使用者代码来解决它):

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

(4)持续追加数据,验证

[root@along ~]# echo Another line>> test.txt 
[root@along ~]# cat test.sink.txt
foo
bar
Another line
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"}

起源:https://www.cnblogs.com/along…

正文完
 0