共计 1013 个字符,预计需要花费 3 分钟才能阅读完成。
接上篇文章【kafka KSQL】游戏日志统计分析(2),本文主要通过实例展示 KSQL 的连接查询功能。
创建另一个 topic
bin/kafka-topics –create –zookeeper localhost:2181 –replication-factor 1 –partitions 4 –topic prop-normalized
往新 topic 中写入数据
bin/kafka-console-producer –broker-list localhost:9092 –topic prop-normalized
>
{“user__name”:”lzb”, “prop__id”:”id1″}
从 prop-normalized 主题创建 Stream
CREATE STREAM PROP_USE_EVENT \
(user__name VARCHAR, \
prop__id VARCHAR ) \
WITH (KAFKA_TOPIC=’prop-normalized’, \
VALUE_FORMAT=’json’);
重新设置 ROWKEY 为 user__name
CREATE STREAM PROP_USE_EVENT_REKEY AS \
SELECT * FROM PROP_USE_EVENT \
PARTITION BY user__name;
查询完成 3 局对局且没有使用过道具的所有玩家
查询出所有玩家的对局情况,并创建表 USER_SCORE_TABLE(前面已经创建过了):
CREATE TABLE USER_SCORE_TABLE AS \
SELECT username, COUNT(*) AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum \
FROM USER_SCORE_EVENT \
WHERE reason = ‘game’ \
GROUP BY username;
查询出所有玩家的道具使用情况,并创建表 USER_PROP_TABLE:
CREATE TABLE USER_PROP_TABLE AS \
SELECT username, COUNT(*) \
FROM PROP_USE_EVENT_REKEY \
GROUP BY username;
使用 LEFT JOIN 进行左关联查询:
SELECT s.username AS username \
FROM USER_SCORE_TABLE s \
LEFT JOIN USER_PROP_TABLE p \
ON s.username = p.username;