商业转载请分割作者取得受权,非商业转载请注明出处。
For commercial use, please contact the author for authorization. For non-commercial use, please indicate the source.
协定(License):署名-非商业性应用-雷同形式共享 4.0 国内 (CC BY-NC-SA 4.0)
作者(Author):Waste Code
链接(URL):https://waste-code.tech/archi...
起源(Source):Waste Code

文章概览

  • 我的项目模块

    1. common模块——实现实体类以及申明裸露的api接口
    2. provider模块——裸露的api接口的业务实现
    3. consumer模块——申请接口的实现,将会待用裸露的api接口
  • GITHUB: Dubbo的简略应用以及Triple协定的Streaming通信的实现
  • 官网文档: Triple协定
  • Blog目标: 记录实现过程及呈现的问题

Dubbo的简略应用

  1. 在common模块中定义实体类User
  2. 在common模块中申明暴露出的接口,实现接口UserService

    public interface UserService {    /**    * 获取用户信息    * @param name    * @return    */    User getUserInfo(String name);}
  3. 在provider和consumer模块中引入相干依赖

    <dependency>    <groupId>org.apache.dubbo</groupId>    <artifactId>dubbo-spring-boot-starter</artifactId>    <version>3.0.7</version></dependency><!--   上面这个包必须援用,服务注册到zookeeper中应用,之前没有援用这个包,后果利用起不来         --><dependency>    <groupId>org.apache.dubbo</groupId>    <artifactId>dubbo-registry-zookeeper</artifactId>    <version>3.0.7</version></dependency><dependency>    <groupId>com.sample</groupId>    <artifactId>common</artifactId>    <version>0.0.1-SNAPSHOT</version></dependency>
  4. 在provider和consumer模块中创立application.yml文件并编写相干配置

    server:    port: 8082 # 这里填写端口号,provider和consumer不同,spring:    application:        name: consumerdubbo:    protocol:        name: dubbo # 抉择通信协议        port: -1    registry:        id: zk-zookeeper        address: zookeeper://127.0.0.1:2181
  5. 在provider和consumer中编写启动类,这里以consumer模块为例,这里要加上EnableDubbo注解

    @SpringBootApplication@EnableDubbopublic class ConsumerApplication {    public static void main(String[] args) {        SpringApplication.run(ConsumerApplication.class, args);    }}
  6. 在provider中对UserService进行实现

    // 这里留神应用注解@DubboService,时dubbo中的Service注解,次要在对外提供服务的实现类上@DubboServicepublic class UserServiceImpl implements UserService {    @Override    public User getUserInfo(String name) {        User user = new User();        user.setName("dubbo");        user.setAge(12);        return user;    }}
  7. 在consumer中实现申请接口, 援用provider模块暴露出的接口要应用DubboReference注解

    @RestController@RequestMapping("/user")public class UserController {    @DubboReference    private UserService userService;    @GetMapping("/info")    public User getUserInfo() {        return userService.getUserInfo("xxx");    }}

编写实现代码后,启动provider和consumer模块,而后通过Postman工具调用接口,发现能够失常应用就实现了

Triple协定的Streaming通信实现

Triple协定的Stream通信次要分为三种:服务端流、客户端流、双向流

利用场景

  • 接口须要发送大量数据,无奈被放到一次申请中,须要分批次发送
  • 流式场景,数据须要依照发送程序解决, 数据自身是没有确定边界的
  • 推送类场景,多个音讯在同一个调用的上下文中被发送和解决

    流的语义保障(长处)

  • 提供音讯边界,能够不便的对音讯进行独自解决
  • 严格有序,发送端的程序和接收端的程序是统一的
  • 全双工,发送不须要期待
  • 反对勾销和超时

Streaming流通信实现

服务端流(SERVER_STREAM)申请流程

