关于 Java:Kafka 实战五:Kafka Stream API 实现

37次阅读

共计 8125 个字符,预计需要花费 21 分钟才能阅读完成。

案例一:实现 topic 之间的流传输

一、Kafka Java 代码

创建 Maven 工程,导入以下依赖:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>2.0.0</version>
</dependency>

代码部分

/**
 * Case 1: pipes every record from the {@code mystreamin} topic to the {@code mystreamout} topic
 * unchanged, using String serdes for keys and values.
 *
 * <p>The broker address defaults to {@code 192.168.247.201:9092} and can be overridden with
 * {@code -Dbootstrap.servers=host:port}.
 */
public class MyStream {

    public static void main(String[] args) {
        final Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "mystream");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                System.getProperty("bootstrap.servers", "192.168.247.201:9092"));
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Build the topology: copy everything from mystreamin into mystreamout.
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("mystreamin").to("mystreamout");

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);

        // The shutdown hook closes the streams instance and then releases the main thread.
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report failure instead of exiting with status 0.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            System.exit(1);
        } catch (RuntimeException e) {
            // e.g. start() failing; exiting runs the shutdown hook, which closes the instance.
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }
}

二、Kafka Shell 命令

1、创建 Topic

kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic mystreamin --partitions 1 --replication-factor 1
kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic mystreamout --partitions 1 --replication-factor 1


查看 Topic

kafka-topics.sh --zookeeper 192.168.247.201:2181 --list

2、运行 Java 代码,然后执行以下步骤:

生产消息(向 mystreamin 写入数据)

kafka-console-producer.sh --topic mystreamin --broker-list 127.0.0.1:9092

消费消息(从 mystreamout 读取数据)

kafka-console-consumer.sh --topic mystreamout --bootstrap-server 127.0.0.1:9092 --from-beginning

案例二:WordCount Stream API

一、Kafka Java 代码

代码部分

/**
 * Case 2: word count over a stream.
 *
 * <p>Reads text lines from {@code wordcount-input}, splits each line into whitespace-separated
 * words and keeps a running count per word. Every count update is printed to stdout and written
 * to {@code wordcount-out} with the word as key and the count (as a String) as value.
 *
 * <p>The broker address defaults to {@code 192.168.247.201:9092} and can be overridden with
 * {@code -Dbootstrap.servers=host:port}.
 */
public class WordCountStream {

    public static void main(String[] args) {
        final Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                System.getProperty("bootstrap.servers", "192.168.247.201:9092"));
        prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
        // The key must be AUTO_OFFSET_RESET_CONFIG ("auto.offset.reset");
        // AUTO_OFFSET_RESET_DOC is only the human-readable description of that setting.
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // earliest | latest
        // enable.auto.commit is intentionally not set: Kafka Streams commits offsets itself
        // (every commit.interval.ms) and overrides that consumer setting.
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Example input on wordcount-input:
        //   hello world
        //   hello java
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, Long> count = builder.<String, String>stream("wordcount-input")
                // One record per word, e.g.
                // (null, hello), (null, world), (null, hello), (null, java)
                .flatMapValues(WordCountStream::splitWords)
                // Re-key by the word itself so identical words are counted together.
                .groupBy((key, word) -> word)
                .count();

        count.toStream().foreach((k, v) -> System.out.println("key:" + k + "value:" + v));

        // The default value serde is String, so convert the Long count before writing it out.
        count.toStream().mapValues(total -> total.toString()).to("wordcount-out");

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);

        // The shutdown hook closes the streams instance and then releases the main thread.
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report failure instead of exiting with status 0.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            System.exit(1);
        } catch (RuntimeException e) {
            // e.g. start() failing; exiting runs the shutdown hook, which closes the instance.
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    /**
     * Splits a text line into words on runs of whitespace.
     *
     * @param line one input record value; may be null
     * @return the words of the line; empty for a null or blank line, so no "" word is counted
     */
    private static List<String> splitWords(final String line) {
        if (line == null || line.trim().isEmpty()) {
            return java.util.Collections.emptyList();
        }
        return Arrays.asList(line.trim().split("\\s+"));
    }
}

二、Kafka Shell 命令

1、创建 Topic

kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic wordcount-input --partitions 1 --replication-factor 1
kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic wordcount-out --partitions 1 --replication-factor 1


**2、运行 Java 代码,然后执行以下步骤:**

生产消息(向 wordcount-input 写入数据)

kafka-console-producer.sh --topic wordcount-input --broker-list 127.0.0.1:9092

消费消息(从 wordcount-out 读取数据)

