【kafka KSQL】游戏日志统计分析(3)

接上篇文章 【kafka KSQL】游戏日志统计分析(2),本文主要通过实例展示KSQL的连接查询功能。
创建另一个topic
bin/kafka-topics –create –zookeeper localhost:2181 –replication-factor 1 –partitions 4 –topic prop-normalized
往新topic中写入数据
bin/kafka-console-producer –broker-list localhost:9092 –topic prop-normalized
>
{“user__name”:”lzb”, “prop__id”:”id1″}
从prop-normalized主题创建Stream
CREATE STREAM PROP_USE_EVENT \
(user__name VARCHAR, \
prop__id VARCHAR ) \
WITH (KAFKA_TOPIC=’prop-normalized’, \
VALUE_FORMAT=’json’);
重新设置ROWKEY为user__name

CREATE STREAM PROP_USE_EVENT_REKEY AS \
SELECT * FROM PROP_USE_EVENT \
PARTITION BY user__name;
查询完成3局对局且没有使用过道具的所有玩家
查询出所有玩家的对局情况,并创建表USER_SCORE_TABLE(前面已经创建过了):
CREATE TABLE USER_SCORE_TABLE AS \
SELECT username, COUNT(*) AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum \
FROM USER_SCORE_EVENT \
WHERE reason = ‘game’ \
GROUP BY username;
查询出所有玩家的道具使用情况,并创建表USER_PROP_TABLE:
CREATE TABLE USER_PROP_TABLE AS \
SELECT username, COUNT(*) \
FROM PROP_USE_EVENT_REKEY \
GROUP BY username;
使用LEFT JOIN进行左关联查询:
SELECT s.username AS username \
FROM USER_SCORE_TABLE s \
LEFT JOIN USER_PROP_TABLE p \
ON s.username = p.username;

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理