关于dubbo:Dubbo的简单使用以及Triple协议的Streaming通信的实现

35次阅读

共计 5091 个字符,预计需要花费 13 分钟才能阅读完成。

商业转载请分割作者取得受权,非商业转载请注明出处。
For commercial use, please contact the author for authorization. For non-commercial use, please indicate the source.
协定 (License):署名 - 非商业性应用 - 雷同形式共享 4.0 国内 (CC BY-NC-SA 4.0)
作者 (Author):Waste Code
链接 (URL):https://waste-code.tech/archi…
起源(Source):Waste Code

文章概览

  • 我的项目模块

    1. common 模块——实现实体类以及申明裸露的 api 接口
    2. provider 模块——裸露的 api 接口的业务实现
    3. consumer 模块——申请接口的实现,将会待用裸露的 api 接口
  • GITHUB: Dubbo 的简略应用以及 Triple 协定的 Streaming 通信的实现
  • 官网文档: Triple 协定
  • Blog 目标: 记录实现过程及呈现的问题

Dubbo 的简略应用

  1. 在 common 模块中定义实体类 User
  2. 在 common 模块中申明暴露出的接口,实现接口 UserService

    public interface UserService {
    
        /**
        * 获取用户信息
        * @param name
        * @return
        */
        User getUserInfo(String name);
    }
  3. 在 provider 和 consumer 模块中引入相干依赖

    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-spring-boot-starter</artifactId>
        <version>3.0.7</version>
    </dependency>
    
    <!--   上面这个包必须援用,服务注册到 zookeeper 中应用,之前没有援用这个包,后果利用起不来         -->
    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-registry-zookeeper</artifactId>
        <version>3.0.7</version>
    </dependency>
    
    <dependency>
        <groupId>com.sample</groupId>
        <artifactId>common</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>
  4. 在 provider 和 consumer 模块中创立 application.yml 文件并编写相干配置

    server:
        port: 8082 # 这里填写端口号,provider 和 consumer 不同,spring:
        application:
            name: consumer
    
    dubbo:
        protocol:
            name: dubbo # 抉择通信协议
            port: -1
        registry:
            id: zk-zookeeper
            address: zookeeper://127.0.0.1:2181
  5. 在 provider 和 consumer 中编写启动类,这里以 consumer 模块为例,这里要加上 EnableDubbo 注解

    @SpringBootApplication
    @EnableDubbo
    public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);
        }
    }
  6. 在 provider 中对 UserService 进行实现

    // 这里留神应用注解 @DubboService,时 dubbo 中的 Service 注解,次要在对外提供服务的实现类上
    @DubboService
    public class UserServiceImpl implements UserService {
        @Override
        public User getUserInfo(String name) {User user = new User();
            user.setName("dubbo");
            user.setAge(12);
            return user;
        }
    
    }
    
  7. 在 consumer 中实现申请接口, 援用 provider 模块暴露出的接口要应用 DubboReference 注解

    @RestController
    @RequestMapping("/user")
    public class UserController {
    
        @DubboReference
        private UserService userService;
    
        @GetMapping("/info")
        public User getUserInfo() {return userService.getUserInfo("xxx");
        }
    
    
    }

编写实现代码后,启动 provider 和 consumer 模块,而后通过 Postman 工具调用接口,发现能够失常应用就实现了

Triple 协定的 Streaming 通信实现

Triple 协定的 Stream 通信次要分为三种:服务端流、客户端流、双向流

利用场景

  • 接口须要发送大量数据,无奈被放到一次申请中,须要分批次发送
  • 流式场景,数据须要依照发送程序解决, 数据自身是没有确定边界的
  • 推送类场景,多个音讯在同一个调用的上下文中被发送和解决

    流的语义保障(长处)

  • 提供音讯边界,能够不便的对音讯进行独自解决
  • 严格有序,发送端的程序和接收端的程序是统一的
  • 全双工,发送不须要期待
  • 反对勾销和超时

Streaming 流通信实现

服务端流 (SERVER_STREAM) 申请流程

