Kafka-Streams未来可期

40次阅读

共计 7885 个字符,预计需要花费 20 分钟才能阅读完成。

核心知识预热

TIPS
1. 资料来源说明书以及内部构造
2. 学习技术就是不断解惑的过程,就 kafka stream 自问:是个什么技术,能干什么,怎么使用..

  1. Kafka Streams 是一个 数据输入和数据输出都保存在 kafka 集群 程序和微服务 构建的客户端类库,那么就不需要专门去搭建计算集群,方便快捷;
  2. Kafka Streams 提供两种方法来定义流处理拓扑。Kafka Streams DSL 提供了最通用的可直接使用的数据转换操作(比如 map);低阶的处理器 API 则允许开发者定义和连接到自定义的处理器或者和 state store 进行交互。也就是说前者是高阶 API,封装好了的,通用场景使用且能快速开发;后者是低阶 API,更接近底层,开发难度大但是能更好地适配程序和业务。
  3. Kafka Streams 同样支持状态统计、窗口函数、eventTime 和 exactly-once 语义等实时场景;

前置概念

concept desc
stream processing application 多个处理器形成的拓扑结构,包含有一定处理逻辑的应用程序
processor topology 流处理器拓扑,是 processor+...+processor 的形式,source 和 sink 是特殊的 processor
Source Processor 源头处理器,即上游没有其他的流处理器,从 kafka 的 topic 中消费数据产生数据流输送到下游
Sink Processor 结果处理器,即下游没有其他的流处理器,将上游的数据输送到指定的 kafka topic
Time 联想 flink 的时间语义,例如某某 time1 手机端购买某商品,产生了日志数据,然后 time2 这个日志数据被实时采集到 Kafka 持久化到 topic,然后进入流式处理框架,在 time3 正式被计算,那么 time123 分别称为:event time,ingestion time,processing time
states 保存和查询数据状态的功能,可以定义流处理应用外的程序进行只读访问
processing guarantees 消费是否丢失和是否重复的级别,比如 exactly-once,at-least-once,at-most-once

拓扑

kafka stream 的拓扑其实就是一个个 processor 连接起来的流程图,其中 source 和 sink 是比较特殊的 processor,分别没有上游和下游处理器。拓扑创建方式是在创建下游 processor 的时候指定上游的 processor 名称进行连接

// DSL 转换算子生成新 KStream 是调用
void addGraphNode(final StreamsGraphNode parent,final StreamsGraphNode child) {}
// 直接通过 builder 添加 processor
public synchronized Topology addProcessor(final String name,final ProcessorSupplier supplier,final String... parentNames) {}

使用

使用上核心都是四个步骤:

  1. 创建流处理应用配置参数;
  2. 构造流处理拓扑结构;
  3. 创建流处理客户端实例;
  4. 开始执行流处理程序;

使用 DSL 编写单词统计

测试代码

    /* 1.props */
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");// 可作为 consumer 的 group id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//kafka 的地址,多个逗号分隔
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());// 序列化和反序列化,在读取和写出流的时候、在读取和写出 state 的时候都会用到
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    /* 2.topology */
    final StreamsBuilder builder = new StreamsBuilder();

    KStream<String, String> source = builder.stream("streams-plaintext-input");//source processor, 传入参数可定义 key,value 的序列化方式,以及时间提取器等

    source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))//KString<String,String>
            .groupBy((key, value) -> value)// KGroupedStream<String,String>
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))//KTable<String,String>
            .toStream()//KStream<String,Long>
            .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));//sink processor, 指定输出 key,value 的数据类型

    final Topology topology = builder.build();

    /* 3.KafkaStreams 实例 */
    final KafkaStreams streams = new KafkaStreams(topology, props);
    // CountDownLatch 用 await()阻塞当前线程,countDown()记录完成线程的数量
    // 当 getCount()= 0 的时候继续执行 await 后续的代码
    final CountDownLatch latch = new CountDownLatch(1);

    System.out.println(topology.describe());// 打印流处理拓扑

    // 钩子函数
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {streams.close();
            latch.countDown();}
    });

    try {
        // 4. 执行
        streams.start();
        latch.await();} catch (Throwable e) {System.exit(1);
    }
    System.exit(0);

测试数据

# 生产者打印生产数据
langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
>hello hello hello hello
>kafka kafka kafka kafka
# 消费者打印消费数据
langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

hello    4
kafka    4

打印拓扑

