接上篇文章 【kafka KSQL】游戏日志统计分析(2),本文主要通过实例展示KSQL的连接查询功能。创建另一个topicbin/kafka-topics –create –zookeeper localhost:2181 –replication-factor 1 –partitions 4 –topic prop-normalized往新topic中写入数据bin/kafka-console-producer –broker-list localhost:9092 –topic prop-normalized>{“user__name”:“lzb”, “prop__id”:“id1”}从prop-normalized主题创建StreamCREATE STREAM PROP_USE_EVENT \ (user__name VARCHAR, \ prop__id VARCHAR ) \ WITH (KAFKA_TOPIC=‘prop-normalized’, \ VALUE_FORMAT=‘json’);重新设置ROWKEY为user__nameCREATE STREAM PROP_USE_EVENT_REKEY AS \ SELECT * FROM PROP_USE_EVENT \ PARTITION BY user__name;查询完成3局对局且没有使用过道具的所有玩家查询出所有玩家的对局情况,并创建表USER_SCORE_TABLE(前面已经创建过了):CREATE TABLE USER_SCORE_TABLE AS \ SELECT username, COUNT() AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum \ FROM USER_SCORE_EVENT \ WHERE reason = ‘game’ \ GROUP BY username;查询出所有玩家的道具使用情况,并创建表USER_PROP_TABLE:CREATE TABLE USER_PROP_TABLE AS \ SELECT username, COUNT() \ FROM PROP_USE_EVENT_REKEY \ GROUP BY username;使用LEFT JOIN进行左关联查询:SELECT s.username AS username \FROM USER_SCORE_TABLE s \LEFT JOIN USER_PROP_TABLE p \ON s.username = p.username;