服务端流(SERVER_STREAM)的Java实现

  1. 在provider和consumer模块中增加相干依赖

    <dependency>    <groupId>com.google.protobuf</groupId>    <artifactId>protobuf-java</artifactId></dependency>
  2. 批改provider和consumer模块中的相干配置

    dubbo: #此处仅截取须要变更的配置,其余配置默认为有原有的就行    protocol:        name: tri # 批改dubbo的通信协议,当然triple协定同样反对之前的dubbo的简略应用
  3. 在common模块的UserService中申明相干api接口

    /** * 服务端流 * @param name * @param response */void sayHelloServerStream(String name, StreamObserver<String> response)    throws InterruptedException;
  4. 在provider模块中实现相干性能

    //StreamObserver是接管音讯的观察者,//在onNext办法调用后,consumer模块中的消费者会获取相干的数据,//当onCompleted办法调用后,consumer模块进行最初的解决后,整个服务端流才会完结@Overridepublic void sayHelloServerStream(String name, StreamObserver<String> response)    throws InterruptedException {    response.onNext("Hallo, " + name);        // 这里提早10s,次要测试,provider模块接收数据会不会有10s的延时    Thread.sleep(10 * 1000);    response.onNext("Hallo, " + name + ", 第二次");    response.onCompleted();}
  5. 在consumer模块编写申请办法

    /** * 测试服务端流 * @param name * @return * @throws InterruptedException */@GetMapping("/sayHallo/{name}")public List<String> sayHallo(@PathVariable("name") String name) throws InterruptedException {    List<String> list = new ArrayList<>();    userService.sayHelloServerStream(name, new StreamObserver<String>() {                // 每次provider模块调用一次onNext时,该办法会执行一次        @Override        public void onNext(String data) {            System.out.println("onNext:" + data);            list.add(data);        }        @Override        public void onError(Throwable throwable) {            System.out.println("报错了");        }        // 当provider模块的onCompleted办法调用后,执行该办法        @Override        public void onCompleted() {            System.out.println("完结");        }    });    return list;}

    客户端(CLIENT_STREAM)流申请流程

双向流(BIDIRECTIONAL_STREAM)申请流程

客户端流(CLIENT_STREAM)/双向流(BIDIRECTIONAL_STREAM)的Java实现

  1. 客户端流和双向流在Java中的实现形式是同一种
  2. 援用pom和批改配置与服务端流雷同
  3. 在common模块中申明相干接口

    /** * 客户端流/双向流, 这里返回的StreamObserver类里的解决切实provider模块中实现, * 而参数StreamObserver则是在consumer模块中实现,尽管是consumer调用该办法 * @param response * @return */StreamObserver<String> sayHelloStream(StreamObserver<String> response);
  4. 在provider模块中实现相干办法

    @Overridepublic StreamObserver<String> sayHelloStream(StreamObserver<String> response) {    return new StreamObserver<String>() {        @Override        public void onNext(String data) {            System.out.println("服务端申请参数:" + data);            response.onNext("Hello, " + data);        }        @Override        public void onError(Throwable throwable) {        }        @Override        public void onCompleted() {            System.out.println("provider敞开");            response.onCompleted();        }    };}
  5. 在consumer模块中实现办法的调用

    @PostMapping("/sayHallo")public List<String> sayHallo(@RequestBody List<String> names) {    List<String> list = new ArrayList<>();    StreamObserver<String> request = userService.sayHelloStream(new StreamObserver<String>() {        @Override        public void onNext(String data) {            System.out.println("说了啥?" + data);            list.add(data);        }        @Override        public void onError(Throwable throwable) {        }        @Override        public void onCompleted() {            System.out.println("完结了");        }    });    // 下面定义了StreamObserver并调用了办法后,在下边通过onNext办法调用发送申请    names.forEach(item -> {        request.onNext(item);        try {            Thread.sleep(10 * 1000);        } catch (InterruptedException e) {            throw new RuntimeException(e);        }    });    request.onCompleted();    return list;}