【kafka KSQL】游戏日志统计分析(1)以游戏结算日志为例,展示利用KSQL对日志进行统计分析的过程。启动confluentcd ~/Documents/install/confluent-5.0.1/bin/confluent start查看kafka主题列表bin/kafka-topics –list –zookeeper localhost:2181创建接受游戏结算日志的topicbin/kafka-topics –create –zookeeper localhost:2181 –replication-factor 1 –partitions 4 –topic score-normalized使用生产者命令行工具往topic中写日志bin/kafka-console-producer –broker-list localhost:9092 –topic score-normalized> {“cost”:7, “epoch”:1512342568296,“gameId”:“2017-12-04_07:09:28_高手1区_200_015_185175”,“gameType”:“situan”,“gamers”: [{“balance”:4405682,“delta”:-60,“username”:“0791754000”}, {“balance”:69532,“delta”:-60,“username”:“70837999”}, {“balance”:972120,“delta”:-60,“username”:“abc6378303”}, {“balance”:23129,“delta”:180,“username”:“a137671268”}],“reason”:“xiayu”}使用消费者命令行工具查看日志是否正常写入bin/kafka-console-consumer –bootstrap-server localhost:9092 –topic score-normalized –from-beginning;; 可以看到{“cost”:7, “epoch”:1512342568296,“gameId”:“2017-12-04_07:09:28_高手1区_200_015_185175”,“gameType”:“situan”,“gamers”: [{“balance”:4405682,“delta”:-60,“username”:“0791754000”}, {“balance”:69532,“delta”:-60,“username”:“70837999”}, {“balance”:972120,“delta”:-60,“username”:“abc6378303”}, {“balance”:23129,“delta”:180,“username”:“a137671268”}],“reason”:“xiayu”}启动KSQL客户端bin/ksql http://localhost:8088可以看到ksql启动后的图标,和操作终端。ksql终端查看kafka topic列表ksql> show topics;打印topic中的消息PRINT ‘score-normalized’;可以看到:Format:STRING19-1-5 下午11时59分31秒 , NULL , {“cost”:7, “epoch”:1512342568296,“gameId”:“2017-12-04_07:09:28_\xE9\xAB\x98\xE6\x89\x8B1\xE5\x8C\xBA_200_015_185175”,“gameType”:“situan”,“gamers”: [{“balance”:4405682,“delta”:-60,“username”:“0791754000”}, {“balance”:69532,“delta”:-60,“username”:“70837999”}, {“balance”:972120,“delta”:-60,“username”:“abc6378303”}, {“balance”:23129,“delta”:180,“username”:“a137671268”}],“reason”:“xiayu”}其中:第一个逗号19-1-5 下午11时59分31秒表示消息时间。第二个逗号NULL为消息的Key,因为是从kafka-console-producer推送的,默认为NULL。后面的就是推送过来的消息内容。从topic score-normalized创建一个StreamCREATE STREAM SCORE_EVENT \ (epoch BIGINT, \ gameType VARCHAR, \ cost INTEGER, \ gamers ARRAY< \ STRUCT< \ username VARCHAR, \ balance BIGINT, \ delta BIGINT \ > \ >, \ gameId VARCHAR, \ tax BIGINT, \ reason VARCHAR) \ WITH ( KAFKA_TOPIC=‘score-normalized’, \ VALUE_FORMAT=‘JSON’);删除一个STREAMDROP STREAM stream_name ;如果有查询语句在查询该流,则会出现错误:Cannot drop USER_SCORE_EVENT. The following queries read from this source: []. The following queries write into this source: [CSAS_USER_SCORE_EVENT_2, InsertQuery_4, InsertQuery_5, InsertQuery_3]. You need to terminate them before dropping USER_SCORE_EVENT.需要用TERMINATE命令停止这些查询语句,然后再删除流:TERMINATE CSAS_USER_SCORE_EVENT_2;TERMINATE InsertQuery_4;从最早记录开始查询ksql> SET ‘auto.offset.reset’ = ’earliest’;从Stream中查询所有数据ksql> SELECT * FROM SCORE_EVENT;可以看到:1546702389664 | null | 1512342568296 | situan | 7 | [{USERNAME=0791754000, BALANCE=4405682, DELTA=-60}, {USERNAME=70837999, BALANCE=69532, DELTA=-60}, {USERNAME=abc6378303, BALANCE=972120, DELTA=-60}, {USERNAME=a137671268, BALANCE=23129, DELTA=180}] | 2017-12-04_07:09:28_高手1区_200_015_185175 | null | xiayu其中:第1列为记录的时间戳。第2列为记录的key。第3列以后就是消息中的各个字段的值,对应创建流时的顺序。倒数第2列的null,是因为消息中tax字段不存在。统计2017-12-04日的对局总数;; 增加一个game_date字段,用于统计CREATE STREAM SCORE_EVENT_WITH_DATE AS \ SELECT SUBSTRING(gameId, 0, 10) AS game_date, * \ FROM SCORE_EVENT; SELECT game_date, COUNT() \ FROM SCORE_EVENT_WITH_DATE \ WHERE game_date = ‘2017-12-04’ AND reason = ‘game’ \ GROUP BY game_date;目前KSQL还不支持类似下面的查询:SELECT COUNT() \ FROM SCORE_EVENT \ WHERE gameId LIKE ‘2017-12-04_%’;统计参与对局的总玩家数(去重)因为一条日志中包含多个玩家的对局信息,所以想法把每个玩家拆分成单独的事件整合各个玩家的事件到一个统一的流USER_SCORE_EVENT:CREATE STREAM USER_SCORE_EVENT AS \ SELECT epoch, gameType, cost, gameId, tax, reason, gamers[0]->username AS username, gamers[0]->balance AS balance, gamers[0]->delta AS delta \ FROM SCORE_EVENT; INSERT INTO USER_SCORE_EVENT \ SELECT epoch, gameType, cost, gameId, tax, reason, gamers[1]->username AS username, gamers[1]->balance AS balance, gamers[1]->delta AS delta \ FROM SCORE_EVENT; INSERT INTO USER_SCORE_EVENT \ SELECT epoch, gameType, cost, gameId, tax, reason, gamers[2]->username AS username, gamers[2]->balance AS balance, gamers[2]->delta AS delta \ FROM SCORE_EVENT; INSERT INTO USER_SCORE_EVENT \ SELECT epoch, gameType, cost, gameId, tax, reason, gamers[3]->username AS username, gamers[3]->balance AS balance, gamers[3]->delta AS delta \ FROM SCORE_EVENT;统计各个玩家总的对局数、输赢总数、贡献的总税收,并以此创建一个表USER_SCORE_TABLE:CREATE TABLE USER_SCORE_TABLE AS \ SELECT username, COUNT(*) AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum \ FROM USER_SCORE_EVENT \ WHERE reason = ‘game’ \ GROUP BY username;查看USER_SCORE_TABLE所有数据:ksql> SELECT * FROM USER_SCORE_TABLE;1546709338711 | 70837999 | 70837999 | 4 | -240 | 01546709352758 | 0791754000 | 0791754000 | 4 | -240 | 01546709338711 | a137671268 | a137671268 | 4 | 720 | 01546709352758 | abc6378303 | abc6378303 | 4 | -240 | 0查询某个玩家的对局数、输赢总数、贡献的总税收:ksql> SELECT * FROM USER_SCORE_TABLE WHERE username = ‘70837999’;输出:1546709338711 | 70837999 | 70837999 | 4 | -240 | 0统计玩家总数(去重)添加一个傀儡列用于统计:CREATE TABLE USER_SCORE_WITH_TAG AS \ SELECT 1 AS tag, * FROM USER_SCORE_TABLE;统计去重后的玩家总数SELECT tag, COUNT(username) \FROM USER_SCORE_WITH_TAG \GROUP BY tag;续KSQL WINDOW 功能。
...