共计 12559 个字符,预计需要花费 32 分钟才能阅读完成。
博客:cbb777.fun
全平台账号: 安妮的心动录
github: https://github.com/anneheartrecord
下文中我说的可能对,也可能不对,鉴于笔者程度无限,请君自辨。有问题欢送大家找我探讨
gRPC
gRPC 是一种现代化开源的 RPC 框架,可能运行于任何环境之中,最后由谷歌进行开发,之前说过 RPC 是一种软性的标准,而不是硬性的协定。它的底层协定应用 HTTP/ 2 作为传输协定。这是因为 HTTP 2 协定通过优化之后速度曾经足够快,并且 HTTP2 同样应用二进制的数据进行传输,和 RPC 不约而同。
常见的负载平衡算法
1. 轮询算法:依照程序顺次轮流将申请调配给后端服务器,直到轮询完所有的服务器之后从新开始
2. 随机算法:随机抉择一个后端服务器来解决申请
3. 加权轮询:依据后端服务器的解决能力,为每个服务器调配一个权重值,而后依照权重顺次轮询调配申请
4. 加权随机:依照后端服务器的解决能力,为每个服务器调配一个权重值,而后依照权重值随机抉择一个服务器来解决申请
5. 最小连接数:讲申请调配给以后连接数起码的服务器,以保障负载平衡
6.IP 哈希算法:依据客户端的 IP 地址,通过哈希算法计算出一个值,而后将这个值对服务器列表长度取值,失去要拜访的服务器编号,相似于随机
7. 故障转移:如果以后服务器呈现故障或者无奈解决申请,则主动切换到下一个可用的服务器
grpc 实现了哪些负载平衡算法
1. 轮询
2. 最小连接数
3. 故障转移
4. 随机
这些负载平衡算法都能够在 grpc 的客户端配置中进行设置。默认状况下,应用的是轮询算法。
如果须要应用其余负载平衡算法,能够应用 gprc 提供的负载均衡器,比方 grpclb。此外,grpc 还提供了扩大借口来让用户自定义负载平衡算法
在 gRPC 中,客户端能够像调用本地办法一样间接调用其余机器上的服务端应用程序的办法,帮忙你更容易创立分布式应用程序和服务。gRPC 是基于定义一个服务,制订一个能够近程调用的带有参数和返回类型的办法。在服务端程序中实现这个接口并且运行 gRPC 服务解决客户端调用,在客户端,有一个 stub 提供和服务端雷同的办法
为什么要用 gRPC
gRPC 能够帮忙咱们一次性的在一个 .proto
文件中定义服务并应用任何反对它的语言去实现客户端和服务端,也就是说 gRPC 解决了不同语言以及环境间通信的复杂性。应用 protocol buffer
还能取得其余益处,包含高效的序列化,简略的 IDL 以及容易进行接口更新。总之,应用 gRPC 可能帮忙咱们更容易编写跨语言的分布式代码
IDL(Interface description Language)是指接口描述语言,是用来形容软件组件接口的一种计算机语言,是跨平台开发的根底。IDL 通过一种中立的形式来形容接口,使得在不同平台上运行的对象和用不同语言编写的程序能够互相通信交换;比方,一个组件用 C ++ 写成,另一个组件用 Go 写成
应用 gRPC 进行开发的步骤
编写.proto 文件定义服务
默认状况下 gRPC 应用 protocol buffers
作为接口定义语言(IDL)来形容服务接口和无效负载音讯的构造
在 gRPC 中能够定义四种类型的服务办法
一般 rpc,客户端向服务器发送一个申请,而后失去一个响应,就像一般的函数调用一样rpc SayHello(HelloRequest) returns (HelloResponse);
服务器流式 rpc,其中客户端向服务器发送申请,并取得一个流来读取一系列音讯。客户端从返回的流中读取,直到没有更多的音讯,gRPC 保障在单个 RPC 调用的音讯是有序的rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
客户端流式 rpc,其中客户端写入一系列音讯并将其发送到服务器,同样应用提供的流。一旦客户端实现了音讯的写入,它就期待服务器读取音讯并返回响应,同样,gRPC 保障单个 RPC 调用中的音讯是有序的rpc LostsOfGreetings(stream HelloRequest) returns(HelloResponse);
双向流式 rpc,其中单方应用读写流发送一系列音讯,这两个流独立运行,因而客户端和服务器能够依照本人喜爱的程序读写;例如,服务器能够期待承受所有客户端音讯后再写响应,或者能够交替读取音讯而后写入音讯,或者其余读写组合。每个流中的音讯是有序的rpc LostsOfGreetings(stream HelloRequest) returns(stream HelloResponse);
生成指定语言的代码(客户端一份、服务端一份)
在 .proto
文件中定义好服务之后,gRPC 提供了生成客户端和服务端代码的 protocol buffers 编译器插件。
咱们应用这些插件能够依据须要生成 Java Go C++ Python
等语言的代码,咱们通常会在客户端调用这些 API,并且在服务器端实现对应的 API
- 在服务器端,服务器实现服务申明的办法,并运行一个 gRPC 服务器来解决客户端发来的调用申请。gRPC 底层会对传入的申请进行编码,执行被调用的服务办法,并对服务响应进行编码
- 在客户端,客户端有一个称为存根 (stub) 的本地对象,它实现了与服务雷同的办法。而后,客户端能够在本地对象上调用这些办法,将调用的参数包装在适当的
protocol buffers
音讯类型中 –gRPC 在向服务器发送申请并返回服务器的protocol buffers
响应之后进行解决
编写业务逻辑代码
proto 文件生成pb.go
以及 grpc.pb.go
的命令
不指定 proto 门路 protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative yourpath
指定 proto 门路protoc --proto_path=pb --go_out=pb --go_opt=paths=source_relative --go-grpc_out=pb --go-grpc_opt=paths=source_relative xxx.proto
应用 grpc 实现一个简略的 hello 服务
Server
type server struct {pb.UnimplementedGreeterServer // 字段作用是 当没有齐全实现 proto 中的所有办法时仍旧能够运行起来} | |
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {reply := "hello" + in.GetName() | |
return &pb.HelloResponse{Reply: reply}, nil | |
} | |
func main() { | |
// 启动服务 | |
l, err := net.Listen("tcp", ":8972") | |
if err != nil {fmt.Println("failed to listen,err:", err) | |
return | |
} | |
// 注册服务 | |
s := grpc.NewServer() | |
pb.RegisterGreeterServer(s, &server{}) | |
// 启动服务 | |
err = s.Serve(l) | |
if err != nil {fmt.Println("failed to server,err:", err) | |
} | |
} |
syntax = "proto3"; // 版本申明 | |
option go_package="hello_server/pb"; // 我的项目中 import 导入生成 go 代码的模块 | |
package pb; //proto 文件模块 | |
// 定义服务 | |
service Greeter { | |
// 定义方法 | |
rpc SayHello (HelloRequest) returns (HelloResponse) {}} | |
// 定义音讯 | |
message HelloRequest {string name = 1; // 字段的序号} | |
message HelloResponse {string reply = 1;} |
Client
func main() { | |
// 连贯 server 带加密连贯 | |
conn, err := grpc.Dial("127.0.0.1:8972", grpc.WithTransportCredentials(insecure.NewCredentials())) | |
if err != nil {log.Fatalf("grpc.Dial failed,err:%v", err) | |
return | |
} | |
defer conn.Close() | |
// 创立客户端 | |
c := proto.NewGreeterClient(conn) | |
// 应用 context 进行管制,传入 background 和超时工夫一秒钟 | |
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) | |
defer cancel() | |
name := "xiaocheng" | |
resp, err := c.SayHello(ctx, &proto.HelloRequest{Name: name}) | |
if err != nil {log.Printf("c.SayHello failed, err:%v", err) | |
return | |
} | |
// 拿到 RPC 响应 | |
log.Printf("resp:%v", resp.GetReply()) | |
} |
// 应该是同一份 proto 文件 | |
syntax = "proto3"; // 版本申明 | |
option go_package="hello_client/proto"; // 我的项目中 import 导入生成 go 代码的模块 | |
package pb; //proto 文件模块 必须与 server 端统一 | |
// 定义服务 | |
service Greeter { | |
// 定义方法 | |
rpc SayHello (HelloRequest) returns (HelloResponse) {}} | |
// 定义音讯 | |
message HelloRequest {string name = 1; // 字段的序号} | |
message HelloResponse {string reply = 1;} |
应用 grpc 实现一个简略的 add 服务
type server struct {pb.UnimplementedAddMethodServer} | |
func (s *server) Add(ctx context.Context, in *pb.AddRequest) (*pb.AddResponse, error) {reply := in.GetArgs1() + in.GetArgs2() | |
return &pb.AddResponse{Number: reply}, nil | |
} | |
func main() { | |
// 启动服务 | |
l, err := net.Listen("tcp", ":9999") | |
if err != nil {fmt.Println("net listen failed,err:", err) | |
return | |
} | |
s := grpc.NewServer() | |
pb.RegisterAddMethodServer(s, &server{}) | |
err = s.Serve(l) | |
if err != nil {fmt.Println("failed to server,err:", err) | |
} | |
} |
proto 文件应该在客户端和服务端都有一份
syntax="proto3"; | |
option go_package="server/pb"; | |
package pb; | |
service AddMethod {rpc Add(AddRequest) returns (AddResponse) {}} | |
message AddRequest { | |
int32 args1 =1; | |
int32 args2 =2; | |
} | |
message AddResponse {int32 number =1;} |
func main() {conn, err := grpc.Dial("127.0.0.1:9999", grpc.WithTransportCredentials(insecure.NewCredentials())) | |
if err != nil {fmt.Println("grpc dail failed,err:", err) | |
return | |
} | |
defer conn.Close() | |
c := pb.NewAddMethodClient(conn) | |
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) | |
defer cancel() | |
var args1, args2 int32 | |
args1 = 1 | |
args2 = 2 | |
resp, err := c.Add(ctx, &pb.AddRequest{Args1: args1, Args2: args2}) | |
if err != nil {fmt.Println("c.Add failed,err:", err) | |
return | |
} | |
log.Println("Add Response:", resp) | |
} |
protobuf 语法
protobuf 为什么体积小、解析快
protobuf 是 google 提出的数据交换格局,同一条音讯数据,应用 Protobuf 序列化之后占用空间是 JSON 的 1 /10,然而性能却是几十倍
起因如下
- 编解码大多采纳位运算,比 JSON/XML 的字符匹配效率更高
- pb 定义了
varint
类型,应用变长编码压缩数值类型。值越小的数字,应用的字节数就越少 - 采纳 Tag-value 类型,没有冗余字符,而 JSON 有很多冗余的局部,这是为了不便人类浏览才加上的
定义一个音讯类型
syntax="proto3"; | |
message SearchRequest { | |
string query =1; | |
int32 page_number=2; | |
} | |
// 文件的第一行指定应用 proto3 语法,如果不这么写 | |
//pb 的编译器默认应用 proto2 | |
//SearchRequest 定义了一个音讯,应用了两个字段 | |
// 每个字段须要定义类型 名字 和编号 |
字段编号
音讯定义中的每个字段都要有一个惟一的编号,这些编号用来在音讯二进制格局中标识字段,在音讯类型应用后就不能更改。
在范畴 1 到 15 中的字段须要一个字节进行编码,而 16-2047 的字段采纳两个字节。所以应该为常常应用的音讯元素保留数字 1 到 15 的编号,也要为未来可能增加的常常应用的元素留出一些编号
指定字段规定
音讯字段能够是下列字段之一
- singular:格局正确的音讯能够有这个字段的 0 个或者一个,默认应用 singular 字段
- repeated:该字段能够在格局正确的音讯中反复任意次数(包含 0 次),反复值的程序将被保留
- optional:该字段在传递的时候可选也可不选
保留字段
如果你通过齐全删除字段或者将其正文来 更新音讯类型,那么将来的用户在对该类型进行本人的更新的时候就能够重用字段号,如果其他人当前加载旧版本的雷同 .proto
文件,这可能就会导致重大的问题,比方数据损坏、隐衷破绽等等。
解决办法是通过 reserved
来指定曾经删除的字段的字段编号,如果未来有用户尝试应用这些字段标识符,protocol buffer 编译器将收回提醒
message Foo {reserved 2,15,9 to 11;}
值类型
.proto Type | Notes | C++ Type | Java/Kotlin Type[1] | Python Type[3] | Go Type | PHP Type |
---|---|---|---|---|---|---|
double | double | double | float | float64 | float | |
float | float | float | float | float32 | float | |
int32 | 应用可变长度编码。编码正数效率低下——如果你的字段可能有负值,则应用 sint32 代替。 | int32 | int | int | int32 | integer |
int64 | 应用可变长度编码。编码正数效率低下——如果你的字段可能有负值,则应用 sint64 代替。 | int64 | long | int/long[4] | int64 | integer/string[6] |
uint32 | 应用变长编码。 | uint32 | int[2] | int/long[4] | uint32 | integer |
uint64 | 应用变长编码。 | uint64 | long[2] | int/long[4] | uint64 | integer/string[6] |
sint32 | 应用可变长度编码。带符号的 int 值。这些编码比一般的 int32 更无效地编码正数。 | int32 | int | int | int32 | integer |
sint64 | 应用可变长度编码。带符号的 int 值。这些编码比一般的 int64 更无效地编码正数。 | int64 | long | int/long[4] | int64 | integer/string[6] |
fixed32 | 总是四个字节。如果值常常大于 228,则比 uint32 更有效率。 | uint32 | int[2] | int/long[4] | uint32 | integer |
fixed64 | 总是 8 字节。如果值常常大于 256,则比 uint64 更有效率。 | uint64 | integer/string[6] | |||
sfixed32 | 总是四个字节。 | int32 | int | int | int32 | integer |
sfixed64 | 总是八个字节。 | int64 | integer/string[6] | |||
bool | bool | boolean | bool | bool | boolean | |
string | 字符串必须始终蕴含 UTF- 8 编码的或 7 位 ASCII 文本,且不能长于 232。 | string | String | str/unicode[5] | string | string |
bytes | 能够蕴含任何不超过 232 字节的任意字节序列。 | string | ByteString | str (Python 2) bytes (Python 3) | []byte | string |
枚举
在定义音讯类型的时候,可能心愿其中的一个字段只能是预约义的值列表中的一个值。上面是一个栗子,通过 enum
来保障 Conrpus 字段的值只能是其中的一个
message SearchRequest { | |
string query = 1; | |
int32 page_number = 2; | |
int32 result_per_page = 3; | |
enum Corpus { | |
UNIVERSAL = 0; | |
WEB = 1; | |
IMAGES = 2; | |
LOCAL = 3; | |
NEWS = 4; | |
PRODUCTS = 5; | |
VIDEO = 6; | |
} | |
Corpus corpus = 4; | |
} |
嵌套音讯类型
message SearchResponse {repeated Result results = 1;} | |
message Result { | |
string url = 1; | |
string title = 2; | |
repeated string snippets = 3; | |
} |
Any
Any
类型容许你将音讯作为嵌入类型应用,应用 Any 类型须要导入google/protobuf/any.proto
import "google/protobuf/any.proto"; | |
message ErrorStatus { | |
string message = 1; | |
repeated google.protobuf.Any details = 2; | |
} |
oneof
如果你有一条蕴含多个字段的音讯,并且同时最多设置其中的一个字段,那么能够通过 oneof 来实现并节俭内存,能够通过 case()
或者 WihchOneOf()
来查看 one of 中的哪个值被设置(如果有)
message SampleMessage { | |
oneof test_oneof { | |
string name = 4; | |
SubMessage sub_message = 9; | |
} | |
} | |
SampleMessage message; | |
message.set_name("name"); | |
CHECK(message.has_name()); | |
message.mutable_sub_message(); // Will clear name field. | |
CHECK(!message.has_name()); |
Maps
如果想创立一个关联映射作为数据定义的一部分,能够应用这个 map
map<key_type, value_type> map_field = N;
protobuf 实战
oneof 字段
oneof 中的值只能抉择其中的一个
message NoticeReaderRequest { | |
string msg=1; | |
oneof notice_way{ | |
string email=2; | |
string phone=3; | |
} | |
} |
对应的服务端代码
func oneofDemo() { | |
req := &book.NoticeReaderRequest{ | |
Msg: "here is chengxisheng", | |
NoticeWay: &book.NoticeReaderRequest_Email{Email: "xxx",}, | |
} | |
req2 := &book.NoticeReaderRequest{ | |
Msg: "here is xishengcheng", | |
NoticeWay: &book.NoticeReaderRequest_Phone{Phone: "1008611",}, | |
} | |
switch v := req.NoticeWay.(type) { | |
case *book.NoticeReaderRequest_Email: | |
noticeWithEmail(v) | |
case *book.NoticeReaderRequest_Phone: | |
noticeWithPhone(v) | |
} | |
switch v := req2.NoticeWay.(type) { | |
case *book.NoticeReaderRequest_Email: | |
noticeWithEmail(v) | |
case *book.NoticeReaderRequest_Phone: | |
noticeWithPhone(v) | |
} | |
} | |
func noticeWithEmail(in *book.NoticeReaderRequest_Email) {fmt.Printf("notice reader by email:%v\n", in.Email) | |
} | |
func noticeWithPhone(in *book.NoticeReaderRequest_Phone) {fmt.Printf("notice reader by phone:%v\n", in.Phone) | |
} | |
// 这里必须应用类型断言 +switch case | |
// 来进行 one of 字段的确认 |
wrapvalue 类型
首先让咱们想一想 Go 中辨别一个 MySQL 的 int 类型是默认值还是 0 值该怎么做?
其实就只有以下两种办法
price sql.NullInt64 | |
price *int64 | |
// 第一种形式是一个定义好的构造体 | |
// 外面有一个字段是 该构造体是否被赋值 | |
// 第二种形式是间接用指针来做 | |
// 对指针解援用,如果为 0 则赋值,如果为 Nil 则是默认值 |
在 RPC 中也是如此,咱们能够通过 wrapvalue 来确定这个字段是否被赋值
func wrapValueDemo() { | |
// client | |
book:=book.Book{ | |
Title: "learning go language", | |
Price: &wrapperspb.Int64Value{Value: 600}, | |
Memo: &wrapperspb.StringValue{Value: "学"}, | |
} | |
// server | |
if book.GetPrice()==nil {fmt.Println("is not assigned") | |
} else {fmt.Println(book.GetPrice().GetValue()) | |
} | |
if book.GetMemo()==nil {fmt.Println("is not assigned") | |
} else {fmt.Println(book.GetMemo().GetValue()) | |
} | |
} |
FieldMask 类型
当咱们更新的时候,定义了很多字段,不可能全副进行全量更新 Book 的每个字段,因为通常操作只会更新 1 到 2 个字段。
当咱们想晓得更新操作波及到的具体字段,就须要应用到 filedmask
类型
message UpdateBookRequest { | |
// 操作人 | |
string op=1; | |
// 要更新的书籍信息 | |
Book book=2; | |
// 要更新的字段 | |
google.protobuf.FieldMask update_mask=3; | |
} |
func fieldMaskDemo() { | |
//client | |
paths := []string{"price"} | |
req := &book.UpdateBookRequest{ | |
Op: "chengxisheng", | |
Book: &book.Book{Price: &wrapperspb.Int64Value{Value: 8800}, | |
}, | |
UpdateMask: &fieldmaskpb.FieldMask{Paths: paths}, | |
} | |
mask, _ := fieldmask_utils.MaskFromProtoFieldMask(req.UpdateMask, generator.CamelCase) | |
var bookDst = make(map[string]interface{}) | |
fieldmask_utils.StructToMap(mask, book.UpdateBookRequest{}.Book, bookDst) | |
fmt.Printf("bookDst:%#v\n", bookDst) | |
} |
服务端流式 RPC
对应的 proto(client 和 server)中增加一个流式办法rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
Server 增加一个新的办法
func (s *server) LotsOfReplies(in *pb.HelloRequest, stream pb.Greeter_LotsOfRepliesServer) error {words := []string{ | |
"你好", | |
"hello", | |
"こんにちは", | |
"안녕하세요", | |
} | |
for _, word := range words { | |
data := &pb.HelloResponse{Reply: word + in.GetName(), | |
} | |
// 应用 Send 办法发送多个数据 每当有一个 data 就 send 一次数据 | |
if err := stream.Send(data); err != nil {return err} | |
} | |
return nil | |
} |
Client 端增加一个新的函数
func callLotsOfReplies(c proto.GreeterClient) {ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) | |
defer cancel() | |
stream, err := c.LotsOfReplies(ctx, &proto.HelloRequest{Name: *name}) | |
if err != nil {log.Fatalf("c.LotsOfReplies failed,err:%v", err) | |
} | |
for { | |
// 顺次从流式响应中读取返回的响应数据 | |
res, err := stream.Recv() | |
if err == io.EOF {break} | |
if err != nil {log.Fatalf("c.LotsOfReplies failed,err:%v", err) | |
} | |
log.Printf("got reply: %q\n", res.GetReply()) | |
} | |
} |
客户端流式 RPC
在 hello.proto
中增加这么一个新的办法
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
在 server 端增加
func (s *server) LotsOfGreetings(stream pb.Greeter_LotsOfGreetingsServer) error { | |
reply := "你好:" | |
for { | |
// 承受客户端发来的流式数据 | |
res, err := stream.Recv() | |
if err == io.EOF { | |
return stream.SendAndClose(&pb.HelloResponse{Reply: reply,}) | |
} | |
if err != nil {return err} | |
reply += res.GetName()} | |
} |
在 Client 端中增加
func runLotsOfGreeting(c proto.GreeterClient) {ctx, cancel := context.WithTimeout(context.Background(), time.Second) | |
defer cancel() | |
// 客户端要流式的发送申请音讯 | |
stream, err := c.LotsOfGreetings(ctx) | |
if err != nil {log.Printf("c.LotsOfGreetings failed,err:%v\n", err) | |
return | |
} | |
names := []string{"张三", "李四", "王五"} | |
for _, name := range names {stream.Send(&proto.HelloRequest{Name: name}) | |
time.Sleep(200 * time.Millisecond) | |
} | |
// 敞开流 | |
res, err := stream.CloseAndRecv() | |
log.Printf("res:%v\n", res) | |
} |
双向流式 RPC
在 proto 中增加
rpc BidiHello(stream HelloRequest) returns(stream HelloResponse);
在 client 中增加
func (s *server) BidiHello(stream pb.Greeter_BidiHelloServer) error { | |
for { | |
// 承受流式申请 | |
in, err := stream.Recv() | |
if err == io.EOF {return nil} | |
if err != nil {return err} | |
reply := magic(in.GetName()) | |
// 返回流式响应 | |
if err := stream.SendAndClose(&pb.HelloResponse{Reply: reply}); err != nil {return err} | |
} | |
} |
在 Server 端中增加
func (s *server) BidiHello(stream pb.Greeter_BidiHelloServer) error { | |
for { | |
// 承受流式申请 | |
in, err := stream.Recv() | |
if err == io.EOF {return nil} | |
if err != nil {return err} | |
reply := magic(in.GetName()) | |
// 返回流式响应 | |
if err := stream.SendAndClose(&pb.HelloResponse{Reply: reply}); err != nil {return err} | |
} | |
} | |
// magic 一段无价之宝的“人工智能”代码 | |
func magic(s string) string {s = strings.ReplaceAll(s, "吗", "") | |
s = strings.ReplaceAll(s, "吧", "") | |
s = strings.ReplaceAll(s, "你", "我") | |
s = strings.ReplaceAll(s, "?", "!") | |
s = strings.ReplaceAll(s, "?", "!") | |
return s | |
} |