gRPC异步处理应答
(金庆的专栏)
gRPC的示例 greeter_async_client.cc 不算是异步客户端,
它使用了异步请求,但是阻塞式等待应答,结果成为一个同步调用。
// Excerpt from gRPC's greeter_async_client.cc. The request goes out through
// the asynchronous API, but the function then blocks on the completion queue,
// so the caller still waits for the reply before getting a result.
std::string SayHello(const std::string& user) {
  ...
  // Start the call; its completion event is delivered through "cq".
  std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
      stub_->AsyncSayHello(&context, request, &cq));
  // Request the reply and final status, tagging the operation with the value 1.
  rpc->Finish(&reply, &status, (void*)1);
  void* got_tag;
  bool ok = false;
  // Block until the next result is available in the completion queue "cq".
  cq.Next(&got_tag, &ok);
  ...
  return reply.message();
}
为了实现真正的异步RPC请求,发出请求后立即返回,然后在一个线程中处理所有应答。
以下代码经测试表明可以使用。
// Grpc异步应答处理,线程中运行.void HandleGrpcResponses(){ ... grpc::CompletionQueue & rCq = rMgr.GetCq(); for (;;) { void * pTag; bool ok = false; // Block until the next result is available in the completion queue "cq". rCq.Next(&pTag, &ok); // Act upon the status of the actual RPC. std::unique_ptr<IGrpcCb> pCb(static_cast<IGrpcCb*>(pTag)); const grpc::Status & rStatus = pCb->GetStatus(); if (rStatus.ok()) (*pCb)(); // run callback }}IGrpcCb是回调类,定义如下:
class IGrpcCb{public: explicit IGrpcCb(...) {}; virtual ~IGrpcCb(void) {}; grpc::ClientContext & GetContext() { return m_context; } grpc::Status & GetStatus() { return m_status; }public: virtual void operator()() {};protected: grpc::ClientContext m_context; grpc::Status m_status; ...};// R is response class like rpc::CreateRoomResponse.template <class R>class GrpcCb final : public IGrpcCb{public: explicit GrpcCb(...) : IGrpcCb(...) {}; virtual ~GrpcCb(void) override {};public: typedef std::unique_ptr<grpc::ClientAsyncResponseReader<R> > RpcPtr;public: R & GetResp() { return m_resp; } void SetRpcPtrAndFinish(RpcPtr pRpc) { m_pRpc.swap(pRpc); m_pRpc->Finish(&m_resp, &m_status, (void*)this); }public: virtual void operator()() override { // Deal m_resp... }private: RpcPtr m_pRpc; R m_resp;};异步请求代码示例如下:
grpc::CompletionQueue & cq = GetCq(); rpc::CreateRoomRequest req; // pGcb will be deleted in HandleGrpcResponses(). auto pGcb = new GrpcCb<rpc::CreateRoomResponse>(...); pGcb->SetRpcPtrAndFinish( m_pStub->AsyncCreateRoom(&pGcb->GetContext(), req, &cq));