这里可以看到有点类似于宽依赖的时候,拓扑会划分,中间会生成 streams-wordcount-counts-store-repartition 主题保存中间结果。

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
      --> counts-store-repartition-filter
      <-- KSTREAM-FLATMAPVALUES-0000000001
    Processor: counts-store-repartition-filter (stores: [])
      --> counts-store-repartition-sink
      <-- KSTREAM-KEY-SELECT-0000000002
    Sink: counts-store-repartition-sink (topic: counts-store-repartition)
      <-- counts-store-repartition-filter

  Sub-topology: 1
    Source: counts-store-repartition-source (topics: [counts-store-repartition])
      --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [counts-store])
      --> KTABLE-TOSTREAM-0000000007
      <-- counts-store-repartition-source
    Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000008
      <-- KSTREAM-AGGREGATE-0000000003
    Sink: KSTREAM-SINK-0000000008 (topic: streams-wordcount-output)
      <-- KTABLE-TOSTREAM-0000000007

主题创建

DSL 程序工作后会生成 streams-wordcount-counts-store-changelog 的主题,名称规则是:application_id+store_name+changelog,是因为每次更新 KTable,都会发送最新的键值记录到流处理内部的变更日志主题

langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-topics.sh --zookeeper localhost:2181 --list
streams-plaintext-input
streams-wordcount-counts-store-changelog
streams-wordcount-counts-store-repartition
streams-wordcount-output

使用 Processor 编写单词统计

测试代码

    /* 1. 应用配置参数 */
    Properties props = new Properties();
    // 可作为 consumer 的 group id
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount1");
    // kafka 的地址,多个逗号分隔,目前只支持单集群
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // 序列化和反序列化,在读取和写出流的时候、在读取和写出 state 的时候都会用到
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    /* 2. 拓扑 */
    StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("Counts"),
            Serdes.String(),
            Serdes.Long())
            .withLoggingDisabled(); // disable backing up the store to a changelog topic

    Topology builder = new Topology();

    // add the source processor node that takes Kafka topic "source-topic" as input
    builder.addSource("Source", "source-topic")
            // add the WordCountProcessor node which takes the source processor as its upstream processor
            .addProcessor("Process", WordCountProcessor::new, "Source")
            // add the count store associated with the WordCountProcessor processor
            .addStateStore(countStoreSupplier, "Process")
            // add the sink processor node that takes Kafka topic "sink-topic" as output
            // and the WordCountProcessor node as its upstream processor
            .addSink("Sink", "sink-topic", "Process");

    /* 3. 流处理客户端实例 */
    KafkaStreams streams = new KafkaStreams(builder, props);

    /* 4. 启动 */
    streams.start();

这里自定义了单词统计的 Processor

  1. void init(): 初始化 Processor
  2. void process(): 处理 key-value 的消息
  3. void close(): 清理资源
/**
 * @Description 通过实现 Processor 重写 process 方法自定义 Processor
 * 自定义的时候,先看上层接口,然后找一个内置的实现类参考比如 KStreamAggregateProcessor
 */
public static class WordCountProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {// keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;
        // 获取名为 Counts 的状态
        kvStore = (KeyValueStore) context.getStateStore("Counts");
    }

    // 每接收到一条消息就会执行一次 process, 这里是将结果放回缓存中
    public void process(String key, String value) {String[] words = value.toLowerCase().split(" ");
        for (String word : words) {
            // 获取之前这个单词统计的数量,之前没有统计就设置为 1
            Long preCount = this.kvStore.get(word);
            Long result = preCount == null ? 1 : preCount + 1;
            // 将结果写到缓存中
            this.kvStore.put(word, result);
            // 将结果写到下游 topic
            this.context.forward(word,result.toString());
            System.out.println("process , key =" + word + ",and value =" + result);
        }
    }

    public void close() {
        // close any resources managed by this processor
        // Note: Do not close any StateStores as these are managed by the library
    }
}

测试数据

# 生产者生产数据
langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source-topic
>test test test
# 消费者消费数据
langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic sink-topic
1
2
3
# 注意到 forward 写出的 key,value,实际仅打印保存了统计数据,即 value

打印拓扑

Topologies:
   Sub-topology: 0
    Source: Source (topics: [source-topic])
      --> Process
    Processor: Process (stores: [Counts])
      --> Sink
      <-- Source
    Sink: Sink (topic: sink-topic)
      <-- Process

正文完
 0