gRPC python

概述:gRPC python

[TOC]

1. gRPC

RPC,是Remote Procedure Call的简称,即远程过程调用,允许程序调用另一个地址空间(通常是另一台机器上)的类方法或函数的一种服务,是一种架设在计算机网络之上并隐藏底层网络技术,可以像调用本地服务一样调用远端程序,在编码代价不高的情况下提升吞吐的能力。

gRPC 是 Google 开源的基于 Protobuf 和 Http2.0 协议的通信框架,默认使用 protocol buffers协议,是 Google 开源的一套成熟的结构数据的序列化机制,当然也可以使用其他数据格式如 JSON,不过通常都使用protocol buffers这种灵活、高效的数据格式。大部分RPC都是基于socket实现的,可以比http请求来的高效。

将方法调用以及调用参数,响应参数等在两个服务器之间进行传输,就需要将这些参数序列化,gRPC采用的是protocol buffer的语法(检查proto),通过proto语法可以定义好要调用的方法、和参数以及响应格式,可以很方便地完成远程方法调用,而且非常利于扩展和更新参数。

Protocol Buffer是Google的跨语言,跨平台,可扩展机制的,用于序列化结构化数据,类似于Json和XML数据交换格式,Protobuf相对与XML和Json的不同之处,它是一种二进制的数据格式,具有更高的传输,打包和解包效率,不依赖于语言和平台,具有简单,数据量小,快速等优点,可以使用prorobuf将内容序列化后保存在文件中,加载文件,反序列化后就可以直接使用,网络通信时用于协议编解码的工具库。

gRPC客户端应用可像调用本地方法一样直接调用另一台机器上服务端应用的方法,容易创建分布式应用和服务。跟其他 RPC 系统类似,gRPC 也是基于以下理念:首先定义一个服务,定义能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个方法,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根,这个存根就是长得像服务端一样的方法(但是没有具体实现),客户端通过这个存根调用服务端的方法。

大致工作原理

1
2
gRPC stub(c++/python/...) ===== proto request ===>> gRPC Server
gRPC stub(c++/python/...) <<=== proto response === gRPC Server

2. 服务定义

通过protobuf定义服务的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 定义 HelloService 的服务,service & message 关键字可类比为 class
service HelloService {
// 定义一个叫SayHello的方法,这个方法接受HelloRequest消息作为参数,返回HelloResponse消息
rpc SayHello (HelloRequest) returns (HelloResponse);
}

// 定义HelloRequest消息
message HelloRequest {
required string greeting = 1;
}

// 定义HelloResponse消息
message HelloResponse {
required string reply = 1;
}

gRPC 允许你定义四类服务方法,以及客户端和服务端的交互方式:

21 单向RPC

即客户端发送一个请求给服务端,从服务端获取一个应答,就像一次普通的函数调用。

1
2
rpc SayHello(HelloRequest) returns (HelloResponse){
}

2.2 服务端流式 RPC

即客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止。

即客户端请求一次,服务端就可以持续返回消息给客户端。

1
2
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse){
}

2.3 客户端流式 RPC

即客户端用提供的一个数据流写入并发送一系列消息给服务端。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。

即请求一次,客户端就可以源源不断的往服务端发送消息。

1
2
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse) {
}

2.4 双向流式 RPC

即两边都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写,例如:服务端可以在写应答前等待所有的客户端消息,或者它可以先读一个消息再写一个消息,或者是读写相结合的其他方式。每个数据流里消息的顺序会被保持。

类似tcp通信,客户端和服务端可以互相发消息。

1
2
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse){
}

3. Install

1
2
3
4
5
6
7
# 可不指定具体版本
# 安装gRPC
pip install grpcio==1.36.1
# 安装protobuf
pip install protobuf==3.15.5
# 安装gRPC工具:包括protocol buffer编译器(protoc)和 python代码生成插件,python代码生成插件通过.proto服务定义文件,生成python的grpc服务端和客户端代码,即生成后可不再使用该库。
pip install grpcio_tools==1.36.1

4. 消息编译

