舒适提醒:本文须要联合上一篇 gRPC 文章一起食用,否则可能看不懂。
后面一篇文章松哥和大家聊了 gRPC 的根本用法,明天咱们再来略微深刻一点点,来看下 gRPC 中四种不同的通信模式。
gRPC 中四种不同的通信模式别离是:
- 一元 RPC
- 服务端流 RPC
- 客户端流 RPC
- 双向流 RPC
接下来松哥就通过四个残缺的案例,来别离和向搭档们演示这四种不同的通信模式。
1. 筹备工作
对于 gRPC 的基础知识咱们就不啰嗦了,咱们间接来看我明天的 proto 文件,如下:
这次我新建了一个名为 book.proto 的文件,这里次要定义了一些图书相干的办法,如下:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.javaboy.grpc.demo";
option java_outer_classname = "BookServiceProto";
import "google/protobuf/wrappers.proto";
package book;
service BookService {rpc addBook(Book) returns (google.protobuf.StringValue);
rpc getBook(google.protobuf.StringValue) returns (Book);
rpc searchBooks(google.protobuf.StringValue) returns (stream Book);
rpc updateBooks(stream Book) returns (google.protobuf.StringValue);
rpc processBooks(stream google.protobuf.StringValue) returns (stream BookSet);
}
message Book {
string id = 1;
repeated string tags = 2;
string name = 3;
float price = 4;
string author = 5;
}
message BookSet {
string id = 1;
repeated Book bookList = 3;
}
这个文件中,有一些内容咱们在上篇文章中都讲过了,讲过的我就不再反复了,我说一些上篇文章没有波及到的货色:
- 因为咱们在这个文件中,援用了 Google 提供的 StringValue(
google.protobuf.StringValue
),所以这个文件下面咱们首先用 import 导入相干的文件,导入之后,才能够应用。 - 在办法参数和返回值中呈现的 stream,就示意这个办法的参数或者返回值是流的模式(其实就是数据能够屡次传输)。
- message 中呈现了一个上篇文章没有的关键字 repeated,这个示意这个字段能够反复,能够简略了解为这就是咱们 Java 中的数组。
好了,和上篇文章相比,本文次要就是这几个中央不一样。
proto 文件写好之后,依照上篇文章介绍的办法进行编译,生成对应的代码,这里就不再反复了。
2. 一元 RPC
一元 RPC 是一种比较简单的 RPC 模式,其实说白了咱们上篇文章和大家介绍的就是一种一元 RPC,也就是客户端发动一个申请,服务端给出一个响应,而后申请完结。
下面咱们定义的五个办法中,addBook 和 getBook 都算是一种一元 RPC。
2.1 addBook
先来看 addBook 办法,这个办法的逻辑很简略,咱们提前在服务端筹备一个 Map 用来保留 Book,addBook 调用的时候,就把 book 对象存入到 Map 中,并且将 book 的 ID 返回,大家就这样一件事,来看看服务端的代码:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map<String, Book> bookMap = new HashMap<>();
public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public void addBook(Book request, StreamObserver<StringValue> responseObserver) {bookMap.put(request.getId(), request);
responseObserver.onNext(StringValue.newBuilder().setValue(request.getId()).build());
responseObserver.onCompleted();}
}
看过上篇文章的小伙伴,我感觉这段代码应该很好了解。
客户端调用形式如下:
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
addBook(stub);
}
private static void addBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);
stub.addBook(Book.newBuilder().setPrice(99).setId("100").setName("java").setAuthor("javaboy").build(), new StreamObserver<StringValue>() {
@Override
public void onNext(StringValue stringValue) {System.out.println("stringValue.getValue() =" + stringValue.getValue());
}
@Override
public void onError(Throwable throwable) { }
@Override
public void onCompleted() {countDownLatch.countDown();
System.out.println("增加结束");
}
});
countDownLatch.await();}
}
这里我应用了 CountDownLatch 来实现线程期待,等服务端给出响应之后,客户端再完结。这里在回调的 onNext 办法中,咱们就能够拿到服务端的返回值。
2.2 getBook
getBook 跟下面的 addBook 相似,先来看服务端代码,如下:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map<String, Book> bookMap = new HashMap<>();
public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public void getBook(StringValue request, StreamObserver<Book> responseObserver) {String id = request.getValue();
Book book = bookMap.get(id);
if (book != null) {responseObserver.onNext(book);
responseObserver.onCompleted();} else {responseObserver.onCompleted();
}
}
}
这个 getBook 就是依据客户端传来的 id,从 Map 中查问到一个 Book 并返回。
客户端调用代码如下:
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
getBook(stub);
}
private static void getBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);
stub.getBook(StringValue.newBuilder().setValue("2").build(), new StreamObserver<Book>() {
@Override
public void onNext(Book book) {System.out.println("book =" + book);
}
@Override
public void onError(Throwable throwable) { }
@Override
public void onCompleted() {countDownLatch.countDown();
System.out.println("查问结束");
}
});
countDownLatch.await();}
}
小伙伴们大略也能看进去,addBook 和 getBook 基本上操作套路是截然不同的。
3. 服务端流 RPC
后面的一元 RPC,客户端发动一个申请,服务端给出一个响应,申请就完结了。服务端流则是客户端发动一个申请,服务端给一个响应序列,这个响应序列组成一个流。
下面咱们给出的 searchBook 就是这样一个例子,searchBook 是传递图书的 tags 参数,而后在服务端查问哪些书的 tags 满足条件,将满足条件的书全副都返回去。
咱们来看下服务端的代码:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map<String, Book> bookMap = new HashMap<>();
public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public void searchBooks(StringValue request, StreamObserver<Book> responseObserver) {Set<String> keySet = bookMap.keySet();
String tags = request.getValue();
for (String key : keySet) {Book book = bookMap.get(key);
int tagsCount = book.getTagsCount();
for (int i = 0; i < tagsCount; i++) {String t = book.getTags(i);
if (t.equals(tags)) {responseObserver.onNext(book);
break;
}
}
}
responseObserver.onCompleted();}
}
小伙伴们看下,这段 Java 代码应该很好了解:
- 首先从 request 中提取客户端传来的 tags 参数。
- 遍历 bookMap,查看每一本书的 tags 是否等于客户端传来的 tags,如果相等,阐明增加匹配,则通过
responseObserver.onNext(book);
将这本书写回到客户端。 - 等所有操作都实现后,执行
responseObserver.onCompleted();
,示意服务端的响应序列完结了,这样客户端也就晓得申请完结了。
咱们来看看客户端的代码,如下:
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
searchBook(stub);
}
private static void searchBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);
stub.searchBooks(StringValue.newBuilder().setValue("明清小说").build(), new StreamObserver<Book>() {
@Override
public void onNext(Book book) {System.out.println(book);
}
@Override
public void onError(Throwable throwable) { }
@Override
public void onCompleted() {countDownLatch.countDown();
System.out.println("查问结束!");
}
});
countDownLatch.await();}
}
客户端的代码好了解,搜寻的关键字是 明清小说
,每当服务端返回一次数据的时候,客户端回调的 onNext 办法就会被触发一次,当服务端之行了 responseObserver.onCompleted();
之后,客户端的 onCompleted 办法也会被触发。
这个就是服务端流,客户端发动一个申请,服务端通过 onNext 能够屡次写回数据。
4. 客户端流 RPC
客户端流则是客户端发动多个申请,服务端只给出一个响应。
下面的 updateBooks 就是一个客户端流的案例,客户端想要批改图书,能够发动多个申请批改多本书,服务端则收集屡次批改的后果,将之汇总而后一次性返回给客户端。
咱们先来看看服务端的代码:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map<String, Book> bookMap = new HashMap<>();
public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public StreamObserver<Book> updateBooks(StreamObserver<StringValue> responseObserver) {StringBuilder sb = new StringBuilder("更新的图书 ID 为:");
return new StreamObserver<Book>() {
@Override
public void onNext(Book book) {bookMap.put(book.getId(), book);
sb.append(book.getId())
.append(",");
}
@Override
public void onError(Throwable throwable) { }
@Override
public void onCompleted() {responseObserver.onNext(StringValue.newBuilder().setValue(sb.toString()).build());
responseObserver.onCompleted();}
};
}
}
客户端每发送一本书来,就会触发服务端的 onNext 办法,而后咱们在这办法中进行图书的更新操作,并记录更新后果。最初,咱们在 onCompleted 办法中,将更新后果汇总返回给客户端,基本上就是这样一个流程。
咱们再来看看客户端的代码:
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
updateBook(stub);
}
private static void updateBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);
StreamObserver<Book> request = stub.updateBooks(new StreamObserver<StringValue>() {
@Override
public void onNext(StringValue stringValue) {System.out.println("stringValue.getValue() =" + stringValue.getValue());
}
@Override
public void onError(Throwable throwable) { }
@Override
public void onCompleted() {System.out.println("更新结束");
countDownLatch.countDown();}
});
request.onNext(Book.newBuilder().setId("1").setName("a").setAuthor("b").build());
request.onNext(Book.newBuilder().setId("2").setName("c").setAuthor("d").build());
request.onCompleted();
countDownLatch.await();}
}
在客户端这块,updateBooks 办法会返回一个 StreamObserver<Book> 对象,调用该对象的 onNext 办法就是给服务端传递数据了,能够传递多个数据,调用该对象的 onCompleted 办法就是通知服务端数据传递完结了,此时也会触发服务端的 onCompleted 办法,服务端的 onCompleted 办法执行之后,进而触发了客户端的 onCompleted 办法。
5. 双向流 RPC
双向流其实就是 3、4 大节的合体。即客户端屡次发送数据,服务端也屡次响应数据。
咱们先来看下服务端的代码:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map<String, Book> bookMap = new HashMap<>();
private List<Book> books = new ArrayList<>();
public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public StreamObserver<StringValue> processBooks(StreamObserver<BookSet> responseObserver) {return new StreamObserver<StringValue>() {
@Override
public void onNext(StringValue stringValue) {Book b = Book.newBuilder().setId(stringValue.getValue()).build();
books.add(b);
if (books.size() == 3) {BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();
responseObserver.onNext(bookSet);
books.clear();}
}
@Override
public void onError(Throwable throwable) { }
@Override
public void onCompleted() {BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();
responseObserver.onNext(bookSet);
books.clear();
responseObserver.onCompleted();}
};
}
}
这段代码没有实际意义,单纯为了给小伙伴们演示双向流,我的操作逻辑是客户端传递多个 ID 到服务端,而后服务端依据这些 ID 构建对应的 Book 对象,而后三个三个一组,再返回给客户端。客户端每次发送一个申请,都会触发服务端的 onNext 办法,咱们在这个办法中对申请分组返回。最初如果还有残余的申请,咱们在 onCompleted() 办法中返回。
再来看看客户端的代码:
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
processBook(stub);
}
private static void processBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);
StreamObserver<StringValue> request = stub.processBooks(new StreamObserver<BookSet>() {
@Override
public void onNext(BookSet bookSet) {System.out.println("bookSet =" + bookSet);
System.out.println("=============");
}
@Override
public void onError(Throwable throwable) { }
@Override
public void onCompleted() {System.out.println("处理完毕!");
countDownLatch.countDown();}
});
request.onNext(StringValue.newBuilder().setValue("a").build());
request.onNext(StringValue.newBuilder().setValue("b").build());
request.onNext(StringValue.newBuilder().setValue("c").build());
request.onNext(StringValue.newBuilder().setValue("d").build());
request.onCompleted();
countDownLatch.await();}
}
这个客户端的代码跟第四大节截然不同,不再赘述了。
好啦,这就是松哥和小伙伴们介绍的 gRPC 的四种不同的通信模式,文章中只给出了一些要害代码,如果小伙伴们没看明确,倡议联合上篇文章一起浏览就懂啦~