grpc和consul结合实现分布式rpc调用

36次阅读

共计 7563 个字符,预计需要花费 19 分钟才能阅读完成。

GRPC
主要介绍了 grpc 在使用示例和原理,以及如何与 consul 结合
gRPC 是什么?
gRPC 也是基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够像服务端一样的方法。
在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,使得我们能够更容易地创建分布式应用和服务。
参考文档:gRPC Python Quickstart
开始前确保已经安装 grpcio-tools 和 grpcio 这两个包
定义一个 GRPC 有如下三个步骤:

定义一个消息类型
编译该 proto 文件
编写服务端代码
编写客户端代码

我们以实现一个 echo 的 grpc 为例。
定义一个消息类型
首先定义通信双方(即客户端和服务端)交互的消息格式(protobuf 消息的格式),然后定义该 echo 服务如下:
syntax = “proto3″; // 声明使用 proto3 语法

// 定义客户端请求的 protobuf 格式,如下所示,包含一个字符串字段 q
message Req {
string q = 1;
}

// 定义服务端相应的 protobuf 格式,如下所示,包含一个字符串字段 a
message Resp {
string a = 1;
}

// 定义 echo 服务,如下所示,该服务包含一个名称为 ”echo” 的 rpc
service Echoer{
rpc echo (Req) returns (Resp) {}
}
使用以下命令编译:
python -m grpc_tools.protoc -I./ –python_out=. –grpc_python_out=. ./Echoer.proto
生成两个 py 文件

Echoer_pb2.py 此文件包含生成的 request(Req) 和 response(Resp) 类。
Echoer_pb2_grpc.py 此文件包含生成的 客户端 (EchoerStub) 和服务端 (EchoerServicer) 的类

创建服务端代码
创建和运行 Echoer 服务可以分为两个部分:

实现我们服务定义的生成的服务接口:做我们的服务的实际的“工作”的函数。
运行一个 gRPC 服务器,监听来自客户端的请求并传输服务的响应。

在当前目录,创建文件 Echoer_server.py,实现一个新的函数:
from concurrent import futures
import time

import grpc

import Echoer_pb2
import Echoer_pb2_grpc

_ONE_DAY_IN_SECONDS = 60 * 60 * 24

class Echoer(Echoer_pb2_grpc.EchoerServicer):
# 工作函数
def SayHello(self, request, context):
return Echoer_pb2.Resp(a=”echo”)

def serve():
# gRPC 服务器
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
Echoer_pb2_grpc.add_EchoerServicer_to_server(Echoer(), server)
server.add_insecure_port(‘[::]:50051’)
server.start() # start() 不会阻塞,如果运行时你的代码没有其它的事情可做,你可能需要循环等待。
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)

if __name__ == ‘__main__’:
serve()
创建客户端代码
在当前目录,打开文件 Echoer_client.py,实现一个新的函数:
from __future__ import print_function

import grpc

import Echoer_pb2
import Echoer_pb2_grpc

def run():
channel = grpc.insecure_channel(‘localhost:50051′) # 创建信道
stub = Echoer_pb2_grpc.EchoerStub(channel) # 通过信道获取凭据,即 Stub
response = stub.echo(Echoer_pb2.Req(q=’echo’)) # 调用 rpc,获取响应
print(“Echoer client received: ” + response.a)

if __name__ == ‘__main__’:
run()
运行代码
首先运行服务端代码
python Echoer_server.py
复制代码然后运行客户端代码
python Echoer_client.py
# output
Echoer client received: echo
进阶
点击查看参考博客
为了通信安全起见,GRPC 提供了 TSlSSL 的支持。
首先利用 openssl 创建一个自签名证书
$ openssl genrsa -out server.key 2048
Generating RSA private key, 2048 bit long modulus (2 primes)
……………………………………………………+++++
………………………………………………………………………………………………………………..+++++
e is 65537 (0x010001)

$ openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter ‘.’, the field will be left blank.
—–
Country Name (2 letter code) [AU]:
State or Province Name (full name) [Some-State]:
Locality Name (eg, city) []:
Organization Name (eg, company) [Internet Widgits Pty Ltd]:
Organizational Unit Name (eg, section) []:
Common Name (e.g. server FQDN or YOUR name) []:Echoer
Email Address []:
生成了 server.key 和 server.crt 两个文件,服务端两个文件都需要,客户端只需要 crt 文件
修改服务端代码
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
Echoer_pb2_grpc.add_EchoerServicer_to_server(Echoer(), server)
# 读取 key and certificate
with open(os.path.join(os.path.split(__file__)[0], ‘server.key’)) as f:
private_key = f.read().encode()
with open(os.path.join(os.path.split(__file__)[0], ‘server.crt’)) as f:
certificate_chain = f.read().encode()
# 创建 server credentials
server_creds = grpc.ssl_server_credentials(((private_key, certificate_chain,),))
# 调用 add_secure_port 方法,而不是 add_insesure_port 方法
server.add_secure_port(‘localhost:50051’, server_creds)
修改客户端代码
# 读取证书
with open(‘server.crt’) as f:
trusted_certs = f.read().encode()
# 创建 credentials
credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
# 调用 secure_channel 方法,而不是 insecure_channel 方法
channel = grpc.secure_channel(‘localhost:50051’, credentials)
启动服务端后,启动客户端,会出现以下错误:
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = “Connect Failed”
debug_error_string = “{“created”:”@1547552759.642000000″,”description”:”Failed to create subchannel”,”file”:”src/core/ext/filters/client_channel/client_channel.cc”,”file_line”:2721,”referenced_errors”:[{“created”:”@1547552759.642000000″,”description”:”Pick Cancelled”,”file”:”src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc”,”file_line”:241,”referenced_errors”:[{“created”:”@1547552759.642000000″,”description”:”Connect Failed”,”file”:”src/core/ext/filters/client_channel/subchannel.cc”,”file_line”:689,”grpc_status”:14,”referenced_errors”:[{“created”:”@1547552759.642000000″,”description”:”Peer name localhost is not in peer certificate”,”file”:”src/core/lib/security/security_connector/security_connector.cc”,”file_line”:880}]}]}]}”
>
!!! 警告:
这是因为 TSLSSL 模式下,客户端是通过服务名称:port 来获取服务的凭据,而不是 ip:port, 所以对客户端做如下修改:
# 修改前
channel = grpc.secure_channel(‘localhost:50051’, credentials)
# 修改后
channel = grpc.secure_channel(‘Echoer:50051’, credentials)
!!! 警告:
其次,在 TSLSSL 模式下,客户端对服务名称:port 解析时候需要 dns 支持,目前不知道如何解决,只能够采取以下措施解决,通过修改 windows 的 host 文件,利用 host 将服务名称解析为 IP 地址,打开 windows 的 host 文件,地址:C:\Windows\System32\drivers\etc\hosts 备份后修改如下,添加:
# 服务的 IP 地址 服务名称
127.0.0.1 Echoer
保存即可
修改后,再次运行,即可运行成功
注意事项:CA 证书和私钥 key 都是配套的,不配套的 CA 证书和 key 是无法校验成功的
结合 consul
注意事项:确保 consul 已经正确启动,查看 http://ip:port:8500/, 可查看 consul 的状态,确保已经安装 python-consul 这个库,否则无法操作 consul
首先想象我们以上的 grpc 示例程序之所以成功的有限制条件,