生成 gRPC 代码,以person.proto为例,在相应工作目录下执行以下命令

1
python -m grpc_tools.protoc --python_out=.  --grpc_python_out=.  -I.  person.proto
  • python -m grpc_tools.protoc: 基于python模块(module) 运行protoc 编译器

  • –python_out=. : 编译生成处理 protobuf 相关的代码的路径(.表示当前目录)

  • –grpc_python_out=. : 编译生成处理 grpc 相关的代码的路径(.表示当前目录)

  • -I. [person.proto] : proto 协议文件的路径

生成person_pb2.pyperson_pb_grpc.py

  • person_pb2.py: 用于与 protobuf 数据进行交互,根据proto文件定义的数据结构生成的数据结构文件
  • person_pb2_grpc.py: 用于与 grpc 进行交互,定义了rpc方法的类以及类的请求参数和响应等,可直接进行python实例化调用

补充消息编译各个语言的类库:

1
2
3
4
5
6
7
8
--cpp_out=OUT_DIR           指定代码生成目录,生成 C++ 代码
--csharp_out=OUT_DIR 指定代码生成目录,生成 C# 代码
--java_out=OUT_DIR 指定代码生成目录,生成 java 代码
--js_out=OUT_DIR 指定代码生成目录,生成 javascript 代码
--objc_out=OUT_DIR 指定代码生成目录,生成 Objective C 代码
--php_out=OUT_DIR 指定代码生成目录,生成 php 代码
--python_out=OUT_DIR 指定代码生成目录,生成 python 代码
--ruby_out=OUT_DIR 指定代码生成目录,生成 ruby 代码

5. ProtoBuf

5.1 基础知识

protocol buffers (ProtoBuf)是一种语言无关、平台无关、可扩展的序列化结构数据的方法,它可用于(数据)通信协议、数据存储等。

Protocol Buffers 是一种灵活,高效,自动化机制的结构数据序列化方法-可类比 XML,但是比 XML 更小(3 ~ 10倍)、更快(20 ~ 100倍)、更为简单。

json\xml都是基于文本格式,protobuf是二进制格式。

你可以通过 ProtoBuf 定义数据结构,然后通过 ProtoBuf 工具生成各种语言版本的数据结构类库,用于操作 ProtoBuf 协议数据

5.2 数据类型

  1. 复合数据类型包括:枚举和message类型,数组,MAP;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// (1) 枚举消息类型
// 使用enum关键词定义,一个电话类型的枚举类型使用`enum`对字段值进行枚举,`department`值可能是`DEFAULT, WEB, NEWS, CAMERS`中的一个,enum值是使用可变编码方式的,对负数不够高效,因此不推荐在enum中使用负数。
enum PhoneType
{
MOBILE = 0; //proto3版本中,首成员必须为0,成员不应有相同的值
HOME = 1;
WORK = 2;
}

// 定义一个电话消息
message PhoneNumber
{
string number = 1; // 电话号码字段
PhoneType type = 2; // 电话类型字段
}

// (2) 整数数组
message Msg {
// 只要使用repeated标记类型定义,就表示数组类型。
repeated int32 arrays = 1;
}

// (3) 字符串数组
message Msg {
repeated string names = 1;
}

// (4) MAP
// map<key_type, value_type> map_field = N;
// key_type可以是任何整数或字符串类型(除浮点类型和字节之外的任何标量类型,枚举不是有效的key_type)
// value_type 可以是除另一个映射之外的任何类型。
// Map 字段不能使用repeated关键字修饰。
message Product
{
string name = 1; // 商品名
// 定义一个k/v类型,key是string类型,value也是string类型
map<string, string> attrs = 2; // 商品属性,键值对
}
  1. 标准数据类型包含:整型,浮点,字符串等
