Case 1: Streaming data from one topic to another
I. Kafka Java Code
Create a Maven project and add the following dependencies:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.0.0</version>
</dependency>
```
Code:
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class MyStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "mystream");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.247.201:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Create the stream builder
        StreamsBuilder builder = new StreamsBuilder();
        // Pipe the data from the mystreamin topic into the mystreamout topic
        builder.stream("mystreamin").to("mystreamout");

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);

        // Close the streams application cleanly on shutdown
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
}
```
II. Kafka Shell Commands
1. Create the topics
```shell
kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic mystreamin --partitions 1 --replication-factor 1
kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic mystreamout --partitions 1 --replication-factor 1
```
List the topics:
```shell
kafka-topics.sh --zookeeper 192.168.247.201:2181 --list
```
2. Run the Java code, then perform the following steps:

Produce messages:

```shell
kafka-console-producer.sh --topic mystreamin --broker-list 127.0.0.1:9092
```

Consume messages:

```shell
kafka-console-consumer.sh --topic mystreamout --bootstrap-server 127.0.0.1:9092 --from-beginning
```
Case 2: WordCount with the Streams API
I. Kafka Java Code
Code:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCountStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.247.201:9092");
        prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest or latest
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // commit offsets manually
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Create the stream builder
        // Sample input on wordcount-input:
        //   hello world
        //   hello java
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> count = builder.stream("wordcount-input") // read records from Kafka one by one
                .flatMapValues( // flatten each line into individual words
                        (value) -> {
                            // Split the value on spaces and return a List
                            String[] split = value.toString().split(" ");
                            List<String> strings = Arrays.asList(split);
                            return strings;
                        })
                // key:null value:hello, key:null value:world, key:null value:hello, key:null value:java
                .map((k, v) -> {
                    return new KeyValue<String, String>(v, "1");
                })
                .groupByKey()
                .count();

        count.toStream().foreach((k, v) -> {
            System.out.println("key:" + k + " value:" + v);
        });
        count.toStream().map((x, y) -> {
            return new KeyValue<String, String>(x, y.toString());
        }).to("wordcount-out");

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);

        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
}
```
II. Kafka Shell Commands
1. Create the topics
```shell
kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic wordcount-input --partitions 1 --replication-factor 1
kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic wordcount-out --partitions 1 --replication-factor 1
```
2. Run the Java code, then perform the following steps:

Produce messages:
```shell
kafka-console-producer.sh --topic wordcount-input --broker-list 127.0.0.1:9092
```
Consume messages:
```shell
kafka-console-consumer.sh --topic wordcount-out --bootstrap-server 127.0.0.1:9092 --from-beginning
```
Consume messages with the keys displayed:
```shell
kafka-console-consumer.sh --topic wordcount-out --bootstrap-server 127.0.0.1:9092 --property print.key=true --from-beginning
```
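Using the sample lines from the code comments (`hello world`, then `hello java`), a run might look roughly like the following; the exact intermediate updates that reach the output topic depend on the commit interval and record caching:

```shell
# Typed into the producer:
hello world
hello java

# Eventual output of the key-printing consumer (key and value separated by a tab):
hello   2
world   1
java    1
```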
Case 3: Summing input numbers with Kafka Streams
I. Kafka Java Code
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class SumStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "sumstream");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.247.201:9092");
        prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest or latest
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // commit offsets manually
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<Object, Object> source = builder.stream("suminput");
        // Re-key every record to the same key so the whole stream reduces to one running total
        source.map((key, value) ->
                new KeyValue<String, String>("sum: ", value.toString())
        ).groupByKey().reduce((x, y) -> {
            System.out.println("x: " + x + " y: " + y);
            Integer sum = Integer.valueOf(x) + Integer.valueOf(y);
            System.out.println("sum: " + sum);
            return sum.toString();
        });

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);

        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
}
```
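The topology above only prints the running total from inside the reducer; nothing is written back to Kafka. Below is a minimal sketch of how the `reduce()` result could also be forwarded to an output topic, replacing the `source.map(...)` chain in `SumStream`. The topic name `sumoutput` is an assumption for illustration, not part of the original example:

```java
// Capture the KTable returned by reduce() instead of discarding it
KTable<String, String> sum = source
        .map((key, value) -> new KeyValue<String, String>("sum: ", value.toString()))
        .groupByKey()
        .reduce((x, y) -> String.valueOf(Integer.valueOf(x) + Integer.valueOf(y)));
// Emit every updated running total to the (assumed, pre-created) "sumoutput" topic
sum.toStream().to("sumoutput");
```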
II. Kafka Shell Commands
1. Create the topic
```shell
kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic suminput --partitions 1 --replication-factor 1
```
2. Run the Java code, then perform the following steps:

Produce messages:
```shell
kafka-console-producer.sh --topic suminput --broker-list 127.0.0.1:9092
```
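For reference, assuming the numbers 10, 20, and 30 are typed into the producer one per line, the reducer's own println statements should print something along these lines (the first record only seeds the aggregate, so the reducer is not invoked for it):

```shell
x: 10 y: 20
sum: 30
x: 30 y: 30
sum: 60
```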
Case 4: Stream processing with different window types in Kafka Streams
I. Kafka Java Code
```java
package cn.kgc.kb09;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * @Qianchun
 * @Date 2020/12/16
 */
public class WindowStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // Different window streams must not use the same application ID
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "SessionWindow");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.247.201:9092");
        prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest or latest
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // commit offsets manually
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<Object, Object> source = builder.stream("windowdemo");
        source.flatMapValues(value -> Arrays.asList(value.toString().split("\\s+")))
                .map((x, y) -> {
                    return new KeyValue<String, String>(y, "1");
                })
                .groupByKey()
                // The duration of each window below can be tuned via its parameters
                // Tumbling time window (fixed 5-second window; input within the same 5 seconds is counted together)
//                .windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis()))
                // Hopping time window (5-second window advancing by 2 seconds, so a single input
                // within 5 seconds shows up in 5/2+1 = 3 overlapping windows)
//                .windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis())
//                        .advanceBy(Duration.ofSeconds(2).toMillis()))
                // Session window (the session stays alive as long as input keeps arriving within
                // 20 seconds; after a gap of more than 20 seconds it expires and counting restarts from 0)
//                .windowedBy(SessionWindows.with(Duration.ofSeconds(20).toMillis()))
                .count().toStream().foreach((x, y) -> {
                    System.out.println("x: " + x + " y:" + y);
                });

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);

        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
}
```
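All three `windowedBy` variants above are commented out, so the code as written performs an ordinary unwindowed count. As a concrete sketch against the Kafka 2.0 API (where `TimeWindows.of` takes a size in milliseconds), this is the aggregation step with the 5-second tumbling window enabled; each result key becomes a `Windowed<String>` carrying the window boundaries:

```java
source.flatMapValues(value -> Arrays.asList(value.toString().split("\\s+")))
        .map((x, y) -> new KeyValue<String, String>(y, "1"))
        .groupByKey()
        // 5-second tumbling window: each record is counted in exactly one window
        .windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis()))
        .count()
        .toStream()
        .foreach((windowedKey, count) ->
                System.out.println("window: " + windowedKey + " count: " + count));
```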
II. Kafka Shell Commands
1. Create the topic
```shell
kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic windowdemo --partitions 1 --replication-factor 1
```
2. Run the Java code, then perform the following steps:

Produce messages:
```shell
kafka-console-producer.sh --topic windowdemo --broker-list 127.0.0.1:9092
```
Note:
ERROR:
- Exception in thread "sum-a3bbe4d0-4cc9-4812-a7a0-e650a8a60c9f-StreamThread-1" java.lang.IllegalArgumentException: Window endMs time cannot be smaller than window startMs time.
- Array index out of bounds
Solution:
- Most likely two window streams are sharing the same application ID. Change the value in

```java
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "sessionwindow");
```

so that each window stream uses a unique ID.