学习kafka教程(二)

52次阅读

共计 3826 个字符,预计需要花费 10 分钟才能阅读完成。

欢迎关注公众号:n 平方如有问题或建议,请后台留言,我会尽力解决你的问题。
本文主要介绍【KafkaStreams】
简介
Kafka Streams 编写关键任务实时应用程序和微服务的最简单方法,是一个用于构建应用程序和微服务的客户端库,其中输入和输出数据存储在 Kafka 集群中。它结合了在客户端编写和部署标准 Java 和 Scala 应用程序的简单性和 Kafka 服务器端集群技术的优点。
Kafka Streams 是一个用于构建关键任务实时应用程序和微服务的客户端库,其中输入和 / 或输出数据存储在 Kafka 集群中。Kafka Streams 结合了在客户端编写和部署标准 Java 和 Scala 应用程序的简单性和 Kafka 服务器端集群技术的优点,使这些应用程序具有高度可伸缩性、灵活性、容错性、分布式等等。
目标

了解 kafka Streams
会使用 kafka Streams

过程
1. 首先 WordCountDemo 示例代码(Java8 以上)
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic “streams-plaintext-input”, where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(“streams-plaintext-input”,
Consumed.with(stringSerde, stringSerde);

KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(“\\W+”)))

// Group the text words as message keys
.groupBy((key, value) -> value)

// Count the occurrences of each word (message key).
.count()

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to(“streams-wordcount-output”, Produced.with(Serdes.String(), Serdes.Long()));
它实现了 WordCount 算法,该算法从输入文本计算单词出现的直方图。然而,与您以前可能看到的对有界数据进行操作的其他 WordCount 示例不同,WordCount 演示应用程序的行为略有不同,因为它被设计为对无限、无界的数据流进行操作。与有界变量类似,它是一种有状态算法,用于跟踪和更新单词的计数。然而,由于它必须假定输入数据可能是无界的,因此它将周期性地输出当前状态和结果,同时继续处理更多的数据,因为它不知道何时处理了“所有”输入数据。
2. 安装并启动 zookeeper 和 kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
3. 创建主题接下来,我们创建名为 streams-plain -input 的输入主题和名为 streams-wordcount-output 的输出主题:
bin/kafka-topics.sh –create \
–zookeeper localhost:2181 \
–replication-factor 1 \
–partitions 1 \
–topic streams-plaintext-input
Created topic “streams-plaintext-input”
我们创建启用压缩的输出主题,因为输出流是一个变更日志流.
bin/kafka-topics.sh –create \
–zookeeper localhost:2181 \
–replication-factor 1 \
–partitions 1 \
–topic streams-wordcount-output \
–config cleanup.policy=compact
Created topic “streams-wordcount-output”
创建的主题也可以使用相同的 kafka 主题进行描述
bin/kafka-topics.sh –zookeeper localhost:2181 –describe
4. 启动 Wordcount 应用程序
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
a)演示应用程序将从输入主题流 (明文输入) 中读取,对每个读取的消息执行 WordCount 算法的计算,并不断将其当前结果写入输出主题流(WordCount -output)。因此,除了日志条目之外,不会有任何 STDOUT 输出,因为结果是用 Kafka 写回去的。
b)现在我们可以在一个单独的终端上启动控制台生成器,向这个主题写入一些输入数据和检查输出的 WordCount 演示应用程序从其输出主题与控制台消费者在一个单独的终端.
bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 \
–topic streams-wordcount-output \
–from-beginning \
–formatter kafka.tools.DefaultMessageFormatter \
–property print.key=true \
–property print.value=true \
–property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
–property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
c)输入端:现在让我们使用控制台生成器将一些消息写入输入主题流——纯文本输入,方法是输入一行文本,然后单击。这将发送新消息输入主题, 消息键为空和消息值是刚才输入的字符串编码的文本行。
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic streams-plaintext-input
此时你可以在控制台输入如下字符:
all streams lead to kafka
d))输出端:此消息将由 Wordcount 应用程序处理,以下输出数据将写入 streams-wordcount-output 主题并由控制台使用者打印:
bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 \
–topic streams-wordcount-output \
–from-beginning \
–formatter kafka.tools.DefaultMessageFormatter \
–property print.key=true \
–property print.value=true \
–property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
–property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
这个时候会接收到刚刚在控制台输入的单词统计结果:
all 1
streams 1
lead 1
to 1
kafka 1
如此类推:你可以在输入端输入单词,对应的在输出端就会有统计结果。
小结:可以看到,Wordcount 应用程序的输出实际上是连续的更新流,其中每个输出记录 (即上面原始输出中的每一行) 是单个单词的更新计数,也就是记录键,如“kafka”。对于具有相同键的多个记录,后面的每个记录都是前一个记录的更新。
下面的两个图说明了幕后的本质。第一列显示 KTable 的当前状态的演变,该状态为 count 计算单词出现的次数。第二列显示 KTable 的状态更新所产生的更改记录,这些记录被发送到输出 Kafka 主题流 -wordcount-output。

最后
本人水平有限,欢迎各位建议以及指正。顺便关注一下公众号呗,会经常更新文章的哦。

正文完
 0