.proto Type Notes C++ Type Java Type Python Type[2] Go Type Ruby Type C# Type PHP Type
double double double float float64 Float double float
float float float float float32 Float float float
int32 使用变长编码,对于负值的效率很低,如果你的域有可能有负值,请使用sint64替代 int32 int int int32 Fixnum 或者 Bignum(根据需要) int integer
uint32 使用变长编码 uint32 int int/long uint32 Fixnum 或者 Bignum(根据需要) uint integer
uint64 使用变长编码 uint64 long int/long uint64 Bignum ulong integer/string
sint32 使用变长编码,这些编码在负值时比int32高效的多 int32 int int int32 Fixnum 或者 Bignum(根据需要) int integer
sint64 使用变长编码,有符号的整型值。编码时比通常的int64高效。 int64 long int/long int64 Bignum long integer/string
fixed32 总是4个字节,如果数值总是比总是比228大的话,这个类型会比uint32高效。 uint32 int int uint32 Fixnum 或者 Bignum(根据需要) uint integer
fixed64 总是8个字节,如果数值总是比总是比256大的话,这个类型会比uint64高效。 uint64 long int/long uint64 Bignum ulong integer/string
sfixed32 总是4个字节 int32 int int int32 Fixnum 或者 Bignum(根据需要) int integer
sfixed64 总是8个字节 int64 long int/long int64 Bignum long integer/string
bool bool boolean bool bool TrueClass/FalseClass bool boolean
string 一个字符串必须是UTF-8编码或者7-bit ASCII编码的文本。 string String str/unicode string String (UTF-8) string string
bytes 可能包含任意顺序的字节数据。 string ByteString str []byte String (ASCII-8BIT) ByteString string

5.3 语法

  1. 指定protobuf的版本,proto3是最新的语法版本,默认 proto2
1
syntax = "proto3";
  1. 消息定义包:防止不同消息类型有命名冲突,其它proto文件导入该proto文件的,Person消息类型时,使用 person.info.Person person = 1
1
package person.info;
  1. 导入其它proto文件
1
import "other.proto"
  1. 定义数据结构/字段:每一个字段后有一个唯一数字(标识号),用于识别二进制格式消息的字段,且一旦使用无法更改

    注意:[1,15]之内的标识号在编码的时候会占用一个字节。[16,2047]之内的标识号则占用2个字节。所以应该为那些频繁出现的消息元素保留 [1,15]之内的标识号。切记:要为将来有可能添加的、频繁出现的字段预留一些标识号。

1
2
3
4
5
6
7
8
9
message Person {
required string name = 1;// Your realname or nickname
optional string email = 2 [default = "null"];
float salary = 3;
bool is_employed = 4;
repeated Address address = 5;
Department department = 6;
other.Result result = 7;
}
  1. 保留标识号(Reserved),留给以后用
1
2
3
message Foo {
reserved 2, 15, 9 to 11; // 保留2,15,9到11这些标识号
}
  1. extensions 表示被保留为扩展用,其他人可在自己的.proto文件中添加新字段到Foo中,但是新字段的标识号在该范围内,即不需要编辑原文件,可直接导入使用,并为该消息类型声明新字段。

    1
    2
    3
    4
    5
    6
    7
    8
    message Foo {
    // ...
    extensions 100 to 199;
    }

    extend Foo {
    optional int32 bar = 126;
    }
  2. 使用//表示注释

  3. 消息嵌套

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// (1) 引用其他消息类型的用法
// 定义Result消息
message Result {
string url = 1;
string title = 2;
repeated string snippets = 3; // 字符串数组类型
}

// 定义SearchResponse消息
message SearchResponse {
// 引用上面定义的Result消息类型,作为results字段的类型
repeated Result results = 1; // repeated关键词标记,说明results字段是一个数组
}

// (2) 引用其他消息类型的用法 —— 显式消息嵌套
message SearchResponse {
// 嵌套消息定义
message Result {
string url = 1;
string title = 2;
repeated string snippets = 3;
}
// 引用嵌套的消息定义
repeated Result results = 1;
}

// (3) 跨文件的消息嵌套
------------------result.proto--------------------
syntax = "proto3";
// Result消息定义
message Result {
string url = 1;
string title = 2;
repeated string snippets = 3; // 字符串数组类型
}
--------------------------------------------------
--------------search_response.proto---------------
syntax = "proto3";
// 导入Result消息定义
import "result.proto";

