一、系统实现功能:
- 文件读取
- 正则匹配,规范化部分数据
- 录入数据库
- 简单输出到web端
二、代码部分:
代码部分:
package main
//系统主体部分
import (
"bufio"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/gin-gonic/gin"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)
//读接口
type Reader interface {
Read(rc chan string)
}
//写结构
type Writer interface {
Write(wc chan *Message)
}
//处理模块结构体,包含读入通道,写出通道,以及读写接口结构
type LogProcess struct {
readChanel chan string
writeChanel chan *Message
read Reader
write Writer
}
//实现读接口结构体
type ReadFromFile struct {
path string
}
//实现写接口结构体
type WriteToDB struct {
DBinfo string
}
//格式化数据结构体,用于格式化从文件中读出的数据
type Message struct {
TimeLocal time.Time `db:"time"`
BytesSent int `db:"bytesSent"`
Path string `db:"path"`
Method string `db:"method"`
Scheme string `db:"scheme"`
Status string `db:"status"`
UpstreamTime float64 `db:"upstreamtime"`
RequestTime float64 `db:"requestTime"`
}
// 系统日志结构体,用于记录该系统运转情况,并反馈到前端response
type systemInfo struct {
Tps float64 `json:"tps"`
RunTime string `json:"runtime"`
HandleLine int `json:"handleline"`
ReadChanLen int `json:"readchanlen"`
WriteChanLen int `json:"writechanlen"`
ErrNum int `json:"errnum"`
}
//记录错误日志的缓存通道
var ErrNum chan int = make(chan int, 200)
//错误类型
const (
TypeHandleLine = 0
TypeErrNum = 1
)
//日志监控结构体
type monitor struct {
startTime time.Time
data *systemInfo
tpsli []int
}
//日志初始化函数
func (m *monitor) start(lp *LogProcess) {
//记录错误状态
go func() {
for n := range ErrNum {
switch n {
case TypeErrNum:
m.data.ErrNum += 1
case TypeHandleLine:
m.data.HandleLine += 1
}
}
}()
//计算TPS吞吐量
ticker := time.NewTicker(time.Second * 5)
go func() {
for {
<-ticker.C
m.tpsli = append(m.tpsli, m.data.HandleLine)
if len(m.tpsli) > 2 {
m.tpsli = m.tpsli[1:]
}
}
}()
// 初始化前端响应体
engine := gin.Default()
engine.GET("/monitor", func(c *gin.Context) {
m.data.RunTime = time.Since(m.startTime).String()
m.data.ReadChanLen = len(lp.readChanel)
m.data.WriteChanLen = len(lp.writeChanel)
if len(m.tpsli) >= 2 {
m.data.Tps = float64((m.tpsli[1] - m.tpsli[0]) / 5)
}
c.JSON(http.StatusOK, m.data)
})
engine.Run(":8080")
}
//127.0.0.12 -- [04/Mar/2018:13:49:52 + 0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-"
// "KeepAliveClient" "-" 1.005 1.854 格式化的结构
func (l *LogProcess) Porcess() {
fmt.Println("LogProcess is transforming data")
//正则匹配文件的模拟数据
r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)
for data := range l.readChanel {
ret := r.FindStringSubmatch(string(data))
if len(ret) != 14 {
ErrNum <- TypeErrNum
log.Println("FindStringSubmatch error ", ret)
continue
}
message := &Message{}
//
TimeZone, _ := time.LoadLocation("Asia/Shanghai")
time, _ := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], TimeZone)
message.TimeLocal = time
//
bytesent, _ := strconv.Atoi(ret[8])
message.BytesSent = bytesent
//
retslice := strings.Split(ret[6], " ")
if len(retslice) != 3 {
ErrNum <- TypeErrNum
log.Printf("Split error %v \n", retslice)
}
message.Method = retslice[0]
url, err := url.Parse(retslice[1])
if err != nil {
ErrNum <- TypeErrNum
log.Printf("Parse error %v \n", url)
}
message.Path = url.Path
message.Status = ret[7]
message.Scheme = ret[5]
message.UpstreamTime, _ = strconv.ParseFloat(ret[12], 64)
message.RequestTime, _ = strconv.ParseFloat(ret[13], 64)
/*
[172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854
172.0.0.12 - - 04/Mar/2018:13:49:52 +0000 http GET /foo?query=t HTTP/1.0 200 2133 - KeepAliveClient - 1.005 1.854]
*/
fmt.Println(ret[4])
l.writeChanel <- message
}
}
//读接口函数
func (r *ReadFromFile) Read(rc chan string) {
f, err := os.OpenFile(r.path, 'r', 0644)
if err != nil {
ErrNum <- TypeErrNum
fmt.Printf("%v", err.Error())
}
f.Seek(0, 2)
freader := bufio.NewReader(f)
for {
data, err := freader.ReadBytes('\n')
if err == io.EOF {
time.Sleep(1 * time.Second)
continue
} else if err != nil {
ErrNum <- TypeErrNum
panic(fmt.Sprintf("ReadLine panic %v", err.Error()))
}
ErrNum <- TypeHandleLine
rc <- string(data[0 : len(data)-1])
}
}
//写入数据库接口函数
func (w *WriteToDB) Write(wc chan *Message) {
// sql语句,问好和stmt.Exec(args),args 对应,为了简单的防sql注入攻击
sql := "insert into logdata values(?,?,?,?,?,?,?,?) "
DB, err := sqlx.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/golang_test")
//用户名:密码&协议(ip:port)/databasename
if err != nil {
ErrNum <- TypeErrNum
panic("open mysql failed " + err.Error())
}
for data := range wc {
fmt.Printf("write data is %v", data)
stmt, err := DB.Prepare(sql)
if err != nil {
ErrNum <- TypeErrNum
panic(err.Error())
}
stmt.Exec(data.TimeLocal, data.BytesSent, data.Path, data.Method, data.Scheme, data.Status, data.UpstreamTime, data.RequestTime)
}
}
func main() {
readerIO := &ReadFromFile{
path: "./access.log",
}
writeIO := &WriteToDB{
DBinfo: "username&&password",
}
LProcess := &LogProcess{
readChanel: make(chan string),
writeChanel: make(chan *Message),
read: readerIO,
write: writeIO,
}
m := &monitor{
startTime: time.Now(),
data: &systemInfo{},
}
for i := 0; i < 2; i++ {
go LProcess.read.Read(LProcess.readChanel)
}
for i := 0; i < 3; i++ {
go LProcess.Porcess()
}
for i := 0; i < 5; i++ {
go LProcess.write.Write(LProcess.writeChanel)
}
m.start(LProcess)
}
模拟数据部分:
package main
import (
"fmt"
"os"
"time"
)
func main() {
f, err := os.OpenFile("./others/access.log", 'w', 0644)
// 写入文件存放目录,"w" ,以写方式打开,0644,read,write,execute 4 2 1
if err != nil {
os.Exit(1)
}
for {
n, err := f.WriteString(`172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854` + "\n")
fmt.Println(n)
time.Sleep(1 * time.Second)
if err != nil {
fmt.Printf("%v", err)
}
}
}
有疑问加站长微信联系(非本文作者)