context&日志项目
context
一般场景下取消goroutine的方法
var wg sync.WaitGroup
var exit bool
func worker(exitChan chan struct{}) {
LOOP:
for {
fmt.Printf("work\n")
time.Sleep(time.Second)
/*if exit {
break
}
*/
select {
case <- exitChan:
break LOOP
default:
}
}
wg.Done()
}
func main() {
var exitChan chan struct{} = make(chan struct{},1)
wg.Add(1)
go worker(exitChan)
time.Sleep(time.Second*3)
exitChan <- struct{}{}
//exit = true
wg.Wait()
}
- 场景:
- 返回一个cancel函数,调用cancel函数的时候,会触发context.Done()函数 [优雅退出线程]
var wg sync.WaitGroup
func worker(ctx context.Context) {
LOOP:
for {
fmt.Printf("work\n")
time.Sleep(time.Second)
select {
case <- ctx.Done():
break LOOP
default:
}
}
wg.Done()
}
func main() {
cxt := context.Background()
cxt,cancel := context.WithCancel(cxt)
wg.Add(1)
go worker(cxt)
time.Sleep(time.Second*3)
cancel() //取消goroutine
wg.Wait()
}
- trace 实例
var wg sync.WaitGroup
func worker(cxt context.Context) {
traceCode,ok := cxt.Value("TRACE_CODE").(string)
if ok {
fmt.Printf("traceCode=%s\n",traceCode)
}
LOOP:
for {
fmt.Printf("worker\n")
time.Sleep(time.Millisecond)
select {
case <- cxt.Done():
break LOOP
default:
}
}
fmt.Printf("worker Done,trace_Code:%s\n",traceCode)
wg.Done()
}
func main() {
ctx := context.Background()
ctx,cancel := context.WithTimeout(ctx,time.Millisecond*50)
ctx = context.WithValue(ctx,"TRACE_CODE","212334121")
wg.Add(1)
go worker(ctx)
time.Sleep(time.Second*3)
cancel() //释放contex资源
wg.Wait()
}
日志项目(一)
日志收集系统设计
-
项目背景
- 每个系统都有日志,当系统出现问题的时候,需要通过日志解决问题
- 当系统及其比较少的时候,登陆到服务器上即可查看满足
- 当系统及其规模巨大,逐个登陆机器查看日志几乎不现实
-
解决方案
- 把机器上的日志实时收集,统一存储到中心系统
- 然后对这些日志建立索引,通过搜索找到对应的日志
- 通过提供友好的web界面,通过web界面完成日志搜索
-
面临的问题
- 实时的日质量非常大,每天几十亿条
- 日志准实时收集,延迟控制在分钟级别
- 能够水平拓展
- 业界方案ELK
- 存在的问题
- 运维成本较高,每增加一个日志收集,都需要手动去配置
- 监控缺失,无法准确获取logstash的状态
- 无法做定制化的开发以及维护
- 存在的问题
日志收集系统设计
- 组件介绍
- Log Agent: 日志采集客户端,用来收集服务器上的日志
- kafka: 高吞吐的分布式消息队列,linkin开发,apache顶级开源项目
- ES: 开源的搜索引擎,基于http restful风格的web接口
- Hadoop:分布式计算框架
kafka的应用场景
高可用kafka集群部署:https://blog.51cto.com/navyaijm/2429959?source=drh
- 异步处理,把非关键流程异步化,提高系统响应时间和健壮性
- 应用解耦
- 流量削峰:高并发场景
zookeeper应用场景
- 服务注册与发现
- 配置中心
- 分布式锁
- zookeeper是强一致的
- 多个客户端同事在zookeeper上创建相同的znode,只有一个创建成功
日志客户端开发
-
logAgent 设计
- logagent的流程
kafka的使用
- 导入的库: go get github.com/Shopify/sarama
- 使用代码
import (
"fmt"
"github.com/Shopify/sarama"
"time"
)
func main() {
//初始化配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
//消息配置
msg := &sarama.ProducerMessage{}
msg.Topic = "nginx_log"
msg.Value = sarama.StringEncoder("this is a test,messsage transfer ok")
//连接配置
client,err := sarama.NewSyncProducer([]string{"192.168.56.11:9092"},config)
if err != nil {
fmt.Printf("send message faild,error:%v\n",err)
return
}
defer client.Close()
for {
pid,offset,err := client.SendMessage(msg)
if err != nil {
fmt.Printf("send message faild,err:%v\n",err)
return
}
fmt.Printf("pid:%v,offset:%v\n",pid,offset)
time.Sleep(time.Second)
}
}
tailf 组件的使用
- 导入的库: go get github.com/hpcloud/tail
- 使用代码
import (
"fmt"
"github.com/hpcloud/tail"
"time"
)
func main() {
filename := "./my.log"
tails,err := tail.TailFile(filename,tail.Config{
Location: &tail.SeekInfo{
Offset: 0,
Whence: 2,
},
ReOpen: true,
MustExist: false,
Poll: true,
Follow: true,
})
if err != nil {
fmt.Printf("tail file error:%v\n",err)
return
}
var msg *tail.Line
var ok bool
for true {
msg,ok = <- tails.Lines
if !ok {
fmt.Printf("tail file close reopen,filename:%s\n",filename)
time.Sleep(100*time.Millisecond)
continue
}
fmt.Printf("msg=%v\n",msg)
}
}
- kafka消费日志命令参考
[root@centos7-node1 ~]# cd /opt/application/kafka/bin/
[root@centos7-node1 bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic nginx_log --from-beginning
有疑问加站长微信联系(非本文作者)