乐趣区

关于大数据:大数据开发SparkRDD实操案例http日志分析

1. 在生产环境下,如何解决配置文件 && 表的数据处理

配置文件,或者配置表,个别是放在在线 db,比方 mysql 等关系型数据库,或者后盾 rd 间接丢给你一份文件,数据量比起整个离线数据仓库的大表来说算很小,所以这种状况下,个别的做法是将小表,或者小文件播送进来,那么上面一个例子来看,播送表的应用解决 ip 地址映射问题

数据地址:链接:https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ 提取码:hell

2. 日志剖析案例 1

2.1 数据阐明

http.log

用户拜访网站所产生的日志。日志格局为:工夫戳、IP 地址、拜访网址、拜访数据、浏览器信息等,样例如下:

ip.dat:ip 段数据,记录着一些 ip 段范畴对应的地位,总量大略在 11 万条,数据量也算很小的,样例如下

文件地位:data/http.log、data/ip.dat

链接:https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ 提取码:hell

要求:将 http.log 文件中的 ip 转换为地址。如将 122.228.96.111 转为温州,并统计各城市的总访问量

2.2. 实现思路和代码如下

有三个关键点,http.log 的要害信息是 ip 地址,所以依据数据的精简准则,只读取 ip 即可,另外 ip 映射比对的时候,ip 地址映射文件是排序的,所以为了进步查找效率,采纳将 ip 地址转为 long 类型,而后再用二分法来查找,找到地址后映射为地址。

package com.hoult.work

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * 数据源:1.ip 地址的拜访日志 2.ip 地址映射表
 * 须要把映射表播送,地址转换为 long 类型进行比拟
 */
object FindIp {def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val sc = spark.sparkContext

    import spark.implicits._
    val ipLogsRDD = sc.textFile("data/http.log")
      .map(_.split("\\|")(1))


    val ipInfoRDD = sc.textFile("data/ip.dat").map {
      case line: String => {val strSplit: Array[String] = line.split("\\|")
        Ip(strSplit(0), strSplit(1), strSplit(7))
      }
    }


    val brIPInfo = sc.broadcast(ipInfoRDD.map(x => (ip2Long(x.startIp), ip2Long(x.endIp), x.address))collect())

    // 关联后的后果 rdd
    ipLogsRDD
      .map(x => {val index  = binarySearch(brIPInfo.value, ip2Long(x))
        if (index != -1)
          brIPInfo.value(index)._3
        else
          "NULL"
      }).map(x => (x, 1))
      .reduceByKey(_ + _)
      .map(x => s"城市:${x._1}, 访问量:${x._2}")
      .saveAsTextFile("data/work/output_ips")

  }

  //ip 转成 long 类型
  def ip2Long(ip: String): Long = {val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  // 二分法匹配 ip 规定
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {val middle = (low + high) / 2
      if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
        return middle
      if (ip < lines(middle)._1)
        high = middle - 1
      else {low = middle + 1}
    }
    -1
  }

}

case class Ip(startIp: String, endIp: String, address: String)

后果截图如下:

3. 日志剖析案例 2

3.1 数据阐明

日志格局 :IP 命中率(Hit/Miss) 响应工夫申请工夫申请办法申请 URL 申请协定状态码响应大小 referer 用户代理

日志文件地位:data/cdn.txt

数据 case:

工作

2.1、计算独立 IP 数

2.2、统计每个视频独立 IP 数(视频的标记:在日志文件的某些能够找到 *.mp4,代表一个视频文件)

2.3、统计一天中每个小时的流量

剖析 :刚开始去找格林工夫的 jod-time 解析,找了一圈不晓得该怎么写,前面发现只须要小时即可,应用正则来提取, 留神在求 video 的拜访 ip 时候,能够用aggregateByKey 来进步性能

3.2 实现代码

package com.hoult.work

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * 读取日志表到 rdd
 * 拿到须要的字段:ip, 拜访工夫:小时即可, 视频名 video_name (url 中的 xx.mp4),
 * 剖析:* 1. 计算独立 IP 数
 * 2. 统计每个视频独立 IP 数(视频的标记:在日志文件的某些能够找到 *.mp4,代表一个视频文件)* 3. 统计一天中每个小时的流量
 */
object LogAnaylse {def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val sc = spark.sparkContext


    val cdnRDD = sc.textFile("data/cdn.txt")

    // 计算独立 ips
//    aloneIPs(cdnRDD.repartition(1))

    // 每个视频独立 ip 数
//    videoIPs(cdnRDD.repartition(1))

