Goal

  • Principles and source-code analysis of Soul's websocket data synchronization

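Why data synchronization

Why is data synchronization needed?

The gateway is the entry point for request traffic and plays a very important role in a microservice architecture. When operating a gateway, configuration such as flow-control rules and routing rules often has to change to meet business needs. Dynamic configuration is therefore an important factor in keeping the gateway highly available.

All Soul plugins are hot-pluggable, and the selectors and rules of every plugin are configured dynamically: changes take effect immediately, with no service restart. To keep response times low, Soul caches all configuration in a HashMap inside the JVM, and every request reads from this local cache, which is very fast. Data synchronization is needed so that modified configuration reaches the JVM local cache promptly.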
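The idea of a local cache that requests read and a sync layer writes can be sketched as follows. This is only an illustration: the class LocalRuleCache and its methods are hypothetical and are not Soul's actual cache implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical local configuration cache; not Soul's real cache class. */
final class LocalRuleCache {

    // selector id -> rule text. ConcurrentHashMap lets request threads read
    // while the synchronization thread writes.
    private static final Map<String, String> RULES = new ConcurrentHashMap<>();

    private LocalRuleCache() {
    }

    /** Called by the sync layer when the admin reports a create or update. */
    static void update(final String selectorId, final String rule) {
        RULES.put(selectorId, rule);
    }

    /** Called by the sync layer when the admin reports a delete. */
    static void remove(final String selectorId) {
        RULES.remove(selectorId);
    }

    /** Called on the request path: a pure in-memory lookup, no remote call. */
    static String lookup(final String selectorId) {
        return RULES.get(selectorId);
    }
}
```

The request path only ever calls lookup, so its latency does not depend on the admin service; the cost is that the cache is only as fresh as the last synchronized change.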

Soul supports three data synchronization methods:

  1. zookeeper synchronization
  2. websocket synchronization
  3. http long polling

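Principle analysis

The official Soul documentation describes data synchronization as follows:

When the Soul gateway starts, it synchronizes configuration data from the configuration service. It supports both push and pull modes for obtaining configuration changes and updates its local cache accordingly. When an administrator changes users, rules, plugins, or traffic configuration in the admin console, the change is synchronized to the Soul gateway through push or pull; which of the two is used depends on the configuration.

Synchronization flow diagram:

This article analyzes the websocket synchronization method.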

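websocket synchronization

What is WebSocket?

WebSocket is a protocol introduced alongside HTML5. Its purpose is to establish an unrestricted, bidirectional communication channel between the browser and the server, so that, for example, the server can send a message to the browser at any time.

How websocket synchronization works in the Soul gateway:

When the Soul gateway establishes a websocket connection with admin, admin pushes the full data set once. After that, whenever configuration data changes, admin actively pushes the incremental data to soul-web over the websocket.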
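The "full push once, then incremental pushes" behaviour can be sketched from the receiving side as below. The class SyncedConfig, its method names, and the convention that a null value means deletion are all assumptions made for this illustration; Soul's real messages carry typed data and an event type instead.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical receiver-side state for full and incremental pushes. */
final class SyncedConfig {

    private final Map<String, String> cache = new HashMap<>();

    /** Full push on connect: discard whatever was cached and take the snapshot. */
    synchronized void applyFull(final Map<String, String> snapshot) {
        cache.clear();
        cache.putAll(snapshot);
    }

    /** Incremental push: a null value is treated as a delete (assumption). */
    synchronized void applyIncrement(final String key, final String value) {
        if (value == null) {
            cache.remove(key);
        } else {
            cache.put(key, value);
        }
    }

    /** Returns a copy so callers cannot mutate the cache directly. */
    synchronized Map<String, String> view() {
        return new HashMap<>(cache);
    }
}
```

Because a reconnect triggers another full push, the receiver can recover from any increments it missed while disconnected simply by replacing its state.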

Enabling websocket synchronization in the Soul gateway:

  • Add the following dependency to soul-bootstrap:

    <!--soul data sync start use websocket-->
    <dependency>
        <groupId>org.dromara</groupId>
        <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
        <version>2.2.1</version>
    </dependency>

  • Add the related configuration to application.yml:

    soul:
      sync:
        websocket:
          urls: ws://localhost:9095/websocket

On the soul-admin side, add the configuration below, or set --soul.sync.websocket='' in the soul-admin startup parameters, and then restart the service:

soul:
  sync:
    websocket:
      enabled: true

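Source code analysis

Data synchronization in soul-admin

After a user changes data, soul-admin publishes a data change notification through Spring's ApplicationEventPublisher. DataChangedEventDispatcher handles the notification and, according to the configured websocket synchronization strategy, forwards the configuration to the corresponding event handler.

  • Source code of the data change notification

Whenever a configuration item is created or updated in the soul-admin management console, publishEvent is triggered: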

private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
    PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
    List<ConditionData> conditionDataList =
            selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
    // publish change event.
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
            Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}

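The publishEvent method publishes a DataChangedEvent, and the groupKey carried by that event determines which component's change is being handled.

DataChangedEvent is handled by the listener class DataChangedEventDispatcher: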

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private ApplicationContext applicationContext;

    private List<DataChangedListener> listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * Dispatches data change events.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            // Call the listener method that matches the changed component
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    // Collect every DataChangedListener bean once the context is ready
    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }
}
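Stripped of Spring, the dispatch step amounts to "for every registered listener, call the method that matches the event's group". The following plain-Java sketch shows that shape only; DispatchSketch, Group, and ChangeListener are hypothetical names, and the payload is simplified to a list of strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, framework-free version of the group-based dispatch. */
final class DispatchSketch {

    enum Group { PLUGIN, SELECTOR, RULE }

    interface ChangeListener {
        void onPluginChanged(List<String> data);

        void onSelectorChanged(List<String> data);

        void onRuleChanged(List<String> data);
    }

    private final List<ChangeListener> listeners = new ArrayList<>();

    void register(final ChangeListener listener) {
        listeners.add(listener);
    }

    /** Routes one change to the matching callback on every listener. */
    void dispatch(final Group group, final List<String> data) {
        for (ChangeListener listener : listeners) {
            switch (group) {
                case PLUGIN:
                    listener.onPluginChanged(data);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(data);
                    break;
                case RULE:
                    listener.onRuleChanged(data);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + group);
            }
        }
    }
}
```

With this structure, supporting another synchronization strategy means registering another listener; the dispatcher itself does not need to change.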
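  • Source code of the websocket listener

We enabled soul.sync.websocket.enabled=true earlier, so somewhere in the project there must be code that reads this setting. Searching for soul.sync.websocket.enabled leads to the data synchronization configuration class DataSyncConfiguration. Its websocket configuration is shown below: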

    @Configuration
    @ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
    @EnableConfigurationProperties(WebsocketSyncProperties.class)
    static class WebsocketListener {

        /**
         * Config event listener data changed listener.
         *
         * @return the data changed listener
         */
        @Bean
        @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
        public DataChangedListener websocketDataChangedListener() {
            return new WebsocketDataChangedListener();
        }

        /**
         * Websocket collector websocket collector.
         *
         * @return the websocket collector
         */
        @Bean
        @ConditionalOnMissingBean(WebsocketCollector.class)
        public WebsocketCollector websocketCollector() {
            return new WebsocketCollector();
        }

        /**
         * Server endpoint exporter server endpoint exporter.
         *
         * @return the server endpoint exporter
         */
        @Bean
        @ConditionalOnMissingBean(ServerEndpointExporter.class)
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    }

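Because matchIfMissing = true, this configuration is also active when the property is not set at all. WebsocketDataChangedListener is the concrete implementation of the DataChangedListener interface and sends the change information through WebsocketCollector: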

public class WebsocketDataChangedListener implements DataChangedListener {

    // Plugin changes
    @Override
    public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
        WebsocketData<PluginData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }

    // Selector changes
    @Override
    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
        WebsocketData<SelectorData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }

    // Rule changes
    @Override
    public void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) {
        WebsocketData<RuleData> configData =
                new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
    }

    // App authentication changes
    @Override
    public void onAppAuthChanged(final List<AppAuthData> appAuthDataList, final DataEventTypeEnum eventType) {
        WebsocketData<AppAuthData> configData =
                new WebsocketData<>(ConfigGroupEnum.APP_AUTH.name(), eventType.name(), appAuthDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
    }

    // Metadata changes
    @Override
    public void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) {
        WebsocketData<MetaData> configData =
                new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
    }
}

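At this point, soul-admin has finished sending the data.

Data synchronization in the soul-bootstrap gateway

Enabling websocket synchronization requires adding soul-spring-boot-starter-sync-data-websocket to soul-bootstrap. Looking up that custom spring-boot-starter in the project leads to the configuration class WebsocketSyncDataConfiguration.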

@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {

    /**
     * Websocket sync data service.
     * The websocket implementation of data synchronization.
     *
     * @param websocketConfig  the websocket config
     * @param pluginSubscriber the plugin subscriber
     * @param metaSubscribers  the meta subscribers
     * @param authSubscribers  the auth subscribers
     * @return the sync data service
     */
    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

    /**
     * Config websocket config.
     * Bound to the websocket settings in the yml file.
     *
     * @return the websocket config
     */
    @Bean
    @ConfigurationProperties(prefix = "soul.sync.websocket")
    public WebsocketConfig websocketConfig() {
        return new WebsocketConfig();
    }
}

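When using websocket synchronization, pay particular attention to reconnecting after a dropped connection, which is also described as keeping the heartbeat alive. Soul uses the third-party java-websocket library for its websocket connections. The core code of WebsocketSyncDataService: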

public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
    // Read the URLs from websocketConfig (soul.sync.websocket.urls in the yml, e.g. ws://localhost:9095/websocket)
    String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
    // One scheduler thread per URL, then create a SoulWebsocketClient for each URL
    executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
    for (String url : urls) {
        try {
            clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
        } catch (URISyntaxException e) {
            log.error("websocket url({}) is error", url, e);
        }
    }
    try {
        for (WebSocketClient client : clients) {
            // Connect, waiting at most 3 seconds
            boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
            // Reconnect check on the scheduled pool: first run after 10 seconds, then every 30 seconds
            executor.scheduleAtFixedRate(() -> {
                if (client.isClosed()) {
                    boolean reconnectSuccess = client.reconnectBlocking();
                }
            }, 10, 30, TimeUnit.SECONDS);
        }
    } catch (InterruptedException e) {
        log.info("websocket connection...exception....", e);
    }
}
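The reconnect logic is a periodic health check: if the connection is closed, try to reopen it. A minimal sketch of that pattern using only the JDK is shown below. ReconnectSketch and its Connection interface are hypothetical stand-ins for the websocket client, not part of Soul or java-websocket.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical periodic reconnect check built on a scheduled executor. */
final class ReconnectSketch {

    /** Stand-in for a websocket client (assumption for this example). */
    interface Connection {
        boolean isClosed();

        boolean reconnect();
    }

    private final Connection connection;

    private int attempts;

    ReconnectSketch(final Connection connection) {
        this.connection = connection;
    }

    /** One health check: reconnect only when the connection is closed. */
    synchronized boolean checkOnce() {
        if (!connection.isClosed()) {
            return true;
        }
        attempts++;
        return connection.reconnect();
    }

    synchronized int attempts() {
        return attempts;
    }

    /** Schedules the check on a daemon thread; the caller shuts the executor down. */
    ScheduledExecutorService start(final long initialDelay, final long period, final TimeUnit unit) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "websocket-reconnect");
            thread.setDaemon(true);
            return thread;
        });
        executor.scheduleAtFixedRate(() -> {
            try {
                checkOnce();
            } catch (RuntimeException e) {
                // Swallow the failure so that later runs are not cancelled.
            }
        }, initialDelay, period, unit);
        return executor;
    }
}
```

The try/catch inside the scheduled task matters: with scheduleAtFixedRate, a task that throws suppresses all subsequent executions, so an unhandled exception in one reconnect attempt would silently end the checks.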

SoulWebsocketClient instantiates WebsocketDataHandler in its constructor, and WebsocketDataHandler selects the concrete DataHandler according to the ConfigGroupEnum type:

    /**
     * Instantiates a new Soul websocket client.
     *
     * @param serverUri            the server uri
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers  the meta data subscribers
     * @param authDataSubscribers  the auth data subscribers
     */
    public SoulWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,
                               final List<MetaDataSubscriber> metaDataSubscribers,
                               final List<AuthDataSubscriber> authDataSubscribers) {
        super(serverUri);
        this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
    }

public class WebsocketDataHandler {

    private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);

    /**
     * Instantiates a new Websocket data handler.
     *
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers  the meta data subscribers
     * @param authDataSubscribers  the auth data subscribers
     */
    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
    }

    /**
     * Executor.
     *
     * @param type      the type
     * @param json      the json
     * @param eventType the event type
     */
    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        ENUM_MAP.get(type).handle(json, eventType);
    }
}

DataHandler uses the template method design pattern: AbstractDataHandler defines how each event type is handled.

public void handle(final String json, final String eventType) {
    List<T> dataList = convert(json);
    if (CollectionUtils.isNotEmpty(dataList)) {
        DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
        switch (eventTypeEnum) {
            case REFRESH:
            case MYSELF:
                doRefresh(dataList);
                break;
            case UPDATE:
            case CREATE:
                doUpdate(dataList);
                break;
            case DELETE:
                doDelete(dataList);
                break;
            default:
                break;
        }
    }
}
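The two ideas on the receiving side, looking up a handler by group in an EnumMap and fixing the control flow in an abstract handle method, can be combined in a small standalone sketch. Everything here is hypothetical (HandlerSketch, ListBackedHandler, the comma-separated payload used in place of JSON); it only illustrates the pattern.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Hypothetical handler registry plus template-method handler. */
final class HandlerSketch {

    enum Group { SELECTOR, RULE }

    enum EventType { REFRESH, UPDATE, CREATE, DELETE }

    /** Template: handle() fixes the flow, subclasses supply the steps. */
    abstract static class AbstractHandler {

        final void handle(final String payload, final EventType type) {
            List<String> items = convert(payload);
            if (items.isEmpty()) {
                return;
            }
            switch (type) {
                case REFRESH:
                    doRefresh(items);
                    break;
                case UPDATE:
                case CREATE:
                    doUpdate(items);
                    break;
                case DELETE:
                    doDelete(items);
                    break;
                default:
                    break;
            }
        }

        /** Splits a comma-separated payload; a stand-in for JSON parsing. */
        List<String> convert(final String payload) {
            List<String> out = new ArrayList<>();
            for (String part : payload.split(",")) {
                if (!part.trim().isEmpty()) {
                    out.add(part.trim());
                }
            }
            return out;
        }

        abstract void doRefresh(List<String> items);

        abstract void doUpdate(List<String> items);

        abstract void doDelete(List<String> items);
    }

    /** Concrete handler that keeps its data in a list. */
    static final class ListBackedHandler extends AbstractHandler {

        final List<String> store = new ArrayList<>();

        @Override
        void doRefresh(final List<String> items) {
            store.clear();
            store.addAll(items);
        }

        @Override
        void doUpdate(final List<String> items) {
            for (String item : items) {
                if (!store.contains(item)) {
                    store.add(item);
                }
            }
        }

        @Override
        void doDelete(final List<String> items) {
            store.removeAll(items);
        }
    }

    private final Map<Group, AbstractHandler> handlers = new EnumMap<>(Group.class);

    void register(final Group group, final AbstractHandler handler) {
        handlers.put(group, handler);
    }

    /** Looks up the handler for the group and delegates to its template method. */
    void execute(final Group group, final String payload, final EventType type) {
        handlers.get(group).handle(payload, type);
    }
}
```

The benefit of this split is that the event-type switch lives in exactly one place, while each data type only describes what refresh, update, and delete mean for its own cache.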

Step into any of these handler methods, for example doUpdate in SelectorDataHandler, and you reach the cache update interface PluginDataSubscriber:

@Override
protected void doUpdate(final List<SelectorData> dataList) {
    dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);
}

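Going one step further, we find CommonPluginDataSubscriber, the implementation class of the PluginDataSubscriber interface. Below is the method that actually changes the data in the cache: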

private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
    Optional.ofNullable(classData).ifPresent(data -> {
        // Plugin
        if (data instanceof PluginData) {
            PluginData pluginData = (PluginData) data;
            // Update
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cachePluginData(pluginData);
                Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                // Delete
                BaseDataCache.getInstance().removePluginData(pluginData);
                Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
            }
        } else if (data instanceof SelectorData) {
            // Selector
            SelectorData selectorData = (SelectorData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cacheSelectData(selectorData);
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeSelectData(selectorData);
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
            }
        } else if (data instanceof RuleData) {
            // Rule
            RuleData ruleData = (RuleData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cacheRuleData(ruleData);
                Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeRuleData(ruleData);
                Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
            }
        }
    });
}

This completes the source-code analysis of websocket data synchronization.