服务端流 (SERVER_STREAM) 的 Java 实现

  1. 在 provider 和 consumer 模块中增加相干依赖

    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
    </dependency>
  2. 批改 provider 和 consumer 模块中的相干配置

    dubbo: #此处仅截取须要变更的配置,其余配置默认为有原有的就行
        protocol:
            name: tri # 批改 dubbo 的通信协议,当然 triple 协定同样反对之前的 dubbo 的简略应用
  3. 在 common 模块的 UserService 中申明相干 api 接口

    /**
     * 服务端流
     * @param name
     * @param response
     */
    void sayHelloServerStream(String name, StreamObserver<String> response)
        throws InterruptedException;
  4. 在 provider 模块中实现相干性能

    //StreamObserver 是接管音讯的观察者,// 在 onNext 办法调用后,consumer 模块中的消费者会获取相干的数据,// 当 onCompleted 办法调用后,consumer 模块进行最初的解决后,整个服务端流才会完结
    @Override
    public void sayHelloServerStream(String name, StreamObserver<String> response)
        throws InterruptedException {response.onNext("Hallo," + name);
        
        // 这里提早 10s,次要测试,provider 模块接收数据会不会有 10s 的延时
        Thread.sleep(10 * 1000);
    
        response.onNext("Hallo," + name + ", 第二次");
    
        response.onCompleted();}
  5. 在 consumer 模块编写申请办法

    
    /**
     * 测试服务端流
     * @param name
     * @return
     * @throws InterruptedException
     */
    @GetMapping("/sayHallo/{name}")
    public List<String> sayHallo(@PathVariable("name") String name) throws InterruptedException {List<String> list = new ArrayList<>();
        userService.sayHelloServerStream(name, new StreamObserver<String>() {
            
            // 每次 provider 模块调用一次 onNext 时,该办法会执行一次
            @Override
            public void onNext(String data) {System.out.println("onNext:" + data);
                list.add(data);
            }
    
            @Override
            public void onError(Throwable throwable) {System.out.println("报错了");
            }
            // 当 provider 模块的 onCompleted 办法调用后,执行该办法
            @Override
            public void onCompleted() {System.out.println("完结");
            }
        });
        return list;
    }

    客户端 (CLIENT_STREAM) 流申请流程

双向流 (BIDIRECTIONAL_STREAM) 申请流程

客户端流 (CLIENT_STREAM)/ 双向流(BIDIRECTIONAL_STREAM) 的 Java 实现

  1. 客户端流和双向流在 Java 中的实现形式是同一种
  2. 援用 pom 和批改配置与服务端流雷同
  3. 在 common 模块中申明相干接口

    /**
     * 客户端流 / 双向流, 这里返回的 StreamObserver 类里的解决切实 provider 模块中实现,* 而参数 StreamObserver 则是在 consumer 模块中实现,尽管是 consumer 调用该办法
     * @param response
     * @return
     */
    StreamObserver<String> sayHelloStream(StreamObserver<String> response);
  4. 在 provider 模块中实现相干办法

    @Override
    public StreamObserver<String> sayHelloStream(StreamObserver<String> response) {return new StreamObserver<String>() {
            @Override
            public void onNext(String data) {System.out.println("服务端申请参数:" + data);
                response.onNext("Hello," + data);
            }
    
            @Override
            public void onError(Throwable throwable) { }
    
            @Override
            public void onCompleted() {System.out.println("provider 敞开");
                response.onCompleted();}
        };
    }
  5. 在 consumer 模块中实现办法的调用

    
    @PostMapping("/sayHallo")
    public List<String> sayHallo(@RequestBody List<String> names) {List<String> list = new ArrayList<>();
        StreamObserver<String> request = userService.sayHelloStream(new StreamObserver<String>() {
            @Override
            public void onNext(String data) {System.out.println("说了啥?" + data);
                list.add(data);
            }
    
            @Override
            public void onError(Throwable throwable) { }
    
            @Override
            public void onCompleted() {System.out.println("完结了");
            }
        });
    
        // 下面定义了 StreamObserver 并调用了办法后,在下边通过 onNext 办法调用发送申请
        names.forEach(item -> {request.onNext(item);
            try {Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {throw new RuntimeException(e);
            }
        });
        request.onCompleted();
    
        return list;
    }

正文完
 0