spring cloud stream 新绑定形式
新版spring cloud stream文档
新版提倡用函数式进行发送和生产信息
定义返回类型为Supplier, Function or Consumer的bean提供音讯发送和生产的bean
看看绑定名称命名规定
- input - <functionName> + -in- + <index>
- output - <functionName> + -out- + <index>
在配置文件中指定spring.cloud.function.definition的名称后会把这个bean绑定到对应的消费者和提供者上.
如下定义 会把bean绑定在消费者consumerEvent-in-0或者提供者consumerEvent-out-0上
多个bean能够用 ; 进行宰割
spring: cloud: function: definition: consumerEvent
指定这个消费者的topic和group
spring: cloud: stream: bindings: consumerEvent-in-0: destination: DEMO group: demo-group
注册消费者的bean
// 第一种形式(官网举荐)@Beanpublic Function<Flux<Message<String>>, Mono<Void>> consumerEvent() { return flux -> flux.map(message -> { System.out.println(message.getPayload()); return message; }).then();}// 第二种形式// 留神应用Flux 要调用 subscribe 不然这个办法不会被生产@Beanpublic Consumer<Flux<Message<String>>> consumerEvent() { return flux -> flux.map(message -> { System.out.println(message.getPayload()); return message; }).subscribe();}// 或@Beanpublic Consumer<Message<String>> consumerEvent() { return message -> System.out.println(message.getPayload());}
示例
提供者
@Configurationpublic class EventSender { @Bean public Demo demo() { return new Demo(); } static class Demo implements CommandLineRunner { @Autowired StreamBridge streamBridge; @Override public void run(String... args) throws Exception { final Message<T> message = MessageBuilder.withPayload("Body") .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) .build(); // 第一个配置的是目的地 // 如果在yaml中有配置会发送到yaml中目的地 streamBridge.send("DEMO", message); } }}
配置rocketmq和stream的配置
spring: application: name: demo cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 group: demo bindings: consumerEvent-in-0: destination: DEMO content-type: application/json group: demo-group function: definition: consumerEvent
注册一个消费者
@Configurationpublic class EventReceptor { @Bean public Function<Flux<Message<String>>, Mono<Void>> consumerEvent() { return flux -> flux.map(message -> { System.out.println(message.getPayload()); return message; }).then(); }}
依赖
spring cloud 2020 默认不应用bootstrap启动 要加这个依赖spring-cloud-starter-bootstrap
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>2020.0.2</version> <type>pom</type> <scope>import</scope></dependency><dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.2.5-RocketMQ-RC1</version> <type>pom</type> <scope>import</scope></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bootstrap</artifactId> <version>3.0.2</version></dependency>
Tag过滤
在新版的时候过滤tag始终生效, 前面看源码发现新版的sql和tag联合到subscription的属性中
this.pushConsumer.subscribe(this.topic, RocketMQUtils.getMessageSelector(((RocketMQConsumerProperties)this.extendedConsumerProperties.getExtension()).getSubscription()));public static MessageSelector getMessageSelector(String expression) { return StringUtils.hasText(expression) && expression.startsWith("sql:") ? MessageSelector.bySql(expression.replaceFirst("sql:", "")) : MessageSelector.byTag(expression);}
如果消费者要过滤某个tag须要这么写
// 新版 (当初的写法)rocketmq: bindings: createUserAccountEvent-in-0: consumer: subscription: DEMO-TAG// 旧版 (以前的写法)rocketmq: bindings: createUserAccountEvent-in-0: consumer: tag: DEMO-TAG