应用访问地域排名
题目内容:给定陌陌一段时间的Nginx AccessLog(多个文件,估计66G左右),以最快的方式找到访问次数最多的5个IP。提交脚本或是可执行程序,约定以命令行参数的形式传入文件所在路径。按照次数降序输出5个IP,每个IP一行。
已知说明:
1. Linux Centos7服务器,配置限制在内存2G,4核CPU
2. Nginx access log 放置在指定目录下, 文件内容格式
'$remote\_addr\\t-\\t$remote_user\t$time_local\t'
'$http\_x\_forwarded\_for\\t$tcpinfo_rtt\t$tcpinfo\_rttvar\\t$tcpinfo_snd_cwnd\t$tcpinfo_rcv_space\t'
'$request\_method\\t$host\t$request\_uri\\t$server_protocol\t$request\_length\\t$request_time\t'
'$status\\t$body_bytes_sent\t$bytes_sent\t'
'$http\_referer\\t$http_user_agent\t'
'$connection\\t$connection_requests\t'
'$upstream\_addr\\t$upstream_status\t$upstream\_response\_length\\t$upstream_response_time\t'
'$scheme\\t$ssl_session_reused';
10.0.0.1 - - 22/Oct/2019:00:00:05 +0800 - 45250 5000 20 14600 POST api.immomo.com /v1/welcome/logs?fr=123456789 HTTP/1.1 567 0.029 200 96 651 - MomoChat/8.20.2 ios/1878 (iPhone 7 Plus; iOS 11.0.3; zh_CN; iPhone9,2; S1) 93983365152 15 10.0.0.1:9000 200 101 0.029 https .
3. 不限制实现语言,但是不能依赖任何开源的第三方依赖或者服务
3. 题目输入参数只有一个就是: Accesslog的文件夹路径
4. 题目输出需要在程序运行路径下创建result的文件,文件内容的格式是:按照访问量倒排的5个IP和对应的访问次数。
比如:
10.12.12.1 10000
102.12.12.2 9999
...
评判规则:
统计准确且耗时最短者胜出
2核4G 机械硬盘
解题思路
本文下方解题代码是使用思路1
思路1: 2.1 直接将IP变成十进制 hash算次数。2.2 mod N 进行堆排序 2.3 进行N个堆TOP10 排序聚合 | 2.4 输出聚合后的堆TOP10
思路2: 是否可以组合我们的超大数字- 组合方式 出现次数+十进制数字、堆排序、直接就能得到结果集-避免我自建结构体
性能讨论点
耗时分析
注意本题目给的机器配置是2核4G、
对测试数据(5GB)进行 如下算法。发现堆排序占用耗时近300ms 左右、
processLine 和CalculateIp 耗时几秒,可优化点很少。
ReadLine 占比耗时90%、那么本文重点讨论的就是ReadLine 读取文件IO 的性能!
我们如果进行多线程读取会不会更快那?继续往下看~
单线程/多线程读写文件快慢?
1. 磁盘IO 单线程顺序读取是最快的?why ?
如果多线程读取,磁盘的磁头要不断重新寻址,导致读取速度慢于单线程
2. Linux会对顺序读取 进行预读!
3. 随机读取多线程大概会比单线程快N倍。(取决于线程数量)
4. 多线程IO,我们读取的还是同一文件,就算我们使用seek+w/r 方式读取的话,需要加锁。
5. 我们每个线程打开一套文件描述符(file 对象),能否提高IO?我们在核心中有N个file对象,但是只有一个inode 对象,文件读写最终是落到inode 完成。所以不会提高IO
结论:在我们处理大文件读取的时候,单线程要优于多线程的~
实现代码
package main
import (
"bufio"
"container/heap"
"fmt"
"io"
"os"
"runtime"
"strconv"
"strings"
"time"
)
const N = 256
//构建N个堆
var GlobalIp map[int64]*IpQueue
//然后N个堆 获取TOP10
var GlobalNum map[int64]int64 //次数
func ReadLine(filePth string, hookfn func([]byte)) error {
f, err := os.Open(filePth)
if err != nil {
return err
}
defer f.Close()
bfRd := bufio.NewReader(f)
for {
line, err := bfRd.ReadBytes('\n')
hookfn(line)
if err != nil {
if err == io.EOF {
return nil
}
return err
}
}
}
//初始化全局变量
func initHeap() {
GlobalNum = make(map[int64]int64)
GlobalIp = make(map[int64]*IpQueue)
for i := 0; i <= N; i++ {
q := make(IpQueue, 1)
q[0] = &Item{ip: "0.0.0.0", num: -1}
heap.Init(&q)
GlobalIp[int64(i)] = &q //堆给到全局Global
}
}
//2.1 直接将IP变成十进制 hash算次数
func processLine(line []byte) {
var result int
for i := 7; i <= 15; i++ {
if line[i] == '\t' || line[i] == '-' {
result = i
break
}
}
str := string(line[0:result])
ipv4 := CalculateIp(string(str))
GlobalNum[int64(ipv4)]++
}
//2.2 mod N 进行堆排序
func handleHash() {
//堆耗时开始
timestamp := time.Now().UnixNano() / 1000000
for k, v := range GlobalNum {
heap.Push(GlobalIp[k%N], &Item{ip: RevIp(k), num: int64(v)})
}
edgiest := time.Now().UnixNano() / 1000000
fmt.Println("堆耗时总时间ms:", edgiest-timestamp)
}
//2.3 进行N个堆TOP10 排序聚合
func polyHeap() {
//聚合N 个 小堆的top10
for i := 0; i < N; i++ {
iterator := 10
if iterator > GlobalIp[int64(i)].Len() {
iterator = GlobalIp[int64(i)].Len()
}
for j := 0; j < iterator; j++ {
//写入到堆栈N
item := heap.Pop(GlobalIp[int64(i)]).(*Item)
heap.Push(GlobalIp[N], item)
}
}
}
//2.4 输出聚合后的堆TOP10
func printResult() {
result := 0
for result < 10 {
item := heap.Pop(GlobalIp[N]).(*Item)
fmt.Printf("出现的次数:%d|IP:%s \n", item.num, item.ip)
result++
}
}
//string 转IP
func CalculateIp(str string) int64 {
x := strings.Split(str, ".")
b0, _ := strconv.ParseInt(x[0], 10, 0)
b1, _ := strconv.ParseInt(x[1], 10, 0)
b2, _ := strconv.ParseInt(x[2], 10, 0)
b3, _ := strconv.ParseInt(x[3], 10, 0)
number0 := b0 * 16777216 //256*256*256
number1 := b1 * 65536 //256*256
number2 := b2 * 256 //256
number3 := b3 * 1 //1
sum := number0 + number1 + number2 + number3
return sum
}
//ip 转string
func RevIp(ip int64) string {
ip0 := ip / 16777216 //高一位
ip1 := (ip - ip0*16777216) / 65536
ip2 := (ip - ip0*16777216 - ip1*65536) / 256
ip3 := ip - ip0*16777216 - ip1*65536 - ip2*256
return fmt.Sprintf("%d.%d.%d.%d", ip0, ip1, ip2, ip3)
}
type Item struct {
ip string
num int64
}
type IpQueue []*Item
func (pq IpQueue) Len() int { return len(pq) }
func (pq IpQueue) Less(i, j int) bool {
return pq[i].num > pq[j].num
}
func (pq IpQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}
func (pq *IpQueue) Push(x interface{}) {
item := x.(*Item)
*pq = append(*pq, item)
}
func (pq *IpQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
*pq = old[0 : n-1]
return item
}
func main() {
runtime.GOMAXPROCS(2)
timestamp := time.Now().UnixNano() / 1000000
//初始化
initHeap()
//串行 读取文件 写入到hash map
_ = ReadLine("/Users/admin/Downloads/api.immomo.com-access_10-01.log", processLine)
//多个小堆
handleHash()
//聚合堆
polyHeap()
//打印结果
printResult()
fmt.Println(time.Now().UnixNano()/1000000 - timestamp)
}
结尾
感谢 信惠敏(陌陌)、李耕勇 的热心支持与讨论、
有疑问加站长微信联系(非本文作者)