micro.newService()中newOptions
func newOptions(opts ...Option) Options {
opt := Options{
Auth: auth.DefaultAuth,
Broker: broker.DefaultBroker,
Cmd: cmd.DefaultCmd,
Config: config.DefaultConfig,
Client: client.DefaultClient,
Server: server.DefaultServer,
Store: store.DefaultStore,
Registry: registry.DefaultRegistry,
Router: router.DefaultRouter,
Runtime: runtime.DefaultRuntime,
Transport: transport.DefaultTransport,
Context: context.Background(),
Signal: true,
}
for _, o := range opts {
o(&opt)
}
return opt
}
初始化了一堆基础设置,来看看Registryregistry.DefaultRegistry,
在registry/registry.go中的DefaultRegistry = NewRegistry()
// NewRegistry returns a new default registry which is mdns
func NewRegistry(opts ...Option) Registry {
return newRegistry(opts...)
}
func newRegistry(opts ...Option) Registry {
options := Options{
Context: context.Background(),
Timeout: time.Millisecond * 100,
}
for _, o := range opts {
o(&options)
}
// set the domain
defaultDomain := DefaultDomain
d, ok := options.Context.Value("mdns.domain").(string)
if ok {
defaultDomain = d
}
return &mdnsRegistry{
defaultDomain: defaultDomain,
globalDomain: globalDomain,
opts: options,
domains: make(map[string]services),
watchers: make(map[string]*mdnsWatcher),
}
}
这里做了以下事情:
- 初始化并设置Options
- 设置defaultDomain,默认micro,如果options.Context中定义了mdns.domain,则使用这里定义的
- 返回mdnsRegistry{}实例
在micro server篇中介绍了service的启动过程service.Run()
中调用了s.Start()
,s.Start()
中调用了s.opts.Server.Start()
,这里的s.opts.Server就是micro/defaults.go中定义的server.DefaultServer = gsrv.NewServer()
那我们去看server/grpc/grpc.go中的Start()
func (g *grpcServer) Start() error {
g.RLock()
if g.started {
g.RUnlock()
return nil
}
g.RUnlock()
config := g.Options()
// micro: config.Transport.Listen(config.Address)
var ts net.Listener
if l := g.getListener(); l != nil {
ts = l
} else {
var err error
// check the tls config for secure connect
if tc := config.TLSConfig; tc != nil {
ts, err = tls.Listen("tcp", config.Address, tc)
// otherwise just plain tcp listener
} else {
ts, err = net.Listen("tcp", config.Address)
}
if err != nil {
return err
}
}
if g.opts.Context != nil {
if c, ok := g.opts.Context.Value(maxConnKey{}).(int); ok && c > 0 {
ts = netutil.LimitListener(ts, c)
}
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Server [grpc] Listening on %s", ts.Addr().String())
}
g.Lock()
g.opts.Address = ts.Addr().String()
g.Unlock()
// only connect if we're subscribed
if len(g.subscribers) > 0 {
// connect to the broker
if err := config.Broker.Connect(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err)
}
return err
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
}
}
// announce self to the world
if err := g.Register(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Server register error: %v", err)
}
}
// micro: go ts.Accept(s.accept)
go func() {
if err := g.srv.Serve(ts); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("gRPC Server start error: %v", err)
}
}
}()
go func() {
t := new(time.Ticker)
// only process if it exists
if g.opts.RegisterInterval > time.Duration(0) {
// new ticker
t = time.NewTicker(g.opts.RegisterInterval)
}
// return error chan
var ch chan error
Loop:
for {
select {
// register self on interval
case <-t.C:
if err := g.Register(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("Server register error: ", err)
}
}
// wait for exit
case ch = <-g.exit:
break Loop
}
}
// deregister self
if err := g.Deregister(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("Server deregister error: ", err)
}
}
// wait for waitgroup
if g.wg != nil {
g.wg.Wait()
}
// stop the grpc server
exit := make(chan bool)
go func() {
g.srv.GracefulStop()
close(exit)
}()
select {
case <-exit:
case <-time.After(time.Second):
g.srv.Stop()
}
// close transport
ch <- nil
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
}
// disconnect broker
if err := config.Broker.Disconnect(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err)
}
}
}()
// mark the server as started
g.Lock()
g.started = true
g.Unlock()
return nil
}
这个过程在micro server篇中有介绍,现在只看registry部分,注册后开一个协程定时注册
g.Register()
注册到服务发现
func (g *grpcServer) Register() error {
g.RLock()
rsvc := g.rsvc
config := g.opts
g.RUnlock()
regFunc := func(service *registry.Service) error {
var regErr error
for i := 0; i < 3; i++ {
// set the ttl and namespace
rOpts := []registry.RegisterOption{
registry.RegisterTTL(config.RegisterTTL),
registry.RegisterDomain(g.opts.Namespace),
}
// attempt to register
if err := config.Registry.Register(service, rOpts...); err != nil {
// set the error
regErr = err
// backoff then retry
time.Sleep(backoff.Do(i + 1))
continue
}
// success so nil error
regErr = nil
break
}
return regErr
}
// if service already filled, reuse it and return early
if rsvc != nil {
if err := regFunc(rsvc); err != nil {
return err
}
return nil
}
var err error
var advt, host, port string
var cacheService bool
// check the advertise address first
// if it exists then use it, otherwise
// use the address
if len(config.Advertise) > 0 {
advt = config.Advertise
} else {
advt = config.Address
}
if cnt := strings.Count(advt, ":"); cnt >= 1 {
// ipv6 address in format [host]:port or ipv4 host:port
host, port, err = net.SplitHostPort(advt)
if err != nil {
return err
}
} else {
host = advt
}
if ip := net.ParseIP(host); ip != nil {
cacheService = true
}
addr, err := addr.Extract(host)
if err != nil {
return err
}
// make copy of metadata
md := meta.Copy(config.Metadata)
// register service
node := ®istry.Node{
Id: config.Name + "-" + config.Id,
Address: mnet.HostPort(addr, port),
Metadata: md,
}
node.Metadata["broker"] = config.Broker.String()
node.Metadata["registry"] = config.Registry.String()
node.Metadata["server"] = g.String()
node.Metadata["transport"] = g.String()
node.Metadata["protocol"] = "grpc"
g.RLock()
// Maps are ordered randomly, sort the keys for consistency
var handlerList []string
for n, e := range g.handlers {
// Only advertise non internal handlers
if !e.Options().Internal {
handlerList = append(handlerList, n)
}
}
sort.Strings(handlerList)
var subscriberList []*subscriber
for e := range g.subscribers {
// Only advertise non internal subscribers
if !e.Options().Internal {
subscriberList = append(subscriberList, e)
}
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList))
for _, n := range handlerList {
endpoints = append(endpoints, g.handlers[n].Endpoints()...)
}
for _, e := range subscriberList {
endpoints = append(endpoints, e.Endpoints()...)
}
g.RUnlock()
service := ®istry.Service{
Name: config.Name,
Version: config.Version,
Nodes: []*registry.Node{node},
Endpoints: endpoints,
}
g.RLock()
registered := g.registered
g.RUnlock()
if !registered {
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
}
}
// register the service
if err := regFunc(service); err != nil {
return err
}
// already registered? don't need to register subscribers
if registered {
return nil
}
g.Lock()
defer g.Unlock()
for sb := range g.subscribers {
handler := g.createSubHandler(sb, g.opts)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.Queue(queue))
}
if cx := sb.Options().Context; cx != nil {
opts = append(opts, broker.SubscribeContext(cx))
}
if !sb.Options().AutoAck {
opts = append(opts, broker.DisableAutoAck())
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
if err != nil {
return err
}
g.subscribers[sb] = []broker.Subscriber{sub}
}
g.registered = true
if cacheService {
g.rsvc = service
}
return nil
}
这里做了以下事情:
rsvc := g.rsvc,定义regFunc()
- 定义[]registry.RegisterOption{}
- 调用config.Registry.Register()注册,失败会重试3次
- rsvc不为空就调用regFunc()注册了并返回了,空就往下走,继续注册
- 验证host,port,复制metadata,定义registry.Node{},在metadata中增加broker,registry,server,transport,protocol
- g.handlers放到handlerList中(非内部handle),排个序,保持一致性。g.subscribers也放到subscriberList,按topic排序。最后都放入endpoints
- 定义registry.Service{},调用regFunc()注册,如果没有错误,也没有订阅需要处理就返回
- 处理订阅
到registry/mdns_registry.go中看看Register()
func (m *mdnsRegistry) Register(service *Service, opts ...RegisterOption) error {
m.Lock()
// parse the options
var options RegisterOptions
for _, o := range opts {
o(&options)
}
if len(options.Domain) == 0 {
options.Domain = m.defaultDomain
}
// create the domain in the memory store if it doesn't yet exist
if _, ok := m.domains[options.Domain]; !ok {
m.domains[options.Domain] = make(services)
}
// create the wildcard entry used for list queries in this domain
entries, ok := m.domains[options.Domain][service.Name]
if !ok {
entry, err := createServiceMDNSEntry(service.Name, options.Domain)
if err != nil {
m.Unlock()
return err
}
entries = append(entries, entry)
}
var gerr error
for _, node := range service.Nodes {
var seen bool
for _, entry := range entries {
if node.Id == entry.id {
seen = true
break
}
}
// this node has already been registered, continue
if seen {
continue
}
txt, err := encode(&mdnsTxt{
Service: service.Name,
Version: service.Version,
Endpoints: service.Endpoints,
Metadata: node.Metadata,
})
if err != nil {
gerr = err
continue
}
host, pt, err := net.SplitHostPort(node.Address)
if err != nil {
gerr = err
continue
}
port, _ := strconv.Atoi(pt)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("[mdns] registry create new service with ip: %s for: %s", net.ParseIP(host).String(), host)
}
// we got here, new node
s, err := mdns.NewMDNSService(
node.Id,
service.Name,
options.Domain+".",
"",
port,
[]net.IP{net.ParseIP(host)},
txt,
)
if err != nil {
gerr = err
continue
}
srv, err := mdns.NewServer(&mdns.Config{Zone: s, LocalhostChecking: true})
if err != nil {
gerr = err
continue
}
entries = append(entries, &mdnsEntry{id: node.Id, node: srv})
}
// save the mdns entry
m.domains[options.Domain][service.Name] = entries
m.Unlock()
// register in the global Domain so it can be queried as one
if options.Domain != m.globalDomain {
srv := *service
srv.Nodes = nil
for _, n := range service.Nodes {
node := n
// set the original domain in node metadata
if node.Metadata == nil {
node.Metadata = map[string]string{"domain": options.Domain}
} else {
node.Metadata["domain"] = options.Domain
}
srv.Nodes = append(srv.Nodes, node)
}
if err := m.Register(service, append(opts, RegisterDomain(m.globalDomain))...); err != nil {
gerr = err
}
}
return gerr
}
这里做了以下事情:
- 设置optionsentries
- 创建m.domains[options.Domain],并赋值entries
- 循环每个service.Nodes,entries看有没有注册过,有就跳过
- 编码mdnsTxt{},调用mdns.NewMDNSService()得到一个新node,在调用mdns.NewServer()得到mdns.Server,包装到mdnsEntry{},放入entries,在存入m.domainsoptions.Domain
- 如果options.Domain != m.globalDomain,设置service.node中的Metadata["domain"]为options.Domain,注册到global Domain中
这里是默认的mdns实现,实际使用中可以指定consul,etcd等,具体的流程请见各自的Register()
go micro 分析系列文章
go micro server 启动分析
go micro client
go micro broker
go micro cmd
go micro config
go micro store
go micro registry
go micro router
go micro runtime
go micro transport
go micro web
go micro registry 插件consul
go micro plugin
go micro jwt 网关鉴权
go micro 链路追踪
go micro 熔断与限流
go micro wrapper 中间件
go micro metrics 接入Prometheus、Grafana
有疑问加站长微信联系(非本文作者)