应用访问地域排名-分析

37次阅读

共计 4438 个字符,预计需要花费 12 分钟才能阅读完成。

应用访问地域排名

题目内容:给定陌陌一段时间的 Nginx AccessLog(多个文件,估计 66G 左右),以最快的方式找到访问次数最多的 5 个 IP。提交脚本或是可执行程序,约定以命令行参数的形式传入文件所在路径。按照次数降序输出 5 个 IP,每个 IP 一行。

已知说明:
 1. Linux Centos7 服务器,配置限制在内存 2G,4 核 CPU
 2. Nginx access log 放置在指定目录下, 文件内容格式
   ‘$remote\_addr\\t-\\t$remote_user\t$time_local\t’
                ‘$http\_x\_forwarded\_for\\t$tcpinfo_rtt\t$tcpinfo\_rttvar\\t$tcpinfo_snd_cwnd\t$tcpinfo_rcv_space\t’
                    ‘$request\_method\\t$host\t$request\_uri\\t$server_protocol\t$request\_length\\t$request_time\t’
                    ‘$status\\t$body_bytes_sent\t$bytes_sent\t’
                    ‘$http\_referer\\t$http_user_agent\t’
                    ‘$connection\\t$connection_requests\t’
                    ‘$upstream\_addr\\t$upstream_status\t$upstream\_response\_length\\t$upstream_response_time\t’
                    ‘$scheme\\t$ssl_session_reused’;
    10.0.0.1 – – 22/Oct/2019:00:00:05 +0800 – 45250 5000 20 14600 POST api.immomo.com /v1/welcome/logs?fr=123456789 HTTP/1.1 567 0.029 200 96 651 – MomoChat/8.20.2 ios/1878 (iPhone 7 Plus; iOS 11.0.3; zh_CN; iPhone9,2; S1) 93983365152 15 10.0.0.1:9000 200 101 0.029 https .
 3. 不限制实现语言,但是不能依赖任何开源的第三方依赖或者服务
 3. 题目输入参数只有一个就是: Accesslog 的文件夹路径
 4. 题目输出需要在程序运行路径下创建 result 的文件,文件内容的格式是:按照访问量倒排的 5 个 IP 和对应的访问次数。
比如:
10.12.12.1    10000
102.12.12.2   9999

评判规则:
统计准确且耗时最短者胜出
2 核 4G  机械硬盘

解题思路

本文下方解题代码是使用思路 1
思路 1:  2.1 直接将 IP 变成十进制 hash 算次数。2.2 mod N 进行堆排序 2.3 进行 N 个堆 TOP10 排序聚合 | 2.4 输出聚合后的堆 TOP10
思路 2: 是否可以组合我们的超大数字 - 组合方式 出现次数 + 十进制数字、堆排序、直接就能得到结果集 - 避免我自建结构体

性能讨论点

耗时分析

    注意本题目给的机器配置是 2 核 4G、对测试数据 (5GB) 进行 如下算法。发现堆排序占用耗时近 300ms 左右、processLine 和 CalculateIp 耗时几秒, 可优化点很少。ReadLine 占比耗时 90%、那么本文重点讨论的就是 ReadLine 读取文件 IO 的性能! 
    我们如果进行多线程读取会不会更快那?继续往下看~

单线程 / 多线程读写文件快慢?

1.     磁盘 IO 单线程顺序读取是最快的?why ?
       如果多线程读取, 磁盘的磁头要不断重新寻址, 导致读取速度慢于单线程
2.     Linux 会对顺序读取 进行预读!      
3.     随机读取多线程大概会比单线程快 N 倍。(取决于线程数量)
4.     多线程 IO, 我们读取的还是同一文件, 就算我们使用 seek+w/r 方式读取的话, 需要加锁。5.     我们每个线程打开一套文件描述符(file 对象), 能否提高 IO? 我们在核心中有 N 个 file 对象, 但是只有一个 inode 对象, 文件读写最终是落到 inode 完成。所以不会提高 IO

结论: 在我们处理大文件读取的时候, 单线程要优于多线程的~

实现代码

package main

import (
    "bufio"
    "container/heap"
    "fmt"
    "io"
    "os"
    "runtime"
    "strconv"
    "strings"
    "time"
)
const N = 256
// 构建 N 个堆
var GlobalIp map[int64]*IpQueue

// 然后 N 个堆 获取 TOP10
var GlobalNum map[int64]int64 // 次数

