IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Swarmkit笔记(10)——manager处理session请求

    nanxiao发表于 2016-08-05 03:16:32
    love 0

    Manager处理session请求是通过_Dispatcher_Session_Handler这个函数(./api/dispatcher.pb.go):

    // _Dispatcher_Session_Handler is the gRPC stream handler for the Dispatcher
    // Session RPC. It receives the single SessionRequest from the stream and
    // hands it, together with the stream wrapped as a dispatcherSessionServer,
    // to the Session method of srv. Any receive error is returned unchanged.
    func _Dispatcher_Session_Handler(srv interface{}, stream grpc.ServerStream) error {
        req := &SessionRequest{}
        err := stream.RecvMsg(req)
        if err != nil {
            return err
        }
        // srv must implement DispatcherServer; the assertion panics otherwise.
        server := srv.(DispatcherServer)
        return server.Session(req, &dispatcherSessionServer{stream})
    }
    

    实际函数调用栈如下:

    0  0x0000000000b65cbf in github.com/docker/swarmkit/manager/dispatcher.(*Dispatcher).Session
       at /go/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go:768
    1  0x0000000000782aa5 in github.com/docker/swarmkit/api.(*authenticatedWrapperDispatcherServer).Session
       at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:207
    2  0x000000000078e505 in github.com/docker/swarmkit/api.(*raftProxyDispatcherServer).Session
       at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:1121
    3  0x0000000000789c2a in github.com/docker/swarmkit/api._Dispatcher_Session_Handler
       at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:667
    4  0x0000000000909646 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).processStreamingRPC
       at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:602
    5  0x000000000090b002 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).handleStream
       at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:686
    6  0x000000000090fcbe in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).serveStreams.func1.1
       at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:348
    7  0x0000000000462bf0 in runtime.goexit
       at /usr/local/go/src/runtime/asm_amd64.s:1998
    

    Dispatcher.Session()函数代码如下:

    // Session is a stream which controls agent connection.
    //
    // It identifies the calling node from the stream context, registers it to
    // obtain a session ID, and sends an initial SessionMessage carrying the
    // session ID, the node object read from the store, the manager list and the
    // network bootstrap keys. It then loops, sending a refreshed SessionMessage
    // each time one of the watched channels fires, until the stream context is
    // done, d.nodes.GetWithSession reports an error for this node/session pair,
    // a send fails, or a disconnect is requested.
    //
    // On a requested disconnect (node.Disconnect or d.ctx.Done) one last message
    // is sent and disconnectNode is called, which always returns a
    // codes.Aborted error.
    //
    // NOTE(review): the earlier wording of this comment described a boolean
    // Disconnect field in each message telling the node to reconnect to another
    // Manager; no such field is set in the messages built here. Confirm against
    // api.SessionMessage.
    func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
        ctx := stream.Context()
        // Resolve the caller's identity from the stream context.
        nodeInfo, err := ca.RemoteNode(ctx)
        if err != nil {
            return err
        }
        nodeID := nodeInfo.NodeID
    
        // Bail out early if isRunningLocked reports an error (presumably the
        // dispatcher is not running — confirm against its definition).
        if err := d.isRunningLocked(); err != nil {
            return err
        }
    
        // register the node.
        sessionID, err := d.register(stream.Context(), nodeID, r.Description)
        if err != nil {
            return err
        }
    
        // Build a logger tagged with the node, the session and, when the request
        // was forwarded, the forwarding node's ID.
        fields := logrus.Fields{
            "node.id":      nodeID,
            "node.session": sessionID,
            "method":       "(*Dispatcher).Session",
        }
        if nodeInfo.ForwardedBy != nil {
            fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
        }
        // Note: from here on the local log shadows the log package.
        log := log.G(ctx).WithFields(fields)
    
        // Read the node's current object from the store and, in the same call,
        // subscribe to EventUpdateNode events for this node ID.
        var nodeObj *api.Node
        nodeUpdates, cancel, err := store.ViewAndWatch(d.store, func(readTx store.ReadTx) error {
            nodeObj = store.GetNode(readTx, nodeID)
            return nil
        }, state.EventUpdateNode{Node: &api.Node{ID: nodeID},
            Checks: []state.NodeCheckFunc{state.NodeCheckID}},
        )
        // cancel is inspected before err, so guard against it being nil.
        if cancel != nil {
            defer cancel()
        }
    
        // A ViewAndWatch failure is only logged; the session carries on with
        // whatever nodeObj currently holds (possibly nil).
        if err != nil {
            log.WithError(err).Error("ViewAndWatch Node failed")
        }
    
        // Check the node/session pair before the first send.
        if _, err = d.nodes.GetWithSession(nodeID, sessionID); err != nil {
            return err
        }
    
        // Initial message: tells the agent its session ID and current state.
        if err := stream.Send(&api.SessionMessage{
            SessionID:            sessionID,
            Node:                 nodeObj,
            Managers:             d.getManagers(),
            NetworkBootstrapKeys: d.networkBootstrapKeys,
        }); err != nil {
            return err
        }
    
        // Subscribe to manager-list and key-manager updates for the lifetime of
        // this call; both watches are cancelled on return.
        managerUpdates, mgrCancel := d.mgrQueue.Watch()
        defer mgrCancel()
        keyMgrUpdates, keyMgrCancel := d.keyMgrQueue.Watch()
        defer keyMgrCancel()
    
        // disconnectNode is a helper forcibly shutdown connection
        disconnectNode := func() error {
            // force disconnect by shutting down the stream.
            transportStream, ok := transport.StreamFromContext(stream.Context())
            if ok {
                // if we have the transport stream, we can signal a disconnect
                // in the client.
                if err := transportStream.ServerTransport().Close(); err != nil {
                    log.WithError(err).Error("session end")
                }
            }
    
            // Remove the node with a DISCONNECTED status; a failure here is
            // logged but does not change the returned error.
            nodeStatus := api.NodeStatus{State: api.NodeStatus_DISCONNECTED, Message: "node is currently trying to find new manager"}
            if err := d.nodeRemove(nodeID, nodeStatus); err != nil {
                log.WithError(err).Error("failed to remove node")
            }
            // still return an abort if the transport closure was ineffective.
            return grpc.Errorf(codes.Aborted, "node must disconnect")
        }
    
        for {
            // After each message send, we need to check the node's sessionID
            // hasn't changed. If the lookup fails, we end the stream by
            // returning the error, which makes the node re-register.
            node, err := d.nodes.GetWithSession(nodeID, sessionID)
            if err != nil {
                return err
            }
    
            // mgrs stays nil unless a manager update arrives in this iteration.
            var mgrs []*api.WeightedPeer
    
            // disconnect is set when this iteration must end the session after
            // sending one last message.
            var disconnect bool
    
            // Block until exactly one of the watched events occurs.
            select {
            case ev := <-managerUpdates:
                // New manager list delivered by the manager queue.
                mgrs = ev.([]*api.WeightedPeer)
            case ev := <-nodeUpdates:
                // The node object changed in the store; send the new version.
                nodeObj = ev.(state.EventUpdateNode).Node
            case <-stream.Context().Done():
                // The stream's context is done; return its error without sending.
                return stream.Context().Err()
            case <-node.Disconnect:
                disconnect = true
            case <-d.ctx.Done():
                // The dispatcher's own context is done; disconnect the node.
                disconnect = true
            case <-keyMgrUpdates:
                // The event value is not used; the send below re-reads
                // d.networkBootstrapKeys.
            }
            // No manager event this round: fall back to the current manager list.
            if mgrs == nil {
                mgrs = d.getManagers()
            }
    
            if err := stream.Send(&api.SessionMessage{
                SessionID:            sessionID,
                Node:                 nodeObj,
                Managers:             mgrs,
                NetworkBootstrapKeys: d.networkBootstrapKeys,
            }); err != nil {
                return err
            }
            // The message above is sent even when disconnecting, so the agent
            // receives a final manager list before the connection is closed.
            if disconnect {
                return disconnectNode()
            }
        }
    }
    

    这个stream是处理agent连接的。前半部分是把连接的agent记录下来;后半部分是如果cluster信息发生变化,比如manager的leader发生变化,需要通知agent重新连接。disconnectNode()函数则是需要同agent node断开连接时的处理:包括断开连接,agent node信息删除,等等。



沪ICP备19023445号-2号
友情链接