IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    ps-lite_part4_customer讲解

    admin发表于 2023-03-22 13:25:48
    love 0

    距离上一次发文又间隔了一段时间,都有点忘记写到哪一部分了,先回顾一下之前写到哪里了

    1. 介绍ps-lite的基本概念 https://www.deeplearn.me/4302.html
    2. 介绍ps-lite核心组成 postOffice https://www.deeplearn.me/4303.html
    3. 介绍ps-lite 通信模块van https://www.deeplearn.me/4306.html

    本篇文章主要讲解 customer。customer的角色主要干的事情其实是一个中介的事情,它主要负责server 和worker之间的一些信息传递。

    我们先看看 customer 在哪里被创建使用?

    在test/test_kv_app_multi_workers.cc文件中我们看下如何调用的,先看下server端调用

    void StartServer() {
      if (!IsServer()) return;
      //重点是kvserver的初始化,传递的参数是app_id
      auto server = new KVServer<float>(0);
      server->set_request_handle(KVServerDefaultHandle<float>());
      RegisterExitCallback([server](){ delete server; });
    }
    

    对于一个server来说app_id可以理解为是kv数据库的id,这个在之前有提到过,kvsever的初始化函数如下所示:

     explicit KVServer(int app_id) : SimpleApp() {
        using namespace std::placeholders;
        obj_ = new Customer(app_id, app_id, std::bind(&KVServer<Val>::Process, this, _1));
      }
    
    

    从上面就可以清晰的看到customer被创建的过程,直接调用customer初始化方法。这里有一点需要注意的是customer这个初始化参数第一个和第二个参数传递的都是一个值,如果是这样的话是不是意味着server端就一个customer?

    这个其实是对比worker端而言,下面放一下woker端customer初始化对比

      explicit KVWorker(int app_id, int customer_id) : SimpleApp() {
        using namespace std::placeholders;
        slicer_ = std::bind(&KVWorker<Val>::DefaultSlicer, this, _1, _2, _3);
        obj_ = new Customer(app_id, customer_id, std::bind(&KVWorker<Val>::Process, this, _1));
      }
    

    这么一对比就发现差异蛮大的,kvworker的初始化需要指定传递app_id和customer_id的,那么意味着app-id和customer_id是一对多的关系。

    以上只是看一下customer在哪了被调用,以及在worker端和server端之间的差异。

    Customer

    构造函数

    接下来就是看下customer的真面貌来!!!还是先看下customer的构造函数:

    Customer::Customer(int app_id, int customer_id, const Customer::RecvHandle& recv_handle)
        : app_id_(app_id), customer_id_(customer_id), recv_handle_(recv_handle) {
      Postoffice::Get()->AddCustomer(this);
      recv_thread_ = std::unique_ptr<std::thread>(new std::thread(&Customer::Receiving, this));
    }
    

    首先看下传参,app_id和customer_id就不多解释了,const Customer::RecvHandle& recv_handle 这个参数的含义是表示接受到消息之后要对消息做什么处理,一般情况下这个是由kvserver和kvworker初始化的时候传递给customer的,也就是说这些处理逻辑本身跟customer没啥关系,由实际需求方自己定义。

    举个最简单的例子,server端拿到worker端梯度信息要去更新参数,这个处理逻辑就是 const Customer::RecvHandle& recv_handle 这个要干的事情。

    构造函数做了哪些事情?

    1. 调用PostOffice::AddCustomer将当前Customer注册到PostOffice;
      1. PostOffice的customers成员: 在对应的app_id的元素上添加custom_id;
      2. PostOffice的barrier_done成员将该custom_id的同步状态设为false
    2. 新起一个接收消息的线程,不断的从外界接收消息并且根据实际的处理逻辑做出相应的处理。

    消息处理

    生产和消费模式

    Customer的消息处理遵循“producer+consumer”模式,之前也提到customer只是个中介,那么它这个生产和消费是如何进行的?

    生产

    核心的API是Accept函数

     inline void Accept(const Message& recved) {
        recv_queue_.Push(recved);
      }
    

    recv_queue_就是一个队列,存储调用房发送过来的消息,对于customer而言上游发送过来的消息就是生产出来的内容,customer拿到这个也只是存起来。

    消费

    核心函数就是这个 Receiving 成员函数,负责消息的consumer

    void Customer::Receiving() {
      while (true) {
        Message recv;
        //从队列中取出相应的数据
        recv_queue_.WaitAndPop(&recv);
        //判断当前的消息control指令是不是结束的指令,如果是则直接结束
        if (!recv.meta.control.empty() &&
            recv.meta.control.cmd == Control::TERMINATE) {
          break;
        }
        //调用的制定的处理函数来消费内容
        recv_handle_(recv);
        //判断当前的消息是请求和响应,如果是响应则修改tracker_数据
        //tracker是负责核对消息完整性,比如你发送3个请求,那么收到3个响应才算完整,那么这个参数就是干这个事
        if (!recv.meta.request) {
          std::lock_guard<std::mutex> lk(tracker_mu_);
          tracker_[recv.meta.timestamp].second++;
          tracker_cond_.notify_all();
        }
      }
    }
    

    请求相关

    比如kvworker需要去server拉取参数,那么就会经过customer来进行的消息请求,由customer新建请求去server拉取数据。

    int Customer::NewRequest(int recver) {
      std::lock_guard<std::mutex> lk(tracker_mu_);
      //这里是获取需要通知的nodeid下的节点个数,比如worker向server请求,那么这就是所有需要参与处理server的节点数
      int num = Postoffice::Get()->GetNodeIDs(recver).size();
      //构建一个tracker_ 用于校对请求的完整星,这个在Receiving函数里也有体现
      tracker_.push_back(std::make_pair(num, 0));
      return tracker_.size() - 1;
    }
    

    上面也提到一个worker会从多个server拉取数据,那么什么时候才能保证数据都获取到呢,那么有的处理响应快,有的慢,怎么保证数据的完整性,那么下面的这个函数就是做这个事情

    void Customer::WaitRequest(int timestamp) {
      std::unique_lock<std::mutex> lk(tracker_mu_);
      tracker_cond_.wait(lk, [this, timestamp]{
          return tracker_[timestamp].first == tracker_[timestamp].second;
        });
    }
    

    就是用tracker_cond_阻塞等待“请求节点数”和“回复节点数”相等。而tracker_cond_是在Customer::Receiving每次接受到消息时通知一下。

    注意这个阻塞是“单机阻塞”,适用于比如,worker必须在从server拉取完最新参数之后,才能开始下一轮的训练。由于pull是异步的,所以worker需要调用WaitRequest阻塞等待那个pull request完成。多机之间的阻塞同步,要用PostOffice:Barrier。

    总结

    主要讲解了customer的作用–中介,同时作为中介,它是如何进行消息的生产与消费,最后也提到了消息的同步这块的知识。



沪ICP备19023445号-2号
友情链接