乐趣区

关于云原生:Spring-Cloud-Bus-消息总线介绍

作者 | 洛夜
起源 | 阿里巴巴云原生公众号

在 Spring 生态中玩转 RocketMQ 系列文章:

  • 《如何在 Spring 生态中玩转 RocketMQ?》
  • 《罗美琪和春波特的故事 …》
  • 《RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?》
  • 《应用 rocketmq-spring-boot-starter 来配置、发送和生产 RocketMQ 音讯》
  • 《Spring Cloud Stream 体系及原理介绍》

本文配套可交互教程已登录 阿里云知口头手实验室,PC 端登录 start.aliyun.com 在浏览器中立刻体验

Spring Cloud Bus 对本人的定位是 Spring Cloud 体系内的音讯总线,应用 message broker 来连贯分布式系统的所有节点。Bus 官网的 Reference 文档 比较简单,简略到连一张图都没有。

这是最新版的 Spring Cloud Bus 代码构造(代码量比拟少):

Bus 实例演示

在剖析 Bus 的实现之前,咱们先来看两个应用 Spring Cloud Bus 的简略例子。

1. 所有节点的配置新增

Bus 的例子比较简单,因为 Bus 的 AutoConfiguration 层都有了默认的配置,只须要引入消息中间件对应的 Spring Cloud Stream 以及 Spring Cloud Bus 依赖即可,之后所有启动的利用都会应用同一个 Topic 进行音讯的接管和发送。

Bus 对应的 Demo 曾经放到了 github 上,该 Demo 会模仿启动 5 个节点,只须要对其中任意的一个实例新增配置项,所有节点都会新增该配置项。

Demo 地址:https://github.com/fangjian0423/rocketmq-binder-demo/tree/master/rocketmq-bus-demo

拜访任意节点提供的 Controller 提供的获取配置的地址(key 为 hangzhou):

curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'

所有节点返回的后果都是 unknown,因为所有节点的配置中没有 hangzhou 这个 key。

Bus 外部提供了 EnvironmentBusEndpoint 这个 Endpoint 通过 message broker 用来新增 / 更新配置。

拜访任意节点该 Endpoint 对应的 url: /actuator/bus-env?name=hangzhou&value=alibaba 进行配置项的新增(比方拜访 node1 的 url):

curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'

而后再次拜访所有节点 /bus/env 获取配置:

$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'
~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
alibaba%

能够看到,所有节点都新增了一个 key 为 hangzhou 的配置,且对应的 value 是 alibaba。这个配置项是通过 Bus 提供的 EnvironmentBusEndpoint 实现的。

这里援用 程序猿 DD 画的一张图片,Spring Cloud Config 配合 Bus 实现所有节点配置的刷新来形容之前的实例(本文实例不是刷新,而是新增配置,然而流程是一样的):

2. 局部节点的配置批改

比方在 node1 上指定 destination 为 rocketmq-bus-node2 (node2 配置了 spring.cloud.bus.id 为 rocketmq-bus-node2:10002,能够匹配上) 进行配置的批改:

curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'

拜访 /bus/env 获取配置(因为在 node1 上发送音讯,Bus 也会对发送方的节点 node1 进行配置批改):

~ ⌚
$ curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'
~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
xihu%
~ ⌚
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
xihu%

能够看到,只有 node1 和 node2 批改了配置,其余的 3 个节点配置未扭转。

Bus 的实现

1. Bus 概念介绍

1)事件

Bus 中定义了近程事件 RemoteApplicationEvent,该事件继承了 Spring 的事件 ApplicationEvent,而且它目前有 4 个具体的实现:

  • EnvironmentChangeRemoteApplicationEvent:近程环境变更事件。次要用于接管一个 Map<String,String> 类型的数据并更新到 Spring 上下文中 Environment 中的事件。文中的实例就是应用这个事件并配合 EnvironmentBusEndpoint 和 EnvironmentChangeListener 实现的。
  • AckRemoteApplicationEvent:近程确认事件。Bus 外部胜利接管到近程事件后会发送回 AckRemoteApplicationEvent 确认事件进行确认。
  • RefreshRemoteApplicationEvent: 近程配置刷新事件。配合 @RefreshScope 以及所有的 @ConfigurationProperties 注解润饰的配置类的动静刷新。
  • UnknownRemoteApplicationEvent:近程未知事件。Bus 内部消息体进行转换近程事件的时候如果产生异样会对立包装成该事件。

Bus 外部还存在一个非 RemoteApplicationEvent 事件 -SentApplicationEvent 音讯发送事件,配合 Trace 进行近程音讯发送的记录。

这些事件会配合 ApplicationListener 进行操作,比方 EnvironmentChangeRemoteApplicationEvent 配了 EnvironmentChangeListener 进行配置的新增 / 批改:

public class EnvironmentChangeListener
        implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {private static Log log = LogFactory.getLog(EnvironmentChangeListener.class);
    @Autowired
    private EnvironmentManager env;
    @Override
    public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {Map<String, String> values = event.getValues();
        log.info("Received remote environment change request. Keys/values to update"
                + values);
        for (Map.Entry<String, String> entry : values.entrySet()) {env.setProperty(entry.getKey(), entry.getValue());
        }
    }
}