    // 每小时流量
    hourPoor(cdnRDD.repartition(1))
  }



  /**
   * 独立 ip 数
   */
  def aloneIPs(cdnRDD: RDD[String]) = {
    // 匹配 ip 地址
    val IPPattern = "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))".r

    val ipnums = cdnRDD
      .flatMap(x => (IPPattern findFirstIn x))
      .map(y => (y,1))
      .reduceByKey(_+_)
      .sortBy(_._2,false)

    ipnums.saveAsTextFile("data/cdn/aloneIPs")
  }

  /**
   * 视频独立 ip 数
   */
  def videoIPs(cdnRDD: RDD[String]) = {
    // 匹配 http 响应码和申请数据大小
    val httpSizePattern = ".*\\s(200|206|304)\\s([0-9]+)\\s.*".r


    //[15/Feb/2017:11:17:13 +0800]  匹配 2017:11 按每小时播放量统计
    val timePattern = ".*(2017):([0-9]{2}):[0-9]{2}:[0-9]{2}.*".r

    import scala.util.matching.Regex

    // Entering paste mode (ctrl-D to finish)

    def isMatch(pattern: Regex, str: String) = {
      str match {case pattern(_*) => true
        case _ => false
      }
    }

    def getTimeAndSize(line: String) = {var res = ("", 0L)
      try {val httpSizePattern(code, size) = line
        val timePattern(year, hour) = line
        res = (hour, size.toLong)
      } catch {case ex: Exception => ex.printStackTrace()
      }
      res
    }

    val IPPattern = "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))".r

    val videoPattern = "([0-9]+).mp4".r

    val res = cdnRDD
      .filter(x => x.matches(".*([0-9]+)\\.mp4.*"))
      .map(x => (videoPattern findFirstIn x toString,IPPattern findFirstIn x toString))
      .aggregateByKey(List[String]())((lst, str) => (lst :+ str),
        (lst1, lst2) => (lst1 ++ lst2)
      )
      .mapValues(_.distinct)
      .sortBy(_._2.size,false)

      res.saveAsTextFile("data/cdn/videoIPs")
  }

  /**
   * 一天中每个小时的流量
   *
   */
  def hourPoor(cdnRDD: RDD[String]) = {val httpSizePattern = ".*\\s(200|206|304)\\s([0-9]+)\\s.*".r
    val timePattern = ".*(2017):([0-9]{2}):[0-9]{2}:[0-9]{2}.*".r
    import scala.util.matching.Regex

    def isMatch(pattern: Regex, str: String) = {
      str match {case pattern(_*) => true
        case _ => false
      }
    }

    def getTimeAndSize(line: String) = {var res = ("", 0L)
      try {val httpSizePattern(code, size) = line
        val timePattern(year, hour) = line
        res = (hour, size.toLong)
      } catch {case ex: Exception => ex.printStackTrace()
      }
      res
    }

    cdnRDD
      .filter(x=>isMatch(httpSizePattern,x))
      .filter(x=>isMatch(timePattern,x))
      .map(x=>getTimeAndSize(x))
      .groupByKey()
      .map(x=>(x._1,x._2.sum))
      .sortByKey()
      .map(x=>x._1+"时 CDN 流量 ="+x._2/(102424*1024)+"G")
      .saveAsTextFile("data/cdn/hourPoor")
  }
}

运行后果截图:



4. 广告曝光剖析案例

假如点击日志文件 (click.log) 和曝光日志 imp.log, 中每行记录格局如下

// 点击日志
INFO 2019-09-01 00:29:53 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:30:31 requestURI:/click?app=2&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:31:03 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=32
INFO 2019-09-01 00:31:51 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=33

// 曝光日志
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=34 

用 Spark-Core 实现统计每个 adid 的曝光数与点击数,思路较简略,间接上代码

代码:

package com.hoult.work

import org.apache.spark.sql.SparkSession

object AddLog {def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val sc = spark.sparkContext

    val clickRDD = sc.textFile("data/click.log")
    val impRDD = sc.textFile("data/imp.log")

    val clickRes = clickRDD.map{line => {val arr = line.split("\\s+")
      val adid = arr(3).substring(arr(3).lastIndexOf("=") + 1)
      (adid, 1)
    }}.reduceByKey(_ + _)

    val impRes = impRDD.map { line =>
      val arr = line.split("\\s+")
      val adid = arr(3).substring(arr(3).lastIndexOf("=") + 1)
      (adid, 1)
    }.reduceByKey(_ + _)

    // 保留到 hdfs
    clickRes.fullOuterJoin(impRes)
      .map(x => x._1 + "," + x._2._1.getOrElse(0) + "," + x._2._2.getOrElse(0))
      .repartition(1)
//      .saveAsTextFile("hdfs://linux121:9000/data/")
      .saveAsTextFile("data/add_log")

    sc.stop()}
}

剖析:共有两次 shuffle, fulljon 能够批改为 union + reduceByKey,将shuffle 缩小到一次

5. 应用 spark-sql 实现上面的转换

A 表有三个字段:ID、startdate、enddate,有 3 条数据:

1 2019-03-04 2020-02-03

2 2020-04-05 2020-08-04

3 2019-10-09 2020-06-11

写 SQL(须要 SQL 和 DSL)将以上数据变动为:

2019-03-04 2019-10-09

2019-10-09 2020-02-03

2020-02-03 2020-04-05

2020-04-05 2020-06-11

2020-06-11 2020-08-04

2020-08-04 2020-08-04

剖析:察看,能够失去,第一列实际上是 startdate 和 enddate 两列叠加的后果,而第二列是下一个,能够用 lead

窗口函数

代码如下

package com.hoult.work

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataExchange {def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DateSort")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("warn")

    // 原数据
    val tab = List((1, "2019-03-04", "2020-02-03"),(2, "2020-04-05", "2020-08-04"),(3, "2019-10-09", "2020-06-11"))
    val df: DataFrame = spark.createDataFrame(tab).toDF("ID", "startdate", "enddate")

    val dateset: DataFrame = df.select("startdate").union(df.select("enddate"))
    dateset.createOrReplaceTempView("t")

    val result: DataFrame = spark.sql(
      """
        |select tmp.startdate, nvl(lead(tmp.startdate) over(partition by col order by tmp.startdate), startdate) enddate from
        |(select "1" col, startdate from t) tmp
        |""".stripMargin)

    result.show()}

}

运行后果


吴邪,小三爷,混迹于后盾,大数据,人工智能畛域的小菜鸟。
更多请关注

退出移动版