golang nsq源码分析&添加中文注释系列(二):Nsqd入口主流程

晓亮1988 · · 867 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

以往看网上的源码分析时,基本都是前面一段讲解,后面跟一大段代码,没有上下文分析,我就暗暗的想,如果一个函数或一段逻辑能有中文注释(俺小本毕业英语不太好)带有上下文分析,这样读源码岂不是会更快顺畅。。。不废话了,我们开始吧。

上一篇大概讲解了基本介绍,我们也把Nsqd一步一步跑起来了(假设您已动手尝试过),本篇则从源码入口开始讲解

前言

  • 针对特殊的包或者方法,会单独开一篇博客讲解,请注意代码里面的链接地址,建议手动尝试一下
  • 文章宗旨是学习大神的每一行代码,所以看起来会比较啰嗦,建议您一边看代码一遍读文章(效果更佳)

NSQ整体流程

NSQ由3个守护进程组成:

  • nsqd 是接收、保存和传送消息到客户端的守护进程
  • nsqlookupd 是管理的拓扑信息,维护着所有nsqd的状态,并提供了最终一致发现服务的守护进程
  • nsqadmin 是一个web ui的实时监控集群和执行各种管理任务
    image

nsqd入口文件:nsq/apps/nsqd/main.go

废话不多说,都在酒里了(代码里),直接看注释就能理解

package main

import (
    "flag"
    "fmt"
    "math/rand"
    "os"
    "path/filepath"
    "sync"
    "syscall"
    "time"

    // toml开源包
    "github.com/BurntSushi/toml"
    // go-options开源包
    "github.com/mreiferson/go-options"
    // 内部版本号
    "github.com/nsqio/nsq/internal/version"
    // 命令行控制包svc 服务控制
    "github.com/judwhite/go-svc/svc"

    // 内部包 日志中间件 log
    "github.com/nsqio/nsq/internal/lg"

    // nsqd真正工作的区域
    "github.com/nsqio/nsq/nsqd"
)

/*
定义业务program结构体
*/
type program struct {

    // once能确保实例化对象Do方法在多线程环境只运行一次,内部通过互斥锁实现
    once sync.Once
    nsqd *nsqd.NSQD
}

/*
采用SVC包进行服务控制,主要是统一管理服务,对于信号控制不用每次都写在业务上,在ctrl+c时,能正常监听defer结束,方便获取很多日志,参数等
*/
func main() {
    // 实例化
    prg := &program{}
    /*
    http://shuchimao.com/2019/12/20/golang服务控制实践-svc包转/
    
    // Implement this interface and pass it to the Run function to start your program.
type Service interface {
    // Init is called before the program/service is started and after it's
    // determined if the program is running as a Windows Service.
    Init(Environment) error

    // Start is called after Init. This method must be non-blocking.
    Start() error

    // Stop is called in response to os.Interrupt, os.Kill, or when a
    // Windows Service is stopped.
    Stop() error
}
    svc 第一个参数需要实现Service接口才可以正常运行,这也就是大伙看到的program 实现的init/start/stop三个函数

    使用svc启动相关程序
    */
    if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
        logFatal("%s", err)
    }
}

func (p *program) Init(env svc.Environment) error {
    // 检查是否是windows 服务。。。目测一般时候也用不到
    if env.IsWindowsService() {
        dir := filepath.Dir(os.Args[0])
        return os.Chdir(dir)
    }
    return nil
}