kafka-console-consumer.sh --topic wordcount-out --bootstrap-server 127.0.0.1:9092 --from-beginning

消费消息并同时显示 key

kafka-console-consumer.sh --topic wordcount-out --bootstrap-server 127.0.0.1:9092 --property print.key=true --from-beginning

案例三:利用 Kafka 流实现对输入数字的求和

一、Kafka Java 代码

/**
 * Case 3: running sum of the numbers typed into the {@code suminput} topic.
 *
 * <p>Every numeric line is grouped under the single key {@code "sum:"} and folded into a running
 * total by {@code reduce}; the reducer prints each step to stdout. The resulting table is not
 * written to any output topic.
 *
 * <p>The broker address defaults to {@code 192.168.247.201:9092} and can be overridden with
 * {@code -Dbootstrap.servers=host:port}.
 */
public class SumStream {

    /** Single grouping key, so every number is added to the same total. */
    private static final String SUM_KEY = "sum:";

    public static void main(String[] args) {
        final Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "sumstream");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                System.getProperty("bootstrap.servers", "192.168.247.201:9092"));
        prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
        // The key must be AUTO_OFFSET_RESET_CONFIG ("auto.offset.reset");
        // AUTO_OFFSET_RESET_DOC is only the human-readable description of that setting.
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // earliest | latest
        // enable.auto.commit is intentionally not set: Kafka Streams commits offsets itself
        // (every commit.interval.ms) and overrides that consumer setting.
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> source = builder.stream("suminput");
        source
                // Skip null and non-numeric lines: the reducer parses every value, and a
                // NumberFormatException thrown there would be raised inside the stream thread.
                .filter((key, value) -> isNumber(value))
                .map((key, value) -> new KeyValue<String, String>(SUM_KEY, value.trim()))
                .groupByKey()
                .reduce((x, y) -> {
                    System.out.println("x:" + x + "y:" + y);
                    // long widens the range the previous Integer parsing allowed.
                    final long sum = Long.parseLong(x.trim()) + Long.parseLong(y.trim());
                    System.out.println("sum:" + sum);
                    return String.valueOf(sum);
                });

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);

        // The shutdown hook closes the streams instance and then releases the main thread.
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report failure instead of exiting with status 0.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            System.exit(1);
        } catch (RuntimeException e) {
            // e.g. start() failing; exiting runs the shutdown hook, which closes the instance.
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    /**
     * Tells whether a record value can be added to the running total.
     *
     * @param text the record value; may be null
     * @return true if {@code text}, ignoring surrounding whitespace, parses as a {@code long}
     */
    private static boolean isNumber(final String text) {
        if (text == null) {
            return false;
        }
        try {
            Long.parseLong(text.trim());
            return true;
        } catch (NumberFormatException ignored) {
            // Not a number: the record is skipped by the caller.
            return false;
        }
    }
}

二、Kafka Shell 命令

1、创建 Topic

kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic suminput --partitions 1 --replication-factor 1

**2、运行 Java 代码,然后执行以下步骤:**

生产消息(向 suminput 写入数字,求和过程会打印在 Java 程序的控制台中)

kafka-console-producer.sh --topic suminput --broker-list 127.0.0.1:9092

案例四:Kafka Stream 实现不同窗口的流处理

一、Kafka Java 代码

package cn.kgc.kb09;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * Case 4: counts the words typed into the {@code windowdemo} topic, either over all time or per
 * window, and prints every count update to stdout.
 *
 * <p>The window type is chosen by the first program argument:
 * <ul>
 *   <li>(no argument) or {@code none}: no window, a plain running count per word</li>
 *   <li>{@code tumbling}: fixed, non-overlapping 5-second windows</li>
 *   <li>{@code hopping}: 5-second windows starting every 2 seconds, so one record is counted in
 *       2 or 3 overlapping windows</li>
 *   <li>{@code session}: per-word sessions that close after 20 seconds without a new record for
 *       that word</li>
 * </ul>
 *
 * <p>Each window type runs under its own application id. Re-running a different window type under
 * the same id is known to fail with
 * "Window endMs time cannot be smaller than window startMs time".
 *
 * <p>The broker address defaults to {@code 192.168.247.201:9092} and can be overridden with
 * {@code -Dbootstrap.servers=host:port}.
 *
 * @author Qianchun
 * @since 2020/12/16
 */
public class WindowStream {

