Node.runAgent() 函数实现如下:
// runAgent connects to a manager and runs an Agent on top of that connection
// until the agent terminates.
//
// It waits for n.remotes to offer a manager (or for ctx to be cancelled),
// dials that manager over gRPC using creds, and then creates and starts an
// Agent. While the agent is running it is published as n.agent; the field is
// reset to nil when this method returns. ready is closed once the agent
// reports that it is ready.
//
// The returned error is ctx.Err() if ctx is cancelled before a manager has
// been selected, any error from dialing, creating or starting the agent, or
// otherwise whatever agent.Err reports.
func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportAuthenticator, ready chan<- struct{}) error {
	var manager api.Peer
	select {
	case <-ctx.Done():
	case manager = <-n.remotes.WaitSelect(ctx):
	}
	if ctx.Err() != nil {
		return ctx.Err()
	}

	// Named p rather than picker so the local does not shadow the package.
	p := picker.NewPicker(n.remotes, manager.Addr)
	conn, err := grpc.Dial(manager.Addr,
		grpc.WithPicker(p),
		grpc.WithTransportCredentials(creds),
		grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
	if err != nil {
		return err
	}

	agent, err := New(&Config{
		Hostname:         n.config.Hostname,
		Managers:         n.remotes,
		Executor:         n.config.Executor,
		DB:               db,
		Conn:             conn,
		Picker:           p,
		NotifyRoleChange: n.roleChangeReq,
	})
	if err != nil {
		// No agent took over the connection, so release it here. The Close
		// error is dropped on purpose: the creation error is the useful one.
		_ = conn.Close()
		return err
	}
	if err := agent.Start(ctx); err != nil {
		// The agent never started running, so the connection is still ours to
		// release. The Close error is dropped in favour of the start error.
		_ = conn.Close()
		return err
	}
	// NOTE(review): on the success path the connection is left open, as
	// before; confirm who is expected to close it once the agent has exited.

	n.Lock()
	n.agent = agent
	n.Unlock()
	defer func() {
		n.Lock()
		n.agent = nil
		n.Unlock()
	}()

	// Forward the agent's readiness to the caller. The goroutine is bound to
	// the lifetime of this call so that it does not block forever when the
	// agent terminates without ever becoming ready.
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-agent.Ready():
			close(ready)
		case <-done:
			// Both channels may have become ready at the same time; make sure
			// a readiness signal that already fired is not lost.
			select {
			case <-agent.Ready():
				close(ready)
			default:
			}
		}
	}()

	// todo: manually call stop on context cancellation?
	return agent.Err(context.Background())
}
上面函数解释如下:
(1)case manager = <-n.remotes.WaitSelect(ctx):首先获得 manager;
(2)接下来调用 grpc.Dial() 去连接这个 manager:
picker := picker.NewPicker(n.remotes, manager.Addr)
conn, err := grpc.Dial(manager.Addr,
grpc.WithPicker(picker),
grpc.WithTransportCredentials(creds),
grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
if err != nil {
return err
}
(3)生成并运行一个 Agent:
agent, err := New(&Config{
Hostname: n.config.Hostname,
Managers: n.remotes,
Executor: n.config.Executor,
DB: db,
Conn: conn,
Picker: picker,
NotifyRoleChange: n.roleChangeReq,
})
if err != nil {
return err
}
if err := agent.Start(ctx); err != nil {
return err
}
关于 Agent 结构体定义:
// Agent implements the primary node functionality for a member of a swarm
// cluster. The primary functionality is to run and report on the status of
// tasks assigned to the node.
type Agent struct {
// config holds the agent's settings; run consults config.Picker when it
// bounces the connection after a session error.
// NOTE(review): presumably the Config handed to New; confirm.
config *Config
// The latest node object state from manager
// for this node known to the agent.
node *api.Node
// keys is a list of encryption keys.
// NOTE(review): where these are set and consumed is not visible here.
keys []*api.EncryptionKey
// sessionq carries operations to execute against the current session. run
// serves it after a session has registered or been rebuilt, and stops
// serving it when a session error arrives.
sessionq chan sessionOperation
// worker is initialised by run, listens with run's status reporter and is
// handed the task assignments received from the session.
worker Worker
// started is a lifecycle signal.
// NOTE(review): presumably closed by Start; confirm, it is not used in run.
started chan struct{}
// ready is closed by run the first time a session registers.
ready chan struct{}
stopped chan struct{} // requests shutdown
closed chan struct{} // only closed in run
err error // read only after closed is closed
}
其中 Config 结构体定义:
// Config provides values for an Agent. A pointer to a Config is what New
// takes when an Agent is constructed.
type Config struct {
// Hostname is the name of the host for this agent instance.
Hostname string
// Managers provides the manager backend used by the agent. It will be
// updated with managers weights as observed by the agent.
Managers picker.Remotes
// Conn specifies the client connection Agent will use.
Conn *grpc.ClientConn
// Picker is the picker used by Conn. It may be nil; the agent only resets
// it when it is set.
// TODO(aaronl): This is only part of the config to allow resetting the
// GRPC connection. This should be refactored to address the coupling
// between Conn and Picker.
Picker *picker.Picker
// Executor specifies the executor to use for the agent.
Executor exec.Executor
// DB used for task storage. Must be open for the lifetime of the agent.
DB *bolt.DB
// NotifyRoleChange channel receives new roles from session messages. It is
// send-only from the agent's point of view.
NotifyRoleChange chan<- api.NodeRole
}
注释都很清楚,不必赘述。
Agent.Start() 会调到 Agent.run(),实现如下:
// run is the agent's main event loop. It opens the initial session,
// initialises the worker, and then multiplexes session events until a.stopped
// fires or ctx is cancelled.
//
// a.closed is closed when run returns. a.err is set when worker
// initialisation fails, or to ctx.Err() when the loop ends because ctx was
// cancelled and no earlier error was recorded.
func (a *Agent) run(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(a.closed) // full shutdown.

	ctx = log.WithLogger(ctx, log.G(ctx).WithField("module", "agent"))

	log.G(ctx).Debugf("(*Agent).run")
	defer log.G(ctx).Debugf("(*Agent).run exited")

	var (
		backoff    time.Duration
		session    = newSession(ctx, a, backoff) // start the initial session
		registered = session.registered
		ready      = a.ready // first session ready
		// sessionq stays nil, and its select case disabled, until a session
		// has registered.
		sessionq chan sessionOperation
	)

	if err := a.worker.Init(ctx); err != nil {
		log.G(ctx).WithError(err).Error("worker initialization failed")
		a.err = err
		return // fatal?
	}

	// setup a reliable reporter to call back to us.
	reporter := newStatusReporter(ctx, a)
	defer reporter.Close()

	a.worker.Listen(ctx, reporter)

	for {
		select {
		case operation := <-sessionq:
			operation.response <- operation.fn(session)
		case msg := <-session.tasks:
			if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
				log.G(ctx).WithError(err).Error("task assignment failed")
			}
		case msg := <-session.messages:
			if err := a.handleSessionMessage(ctx, msg); err != nil {
				log.G(ctx).WithError(err).Error("session message handler failed")
			}
		case <-registered:
			log.G(ctx).Debugln("agent: registered")
			// Signal readiness only for the first session that registers.
			if ready != nil {
				close(ready)
			}
			ready = nil
			registered = nil // we only care about this once per session
			backoff = 0      // reset backoff
			sessionq = a.sessionq
		case err := <-session.errs:
			// TODO(stevvooe): This may actually block if a session is closed
			// but no error was sent. Session.close must only be called here
			// for this to work.
			if err != nil {
				log.G(ctx).WithError(err).Error("agent: session failed")
				// Grow the backoff on every failure, capped at the maximum.
				backoff = initialSessionFailureBackoff + 2*backoff
				if backoff > maxSessionFailureBackoff {
					backoff = maxSessionFailureBackoff
				}
			}

			if err := session.close(); err != nil {
				log.G(ctx).WithError(err).Error("agent: closing session failed")
			}
			sessionq = nil
			// if we're here before <-registered, do nothing for that event
			registered = nil

			// Bounce the connection.
			if a.config.Picker != nil {
				a.config.Picker.Reset()
			}
		case <-session.closed:
			log.G(ctx).Debugf("agent: rebuild session")

			// select a session registration delay from backoff range.
			// rand.Int63n panics for a non-positive argument, and backoff is
			// zero when the session ended without an error or right after a
			// successful registration, so only randomise a positive backoff.
			var delay time.Duration
			if backoff > 0 {
				delay = time.Duration(rand.Int63n(int64(backoff)))
			}
			session = newSession(ctx, a, delay)
			registered = session.registered
			sessionq = a.sessionq
		case <-a.stopped:
			// TODO(stevvooe): Wait on shutdown and cleanup. May need to pump
			// this loop a few times.
			return
		case <-ctx.Done():
			if a.err == nil {
				a.err = ctx.Err()
			}
			return
		}
	}
}
其中重要的是 session 这个概念,通过“session = newSession(ctx, a, backoff)”这行代码将 session 和 Agent 关联起来。
有疑问加站长微信联系(非本文作者)