Go语言(十 八)context&日志项目

wx5b285b48ed74e · · 759 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

context&日志项目

context

一般场景下取消goroutine的方法

var wg sync.WaitGroup
var exit bool

func worker(exitChan chan struct{})  {
   LOOP:
   for {
      fmt.Printf("work\n")
      time.Sleep(time.Second)
      /*if exit {
         break
      }
       */
      select {
      case <- exitChan:
         break LOOP
      default:

      }
   }
   wg.Done()
}

func main()  {
   var exitChan chan struct{} = make(chan struct{},1)
   wg.Add(1)
   go worker(exitChan)
   time.Sleep(time.Second*3)
   exitChan <- struct{}{}
   //exit = true
   wg.Wait()
}
  • 场景:
    • 返回一个cancel函数,调用cancel函数的时候,会触发context.Done()函数 [优雅退出线程]
var wg sync.WaitGroup
func worker(ctx context.Context) {
   LOOP:
   for {
      fmt.Printf("work\n")
      time.Sleep(time.Second)
      select {
      case <- ctx.Done():
         break LOOP
      default:

      }
   }
   wg.Done()
}
func main() {
   cxt := context.Background()
   cxt,cancel := context.WithCancel(cxt)
   wg.Add(1)
   go worker(cxt)
   time.Sleep(time.Second*3)
   cancel()    //取消goroutine
   wg.Wait()
}
  • trace 实例
var wg sync.WaitGroup
func worker(cxt context.Context)  {
   traceCode,ok := cxt.Value("TRACE_CODE").(string)
   if ok {
      fmt.Printf("traceCode=%s\n",traceCode)
   }
    LOOP:
   for {
      fmt.Printf("worker\n")
      time.Sleep(time.Millisecond)
      select {
      case <- cxt.Done():
         break LOOP
      default:
      }
   }
   fmt.Printf("worker Done,trace_Code:%s\n",traceCode)
   wg.Done()
}
func main()  {
   ctx := context.Background()
   ctx,cancel := context.WithTimeout(ctx,time.Millisecond*50)
   ctx = context.WithValue(ctx,"TRACE_CODE","212334121")
   wg.Add(1)
   go worker(ctx)
   time.Sleep(time.Second*3)
   cancel()     //释放contex资源
   wg.Wait()
}

日志项目(一)

日志收集系统设计

  • 项目背景

    • 每个系统都有日志,当系统出现问题的时候,需要通过日志解决问题
    • 当系统及其比较少的时候,登陆到服务器上即可查看满足
    • 当系统及其规模巨大,逐个登陆机器查看日志几乎不现实
  • 解决方案

    • 把机器上的日志实时收集,统一存储到中心系统
    • 然后对这些日志建立索引,通过搜索找到对应的日志
    • 通过提供友好的web界面,通过web界面完成日志搜索
  • 面临的问题

    • 实时的日质量非常大,每天几十亿条
    • 日志准实时收集,延迟控制在分钟级别
    • 能够水平拓展
  • 业界方案ELK
    • 存在的问题
      • 运维成本较高,每增加一个日志收集,都需要手动去配置
      • 监控缺失,无法准确获取logstash的状态
      • 无法做定制化的开发以及维护
        Go语言(十 八)context&日志项目

日志收集系统设计

Go语言(十 八)context&日志项目

  • 组件介绍
    • Log Agent: 日志采集客户端,用来收集服务器上的日志
    • kafka: 高吞吐的分布式消息队列,linkin开发,apache顶级开源项目
    • ES: 开源的搜索引擎,基于http restful风格的web接口
    • Hadoop:分布式计算框架

kafka的应用场景

高可用kafka集群部署:https://blog.51cto.com/navyaijm/2429959?source=drh

  • 异步处理,把非关键流程异步化,提高系统响应时间和健壮性
    Go语言(十 八)context&日志项目
  • 应用解耦
    Go语言(十 八)context&日志项目
  • 流量削峰:高并发场景
    Go语言(十 八)context&日志项目

zookeeper应用场景

  • 服务注册与发现
    Go语言(十 八)context&日志项目
  • 配置中心
    Go语言(十 八)context&日志项目
  • 分布式锁
    • zookeeper是强一致的
    • 多个客户端同事在zookeeper上创建相同的znode,只有一个创建成功

日志客户端开发

  • logAgent 设计
    Go语言(十 八)context&日志项目

  • logagent的流程
    Go语言(十 八)context&日志项目

kafka的使用

  • 导入的库: go get github.com/Shopify/sarama
  • 使用代码
import (
   "fmt"
   "github.com/Shopify/sarama"
   "time"
)

func main() {
   //初始化配置
   config := sarama.NewConfig()
   config.Producer.RequiredAcks = sarama.WaitForAll
   config.Producer.Partitioner = sarama.NewRandomPartitioner
   config.Producer.Return.Successes = true
   //消息配置
   msg := &sarama.ProducerMessage{}
   msg.Topic = "nginx_log"
   msg.Value = sarama.StringEncoder("this is a test,messsage transfer ok")
   //连接配置
   client,err := sarama.NewSyncProducer([]string{"192.168.56.11:9092"},config)
   if err != nil {
      fmt.Printf("send message faild,error:%v\n",err)
      return
   }

   defer client.Close()
   for   {
      pid,offset,err := client.SendMessage(msg)
      if err != nil {
         fmt.Printf("send message faild,err:%v\n",err)
         return
      }
      fmt.Printf("pid:%v,offset:%v\n",pid,offset)
      time.Sleep(time.Second)
   }
}

tailf 组件的使用

  • 导入的库: go get github.com/hpcloud/tail
  • 使用代码
import (
   "fmt"
   "github.com/hpcloud/tail"
   "time"
)

func main() {
   filename := "./my.log"
   tails,err := tail.TailFile(filename,tail.Config{
      Location:    &tail.SeekInfo{
         Offset: 0,
         Whence: 2,
      },
      ReOpen:      true,
      MustExist:   false,
      Poll:        true,
      Follow:      true,
   })

   if err != nil {
      fmt.Printf("tail file error:%v\n",err)
      return
   }

   var msg *tail.Line
   var ok bool
   for true {
      msg,ok = <- tails.Lines
      if !ok {
         fmt.Printf("tail file close reopen,filename:%s\n",filename)
         time.Sleep(100*time.Millisecond)
         continue
      }
      fmt.Printf("msg=%v\n",msg)
   }
}
  • kafka消费日志命令参考
[root@centos7-node1 ~]# cd /opt/application/kafka/bin/
[root@centos7-node1 bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic nginx_log --from-beginning

有疑问加站长微信联系(非本文作者)

本文来自:51CTO博客

感谢作者:wx5b285b48ed74e

查看原文:Go语言(十 八)context&日志项目

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

759 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传