Nodata版本
VERSION = "0.0.11"
Nodata组件功能
nodata用于检测监控数据的上报异常。nodata和实时报警judge模块协同工作,过程为: 配置了nodata的采集项超时未上报数据,nodata生成一条默认的模拟数据;用户配置相应的报警策略,收到mock数据就产生报警。采集项上报异常检测,作为judge模块的一个必要补充,能够使judge的实时报警功能更加可靠、完善。【官方描述】
Nodata组件逻辑图
系统流图
模块结构
main入口分析
// main parses the command-line flags, prints version information when asked
// to, then loads the configuration and starts every nodata module before
// blocking forever.
func main() {
	// Command-line flags.
	cfgPath := flag.String("c", "cfg.json", "configuration file")
	showVersion := flag.Bool("v", false, "show version")
	showGitVersion := flag.Bool("vg", false, "show version")
	flag.Parse()

	switch {
	case *showVersion:
		// Print the version only.
		fmt.Println(g.VERSION)
		os.Exit(0)
	case *showGitVersion:
		// Print the version together with the git commit id.
		fmt.Println(g.VERSION, g.COMMIT)
		os.Exit(0)
	}

	// Load the global configuration.
	g.ParseConfig(*cfgPath)
	// Start the internal statistics.
	g.StartProc()

	config.Start()    // cache the nodata configuration
	collector.Start() // cache the data points collected for the configured counters
	judge.Start()     // judge the collected data against the configuration
	http.Start()      // HTTP API service

	// Block forever so the started modules keep running.
	select {}
}
config.Start() 从DB加载Nodata配置(dashboard配置nodata策略写入mysql)
# 加载nodata配置主函数
// Start is the entry point of the config module. When the module is enabled
// it initialises the database connection and starts the cron job that caches
// the nodata configuration; otherwise it only logs a warning.
func Start() {
	if enabled := g.Config().Config.Enabled; !enabled {
		log.Println("config.Start warning, not enabled")
		return
	}

	service.InitDB()    // initialise the DB connection
	StartNdConfigCron() // load the nodata configuration into the in-memory cache
	log.Println("config.Start ok")
}
## 初始化DB连接
// InitDB creates the connection registered under dbBaseConnName via
// GetDbConn. A failure is fatal: the process exits through log.Fatalln.
func InitDB() {
	if _, err := GetDbConn(dbBaseConnName); err != nil {
		// log.Fatalln terminates the process, so nothing after it runs.
		log.Fatalln("config.InitDB error", err)
	}
	log.Println("config.InitDB ok")
}
### GetDbConn实现函数
### makeDbConn函数实现sql客户端连接dbconn
### 内存map dbConnMap[connName]保存dbconn
// GetDbConn returns the *sql.DB cached in dbConnMap under connName, creating
// and caching a new one through makeDbConn when there is none. The connection
// is pinged before it is handed out; when the ping fails the connection is
// closed, removed from dbConnMap and the ping error is returned.
// The whole operation runs under dbLock.
func GetDbConn(connName string) (c *sql.DB, e error) {
	dbLock.Lock()
	defer dbLock.Unlock()

	conn := dbConnMap[connName]
	if conn == nil {
		created, err := makeDbConn()
		if err != nil {
			closeDbConn(created)
			return nil, err
		}
		conn = created
		dbConnMap[connName] = conn
	}

	// Verify that the (possibly cached) connection is still usable.
	if err := conn.Ping(); err != nil {
		closeDbConn(conn)
		delete(dbConnMap, connName)
		return nil, err
	}
	return conn, nil
}
// makeDbConn opens a MySQL handle for the DSN in the global configuration,
// applies the configured idle-connection limit and pings the server.
// When the ping fails the handle is still returned together with the error.
func makeDbConn() (conn *sql.DB, err error) {
	db, openErr := sql.Open("mysql", g.Config().Config.Dsn)
	if openErr != nil {
		return nil, openErr
	}

	db.SetMaxIdleConns(int(g.Config().Config.MaxIdle))
	return db, db.Ping()
}
## 定时同步nodata配置(StartNdConfigCron)
func StartNdConfigCron() {
ndconfigCron.AddFuncCC(ndconfigCronSpec, func() {
start := time.Now().Unix()
cnt, _ := syncNdConfig() //
end := time.Now().Unix()
if g.Config().Debug {
log.Printf("config cron, cnt %d, time %ds, start %s\n", cnt, end-start, ttime.FormatTs(start))
}
// 统计
g.ConfigCronCnt.Incr()
g.ConfigLastTs.SetCnt(end - start)
g.ConfigLastCnt.SetCnt(int64(cnt))
}, 1)
ndconfigCron.Start() //
}
#### 获取nodata配置、重新格式化及缓存配置全局公开Map(NdConfigMap)
func syncNdConfig() (cnt int, errt error) {
// 获取nodata配置函数调用
configs := service.GetMockCfgFromDB()
// 重新格式化配置NodateConfig结构
nm := nmap.NewSafeMap()
for _, ndc := range configs {
endpoint := ndc.Endpoint
metric := ndc.Metric
tags := ndc.Tags
if endpoint == "" {
log.Printf("bad config: %+v\n", ndc)
continue
}
pk := cutils.PK(endpoint, metric, tags)
nm.Put(pk, ndc)
}
// 缓存map
SetNdConfigMap(nm)
return nm.Size(), nil //返回map长度
}
##### 底层获取nodata配置实现函数
// GetMockCfgFromDB reads every row of the mockcfg table and expands it into
// one NodataConfig per endpoint, keyed by cutils.PK(endpoint, metric, tags).
//
// Rows that fail to scan or fail checkMockCfg are logged and skipped. When two
// rows resolve to the same key, isSpuerNodataCfg decides which one is kept and
// the conflict is logged. Database errors are logged and whatever has been
// collected so far (possibly an empty map) is returned; the result is never nil.
func GetMockCfgFromDB() map[string]*cmodel.NodataConfig {
	ret := make(map[string]*cmodel.NodataConfig)

	dbConn, err := GetDbConn("nodata.mockcfg")
	if err != nil {
		log.Println("db.get_conn error, mockcfg", err)
		return ret
	}

	const q = "SELECT id,name,obj,obj_type,metric,tags,dstype,step,mock FROM mockcfg"
	rows, err := dbConn.Query(q)
	if err != nil {
		log.Println("db.query error, mockcfg", err)
		return ret
	}
	defer rows.Close()

	for rows.Next() {
		t := MockCfg{}
		tags := ""
		if err := rows.Scan(&t.Id, &t.Name, &t.Obj, &t.ObjType, &t.Metric, &tags, &t.Type, &t.Step, &t.Mock); err != nil {
			log.Println("db.scan error, mockcfg", err)
			continue
		}

		// Turn the stored tag string into its map form.
		t.Tags = cutils.DictedTagstring(tags)
		if err := checkMockCfg(&t); err != nil {
			log.Println("check mockcfg, error:", err)
			continue
		}

		// Expand the object (host list, group list or other) into endpoints.
		for _, ep := range getEndpoint(t.ObjType, t.Obj) {
			uuid := cutils.PK(ep, t.Metric, t.Tags)
			ncfg := cmodel.NewNodataConfig(t.Id, t.Name, t.ObjType, ep, t.Metric, t.Tags, t.Type, t.Step, t.Mock)

			val, found := ret[uuid]
			if !found {
				ret[uuid] = ncfg
				continue
			}

			// Two configurations target the same counter: keep the superior one.
			if isSpuerNodataCfg(val, ncfg) {
				log.Printf("nodata.mockcfg conflict, %s, used %s, drop %s", uuid, val.Name, ncfg.Name)
			} else {
				ret[uuid] = ncfg
				log.Printf("nodata.mockcfg conflict, %s, used %s, drop %s", uuid, ncfg.Name, val.Name)
			}
		}
	}

	// rows.Next also returns false when the iteration failed; surface that
	// instead of silently treating a truncated result set as complete.
	if err := rows.Err(); err != nil {
		log.Println("db.rows error, mockcfg", err)
	}
	return ret
}
###### 根据objType获取Hosts slice
// getEndpoint resolves obj into a list of endpoints according to objType:
// "host", "group" and "other" are delegated to their dedicated helpers, any
// other type yields an empty, non-nil slice.
func getEndpoint(objType string, obj string) []string {
	if objType == "host" {
		return getEndpointFromHosts(obj)
	}
	if objType == "group" {
		return getEndpointFromGroups(obj)
	}
	if objType == "other" {
		return getEndpointFromOther(obj)
	}
	return make([]string, 0)
}
// getEndpointFromHosts splits a newline-separated host list into host names.
// Each line is trimmed of surrounding whitespace and blank lines are dropped.
// The result is never nil.
func getEndpointFromHosts(hosts string) []string {
	names := make([]string, 0)
	for _, line := range strings.Split(hosts, "\n") {
		if name := strings.TrimSpace(line); name != "" {
			names = append(names, name)
		}
	}
	return names
}
// getEndpointFromGroups expands a newline-separated list of group names into
// the de-duplicated host names returned by GetHostsFromGroup for each group.
//
// Group names are trimmed of surrounding whitespace before the lookup, so
// entries with padding or a trailing "\r" resolve correctly; blank lines and
// empty host names are ignored. The result is never nil and its order is
// unspecified because it comes from map iteration.
func getEndpointFromGroups(grps string) []string {
	// Collect host names in a set so a host that belongs to several groups is
	// listed once.
	seen := make(map[string]struct{})
	for _, line := range strings.Split(grps, "\n") {
		name := strings.TrimSpace(line)
		if name == "" {
			continue
		}
		// Look up by the trimmed name, not by the raw line.
		for hostname := range GetHostsFromGroup(name) {
			if hostname != "" {
				seen[hostname] = struct{}{}
			}
		}
	}

	ret := make([]string, 0, len(seen))
	for hostname := range seen {
		ret = append(ret, hostname)
	}
	return ret
}
// getEndpointFromOther resolves an "other" object. It is parsed exactly like
// a host list, so the work is delegated to getEndpointFromHosts.
func getEndpointFromOther(other string) []string {
	endpoints := getEndpointFromHosts(other)
	return endpoints
}
collector.Start() 缓存Nodata配置主机的采集数据点
# 运行收集nodata数据主函数
// Start is the entry point of the collector module. When the collector is
// enabled it starts the cron job that collects data for the nodata
// configuration; otherwise it only logs a warning.
func Start() {
	if enabled := g.Config().Collector.Enabled; !enabled {
		log.Println("collector.Start warning, not enabled")
		return
	}

	StartCollectorCron() // periodic collection job
	log.Println("collector.Start ok")
}
## 定时任务执行,收集函数调用与任务运行
func StartCollectorCron() {
collectorCron.AddFuncCC("*/20 * * * * ?", func() {
start := time.Now().Unix()
cnt := collectDataOnce() //收集函数调用
end := time.Now().Unix()
if g.Config().Debug {
log.Printf("collect cron, cnt %d, time %ds, start %s\n", cnt, end-start, ttime.FormatTs(start))
}
//统计
g.CollectorCronCnt.Incr()
g.CollectorLastTs.SetCnt(end - start)
g.CollectorLastCnt.SetCnt(int64(cnt))
g.CollectorCnt.IncrBy(int64(cnt))
}, 1)
collectorCron.Start()
}
### 收集功能实现函数
// collectDataOnce fetches the last data point of every configured key once.
// The keys are cut into batches; each batch is handled by its own goroutine
// through fetchItemsAndStore, with the number of goroutines running at the
// same time bounded by a semaphore. It returns the sum of the sizes reported
// by all batches.
func collectDataOnce() int {
	keys := config.Keys()
	total := len(keys)

	cfg := g.Config().Collector

	// Concurrency limit; out-of-range settings fall back to 10.
	workers := int(cfg.Concurrent)
	if workers < 1 || workers > 50 {
		workers = 10
	}
	sema := tsema.NewSemaphore(workers)

	// Batch size; out-of-range settings fall back to 200. A batch that is too
	// small would make the result channel very large.
	batch := int(cfg.Batch)
	if batch < 100 || batch > 1000 {
		batch = 200
	}

	// One result per batch is expected on the channel.
	batchCnt := (total + batch - 1) / batch
	results := make(chan int, batchCnt+1)

	for from := 0; from < total; from += batch {
		to := from + batch
		if to > total {
			to = total
		}
		chunk := keys[from:to]

		// Wait for a free slot, then process the batch concurrently.
		sema.Acquire()
		go func(keys []string, keySize int) {
			defer sema.Release()

			size, err := fetchItemsAndStore(keys, keySize)
			if err != nil {
				log.Printf("fetchItemAndStore fail, size:%v, error:%v", size, err)
			}
			if g.Config().Debug {
				log.Printf("fetchItemAndStore keys:%v, key_size:%v, ret_size:%v", keys, keySize, size)
			}
			results <- size
		}(chunk, len(chunk))
	}

	// Gather exactly one result per started batch.
	collected := 0
	for n := 0; n < batchCnt; n++ {
		collected += <-results
	}
	return collected
}
#### 获取数据实现函数
// fetchItemsAndStore queries the last data point of every key in fetchKeys
// and caches the returned points with AddItem.
//
// Keys without a nodata configuration are skipped. It returns (0, nil) when
// fetchSize is below 1 or no key has a configuration, (0, err) when the query
// fails, and otherwise the number of response entries (including entries that
// were skipped because they carried no value).
func fetchItemsAndStore(fetchKeys []string, fetchSize int) (size int, errt error) {
	if fetchSize < 1 {
		return 0, nil
	}

	// Build the request arguments: one endpoint/counter pair per configured key.
	args := make([]*cmodel.GraphLastParam, 0)
	for _, key := range fetchKeys {
		ndcfg, found := config.GetNdConfig(key)
		if !found {
			continue
		}
		args = append(args, &cmodel.GraphLastParam{ndcfg.Endpoint, cutils.Counter(ndcfg.Metric, ndcfg.Tags)})
	}
	if len(args) == 0 {
		return 0, nil
	}

	resp, err := queryLastPoints(args)
	if err != nil {
		return 0, err
	}

	// Store time shared by every item of this batch; the judge logic reads it.
	storedAt := time.Now().Unix()
	for _, point := range resp {
		if point == nil || point.Value == nil {
			continue
		}
		itemKey := cutils.PK2(point.Endpoint, point.Counter)
		item := NewDataItem(point.Value.Timestamp, float64(point.Value.Value), "OK", storedAt)
		AddItem(itemKey, item)
	}
	return len(resp), nil
}
##### config.GetNdConfig 根据配置key(由cutils.PK生成的endpoint/metric/tags)返回NodataConfig配置
// GetNdConfig looks up the nodata configuration stored under key in
// NdConfigMap while holding the read lock. When the key is missing or maps to
// nil it returns an empty, non-nil NodataConfig together with false.
func GetNdConfig(key string) (*cmodel.NodataConfig, bool) {
	rwlock.RLock()
	defer rwlock.RUnlock()

	val, found := NdConfigMap.Get(key)
	if !found || val == nil {
		return &cmodel.NodataConfig{}, false
	}
	return val.(*cmodel.NodataConfig), true
}
##### API组件POST请求实现函数
// queryLastPoints POSTs param as JSON to the /api/v1/graph/lastpoint endpoint
// of the configured PlusApi address and decodes the JSON response into resp.
// The connect and request timeouts come from the PlusApi configuration and
// are interpreted as milliseconds.
func queryLastPoints(param []*cmodel.GraphLastParam) (resp []*cmodel.GraphLastResp, err error) {
	cfg := g.Config()
	uri := fmt.Sprintf("%s/api/v1/graph/lastpoint", cfg.PlusApi.Addr)

	// Build the authenticated POST request.
	var req *httplib.BeegoHttpRequest
	req, err = requests.CurlPlus(uri, "POST", "nodata", cfg.PlusApi.Token,
		map[string]string{"Content-type": "application/json"}, map[string]string{})
	if err != nil {
		return nil, err
	}

	connTimeout := time.Duration(cfg.PlusApi.ConnectTimeout) * time.Millisecond
	reqTimeout := time.Duration(cfg.PlusApi.RequestTimeout) * time.Millisecond
	req.SetTimeout(connTimeout, reqTimeout)

	payload, err := json.Marshal(param)
	if err != nil {
		return nil, err
	}
	req.Body(payload)

	// Execute the request and decode the JSON response.
	err = req.ToJson(&resp)
	return resp, err
}
##### 缓存数据点
// AddItem pushes val to the front of the item list cached under key in
// ItemMap. A key seen for the first time gets a new list limited to 3
// entries, so only the most recent data points of a counter are kept.
func AddItem(key string, val *DataItem) {
	if cached, found := ItemMap.Get(key); found {
		cached.(*tlist.SafeListLimited).PushFrontViolently(val)
		return
	}

	fresh := tlist.NewSafeListLimited(3)
	fresh.PushFrontViolently(val)
	ItemMap.Put(key, fresh)
}
// NewDataItem builds a DataItem from the data point timestamp ts, its value
// val, the fetch status fstatus and the store time fts.
func NewDataItem(ts int64, val float64, fstatus string, fts int64) *DataItem {
	item := DataItem{
		Ts:      ts,
		Value:   val,
		FStatus: fstatus,
		FTs:     fts,
	}
	return &item
}
// GraphLastResp is one entry of a last-point response: the last data point of
// one counter on one endpoint. It is decoded from JSON using the field tags
// below.
type GraphLastResp struct {
// Endpoint the data point belongs to.
Endpoint string `json:"endpoint"`
// Counter the data point belongs to.
Counter string `json:"counter"`
// Value is the data point itself; it is a pointer and may be nil.
Value *RRDData `json:"value"`
}
judge.Start()
# Judge运行入口函数
// Start is the entry point of the judge module: it starts the judge cron job
// and logs that the module is up.
func Start() {
	StartJudgeCron() // periodic judge job
	log.Println("judge.Start ok")
}
# 定时任务Judge执行函数
func StartJudgeCron() {
judgeCron.AddFuncCC(judgeCronSpec, func() {
start := time.Now().Unix()
judge() //执行judge函数调用
end := time.Now().Unix()
if g.Config().Debug {
log.Printf("judge cron, time %ds, start %s\n", end-start, ttime.FormatTs(start))
}
// 统计
g.JudgeCronCnt.Incr()
g.JudgeLastTs.SetCnt(end - start)
// 触发Mock列表数据发送
sender.SendMockOnceAsync()
}, 1)
judgeCron.Start() //执行Cron
}
## judge实现函数
// judge walks over every configured key and decides whether its counter is
// currently reporting data.
//
// A key is skipped when it has no configuration, when nothing has been
// collected for it yet, or when the latest collected item is not "OK" or was
// stored before the timeout window (the collection itself failed). Otherwise
// the counter is considered timed out when the collected value equals the
// configured mock value or the data point is older than the timeout window.
// For a timed-out counter a mock item is generated at most once per step;
// any other counter is switched back to OK.
func judge() {
	now := time.Now().Unix()
	for _, key := range config.Keys() {
		ndcfg, found := config.GetNdConfig(key)
		if !found {
			// No nodata configuration for this key.
			continue
		}
		step, mock := ndcfg.Step, ndcfg.Mock

		item, found := collector.GetFirstItem(key)
		if !found {
			// Nothing collected yet.
			continue
		}

		// Oldest timestamp that is still inside the timeout window.
		lastTs := now - getTimeout(step)
		if item.FStatus != "OK" || item.FTs < lastTs {
			// The collection failed or is stale; do not judge.
			continue
		}

		// Either the last point is the mock value itself, or it is too old.
		timedOut := fCompare(mock, item.Value) == 0 || item.Ts < lastTs
		if !timedOut {
			TurnOk(key, now)
			continue
		}

		// Only act once a full step has passed since the last status update.
		if LastTs(key)+step <= now {
			TurnNodata(key, now)
			genMock(genTs(now, step), key, ndcfg)
		}
	}
}
### 返回该key最近一次状态(NodataStatus)更新的timestamp
// LastTs returns the Ts field of the NodataStatus stored under key in
// StatusMap, or 0 when there is no status for the key. The lookup runs under
// the status read lock.
func LastTs(key string) int64 {
	statusLock.RLock()
	defer statusLock.RUnlock()

	if v, found := StatusMap.Get(key); found {
		return v.(*NodataStatus).Ts
	}
	return 0
}
// DataItem is one collected data point kept by the nodata collector.
type DataItem struct {
Ts int64 // timestamp of the data point
Value float64 // value of the data point
FStatus string // fetch status: OK|ERR
FTs int64 // time at which the item was stored
}
// NodataStatus is the judge state kept per key in StatusMap.
type NodataStatus struct {
// Key the status belongs to.
Key string
Status string // OK|NODATA
// Cnt is incremented by TurnNodata each time it updates an existing entry.
Cnt int
// Ts is the timestamp passed to the most recent status update.
Ts int64
}
### 设置为Nodata状态
// TurnNodata marks key as NODATA at time ts. An existing status is updated in
// place (status set, counter incremented, timestamp replaced); otherwise a
// new status with a count of 1 is stored. The update runs under statusLock.
func TurnNodata(key string, ts int64) {
	statusLock.Lock()
	defer statusLock.Unlock()

	if v, found := StatusMap.Get(key); found {
		// Update the existing status.
		st := v.(*NodataStatus)
		st.Status = "NODATA"
		st.Cnt++
		st.Ts = ts
		return
	}

	// First status for this key.
	StatusMap.Put(key, NewNodataStatus(key, "NODATA", 1, ts))
}
### 添加Item至Mock列表
// genMock queues a mock data item for key: it hands the endpoint, metric,
// sorted tags, type, step and mock value of ndcfg, stamped with ts, to
// sender.AddMock.
func genMock(ts int64, key string, ndcfg *cmodel.NodataConfig) {
	tags := cutils.SortedTags(ndcfg.Tags)
	sender.AddMock(key, ndcfg.Endpoint, ndcfg.Metric, tags, ts, ndcfg.Type, ndcfg.Step, ndcfg.Mock)
}
func AddMock(key string, endpoint string, metric string, tags string, ts int64, dstype string, step int64, value interface{}) {
item := &cmodel.JsonMetaData{metric, endpoint, ts, step, value, dstype, tags}
MockMap.Put(key, item) //put into map
}
sender.SendMockOnceAsync() 发送模拟数据
# 发送Mock数据入口函数
// SendMockOnceAsync runs SendMockOnce in a new goroutine and returns
// immediately; the result of SendMockOnce is discarded.
func SendMockOnceAsync() {
	go SendMockOnce()
}
## 并发发送和统计
// SendMockOnce sends the queued mock items once and updates the sender
// counters. It returns -1 when the semaphore cannot be acquired (another send
// is already in progress), 0 when the sender is disabled, and otherwise the
// number of items reported as sent by sendMock.
func SendMockOnce() int {
	if !sema.TryAcquire() {
		return -1
	}
	defer sema.Release()

	if enabled := g.Config().Sender.Enabled; !enabled {
		return 0
	}

	begin := time.Now().Unix()
	// The error result of sendMock is discarded here.
	sent, _ := sendMock()
	finish := time.Now().Unix()

	if g.Config().Debug {
		log.Printf("sender cron, cnt %d, time %ds, start %s", sent, finish-begin, ttime.FormatTs(begin))
	}

	// Statistics: run count, duration of the last run, running total.
	g.SenderCronCnt.Incr()
	g.SenderLastTs.SetCnt(finish - begin)
	g.SenderCnt.IncrBy(int64(sent))
	return sent
}
### 发送模拟数据
// sendMock drains MockMap and pushes the queued mock items to transfer in
// batches of the configured sender batch size.
//
// A batch size below 1 would make the loop never advance (0) or slice out of
// range (negative), so such a setting falls back to a fixed default. cnt is
// the number of items in the batches that were sent successfully. errt is the
// first error reported by sendItemsToTransfer, or nil when every batch
// succeeded; a failed batch does not stop the remaining batches.
func sendMock() (cnt int, errt error) {
	// Fallback batch size for a missing or invalid configuration value.
	const defaultBatch = 500

	cfg := g.Config().Sender
	connTimeout := time.Millisecond * time.Duration(cfg.ConnectTimeout)
	requTimeout := time.Millisecond * time.Duration(cfg.RequestTimeout)

	batch := int(cfg.Batch)
	if batch < 1 {
		batch = defaultBatch
	}

	// Take a snapshot of the queued mocks and empty the queue.
	mocks := MockMap.Slice()
	MockMap.Clear()

	for start := 0; start < len(mocks); start += batch {
		end := start + batch
		if end > len(mocks) {
			end = len(mocks)
		}

		// Convert the batch into typed items, dropping nil entries.
		items := make([]*cmodel.JsonMetaData, 0, end-start)
		for _, val := range mocks[start:end] {
			if val == nil {
				continue
			}
			items = append(items, val.(*cmodel.JsonMetaData))
		}

		cntonce, err := sendItemsToTransfer(items, len(items), "nodata.mock", connTimeout, requTimeout)
		if err != nil {
			// Remember the first failure but keep sending the other batches.
			if errt == nil {
				errt = err
			}
			continue
		}
		if g.Config().Debug {
			log.Println("send items:", items)
		}
		cnt += cntonce
	}
	return cnt, errt
}
// sendItemsToTransfer POSTs items as a JSON array to the /api/push endpoint
// of the configured transfer address, using the HTTP client registered under
// httpcliname with the given connect and request timeouts.
//
// It returns (0, nil) when size is below 1 and (size, nil) when transfer
// answers with a 2xx status. Failures to encode the items, build the request
// or perform it, as well as non-2xx responses, are logged and returned as an
// error with a count of 0.
func sendItemsToTransfer(items []*cmodel.JsonMetaData, size int, httpcliname string,
	connT, reqT time.Duration) (cnt int, errt error) {
	if size < 1 {
		return 0, nil
	}

	cfg := g.Config()
	transURL := fmt.Sprintf("http://%s/api/push", cfg.Sender.TransferAddr)
	hcli := thttpclient.GetHttpClient(httpcliname, connT, reqT)

	// Request body.
	itemsBody, err := json.Marshal(items)
	if err != nil {
		log.Println(transURL+", format body error,", err)
		return 0, err
	}

	// Build the request; a malformed transfer address fails here, and req
	// would be nil, so the error must be checked before touching req.Header.
	req, err := http.NewRequest("POST", transURL, bytes.NewBuffer(itemsBody))
	if err != nil {
		log.Println(transURL+", build request error,", err)
		return 0, err
	}
	req.Header.Set("Content-Type", "application/json; charset=UTF-8")
	req.Header.Set("Connection", "close")

	postResp, err := hcli.Do(req)
	if err != nil {
		log.Println(transURL+", post to dest error,", err)
		return 0, err
	}
	defer postResp.Body.Close()

	// Anything other than 2xx is a failure.
	if postResp.StatusCode/100 != 2 {
		// Read the body so the diagnostics show what transfer answered
		// instead of the reader value. Best effort: a read error only
		// shortens the diagnostic text.
		var respBody bytes.Buffer
		_, _ = respBody.ReadFrom(postResp.Body)
		log.Println(transURL+", post to dest, bad response,", postResp.Status, respBody.String())
		return 0, fmt.Errorf("request failed, status %s, body %q", postResp.Status, respBody.String())
	}
	return size, nil
}
http.Start() http API服务
// Start launches the HTTP API server in its own goroutine and returns
// immediately.
func Start() {
	go startHttpServer()
}
// configRoutes registers every HTTP route of the module: the common routes,
// the proc (statistics) routes and the debug routes, in that order.
func configRoutes() {
	configCommonRoutes()    // common API routes
	configProcHttpRoutes()  // proc statistics API routes
	configDebugHttpRoutes() // debug API routes
}
// startHttpServer registers the routes and serves HTTP on the configured
// listen address. It returns without doing anything when the HTTP module is
// disabled or no listen address is configured. ListenAndServe blocks; when it
// returns, its error is fatal and the process exits through log.Fatalln.
func startHttpServer() {
	if !g.Config().Http.Enabled {
		return
	}

	listen := g.Config().Http.Listen
	if listen == "" {
		return
	}

	configRoutes()

	srv := &http.Server{
		Addr:           listen,
		MaxHeaderBytes: 1 << 30,
	}
	log.Println("http.startHttpServer ok, listening", listen)
	log.Fatalln(srv.ListenAndServe())
}
## Proc统计API模块
// configProcHttpRoutes registers the proc (statistics) routes on the default
// HTTP mux. In this excerpt every handler body except the host-group one is
// empty.
func configProcHttpRoutes() {
	// counters
	http.HandleFunc("/proc/counters", func(rw http.ResponseWriter, req *http.Request) {
	})
	http.HandleFunc("/statistics/all", func(rw http.ResponseWriter, req *http.Request) {
	})

	// judge.status, /proc/status/$endpoint/$metric/$tags-pairs
	http.HandleFunc("/proc/status/", func(rw http.ResponseWriter, req *http.Request) {
	})

	// collector.last.item, /proc/collect/$endpoint/$metric/$tags-pairs
	http.HandleFunc("/proc/collect/", func(rw http.ResponseWriter, req *http.Request) {
	})

	// config.mockcfg
	http.HandleFunc("/proc/config", func(rw http.ResponseWriter, req *http.Request) {
	})

	// config.mockcfg /proc/config/$endpoint/$metric/$tags-pairs
	http.HandleFunc("/proc/config/", func(rw http.ResponseWriter, req *http.Request) {
	})

	// config.hostgroup, /group/$grpname
	// Everything after the route prefix is taken as the group name.
	const groupPrefix = "/proc/group/"
	http.HandleFunc(groupPrefix, func(rw http.ResponseWriter, req *http.Request) {
		groupName := req.URL.Path[len(groupPrefix):]
		RenderDataJson(rw, service.GetHostsFromGroup(groupName))
	})
}
## Debug API
// configDebugHttpRoutes registers the debug routes on the default HTTP mux:
// one each for the collector, the config sync and the sender. In this excerpt
// the handler bodies are empty.
func configDebugHttpRoutes() {
	// collector
	http.HandleFunc("/debug/collector/collect", func(rw http.ResponseWriter, req *http.Request) {
	})

	// config sync
	http.HandleFunc("/debug/config/sync", func(rw http.ResponseWriter, req *http.Request) {
	})

	// sender
	http.HandleFunc("/debug/sender/send", func(rw http.ResponseWriter, req *http.Request) {
	})
}
思考与查证
- nodata config、collector 、judge各模块执行周期是多少?
- Judge主要判断Nodata主机的两点要素是?
扩展学习
- github.com/toolkits/cron 定时任务golang Cron库
- github.com/go-sql-driver/mysql Mysql连接客户端
有疑问加站长微信联系(非本文作者)