IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    ps-lite_part3_Van讲解

    admin发表于 2023-03-11 10:30:50
    love 0

    学习还是要继续,在上一篇文章讲解PostOffice当中其实也提到了Van,但是那一篇文章主要还是在讲PostOffice相关的信息,所以Van 暂时搁置放到这里。还是延续之前的学习的思路,按照ps服务启动的顺序,到那个模块加载的时候,在重点介绍这块的信息。一句题外话,其实在写这些东西的时候,也是会看很多大佬们的讲解,单靠自己去看源码,很多时候也看不懂,也许c++代码看懂了,但是为什么这么设计?所以写的有些不足或者错误也欢迎email我。

    ok,那我们先回顾一下Van是在哪里被调用的,是在Postoffice的启动过程中。InitEnvironment 负责创建,van_->Start(customer_id) 负责启动。

    void Postoffice::InitEnvironment() {
      const char* val = NULL;
      std::string van_type = GetEnv("DMLC_PS_VAN_TYPE", "zmq");
      van_ = Van::Create(van_type);
      val = CHECK_NOTNULL(Environment::Get()->find("DMLC_NUM_WORKER"));
      num_workers_ = atoi(val);
      val =  CHECK_NOTNULL(Environment::Get()->find("DMLC_NUM_SERVER"));
      num_servers_ = atoi(val);
      val = CHECK_NOTNULL(Environment::Get()->find("DMLC_ROLE"));
      std::string role(val);
      is_worker_ = role == "worker";
      is_server_ = role == "server";
      is_scheduler_ = role == "scheduler";
      verbose_ = GetEnv("PS_VERBOSE", 0);
    }
    
    void Postoffice::Start(int customer_id, const char* argv0, const bool do_barrier) {
      start_mu_.lock();
      if (init_stage_ == 0) {
        InitEnvironment();
        // init glog
        if (argv0) {
          dmlc::InitLogging(argv0);
        } else {
          dmlc::InitLogging("ps-lite\0");
        }
    
        // init node info.
        for (int i = 0; i < num_workers_; ++i) {
          int id = WorkerRankToID(i);
          for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup,
                        kWorkerGroup + kScheduler,
                        kWorkerGroup + kServerGroup + kScheduler}) {
            node_ids_[g].push_back(id);
          }
        }
    
        for (int i = 0; i < num_servers_; ++i) {
          int id = ServerRankToID(i);
          for (int g : {id, kServerGroup, kWorkerGroup + kServerGroup,
                        kServerGroup + kScheduler,
                        kWorkerGroup + kServerGroup + kScheduler}) {
            node_ids_[g].push_back(id);
          }
        }
    
        for (int g : {kScheduler, kScheduler + kServerGroup + kWorkerGroup,
                      kScheduler + kWorkerGroup, kScheduler + kServerGroup}) {
          node_ids_[g].push_back(kScheduler);
        }
        init_stage_++;
      }
      start_mu_.unlock();
    
      // start van
      van_->Start(customer_id);
    
      start_mu_.lock();
      if (init_stage_ == 1) {
        // record start time
        start_time_ = time(NULL);
        init_stage_++;
      }
      start_mu_.unlock();
      // do a barrier here
      if (do_barrier) Barrier(customer_id, kWorkerGroup + kServerGroup + kScheduler);
    }
    

    Van 的创建

    我们先看看Van对象的创建

    Van* Van::Create(const std::string& type) {
      if (type == "zmq") {
        return new ZMQVan();
      } else if (type == "p3") {
        return new P3Van();
    #ifdef DMLC_USE_IBVERBS
    } else if (type == "ibverbs") {
        return new IBVerbsVan();
    #endif
      } else {
        LOG(FATAL) << "Unsupported van type: " << type;
        return nullptr;
      }
    }
    

    根据初始化传递的type参数,我们也可以判断出它是一个通信模块,zmq是一个开源的socket编程库,ZMQVan也是基于zmq的Van库,至于下面的p3和ibverbs是额外的两种方式,ibverbs是字节提交的库,有兴趣的可以研究下,对Van本身对理解影响不大。

    前面也说了,程序启动分三次,分别是scheduler、server和worker的启动,这里还是先以scheduler这个角色启动为代表讲,跟之前一样,在实际的讲解过程中也会拓展到其他角色。

    Van 的启动

    先看下代码

    void Van::Start(int customer_id) {
      // get scheduler info
      start_mu_.lock();
    
      if (init_stage == 0) {
        scheduler_.hostname = std::string(
            CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_URI")));
        scheduler_.port =
            atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT")));
        scheduler_.role = Node::SCHEDULER;
        scheduler_.id = kScheduler;
        is_scheduler_ = Postoffice::Get()->is_scheduler();
    
        // get my node info
        if (is_scheduler_) {
          my_node_ = scheduler_;
        } else {
          auto role = Postoffice::Get()->is_worker() ? Node::WORKER : Node::SERVER;
          const char* nhost = Environment::Get()->find("DMLC_NODE_HOST");
          std::string ip;
          if (nhost) ip = std::string(nhost);
          if (ip.empty()) {
            const char* itf = Environment::Get()->find("DMLC_INTERFACE");
            std::string interface;
            if (itf) interface = std::string(itf);
            if (interface.size()) {
              GetIP(interface, &ip);
            } else {
              GetAvailableInterfaceAndIP(&interface, &ip);
            }
            CHECK(!interface.empty()) << "failed to get the interface";
          }
          int port = GetAvailablePort();
          const char* pstr = Environment::Get()->find("PORT");
          if (pstr) port = atoi(pstr);
          CHECK(!ip.empty()) << "failed to get ip";
          CHECK(port) << "failed to get a port";
          my_node_.hostname = ip;
          my_node_.role = role;
          my_node_.port = port;
          // cannot determine my id now, the scheduler will assign it later
          // set it explicitly to make re-register within a same process possible
          my_node_.id = Node::kEmpty;
          my_node_.customer_id = customer_id;
        }
    
        // bind.
        my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40);
        PS_VLOG(1) << "Bind to " << my_node_.DebugString();
        CHECK_NE(my_node_.port, -1) << "bind failed";
    
        // connect to the scheduler
        Connect(scheduler_);
    
        // for debug use
        if (Environment::Get()->find("PS_DROP_MSG")) {
          drop_rate_ = atoi(Environment::Get()->find("PS_DROP_MSG"));
        }
        // start receiver
        receiver_thread_ =
            std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
        init_stage++;
      }
      start_mu_.unlock();
    
      if (!is_scheduler_) {
        // let the scheduler know myself
        Message msg;
        Node customer_specific_node = my_node_;
        customer_specific_node.customer_id = customer_id;
        msg.meta.recver = kScheduler;
        msg.meta.control.cmd = Control::ADD_NODE;
        msg.meta.control.node.push_back(customer_specific_node);
        msg.meta.timestamp = timestamp_++;
        Send(msg);
      }
    
      // wait until ready
      while (!ready_.load()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
      }
    
      start_mu_.lock();
      if (init_stage == 1) {
        // resender
        if (Environment::Get()->find("PS_RESEND") &&
            atoi(Environment::Get()->find("PS_RESEND")) != 0) {
          int timeout = 1000;
          if (Environment::Get()->find("PS_RESEND_TIMEOUT")) {
            timeout = atoi(Environment::Get()->find("PS_RESEND_TIMEOUT"));
          }
          resender_ = new Resender(timeout, 10, this);
        }
    
        if (!is_scheduler_) {
          // start heartbeat thread
          heartbeat_thread_ =
              std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
        }
        init_stage++;
      }
      start_mu_.unlock();
    }
    

    代码首先在获取scheduler的基础信息

    //获取host    
    scheduler_.hostname = std::string(
            CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_URI")));
    //获取端口
        scheduler_.port =
            atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT")));
    //赋值 SCHEDULER 角色名 
        scheduler_.role = Node::SCHEDULER;
    // kScheduler=1 ,之前讲解nodeid的时候提到过 1,2,4分别代表相应的group
        scheduler_.id = kScheduler;
    // 判断当前的节点是否是 SCHEDULER 节点,这个也好理解,我们不同角色启动程序,只有在 SCHEDULER 角色启动这个才是 true
        is_scheduler_ = Postoffice::Get()->is_scheduler();
    

    Van的成员里面有个my_node是记录自己的node节点信息,接下来就是要当前的节点信息写进去

    // get my node info
    //如果是SCHEDULER 那么 my_node直接赋值
        if (is_scheduler_) {
          my_node_ = scheduler_;
        } else {
          //判断在worker或者server下的角色操作,主要也是获取ip、host和端口等信息
          auto role = Postoffice::Get()->is_worker() ? Node::WORKER : Node::SERVER;
          const char* nhost = Environment::Get()->find("DMLC_NODE_HOST");
          std::string ip;
          if (nhost) ip = std::string(nhost);
          if (ip.empty()) {
            const char* itf = Environment::Get()->find("DMLC_INTERFACE");
            std::string interface;
            if (itf) interface = std::string(itf);
            if (interface.size()) {
              GetIP(interface, &ip);
            } else {
              GetAvailableInterfaceAndIP(&interface, &ip);
            }
            CHECK(!interface.empty()) << "failed to get the interface";
          }
          int port = GetAvailablePort();
          const char* pstr = Environment::Get()->find("PORT");
          if (pstr) port = atoi(pstr);
          CHECK(!ip.empty()) << "failed to get ip";
          CHECK(port) << "failed to get a port";
          my_node_.hostname = ip;
          my_node_.role = role;
          my_node_.port = port;
          // cannot determine my id now, the scheduler will assign it later
          // set it explicitly to make re-register within a same process possible
          my_node_.id = Node::kEmpty;
          my_node_.customer_id = customer_id;
        }
    
    

    获取基础的node信息之后就是绑定端口,这是为了后续通信服务使用

        // bind.
        my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40);
        PS_VLOG(1) << "Bind to " << my_node_.DebugString();
        CHECK_NE(my_node_.port, -1) << "bind failed";
    

    接下来就是比较关键的点了,需要跟scheduler连接上

    // connect to the scheduler
        Connect(scheduler_);
    

    这一点乍一看有点问题,按道理worker和server说连接scheduler还好理解,你说scheduler本身也要连接自己?

    在connect函数中

      // worker doesn't need to connect to the other workers. same for server
        if ((node.role == my_node_.role) && (node.id != my_node_.id)) {
          return;
        }
    

    那么按照这段逻辑,在scheduler角色启动的时候,应该也是直接return,没有做任何的操作。跟这个源码的诸事应该保持一致。

    至于其他的worker和server在启动的时候都要连接到 scheduler。这里也仅仅只是连接而已,只能说明在指定的端口,不同的node之间建立一个通道,实际上还没有进行任何有用的数据传输。

    接下来就是启动一个接收消息的线程,负责消息的接收

    // start receiver,启动一个消息接收的线程
        receiver_thread_ =
            std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
    

    真正干点事还是在这里

      if (!is_scheduler_) {
        // let the scheduler know myself
        Message msg;
        Node customer_specific_node = my_node_;
        customer_specific_node.customer_id = customer_id;
        msg.meta.recver = kScheduler;
        msg.meta.control.cmd = Control::ADD_NODE;
        msg.meta.control.node.push_back(customer_specific_node);
        msg.meta.timestamp = timestamp_++;
        Send(msg);
      }
    

    当当前的启动不是以 scheduler启动的时候,需要向 scheduler 注册信息,比如worker 节点1 启动了,那么需要让 scheduler 知道又个代号 1 的worker 节点即将上线。那么之前在启动scheduler 的那个消息接收的线程接收到这个请求之后,开始处理这个请求。

    现在我们看看这个接收消息的线程是如何处理消息的?

    void Van::Receiving() {
      Meta nodes;
      Meta recovery_nodes;  // store recovery nodes
      recovery_nodes.control.cmd = Control::ADD_NODE;
    
      while (true) {
        Message msg;
        int recv_bytes = RecvMsg(&msg);
        // For debug, drop received message
        if (ready_.load() && drop_rate_ > 0) {
          unsigned seed = time(NULL) + my_node_.id;
          if (rand_r(&seed) % 100 < drop_rate_) {
            LOG(WARNING) << "Drop message " << msg.DebugString();
            continue;
          }
        }
    
        CHECK_NE(recv_bytes, -1);
        recv_bytes_ += recv_bytes;
        if (Postoffice::Get()->verbose() >= 2) {
          PS_VLOG(2) << msg.DebugString();
        }
        // duplicated message
        if (resender_ && resender_->AddIncomming(msg)) continue;
    
        if (!msg.meta.control.empty()) {
          // control msg
          auto& ctrl = msg.meta.control;
          if (ctrl.cmd == Control::TERMINATE) {
            ProcessTerminateCommand();
            break;
          } else if (ctrl.cmd == Control::ADD_NODE) {
            ProcessAddNodeCommand(&msg, &nodes, &recovery_nodes);
          } else if (ctrl.cmd == Control::BARRIER) {
            ProcessBarrierCommand(&msg);
          } else if (ctrl.cmd == Control::HEARTBEAT) {
            ProcessHearbeat(&msg);
          } else {
            LOG(WARNING) << "Drop unknown typed message " << msg.DebugString();
          }
        } else {
          ProcessDataMsg(&msg);
        }
      }
    }
    

    在上面的Receiving函数中重点是那个while循环,在循环外定义了

    Meta nodes;
    Meta recovery_nodes;  // store recovery nodes 这个是指之前有问题重新恢复使用的节点注册信息
    

    在循环的刚开始那有些debug代码可以不用管

    if (resender_ && resender_->AddIncomming(msg)) continue;
    

    resender是一个重试机制,假设你之前的请求失败,失败的原因有很多比如超时,如果你设置了resender,那么就会重试

    接下来就是要根据接受到的消息内容执行相应的任务

    停止任务 ProcessTerminateCommand

    void Van::ProcessTerminateCommand() {
      PS_VLOG(1) << my_node().ShortDebugString() << " is stopped";
      ready_ = false;
    }
    

    这个ready_标志位是表示当前这个节点的可用性,如果置为false那么就表示当前这个节点不可用。

    新增节点 ProcessAddNodeCommand

    void Van::ProcessAddNodeCommand(Message* msg, Meta* nodes,
                                    Meta* recovery_nodes) {
      // 获取之前记录的dead node
      auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);
      std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());
      auto& ctrl = msg->meta.control;
    
      UpdateLocalID(msg, &dead_set, nodes, recovery_nodes);
    
      if (is_scheduler_) {
        ProcessAddNodeCommandAtScheduler(msg, nodes, recovery_nodes);
      } else {
        for (const auto& node : ctrl.node) {
          std::string addr_str = node.hostname + ":" + std::to_string(node.port);
          if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
            Connect(node);
            connected_nodes_[addr_str] = node.id;
          }
          if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
          if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
        }
        PS_VLOG(1) << my_node_.ShortDebugString() << " is connected to others";
        ready_ = true;
      }
    }
    

    UpdateLocalID 的函数作用如下

    此函数作用是更新节点内部的node id 信息,也是分为两种情况,函数逻辑如下:

    • 如果msg->meta.sender是Meta::kEmpty,即未设定,则处理此message的一定是Scheduler,会进入 if 分支。
      • 如果目前 nodes 的control.node数目小于 “配置的server数目 + 配置的worker数目”,则说明是系统启动阶段,将当前消息的node信息加入到 control.node 之中。
      • 否则说明是系统运行阶段,应该是有些节点死掉重启后再次连接。那么,就从 nodes 的control.node 之中找到一个已经死掉的且节点role 与当前消息一致(同类型)的 node id,把这个 node id 赋给这个重启的节点。并且更新 nodes->control.node 和 recovery_nodes。
    • 下面就是普通节点处理的逻辑:
      • 即在 scheduler 传回来的所有节点信息中查找,目的是找到与自己的ip,port一致的节点。
      • 如果找到,就更新本地节点信息(因为在本节点启动时候,并没有设置 node_id 这个信息,这个需要scheduler统一设置,从注释看,目的是为了使重新注册成为可能)。包括全局 rank 信息。
    void Van::UpdateLocalID(Message* msg, std::unordered_set<int>* deadnodes_set,
                            Meta* nodes, Meta* recovery_nodes) {
      auto& ctrl = msg->meta.control;
      // 获取这个ps系统worker和server的总数
      size_t num_nodes =
          Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
      // assign an id
      if (msg->meta.sender == Meta::kEmpty) {
        CHECK(is_scheduler_);
        CHECK_EQ(ctrl.node.size(), 1);
        //判断当前的已经正常运行的节点数与实际要求的节点数,如果小于实际要求数量,说明系统还处于启动阶段,还需要将
        //这个节点加进来,处理这个只有scheduler才能处理
        if (nodes->control.node.size() < num_nodes) {
          nodes->control.node.push_back(ctrl.node[0]);
        } else {
          // some node dies and restarts ,节点挂掉和重启的处理逻辑
          CHECK(ready_.load());
          for (size_t i = 0; i < nodes->control.node.size() - 1; ++i) {
            const auto& node = nodes->control.node[i];
            if (deadnodes_set->find(node.id) != deadnodes_set->end() &&
                node.role == ctrl.node[0].role) {
              auto& recovery_node = ctrl.node[0];
              // assign previous node id
              recovery_node.id = node.id;
              recovery_node.is_recovery = true;
              PS_VLOG(1) << "replace dead node " << node.DebugString()
                         << " by node " << recovery_node.DebugString();
              nodes->control.node[i] = recovery_node;
              recovery_nodes->control.node.push_back(recovery_node);
              break;
            }
          }
        }
      }
    
      // update my id,根据scheduler回传的数据更新自己的信息,这样才能保证注册成功
      for (size_t i = 0; i < ctrl.node.size(); ++i) {
        const auto& node = ctrl.node[i];
        if (my_node_.hostname == node.hostname && my_node_.port == node.port) {
          if (getenv("DMLC_RANK") == nullptr || my_node_.id == Meta::kEmpty) {
            my_node_ = node;
            std::string rank = std::to_string(Postoffice::IDtoRank(node.id));
    #ifdef _MSC_VER
            _putenv_s("DMLC_RANK", rank.c_str());
    #else
            setenv("DMLC_RANK", rank.c_str(), true);
    #endif
          }
        }
      }
    }
    

    ok,接下来才是真正的注册了

    sheduler 任务

    // 在 sheduler 处理添加节点
    if (is_scheduler_) {
        ProcessAddNodeCommandAtScheduler(msg, nodes, recovery_nodes);
      } 
    

    看看 ProcessAddNodeCommandAtScheduler 做了哪些事情?

    void Van::ProcessAddNodeCommandAtScheduler(Message* msg, Meta* nodes,
                                               Meta* recovery_nodes) {
      recovery_nodes->control.cmd = Control::ADD_NODE;
      time_t t = time(NULL);
      // 还是获取系统要求的worker和server总数
      size_t num_nodes =
          Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
      // 判断实际收到注册的节点数与要求的节点数一致,那么就需要按照一定规则排序分配id
      if (nodes->control.node.size() == num_nodes) {
        // 按照 host+port的方式来排序
        std::sort(nodes->control.node.begin(), nodes->control.node.end(),
                  [](const Node& a, const Node& b) {
                    return (a.hostname.compare(b.hostname) | (a.port < b.port)) > 0;
                  });
        // 分配节点rank值,循环遍历所有的节点
        for (auto& node : nodes->control.node) {
          std::string node_host_ip =
              node.hostname + ":" + std::to_string(node.port);
           //如果ip:port不存在van_中的话
          if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) {
            CHECK_EQ(node.id, Node::kEmpty);
            // 根据server和worker的角色调用rank to id 方法,这个在上一篇文章中有提到
            // server 2*rank+8 ,worker 2*rank+9
            int id = node.role == Node::SERVER
                         ? Postoffice::ServerRankToID(num_servers_)
                         : Postoffice::WorkerRankToID(num_workers_);
            PS_VLOG(1) << "assign rank=" << id << " to node " << node.DebugString();
            node.id = id;
            // scheduler连接上这个节点
            Connect(node);
            // 更新心跳,这个心跳机制是为了监控这个节点是否还在线
            Postoffice::Get()->UpdateHeartbeat(node.id, t);
            connected_nodes_[node_host_ip] = id;
          } else {
            int id = node.role == Node::SERVER
                         ? Postoffice::ServerRankToID(num_servers_)
                         : Postoffice::WorkerRankToID(num_workers_);
            shared_node_mapping_[id] = connected_nodes_[node_host_ip];
            node.id = connected_nodes_[node_host_ip];
          }
          // num_servers_ 这就是上面注释里面的rank(server 2*rank+8 ) 
          if (node.role == Node::SERVER) num_servers_++;
          if (node.role == Node::WORKER) num_workers_++;
        }
        // 开始向这些节点回复消息,消息的 cmd = Control::ADD_NODE
        nodes->control.node.push_back(my_node_);
        nodes->control.cmd = Control::ADD_NODE;
        Message back;
        back.meta = *nodes;
        for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
          int recver_id = r;
          if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
            back.meta.recver = recver_id;
            back.meta.timestamp = timestamp_++;
            Send(back);
          }
        }
        PS_VLOG(1) << "the scheduler is connected to " << num_workers_
                   << " workers and " << num_servers_ << " servers";
        // 这里表示 scheduler 已经准备好了
        ready_ = true;
      } else if (!recovery_nodes->control.node.empty()) {
        // 处理重启节点
        // 回去失去心跳的dead node
        auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);
        std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());
        // send back the recovery node
        CHECK_EQ(recovery_nodes->control.node.size(), 1);
        //连接节点和更新心跳
        Connect(recovery_nodes->control.node[0]);
        Postoffice::Get()->UpdateHeartbeat(recovery_nodes->control.node[0].id, t);
        Message back;
        for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
          if (r != recovery_nodes->control.node[0].id &&
              dead_set.find(r) != dead_set.end()) {
            // do not try to send anything to dead node
            continue;
          }
          // only send recovery_node to nodes already exist
          // but send all nodes to the recovery_node
          back.meta =
              (r == recovery_nodes->control.node[0].id) ? *nodes : *recovery_nodes;
          back.meta.recver = r;
          back.meta.timestamp = timestamp_++;
          Send(back);
        }
      }
    }
    

    worker和server任务

    for (const auto& node : ctrl.node) {
          std::string addr_str = node.hostname + ":" + std::to_string(node.port);
          //在已连接的节点里去找这个要即将 add node 的节点
          if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
            //没找到则说明是个新节点,那么就连接这个节点并且记录下来
            Connect(node);
            connected_nodes_[addr_str] = node.id;
          }
          //根据角色名更新系统的server 和worker计数,
          if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
          if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
        }
        PS_VLOG(1) << my_node_.ShortDebugString() << " is connected to others";
        ready_ = true;
    

    至此 ProcessAddNodeCommand 所有的任务都完成了。

    阻塞任务 ProcessBarrierCommand

    这块在上一篇文章讲了,我放个链接吧!https://www.deeplearn.me/4303.html

    心跳管理 ProcessHearbeat

    void Van::ProcessHearbeat(Message* msg) {
      auto& ctrl = msg->meta.control;
      time_t t = time(NULL);
      for (auto& node : ctrl.node) {
        //定时更新心跳,心跳记录即使时间戳
        Postoffice::Get()->UpdateHeartbeat(node.id, t);
        //scheduler节点需要相应心跳请求
        if (is_scheduler_) {
          Message heartbeat_ack;
          heartbeat_ack.meta.recver = node.id;
          heartbeat_ack.meta.control.cmd = Control::HEARTBEAT;
          heartbeat_ack.meta.control.node.push_back(my_node_);
          heartbeat_ack.meta.timestamp = timestamp_++;
          // send back heartbeat
          Send(heartbeat_ack);
        }
      }
    }
    

    为了记录网络的可达性,PS Lite 设计了心跳机制。具体而言:

    • 每一个节点的 PostOffice 单例中维护了一个 MAP 结构,存储了心跳关联的节点的活跃信息。键为节点编号,值为上次收到其 HEARTBEAT 消息的时间戳。
    • Worker/Server 只记录 Scheduler 的心跳,Scheduler 则记录所有节点的心跳。基于时间戳和心跳超时,可以输出所有的死亡节点。
    • 每一个 Worker/Server 节点,会新建立一个心跳线程,每隔 PS_HEARTBEAT_INTERVAL 秒向 Scheduler 发送一条 HEARTBEAT 消息;
    • Scheduler 节点收到后,响应一个 HEARTBEAT 消息。
    • scheduler进行应答,通过当前时间与心跳包接收时间之差判断是否alive。
    • Scheduler 会依据心跳节点的时间戳来判断死亡节点。如果新增的节点id在dead_node容器里,表示这个节点是重新恢复的;而新增节点通过schedular的中转与现有节点形成互联。

    在Van::start函数中,给出了心跳的调用,对于非 scheduler 节点启动了一个线程,每隔 PS_HEARTBEAT_INTERVAL 秒向 Scheduler 发送一条 HEARTBEAT 消息:

        if (!is_scheduler_) {
          // start heartbeat thread
          heartbeat_thread_ =
              std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
        }
    

    具体心跳函数是:

    void Van::Heartbeat() {
      //从环境变量里读取间隔时间,定时发送心跳请求
      const char* val = Environment::Get()->find("PS_HEARTBEAT_INTERVAL");
      const int interval = val ? atoi(val) : kDefaultHeartbeatInterval;
      while (interval > 0 && ready_.load()) {
        std::this_thread::sleep_for(std::chrono::seconds(interval));
        Message msg;
        msg.meta.recver = kScheduler;
        msg.meta.control.cmd = Control::HEARTBEAT;
        msg.meta.control.node.push_back(my_node_);
        msg.meta.timestamp = timestamp_++;
        Send(msg);
      }
    }
    
    

    差不多到这里 Van的启动已经基本上都完成了。其实在Van里面还有关于通信消息的优化,比如Pb序列化,有兴趣可以去看看。



沪ICP备19023445号-2号
友情链接