IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Python gRPC

    Elmagnifico\'s Blog发表于 2024-11-22 00:00:00
    love 0

    Foreword

    gRPC

    example测试

    安装gRPC库

    pip install grpcio
    

    安装gRPC工具

    pip install grpcio-tools
    

    下载官方例程

    git clone -b v1.66.0 --depth 1 --shallow-submodules https://github.com/grpc/grpc
    

    演示用例在这里

    grpc/examples/python/helloworld
    

    先启动服务端

    python greeter_server.py
    

    image-20241120171002516

    可以看到已经在监听了

    再启动客户端

    python greeter_client.py
    

    正常连接到了服务端

    image-20241120171021552

    源码分析

    服务端

    from concurrent import futures
    import logging
    
    import grpc
    import helloworld_pb2
    import helloworld_pb2_grpc
    
    # 继承自helloworld_pb2_grpc.GreeterServicer,重写了sayhello的函数
    class Greeter(helloworld_pb2_grpc.GreeterServicer):
        def SayHello(self, request, context):
            # 对应返回 hello 和访问者的名字
            return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)
    
    
    def serve():
        # 启动还是比较简单的,设置好端口
        port = "50051"
        # 调用helloworld_pb2_grpc就完成了
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
        server.add_insecure_port("[::]:" + port)
        server.start()
        print("Server started, listening on " + port)
        server.wait_for_termination()
    
    
    if __name__ == "__main__":
        logging.basicConfig()
        serve()
    
    

    客户端

    from __future__ import print_function
    
    import logging
    
    import grpc
    import helloworld_pb2
    import helloworld_pb2_grpc
    
    
    def run():
        # NOTE(gRPC Python Team): .close() is possible on a channel and should be
        # used in circumstances in which the with statement does not fit the needs
        # of the code.
        print("Will try to greet world ...")
        # 设置本地 端口
        with grpc.insecure_channel("localhost:50051") as channel:
            stub = helloworld_pb2_grpc.GreeterStub(channel)
            # 发送信息 并等待结果
            response = stub.SayHello(helloworld_pb2.HelloRequest(name="you"))
        print("Greeter client received: " + response.message)
    
    
    if __name__ == "__main__":
        logging.basicConfig()
        run()
    
    

    helloworld_pb2_grpc.py

    # 继承的原型函数在这里
    class GreeterServicer(object):
        """The greeting service definition.
        """
    
        def SayHello(self, request, context):
            """Sends a greeting
            """
            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
            context.set_details('Method not implemented!')
            raise NotImplementedError('Method not implemented!')
    
        def SayHelloStreamReply(self, request, context):
            """Missing associated documentation comment in .proto file."""
            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
            context.set_details('Method not implemented!')
            raise NotImplementedError('Method not implemented!')
    
        def SayHelloBidiStream(self, request_iterator, context):
            """Missing associated documentation comment in .proto file."""
            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
            context.set_details('Method not implemented!')
            raise NotImplementedError('Method not implemented!')
            
    # 主要是这个函数,把函数的返回绑定到一起
    def add_GreeterServicer_to_server(servicer, server):
        rpc_method_handlers = {
                'SayHello': grpc.unary_unary_rpc_method_handler(
                        servicer.SayHello,
                        request_deserializer=helloworld__pb2.HelloRequest.FromString,
                        response_serializer=helloworld__pb2.HelloReply.SerializeToString,
                ),
                'SayHelloStreamReply': grpc.unary_stream_rpc_method_handler(
                        servicer.SayHelloStreamReply,
                        request_deserializer=helloworld__pb2.HelloRequest.FromString,
                        response_serializer=helloworld__pb2.HelloReply.SerializeToString,
                ),
                'SayHelloBidiStream': grpc.stream_stream_rpc_method_handler(
                        servicer.SayHelloBidiStream,
                        request_deserializer=helloworld__pb2.HelloRequest.FromString,
                        response_serializer=helloworld__pb2.HelloReply.SerializeToString,
                ),
        }
        # 创建服务名称和通用句柄
        generic_handler = grpc.method_handlers_generic_handler(
                'helloworld.Greeter', rpc_method_handlers)
        # server添加通用的句柄
        server.add_generic_rpc_handlers((generic_handler,))
        # 将处理方法注册给server
        server.add_registered_method_handlers('helloworld.Greeter', rpc_method_handlers)
    

    helloworld_pb2.py

    """Generated protocol buffer code."""
    from google.protobuf import descriptor as _descriptor
    from google.protobuf import descriptor_pool as _descriptor_pool
    from google.protobuf import runtime_version as _runtime_version
    from google.protobuf import symbol_database as _symbol_database
    from google.protobuf.internal import builder as _builder
    _runtime_version.ValidateProtobufRuntimeVersion(
        _runtime_version.Domain.PUBLIC,
        5,
        27,
        2,
        '',
        'helloworld.proto'
    )
    # @@protoc_insertion_point(imports)
    
    _sym_db = _symbol_database.Default()
    
    
    
    # 这里直接用代码的形式写了一个helloworld的描述符
    DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2\xe4\x01\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x12K\n\x13SayHelloStreamReply\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x30\x01\x12L\n\x12SayHelloBidiStream\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00(\x01\x30\x01\x42\x36\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\xa2\x02\x03HLWb\x06proto3')
    
    _globals = globals()
    _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
    _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'helloworld_pb2', _globals)
    if not _descriptor._USE_C_DESCRIPTORS:
      _globals['DESCRIPTOR']._loaded_options = None
      _globals['DESCRIPTOR']._serialized_options = b'\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001\242\002\003HLW'
      _globals['_HELLOREQUEST']._serialized_start=32
      _globals['_HELLOREQUEST']._serialized_end=60
      _globals['_HELLOREPLY']._serialized_start=62
      _globals['_HELLOREPLY']._serialized_end=91
      _globals['_GREETER']._serialized_start=94
      _globals['_GREETER']._serialized_end=322
    # @@protoc_insertion_point(module_scope)
    
    

    实际这里使用的proto文件,是如下定义的

    // The greeting service definition.
    service Greeter {
      // Sends a greeting
      rpc SayHello (HelloRequest) returns (HelloReply) {}
    }
    
    // The request message containing the user's name.
    message HelloRequest {
      string name = 1;
    }
    
    // The response message containing the greetings
    message HelloReply {
      string message = 1;
    }
    

    自定义函数

    实际使用的proto文件是在examples/protos/helloworld.proto中的,这里添加一个新的函数

    syntax = "proto3";
    
    option java_multiple_files = true;
    option java_package = "io.grpc.examples.helloworld";
    option java_outer_classname = "HelloWorldProto";
    option objc_class_prefix = "HLW";
    
    package helloworld;
    
    // The greeting service definition.
    service Greeter {
      // Sends a greeting
      rpc SayHello (HelloRequest) returns (HelloReply) {}
    
      rpc SayHello2 (HelloRequest) returns (HelloReply) {}
    
      rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}
    
      rpc SayHelloBidiStream (stream HelloRequest) returns (stream HelloReply) {}
    }
    
    // The request message containing the user's name.
    message HelloRequest {
      string name = 1;
    }
    
    // The response message containing the greetings
    message HelloReply {
      string message = 1;
    }
    
    

    需要重新生成对应的代码

    python -m grpc_tools.protoc -I../../protos --python_out=. --pyi_out=. --grpc_python_out=. ../../protos/helloworld.proto
    

    这里就会重新生成了

    def add_GreeterServicer_to_server(servicer, server):
        rpc_method_handlers = {
                'SayHello': grpc.unary_unary_rpc_method_handler(
                        servicer.SayHello,
                        request_deserializer=helloworld__pb2.HelloRequest.FromString,
                        response_serializer=helloworld__pb2.HelloReply.SerializeToString,
                ),
                'SayHello2': grpc.unary_unary_rpc_method_handler(
                        servicer.SayHello2,
                        request_deserializer=helloworld__pb2.HelloRequest.FromString,
                        response_serializer=helloworld__pb2.HelloReply.SerializeToString,
                ),
    

    server和client都增加hello2以后,再次运行就能看到已经给过来正确的反应了

    image-20241122144701070

    到这里最简单的gRPC就完成了

    小总结

    核心就三步:

    1. 定义proto,其实就是定义函数和参数,
    2. 生成,生成会自动根据定义,生成中间需要的类或者成员函数
    3. 修改server和client的调用

    gRPC的流式传输

    上面演示的例子都是C/S架构的,也是gRPC常用的模式,一方请求,一方应答,这是普通的RPC。服务方是不能主动发起请求的,只有客户方主动。还有其他3种方式。

    • 响应流式传输
    • 请求流式传输
    • 双向流式传输

    还是之前的例子中,有对应的流式实现

    NUMBER_OF_REPLY = 10
    class Greeter(MultiGreeterServicer):
        async def sayHello(
            self, request: HelloRequest, context: grpc.aio.ServicerContext
        ) -> HelloReply:
            logging.info("Serving sayHello request %s", request)
            for i in range(NUMBER_OF_REPLY):
                yield HelloReply(message=f"Hello number {i}, {request.name}!")
    
    

    对于服务端的响应,这里可以看到返回了10此请求,并且这个函数是异步的

    async def run() -> None:
        async with grpc.aio.insecure_channel("localhost:50051") as channel:
            stub = hellostreamingworld_pb2_grpc.MultiGreeterStub(channel)
    
            # Read from an async generator
            async for response in stub.sayHello(
                hellostreamingworld_pb2.HelloRequest(name="you")
            ):
                print(
                    "Greeter client received from async generator: "
                    + response.message
                )
    
            # Direct read from the stub
            hello_stream = stub.sayHello(
                hellostreamingworld_pb2.HelloRequest(name="you")
            )
            while True:
                response = await hello_stream.read()
                if response == grpc.aio.EOF:
                    break
                print(
                    "Greeter client received from direct read: " + response.message
                )
    

    客户端这边,前面是异步流式获取,后面是正常流式获取获取

    可能看例子,这里流式传输的意义不是很明显,除了能多次发送请求或者多次响应,还有啥用。

    • 大文件流式传输就需要多次请求和多次响应,比如音频、视频
    • 主动推送或者回报就需要流式来实现,比如广告、广播推送
    • 高并发,可以同时响应多个请求,不再是顺序执行,串联影响效率
    • 任务完成的进度回显就必须多次响应

    grpc的双向流式可以类比成WebSocket,客户端和服务器都可以互相发送信息

    // The greeting service definition.
    service Greeter {
      // Sends a greeting
      rpc SayHello (HelloRequest) returns (HelloReply) {}
    
      rpc SayHello2 (HelloRequest) returns (HelloReply) {}
    
      rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}
    
      rpc SayHelloBidiStream (stream HelloRequest) returns (stream HelloReply) {}
    }
    

    在流式传输的例子中proto文件的定义使用了一个特殊关键词,stream凡是被stream修饰的参数,那么传输时就会采用流式。

    如果修饰到返回值,那就是服务器流式,如果修饰参数,那就是客户端流式,如果同时有那就是双向流式传输

    Summary

    总体来说gRPC就是这样,流式上感觉似乎没有WebSocket简单,特别是如果是用来做双向交互的时候WebSocket似乎更简单,更好做一些

    Quote

    https://grpc.io/docs/languages/python/quickstart/

    https://blog.yuanpei.me/posts/grpc-streaming-transmission-minimalist-guide/

    https://hamhire.tech/posts/coding/grpc-03.stream-demo.html



沪ICP备19023445号-2号
友情链接