乐趣区

关于java:Spring-cloud-stream-31-rocketmq踩坑记录

spring cloud stream 新绑定形式

新版 spring cloud stream 文档

新版提倡用函数式进行发送和生产信息

定义返回类型为 Supplier, Function or Consumer 的 bean 提供音讯发送和生产的 bean

看看绑定名称命名规定

  • input – <functionName> + -in- + <index>
  • output – <functionName> + -out- + <index>

在配置文件中指定 spring.cloud.function.definition 的名称后会把这个 bean 绑定到对应的消费者和提供者上.

如下定义 会把 bean 绑定在消费者 consumerEvent-in- 0 或者提供者 consumerEvent-out- 0 上

多个 bean 能够用 ; 进行宰割

spring:
  cloud:
    function:
      definition: consumerEvent

指定这个消费者的 topic 和 group

spring:
  cloud:
    stream:
      bindings:
        consumerEvent-in-0:
          destination: DEMO
          group: demo-group

注册消费者的 bean

// 第一种形式 (官网举荐)
@Bean
public Function<Flux<Message<String>>, Mono<Void>> consumerEvent() {
    return flux -> flux.map(message -> {System.out.println(message.getPayload());
        return message;
    }).then();}

// 第二种形式
// 留神应用 Flux 要调用 subscribe 不然这个办法不会被生产
@Bean
public Consumer<Flux<Message<String>>> consumerEvent() {
    return flux -> flux.map(message -> {System.out.println(message.getPayload());
        return message;
    }).subscribe();}
// 或
@Bean
public Consumer<Message<String>> consumerEvent() {return message -> System.out.println(message.getPayload());
}

示例

提供者

@Configuration
public class EventSender {
    @Bean
    public Demo demo() {return new Demo();
    }

    static class Demo implements CommandLineRunner {
        @Autowired
        StreamBridge streamBridge;

        @Override
        public void run(String... args) throws Exception {final Message<T> message = MessageBuilder.withPayload("Body")
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build();

            // 第一个配置的是目的地
            // 如果在 yaml 中有配置会发送到 yaml 中目的地
            streamBridge.send("DEMO", message);
        }
    }
}

配置 rocketmq 和 stream 的配置

spring:
  application:
    name: demo
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          group: demo
      bindings:
        consumerEvent-in-0:
          destination: DEMO
          content-type: application/json
          group: demo-group
      function:
        definition: consumerEvent

注册一个消费者

@Configuration
public class EventReceptor {
    @Bean
    public Function<Flux<Message<String>>, Mono<Void>> consumerEvent() {
        return flux -> flux.map(message -> {System.out.println(message.getPayload());
            return message;
        }).then();}
}

依赖

spring cloud 2020 默认不应用 bootstrap 启动 要加这个依赖 spring-cloud-starter-bootstrap

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>2020.0.2</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
    <version>2.2.5-RocketMQ-RC1</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bootstrap</artifactId>
    <version>3.0.2</version>
</dependency>

Tag 过滤

在新版的时候过滤 tag 始终生效, 前面看源码发现新版的 sql 和 tag 联合到 subscription 的属性中

this.pushConsumer.subscribe(this.topic, RocketMQUtils.getMessageSelector(((RocketMQConsumerProperties)this.extendedConsumerProperties.getExtension()).getSubscription()));

public static MessageSelector getMessageSelector(String expression) {return StringUtils.hasText(expression) && expression.startsWith("sql:") ? MessageSelector.bySql(expression.replaceFirst("sql:", "")) : MessageSelector.byTag(expression);
}

如果消费者要过滤某个 tag 须要这么写

// 新版 (当初的写法)
rocketmq:
  bindings:
    createUserAccountEvent-in-0:
      consumer:
        subscription: DEMO-TAG

// 旧版 (以前的写法)
rocketmq:
  bindings:
    createUserAccountEvent-in-0:
      consumer:
        tag: DEMO-TAG
退出移动版