用户行为日志剖析是实时数据处理很常见的一个利用场景,比方常见的 PV、UV 统计。本文将基于 Flink 从 0 到 1 构建一个用户行为日志剖析零碎,包含架构设计与代码实现。本文分享将残缺出现日志剖析零碎的数据处理链路,通过本文,你能够理解到:
- 基于 discuz 搭建一个论坛平台
- Flume 日志收集零碎应用形式
- Apache 日志格局剖析
- Flume 与 Kafka 集成
- 日志剖析解决流程
- 架构设计与残缺的代码实现
我的项目简介
本文分享会从 0 到 1 基于 Flink 实现一个实时的用户行为日志剖析零碎,根本架构图如下:
首先会先搭建一个论坛平台,对论坛平台产生的用户点击日志进行剖析。而后应用 Flume 日志收集系统对产生的 Apache 日志进行收集,并将其推送到 Kafka。接着咱们应用 Flink 对日志进行实时剖析解决,将解决之后的后果写入 MySQL 供前端利用可视化展现。本文次要实现以下三个指标计算:
- 统计热门板块,即访问量最高的板块
- 统计热门文章,即访问量最高的帖子文章
- 统计不同客户端对版块和文章的总访问量
基于 discuz 搭建一个论坛平台
装置 XAMPP
- 下载
wget https://www.apachefriends.org/xampp-files/5.6.33/xampp-linux-x64-5.6.33-0-installer.run
- 装置
# 赋予文件执行权限
chmod u+x xampp-linux-x64-5.6.33-0-installer.run
# 运行安装文件
./xampp-linux-x64-5.6.33-0-installer.run
- 配置环境变量
将以下内容退出到 ~/.bash_profile
export XAMPP=/opt/lampp/
export PATH=$PATH:$XAMPP:$XAMPP/bin
- 刷新环境变量
source ~/.bash_profile
- 启动 XAMPP
xampp restart
- MySQL 的 root 用户明码和权限批改
# 批改 root 用户明码为 123qwe
update mysql.user set password=PASSWORD('123qwe') where user='root';
flush privileges;
#赋予 root 用户近程登录权限
grant all privileges on *.* to 'root'@'%' identified by '123qwe' with grant option;
flush privileges;
装置 Discuz
- 下载 discuz
wget http://download.comsenz.com/DiscuzX/3.2/Discuz_X3.2_SC_UTF8.zip
- 装置
# 删除原有的 web 利用
rm -rf /opt/lampp/htdocs/*
unzip Discuz_X3.2_SC_UTF8.zip –d /opt/lampp/htdocs/
cd /opt/lampp/htdocs/
mv upload/*
#批改目录权限
chmod 777 -R /opt/lampp/htdocs/config/
chmod 777 -R /opt/lampp/htdocs/data/
chmod 777 -R /opt/lampp/htdocs/uc_client/
chmod 777 -R /opt/lampp/htdocs/uc_server/
Discuz 基本操作
- 自定义版块
- 进入 discuz 后盾:http://kms-4/admin.php
- 点击顶部的 论坛 菜单
- 依照页面提醒创立所需版本,能够创立父子版块
Discuz 帖子 / 版块存储数据库表介
-- 登录 ultrax 数据库
mysql -uroot -p123 ultrax
-- 查看蕴含帖子 id 及题目对应关系的表
-- tid, subject(文章 id、题目)select tid, subject from pre_forum_post limit 10;
-- fid, name(版块 id、题目)select fid, name from pre_forum_forum limit 40;
当咱们在各个板块增加帖子之后,如下所示:
批改日志格局
- 查看拜访日志
# 日志默认地址
/opt/lampp/logs/access_log
# 实时查看日志命令
tail –f /opt/lampp/logs/access_log
- 批改日志格局
Apache 配置文件名称为 httpd.conf,残缺门路为 /opt/lampp/etc/httpd.conf
。因为默认的日志类型为common 类型,总共有 7 个字段。为了获取更多的日志信息,咱们须要将其格局批改为 combined 格局,该日志格局共有 9 个字段。批改形式如下:
# 启用组合日志文件
CustomLog "logs/access_log" combined
- 从新加载配置文件
xampp reload
Apache 日志格局介绍
192.168.10.1 - - [30/Aug/2020:15:53:15 +0800] "GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1" 200 30647 "http://kms-4/forum.php" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36"
下面的日志格局共有 9 个字段,别离用空格隔开。每个字段的具体含意如下:
192.168.10.1 ##(1)客户端的 IP 地址
- ## (2)客户端 identity 标识, 该字段为 "-"
- ## (3)客户端 userid 标识, 该字段为 "-"
[30/Aug/2020:15:53:15 +0800] ## (4)服务器实现申请解决时的工夫
"GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1" ## (5)申请类型 申请的资源 应用的协定
200 ## (6)服务器返回给客户端的状态码,200 示意胜利
30647 ## (7)返回给客户端不包含响应头的字节数,如果没有信息返回,则此项应该是 "-"
"http://kms-4/forum.php" ## (8)Referer 申请头
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36" ## (9)客户端的浏览器信息
对于下面的日志格局,能够应用正则表达式进行匹配:
(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) (\S+) (\S+) (\[.+?\]) (\"(.*?)\") (\d{3}) (\S+) (\"(.*?)\") (\"(.*?)\")
Flume 与 Kafka 集成
本文应用 Flume 对产生的 Apache 日志进行收集,而后推送至 Kafka。须要启动 Flume agent 对日志进行收集,对应的配置文件如下:
# agent 的名称为 a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1
# set source
a1.sources.source1.type = TAILDIR
a1.sources.source1.filegroups = f1
a1.sources.source1.filegroups.f1 = /opt/lampp/logs/access_log
a1sources.source1.fileHeader = flase
# 配置 sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.brokerList=kms-2:9092,kms-3:9092,kms-4:9092
a1.sinks.sink1.topic= user_access_logs
a1.sinks.sink1.kafka.flumeBatchSize = 20
a1.sinks.sink1.kafka.producer.acks = 1
a1.sinks.sink1.kafka.producer.linger.ms = 1
a1.sinks.sink1.kafka.producer.compression.type = snappy
# 配置 channel
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /home/kms/data/flume_data/checkpoint
a1.channels.channel1.dataDirs= /home/kms/data/flume_data/data
# 配置 bind
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1
知识点:
Taildir Source相比 Exec Source、Spooling Directory Source 的劣势是什么?
TailDir Source:断点续传、多目录。Flume1.6 以前须要本人自定义 Source 记录每次读取文件地位, 实现断点续传
Exec Source:能够实时收集数据, 然而在 Flume 不运行或者 Shell 命令出错的状况下, 数据将会失落
Spooling Directory Source:监控目录, 不反对断点续传
值得注意的是,下面的配置是间接将原始日志 push 到 Kafka。除此之外,咱们还能够自定义 Flume 的拦截器对原始日志先进行过滤解决,同时也能够实现将不同的日志 push 到 Kafka 的不同 Topic 中。
启动 Flume Agent
将启动 Agent 的命令封装成 shell 脚本:start-log-collection.sh , 脚本内容如下:
#!/bin/bash
echo "start log agent !!!"
/opt/modules/apache-flume-1.9.0-bin/bin/flume-ng agent --conf-file /opt/modules/apache-flume-1.9.0-bin/conf/log_collection.conf --name a1 -Dflume.root.logger=INFO,console
查看 push 到 Kafka 的日志数据
将控制台消费者命令封装成 shell 脚本:kafka-consumer.sh,脚本内容如下:
#!/bin/bash
echo "kafka consumer"
bin/kafka-console-consumer.sh --bootstrap-server kms-2.apache.com:9092,kms-3.apache.com:9092,kms-4.apache.com:9092 --topic $1 --from-beginning
应用上面命令生产 Kafka 中的数据:
[kms@kms-2 kafka_2.11-2.1.0]$ ./kafka-consumer.sh user_access_logs
日志剖析解决流程
为了不便解释,上面会对重要代码进行解说,残缺代码移步github:https://github.com/jiamx/flin…
创立 MySQL 数据库和指标表
-- 客户端访问量统计
CREATE TABLE `client_ip_access` (`client_ip` char(50) NOT NULL COMMENT '客户端 ip',
`client_access_cnt` bigint(20) NOT NULL COMMENT '拜访次数',
`statistic_time` text NOT NULL COMMENT '统计工夫',
PRIMARY KEY (`client_ip`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 热门文章统计
CREATE TABLE `hot_article` (`article_id` int(10) NOT NULL COMMENT '文章 id',
`subject` varchar(80) NOT NULL COMMENT '文章题目',
`article_pv` bigint(20) NOT NULL COMMENT '拜访次数',
`statistic_time` text NOT NULL COMMENT '统计工夫',
PRIMARY KEY (`article_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 热门板块统计
CREATE TABLE `hot_section` (`section_id` int(10) NOT NULL COMMENT '版块 id',
`name` char(50) NOT NULL COMMENT '版块题目',
`section_pv` bigint(20) NOT NULL COMMENT '拜访次数',
`statistic_time` text NOT NULL COMMENT '统计工夫',
PRIMARY KEY (`section_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
AccessLogRecord 类
该类封装了日志所蕴含的字段数据,共有 9 个字段。
/**
* 应用 lombok
* 原始日志封装类
*/
@Data
public class AccessLogRecord {
public String clientIpAddress; // 客户端 ip 地址
public String clientIdentity; // 客户端身份标识, 该字段为 `-`
public String remoteUser; // 用户标识, 该字段为 `-`
public String dateTime; // 日期, 格局为[day/month/yearhourminutesecond zone]
public String request; // url 申请, 如:`GET /foo ...`
public String httpStatusCode; // 状态码,如:200; 404.
public String bytesSent; // 传输的字节数,有可能是 `-`
public String referer; // 参考链接, 即起源页
public String userAgent; // 浏览器和操作系统类型
}
LogParse 类
该类是日志解析类,通过正则表达式对日志进行匹配,对匹配上的日志进行依照字段解析。
public class LogParse implements Serializable {
// 构建正则表达式
private String regex = "(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}) (\\S+) (\\S+) (\\[.+?\\]) (\\\"(.*?)\\\") (\\d{3}) (\\S+) (\\\"(.*?)\\\") (\\\"(.*?)\\\")";
private Pattern p = Pattern.compile(regex);
/*
* 结构拜访日志的封装类对象
* */
public AccessLogRecord buildAccessLogRecord(Matcher matcher) {AccessLogRecord record = new AccessLogRecord();
record.setClientIpAddress(matcher.group(1));
record.setClientIdentity(matcher.group(2));
record.setRemoteUser(matcher.group(3));
record.setDateTime(matcher.group(4));
record.setRequest(matcher.group(5));
record.setHttpStatusCode(matcher.group(6));
record.setBytesSent(matcher.group(7));
record.setReferer(matcher.group(8));
record.setUserAgent(matcher.group(9));
return record;
}
/**
* @param record:record 示意一条 apache combined 日志
* @return 解析日志记录,将解析的日志封装成一个 AccessLogRecord 类
*/
public AccessLogRecord parseRecord(String record) {Matcher matcher = p.matcher(record);
if (matcher.find()) {return buildAccessLogRecord(matcher);
}
return null;
}
/**
* @param request url 申请,类型为字符串,相似于 "GET /the-uri-here HTTP/1.1"
* @return 一个三元组(requestType, uri, httpVersion). requestType 示意申请类型,如 GET, POST 等
*/
public Tuple3<String, String, String> parseRequestField(String request) {
// 申请的字符串格局为:“GET /test.php HTTP/1.1”,用空格切割
String[] arr = request.split(" ");
if (arr.length == 3) {return Tuple3.of(arr[0], arr[1], arr[2]);
} else {return null;}
}
/**
* 将 apache 日志中的英文日期转化为指定格局的中文日期
*
* @param dateTime 传入的 apache 日志中的日期字符串,"[21/Jul/2009:02:48:13 -0700]"
* @return
*/
public String parseDateField(String dateTime) throws ParseException {
// 输出的英文日期格局
String inputFormat = "dd/MMM/yyyy:HH:mm:ss";
// 输入的日期格局
String outPutFormat = "yyyy-MM-dd HH:mm:ss";
String dateRegex = "\\[(.*?) .+]";
Pattern datePattern = Pattern.compile(dateRegex);
Matcher dateMatcher = datePattern.matcher(dateTime);
if (dateMatcher.find()) {String dateString = dateMatcher.group(1);
SimpleDateFormat dateInputFormat = new SimpleDateFormat(inputFormat, Locale.ENGLISH);
Date date = dateInputFormat.parse(dateString);
SimpleDateFormat dateOutFormat = new SimpleDateFormat(outPutFormat);
String formatDate = dateOutFormat.format(date);
return formatDate;
} else {return "";}
}
/**
* 解析 request, 即拜访页面的 url 信息解析
* "GET /about/forum.php?mod=viewthread&tid=5&extra=page%3D1 HTTP/1.1"
* 匹配出拜访的 fid: 版本 id
* 以及 tid:文章 id
* @param request
* @return
*/
public Tuple2<String, String> parseSectionIdAndArticleId(String request) {
// 匹配出后面是 "forumdisplay&fid=" 的数字记为版块 id
String sectionIdRegex = "(\\?mod=forumdisplay&fid=)(\\d+)";
Pattern sectionPattern = Pattern.compile(sectionIdRegex);
// 匹配出后面是 "tid=" 的数字记为文章 id
String articleIdRegex = "(\\?mod=viewthread&tid=)(\\d+)";
Pattern articlePattern = Pattern.compile(articleIdRegex);
String[] arr = request.split(" ");
String sectionId = "";
String articleId = "";
if (arr.length == 3) {Matcher sectionMatcher = sectionPattern.matcher(arr[1]);
Matcher articleMatcher = articlePattern.matcher(arr[1]);
sectionId = (sectionMatcher.find()) ? sectionMatcher.group(2) : "";
articleId = (articleMatcher.find()) ? articleMatcher.group(2) : "";
}
return Tuple2.of(sectionId, articleId);
}
}
LogAnalysis 类
该类是日志解决的根本逻辑
public class LogAnalysis {public static void main(String[] args) throws Exception {StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启 checkpoint,工夫距离为毫秒
senv.enableCheckpointing(5000L);
// 抉择状态后端
// 本地测试
// senv.setStateBackend(new FsStateBackend("file:///E://checkpoint"));
// 集群运行
senv.setStateBackend(new FsStateBackend("hdfs://kms-1:8020/flink-checkpoints"));
// 重启策略
senv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(2, TimeUnit.SECONDS) ));
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv, settings);
// kafka 参数配置
Properties props = new Properties();
// kafka broker 地址
props.put("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");
// 消费者组
props.put("group.id", "log_consumer");
// kafka 音讯的 key 序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// kafka 音讯的 value 序列化器
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
"user_access_logs",
new SimpleStringSchema(),
props);
DataStreamSource<String> logSource = senv.addSource(kafkaConsumer);
// 获取无效的日志数据
DataStream<AccessLogRecord> availableAccessLog = LogAnalysis.getAvailableAccessLog(logSource);
// 获取[clienIP,accessDate,sectionId,articleId]
DataStream<Tuple4<String, String, Integer, Integer>> fieldFromLog = LogAnalysis.getFieldFromLog(availableAccessLog);
// 从 DataStream 中创立长期视图, 名称为 logs
// 增加一个计算字段:proctime, 用于维表 JOIN
tEnv.createTemporaryView("logs",
fieldFromLog,
$("clientIP"),
$("accessDate"),
$("sectionId"),
$("articleId"),
$("proctime").proctime());
// 需要 1:统计热门板块
LogAnalysis.getHotSection(tEnv);
// 需要 2:统计热门文章
LogAnalysis.getHotArticle(tEnv);
// 需要 3:统计不同客户端 ip 对版块和文章的总访问量
LogAnalysis.getClientAccess(tEnv);
senv.execute("log-analysisi");
}
/**
* 统计不同客户端 ip 对版块和文章的总访问量
* @param tEnv
*/
private static void getClientAccess(StreamTableEnvironment tEnv) {
// sink 表
// [client_ip,client_access_cnt,statistic_time]
// [客户端 ip, 拜访次数, 统计工夫]
String client_ip_access_ddl = ""+"CREATE TABLE client_ip_access (\n"+" client_ip STRING ,\n"+" client_access_cnt BIGINT,\n"+" statistic_time STRING,\n"+" PRIMARY KEY (client_ip) NOT ENFORCED\n"+")WITH (\n"+" 'connector' = 'jdbc',\n"+" 'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n"+" 'table-name' = 'client_ip_access', \n"+" 'driver' = 'com.mysql.jdbc.Driver',\n"+" 'username' = 'root',\n"+" 'password' = '123qwe'\n"+") ";
tEnv.executeSql(client_ip_access_ddl);
String client_ip_access_sql = ""+"INSERT INTO client_ip_access\n"+"SELECT\n"+" clientIP,\n"+" count(1) AS access_cnt,\n"+" FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time\n"+"FROM\n"+" logs \n"+"WHERE\n"+" articleId <> 0 \n"+" OR sectionId <> 0 \n"+"GROUP BY\n"+" clientIP "
;
tEnv.executeSql(client_ip_access_sql);
}
/**
* 统计热门文章
* @param tEnv
*/
private static void getHotArticle(StreamTableEnvironment tEnv) {
// JDBC 数据源
// 文章 id 及题目对应关系的表,[tid, subject]别离为:文章 id 和题目
String pre_forum_post_ddl = ""+"CREATE TABLE pre_forum_post (\n"+" tid INT,\n"+" subject STRING,\n"+" PRIMARY KEY (tid) NOT ENFORCED\n"+") WITH (\n"+" 'connector' = 'jdbc',\n"+" 'url' = 'jdbc:mysql://kms-4:3306/ultrax',\n"+" 'table-name' = 'pre_forum_post', \n"+" 'driver' = 'com.mysql.jdbc.Driver',\n"+" 'username' = 'root',\n"+" 'password' = '123qwe'\n"+")";
// 创立 pre_forum_post 数据源
tEnv.executeSql(pre_forum_post_ddl);
// 创立 MySQL 的 sink 表
// [article_id,subject,article_pv,statistic_time]
// [文章 id, 题目名称, 拜访次数, 统计工夫]
String hot_article_ddl = ""+"CREATE TABLE hot_article (\n"+" article_id INT,\n"+" subject STRING,\n"+" article_pv BIGINT ,\n"+" statistic_time STRING,\n"+" PRIMARY KEY (article_id) NOT ENFORCED\n"+")WITH (\n"+" 'connector' = 'jdbc',\n"+" 'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n"+" 'table-name' = 'hot_article', \n"+" 'driver' = 'com.mysql.jdbc.Driver',\n"+" 'username' = 'root',\n"+" 'password' = '123qwe'\n"+")";
tEnv.executeSql(hot_article_ddl);
// 向 MySQL 指标表 insert 数据
String hot_article_sql = ""+"INSERT INTO hot_article\n"+"SELECT \n"+" a.articleId,\n"+" b.subject,\n"+" count(1) as article_pv,\n"+" FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time\n"+"FROM logs a \n"+" JOIN pre_forum_post FOR SYSTEM_TIME AS OF a.proctime as b ON a.articleId = b.tid\n"+"WHERE a.articleId <> 0\n"+"GROUP BY a.articleId,b.subject\n"+"ORDER BY count(1) desc\n"+"LIMIT 10";
tEnv.executeSql(hot_article_sql);
}
/**
* 统计热门板块
*
* @param tEnv
*/
public static void getHotSection(StreamTableEnvironment tEnv) {// 板块 id 及其名称对应关系表,[fid, name]别离为:版块 id 和板块名称
String pre_forum_forum_ddl = ""+"CREATE TABLE pre_forum_forum (\n"+" fid INT,\n"+" name STRING,\n"+" PRIMARY KEY (fid) NOT ENFORCED\n"+") WITH (\n"+" 'connector' = 'jdbc',\n"+" 'url' = 'jdbc:mysql://kms-4:3306/ultrax',\n"+" 'table-name' = 'pre_forum_forum', \n"+" 'driver' = 'com.mysql.jdbc.Driver',\n"+" 'username' = 'root',\n"+" 'password' = '123qwe',\n"+" 'lookup.cache.ttl' = '10',\n"+" 'lookup.cache.max-rows' = '1000'" +
")";
// 创立 pre_forum_forum 数据源
tEnv.executeSql(pre_forum_forum_ddl);
// 创立 MySQL 的 sink 表
// [section_id,name,section_pv,statistic_time]
// [板块 id, 板块名称, 拜访次数, 统计工夫]
String hot_section_ddl = ""+"CREATE TABLE hot_section (\n"+" section_id INT,\n"+" name STRING ,\n"+" section_pv BIGINT,\n"+" statistic_time STRING,\n"+" PRIMARY KEY (section_id) NOT ENFORCED \n"+") WITH (\n"+" 'connector' = 'jdbc',\n"+" 'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n"+" 'table-name' = 'hot_section', \n"+" 'driver' = 'com.mysql.jdbc.Driver',\n"+" 'username' = 'root',\n"+" 'password' = '123qwe'\n"+")";
// 创立 sink 表:hot_section
tEnv.executeSql(hot_section_ddl);
// 统计热门板块
// 应用日志流与 MySQL 的维表数据进行 JOIN
// 从而获取板块名称
String hot_section_sql = ""+"INSERT INTO hot_section\n"+"SELECT\n"+" a.sectionId,\n"+" b.name,\n"+" count(1) as section_pv,\n"+" FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time \n"+"FROM\n"+" logs a\n"+" JOIN pre_forum_forum FOR SYSTEM_TIME AS OF a.proctime as b ON a.sectionId = b.fid \n"+"WHERE\n"+" a.sectionId <> 0 \n"+"GROUP BY a.sectionId, b.name\n"+"ORDER BY count(1) desc\n"+"LIMIT 10";
// 执行数据 insert
tEnv.executeSql(hot_section_sql);
}
/**
* 获取[clienIP,accessDate,sectionId,articleId]
* 别离为客户端 ip, 拜访日期, 板块 id, 文章 id
*
* @param logRecord
* @return
*/
public static DataStream<Tuple4<String, String, Integer, Integer>> getFieldFromLog(DataStream<AccessLogRecord> logRecord) {DataStream<Tuple4<String, String, Integer, Integer>> fieldFromLog = logRecord.map(new MapFunction<AccessLogRecord, Tuple4<String, String, Integer, Integer>>() {
@Override
public Tuple4<String, String, Integer, Integer> map(AccessLogRecord accessLogRecord) throws Exception {LogParse parse = new LogParse();
String clientIpAddress = accessLogRecord.getClientIpAddress();
String dateTime = accessLogRecord.getDateTime();
String request = accessLogRecord.getRequest();
String formatDate = parse.parseDateField(dateTime);
Tuple2<String, String> sectionIdAndArticleId = parse.parseSectionIdAndArticleId(request);
if (formatDate == ""|| sectionIdAndArticleId == Tuple2.of("", "")) {return new Tuple4<String, String, Integer, Integer>("0.0.0.0", "0000-00-00 00:00:00", 0, 0);
}
Integer sectionId = (sectionIdAndArticleId.f0 == "") ? 0 : Integer.parseInt(sectionIdAndArticleId.f0);
Integer articleId = (sectionIdAndArticleId.f1 == "") ? 0 : Integer.parseInt(sectionIdAndArticleId.f1);
return new Tuple4<>(clientIpAddress, formatDate, sectionId, articleId);
}
});
return fieldFromLog;
}
/**
* 筛选可用的日志记录
*
* @param accessLog
* @return
*/
public static DataStream<AccessLogRecord> getAvailableAccessLog(DataStream<String> accessLog) {final LogParse logParse = new LogParse();
// 解析原始日志,将其解析为 AccessLogRecord 格局
DataStream<AccessLogRecord> filterDS = accessLog.map(new MapFunction<String, AccessLogRecord>() {
@Override
public AccessLogRecord map(String log) throws Exception {return logParse.parseRecord(log);
}
}).filter(new FilterFunction<AccessLogRecord>() {
// 过滤掉有效日志
@Override
public boolean filter(AccessLogRecord accessLogRecord) throws Exception {return !(accessLogRecord == null);
}
}).filter(new FilterFunction<AccessLogRecord>() {
// 过滤掉状态码非 200 的记录,即保留申请胜利的日志记录
@Override
public boolean filter(AccessLogRecord accessLogRecord) throws Exception {return !accessLogRecord.getHttpStatusCode().equals("200");
}
});
return filterDS;
}
}
将上述代码打包上传到集群运行,在执行提交命令之前,须要先将 Hadoop 的依赖 jar 包搁置在 Flink 装置目录下的 lib 文件下:flink-shaded-hadoop-2-uber-2.7.5-10.0.jar,因为咱们配置了 HDFS 上的状态后端,而 Flink 的 release 包不含有 Hadoop 的依赖 Jar 包。
否则会报如下谬误:
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
提交到集群
编写提交命令脚本
#!/bin/bash
/opt/modules/flink-1.11.1/bin/flink run -m kms-1:8081 \
-c com.jmx.analysis.LogAnalysis \
/opt/softwares/com.jmx-1.0-SNAPSHOT.jar
提交之后,拜访 Flink 的 Web 界面,查看工作:
此时拜访论坛,点击板块和帖子文章,察看数据库变动:
总结
本文次要分享了从 0 到 1 构建一个用户行为日志剖析零碎。首先,基于 discuz 搭建了论坛平台,针对论坛产生的日志,应用 Flume 进行收集并 push 到 Kafka 中;接着应用 Flink 对其进行剖析解决;最初将处理结果写入 MySQL 供可视化展现应用。