func ReadLine(filePth string, hookfn func([]byte)) error {f, err := os.Open(filePth)
    if err != nil {return err}
    defer f.Close()

    bfRd := bufio.NewReader(f)
    for {line, err := bfRd.ReadBytes('\n')
        hookfn(line)
        if err != nil {
            if err == io.EOF {return nil}
            return err
        }
    }

}

// 初始化全局变量
func initHeap() {GlobalNum = make(map[int64]int64)
    GlobalIp = make(map[int64]*IpQueue)
    for i := 0; i <= N; i++ {q := make(IpQueue, 1)
        q[0] = &Item{ip: "0.0.0.0", num: -1}
        heap.Init(&q)
        GlobalIp[int64(i)] = &q // 堆给到全局 Global
    }
}

//2.1 直接将 IP 变成十进制 hash 算次数
func processLine(line []byte) {

    var result int
    for i := 7; i <= 15; i++ {if line[i] == '\t' || line[i] == '-' {
            result = i
            break
        }
    }
    str := string(line[0:result])

    ipv4 := CalculateIp(string(str))

    GlobalNum[int64(ipv4)]++
}

//2.2 mod N 进行堆排序
func handleHash() {
    // 堆耗时开始
    timestamp := time.Now().UnixNano() / 1000000
    for k, v := range GlobalNum {heap.Push(GlobalIp[k%N], &Item{ip: RevIp(k), num: int64(v)})
    }
    edgiest := time.Now().UnixNano() / 1000000
    fmt.Println("堆耗时总时间 ms:", edgiest-timestamp)
}

//2.3 进行 N 个堆 TOP10 排序聚合
func polyHeap() {
    // 聚合 N 个 小堆的 top10
    for i := 0; i < N; i++ {
        iterator := 10
        if iterator > GlobalIp[int64(i)].Len() {iterator = GlobalIp[int64(i)].Len()}
        for j := 0; j < iterator; j++ {
            // 写入到堆栈 N
            item := heap.Pop(GlobalIp[int64(i)]).(*Item)
            heap.Push(GlobalIp[N], item)
        }
    }
}

//2.4 输出聚合后的堆 TOP10
func printResult() {
    result := 0
    for result < 10 {item := heap.Pop(GlobalIp[N]).(*Item)
        fmt.Printf("出现的次数:%d|IP:%s \n", item.num, item.ip)
        result++
    }
}

//string 转 IP
func CalculateIp(str string) int64 {x := strings.Split(str, ".")
    b0, _ := strconv.ParseInt(x[0], 10, 0)
    b1, _ := strconv.ParseInt(x[1], 10, 0)
    b2, _ := strconv.ParseInt(x[2], 10, 0)
    b3, _ := strconv.ParseInt(x[3], 10, 0)

    number0 := b0 * 16777216 //256*256*256
    number1 := b1 * 65536    //256*256
    number2 := b2 * 256      //256
    number3 := b3 * 1        //1
    sum := number0 + number1 + number2 + number3
    return sum
}

//ip 转 string
func RevIp(ip int64) string {

    ip0 := ip / 16777216 // 高一位
    ip1 := (ip - ip0*16777216) / 65536
    ip2 := (ip - ip0*16777216 - ip1*65536) / 256
    ip3 := ip - ip0*16777216 - ip1*65536 - ip2*256
    return fmt.Sprintf("%d.%d.%d.%d", ip0, ip1, ip2, ip3)
}

type Item struct {
    ip  string
    num int64
}

type IpQueue []*Item

func (pq IpQueue) Len() int { return len(pq) }

func (pq IpQueue) Less(i, j int) bool {return pq[i].num > pq[j].num
}
func (pq IpQueue) Swap(i, j int) {pq[i], pq[j] = pq[j], pq[i]
}

func (pq *IpQueue) Push(x interface{}) {item := x.(*Item)
    *pq = append(*pq, item)
}

func (pq *IpQueue) Pop() interface{} {
    old := *pq
    n := len(old)
    item := old[n-1]
    *pq = old[0 : n-1]
    return item
}

func main() {runtime.GOMAXPROCS(2)
    timestamp := time.Now().UnixNano() / 1000000

    // 初始化
    initHeap()

    // 串行 读取文件 写入到 hash map
    _ = ReadLine("/Users/admin/Downloads/api.immomo.com-access_10-01.log", processLine)

    // 多个小堆
    handleHash()

    // 聚合堆
    polyHeap()

    // 打印结果

    printResult()

    fmt.Println(time.Now().UnixNano()/1000000 - timestamp)
}

结尾

感谢 信惠敏(陌陌)、李耕勇 的热心支持与讨论、

正文完
 0