    /** Size of tumbling and hopping windows. */
    private static final long WINDOW_SIZE_MS = Duration.ofSeconds(5).toMillis();
    /** How far a hopping window moves forward each time. */
    private static final long HOP_ADVANCE_MS = Duration.ofSeconds(2).toMillis();
    /** Inactivity gap that closes a session window. */
    private static final long SESSION_GAP_MS = Duration.ofSeconds(20).toMillis();

    public static void main(String[] args) {
        final String mode = args.length > 0 ? args[0] : "none";

        final Properties prop = new Properties();
        // Different window types must not share one application id (validates mode, too).
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationIdFor(mode));
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                System.getProperty("bootstrap.servers", "192.168.247.201:9092"));
        prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
        // The key must be AUTO_OFFSET_RESET_CONFIG ("auto.offset.reset");
        // AUTO_OFFSET_RESET_DOC is only the human-readable description of that setting.
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // earliest | latest
        // enable.auto.commit is intentionally not set: Kafka Streams commits offsets itself
        // (every commit.interval.ms) and overrides that consumer setting.
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> words = toWords(builder.<String, String>stream("windowdemo"));

        // The long-based window factories are used because kafka-streams 2.0.0 (the version in
        // the pom) has no Duration overloads for them.
        switch (mode) {
            case "tumbling":
                words.groupByKey()
                        .windowedBy(TimeWindows.of(WINDOW_SIZE_MS))
                        .count()
                        .toStream()
                        .foreach(WindowStream::print);
                break;
            case "hopping":
                words.groupByKey()
                        .windowedBy(TimeWindows.of(WINDOW_SIZE_MS).advanceBy(HOP_ADVANCE_MS))
                        .count()
                        .toStream()
                        .foreach(WindowStream::print);
                break;
            case "session":
                words.groupByKey()
                        .windowedBy(SessionWindows.with(SESSION_GAP_MS))
                        .count()
                        .toStream()
                        .foreach(WindowStream::print);
                break;
            default:
                // "none": plain running count per word, no window.
                words.groupByKey()
                        .count()
                        .toStream()
                        .foreach(WindowStream::print);
                break;
        }

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);

        // The shutdown hook closes the streams instance and then releases the main thread.
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report failure instead of exiting with status 0.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            System.exit(1);
        } catch (RuntimeException e) {
            // e.g. start() failing; exiting runs the shutdown hook, which closes the instance.
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    /**
     * Maps a window type to the application id it runs under.
     *
     * @param mode one of {@code none}, {@code tumbling}, {@code hopping}, {@code session}
     * @return the application id for that window type
     * @throws IllegalArgumentException if {@code mode} is not a known window type
     */
    private static String applicationIdFor(final String mode) {
        switch (mode) {
            case "none":
                // Unchanged default id, so existing offsets/state of this demo keep being used.
                return "SessionWindow";
            case "tumbling":
            case "hopping":
            case "session":
                return "windowdemo-" + mode;
            default:
                throw new IllegalArgumentException("Unknown window type: " + mode
                        + " (expected none, tumbling, hopping or session)");
        }
    }

    /**
     * Splits every line into whitespace-separated words and re-keys each record by its word.
     * Null lines and empty words are dropped.
     */
    private static KStream<String, String> toWords(final KStream<String, String> lines) {
        return lines
                .filter((key, line) -> line != null)
                // "\\s+" is a regex for runs of whitespace; the former "s+" split on the letter s.
                .flatMapValues(line -> Arrays.asList(line.trim().split("\\s+")))
                .filter((key, word) -> !word.isEmpty())
                .map((key, word) -> new KeyValue<String, String>(word, "1"));
    }

    /** Prints one count update; the key is the word, or the windowed word in windowed modes. */
    private static void print(final Object key, final Long count) {
        System.out.println("x:" + key + "y:" + count);
    }
}

二、Kafka Shell 命令

1、创建 Topic

kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic windowdemo --partitions 1 --replication-factor 1

**2、运行 Java 代码,然后执行以下步骤:**

生产消息(向 windowdemo 写入数据)

kafka-console-producer.sh --topic windowdemo --broker-list 127.0.0.1:9092

注意:

  • ERROR:

    • Exception in thread "sum-a3bbe4d0-4cc9-4812-a7a0-e650a8a60c9f-StreamThread-1" java.lang.IllegalArgumentException: Window endMs time cannot be smaller than window startMs time.
    • 数组越界
  • 解决方案:

    • 大概率是切换窗口类型后仍使用了相同的应用 ID(application.id),沿用了上一次运行留下的状态。请修改 prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "sessionwindow"); 中的参数,为每种窗口使用不同的应用 ID。

正文完
 0