func (p *program) Start() error {
    /*
    实例化并初始一些配置和默认值
    /nsq/nsqd/options.go
    */
    opts := nsqd.NewOptions()

    /*
    封装了命令行的一些检查项,设置检查项的默认值
    使用apps目录:/nsq/apps/nsqd/options.go
    然后parse解析
    */
    flagSet := nsqdFlagSet(opts)
    flagSet.Parse(os.Args[1:])

    // 生成随机数 time.Now().UnixNano()  单位纳秒
    rand.Seed(time.Now().UTC().UnixNano())

    // 打印版本号,接收命令行参数version  默认值:false
    /*
    执行效果
    bj-m-server:nsqd yixia$ go run ./ --version=true
    nsqd v1.2.1-alpha (built w/go1.12.2)
    */
    if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
        // 打印版本号  %!V(string=nsqd v1.2.1-alpha (built w/go1.12.2))
        fmt.Println(version.String("nsqd"))
        os.Exit(0)
    }

    // 获取外部的配置文件,解析toml文件格式
    var cfg config
    /*
    bj-m-server:nsqd yixia$ go run ./ --config=config.toml
    */
    configFile := flagSet.Lookup("config").Value.String()
    // 如果不为空
    if configFile != "" {
        // 加载,读出的数据采用空_  抛弃,赋值给cfg
        _, err := toml.DecodeFile(configFile, &cfg)
        // 抛错
        if err != nil {
            logFatal("failed to load config file %s - %s", configFile, err)
        }
    }
    // 检查配置文件
    cfg.Validate()

    // 采用优先级从高到低依次进行解析,最终
    options.Resolve(opts, flagSet, cfg)
    /*
    传入用户自定义配置,实例化nsqd

    nsqd.new以后做了那些事情,大概捋一下,后续看的时候能加深印象
    1、检查命令行cli   opts.DataPath 、  opts.Logger没设置  设置默认值
    2、实例NSQD主对象
    3、监听tcp   net.Listen("tcp", opts.TCPAddress)
    4、监听http  net.Listen("tcp", opts.HTTPAddress)
    5、监听https tls.Listen("tcp", opts.HTTPSAddress, n.tlsConfig)

    综合以上了解,基本做的事情就是实例化主对象,并对cli 自定义的命令一顿操作。。。然后就这样了,return (*NSQD, error)
    */
    nsqd, err := nsqd.New(opts)
    if err != nil {
        logFatal("failed to instantiate nsqd - %s", err)
    }
    p.nsqd = nsqd

    /*
    加载历史数据,数据来源nsqd.dat -> 历史数据格式{"topics":[],"version":"1.2.1-alpha"}
    1、获取历史数据
    2、解析成对应的结构体 json.Unmarshal(data, &m)
    3、遍历 for _, t := range m.Topics , 解析每个topic -> channel
    4、启动N个topic.Start()(重点代码中有一个GetTopic,采用线程线程安全方式,重点学习)
    5、func (n *NSQD) LoadMetadata() error
    */
    err = p.nsqd.LoadMetadata()
    if err != nil {
        logFatal("failed to load metadata - %s", err)
    }

    /*
    持久化最新数据
    1、获取原始数据文件名 fileName := newMetadataFile(n.getOpts())
    2、遍历 nsqd.topicMap -> ndqd.channelMap  ,这是对topicMap和channelMap加了互斥锁
    3、将最新数据写入到临时文件中,明文明文件名为:nsqd.dat.333569681738193261.tmp
    4、func (n *NSQD) PersistMetadata() error
    */
    err = p.nsqd.PersistMetadata()
    if err != nil {
        logFatal("failed to persist metadata - %s", err)
    }

    /*
    开启协程进入nsqd.Main主函数
    Main方法里重点使用了封装的WaitGroup
    下列出现的n.waitGroup.Wrap均采用了封装groutine
    可以看我另一篇文章讲解了封装的流程和使用方法:http://shuchimao.com/2019/12/24/golang积累-waitgroup包装/

    方法:
    func (n *NSQD) Main() error {

    大体执行思路
    1、实例化context
    2、建立退出通道,保证退出函数只运行一次,创建了匿名函数exitFunc
    3、初始化并监听TCPServer
    3.1、exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
    3.2、TCPServer采用无限循环方式监听tcp client长连接,当有一个client连接,分配一个groutine进行处理
    for -> listener.Accept() -> groutine
    4、初始化HTTPServer
    4.1、使用httprouter进行路由设置,然后初始化各种接口

    5、初始化HttpsServer
    6、监控循环队列:n.waitGroup.Wrap(n.queueScanLoop)
    7、节点信息管理:n.waitGroup.Wrap(n.lookupLoop)
    8、统计信息:n.waitGroup.Wrap(n.statsdLoop)
    */
    go func() {
        err := p.nsqd.Main()
        if err != nil {
            p.Stop()
            os.Exit(1)
        }
    }()

    return nil
}

func (p *program) Stop() error {
    /*
        /*
    底层源码
    func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 1 {
        return
    }
    // Slow-path.
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

    可以看成这样的链式操作
    p.once.Do == program.once.Do

    确保在执行时只执行一次退出操作
    */

    p.once.Do(func() {
        p.nsqd.Exit()
    })
    return nil
}

func logFatal(f string, args ...interface{}) {
    lg.LogFatal("[nsqd] ", f, args...)
}

Nsqd流程图

偷个懒,从网上摘录的流程图直接拿下来了大家先看看

小结

nsqd代码逻辑清晰,利用Go协程高效并发处理分布式多节点nsqd的生产和消费,学习并发处理nsqd是最佳项目,每行代码都值得学习,坚持读每一行代码相信大伙一定会受益匪浅的,等我们把这项目2000多行代码都读差不多了,在回头看看成长,绝对比看几本书学的快,学以致用,多动手,多练习。

下一章具体分析nsqd主程序。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:晓亮1988

查看原文:golang nsq源码分析&添加中文注释系列(二):Nsqd入口主流程

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

867 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传