GRPC主要介绍了grpc在使用示例和原理,以及如何与consul结合gRPC 是什么?gRPC 也是基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够像服务端一样的方法。在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,使得我们能够更容易地创建分布式应用和服务。参考文档:gRPC Python Quickstart开始前确保已经安装grpcio-tools和grpcio这两个包定义一个GRPC有如下三个步骤:定义一个消息类型编译该proto文件编写服务端代码编写客户端代码我们以实现一个echo的grpc为例。定义一个消息类型首先定义通信双方(即客户端和服务端)交互的消息格式(protobuf消息的格式),然后定义该echo服务如下:syntax = “proto3”; // 声明使用 proto3 语法// 定义客户端请求的protobuf格式,如下所示,包含一个字符串字段qmessage Req { string q = 1;}// 定义服务端相应的protobuf格式,如下所示,包含一个字符串字段amessage Resp { string a = 1;}// 定义echo服务,如下所示,该服务包含一个名称为"echo"的rpcservice Echoer{ rpc echo (Req) returns (Resp) {}}使用以下命令编译:python -m grpc_tools.protoc -I./ –python_out=. –grpc_python_out=. ./Echoer.proto生成两个py文件Echoer_pb2.py 此文件包含生成的 request(Req) 和 response(Resp) 类。Echoer_pb2_grpc.py 此文件包含生成的 客户端(EchoerStub)和服务端(EchoerServicer)的类创建服务端代码创建和运行 Echoer 服务可以分为两个部分:实现我们服务定义的生成的服务接口:做我们的服务的实际的“工作”的函数。运行一个 gRPC 服务器,监听来自客户端的请求并传输服务的响应。在当前目录,创建文件 Echoer_server.py,实现一个新的函数:from concurrent import futuresimport timeimport grpcimport Echoer_pb2import Echoer_pb2_grpc_ONE_DAY_IN_SECONDS = 60 * 60 * 24class Echoer(Echoer_pb2_grpc.EchoerServicer): # 工作函数 def SayHello(self, request, context): return Echoer_pb2.Resp(a=“echo”)def serve(): # gRPC 服务器 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) Echoer_pb2_grpc.add_EchoerServicer_to_server(Echoer(), server) server.add_insecure_port(’[::]:50051’) server.start() # start() 不会阻塞,如果运行时你的代码没有其它的事情可做,你可能需要循环等待。 try: while True: time.sleep(_ONE_DAY_IN_SECONDS) except KeyboardInterrupt: server.stop(0)if name == ‘main’: serve()创建客户端代码在当前目录,打开文件 Echoer_client.py,实现一个新的函数:from future import print_functionimport grpcimport Echoer_pb2import Echoer_pb2_grpcdef run(): channel = grpc.insecure_channel(’localhost:50051’) # 创建信道 stub = Echoer_pb2_grpc.EchoerStub(channel) # 通过信道获取凭据,即Stub response = stub.echo(Echoer_pb2.Req(q=‘echo’)) # 调用rpc,获取响应 print(“Echoer client received: " + response.a)if name == ‘main’: run()运行代码首先运行服务端代码python Echoer_server.py复制代码然后运行客户端代码python Echoer_client.py# outputEchoer client received: echo进阶点击查看参考博客为了通信安全起见,GRPC提供了TSlSSL的支持。首先利用openssl创建一个自签名证书$ openssl genrsa -out server.key 2048Generating RSA private key, 2048 bit long modulus (2 primes)……………………………………………………+++++………………………………………………………………………………………………………………..+++++e is 65537 (0x010001)$ openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650You are about to be asked to enter information that will be incorporatedinto your certificate request.What you are about to enter is what is called a Distinguished Name or a DN.There are quite a few fields but you can leave some blankFor some fields there will be a default value,If you enter ‘.’, the field will be left blank.—–Country Name (2 letter code) [AU]:State or Province Name (full name) [Some-State]:Locality Name (eg, city) []:Organization Name (eg, company) [Internet Widgits Pty Ltd]:Organizational Unit Name (eg, section) []:Common Name (e.g. server FQDN or YOUR name) []:EchoerEmail Address []:生成了server.key和server.crt两个文件,服务端两个文件都需要,客户端只需要crt文件修改服务端代码server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))Echoer_pb2_grpc.add_EchoerServicer_to_server(Echoer(), server)# 读取 key and certificatewith open(os.path.join(os.path.split(file)[0], ‘server.key’)) as f: private_key = f.read().encode()with open(os.path.join(os.path.split(file)[0], ‘server.crt’)) as f: certificate_chain = f.read().encode()# 创建 server credentialsserver_creds = grpc.ssl_server_credentials(((private_key, certificate_chain,),))# 调用add_secure_port方法,而不是add_insesure_port方法server.add_secure_port(’localhost:50051’, server_creds)修改客户端代码# 读取证书with open(‘server.crt’) as f: trusted_certs = f.read().encode()# 创建 credentialscredentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)# 调用secure_channel方法,而不是insecure_channel方法channel = grpc.secure_channel(’localhost:50051’, credentials)启动服务端后,启动客户端,会出现以下错误:grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = “Connect Failed” debug_error_string = “{“created”:"@1547552759.642000000”,“description”:“Failed to create subchannel”,“file”:“src/core/ext/filters/client_channel/client_channel.cc”,“file_line”:2721,“referenced_errors”:[{“created”:"@1547552759.642000000”,“description”:“Pick Cancelled”,“file”:“src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc”,“file_line”:241,“referenced_errors”:[{“created”:"@1547552759.642000000",“description”:“Connect Failed”,“file”:“src/core/ext/filters/client_channel/subchannel.cc”,“file_line”:689,“grpc_status”:14,“referenced_errors”:[{“created”:"@1547552759.642000000",“description”:“Peer name localhost is not in peer certificate”,“file”:“src/core/lib/security/security_connector/security_connector.cc”,“file_line”:880}]}]}]}">!!! 警告:这是因为TSLSSL模式下,客户端是通过服务名称:port来获取服务的凭据,而不是ip:port, 所以对客户端做如下修改:# 修改前channel = grpc.secure_channel(’localhost:50051’, credentials)# 修改后channel = grpc.secure_channel(‘Echoer:50051’, credentials)!!! 警告:其次,在TSLSSL模式下,客户端对服务名称:port解析时候需要dns支持,目前不知道如何解决,只能够采取以下措施解决,通过修改windows的host文件,利用host将服务名称解析为IP地址,打开windows的host文件,地址:C:\Windows\System32\drivers\etc\hosts备份后修改如下,添加:# 服务的IP地址 服务名称127.0.0.1 Echoer保存即可修改后,再次运行,即可运行成功注意事项:CA证书和私钥key都是配套的,不配套的CA证书和key是无法校验成功的结合consul注意事项:确保consul已经正确启动,查看http://ip:port:8500/, 可查看consul的状态,确保已经安装python-consul这个库,否则无法操作consul首先想象我们以上的grpc示例程序之所以成功的有限制条件,我们知道服务端已经正常启动我们知道了服务端的ip和端口但在实际过程中,一般是不可能确切知道服务的ip和端口的,所以consul就起了个中间桥梁的作用,具体如下:服务注册服务注册,顾名思义,服务在启动之前,必须现在consul中注册。服务端:当服务端启动之后,consul会利用服务注册时获得的ip和port同服务建立联系,其中最重要的就是health check即心跳检测。consul通过心跳检测来判定该服务是否正常。客户端:客户端通过consul来查询所需服务的ip和port,若对应服务已经注册且心跳检测正常,则会返回给客户端对应的ip和port信息,然后客户端就可以利用这个来连接服务端了服务注册示例代码如下:def register(self, server_name, ip, port, consul_host=CONSUL_HOST): """ server_name: 服务名称 ip: 服务IP地址 port: 服务监听的端口 consul_host: 所连接的consul服务器的IP地址 """ c = consul.Consul(host=consul_host) # 获取与consul的连接 print(f"开始注册服务{server_name}") check = consul.Check.tcp(ip, port, “10s”) # 设置心跳检测的超时时间和对应的ip和port端口 c.agent.service.register(server_name, f"{server_name}-{ip}-{port}", address=ip, port=port, check=check) # 注册既然有服务注册,当然会有服务注销,示例代码如下:def unregister(self, server_name, ip, port, consul_host=CONSUL_HOST): c = consul.Consul(host=consul_host) print(f"开始退出服务{server_name}") c.agent.service.deregister(f"{server_name}-{ip}-{port}")服务查询客户端则需要在consul中查询对应服务的IP和port,但由于在TSL/SSL模式下,所需的只是服务名称和port,故而只需要查询port端口即可。客户端服务查询采用的是DNS的查询方式,必须确保安装dnspython库,用于创建DNS查询服务查询示例代码如下:# 创建一个consul dns查询的 resolverconsul_resolver = resolver.Resolver()consul_resolver.port = 8600consul_resolver.nameservers = [consul_host]def get_host_port(self, server_name): try: dns_answer_srv = consul_resolver.query(f"{server_name}.service.consul", “SRV”) # 查询对应服务的port, except DNSException as e: return None, None return server_name, dns_answer_srv[0].port # 返回服务名和端口grpc流模式grpc总共提供了四种数据交互模式:simpe 简单模式 RPC:即上述的所有的grpcserver-side streaming 服务端流式 RPCclient-side streaming 客户端流式 RPCBidirectional streaming 双向数据流模式的 gRPC由于grpc对于消息有大小限制,diff数据过大会导致无法接收数据,我们在使用过程中,使用了流模式来解决了此类问题,在此模式下,客户端传入的参数由具体的protobuf变为了protobuf的迭代器,客户端接收的响应也变为了迭代器,获取完整的响应则需要迭代获取。服务端响应也变为了一个迭代器。修改服务定义文件:# 修改前service Echoer{ rpc echo (Req) returns (Resp) {}}# 修改后service Echoer{ rpc echo (stream Req) returns (stream Resp) {}}重新编译修改服务端将工作函数修改为如下所示, 即工作函数变成了一个迭代器:def echo(self, request_iterator, context): for i in range(10): yield Echoer_pb2.Resp(a=“echo”)修改客户端将echo的传入参数修改为迭代器:def qq(): for i in range(10): yield Echoer_pb2.Req(q=“echo”)response = stub.echo(qq())for resp in response: print(“Echoer client received: " + response.a)重新运行,接收结果如下:$ python Echoer_client.pyEchoer client received: echoEchoer client received: echoEchoer client received: echoEchoer client received: echoEchoer client received: echoEchoer client received: echoEchoer client received: echoEchoer client received: echoEchoer client received: echoEchoer client received: echo