User behavior log analysis is a very common use case for real-time data processing, for example the familiar PV and UV statistics. This article builds a user behavior log analysis system from scratch with Flink, covering both the architecture design and the code implementation, and walks through the system's complete data processing pipeline. By the end you will know how to:
- Set up a forum platform based on Discuz
- Use the Flume log collection system
- Understand the Apache log format
- Integrate Flume with Kafka
- Follow the log analysis processing flow
- Design the architecture and implement the complete code
Project Overview
This article implements a real-time user behavior log analysis system from scratch with Flink. The basic architecture is shown below:
First we set up a forum platform and analyze the click logs its users generate. Flume then collects the resulting Apache logs and pushes them to Kafka. Flink analyzes the logs in real time and writes the results to MySQL, where a front-end application can visualize them. The article computes three metrics:
- Hot boards, i.e. the boards with the most visits
- Hot articles, i.e. the posts with the most visits
- Total visits to boards and articles from each client
Building a Forum Platform with Discuz
Installing XAMPP
- Download
wget https://www.apachefriends.org/xampp-files/5.6.33/xampp-linux-x64-5.6.33-0-installer.run
- Install
# Make the installer executable
chmod u+x xampp-linux-x64-5.6.33-0-installer.run
# Run the installer
./xampp-linux-x64-5.6.33-0-installer.run
- Configure environment variables
Add the following to ~/.bash_profile:
export XAMPP=/opt/lampp/
export PATH=$PATH:$XAMPP:$XAMPP/bin
- Reload the environment variables
source ~/.bash_profile
- Start XAMPP
xampp restart
- Change the MySQL root password and grant privileges
# Change the root password to 123qwe
update mysql.user set password=PASSWORD('123qwe') where user='root';
flush privileges;
# Allow the root user to log in remotely
grant all privileges on *.* to 'root'@'%' identified by '123qwe' with grant option;
flush privileges;
Installing Discuz
- Download Discuz
wget http://download.comsenz.com/DiscuzX/3.2/Discuz_X3.2_SC_UTF8.zip
- Install
# Remove the existing web application
rm -rf /opt/lampp/htdocs/*
unzip Discuz_X3.2_SC_UTF8.zip -d /opt/lampp/htdocs/
cd /opt/lampp/htdocs/
mv upload/* .
# Fix directory permissions
chmod 777 -R /opt/lampp/htdocs/config/
chmod 777 -R /opt/lampp/htdocs/data/
chmod 777 -R /opt/lampp/htdocs/uc_client/
chmod 777 -R /opt/lampp/htdocs/uc_server/
Basic Discuz Operations
- Create custom boards
- Open the Discuz admin console: http://kms-4/admin.php
- Click the Forum menu at the top
- Follow the on-page prompts to create the boards you need; parent and child boards are both supported
Database Tables for Discuz Posts and Boards
-- Log in to the ultrax database
mysql -uroot -p123 ultrax
-- Table mapping post id to title
-- tid, subject (post id, post title)
select tid, subject from pre_forum_post limit 10;
-- fid, name (board id, board name)
select fid, name from pre_forum_forum limit 40;
After adding posts to the various boards, the data looks like this:
Changing the Apache Log Format
- View the access log
# Default log location: /opt/lampp/logs/access_log
# Tail the log in real time
tail -f /opt/lampp/logs/access_log
- Change the log format
The Apache configuration file is httpd.conf; its full path is /opt/lampp/etc/httpd.conf. The default log type is the common format, which has only 7 fields. To capture more information we switch to the combined format, which adds the Referer and User-Agent headers for a total of 9 fields. Change it as follows:
# Enable the combined log format
CustomLog "logs/access_log" combined
- Reload the configuration
xampp reload
The Apache Log Format
192.168.10.1 - - [30/Aug/2020:15:53:15 +0800] "GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1" 200 30647 "http://kms-4/forum.php" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36"
The log format above has 9 fields, separated by spaces. Their meanings are:
192.168.10.1  ## (1) Client IP address
-  ## (2) Client identity; this field is "-"
-  ## (3) Client userid; this field is "-"
[30/Aug/2020:15:53:15 +0800]  ## (4) Time at which the server finished processing the request
"GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1"  ## (5) Request method, requested resource, and protocol
200  ## (6) Status code returned to the client; 200 means success
30647  ## (7) Bytes returned to the client, excluding response headers; "-" if nothing was returned
"http://kms-4/forum.php"  ## (8) Referer request header
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36"  ## (9) Client browser and operating system information
This log format can be matched with the following regular expression:
(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) (\S+) (\S+) (\[.+?\]) \"(.*?)\" (\d{3}) (\S+) \"(.*?)\" \"(.*?)\"
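As a quick sanity check, the following standalone snippet (an illustration only, not part of the project code; the class name LogRegexDemo is made up) applies this pattern to the sample line above and prints the nine captured fields:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogRegexDemo {
    // The nine-group pattern from above, written as a Java string literal
    private static final String REGEX =
            "(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}) (\\S+) (\\S+) (\\[.+?\\]) \"(.*?)\" (\\d{3}) (\\S+) \"(.*?)\" \"(.*?)\"";

    public static void main(String[] args) {
        String line = "192.168.10.1 - - [30/Aug/2020:15:53:15 +0800] "
                + "\"GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1\" 200 30647 "
                + "\"http://kms-4/forum.php\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36\"";
        Matcher m = Pattern.compile(REGEX).matcher(line);
        if (m.find()) {
            // Groups 1..9: ip, identity, userid, time, request, status code, bytes, referer, user agent
            for (int i = 1; i <= m.groupCount(); i++) {
                System.out.println(i + " -> " + m.group(i));
            }
        }
    }
}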
Flume and Kafka Integration
This article uses Flume to collect the Apache logs and push them to Kafka. A Flume agent has to be started to do the collection; its configuration file looks like this:
# The agent is named a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1
# Configure the source
a1.sources.source1.type = TAILDIR
a1.sources.source1.filegroups = f1
a1.sources.source1.filegroups.f1 = /opt/lampp/logs/access_log
a1.sources.source1.fileHeader = false
# Configure the sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.brokerList = kms-2:9092,kms-3:9092,kms-4:9092
a1.sinks.sink1.topic = user_access_logs
a1.sinks.sink1.kafka.flumeBatchSize = 20
a1.sinks.sink1.kafka.producer.acks = 1
a1.sinks.sink1.kafka.producer.linger.ms = 1
a1.sinks.sink1.kafka.producer.compression.type = snappy
# Configure the channel
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /home/kms/data/flume_data/checkpoint
a1.channels.channel1.dataDirs = /home/kms/data/flume_data/data
# Bind the source and sink to the channel
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1
Knowledge point: what advantages does the Taildir Source have over the Exec Source and the Spooling Directory Source?
TailDir Source: supports resuming from the last read position and monitoring multiple directories. Before Flume 1.6 you had to write a custom source that recorded the read offset of each file to get this behavior.
Exec Source: can collect data in real time, but data is lost if Flume is not running or the shell command fails.
Spooling Directory Source: monitors a directory, but does not support resuming from the last read position.
Note that the configuration above pushes the raw logs to Kafka unchanged. Alternatively, a custom Flume interceptor can filter the raw logs before they are shipped, or route different kinds of logs to different Kafka topics; a sketch of such an interceptor is shown below.
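As a rough sketch of that idea, the class below drops every event that is not a forum.php page view. It assumes the standard Flume 1.9 Interceptor API; the class name ForumLogFilterInterceptor and the filtering rule are invented for this example and are not part of the project code:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical interceptor that keeps only forum.php page views
public class ForumLogFilterInterceptor implements Interceptor {

    @Override
    public void initialize() { }

    @Override
    public Event intercept(Event event) {
        String line = new String(event.getBody(), StandardCharsets.UTF_8);
        // Returning null drops the event before it reaches the Kafka sink
        return line.contains("GET /forum.php") ? event : null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> kept = new ArrayList<>();
        for (Event event : events) {
            Event result = intercept(event);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    @Override
    public void close() { }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new ForumLogFilterInterceptor();
        }

        @Override
        public void configure(Context context) { }
    }
}

It would then be attached to the source with properties along the lines of a1.sources.source1.interceptors = i1 and a1.sources.source1.interceptors.i1.type = ForumLogFilterInterceptor$Builder (using the fully qualified class name if the class lives in a package).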
Starting the Flume Agent
Wrap the agent start command in a shell script, start-log-collection.sh, with the following contents:
#!/bin/bash
echo "start log agent !!!"
/opt/modules/apache-flume-1.9.0-bin/bin/flume-ng agent --conf-file /opt/modules/apache-flume-1.9.0-bin/conf/log_collection.conf --name a1 -Dflume.root.logger=INFO,console
Checking the Log Data Pushed to Kafka
Wrap the console consumer command in a shell script, kafka-consumer.sh, with the following contents:
#!/bin/bash
echo "kafka consumer "
bin/kafka-console-consumer.sh --bootstrap-server kms-2.apache.com:9092,kms-3.apache.com:9092,kms-4.apache.com:9092 --topic $1 --from-beginning
Consume the data in Kafka with the following command:
[kms@kms-2 kafka_2.11-2.1.0]$ ./kafka-consumer.sh user_access_logs
Log Analysis Processing Flow
To keep the walkthrough focused, only the important code is explained below; the complete code is available on GitHub: https://github.com/jiamx/flin...
Creating the MySQL Database and Target Tables
-- Per-client access statistics
CREATE TABLE `client_ip_access` (
  `client_ip` char(50) NOT NULL COMMENT 'client ip',
  `client_access_cnt` bigint(20) NOT NULL COMMENT 'number of visits',
  `statistic_time` text NOT NULL COMMENT 'statistics time',
  PRIMARY KEY (`client_ip`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Hot article statistics
CREATE TABLE `hot_article` (
  `article_id` int(10) NOT NULL COMMENT 'article id',
  `subject` varchar(80) NOT NULL COMMENT 'article title',
  `article_pv` bigint(20) NOT NULL COMMENT 'number of visits',
  `statistic_time` text NOT NULL COMMENT 'statistics time',
  PRIMARY KEY (`article_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Hot board statistics
CREATE TABLE `hot_section` (
  `section_id` int(10) NOT NULL COMMENT 'board id',
  `name` char(50) NOT NULL COMMENT 'board name',
  `section_pv` bigint(20) NOT NULL COMMENT 'number of visits',
  `statistic_time` text NOT NULL COMMENT 'statistics time',
  PRIMARY KEY (`section_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
The AccessLogRecord Class
This class encapsulates the 9 fields contained in each log record.
/**
 * Uses Lombok
 * Wrapper class for a raw log record
 */
@Data
public class AccessLogRecord {
    public String clientIpAddress;   // client IP address
    public String clientIdentity;    // client identity; this field is `-`
    public String remoteUser;        // user identity; this field is `-`
    public String dateTime;          // date, in the format [day/month/year:hour:minute:second zone]
    public String request;           // url request, e.g. `GET /foo ...`
    public String httpStatusCode;    // status code, e.g. 200 or 404
    public String bytesSent;         // number of bytes transferred, possibly `-`
    public String referer;           // referer, i.e. the page the request came from
    public String userAgent;         // browser and operating system type
}
The LogParse Class
This class parses a log line: it matches the line against the regular expression and, if the match succeeds, extracts each field.
public class LogParse implements Serializable {

    // Build the regular expression (nine capture groups, one per field)
    private String regex = "(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}) (\\S+) (\\S+) (\\[.+?\\]) \\\"(.*?)\\\" (\\d{3}) (\\S+) \\\"(.*?)\\\" \\\"(.*?)\\\"";
    private Pattern p = Pattern.compile(regex);

    /*
     * Build the wrapper object for an access log record
     */
    public AccessLogRecord buildAccessLogRecord(Matcher matcher) {
        AccessLogRecord record = new AccessLogRecord();
        record.setClientIpAddress(matcher.group(1));
        record.setClientIdentity(matcher.group(2));
        record.setRemoteUser(matcher.group(3));
        record.setDateTime(matcher.group(4));
        record.setRequest(matcher.group(5));
        record.setHttpStatusCode(matcher.group(6));
        record.setBytesSent(matcher.group(7));
        record.setReferer(matcher.group(8));
        record.setUserAgent(matcher.group(9));
        return record;
    }

    /**
     * @param record a single apache combined log line
     * @return the parsed line wrapped in an AccessLogRecord, or null if it does not match
     */
    public AccessLogRecord parseRecord(String record) {
        Matcher matcher = p.matcher(record);
        if (matcher.find()) {
            return buildAccessLogRecord(matcher);
        }
        return null;
    }

    /**
     * @param request the url request string, e.g. "GET /the-uri-here HTTP/1.1"
     * @return a Tuple3 of (requestType, uri, httpVersion), where requestType is GET, POST, etc.
     */
    public Tuple3<String, String, String> parseRequestField(String request) {
        // The request string looks like "GET /test.php HTTP/1.1"; split it on spaces
        String[] arr = request.split(" ");
        if (arr.length == 3) {
            return Tuple3.of(arr[0], arr[1], arr[2]);
        } else {
            return null;
        }
    }

    /**
     * Convert the English date in an apache log into the target format
     *
     * @param dateTime the date string from the apache log, e.g. "[21/Jul/2009:02:48:13 -0700]"
     * @return the reformatted date, or "" if it cannot be parsed
     */
    public String parseDateField(String dateTime) throws ParseException {
        // Input date format (English month names)
        String inputFormat = "dd/MMM/yyyy:HH:mm:ss";
        // Output date format
        String outPutFormat = "yyyy-MM-dd HH:mm:ss";
        String dateRegex = "\\[(.*?) .+]";
        Pattern datePattern = Pattern.compile(dateRegex);
        Matcher dateMatcher = datePattern.matcher(dateTime);
        if (dateMatcher.find()) {
            String dateString = dateMatcher.group(1);
            SimpleDateFormat dateInputFormat = new SimpleDateFormat(inputFormat, Locale.ENGLISH);
            Date date = dateInputFormat.parse(dateString);
            SimpleDateFormat dateOutFormat = new SimpleDateFormat(outPutFormat);
            String formatDate = dateOutFormat.format(date);
            return formatDate;
        } else {
            return "";
        }
    }

    /**
     * Parse the request, i.e. the url of the visited page, e.g.
     * "GET /about/forum.php?mod=viewthread&tid=5&extra=page%3D1 HTTP/1.1"
     * Extract fid (the board id) and tid (the article id).
     *
     * @param request
     * @return
     */
    public Tuple2<String, String> parseSectionIdAndArticleId(String request) {
        // The number following "forumdisplay&fid=" is the board id
        String sectionIdRegex = "(\\?mod=forumdisplay&fid=)(\\d+)";
        Pattern sectionPattern = Pattern.compile(sectionIdRegex);
        // The number following "tid=" is the article id
        String articleIdRegex = "(\\?mod=viewthread&tid=)(\\d+)";
        Pattern articlePattern = Pattern.compile(articleIdRegex);
        String[] arr = request.split(" ");
        String sectionId = "";
        String articleId = "";
        if (arr.length == 3) {
            Matcher sectionMatcher = sectionPattern.matcher(arr[1]);
            Matcher articleMatcher = articlePattern.matcher(arr[1]);
            sectionId = (sectionMatcher.find()) ? sectionMatcher.group(2) : "";
            articleId = (articleMatcher.find()) ? articleMatcher.group(2) : "";
        }
        return Tuple2.of(sectionId, articleId);
    }
}
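For reference, here is a small standalone snippet (hypothetical, not part of the project code; it assumes it sits in the same package as LogParse and AccessLogRecord) that runs LogParse on the sample log line from earlier and prints the client IP, the normalized timestamp, and the extracted board/article ids:

public class LogParseDemo {
    public static void main(String[] args) throws Exception {
        String line = "192.168.10.1 - - [30/Aug/2020:15:53:15 +0800] "
                + "\"GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1\" 200 30647 "
                + "\"http://kms-4/forum.php\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36\"";

        LogParse parser = new LogParse();
        AccessLogRecord record = parser.parseRecord(line);
        if (record != null) {
            System.out.println(record.getClientIpAddress());                            // client IP
            System.out.println(parser.parseDateField(record.getDateTime()));            // formatted access time
            System.out.println(parser.parseSectionIdAndArticleId(record.getRequest())); // (sectionId, articleId)
        }
    }
}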
The LogAnalysis Class
This class implements the core processing logic.
public class LogAnalysis {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing with a 5000 ms interval
        senv.enableCheckpointing(5000L);
        // Choose the state backend
        // Local testing
        // senv.setStateBackend(new FsStateBackend("file:///E://checkpoint"));
        // Cluster deployment
        senv.setStateBackend(new FsStateBackend("hdfs://kms-1:8020/flink-checkpoints"));
        // Restart strategy
        senv.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(2, TimeUnit.SECONDS)));

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv, settings);

        // Kafka consumer configuration
        Properties props = new Properties();
        // Kafka broker addresses
        props.put("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");
        // Consumer group
        props.put("group.id", "log_consumer");
        // Key deserializer for Kafka messages
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer for Kafka messages
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
                "user_access_logs",
                new SimpleStringSchema(),
                props);
        DataStreamSource<String> logSource = senv.addSource(kafkaConsumer);

        // Keep only the valid log records
        DataStream<AccessLogRecord> availableAccessLog = LogAnalysis.getAvailableAccessLog(logSource);

        // Extract [clientIP, accessDate, sectionId, articleId]
        DataStream<Tuple4<String, String, Integer, Integer>> fieldFromLog = LogAnalysis.getFieldFromLog(availableAccessLog);

        // Create a temporary view named logs from the DataStream
        // Add a computed column, proctime, for the dimension-table JOIN
        tEnv.createTemporaryView("logs",
                fieldFromLog,
                $("clientIP"),
                $("accessDate"),
                $("sectionId"),
                $("articleId"),
                $("proctime").proctime());

        // Requirement 1: hot boards
        LogAnalysis.getHotSection(tEnv);
        // Requirement 2: hot articles
        LogAnalysis.getHotArticle(tEnv);
        // Requirement 3: total visits to boards and articles per client ip
        LogAnalysis.getClientAccess(tEnv);

        senv.execute("log-analysis");
    }

    /**
     * Total visits to boards and articles per client ip
     * @param tEnv
     */
    private static void getClientAccess(StreamTableEnvironment tEnv) {
        // Sink table
        // [client_ip, client_access_cnt, statistic_time]
        // [client ip, number of visits, statistics time]
        String client_ip_access_ddl = "" +
                "CREATE TABLE client_ip_access (\n" +
                "  client_ip STRING ,\n" +
                "  client_access_cnt BIGINT,\n" +
                "  statistic_time STRING,\n" +
                "  PRIMARY KEY (client_ip) NOT ENFORCED\n" +
                ")WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n" +
                "  'table-name' = 'client_ip_access', \n" +
                "  'driver' = 'com.mysql.jdbc.Driver',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123qwe'\n" +
                ") ";
        tEnv.executeSql(client_ip_access_ddl);

        String client_ip_access_sql = "" +
                "INSERT INTO client_ip_access\n" +
                "SELECT\n" +
                "  clientIP,\n" +
                "  count(1) AS access_cnt,\n" +
                "  FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time\n" +
                "FROM\n" +
                "  logs \n" +
                "WHERE\n" +
                "  articleId <> 0 \n" +
                "  OR sectionId <> 0 \n" +
                "GROUP BY\n" +
                "  clientIP ";
        tEnv.executeSql(client_ip_access_sql);
    }

    /**
     * Hot article statistics
     * @param tEnv
     */
    private static void getHotArticle(StreamTableEnvironment tEnv) {
        // JDBC dimension table
        // Maps article id to title: [tid, subject]
        String pre_forum_post_ddl = "" +
                "CREATE TABLE pre_forum_post (\n" +
                "  tid INT,\n" +
                "  subject STRING,\n" +
                "  PRIMARY KEY (tid) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://kms-4:3306/ultrax',\n" +
                "  'table-name' = 'pre_forum_post', \n" +
                "  'driver' = 'com.mysql.jdbc.Driver',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123qwe'\n" +
                ")";
        // Create the pre_forum_post source table
        tEnv.executeSql(pre_forum_post_ddl);

        // Create the MySQL sink table
        // [article_id, subject, article_pv, statistic_time]
        // [article id, title, number of visits, statistics time]
        String hot_article_ddl = "" +
                "CREATE TABLE hot_article (\n" +
                "  article_id INT,\n" +
                "  subject STRING,\n" +
                "  article_pv BIGINT ,\n" +
                "  statistic_time STRING,\n" +
                "  PRIMARY KEY (article_id) NOT ENFORCED\n" +
                ")WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n" +
                "  'table-name' = 'hot_article', \n" +
                "  'driver' = 'com.mysql.jdbc.Driver',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123qwe'\n" +
                ")";
        tEnv.executeSql(hot_article_ddl);

        // Insert the results into the MySQL target table
        String hot_article_sql = "" +
                "INSERT INTO hot_article\n" +
                "SELECT \n" +
                "  a.articleId,\n" +
                "  b.subject,\n" +
                "  count(1) as article_pv,\n" +
                "  FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time\n" +
                "FROM logs a \n" +
                "  JOIN pre_forum_post FOR SYSTEM_TIME AS OF a.proctime as b ON a.articleId = b.tid\n" +
                "WHERE a.articleId <> 0\n" +
                "GROUP BY a.articleId,b.subject\n" +
                "ORDER BY count(1) desc\n" +
                "LIMIT 10";
        tEnv.executeSql(hot_article_sql);
    }

    /**
     * Hot board statistics
     *
     * @param tEnv
     */
    public static void getHotSection(StreamTableEnvironment tEnv) {
        // Maps board id to board name: [fid, name]
        String pre_forum_forum_ddl = "" +
                "CREATE TABLE pre_forum_forum (\n" +
                "  fid INT,\n" +
                "  name STRING,\n" +
                "  PRIMARY KEY (fid) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://kms-4:3306/ultrax',\n" +
                "  'table-name' = 'pre_forum_forum', \n" +
                "  'driver' = 'com.mysql.jdbc.Driver',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123qwe',\n" +
                "  'lookup.cache.ttl' = '10',\n" +
                "  'lookup.cache.max-rows' = '1000'" +
                ")";
        // Create the pre_forum_forum source table
        tEnv.executeSql(pre_forum_forum_ddl);

        // Create the MySQL sink table
        // [section_id, name, section_pv, statistic_time]
        // [board id, board name, number of visits, statistics time]
        String hot_section_ddl = "" +
                "CREATE TABLE hot_section (\n" +
                "  section_id INT,\n" +
                "  name STRING ,\n" +
                "  section_pv BIGINT,\n" +
                "  statistic_time STRING,\n" +
                "  PRIMARY KEY (section_id) NOT ENFORCED \n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n" +
                "  'table-name' = 'hot_section', \n" +
                "  'driver' = 'com.mysql.jdbc.Driver',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123qwe'\n" +
                ")";
        // Create the sink table: hot_section
        tEnv.executeSql(hot_section_ddl);

        // Hot board statistics:
        // JOIN the log stream with the MySQL dimension table
        // to look up the board name
        String hot_section_sql = "" +
                "INSERT INTO hot_section\n" +
                "SELECT\n" +
                "  a.sectionId,\n" +
                "  b.name,\n" +
                "  count(1) as section_pv,\n" +
                "  FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time \n" +
                "FROM\n" +
                "  logs a\n" +
                "  JOIN pre_forum_forum FOR SYSTEM_TIME AS OF a.proctime as b ON a.sectionId = b.fid \n" +
                "WHERE\n" +
                "  a.sectionId <> 0 \n" +
                "GROUP BY a.sectionId, b.name\n" +
                "ORDER BY count(1) desc\n" +
                "LIMIT 10";
        // Execute the insert
        tEnv.executeSql(hot_section_sql);
    }

    /**
     * Extract [clientIP, accessDate, sectionId, articleId],
     * i.e. client ip, access date, board id and article id
     *
     * @param logRecord
     * @return
     */
    public static DataStream<Tuple4<String, String, Integer, Integer>> getFieldFromLog(DataStream<AccessLogRecord> logRecord) {
        DataStream<Tuple4<String, String, Integer, Integer>> fieldFromLog = logRecord.map(new MapFunction<AccessLogRecord, Tuple4<String, String, Integer, Integer>>() {
            @Override
            public Tuple4<String, String, Integer, Integer> map(AccessLogRecord accessLogRecord) throws Exception {
                LogParse parse = new LogParse();
                String clientIpAddress = accessLogRecord.getClientIpAddress();
                String dateTime = accessLogRecord.getDateTime();
                String request = accessLogRecord.getRequest();
                String formatDate = parse.parseDateField(dateTime);
                Tuple2<String, String> sectionIdAndArticleId = parse.parseSectionIdAndArticleId(request);
                if (formatDate.isEmpty() || (sectionIdAndArticleId.f0.isEmpty() && sectionIdAndArticleId.f1.isEmpty())) {
                    return new Tuple4<String, String, Integer, Integer>("0.0.0.0", "0000-00-00 00:00:00", 0, 0);
                }
                Integer sectionId = sectionIdAndArticleId.f0.isEmpty() ? 0 : Integer.parseInt(sectionIdAndArticleId.f0);
                Integer articleId = sectionIdAndArticleId.f1.isEmpty() ? 0 : Integer.parseInt(sectionIdAndArticleId.f1);
                return new Tuple4<>(clientIpAddress, formatDate, sectionId, articleId);
            }
        });
        return fieldFromLog;
    }

    /**
     * Keep only the usable log records
     *
     * @param accessLog
     * @return
     */
    public static DataStream<AccessLogRecord> getAvailableAccessLog(DataStream<String> accessLog) {
        final LogParse logParse = new LogParse();
        // Parse the raw logs into AccessLogRecord objects
        DataStream<AccessLogRecord> filterDS = accessLog.map(new MapFunction<String, AccessLogRecord>() {
            @Override
            public AccessLogRecord map(String log) throws Exception {
                return logParse.parseRecord(log);
            }
        }).filter(new FilterFunction<AccessLogRecord>() {
            // Drop records that could not be parsed
            @Override
            public boolean filter(AccessLogRecord accessLogRecord) throws Exception {
                return !(accessLogRecord == null);
            }
        }).filter(new FilterFunction<AccessLogRecord>() {
            // Drop records whose status code is not 200, i.e. keep only successful requests
            @Override
            public boolean filter(AccessLogRecord accessLogRecord) throws Exception {
                return accessLogRecord.getHttpStatusCode().equals("200");
            }
        });
        return filterDS;
    }
}
Package the code and upload it to the cluster. Before running the submit command, place the Hadoop dependency jar flink-shaded-hadoop-2-uber-2.7.5-10.0.jar in the lib folder of the Flink installation directory, because the state backend is on HDFS and the Flink release does not ship with the Hadoop dependency jars.
Otherwise the job fails with the following error:
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
Submitting to the Cluster
Write the submission script:
#!/bin/bash
/opt/modules/flink-1.11.1/bin/flink run -m kms-1:8081 \
-c com.jmx.analysis.LogAnalysis \
/opt/softwares/com.jmx-1.0-SNAPSHOT.jar
After submitting, open the Flink web UI to check the job:
Now visit the forum, click through boards and posts, and watch the database tables change:
Summary
This article walked through building a user behavior log analysis system from scratch. We first set up a forum platform with Discuz; the logs it produces are collected by Flume and pushed to Kafka; Flink then analyzes and processes them; finally the results are written to MySQL for visualization.