我们知道服务端已经正常启动
我们知道了服务端的 ip 和端口

但在实际过程中,一般是不可能确切知道服务的 ip 和端口的,所以 consul 就起了个中间桥梁的作用,具体如下:
服务注册
服务注册,顾名思义,服务在启动之前,必须现在 consul 中注册。
服务端:当服务端启动之后,consul 会利用服务注册时获得的 ip 和 port 同服务建立联系,其中最重要的就是 health check 即心跳检测。consul 通过心跳检测来判定该服务是否正常。
客户端:客户端通过 consul 来查询所需服务的 ip 和 port,若对应服务已经注册且心跳检测正常,则会返回给客户端对应的 ip 和 port 信息,然后客户端就可以利用这个来连接服务端了
服务注册示例代码如下:
def register(self, server_name, ip, port, consul_host=CONSUL_HOST):
“””
server_name: 服务名称
ip: 服务 IP 地址
port: 服务监听的端口
consul_host: 所连接的 consul 服务器的 IP 地址
“””
c = consul.Consul(host=consul_host) # 获取与 consul 的连接
print(f” 开始注册服务{server_name}”)
check = consul.Check.tcp(ip, port, “10s”) # 设置心跳检测的超时时间和对应的 ip 和 port 端口
c.agent.service.register(server_name, f”{server_name}-{ip}-{port}”, address=ip, port=port, check=check) # 注册
既然有服务注册,当然会有服务注销,示例代码如下:
def unregister(self, server_name, ip, port, consul_host=CONSUL_HOST):
c = consul.Consul(host=consul_host)
print(f” 开始退出服务{server_name}”)
c.agent.service.deregister(f”{server_name}-{ip}-{port}”)
服务查询
客户端则需要在 consul 中查询对应服务的 IP 和 port,但由于在 TSL/SSL 模式下,所需的只是服务名称和 port,故而只需要查询 port 端口即可。
客户端服务查询采用的是 DNS 的查询方式,必须确保安装 dnspython 库,用于创建 DNS 查询
服务查询示例代码如下:
# 创建一个 consul dns 查询的 resolver
consul_resolver = resolver.Resolver()
consul_resolver.port = 8600
consul_resolver.nameservers = [consul_host]

def get_host_port(self, server_name):
try:
dns_answer_srv = consul_resolver.query(f”{server_name}.service.consul”, “SRV”) # 查询对应服务的 port,
except DNSException as e:
return None, None
return server_name, dns_answer_srv[0].port # 返回服务名和端口
grpc 流模式
grpc 总共提供了四种数据交互模式:

simpe 简单模式 RPC:即上述的所有的 grpc
server-side streaming 服务端流式 RPC
client-side streaming 客户端流式 RPC
Bidirectional streaming 双向数据流模式的 gRPC

由于 grpc 对于消息有大小限制,diff 数据过大会导致无法接收数据,我们在使用过程中,使用了流模式来解决了此类问题, 在此模式下,客户端传入的参数由具体的 protobuf 变为了 protobuf 的迭代器,客户端接收的响应也变为了迭代器,获取完整的响应则需要迭代获取。服务端响应也变为了一个迭代器。
修改服务定义文件:
# 修改前
service Echoer{
rpc echo (Req) returns (Resp) {}
}
# 修改后
service Echoer{
rpc echo (stream Req) returns (stream Resp) {}
}
重新编译
修改服务端
将工作函数修改为如下所示,即工作函数变成了一个迭代器:
def echo(self, request_iterator, context):
for i in range(10):
yield Echoer_pb2.Resp(a=”echo”)
修改客户端
将 echo 的传入参数修改为迭代器:
def qq():
for i in range(10):
yield Echoer_pb2.Req(q=”echo”)
response = stub.echo(qq())
for resp in response:
print(“Echoer client received: ” + response.a)
重新运行,接收结果如下:
$ python Echoer_client.py
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo

正文完
 0