// 定义SearchResponse消息
message SearchResponse {
// 使用导入的Result消息
repeated Result results = 1;
}
--------------------------------------------------
  1. 方法

    service 表示服务,例如SearchSearchRequest表示请求数据字段,SearchResponse表示返回数据字段,编译好可进行远程调用,可实现跨平台跨语言。

1
2
3
service SearchService {
rpc Search (SearchRequest) returns (SearchResponse);
}
  1. 字段规则
  • required:通常消息中必须含有1个这种字段,即必填项;
  • optional:消息格式中该字段可以有0个或1个值(不超过1个),即可选项。
  • repeated:该字段可重复多次(包括0次),重复的值的顺序会被保留,类似于列表。

6. protobuf类型转换

6.1 Protobuf -> Python Json

1
2
3
from google.protobuf.json_format import MessageToJson

jsonObj = MessageToJson(protobuf_obj)

6.2 Protobuf -> Python Dict

1
2
3
from google.protobuf.json_format import MessageToDict

dict_obj = MessageToDict(org)

7. 重试机制

7.1 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 参考网站(英文):https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy-capabilities
# 参考网站(中文):https://blog.csdn.net/DAGU131/article/details/106122895
# 最多执行四个RPC尝试(一个原始RPC,三个重试),仅返回状态码是UNAVAILABLE才重试RPC
# maxAttempts 指定最大的RPC尝试次数,包括原始请求,必须是大于 1 的整数,对于大于5的值会被视为5
# 第一次重试间隔是 random(0, initialBackoff)
# 第 n 次的重试间隔为 random(0, min( initialBackoff*backoffMultiplier**(n-1) , maxBackoff))
# 状态码必须是有效的 gPRC 状态码,可以是整数形式,并且不区分大小写 ([14], [“UNAVAILABLE”]
service_default_config = json.dumps(
{
"methodConfig": [
{
"retryPolicy": {
"maxAttempts": 3,
"initialBackoff": "0.1s",
"maxBackoff": "1s",
"backoffMultiplier": 1,
"retryableStatusCodes": [
"UNAVAILABLE",
],
},
}
]
}
)
options = [('grpc.service_config', service_default_config)]
self.channel = grpc.insecure_channel(target=endpoint, options=self.options)

7.2 手动实现

1
2
3
4
5
6
7
8
9
10
11
12
# 自定义重试间隔 retry_interval 以及重试次数 retry_times
def call(self, service, func, data):
# ...
retry_count = 0
while retry_count < self.retry_times:
try:
return func_handler(func_input, timeout=self.timeout)
except Exception as exc: # pylint: disable=broad-except
# ...
retry_count += 1
time.sleep(self.retry_interval)
raise GrpcException

8. 状态码

参考网站

