目录
- 1、Kafka Stream背景
- 1.1 Kafka Stream是什么
- 1.2 什么是流式计算
- 1.3 为什么要有Kafka Stream
- 2、Kafka Stream如何解决流式零碎中关键问题
- 2.1 KTable和KSteam
- 2.2 工夫
- 2.3 窗口
- 3、Kafka Stream利用示例
- 3.1 案例一:将topicA的数据写入到topicB中(纯复制)
- 3.2 案例二:将TopicA中的数据实现wordcount写入到TopicB
- 3.3 示例三:在TopicA中每输出一个值求和并写入到TopicB
- 3.4 案例四:窗口
- 3.4.1 每隔2秒钟输入一次过来5秒内topicA里的wordcount,后果写入到TopicB
- 3.4.2 每隔5秒钟输入一次过来5秒内topicA里的wordcount,后果写入到TopicB
- 3.4.3 TopicA 15秒内的wordcount,后果写入TopicB
- 3.5 案例五:将TopicA的某一列扁平化解决写入TopicB
- 3.6 案例六:将TopicA的多列扁平化解决写入TopicB
学习Kafka Stream,咱们须要先理解什么是Kafka Stream ,为什么要应用Kafka Stream,以及咱们怎么应用Kafka Stream。
在此举荐Kafka Stream学习博客:https://www.cnblogs.com/warehouse/p/9521382.html
1、Kafka Stream背景
1.1 Kafka Stream是什么
Kafka Streams是一套客户端类库,它能够对存储在Kafka内的数据进行流式解决和剖析。
1.2 什么是流式计算
- 流式计算:输出是继续的,个别先定义指标计算,而后数据到来之后将计算逻辑利用于数据,往往用增量计算代替全量计算。
- 批量计算:个别先有全量数据集,而后定义计算逻辑,并将计算利用于全量数据。特点是全量计算,并且计算结果一次性全量输入。
1.3 为什么要有Kafka Stream
开源流式解决零碎有:Spark Streaming和Apache Storm,它们能与SQL解决集成等长处,功能强大,那为何还须要Kafka Stream呢?
1、使用方便。Spark和Storm都是流式解决框架,而Kafka Stream是基于Kafka的流式解决类库。开发者很难理解框架的具体运行形式,调试老本高,应用受限。而类库间接提供具体的类给开发者应用,整个利用的运行形式次要由开发者管制,方便使用和调试。
2、应用成本低。就流式解决零碎而言,根本都反对Kafka作为数据源。Kafka基本上是支流的流式解决零碎的规范数据源。大部分流式零碎中都部署了Kafka,包含Spark和Storm,此时应用Kafka Stream的老本非常低。
3、省资源。应用Storm或Spark Streaming时,须要为框架自身的过程预留资源,框架自身也占资源。
4、Kafka自身也有长处。因为Kafka Consumer Rebalance机制,Kafka Stream能够在线动静调整并发度。
2、Kafka Stream如何解决流式零碎中关键问题
2.1 KTable和KSteam
KTable和KSteam是Kafka中十分重要的概念,在此剖析一下二者区别。
- KStream是一个数据流,能够认为所有的记录都通过Insert only的形式插入进这个数据流中。
- KTable代表一个残缺的数据集,能够了解为数据库中的表。每条记录都是KV键值对,key能够了解为数据库中的主键,是惟一的,而value代表一条记录。咱们能够认为KTable中的数据时通过Update only的形式进入的。如果是雷同的key,会笼罩掉原来的那条记录。
- 综上来说,KStream是数据流,来多少数据就插入多少数据,是Insert only;KTable是数据集,雷同key只容许保留最新的记录,也就是Update only
2.2 工夫
Kafka反对三种工夫:
- 事件产生工夫:事件产生的工夫,蕴含在数据记录中。产生工夫由Producer在结构ProducerRecord时指定。并且须要Broker或者Topic将message.timestamp.type设置为CreateTime(默认值)能力失效。
- 音讯接管工夫:也即音讯存入Broker的工夫。当Broker或Topic将message.timestamp.type设置为LogAppendTime时失效。此时Broker会在接管到音讯后,存入磁盘前,将其timestamp属性值设置为以后机器工夫。个别音讯接管工夫比拟靠近于事件产生工夫,局部场景下可代替事件产生工夫。
- 音讯解决工夫。也即Kafka Stream解决音讯时的工夫。
2.3 窗口
流式数据在工夫上无界的,然而聚合操作只能作用在特定(有界)的数据集,咋整???这时候就有了窗口的概念,在工夫无界的数据流中定义一个边界来用于计算。Kafka反对的窗口如下:
- 1)Hopping Time Window:举一个典型的利用场景,每隔5秒钟输入一次过来1个小时内网站的PV或者UV。外面有两个工夫1小时和5秒钟,1小时指定了窗口的大小(Window size),5秒钟定义输入的工夫距离(Advance interval)。
- 2)Tumbling Time Window:能够认为是Hopping Time Window的一种特例,窗口大小=输入工夫距离,它的特点是各个Window之间齐全不相交。
- 3)Sliding Window该窗口只用于2个KStream进行Join计算时。该窗口的大小定义了Join两侧KStream的数据记录被认为在同一个窗口的最大时间差。假如该窗口的大小为5秒,则参加Join的2个KStream中,记录时间差小于5的记录被认为在同一个窗口中,能够进行Join计算。
- 4)Session Window该窗口用于对Key做Group后的聚合操作中。它须要对Key做分组,而后对组内的数据依据业务需要定义一个窗口的起始点和完结点。一个典型的案例是,心愿通过Session Window计算某个用户拜访网站的工夫。对于一个特定的用户(用Key示意)而言,当产生登录操作时,该用户(Key)的窗口即开始,当产生退出操作或者超时时,该用户(Key)的窗口即完结。窗口完结时,可计算该用户的拜访工夫或者点击次数等。
3、Kafka Stream利用示例
增加pom依赖:
`<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>2.0.0</version></dependency><dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.0.0</version></dependency>` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10
3.1 案例一:将topicA的数据写入到topicB中(纯复制)
`import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.Topology;import java.util.Properties;import java.util.concurrent.CountDownLatch;public class MyStream { public static void main(String[] args) { Properties prop =new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"mystream"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址 prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //输出key的类型 prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); //输出value的类型 //创立流结构器 StreamsBuilder builder = new StreamsBuilder(); //构建好builder,将myStreamIn topic中的数据写入到myStreamOut topic中 builder.stream("myStreamIn").to("myStreamOut"); final Topology topo=builder.build(); final KafkaStreams streams = new KafkaStreams(topo, prop); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("stream"){ @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.exit(0); }}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13* 14* 15* 16* 17* 18* 19* 20* 21* 22* 23* 24* 25* 26* 27* 28* 29* 30* 31* 32* 33* 34* 35* 36* 37* 38* 39* 40* 41* 42* 43
在这里阐明一下prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"mystream");
,咱们将TopicA的数据写入到TopicB中,就相当于这个流在生产TopicA的数据,咱们晓得一个Topic中,一个生产组里只能有一个消费者去生产它。假如咱们将TopicA的数据写入到TopicB的过程中,报错了(比方虚拟机内存满了),这时数据只写了一半,咱们清理完内存后,想持续写剩下的数据,再次运行咱们发现报错,写不了了。这时候咱们须要批改这个参数将mystream改成别的名字,因为同一个消费者组里只能有一个消费者去生产它。
开启zookeeper和Kafka
`# 开启zookeeperzkServer.sh start# 后盾启动Kafkakafka-server-start.sh -daemon /opt/kafka/config/server.properties` * 1* 2* 3* 4
创立topic myStreamIn
`kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic myStreamIn --partitions 1 --replication-factor 1` * 1
创立topic myStreamOut
`kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic myStreamOut --partitions 1 --replication-factor 1` * 1
生产音讯写入到myStreamIn
`kafka-console-producer.sh --topic myStreamIn --broker-list 192.168.136.20:9092` * 1
生产myStreamOut里的数据
`kafka-console-consumer.sh --topic myStreamOut --bootstrap-server 192.168.136.20:9092 --from-beginning` * 1
运行示例代码并在生产者端输出数据,能在生产端看到数据,表明Kafka Stream写入胜利。
3.2 案例二:将TopicA中的数据实现wordcount写入到TopicB
工作中不可能像案例一一样将一个Topic的数据一成不变存入另一个Topic,个别是要通过解决,这就须要在流中加上逻辑。
`import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.*;import org.apache.kafka.streams.kstream.KTable;import java.util.Arrays;import java.util.List;import java.util.Properties;import java.util.concurrent.CountDownLatch;public class WordCountStream { public static void main(String[] args) { Properties prop =new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcountstream"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址 prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,2000); //提交工夫设置为2秒 //prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,""earliest ); //earliest latest none 默认latest //prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); //true(主动提交) false(手动提交) prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); //创立流结构器 //hello world //hello java StreamsBuilder builder = new StreamsBuilder(); KTable<String, Long> count = builder.stream("wordcount-input") //从kafka中一条一条取数据 .flatMapValues( //返回压扁后的数据 (value) -> { //对数据按空格进行切割,返回List汇合 String[] split = value.toString().split(" "); List<String> strings = Arrays.asList(split); return strings; }) //null hello,null world,null hello,null java .map((k, v) -> { return new KeyValue<String, String>(v,"1"); }).groupByKey().count(); count.toStream().foreach((k,v)->{ //为了测试不便,咱们将kv输入到控制台 System.out.println("key:"+k+" "+"value:"+v); }); count.toStream().map((x,y)->{ return new KeyValue<String,String>(x,y.toString()); //留神转成toString类型,咱们后面设置的kv的类型都是string类型 }).to("wordcount-output"); final Topology topo=builder.build(); final KafkaStreams streams = new KafkaStreams(topo, prop); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("stream"){ @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.exit(0); }}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13* 14* 15* 16* 17* 18* 19* 20* 21* 22* 23* 24* 25* 26* 27* 28* 29* 30* 31* 32* 33* 34* 35* 36* 37* 38* 39* 40* 41* 42* 43* 44* 45* 46* 47* 48* 49* 50* 51* 52* 53* 54* 55* 56* 57* 58* 59* 60* 61* 62* 63
`# 创立TopicA(wordcount-input)kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic wordcount-input --partitions 1 --replication-factor 1# 创立TopicB(wordcount-output)kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic wordcount-output --partitions 1 --replication-factor 1# 创立生产者kafka-console-producer.sh --topic wordcount-input --broker-list 192.168.136.20:9092# 创立消费者,须要打印出key(--property print.key=true)kafka-console-consumer.sh --topic wordcount-output --bootstrap-server 192.168.136.20:9092 --from-beginning --property print.key=true` * 1* 2* 3* 4* 5* 6* 7* 8
运行示例代码并在生产者端输出数据,能在生产端看到数据,表明Kafka Stream写入胜利。
3.3 示例三:在TopicA中每输出一个值求和并写入到TopicB
`import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.*;import org.apache.kafka.streams.kstream.KStream;import org.apache.kafka.streams.kstream.KTable;import java.util.Properties;import java.util.concurrent.CountDownLatch;public class SumStream { public static void main(String[] args) { Properties prop =new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"sumstream"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址 prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,2000); //提交工夫设置为2秒 //prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //earliest latest none 默认latest //prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); //true(主动提交) false(手动提交) prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); //创立流结构器 StreamsBuilder builder = new StreamsBuilder(); KStream<Object, Object> source = builder.stream("suminput"); KTable<String, String> sum1 = source.map((key, value) -> new KeyValue<String, String>("sum", value.toString()) ).groupByKey().reduce((x, y) -> { System.out.println("x: " + x + " " + "y: "+y); Integer sum = Integer.valueOf(x) + Integer.valueOf(y); System.out.println("sum: "+sum); return sum.toString(); }); sum1.toStream().to("sumout"); final Topology topo=builder.build(); final KafkaStreams streams = new KafkaStreams(topo, prop); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("stream"){ @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.exit(0); }}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13* 14* 15* 16* 17* 18* 19* 20* 21* 22* 23* 24* 25* 26* 27* 28* 29* 30* 31* 32* 33* 34* 35* 36* 37* 38* 39* 40* 41* 42* 43* 44* 45* 46* 47* 48* 49* 50* 51* 52
`# 创立topicA(suminput)kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic suminput --partitions 1 --replication-factor 1# 创立生产者kafka-console-producer.sh --topic suminput --broker-list 192.168.136.20:9092# 创立topicB(sumout)kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic sumout --partitions 1 --replication-factor 1# 创立消费者kafka-console-consumer.sh --topic sumout --bootstrap-server 192.168.136.20:9092 --from-beginning` * 1* 2* 3* 4* 5* 6* 7* 8
运行示例代码并在生产者端输出数据,能在生产端看到数据,表明Kafka Stream写入胜利。
3.4 案例四:窗口
3.4.1 每隔2秒钟输入一次过来5秒内topicA里的wordcount,后果写入到TopicB
`import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.*;import org.apache.kafka.streams.kstream.*;import java.time.Duration;import java.util.Arrays;import java.util.Properties;import java.util.concurrent.CountDownLatch;public class WindowStream { public static void main(String[] args) { Properties prop =new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"WindowStream"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址 prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000); //提交工夫设置为3秒 prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); KStream<Object, Object> source = builder.stream("topicA"); KTable<Windowed<String>, Long> countKtable = source.flatMapValues(value -> Arrays.asList(value.toString().split("s+"))) .map((x, y) -> { return new KeyValue<String, String>(y, "1"); }).groupByKey() //加5秒窗口,按步长2秒滑动 Hopping Time Window .windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis()).advanceBy(Duration.ofSeconds(2).toMillis())) //.windowedBy(SessionWindows.with(Duration.ofSeconds(15).toMillis())) .count(); //为了不便查看,输入到控制台 countKtable.toStream().foreach((x,y)->{ System.out.println("x: "+x+" y: "+y); }); countKtable.toStream().map((x,y)-> { return new KeyValue<String, String>(x.toString(), y.toString()); }).to("topicB"); final Topology topo=builder.build(); final KafkaStreams streams = new KafkaStreams(topo, prop); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("stream"){ @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.exit(0); }}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13* 14* 15* 16* 17* 18* 19* 20* 21* 22* 23* 24* 25* 26* 27* 28* 29* 30* 31* 32* 33* 34* 35* 36* 37* 38* 39* 40* 41* 42* 43* 44* 45* 46* 47* 48* 49* 50* 51* 52* 53* 54* 55* 56* 57* 58
3.4.2 每隔5秒钟输入一次过来5秒内topicA里的wordcount,后果写入到TopicB
加5秒窗口,与3.4.1不同的是,前一个5秒与下一个5秒没有任何穿插。
`import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.*;import org.apache.kafka.streams.kstream.*;import java.time.Duration;import java.util.Arrays;import java.util.Properties;import java.util.concurrent.CountDownLatch;public class WindowStream { public static void main(String[] args) { Properties prop =new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"WindowStream"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址 prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000); //提交工夫设置为3秒 prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); KStream<Object, Object> source = builder.stream("topicA"); KTable<Windowed<String>, Long> countKtable = source.flatMapValues(value -> Arrays.asList(value.toString().split("s+"))) .map((x, y) -> { return new KeyValue<String, String>(y, "1"); }).groupByKey() //加五秒的窗口(前一个5秒和下一个5秒没有任何穿插) Tumbling Time Window .windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis())) .count(); countKtable.toStream().foreach((x,y)->{ System.out.println("x: "+x+" y: "+y); }); countKtable.toStream().map((x,y)-> { return new KeyValue<String, String>(x.toString(), y.toString()); }).to("topicB"); final Topology topo=builder.build(); final KafkaStreams streams = new KafkaStreams(topo, prop); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("stream"){ @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.exit(0); }}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13* 14* 15* 16* 17* 18* 19* 20* 21* 22* 23* 24* 25* 26* 27* 28* 29* 30* 31* 32* 33* 34* 35* 36* 37* 38* 39* 40* 41* 42* 43* 44* 45* 46* 47* 48* 49* 50* 51* 52* 53* 54* 55
3.4.3 TopicA 15秒内的wordcount,后果写入TopicB
比方登录某app,20分钟内不操作,会主动退出。
一个典型的案例是,心愿通过Session Window计算某个用户拜访网站的工夫。对于一个特定的用户(用Key示意)而言,当产生登录操作时,该用户(Key)的窗口即开始,当产生退出操作或者超时时,该用户(Key)的窗口即完结。窗口完结时,可计算该用户的拜访工夫或者点击次数等。
`import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.*;import org.apache.kafka.streams.kstream.*;import java.time.Duration;import java.util.Arrays;import java.util.Properties;import java.util.concurrent.CountDownLatch;public class WindowStream { public static void main(String[] args) { Properties prop =new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"WindowStream"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址 prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000); //提交工夫设置为3秒 prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); KStream<Object, Object> source = builder.stream("windowdemo1"); KTable<Windowed<String>, Long> countKtable = source.flatMapValues(value -> Arrays.asList(value.toString().split("s+"))) .map((x, y) -> { return new KeyValue<String, String>(y, "1"); }).groupByKey() .windowedBy(SessionWindows.with(Duration.ofSeconds(15).toMillis())) .count(); countKtable.toStream().foreach((x,y)->{ System.out.println("x: "+x+" y: "+y); }); countKtable.toStream().map((x,y)-> { return new KeyValue<String, String>(x.toString(), y.toString()); }).to("windowDemoOut"); final Topology topo=builder.build(); final KafkaStreams streams = new KafkaStreams(topo, prop); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("stream"){ @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.exit(0); }}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13* 14* 15* 16* 17* 18* 19* 20* 21* 22* 23* 24* 25* 26* 27* 28* 29* 30* 31* 32* 33* 34* 35* 36* 37* 38* 39* 40* 41* 42* 43* 44* 45* 46* 47* 48* 49* 50* 51* 52* 53* 54* 55
3.5 案例五:将TopicA的某一列扁平化解决写入TopicB
现有一张表user_friends,表构造如下,去掉表头,应用flume将内容写进Kafka的TopicA,将第二列扁平化解决写入TopicB。Flume–>Kafka的TopicA见另一篇博客Flume整合Kafka,本文演示从TopicA–>TopicB。
解决后的数据要求如下,就是将第二列开展。
`import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.*;import java.util.ArrayList;import java.util.List;import java.util.Properties;import java.util.concurrent.CountDownLatch;public class UserFriendStream { public static void main(String[] args) { Properties prop =new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"UserFriendStream1"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址 prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); builder.stream("user_friends_raw").flatMap((k,v)->{ List<KeyValue<String,String>> list=new ArrayList<>(); String[] info = v.toString().split(","); if(info.length==2){ String[] friends = info[1].split("s+"); if (info[0].trim().length()>0){ for (String friend : friends) { //为了不便测试打印进去 System.out.println(info[0]+" "+friend); list.add(new KeyValue<String,String>(null,info[0]+","+friend)); }}} return list; }).to("user_friends"); final Topology topo=builder.build(); final KafkaStreams streams = new KafkaStreams(topo, prop); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("stream"){ @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.exit(0); }}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13* 14* 15* 16* 17* 18* 19* 20* 21* 22* 23* 24* 25* 26* 27* 28* 29* 30* 31* 32* 33* 34* 35* 36* 37* 38* 39* 40* 41* 42* 43* 44* 45* 46* 47* 48* 49* 50* 51
3.6 案例六:将TopicA的多列扁平化解决写入TopicB
现有一张表event_attendees.csv,内容如下,去掉表头,应用Flume将内容写入Kafka的TopicA,将二三四五列扁平化解决写入TopicB。本案例仅演示TopicA–>TopicB,不演示Flume–>Kafka。
import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.*;import org.apache.kafka.streams.kstream.KStream;import java.util.ArrayList;import java.util.Properties;import java.util.concurrent.CountDownLatch;public class EventAttendStream { public static void main(String[] args) { Properties prop =new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"UserFriendStream1"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址 prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); KStream<Object, Object> ear = builder.stream("event_attendees_raw"); KStream<String, String> eventStream = ear.flatMap((k, v) -> { //event,yes,maybe,invited,no System.out.println(k + " " + v); String[] split = v.toString().split(","); ArrayList<KeyValue<String, String>> list = new ArrayList<>(); if (split.length >= 2 && split[1].trim().length() > 0) { String[] yes = split[1].split("s+"); for (String y : yes) { list.add(new KeyValue<String, String>(null, split[0] + "," + y + ",yes")); } } if (split.length >= 3 && split[2].trim().length() > 0) { String[] maybe = split[2].split("s+"); for (String mb : maybe) { list.add(new KeyValue<String, String>(null, split[0] + "," + mb + ",maybe")); } } if (split.length >= 4 && split[3].trim().length() > 0) { String[] invited = split[3].split("s+"); for (String inv : invited) { list.add(new KeyValue<String, String>(null, split[0] + "," + inv + ",invited")); } } if (split.length >= 5 && split[4].trim().length() > 0) { String[] no = split[4].split("s+"); for (String n : no) { list.add(new KeyValue<String, String>(null, split[0] + "," + no + ",no")); } } return list; }); eventStream.to("events_attend"); final Topology topo=builder.build(); final KafkaStreams streams = new KafkaStreams(topo, prop); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("stream"){ @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.exit(0); }}