学习kafka教程(三)

39次阅读

共计 6575 个字符,预计需要花费 17 分钟才能阅读完成。

欢迎关注公众号:n 平方如有问题或建议,请后台留言,我会尽力解决你的问题。
本文主要介绍【Kafka Streams 的架构和使用】
目标

了解 kafka streams 的架构。
掌握 kafka streams 编程。

架构分析
总体
Kafka 流通过构建 Kafka 生产者和消费者库,并利用 Kafka 的本地功能来提供数据并行性、分布式协调、容错和操作简单性,从而简化了应用程序开发。下图展示了一个使用 Kafka Streams 库的应用程序的结构。

流分区和任务
Kafka 的消息传递层对数据进行分区,以存储和传输数据。Kafka 流划分数据进行处理。在这两种情况下,这种分区都支持数据局部性、灵活性、可伸缩性、高性能和容错性。Kafka 流使用分区和任务的概念作为基于 Kafka 主题分区的并行模型的逻辑单元。Kafka 流与 Kafka 在并行性上下文中有着紧密的联系:

每个流分区都是一个完全有序的数据记录序列,并映射到 Kafka 主题分区。
流中的数据记录映射到来自该主题的 Kafka 消息。
数据记录的键值决定了 Kafka 流和 Kafka 流中数据的分区,即,如何将数据路由到主题中的特定分区。

应用程序的处理器拓扑通过将其分解为多个任务进行扩展。更具体地说,Kafka 流基于应用程序的输入流分区创建固定数量的任务,每个任务分配一个来自输入流的分区列表(例如,kafka 的 topic)。分配给任务的分区永远不会改变,因此每个任务都是应用程序并行性的固定单元。然后,任务可以基于分配的分区实例化自己的处理器拓扑; 它们还为每个分配的分区维护一个缓冲区,并从这些记录缓冲区一次处理一条消息。因此,流任务可以独立并行地处理,而无需人工干预。
理解 Kafka 流不是一个资源管理器,而是一个“运行”其流处理应用程序运行的任何地方的库。应用程序的多个实例要么在同一台机器上执行,要么分布在多台机器上,库可以自动将任务分配给运行应用程序实例的那些实例。分配给任务的分区从未改变; 如果应用程序实例失败,它分配的所有任务将在其他实例上自动重新启动,并继续从相同的流分区使用。
下图显示了两个任务,每个任务分配一个输入流分区。
线程模型
Kafka 流允许用户配置库用于在应用程序实例中并行处理的线程数。每个线程可以独立地使用其处理器拓扑执行一个或多个任务。例如,下图显示了一个流线程运行两个流任务。

启动更多的流线程或应用程序实例仅仅相当于复制拓扑并让它处理 Kafka 分区的不同子集,从而有效地并行处理。值得注意的是,线程之间不存在共享状态,因此不需要线程间的协调。这使得跨应用程序实例和线程并行运行拓扑变得非常简单。Kafka 主题分区在各种流线程之间的分配是由 Kafka 流利用 Kafka 的协调功能透明地处理的。
如上所述,使用 Kafka 流扩展您的流处理应用程序很容易: 您只需要启动应用程序的其他实例,Kafka 流负责在应用程序实例中运行的任务之间分配分区。您可以启动与输入 Kafka 主题分区一样多的应用程序线程,以便在应用程序的所有运行实例中,每个线程 (或者更确切地说,它运行的任务) 至少有一个输入分区要处理。
本地状态存储
Kafka 流提供了所谓的状态存储,流处理应用程序可以使用它来存储和查询数据,这是实现有状态操作时的一项重要功能。例如,Kafka Streams DSL 在调用有状态操作符 (如 join() 或 aggregate())或打开流窗口时自动创建和管理这样的状态存储。
Kafka Streams 应用程序中的每个流任务都可以嵌入一个或多个本地状态存储,这些存储可以通过 api 访问,以存储和查询处理所需的数据。Kafka 流为这种本地状态存储提供容错和自动恢复功能。
下图显示了两个流任务及其专用的本地状态存储。
容错
Kafka 流构建于 Kafka 中本地集成的容错功能之上。Kafka 分区是高度可用和复制的; 因此,当流数据持久化到 Kafka 时,即使应用程序失败并需要重新处理它,流数据也是可用的。Kafka 流中的任务利用 Kafka 消费者客户端提供的容错功能来处理失败。如果任务在失败的机器上运行,Kafka 流将自动在应用程序的一个剩余运行实例中重新启动该任务。
此外,Kafka 流还确保本地状态存储对于故障也是健壮的。对于每个状态存储,它维护一个复制的 changelog Kafka 主题,其中跟踪任何状态更新。这些变更日志主题也被分区,这样每个本地状态存储实例,以及访问该存储的任务,都有自己专用的变更日志主题分区。在 changelog 主题上启用了日志压缩,这样可以安全地清除旧数据,防止主题无限增长。如果任务在一台失败的机器上运行,并在另一台机器上重新启动,Kafka 流通过在恢复对新启动的任务的处理之前重播相应的更改日志主题,确保在失败之前将其关联的状态存储恢复到内容。因此,故障处理对最终用户是完全透明的。
编程实例
管道(输入输出)实例
就是控制台输入到 kafka 中,经过处理输出。
package com.example.kafkastreams.demo;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class PipeDemo {

public static void main(String[] args) {

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, “streams-pipe”);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, “localhost:9092”);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final StreamsBuilder builder = new StreamsBuilder();

builder.stream(“streams-plaintext-input”).to(“streams-pipe-output”);

final Topology topology = builder.build();

final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread(“streams-shutdown-hook”) {
@Override
public void run() {
streams.close();
latch.countDown();
}
});

try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}

}
分词实例
就是将你输入的字符串进行分词输出。
package com.example.kafkastreams.demo;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class LineSplitDemo {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, “streams-linesplit”);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, “localhost:9092”);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source = builder.stream(“streams-plaintext-input”);
source.flatMapValues(value -> Arrays.asList(value.split(“\\W+”)))
.to(“streams-linesplit-output”);

final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread(“streams-shutdown-hook”) {
@Override
public void run() {
streams.close();
latch.countDown();
}
});

try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);

}
}

词汇统计实例
将你输入的字符串进行按单词统计输出。
package com.example.kafkastreams.demo;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCountDemo {

public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, “streams-wordcount”);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, “localhost:9092”);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source = builder.stream(“streams-plaintext-input”);
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(“\\W+”)))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(“counts-store”))
.toStream()
.to(“streams-wordcount-output”, Produced.with(Serdes.String(), Serdes.Long()));

final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread(“streams-shutdown-hook”) {
@Override
public void run() {
streams.close();
latch.countDown();
}
});

try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}

最后
本人水平有限,欢迎各位建议以及指正。顺便关注一下公众号呗,会经常更新文章的哦。

正文完
 0