Code Number Description
OK 0 Not an error; returned on success.
CANCELLED 1 The operation was cancelled, typically by the caller.
UNKNOWN 2 Unknown error. For example, this error may be returned when a Status value received from another address space belongs to an error space that is not known in this address space. Also errors raised by APIs that do not return enough error information may be converted to this error.
INVALID_ARGUMENT 3 The client specified an invalid argument. Note that this differs from FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments that are problematic regardless of the state of the system (e.g., a malformed file name).
DEADLINE_EXCEEDED 4 The deadline expired before the operation could complete. For operations that change the state of the system, this error may be returned even if the operation has completed successfully. For example, a successful response from a server could have been delayed long
NOT_FOUND 5 Some requested entity (e.g., file or directory) was not found. Note to server developers: if a request is denied for an entire class of users, such as gradual feature rollout or undocumented allowlist, NOT_FOUND may be used. If a request is denied for some users within a class of users, such as user-based access control, PERMISSION_DENIED must be used.
ALREADY_EXISTS 6 The entity that a client attempted to create (e.g., file or directory) already exists.
PERMISSION_DENIED 7 The caller does not have permission to execute the specified operation. PERMISSION_DENIED must not be used for rejections caused by exhausting some resource (use RESOURCE_EXHAUSTED instead for those errors). PERMISSION_DENIED must not be used if the caller can not be identified (use UNAUTHENTICATED instead for those errors). This error code does not imply the request is valid or the requested entity exists or satisfies other pre-conditions.
RESOURCE_EXHAUSTED 8 Some resource has been exhausted, perhaps a per-user quota, or perhaps the entire file system is out of space.
FAILED_PRECONDITION 9 The operation was rejected because the system is not in a state required for the operation’s execution. For example, the directory to be deleted is non-empty, an rmdir operation is applied to a non-directory, etc. Service implementors can use the following guidelines to decide between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE: (a) Use UNAVAILABLE if the client can retry just the failing call. (b) Use ABORTED if the client should retry at a higher level (e.g., when a client-specified test-and-set fails, indicating the client should restart a read-modify-write sequence). (c) Use FAILED_PRECONDITION if the client should not retry until the system state has been explicitly fixed. E.g., if an “rmdir” fails because the directory is non-empty, FAILED_PRECONDITION should be returned since the client should not retry unless the files are deleted from the directory.
ABORTED 10 The operation was aborted, typically due to a concurrency issue such as a sequencer check failure or transaction abort. See the guidelines above for deciding between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE.
OUT_OF_RANGE 11 The operation was attempted past the valid range. E.g., seeking or reading past end-of-file. Unlike INVALID_ARGUMENT, this error indicates a problem that may be fixed if the system state changes. For example, a 32-bit file system will generate INVALID_ARGUMENT if asked to read at an offset that is not in the range [0,2^32-1], but it will generate OUT_OF_RANGE if asked to read from an offset past the current file size. There is a fair bit of overlap between FAILED_PRECONDITION and OUT_OF_RANGE. We recommend using OUT_OF_RANGE (the more specific error) when it applies so that callers who are iterating through a space can easily look for an OUT_OF_RANGE error to detect when they are done.
UNIMPLEMENTED 12 The operation is not implemented or is not supported/enabled in this service.
INTERNAL 13 Internal errors. This means that some invariants expected by the underlying system have been broken. This error code is reserved for serious errors.
UNAVAILABLE 14 The service is currently unavailable. This is most likely a transient condition, which can be corrected by retrying with a backoff. Note that it is not always safe to retry non-idempotent operations.
DATA_LOSS 15 Unrecoverable data loss or corruption.
UNAUTHENTICATED 16 The request does not have valid authentication credentials for the operation.

9. 参考文档

10. 参考项目

1
2
3
4
# 将grpc代码clone到本地
git clone -b v1.23.0 https://github.com/grpc/grpc
# 切换到python的helloworld例子目录。
cd grpc/examples/python/helloworld

首先运行服务端

1
python greeter_server.py

打开另外一个命令窗口,运行客户端

1
python greeter_client.py

运行应用程序:根据proto文件,生成新的python类库,但是我们还没实现新定义的rpc方法,下面介绍服务端和客户端如果升级代码。

更新服务端代码:在同样目录打开greeter_server.py文件,实现类似如下代码。

1
2
3
4
5
6
7
8
class Greeter(helloworld_pb2_grpc.GreeterServicer):
# 实现SayHello方法
def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
# 实现SayHelloAgain方法
def SayHelloAgain(self, request, context):
return helloworld_pb2.HelloReply(message='Hello again, %s!' % request.name)
...

更新客户端代码:在同样的目录打开greeter_client.py文件,实现代码如下:

1
2
3
4
5
6
7
8
9
10
def run():
# 配置grpc服务端地址
channel = grpc.insecure_channel('localhost:50051')
stub = helloworld_pb2_grpc.GreeterStub(channel)
# 请求服务端的SayHello方法
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
print("Greeter client received: " + response.message)
# 请求服务端的SayHelloAgain方法
response = stub.SayHelloAgain(helloworld_pb2.HelloRequest(name='you'))
print("Greeter client received: " + response.message)