[Kafka KSQL] Game Log Statistics and Analysis (1)
Using game settlement logs as an example, this post walks through analyzing logs with KSQL.
Start Confluent
cd ~/Documents/install/confluent-5.0.1/
bin/confluent start
List the Kafka topics
bin/kafka-topics --list --zookeeper localhost:2181
Create a topic to receive the game settlement logs
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic score-normalized
Use the console producer to write a log record to the topic
bin/kafka-console-producer --broker-list localhost:9092 --topic score-normalized
>
{"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_高手1区_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
Use the console consumer to check that the log record was written correctly
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic score-normalized --from-beginning
;; Output:
{"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_高手1区_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
Start the KSQL client
bin/ksql http://localhost:8088
The KSQL startup banner appears, followed by the interactive prompt.
List the Kafka topics from the KSQL prompt
ksql> show topics;
Print the messages in a topic
PRINT 'score-normalized';
Output:
Format:STRING
19-1-5 下午11时59分31秒 , NULL , {"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_\xE9\xAB\x98\xE6\x89\x8B1\xE5\x8C\xBA_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
Where:
The part before the first comma, 19-1-5 下午11时59分31秒 (11:59:31 PM on 2019-01-05), is the message timestamp.
The part before the second comma, NULL, is the message key. It is NULL because the message was sent from kafka-console-producer, which sends no key by default.
Everything after that is the message payload.
Create a stream from the score-normalized topic
CREATE STREAM SCORE_EVENT \
(epoch BIGINT, \
gameType VARCHAR, \
cost INTEGER, \
gamers ARRAY< \
STRUCT< \
username VARCHAR, \
balance BIGINT, \
delta BIGINT \
> \
>, \
gameId VARCHAR, \
tax BIGINT, \
reason VARCHAR) \
WITH (KAFKA_TOPIC='score-normalized', \
VALUE_FORMAT='JSON');
Drop a stream
DROP STREAM stream_name ;
If any queries are still reading from or writing to the stream, the drop fails with an error:
Cannot drop USER_SCORE_EVENT.
The following queries read from this source: [].
The following queries write into this source: [CSAS_USER_SCORE_EVENT_2, InsertQuery_4, InsertQuery_5, InsertQuery_3].
You need to terminate them before dropping USER_SCORE_EVENT.
Stop those queries with the TERMINATE command (one statement per query ID listed in the error), then drop the stream:
TERMINATE CSAS_USER_SCORE_EVENT_2;
TERMINATE InsertQuery_4;
Query from the earliest record
ksql> SET 'auto.offset.reset' = 'earliest';
Query all data from the stream
ksql> SELECT * FROM SCORE_EVENT;
Output:
1546702389664 | null | 1512342568296 | situan | 7 | [{USERNAME=0791754000, BALANCE=4405682, DELTA=-60}, {USERNAME=70837999, BALANCE=69532, DELTA=-60}, {USERNAME=abc6378303, BALANCE=972120, DELTA=-60}, {USERNAME=a137671268, BALANCE=23129, DELTA=180}] | 2017-12-04_07:09:28_高手1区_200_015_185175 | null | xiayu
Where:
Column 1 is the record timestamp.
Column 2 is the record key.
Columns 3 onward are the values of the message fields, in the order declared when the stream was created.
The null in the second-to-last column appears because the message has no tax field.
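The column mapping described above can be modeled outside KSQL. The following is a minimal Python sketch, not KSQL's actual deserializer: it projects the sample JSON message onto the column order declared in CREATE STREAM SCORE_EVENT and fills fields that are absent from the message with None, mirroring the null shown for tax. The names RAW, COLUMNS, and to_row are illustrative only.

```python
import json

# Sample record from the post (straight quotes, as the producer would send it).
RAW = (
    '{"cost":7, "epoch":1512342568296,'
    '"gameId":"2017-12-04_07:09:28_高手1区_200_015_185175",'
    '"gameType":"situan",'
    '"gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"},'
    ' {"balance":69532,"delta":-60,"username":"70837999"},'
    ' {"balance":972120,"delta":-60,"username":"abc6378303"},'
    ' {"balance":23129,"delta":180,"username":"a137671268"}],'
    '"reason":"xiayu"}'
)

# Column order as declared in CREATE STREAM SCORE_EVENT.
COLUMNS = ["epoch", "gameType", "cost", "gamers", "gameId", "tax", "reason"]


def to_row(raw):
    """Project a JSON message onto the declared columns; absent fields become None."""
    record = json.loads(raw)
    return [record.get(column) for column in COLUMNS]


row = to_row(RAW)
# The message has no "tax" field, so the tax column is None.
print(row[5])
```

The order of keys inside the JSON message does not matter here; only the declared column order determines where each value lands in the row.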
Count the total number of games played on 2017-12-04
;; Add a game_date column to aggregate on
CREATE STREAM SCORE_EVENT_WITH_DATE AS \
SELECT SUBSTRING(gameId, 0, 10) AS game_date, * \
FROM SCORE_EVENT;
SELECT game_date, COUNT(*) \
FROM SCORE_EVENT_WITH_DATE \
WHERE game_date = '2017-12-04' AND reason = 'game' \
GROUP BY game_date;
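To make the logic of the two statements above concrete, here is a small Python sketch of the same idea: take the first 10 characters of gameId as the date, then count the events that match both the date and the reason. It is only a model of the query, not how KSQL executes it. Note that the sample record shown earlier has reason "xiayu", so the reason = 'game' filter would not match it; for that reason the sketch takes the reason as a parameter. The function names are illustrative.

```python
def game_date(game_id):
    """Return the leading date portion of a gameId such as '2017-12-04_07:09:28_...'."""
    return game_id[:10]


def count_games(events, date, reason):
    """Count events whose gameId date and reason both match."""
    return sum(
        1
        for event in events
        if game_date(event["gameId"]) == date and event["reason"] == reason
    )


# Only the two fields the query touches, taken from the sample record.
events = [
    {"gameId": "2017-12-04_07:09:28_高手1区_200_015_185175", "reason": "xiayu"},
]

matching_sample = count_games(events, "2017-12-04", "xiayu")
matching_game = count_games(events, "2017-12-04", "game")
print(matching_sample, matching_game)
```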
KSQL does not yet support a query like the following, which aggregates without a GROUP BY clause:
SELECT COUNT(*) \
FROM SCORE_EVENT \
WHERE gameId LIKE '2017-12-04_%';
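Count the total number of players who took part in games (deduplicated)
Because one log record carries the results for several players, the approach is to split each player out into a separate event.
Merge the per-player events into a single stream, USER_SCORE_EVENT (the statements index gamers[0] through gamers[3], so they assume four players per record):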
CREATE STREAM USER_SCORE_EVENT AS \
SELECT epoch, gameType, cost, gameId, tax, reason, gamers[0]->username AS username, gamers[0]->balance AS balance, gamers[0]->delta AS delta \
FROM SCORE_EVENT;
INSERT INTO USER_SCORE_EVENT \
SELECT epoch, gameType, cost, gameId, tax, reason, gamers[1]->username AS username, gamers[1]->balance AS balance, gamers[1]->delta AS delta \
FROM SCORE_EVENT;
INSERT INTO USER_SCORE_EVENT \
SELECT epoch, gameType, cost, gameId, tax, reason, gamers[2]->username AS username, gamers[2]->balance AS balance, gamers[2]->delta AS delta \
FROM SCORE_EVENT;
INSERT INTO USER_SCORE_EVENT \
SELECT epoch, gameType, cost, gameId, tax, reason, gamers[3]->username AS username, gamers[3]->balance AS balance, gamers[3]->delta AS delta \
FROM SCORE_EVENT;
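The CREATE STREAM plus three INSERT INTO statements together turn one game record into one event per player. The Python sketch below models that flattening on the sample record. It is an illustration under stated assumptions: unlike the KSQL statements, it loops over however many players the record contains rather than indexing exactly four, and the function name explode_gamers is hypothetical.

```python
# Sample record from the post, as a Python dict.
event = {
    "cost": 7,
    "epoch": 1512342568296,
    "gameId": "2017-12-04_07:09:28_高手1区_200_015_185175",
    "gameType": "situan",
    "gamers": [
        {"balance": 4405682, "delta": -60, "username": "0791754000"},
        {"balance": 69532, "delta": -60, "username": "70837999"},
        {"balance": 972120, "delta": -60, "username": "abc6378303"},
        {"balance": 23129, "delta": 180, "username": "a137671268"},
    ],
    "reason": "xiayu",
}

# Game-level columns copied onto every per-player event.
SHARED = ("epoch", "gameType", "cost", "gameId", "tax", "reason")


def explode_gamers(record):
    """Return one flat event per player, each carrying the game-level columns."""
    shared = {name: record.get(name) for name in SHARED}  # missing tax -> None
    return [
        {
            **shared,
            "username": gamer["username"],
            "balance": gamer["balance"],
            "delta": gamer["delta"],
        }
        for gamer in record["gamers"]
    ]


user_events = explode_gamers(event)
for user_event in user_events:
    print(user_event["username"], user_event["delta"])
```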
Compute each player's total number of games, net winnings or losses, and total tax contributed, and create a table USER_SCORE_TABLE from the result:
CREATE TABLE USER_SCORE_TABLE AS \
SELECT username, COUNT(*) AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum \
FROM USER_SCORE_EVENT \
WHERE reason = 'game' \
GROUP BY username;
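The aggregation can be sketched in Python as a group-by over the per-player events. This is a model of the query's logic only. Two assumptions are made and should be read as such: a missing tax is counted as 0 (which matches the 0 in the tax column of the output shown in the post), and the input is the sample record replayed four times, since the post does not show the exact messages that were produced. The sample's reason is "xiayu", so the reason is passed as a parameter instead of being fixed to 'game'.

```python
from collections import defaultdict


def aggregate(events, reason):
    """Group per-player events by username: game count, delta sum, tax sum."""
    totals = defaultdict(lambda: {"game_count": 0, "delta_sum": 0, "tax_sum": 0})
    for event in events:
        if event["reason"] != reason:
            continue
        entry = totals[event["username"]]
        entry["game_count"] += 1
        entry["delta_sum"] += event["delta"]
        entry["tax_sum"] += event["tax"] or 0  # assumption: missing tax counts as 0
    return dict(totals)


# Per-player events derived from the sample record (tax is absent, hence None).
one_game = [
    {"username": "0791754000", "delta": -60, "tax": None, "reason": "xiayu"},
    {"username": "70837999", "delta": -60, "tax": None, "reason": "xiayu"},
    {"username": "abc6378303", "delta": -60, "tax": None, "reason": "xiayu"},
    {"username": "a137671268", "delta": 180, "tax": None, "reason": "xiayu"},
]

# Assumption: the same record replayed four times.
table = aggregate(one_game * 4, reason="xiayu")
for username, stats in table.items():
    print(username, stats)
```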
View all rows in USER_SCORE_TABLE:
ksql> SELECT * FROM USER_SCORE_TABLE;
1546709338711 | 70837999 | 70837999 | 4 | -240 | 0
1546709352758 | 0791754000 | 0791754000 | 4 | -240 | 0
1546709338711 | a137671268 | a137671268 | 4 | 720 | 0
1546709352758 | abc6378303 | abc6378303 | 4 | -240 | 0
Query one player's number of games, net winnings or losses, and total tax contributed:
ksql> SELECT * FROM USER_SCORE_TABLE WHERE username = '70837999';
Output:
1546709338711 | 70837999 | 70837999 | 4 | -240 | 0
Count the total number of players (deduplicated)
Add a dummy column to group on:
CREATE TABLE USER_SCORE_WITH_TAG AS \
SELECT 1 AS tag, * FROM USER_SCORE_TABLE;
Count the deduplicated players:
SELECT tag, COUNT(username) \
FROM USER_SCORE_WITH_TAG \
GROUP BY tag;
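The dummy-column trick works because USER_SCORE_TABLE already holds one row per username, so giving every row the same tag value and grouping by it yields a single group whose count is the number of distinct players. A minimal Python sketch of that reasoning, using the four usernames from the table output in the post (the function name count_by_tag is illustrative):

```python
def count_by_tag(rows):
    """Attach a constant tag to every row and count non-null usernames per tag."""
    counts = {}
    for row in rows:
        tagged = {"tag": 1, **row}  # the constant dummy column
        if tagged["username"] is not None:  # COUNT(username) skips nulls
            counts[tagged["tag"]] = counts.get(tagged["tag"], 0) + 1
    return counts


# One row per player, as in USER_SCORE_TABLE.
rows = [
    {"username": "70837999"},
    {"username": "0791754000"},
    {"username": "a137671268"},
    {"username": "abc6378303"},
]

player_counts = count_by_tag(rows)
print(player_counts)
```

The result has a single key, the tag value 1, mapped to the number of rows in the table.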
To be continued
Next: the KSQL WINDOW feature.