elasticsearch,golang客户端聚合查询

Alston · · 6432 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

目前在做的监控项目中有个对es的聚合查询的需求,需要用go语言实现,

需求就是查询某个IP在一个时间范围内,各个监控指标取时间单位内的平均值。
有点拗口,如下是es的查询语句,可以很明显的看到是要聚合cpu和mem两个field。
另外,时区必须要加上,否则少8小时,你懂的。

GET /monitor/v1/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "ip": {
              "value": "192.168.1.100"
            }
          }
        },
        {
          "range": {
            "datatime": {
              "gte": 1545634340000,
              "lte": "now"
            }
          }
        }

      ]
    }
  },
  "size": 0,
  "aggs": {
    "AVG_Metric": {
      "date_histogram": {
        "field": "datatime",
        "interval": "day",
        "time_zone": "Asia/Shanghai" ,
         "format": "yyyy-MM-dd HH:mm:ss"
      },
      "aggs": {
        "avg_mem": {
          "avg": {
            "field": "mem"
          }
        },"avg_cpu":{
          "avg": {
            "field": "cpu"
          }
        }
      }
    }
  }
}

单条数据的内容大概是这样:
datatime是毫秒级的时间戳,索引的mapping一定要是date类型,否则不能做聚合。

{
    "cpu":5,
    "mem":77088,
    "datatime":1545702661000,
    "ip":"192.168.1.100"
}

查询出来的结果呢,长这个样子:
可以看到,在buckets中查询出了3条数据,其中几个字段的意思:
key_as_string与key:format后的时间和毫秒时间戳
doc_count:聚合了多少的doc
avg_mem与avg_cpu:查询语句中定义的别名和value。

{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2477,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "AVG_Metric": {
      "buckets": [
        {
          "key_as_string": "2018-12-24 00:00:00",
          "key": 1545580800000,
          "doc_count": 402,
          "avg_mem": {
            "value": 71208.1592039801
          },
          "avg_cpu": {
            "value": 4.338308457711443
          }
        },
        {
          "key_as_string": "2018-12-25 00:00:00",
          "key": 1545667200000,
          "doc_count": 1258,
          "avg_mem": {
            "value": 77958.16852146263
          },
          "avg_cpu": {
            "value": 4.639904610492846
          }
        },
        {
          "key_as_string": "2018-12-26 00:00:00",
          "key": 1545753600000,
          "doc_count": 817,
          "avg_mem": {
            "value": 86570.06609547124
          },
          "avg_cpu": {
            "value": 4.975520195838433
          }
        }
      ]
    }
  }
}

下面使用go语言的es客户端“github.com/olivere/elastic”来实现相同的查询需求,我用的是v6版本:
关于聚合结果的输出,遇到了些问题,网上也搜索不到,官方也没找到相应的例子。不过总算试出两个可行的,比较推荐第一种,可以少定义两个结构体。上代码:

package main

import (
    "github.com/olivere/elastic"
    "context"
    "encoding/json"
    "fmt"
)

type Aggregations struct {
    AVG_Metric AVG_Metric `json:"AVG_Metric"`
}

type AVG_Metric struct {
    Buckets []Metric `json:"buckets"`
}

type Metric struct {
    Key       int64 `json:"key"`
    Doc_count int64 `json:"doc_count"`
    Avg_mem   Value `json:"avg_mem"`
    Avg_cpu   Value `json:"avg_cpu"`
}

type Value struct {
    Value float64 `json:"value"`
}

var client, _ = elastic.NewSimpleClient(elastic.SetURL("http://127.0.0.1:9200"))

func main() {

    //指定ip和时间范围
    boolSearch := elastic.NewBoolQuery().
        Filter(elastic.NewTermsQuery("ip", "192.168.1.100")).
        Filter(elastic.NewRangeQuery("datatime").Gte(1545634340000).Lte("now"))

    //需要聚合的指标
    mem := elastic.NewAvgAggregation().Field("mem")
    cpu := elastic.NewAvgAggregation().Field("cpu")

    //单位时间和指定字段
    aggs := elastic.NewDateHistogramAggregation().
        Interval("day").
        Field("datatime").
        TimeZone("Asia/Shanghai").
        SubAggregation("avg_mem", mem).
        SubAggregation("avg_cpu", cpu)

    //查询语句
    result, _ := client.Search().
        Index("monitor").
        Type("v1").
        Query(boolSearch).
        Size(0).
        Aggregation("AVG_Metric", aggs).
        Do(context.Background())


    //结果输出:

    //第一种方式
    var m *[]Metric
    term, _ := result.Aggregations.Terms("AVG_Metric")
    for _, bucket := range term.Aggregations {
        b, _ := bucket.MarshalJSON()
        //fmt.Println(string(b))
        json.Unmarshal(b, &m)
        for _, v := range *m {
            fmt.Println(v)
        }
    }

    //第二种方式
    var a *Aggregations
    b, _ := json.Marshal(result.Aggregations)
    //fmt.Println(string(b))
    json.Unmarshal(b, &a)
    for _, v := range a.AVG_Metric.Buckets {
        fmt.Println(v)
    }

}
{1545580800000 402 {71208.1592039801} {4.338308457711443}}
{1545667200000 1258 {77958.16852146263} {4.639904610492846}}
{1545753600000 965 {87164.17409326424} {4.972020725388601}}
{1545580800000 402 {71208.1592039801} {4.338308457711443}}
{1545667200000 1258 {77958.16852146263} {4.639904610492846}}
{1545753600000 965 {87164.17409326424} {4.972020725388601}}

有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:Alston

查看原文:elasticsearch,golang客户端聚合查询

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

6432 次点击  ∙  1 赞  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传