收到其它节点发送来 EnvironmentChangeRemoteApplicationEven 事件之后调用 EnvironmentManager#setProperty 进行配置的设置,该办法外部针对每一个配置项都会发送一个 EnvironmentChangeEvent 事件,而后被 ConfigurationPropertiesRebinder 所监听,进行 rebind 操作新增 / 更新配置。

2)Actuator Endpoint

Bus 外部裸露了 2 个 Endpoint,别离是 EnvironmentBusEndpoint 和 RefreshBusEndpoint,进行配置的新增 / 批改以及全局配置刷新。它们对应的 Endpoint id 即 url 是 bus-env 和 bus-refresh。

3)配置

Bus 对于音讯的发送必然波及到 Topic、Group 之类的信息,这些内容都被封装到了 BusProperties 中,其默认的配置前缀为 spring.cloud.bus,比方:

  • spring.cloud.bus.refresh.enabled 用于开启 / 敞开全局刷新的 Listener。
  • spring.cloud.bus.env.enabled 用于开启 / 敞开配置新增 / 批改的 Endpoint。
  • spring.cloud.bus.ack.enabled 用于开启开启 / 敞开 AckRemoteApplicationEvent 事件的发送。
  • spring.cloud.bus.trace.enabled 用于开启 / 敞开息记录 Trace 的 Listener。

音讯发送波及到的 Topic 默认用的是 springCloudBus,能够配置进行批改,Group 能够设置成播送模式或应用 UUID 配合 offset 为 lastest 的模式。

每个 Bus 利用都有一个对应的 Bus id,官网取值形式较简单:

${vcap.application.name:${spring.application.name:application}}:${vcap.application.instance_index:${spring.application.index:${local.server.port:${server.port:0}}}}:${vcap.application.instance_id:${random.value}}

倡议手动配置 Bus id,因为 Bus 近程事件中的 destination 会依据 Bus id 进行匹配:

spring.cloud.bus.id=${spring.application.name}-${server.port}

2. Bus 底层剖析

Bus 的底层剖析无非牵扯到这几个方面:

  • 音讯是如何发送的
  • 音讯是如何接管的
  • destination 是如何匹配的
  • 近程事件收到后如何触发下一个 action

BusAutoConfiguration 自动化配置类被 @EnableBinding(SpringCloudBusClient.class)所润饰。

@EnableBinding 的用法在文章《Spring Cloud Stream 体系及原理介绍》中曾经阐明,且它的 value 为 SpringCloudBusClient.class,会在 SpringCloudBusClient 中基于代理创立出 input 和 output 的 DirectChannel:

public interface SpringCloudBusClient {
    String INPUT = "springCloudBusInput";
    String OUTPUT = "springCloudBusOutput";
    @Output(SpringCloudBusClient.OUTPUT)
    MessageChannel springCloudBusOutput();
    @Input(SpringCloudBusClient.INPUT)
    SubscribableChannel springCloudBusInput();}

springCloudBusInput 和 springCloudBusOutput 这两个 Binding 的属性能够通过配置文件进行批改(比方批改 topic):

spring.cloud.stream.bindings:
  springCloudBusInput:
    destination: my-bus-topic
  springCloudBusOutput:
    destination: my-bus-topic

音讯的接管和发送:

// BusAutoConfiguration
@EventListener(classes = RemoteApplicationEvent.class) // 1
public void acceptLocal(RemoteApplicationEvent event) {if (this.serviceMatcher.isFromSelf(event)
            && !(event instanceof AckRemoteApplicationEvent)) { // 2
        this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); // 3
    }
}
@StreamListener(SpringCloudBusClient.INPUT) // 4
public void acceptRemote(RemoteApplicationEvent event) {if (event instanceof AckRemoteApplicationEvent) {if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
                && this.applicationEventPublisher != null) { // 5
            this.applicationEventPublisher.publishEvent(event);
        }
        // If it's an ACK we are finished processing at this point
        return;
    }
    if (this.serviceMatcher.isForSelf(event)
            && this.applicationEventPublisher != null) { // 6
        if (!this.serviceMatcher.isFromSelf(event)) { // 7
            this.applicationEventPublisher.publishEvent(event);
        }
        if (this.bus.getAck().isEnabled()) { // 8
            AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
                    this.serviceMatcher.getServiceId(),
                    this.bus.getAck().getDestinationService(),
                    event.getDestinationService(), event.getId(), event.getClass());
            this.cloudBusOutboundChannel
                    .send(MessageBuilder.withPayload(ack).build());
            this.applicationEventPublisher.publishEvent(ack);
        }
    }
    if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // 9
        // We are set to register sent events so publish it for local consumption,
        // irrespective of the origin
        this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
                event.getOriginService(), event.getDestinationService(),
                event.getId(), event.getClass()));
    }
}
  1. 利用 Spring 事件的监听机制监听本地所有的 RemoteApplicationEvent 近程事件(比方 bus-env 会在本地发送 EnvironmentChangeRemoteApplicationEvent 事件,bus-refresh 会在本地发送 RefreshRemoteApplicationEvent 事件,这些事件在这里都会被监听到)。
  2. 判断本地接管到的事件不是 AckRemoteApplicationEvent 近程确认事件 (不然会死循环,始终接管音讯,发送音讯 …) 以及该事件是利用本身发送进来的(事件发送方是利用本身),如果都满足执行步骤 3。
  3. 结构 Message 并将该近程事件作为 payload,而后应用 Spring Cloud Stream 结构的 Binding name 为 springCloudBusOutput 的 MessageChannel 将音讯发送到 broker。

