spring cloud stream 新绑定形式

新版spring cloud stream文档

新版提倡用函数式进行发送和生产信息

定义返回类型为Supplier, Function or Consumer的bean提供音讯发送和生产的bean

看看绑定名称命名规定

  • input - <functionName> + -in- + <index>
  • output - <functionName> + -out- + <index>

在配置文件中指定spring.cloud.function.definition的名称后会把这个bean绑定到对应的消费者和提供者上.

如下定义 会把bean绑定在消费者consumerEvent-in-0或者提供者consumerEvent-out-0上

多个bean能够用 ; 进行宰割

spring:  cloud:    function:      definition: consumerEvent

指定这个消费者的topic和group

spring:  cloud:    stream:      bindings:        consumerEvent-in-0:          destination: DEMO          group: demo-group

注册消费者的bean

// 第一种形式(官网举荐)@Beanpublic Function<Flux<Message<String>>, Mono<Void>> consumerEvent() {    return flux -> flux.map(message -> {        System.out.println(message.getPayload());        return message;    }).then();}// 第二种形式// 留神应用Flux 要调用 subscribe 不然这个办法不会被生产@Beanpublic Consumer<Flux<Message<String>>> consumerEvent() {    return flux -> flux.map(message -> {        System.out.println(message.getPayload());        return message;    }).subscribe();}// 或@Beanpublic Consumer<Message<String>> consumerEvent() {    return message -> System.out.println(message.getPayload());}

示例

提供者

@Configurationpublic class EventSender {    @Bean    public Demo demo() {        return new Demo();    }    static class Demo implements CommandLineRunner {        @Autowired        StreamBridge streamBridge;        @Override        public void run(String... args) throws Exception {            final Message<T> message = MessageBuilder.withPayload("Body")                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)                .build();            // 第一个配置的是目的地            // 如果在yaml中有配置会发送到yaml中目的地            streamBridge.send("DEMO", message);        }    }}

配置rocketmq和stream的配置

spring:  application:    name: demo  cloud:    stream:      rocketmq:        binder:          name-server: 127.0.0.1:9876          group: demo      bindings:        consumerEvent-in-0:          destination: DEMO          content-type: application/json          group: demo-group      function:        definition: consumerEvent

注册一个消费者

@Configurationpublic class EventReceptor {    @Bean    public Function<Flux<Message<String>>, Mono<Void>> consumerEvent() {        return flux -> flux.map(message -> {            System.out.println(message.getPayload());            return message;        }).then();    }}

依赖

spring cloud 2020 默认不应用bootstrap启动 要加这个依赖spring-cloud-starter-bootstrap

<dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-dependencies</artifactId>    <version>2020.0.2</version>    <type>pom</type>    <scope>import</scope></dependency><dependency>    <groupId>com.alibaba.cloud</groupId>    <artifactId>spring-cloud-alibaba-dependencies</artifactId>    <version>2.2.5-RocketMQ-RC1</version>    <type>pom</type>    <scope>import</scope></dependency><dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-starter-bootstrap</artifactId>    <version>3.0.2</version></dependency>

Tag过滤

在新版的时候过滤tag始终生效, 前面看源码发现新版的sql和tag联合到subscription的属性中

this.pushConsumer.subscribe(this.topic, RocketMQUtils.getMessageSelector(((RocketMQConsumerProperties)this.extendedConsumerProperties.getExtension()).getSubscription()));public static MessageSelector getMessageSelector(String expression) {    return StringUtils.hasText(expression) && expression.startsWith("sql:") ? MessageSelector.bySql(expression.replaceFirst("sql:", "")) : MessageSelector.byTag(expression);}

如果消费者要过滤某个tag须要这么写

// 新版 (当初的写法)rocketmq:  bindings:    createUserAccountEvent-in-0:      consumer:        subscription: DEMO-TAG// 旧版 (以前的写法)rocketmq:  bindings:    createUserAccountEvent-in-0:      consumer:        tag: DEMO-TAG