目录
- 1、Kafka Stream 背景
-
- 1.1 Kafka Stream 是什么
- 1.2 什么是流式计算
- 1.3 为什么要有 Kafka Stream
- 2、Kafka Stream 如何解决流式零碎中关键问题
-
- 2.1 KTable 和 KSteam
- 2.2 工夫
- 2.3 窗口
- 3、Kafka Stream 利用示例
-
- 3.1 案例一:将 topicA 的数据写入到 topicB 中(纯复制)
- 3.2 案例二:将 TopicA 中的数据实现 wordcount 写入到 TopicB
- 3.3 示例三:在 TopicA 中每输出一个值求和并写入到 TopicB
- 3.4 案例四:窗口
-
- 3.4.1 每隔 2 秒钟输入一次过来 5 秒内 topicA 里的 wordcount,后果写入到 TopicB
- 3.4.2 每隔 5 秒钟输入一次过来 5 秒内 topicA 里的 wordcount,后果写入到 TopicB
- 3.4.3 TopicA 15 秒内的 wordcount,后果写入 TopicB
- 3.5 案例五:将 TopicA 的某一列扁平化解决写入 TopicB
- 3.6 案例六:将 TopicA 的多列扁平化解决写入 TopicB
学习 Kafka Stream,咱们须要先理解什么是 Kafka Stream,为什么要应用 Kafka Stream,以及咱们怎么应用 Kafka Stream。
在此举荐 Kafka Stream 学习博客:https://www.cnblogs.com/warehouse/p/9521382.html
1、Kafka Stream 背景
1.1 Kafka Stream 是什么
Kafka Streams 是一套客户端类库,它能够对存储在Kafka 内的数据进行流式解决和剖析。
1.2 什么是流式计算
- 流式计算:输出是继续的,个别先定义指标计算,而后数据到来之后将计算逻辑利用于数据,往往用增量计算代替全量计算。
- 批量计算:个别先有全量数据集,而后定义计算逻辑,并将计算利用于全量数据。特点是全量计算,并且计算结果一次性全量输入。
1.3 为什么要有 Kafka Stream
开源流式解决零碎有:Spark Streaming 和 Apache Storm,它们能与 SQL 解决集成等长处,功能强大,那为何还须要 Kafka Stream 呢?
1、使用方便。Spark 和 Storm 都是流式解决 框架 ,而 Kafka Stream 是基于 Kafka 的流式解决 类库。开发者很难理解框架的具体运行形式,调试老本高,应用受限。而类库间接提供具体的类给开发者应用,整个利用的运行形式次要由开发者管制,方便使用和调试。
2、应用成本低。就流式解决零碎而言,根本都反对 Kafka 作为数据源。Kafka 基本上是支流的流式解决零碎的规范数据源。大部分流式零碎中都部署了 Kafka,包含 Spark 和 Storm,此时应用 Kafka Stream 的老本非常低。
3、省资源。应用 Storm 或 Spark Streaming 时,须要为框架自身的过程预留资源,框架自身也占资源。
4、Kafka 自身也有长处。因为 Kafka Consumer Rebalance 机制,Kafka Stream 能够在线动静调整并发度。
2、Kafka Stream 如何解决流式零碎中关键问题
2.1 KTable 和 KSteam
KTable 和 KSteam 是 Kafka 中十分重要的概念,在此剖析一下二者区别。
- KStream 是一个数据流,能够认为所有的记录都通过 Insert only 的形式插入进这个数据流中。
- KTable 代表一个残缺的数据集,能够了解为数据库中的表。每条记录都是 KV 键值对,key 能够了解为数据库中的主键,是惟一的,而 value 代表一条记录。咱们能够认为 KTable 中的数据时通过 Update only 的形式进入的。如果是雷同的 key,会笼罩掉原来的那条记录。
- 综上来说,KStream 是数据流,来多少数据就插入多少数据,是 Insert only;KTable 是数据集,雷同 key 只容许保留最新的记录,也就是 Update only
2.2 工夫
Kafka 反对三种工夫:
- 事件产生工夫:事件产生的工夫,蕴含在数据记录中。产生工夫由 Producer 在结构 ProducerRecord 时指定。并且须要 Broker 或者 Topic 将 message.timestamp.type 设置为 CreateTime(默认值)能力失效。
- 音讯接管工夫:也即音讯存入 Broker 的工夫。当 Broker 或 Topic 将 message.timestamp.type 设置为 LogAppendTime 时失效。此时 Broker 会在接管到音讯后,存入磁盘前,将其 timestamp 属性值设置为以后机器工夫。个别音讯接管工夫比拟靠近于事件产生工夫,局部场景下可代替事件产生工夫。
- 音讯解决工夫。也即 Kafka Stream 解决音讯时的工夫。
2.3 窗口
流式数据在工夫上无界的,然而聚合操作只能作用在特定 (有界) 的数据集,咋整???这时候就有了窗口的概念,在工夫无界的数据流中定义一个边界来用于计算。Kafka 反对的窗口如下:
- 1)Hopping Time Window:举一个典型的利用场景,每隔 5 秒钟输入一次过来 1 个小时内网站的 PV 或者 UV。外面有两个工夫 1 小时和 5 秒钟,1 小时指定了窗口的大小(Window size),5 秒钟定义输入的工夫距离(Advance interval)。
- 2)Tumbling Time Window:能够认为是 Hopping Time Window 的一种特例,窗口大小 = 输入工夫距离,它的特点是各个 Window 之间齐全不相交。
- 3)Sliding Window 该窗口只用于 2 个 KStream 进行 Join 计算时。该窗口的大小定义了 Join 两侧 KStream 的数据记录被认为在同一个窗口的最大时间差。假如该窗口的大小为 5 秒,则参加 Join 的 2 个 KStream 中,记录时间差小于 5 的记录被认为在同一个窗口中,能够进行 Join 计算。
- 4)Session Window 该窗口用于对 Key 做 Group 后的聚合操作中。它须要对 Key 做分组,而后对组内的数据依据业务需要定义一个窗口的起始点和完结点。一个典型的案例是,心愿通过 Session Window 计算某个用户拜访网站的工夫。对于一个特定的用户(用 Key 示意)而言,当产生登录操作时,该用户(Key)的窗口即开始,当产生退出操作或者超时时,该用户(Key)的窗口即完结。窗口完结时,可计算该用户的拜访工夫或者点击次数等。
3、Kafka Stream 利用示例
增加 pom 依赖:
`<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
3.1 案例一:将 topicA 的数据写入到 topicB 中(纯复制)
`import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class MyStream {public static void main(String[] args) {Properties prop =new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"mystream");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper 的地址
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // 输出 key 的类型
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); // 输出 value 的类型
// 创立流结构器
StreamsBuilder builder = new StreamsBuilder();
// 构建好 builder,将 myStreamIn topic 中的数据写入到 myStreamOut topic 中
builder.stream("myStreamIn").to("myStreamOut");
final Topology topo=builder.build();
final KafkaStreams streams = new KafkaStreams(topo, prop);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("stream"){
@Override
public void run() {streams.close();
latch.countDown();}
});
try {streams.start();
latch.await();} catch (InterruptedException e) {e.printStackTrace();
}
System.exit(0);
}
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
* 25
* 26
* 27
* 28
* 29
* 30
* 31
* 32
* 33
* 34
* 35
* 36
* 37
* 38
* 39
* 40
* 41
* 42
* 43
在这里阐明一下prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"mystream");
,咱们将 TopicA 的数据写入到 TopicB 中,就相当于这个流在生产 TopicA 的数据,咱们晓得一个 Topic 中,一个生产组里只能有一个消费者去生产它。假如咱们将 TopicA 的数据写入到 TopicB 的过程中,报错了(比方虚拟机内存满了),这时数据只写了一半,咱们清理完内存后,想持续写剩下的数据,再次运行咱们发现报错,写不了了。这时候咱们须要批改这个参数将 mystream 改成别的名字,因为同一个消费者组里只能有一个消费者去生产它。
开启 zookeeper 和 Kafka
`# 开启 zookeeper
zkServer.sh start
# 后盾启动 Kafka
kafka-server-start.sh -daemon /opt/kafka/config/server.properties`
* 1
* 2
* 3
* 4
创立 topic myStreamIn
`kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic myStreamIn --partitions 1 --replication-factor 1`
* 1
创立 topic myStreamOut
`kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic myStreamOut --partitions 1 --replication-factor 1`
* 1
生产音讯写入到 myStreamIn
`kafka-console-producer.sh --topic myStreamIn --broker-list 192.168.136.20:9092`
* 1
生产 myStreamOut 里的数据
`kafka-console-consumer.sh --topic myStreamOut --bootstrap-server 192.168.136.20:9092 --from-beginning`
* 1
运行示例代码并在生产者端输出数据,能在生产端看到数据,表明 Kafka Stream 写入胜利。
3.2 案例二:将 TopicA 中的数据实现 wordcount 写入到 TopicB
工作中不可能像案例一一样将一个 Topic 的数据一成不变存入另一个 Topic,个别是要通过解决,这就须要在流中加上逻辑。
`import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KTable;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class WordCountStream {public static void main(String[] args) {Properties prop =new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcountstream");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper 的地址
prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,2000); // 提交工夫设置为 2 秒
//prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,""earliest); //earliest latest none 默认 latest
//prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); //true(主动提交) false(手动提交)
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
// 创立流结构器
//hello world
//hello java
StreamsBuilder builder = new StreamsBuilder();
KTable<String, Long> count = builder.stream("wordcount-input") // 从 kafka 中一条一条取数据
.flatMapValues( // 返回压扁后的数据
(value) -> { // 对数据按空格进行切割,返回 List 汇合
String[] split = value.toString().split(" ");
List<String> strings = Arrays.asList(split);
return strings;
}) //null hello,null world,null hello,null java
.map((k, v) -> {return new KeyValue<String, String>(v,"1");
}).groupByKey().count();
count.toStream().foreach((k,v)->{
// 为了测试不便,咱们将 kv 输入到控制台
System.out.println("key:"+k+""+"value:"+v);
});
count.toStream().map((x,y)->{return new KeyValue<String,String>(x,y.toString()); // 留神转成 toString 类型,咱们后面设置的 kv 的类型都是 string 类型
}).to("wordcount-output");
final Topology topo=builder.build();
final KafkaStreams streams = new KafkaStreams(topo, prop);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("stream"){
@Override
public void run() {streams.close();
latch.countDown();}
});
try {streams.start();
latch.await();} catch (InterruptedException e) {e.printStackTrace();
}
System.exit(0);
}
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
* 25
* 26
* 27
* 28
* 29
* 30
* 31
* 32
* 33
* 34
* 35
* 36
* 37
* 38
* 39
* 40
* 41
* 42
* 43
* 44
* 45
* 46
* 47
* 48
* 49
* 50
* 51
* 52
* 53
* 54
* 55
* 56
* 57
* 58
* 59
* 60
* 61
* 62
* 63
`# 创立 TopicA(wordcount-input)
kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic wordcount-input --partitions 1 --replication-factor 1
# 创立 TopicB(wordcount-output)
kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic wordcount-output --partitions 1 --replication-factor 1
# 创立生产者
kafka-console-producer.sh --topic wordcount-input --broker-list 192.168.136.20:9092
# 创立消费者,须要打印出 key(--property print.key=true)
kafka-console-consumer.sh --topic wordcount-output --bootstrap-server 192.168.136.20:9092 --from-beginning --property print.key=true`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
运行示例代码并在生产者端输出数据,能在生产端看到数据,表明 Kafka Stream 写入胜利。
3.3 示例三:在 TopicA 中每输出一个值求和并写入到 TopicB
`import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class SumStream {public static void main(String[] args) {Properties prop =new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"sumstream");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper 的地址
prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,2000); // 提交工夫设置为 2 秒
//prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //earliest latest none 默认 latest
//prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); //true(主动提交) false(手动提交)
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
// 创立流结构器
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> source = builder.stream("suminput");
KTable<String, String> sum1 = source.map((key, value) ->
new KeyValue<String, String>("sum", value.toString())
).groupByKey().reduce((x, y) -> {System.out.println("x:" + x + "" +"y: "+y);
Integer sum = Integer.valueOf(x) + Integer.valueOf(y);
System.out.println("sum:"+sum);
return sum.toString();});
sum1.toStream().to("sumout");
final Topology topo=builder.build();
final KafkaStreams streams = new KafkaStreams(topo, prop);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("stream"){
@Override
public void run() {streams.close();
latch.countDown();}
});
try {streams.start();
latch.await();} catch (InterruptedException e) {e.printStackTrace();
}
System.exit(0);
}
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
* 25
* 26
* 27
* 28
* 29
* 30
* 31
* 32
* 33
* 34
* 35
* 36
* 37
* 38
* 39
* 40
* 41
* 42
* 43
* 44
* 45
* 46
* 47
* 48
* 49
* 50
* 51
* 52
`# 创立 topicA(suminput)
kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic suminput --partitions 1 --replication-factor 1
# 创立生产者
kafka-console-producer.sh --topic suminput --broker-list 192.168.136.20:9092
# 创立 topicB(sumout)
kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic sumout --partitions 1 --replication-factor 1
# 创立消费者
kafka-console-consumer.sh --topic sumout --bootstrap-server 192.168.136.20:9092 --from-beginning`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
运行示例代码并在生产者端输出数据,能在生产端看到数据,表明 Kafka Stream 写入胜利。
3.4 案例四:窗口
3.4.1 每隔 2 秒钟输入一次过来 5 秒内 topicA 里的 wordcount,后果写入到 TopicB
`import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class WindowStream {public static void main(String[] args) {Properties prop =new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"WindowStream");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper 的地址
prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000); // 提交工夫设置为 3 秒
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> source = builder.stream("topicA");
KTable<Windowed<String>, Long> countKtable = source.flatMapValues(value -> Arrays.asList(value.toString().split("s+")))
.map((x, y) -> {return new KeyValue<String, String>(y, "1");
}).groupByKey()
// 加 5 秒窗口, 按步长 2 秒滑动 Hopping Time Window
.windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis()).advanceBy(Duration.ofSeconds(2).toMillis()))
//.windowedBy(SessionWindows.with(Duration.ofSeconds(15).toMillis()))
.count();
// 为了不便查看,输入到控制台
countKtable.toStream().foreach((x,y)->{System.out.println("x:"+x+"y:"+y);
});
countKtable.toStream().map((x,y)-> {return new KeyValue<String, String>(x.toString(), y.toString());
}).to("topicB");
final Topology topo=builder.build();
final KafkaStreams streams = new KafkaStreams(topo, prop);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("stream"){
@Override
public void run() {streams.close();
latch.countDown();}
});
try {streams.start();
latch.await();} catch (InterruptedException e) {e.printStackTrace();
}
System.exit(0);
}
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
* 25
* 26
* 27
* 28
* 29
* 30
* 31
* 32
* 33
* 34
* 35
* 36
* 37
* 38
* 39
* 40
* 41
* 42
* 43
* 44
* 45
* 46
* 47
* 48
* 49
* 50
* 51
* 52
* 53
* 54
* 55
* 56
* 57
* 58
3.4.2 每隔 5 秒钟输入一次过来 5 秒内 topicA 里的 wordcount,后果写入到 TopicB
加 5 秒窗口,与 3.4.1 不同的是,前一个 5 秒与下一个 5 秒没有任何穿插。
`import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class WindowStream {public static void main(String[] args) {Properties prop =new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"WindowStream");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper 的地址
prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000); // 提交工夫设置为 3 秒
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> source = builder.stream("topicA");
KTable<Windowed<String>, Long> countKtable = source.flatMapValues(value -> Arrays.asList(value.toString().split("s+")))
.map((x, y) -> {return new KeyValue<String, String>(y, "1");
}).groupByKey()
// 加五秒的窗口(前一个 5 秒和下一个 5 秒没有任何穿插) Tumbling Time Window
.windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis()))
.count();
countKtable.toStream().foreach((x,y)->{System.out.println("x:"+x+"y:"+y);
});
countKtable.toStream().map((x,y)-> {return new KeyValue<String, String>(x.toString(), y.toString());
}).to("topicB");
final Topology topo=builder.build();
final KafkaStreams streams = new KafkaStreams(topo, prop);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("stream"){
@Override
public void run() {streams.close();
latch.countDown();}
});
try {streams.start();
latch.await();} catch (InterruptedException e) {e.printStackTrace();
}
System.exit(0);
}
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
* 25
* 26
* 27
* 28
* 29
* 30
* 31
* 32
* 33
* 34
* 35
* 36
* 37
* 38
* 39
* 40
* 41
* 42
* 43
* 44
* 45
* 46
* 47
* 48
* 49
* 50
* 51
* 52
* 53
* 54
* 55
3.4.3 TopicA 15 秒内的 wordcount,后果写入 TopicB
比方登录某 app,20 分钟内不操作,会主动退出。
一个典型的案例是,心愿通过 Session Window 计算某个用户拜访网站的工夫。对于一个特定的用户(用 Key 示意)而言,当产生登录操作时,该用户(Key)的窗口即开始,当产生退出操作或者超时时,该用户(Key)的窗口即完结。窗口完结时,可计算该用户的拜访工夫或者点击次数等。
`import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class WindowStream {public static void main(String[] args) {Properties prop =new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"WindowStream");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper 的地址
prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000); // 提交工夫设置为 3 秒
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> source = builder.stream("windowdemo1");
KTable<Windowed<String>, Long> countKtable = source.flatMapValues(value -> Arrays.asList(value.toString().split("s+")))
.map((x, y) -> {return new KeyValue<String, String>(y, "1");
}).groupByKey()
.windowedBy(SessionWindows.with(Duration.ofSeconds(15).toMillis()))
.count();
countKtable.toStream().foreach((x,y)->{System.out.println("x:"+x+"y:"+y);
});
countKtable.toStream().map((x,y)-> {return new KeyValue<String, String>(x.toString(), y.toString());
}).to("windowDemoOut");
final Topology topo=builder.build();
final KafkaStreams streams = new KafkaStreams(topo, prop);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("stream"){
@Override
public void run() {streams.close();
latch.countDown();}
});
try {streams.start();
latch.await();} catch (InterruptedException e) {e.printStackTrace();
}
System.exit(0);
}
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
* 25
* 26
* 27
* 28
* 29
* 30
* 31
* 32
* 33
* 34
* 35
* 36
* 37
* 38
* 39
* 40
* 41
* 42
* 43
* 44
* 45
* 46
* 47
* 48
* 49
* 50
* 51
* 52
* 53
* 54
* 55
3.5 案例五:将 TopicA 的某一列扁平化解决写入 TopicB
现有一张表 user_friends,表构造如下,去掉表头,应用 flume 将内容写进 Kafka 的 TopicA,将第二列扁平化解决写入 TopicB。Flume–>Kafka 的 TopicA 见另一篇博客 Flume 整合 Kafka,本文演示从 TopicA–>TopicB。
解决后的数据要求如下,就是将第二列开展。
`import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class UserFriendStream {public static void main(String[] args) {Properties prop =new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"UserFriendStream1");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper 的地址
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
builder.stream("user_friends_raw").flatMap((k,v)->{List<KeyValue<String,String>> list=new ArrayList<>();
String[] info = v.toString().split(",");
if(info.length==2){String[] friends = info[1].split("s+");
if (info[0].trim().length()>0){for (String friend : friends) {
// 为了不便测试打印进去
System.out.println(info[0]+" "+friend);
list.add(new KeyValue<String,String>(null,info[0]+","+friend));
}}}
return list;
}).to("user_friends");
final Topology topo=builder.build();
final KafkaStreams streams = new KafkaStreams(topo, prop);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("stream"){
@Override
public void run() {streams.close();
latch.countDown();}
});
try {streams.start();
latch.await();} catch (InterruptedException e) {e.printStackTrace();
}
System.exit(0);
}
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
* 25
* 26
* 27
* 28
* 29
* 30
* 31
* 32
* 33
* 34
* 35
* 36
* 37
* 38
* 39
* 40
* 41
* 42
* 43
* 44
* 45
* 46
* 47
* 48
* 49
* 50
* 51
3.6 案例六:将 TopicA 的多列扁平化解决写入 TopicB
现有一张表 event_attendees.csv,内容如下,去掉表头,应用 Flume 将内容写入 Kafka 的 TopicA,将二三四五列扁平化解决写入 TopicB。本案例仅演示 TopicA–>TopicB,不演示 Flume–>Kafka。
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class EventAttendStream {public static void main(String[] args) {Properties prop =new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"UserFriendStream1");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper 的地址
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> ear = builder.stream("event_attendees_raw");
KStream<String, String> eventStream = ear.flatMap((k, v) -> { //event,yes,maybe,invited,no
System.out.println(k + " " + v);
String[] split = v.toString().split(",");
ArrayList<KeyValue<String, String>> list = new ArrayList<>();
if (split.length >= 2 && split[1].trim().length() > 0) {String[] yes = split[1].split("s+");
for (String y : yes) {list.add(new KeyValue<String, String>(null, split[0] + "," + y + ",yes"));
}
}
if (split.length >= 3 && split[2].trim().length() > 0) {String[] maybe = split[2].split("s+");
for (String mb : maybe) {list.add(new KeyValue<String, String>(null, split[0] + "," + mb + ",maybe"));
}
}
if (split.length >= 4 && split[3].trim().length() > 0) {String[] invited = split[3].split("s+");
for (String inv : invited) {list.add(new KeyValue<String, String>(null, split[0] + "," + inv + ",invited"));
}
}
if (split.length >= 5 && split[4].trim().length() > 0) {String[] no = split[4].split("s+");
for (String n : no) {list.add(new KeyValue<String, String>(null, split[0] + "," + no + ",no"));
}
}
return list;
});
eventStream.to("events_attend");
final Topology topo=builder.build();
final KafkaStreams streams = new KafkaStreams(topo, prop);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("stream"){
@Override
public void run() {streams.close();
latch.countDown();}
});
try {streams.start();
latch.await();} catch (InterruptedException e) {e.printStackTrace();
}
System.exit(0);
}
}