4.@StreamListener 注解生产 Spring Cloud Stream 结构的 Binding name 为 springCloudBusInput 的 MessageChannel,接管的音讯为近程音讯。

  1. 如果该近程事件是 AckRemoteApplicationEvent 近程确认事件并且利用开启了音讯追踪 trace 开关,同时该近程事件不是利用本身发送的(事件发送方不是利用本身,示意事件是其它利用发送过去的),那么本地发送 AckRemoteApplicationEvent 近程确认事件示意利用确认收到了其它利用发送过去的近程事件,流程完结。
  2. 如果该近程事件是其它利用发送给利用本身的(事件的接管方是利用本身),那么进行步骤 7 和 8,否则执行步骤 9。
  3. 该近程事件不是利用本身发送 (事件发送方不是利用本身) 的话,将该事件以本地的形式发送进来。利用本身一开始曾经在本地被对应的音讯接管方解决了,无需再次发送。
  4. 如果开启了 AckRemoteApplicationEvent 近程确认事件的开关,结构 AckRemoteApplicationEvent 事件并在近程和本地都发送该事件(本地发送是因为步骤 5 没有进行本地 AckRemoteApplicationEvent 事件的发送,也就是本身利用对本身利用确认; 近程发送是为了通知其它利用,本身利用收到了音讯)。
  5. 如果开启了音讯记录 Trace 的开关,本地结构并发送 SentApplicationEvent 事件。

bus-env 触发后所有节点的 EnvironmentChangeListener 监听到了配置的变动,控制台都会打印出以下信息:

o.s.c.b.event.EnvironmentChangeListener  : Received remote environment change request. Keys/values to update {hangzhou=alibaba}

如果在本地监听近程确认事件 AckRemoteApplicationEvent,都会收到所有节点的信息,比方 node5 节点的控制台监听到的 AckRemoteApplicationEvent 事件如下:

ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670484,"originService":"rocketmq-bus-node5:10005","destinationService":"**","id":"375f0426-c24e-4904-bce1-5e09371fc9bc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670184,"originService":"rocketmq-bus-node1:10001","destinationService":"**","id":"91f06cf1-4bd9-4dd8-9526-9299a35bb7cc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670402,"originService":"rocketmq-bus-node2:10002","destinationService":"**","id":"7df3963c-7c3e-4549-9a22-a23fa90a6b85","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670406,"originService":"rocketmq-bus-node3:10003","destinationService":"**","id":"728b45ee-5e26-46c2-af1a-e8d1571e5d3a","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670427,"originService":"rocketmq-bus-node4:10004","destinationService":"**","id":"1812fd6d-6f98-4e5b-a38a-4b11aee08aeb","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}

那么回到本章节结尾提到的 4 个问题,咱们别离做一下解答:

  • 音讯是如何发送的: 在 BusAutoConfiguration#acceptLocal 办法中通过 Spring Cloud Stream 发送事件到 springCloudBustopic 中。
  • 音讯是如何接管的: 在 BusAutoConfiguration#acceptRemote 办法中通过 Spring Cloud Stream 接管 springCloudBustopic 的音讯。
  • destination 是如何匹配的: 在 BusAutoConfiguration#acceptRemote 办法中接管近程事件办法里对 destination 进行匹配。
  • 近程事件收到后如何触发下一个 action: Bus 外部通过 Spring 的事件机制接管本地的 RemoteApplicationEvent 具体的实现事件再做下一步的动作(比方 EnvironmentChangeListener 接管了 EnvironmentChangeRemoteApplicationEvent 事件,RefreshListener 接管了 RefreshRemoteApplicationEvent 事件)。

总结

Spring Cloud Bus 本身内容还是比拟少的,不过还是须要提前理解 Spring Cloud Stream 体系以及 Spring 本身的事件机制,在此基础上,能力更好地了解 Spring Cloud Bus 对本地事件和近程事件的解决逻辑。

目前 Bus 内置的近程事件较少,大多数为配置相干的事件,咱们能够继承 RemoteApplicationEvent 并配合 @RemoteApplicationEventScan 注解构建本身的微服务音讯体系。

作者简介

方剑(花名:洛夜),GitHub ID @fangjian0423,开源爱好者,阿里巴巴高级开发工程师,阿里云产品 EDAS 开发,Spring Cloud Alibaba 开源我的项目负责人之一。

退出移动版