micro.newService()中newOptions
func newOptions(opts ...Option) Options {
opt := Options{
Auth: auth.DefaultAuth,
Broker: broker.DefaultBroker,
Cmd: cmd.DefaultCmd,
Config: config.DefaultConfig,
Client: client.DefaultClient,
Server: server.DefaultServer,
Store: store.DefaultStore,
Registry: registry.DefaultRegistry,
Router: router.DefaultRouter,
Runtime: runtime.DefaultRuntime,
Transport: transport.DefaultTransport,
Context: context.Background(),
Signal: true,
}
for _, o := range opts {
o(&opt)
}
return opt
}
初始化了一堆基础设置,来看看configconfig.DefaultConfig,
在config/config.go中
var (
// Default Config Manager
DefaultConfig, _ = NewConfig()
)
// NewConfig returns new config
func NewConfig(opts ...Option) (Config, error) {
return newConfig(opts...)
}
func newConfig(opts ...Option) (Config, error) {
var c config
c.Init(opts...)
go c.run()
return &c, nil
}
func (c *config) Init(opts ...Option) error {
c.opts = Options{
Reader: json.NewReader(),
}
c.exit = make(chan bool)
for _, o := range opts {
o(&c.opts)
}
// default loader uses the configured reader
if c.opts.Loader == nil {
c.opts.Loader = memory.NewLoader(memory.WithReader(c.opts.Reader))
}
err := c.opts.Loader.Load(c.opts.Source...)
if err != nil {
return err
}
c.snap, err = c.opts.Loader.Snapshot()
if err != nil {
return err
}
c.vals, err = c.opts.Reader.Values(c.snap.ChangeSet)
if err != nil {
return err
}
return nil
}
func (c *config) run() {
watch := func(w loader.Watcher) error {
for {
// get changeset
snap, err := w.Next()
if err != nil {
return err
}
c.Lock()
if c.snap.Version >= snap.Version {
c.Unlock()
continue
}
// save
c.snap = snap
// set values
c.vals, _ = c.opts.Reader.Values(snap.ChangeSet)
c.Unlock()
}
}
for {
w, err := c.opts.Loader.Watch()
if err != nil {
time.Sleep(time.Second)
continue
}
done := make(chan bool)
// the stop watch func
go func() {
select {
case <-done:
case <-c.exit:
}
w.Stop()
}()
// block watch
if err := watch(w); err != nil {
// do something better
time.Sleep(time.Second)
}
// close done chan
close(done)
// if the config is closed exit
select {
case <-c.exit:
return
default:
}
}
}
看看Init()
左做了什么
- 初始化并设置opts,创建exit用于监听退出信号,设置opts
设置默认loader,c.opts.Loader默认是memory
memory.NewLoader()
[config/loader/memory/memory.go]- 初始化并设置opts,包含Reader[默认json]
- 初始化memory{}
设置m.sets,并
watch()
每个options.Source,看看watch()
做了什么定义watch()函数
调用watcher.Next(),下面看看next()做了什么
- 定义update()函数,返回loader.Snapshot{}
- 监听watcher.exit,watcher.updates信号,有更新时且版本更新时,调用上面的update()函数,更新watcher.value并返回loader.Snapshot{}
- 保存m.sets[idx],值为loader.Snapshot{}
- 合并所有m.sets
- 读取所有值到m.vals,保存快照到m.snap
调用update()
- 获取所有watcher,如果版本有更新,则发送watcher.updates信号
调用Watch()函数返回watcher,注意W是大写,调用的是memory.Watch()
- 调用Get(),返回m.vals.Get(path...)
- 初始化watcher,并添加到m.watchers【双向链表】
- 开协程,监听watcher.exit信号,收到信号从watchers中移除当前watcher
- 开协程,监听完成信号done和exit信号,收到信号后执行Stop(),关闭exit,updates这2个channel
- 调用上面定义的watch()
- 关闭done channel,监听m.exit信号
调用c.opts.Loader.Load()
- 循环所有source,更新m.sources,m.sets,并watch()所有source
调用
reload()
- 合并所有sets
- 设置m.vals,m.snap
- 调用m.update()
调用
c.opts.Loader.Snapshot()
- 如已经load,直接复制一份并返回m.snap
- 没载入就调用
Sync()
同步配置 - 复制一份m.snap返回
- 调用
c.opts.Reader.Values()
,赋值config.vals【reader.Values
类型】
绕来绕去,终于完了。
主流程其实并不复杂,主要是涉及到watch更新,所以比较绕。
config这块其实是比较独立的包,可以在其他项目中引用。
go micro 分析系列文章
go micro server 启动分析
go micro client
go micro broker
go micro cmd
go micro config
go micro store
go micro registry
go micro router
go micro runtime
go micro transport
go micro web
go micro registry 插件consul
go micro plugin
go micro jwt 网关鉴权
go micro 链路追踪
go micro 熔断与限流
go micro wrapper 中间件
go micro metrics 接入Prometheus、Grafana
有疑问加站长微信联系(非本文作者)