Manager
处理session
请求是通过_Dispatcher_Session_Handler
这个函数(./api/dispatcher.pb.go
):
func _Dispatcher_Session_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SessionRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DispatcherServer).Session(m, &dispatcherSessionServer{stream})
}
实际函数调用栈如下:
0 0x0000000000b65cbf in github.com/docker/swarmkit/manager/dispatcher.(*Dispatcher).Session
at /go/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go:768
1 0x0000000000782aa5 in github.com/docker/swarmkit/api.(*authenticatedWrapperDispatcherServer).Session
at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:207
2 0x000000000078e505 in github.com/docker/swarmkit/api.(*raftProxyDispatcherServer).Session
at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:1121
3 0x0000000000789c2a in github.com/docker/swarmkit/api._Dispatcher_Session_Handler
at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:667
4 0x0000000000909646 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).processStreamingRPC
at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:602
5 0x000000000090b002 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).handleStream
at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:686
6 0x000000000090fcbe in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).serveStreams.func1.1
at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:348
7 0x0000000000462bf0 in runtime.goexit
at /usr/local/go/src/runtime/asm_amd64.s:1998
Dispatcher.Session()
函数代码如下:
// Session is a stream which controls agent connection.
// Each message contains list of backup Managers with weights. Also there is
// a special boolean field Disconnect which if true indicates that node should
// reconnect to another Manager immediately.
func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
ctx := stream.Context()
nodeInfo, err := ca.RemoteNode(ctx)
if err != nil {
return err
}
nodeID := nodeInfo.NodeID
if err := d.isRunningLocked(); err != nil {
return err
}
// register the node.
sessionID, err := d.register(stream.Context(), nodeID, r.Description)
if err != nil {
return err
}
fields := logrus.Fields{
"node.id": nodeID,
"node.session": sessionID,
"method": "(*Dispatcher).Session",
}
if nodeInfo.ForwardedBy != nil {
fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
}
log := log.G(ctx).WithFields(fields)
var nodeObj *api.Node
nodeUpdates, cancel, err := store.ViewAndWatch(d.store, func(readTx store.ReadTx) error {
nodeObj = store.GetNode(readTx, nodeID)
return nil
}, state.EventUpdateNode{Node: &api.Node{ID: nodeID},
Checks: []state.NodeCheckFunc{state.NodeCheckID}},
)
if cancel != nil {
defer cancel()
}
if err != nil {
log.WithError(err).Error("ViewAndWatch Node failed")
}
if _, err = d.nodes.GetWithSession(nodeID, sessionID); err != nil {
return err
}
if err := stream.Send(&api.SessionMessage{
SessionID: sessionID,
Node: nodeObj,
Managers: d.getManagers(),
NetworkBootstrapKeys: d.networkBootstrapKeys,
}); err != nil {
return err
}
managerUpdates, mgrCancel := d.mgrQueue.Watch()
defer mgrCancel()
keyMgrUpdates, keyMgrCancel := d.keyMgrQueue.Watch()
defer keyMgrCancel()
// disconnectNode is a helper forcibly shutdown connection
disconnectNode := func() error {
// force disconnect by shutting down the stream.
transportStream, ok := transport.StreamFromContext(stream.Context())
if ok {
// if we have the transport stream, we can signal a disconnect
// in the client.
if err := transportStream.ServerTransport().Close(); err != nil {
log.WithError(err).Error("session end")
}
}
nodeStatus := api.NodeStatus{State: api.NodeStatus_DISCONNECTED, Message: "node is currently trying to find new manager"}
if err := d.nodeRemove(nodeID, nodeStatus); err != nil {
log.WithError(err).Error("failed to remove node")
}
// still return an abort if the transport closure was ineffective.
return grpc.Errorf(codes.Aborted, "node must disconnect")
}
for {
// After each message send, we need to check the nodes sessionID hasn't
// changed. If it has, we will the stream and make the node
// re-register.
node, err := d.nodes.GetWithSession(nodeID, sessionID)
if err != nil {
return err
}
var mgrs []*api.WeightedPeer
var disconnect bool
select {
case ev := <-managerUpdates:
mgrs = ev.([]*api.WeightedPeer)
case ev := <-nodeUpdates:
nodeObj = ev.(state.EventUpdateNode).Node
case <-stream.Context().Done():
return stream.Context().Err()
case <-node.Disconnect:
disconnect = true
case <-d.ctx.Done():
disconnect = true
case <-keyMgrUpdates:
}
if mgrs == nil {
mgrs = d.getManagers()
}
if err := stream.Send(&api.SessionMessage{
SessionID: sessionID,
Node: nodeObj,
Managers: mgrs,
NetworkBootstrapKeys: d.networkBootstrapKeys,
}); err != nil {
return err
}
if disconnect {
return disconnectNode()
}
}
}
这个stream
是处理agent
连接的。前半部分是把连接的agent
记录下来;后半部分是如果cluster
信息发送变化,比如manager
的leader
发生变化,需要通知agent
重新连接。disconnectNode()
函数则是需要同agent node
断开连接时的处理:包括断开连接,agent node
信息删除,等等。
有疑问加站长微信联系(非本文作者)