基于go-micro 2.9.1版本,
样例代码example/greeter,git commit:3b3de68cded8879ca3dde5d81192f2881619aabd
一个微服务server的核心只有3步
service := micro.NewService()
service.Init()
service.Run()
先看micro.NewService()
service := micro.NewService(
micro.Name("greeter"),
micro.Version("latest"),
micro.Metadata(map[string]string{
"type": "helloworld",
}),
)
micro.NewService的参数都会返回一个Option
,
这些参数没有做任何事情,只是返回了一些设置用的函数,
这种写法是“函数选项模式”,可参考https://segmentfault.com/a/11...
先看NewService()
里面做了什么
// Name of the service
func Name(n string) Option {
return func(o *Options) {
o.Server.Init(server.Name(n))
}
}
//Option函数在micro.go中定义
//Options结构体在options.go中定义
type Option func(*Options)
// Options for micro service
type Options struct {
Auth auth.Auth
Broker broker.Broker
Cmd cmd.Cmd
Config config.Config
Client client.Client
Server server.Server
Store store.Store
Registry registry.Registry
Router router.Router
Runtime runtime.Runtime
Transport transport.Transport
Profile profile.Profile
// Before and After funcs
BeforeStart []func() error
BeforeStop []func() error
AfterStart []func() error
AfterStop []func() error
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
Signal bool
}
server.Name(n)实现这样,通过micro包提供的函数设置micro.options
// Server name
func Name(n string) Option {
return func(o *Options) {
o.Name = n
}
}
NewService中调用了service.go中的newService(opts...)
// NewService creates and returns a new Service based on the packages within.
func NewService(opts ...Option) Service {
return newService(opts...)
}
type service struct {
opts Options
once sync.Once
}
func newService(opts ...Option) Service {
service := new(service)
options := newOptions(opts...)
// service name
serviceName := options.Server.Options().Name
// we pass functions to the wrappers since the values can change during initialisation
authFn := func() auth.Auth { return options.Server.Options().Auth }
cacheFn := func() *client.Cache { return options.Client.Options().Cache }
// wrap client to inject From-Service header on any calls
options.Client = wrapper.FromService(serviceName, options.Client)
options.Client = wrapper.TraceCall(serviceName, trace.DefaultTracer, options.Client)
options.Client = wrapper.CacheClient(cacheFn, options.Client)
options.Client = wrapper.AuthClient(authFn, options.Client)
// wrap the server to provide handler stats
options.Server.Init(
server.WrapHandler(wrapper.HandlerStats(stats.DefaultStats)),
server.WrapHandler(wrapper.TraceHandler(trace.DefaultTracer)),
server.WrapHandler(wrapper.AuthHandler(authFn)),
)
// set opts
service.opts = options
return service
}
func newOptions(opts ...Option) Options {
opt := Options{
Auth: auth.DefaultAuth,
Broker: broker.DefaultBroker,
Cmd: cmd.DefaultCmd,
Config: config.DefaultConfig,
Client: client.DefaultClient,
Server: server.DefaultServer,
Store: store.DefaultStore,
Registry: registry.DefaultRegistry,
Router: router.DefaultRouter,
Runtime: runtime.DefaultRuntime,
Transport: transport.DefaultTransport,
Context: context.Background(),
Signal: true,
}
for _, o := range opts {
o(&opt)
}
return opt
}
实例化service结构体,初始化参数并赋值到opts
- 先初始化一些Options属性
- 再从opts中遍历执行micro.NewService的参数(实际就是函数),设置各种值
为options.Client增加几个wrapper,ctx中增加了4个键值对,在client发起请求的时候回放到header中
- wrapper.FromService()中,ctx增加
Micro-From-Service
- wrapper.TraceHandler(trace.DefaultTracer) -> t.Start(ctx, req.Service()+"."+req.Endpoint()) -> Tracer.start()(trace/memory/memory.go),ctx增加
Micro-Trace-Id, Micro-Span-Id
- server.WrapHandler(wrapper.AuthHandler(authFn)), ctx增加
Micro-Namespace
- wrapper.FromService()中,ctx增加
- 为options.Server增加几个wrapper
options.Server.Init(server.Name(n))
这里的options.Server是哪里来的呢,前面没见有初始化这个属性的地方,其实在go-micro/defaults.go的init()中
func init() {
// default client
client.DefaultClient = gcli.NewClient()
// default server
server.DefaultServer = gsrv.NewServer()
// default store
store.DefaultStore = memoryStore.NewStore()
// set default trace
trace.DefaultTracer = memTrace.NewTracer()
}
init()中定义了4个变量,server,client,store,trace
,需要注意的是这里的server是默认的grpc,是micro包内部变量,在其他地方无法直接访问
o.Server.Init(server.Name(n)) 的 Init()
则是server/grpc.go中的init(),初始化grpcServer.opts[类型是server.Options]的一些属性,如server.Name(n)设置的是grpcServer.opts.Name
grpc.configure()中的其他部分这里暂不细看
func (g *grpcServer) Init(opts ...server.Option) error {
g.configure(opts...)
return nil
}
func (g *grpcServer) configure(opts ...server.Option) {
g.Lock()
defer g.Unlock()
// Don't reprocess where there's no config
if len(opts) == 0 && g.srv != nil {
return
}
for _, o := range opts {
o(&g.opts)
}
maxMsgSize := g.getMaxMsgSize()
gopts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(maxMsgSize),
grpc.MaxSendMsgSize(maxMsgSize),
grpc.UnknownServiceHandler(g.handler),
}
if creds := g.getCredentials(); creds != nil {
gopts = append(gopts, grpc.Creds(creds))
}
if opts := g.getGrpcOptions(); opts != nil {
gopts = append(gopts, opts...)
}
g.rsvc = nil
g.srv = grpc.NewServer(gopts...)
}
下面再看service.Init()
// Init will parse the command line flags. Any flags set will
// override the above settings. Options defined here will
// override anything set on the command line.
service.Init(
// Add runtime action
// We could actually do this above
micro.Action(func(c *cli.Context) error {
if c.Bool("run_client") {
runClient(service)
os.Exit(0)
}
return nil
}),
)
// Init initialises options. Additionally it calls cmd.Init
// which parses command line flags. cmd.Init is only called
// on first Init.
func (s *service) Init(opts ...Option) {
// process options
for _, o := range opts {
o(&s.opts)
}
s.once.Do(func() {
// setup the plugins
for _, p := range strings.Split(os.Getenv("MICRO_PLUGIN"), ",") {
if len(p) == 0 {
continue
}
// load the plugin
c, err := plugin.Load(p)
if err != nil {
logger.Fatal(err)
}
// initialise the plugin
if err := plugin.Init(c); err != nil {
logger.Fatal(err)
}
}
// set cmd name
if len(s.opts.Cmd.App().Name) == 0 {
s.opts.Cmd.App().Name = s.Server().Options().Name
}
// Initialise the command flags, overriding new service
if err := s.opts.Cmd.Init(
cmd.Auth(&s.opts.Auth),
cmd.Broker(&s.opts.Broker),
cmd.Registry(&s.opts.Registry),
cmd.Runtime(&s.opts.Runtime),
cmd.Transport(&s.opts.Transport),
cmd.Client(&s.opts.Client),
cmd.Config(&s.opts.Config),
cmd.Server(&s.opts.Server),
cmd.Store(&s.opts.Store),
cmd.Profile(&s.opts.Profile),
); err != nil {
logger.Fatal(err)
}
// Explicitly set the table name to the service name
name := s.opts.Cmd.App().Name
s.opts.Store.Init(store.Table(name))
})
}
- 和micro.NewService的参数处理一样,初始化参数
s.once.Do(),只执行一次
- 加载插件
- 设置cmd名字
- 初始化命令行参数,覆盖service中的属性
- 显式地将cmd名字设置为服务名
最后一步service.Run()
func (s *service) Run() error {
// register the debug handler
s.opts.Server.Handle(
s.opts.Server.NewHandler(
handler.NewHandler(s.opts.Client),
server.InternalHandler(true),
),
)
// start the profiler
if s.opts.Profile != nil {
// to view mutex contention
rtime.SetMutexProfileFraction(5)
// to view blocking profile
rtime.SetBlockProfileRate(1)
if err := s.opts.Profile.Start(); err != nil {
return err
}
defer s.opts.Profile.Stop()
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Starting [service] %s", s.Name())
}
if err := s.Start(); err != nil {
return err
}
ch := make(chan os.Signal, 1)
if s.opts.Signal {
signal.Notify(ch, signalutil.Shutdown()...)
}
select {
// wait on kill signal
case <-ch:
// wait on context cancel
case <-s.opts.Context.Done():
}
return s.Stop()
}
- 注册debug handler
- 启动profiler,控制台输出启动信息
- s.start(),后面再看
- 监听退出Signal和ctx取消信号,收到信号后执行s.stop()
func (s *service) Start() error {
for _, fn := range s.opts.BeforeStart {
if err := fn(); err != nil {
return err
}
}
if err := s.opts.Server.Start(); err != nil {
return err
}
for _, fn := range s.opts.AfterStart {
if err := fn(); err != nil {
return err
}
}
return nil
}
func (s *service) Stop() error {
var gerr error
for _, fn := range s.opts.BeforeStop {
if err := fn(); err != nil {
gerr = err
}
}
if err := s.opts.Server.Stop(); err != nil {
return err
}
for _, fn := range s.opts.AfterStop {
if err := fn(); err != nil {
gerr = err
}
}
return gerr
}
启动:
- 依次执行
s.opts.BeforeStart
列表中的函数 - 启动服务
s.opts.Server.Start()
,具体就看用的什么服务了 - 依次执行
s.opts.AfterStart
列表中的函数
退出:
退出流程与启动流程一致,依次执行s.opts.BeforeStop,s.opts.Server.Stop(),s.opts.AfterStop
BeforeStart的例子,其他的类似
func aa() error {
fmt.Println("beforestart fmt")
return nil
}
service := micro.NewService(
micro.BeforeStart(aa),
)
默认的store.DefaultStore
使用https://github.com/patrickmn/...
在memory/memory.go中做了一些封装
其他init()
,在golang中引用的包,会自动执行init()
logger/default.go 初始化logger
micro.NewService()中的所有设置选项见go-micro/options.go,可以参见【Micro In Action(二):项目结构与启动过程】
https://medium.com/@dche423/m...
这就是go micro微服务的启动过程,一定要先了解函数选项模式
才有助于理解go micro。
有疑问加站长微信联系(非本文作者)