agent.Node
结构体有4
个channel
,理解它们的作用就可以理解swarmd
程序的框架:
// Node implements the primary node functionality for a member of a swarm
// cluster. Node handles workloads and may also run as a manager.
type Node struct {
......
started chan struct{}
stopped chan struct{}
ready chan struct{} // closed when agent has completed registration and manager(if enabled) is ready to receive control requests
......
closed chan struct{}
......
}
swarmd
程序的框架(其中executor
通过engine-addr
得到,代表最终运行task
的实体,实际是一个Docker engineapi.APIClient
。其它参数都通过命令行直接得到。):
......
// Create a context for our GRPC call
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
......
n, err := agent.NewNode(&agent.NodeConfig{
Hostname: hostname,
ForceNewCluster: forceNewCluster,
ListenControlAPI: unix,
ListenRemoteAPI: addr,
JoinAddr: managerAddr,
StateDir: stateDir,
JoinToken: joinToken,
ExternalCAs: externalCAOpt.Value(),
Executor: executor,
HeartbeatTick: hb,
ElectionTick: election,
})
if err != nil {
return err
}
if err := n.Start(ctx); err != nil {
return err
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
n.Stop(ctx)
}()
go func() {
select {
case <-n.Ready():
case <-ctx.Done():
}
if ctx.Err() == nil {
logrus.Info("node is ready")
}
}()
return n.Err(ctx)
(1)
if err := n.Start(ctx); err != nil {
return err
}
看一下Node.Start()
函数的实现:
// Start starts a node instance.
func (n *Node) Start(ctx context.Context) error {
select {
case <-n.started:
select {
case <-n.closed:
return n.err
case <-n.stopped:
return errAgentStopped
case <-ctx.Done():
return ctx.Err()
default:
return errAgentStarted
}
case <-ctx.Done():
return ctx.Err()
default:
}
close(n.started)
go n.run(ctx)
return nil
}
如果执行Node.Start()
时没有任何异常发生,就会把Node.started
这个channel
关掉(close(n.started)
),然后启动这个节点初始化过程:go n.run(ctx)
。
(2)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
n.Stop(ctx)
}()
这段代码的含义是用户按Ctrl+C
可以中断程序。Node.Stop()
函数实现如下:
// Stop stops node execution
func (n *Node) Stop(ctx context.Context) error {
select {
case <-n.started:
select {
case <-n.closed:
return n.err
case <-n.stopped:
select {
case <-n.closed:
return n.err
case <-ctx.Done():
return ctx.Err()
}
case <-ctx.Done():
return ctx.Err()
default:
close(n.stopped)
// recurse and wait for closure
return n.Stop(ctx)
}
case <-ctx.Done():
return ctx.Err()
default:
return errAgentNotStarted
}
}
由于此时Node.started
这个channel
已经被关掉,所以会永远执行select
的第一个case
分支:case <-n.started
。然后会根据当时的情况,再决定执行哪个分支。
(3)
go func() {
select {
case <-n.Ready():
case <-ctx.Done():
}
if ctx.Err() == nil {
logrus.Info("node is ready")
}
}()
Node.Ready()
函数会返回Node.ready
这个channel
:
// Ready returns a channel that is closed after node's initialization has
// completes for the first time.
func (n *Node) Ready() <-chan struct{} {
return n.ready
}
当Node
初始化完成后,Node.ready
这个channel
就会被关掉。因此如果一切顺利的话,就会看到“node is ready
”的log
。
(4)
return n.Err(ctx)
Node.Err()
函数的实现:
// Err returns the error that caused the node to shutdown or nil. Err blocks
// until the node has fully shut down.
func (n *Node) Err(ctx context.Context) error {
select {
case <-n.closed:
return n.err
case <-ctx.Done():
return ctx.Err()
}
}
Node.Err()
函数阻塞在这里,等待Node
关闭。
有疑问加站长微信联系(非本文作者)