指标
- soul websocket形式数据同步原理及源码剖析
数据同步起因
为什么要进行数据同步那?
网关是流量申请的入口,在微服务架构中承当了十分重要的角色。在应用网关的过程中,为了满足业务诉求,常常须要变更配置,比方流控规定、路由规定等等。因而,网关动静配置是保障网关高可用的重要因素。Soul 的插件全都是热插拔的,并且所有插件的选择器、规定都是动静配置,立刻失效,不须要重启服务。为了提供更高的响应速度,soul 所有的配置都缓存在 JVM 的 Hashmap 中,每次申请都走的本地缓存,速度十分快。为了批改的配置能过及时的更新到 JVM 的本地缓存中,因而须要进行数据同步。
Soul 反对三种数据同步形式:
- zookeeper同步
- websocket同步
- http长轮询
原理剖析
Soul
官网文档 对于数据同步的介绍:
Soul
网关在启动时,会从配置服务同步配置数据,反对推拉模式获取配置变更信息,并且更新本地缓存。而管理员在治理后盾,变更用户、规定、插件、流量配置,通过推拉模式将变更信息同步给 Soul
网关,具体是 push
模式,还是 pull
模式取决于配置。
同步流程图:
本篇文章抉择websocket
同步形式进行剖析
websocket同步
什么是websocket
?
WebSocket是HTML5新增的协定,它的目标是在浏览器和服务器之间建设一个不受限的双向通信的通道,比如说,服务器能够在任意时刻发送音讯给浏览器。
Soul
网关 websocket 同步原理:
Soul
网关与 admin
建设好 websocket
连贯时,admin
会推送一次全量数据,后续如果配置数据产生变更,则将增量数据通过 websocket
被动推送给 soul-web
。
Soul
网关开启 websocket 同步:
soul-bootstrap
新增如下依赖:<!--soul data sync start use websocket--><dependency> <groupId>org.dromara</groupId> <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId> <version>2.2.1</version></dependency>
application.yml
增加相干配置soul : sync: websocket : urls: ws://localhost:9095/websocket
soul-admin
配置,或在 soul-admin 启动参数中设置 --soul.sync.websocket=''
,而后重启服务
soul: sync: websocket: enabled: true
源码剖析
soul-admin数据同步
soul-admin
在用户产生数据变更之后,会通过 spring 的 ApplicationEventPublisher
收回数据变更告诉,由 DataChangedEventDispatcher
解决该变更告诉,而后依据配置的 weboscket
同步策略,将配置发送给对应的事件处理器。
- 数据变更告诉源码剖析
如果咱们在soul-admin
后盾治理做了配置的创立和更新后,都会触发publishEvent
事件
private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) { PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId()); List<ConditionData> conditionDataList = selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList()); // publish change event. eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList)))); }
publishEvent
事件办法,通过DataChangedEvent
中的groupKey
来解决不同组件的相干事件。
DataChangedEvent
的具体实现类为DataChangedEventDispatcher
:
@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean { private ApplicationContext applicationContext; private List<DataChangedListener> listeners; public DataChangedEventDispatcher(final ApplicationContext applicationContext) { this.applicationContext = applicationContext; } /** * 数据变更事件散发 */ @Override @SuppressWarnings("unchecked") public void onApplicationEvent(final DataChangedEvent event) { for (DataChangedListener listener : listeners) { // 解决不同组件的监听器 switch (event.getGroupKey()) { case APP_AUTH: listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType()); break; case PLUGIN: listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType()); break; case RULE: listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType()); break; case SELECTOR: listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); break; case META_DATA: listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType()); break; default: throw new IllegalStateException("Unexpected value: " + event.getGroupKey()); } } } @Override public void afterPropertiesSet() { Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values(); this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans)); }}
- websocket 监听器源码剖析
咱们后面开启了soul.sync.websocket.enabled=true
,那么在我的项目中必定会有读取配置的中央。通过`soul.sync.websocket.enabled
搜寻发现数据同步的配置类DataSyncConfiguration
,上面是websocket
的配置代码:
@Configuration @ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true) @EnableConfigurationProperties(WebsocketSyncProperties.class) static class WebsocketListener { /** * Config event listener data changed listener. * * @return the data changed listener */ @Bean @ConditionalOnMissingBean(WebsocketDataChangedListener.class) public DataChangedListener websocketDataChangedListener() { return new WebsocketDataChangedListener();} /** * Websocket collector websocket collector. * * @return the websocket collector */ @Bean @ConditionalOnMissingBean(WebsocketCollector.class) public WebsocketCollector websocketCollector() {return new WebsocketCollector();} /** * Server endpoint exporter server endpoint exporter. * * @return the server endpoint exporter */ @Bean @ConditionalOnMissingBean(ServerEndpointExporter.class) public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
WebsocketDataChangedListener
类为DataChangedListener
接口的具体实现,通过WebsocketCollector
发送数据变更信息
public class WebsocketDataChangedListener implements DataChangedListener { //插件变更监听 @Override public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) { WebsocketData<PluginData> websocketData = new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList); WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType); } //选择器变更监听 @Override public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) { WebsocketData<SelectorData> websocketData = new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList); WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType); } //规定变更监听 @Override public void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) { WebsocketData<RuleData> configData = new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList); WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType); } //App认证监听 @Override public void onAppAuthChanged(final List<AppAuthData> appAuthDataList, final DataEventTypeEnum eventType) { WebsocketData<AppAuthData> configData = new WebsocketData<>(ConfigGroupEnum.APP_AUTH.name(), eventType.name(), appAuthDataList); WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType); } //元数据变更监听 @Override public void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) { WebsocketData<MetaData> configData = new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList); WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType); }}
至此,soul-admin
曾经实现了数据发送。
soul-bootstrap 网关数据同步
开启websocker
同步,须要在soul-bootstrap
中引入soul-spring-boot-starter-sync-data-websocket
,在我的项目中找到对应的自定义spring-boot-starter,发现了WebsocketSyncDataConfiguration
配置类。
@Configuration@ConditionalOnClass(WebsocketSyncDataService.class)@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")@Slf4jpublic class WebsocketSyncDataConfiguration { /** * Websocket sync data service. * Websocket 数据同步实现类 * @param websocketConfig the websocket config * @param pluginSubscriber the plugin subscriber * @param metaSubscribers the meta subscribers * @param authSubscribers the auth subscribers * @return the sync data service */ @Bean public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) { return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList)); } /** * Config websocket config. * yml文件中的websocket配置 * * @return the websocket config */ @Bean @ConfigurationProperties(prefix = "soul.sync.websocket") public WebsocketConfig websocketConfig() { return new WebsocketConfig(); }}
应用 websocket 同步的时候,特地要留神断线重连,也叫放弃心跳。soul
应用java-websocket
这个第三方库来进行websocket
连贯。WebsocketSyncDataService
类的外围代码:
public WebsocketSyncDataService(final WebsocketConfig websocketConfig, final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) { //从websocketConfig中获取url信息( yml中配置的soul.sync.websocket.urls: ws://localhost:9095/websocket) String[] urls = StringUtils.split(websocketConfig.getUrls(), ","); // 创立SoulWebsocketClient executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true)); for (String url : urls) { try { clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers)); } catch (URISyntaxException e) { log.error("websocket url({}) is error", url, e); } } try { for (WebSocketClient client : clients) { //进行连贯 boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS); //应用调度线程池进行断线重连,30秒进行一次 executor.scheduleAtFixedRate(() -> { if (client.isClosed()) { boolean reconnectSuccess = client.reconnectBlocking(); } }, 10, 30, TimeUnit.SECONDS); } } catch (InterruptedException e) { log.info("websocket connection...exception....", e); } }
SoulWebsocketClient
类在构造方法中实例化了WebsocketDataHandler
,通过ConfigGroupEnum
类型,抉择具体的 DataHandler
/** * Instantiates a new Soul websocket client. * * @param serverUri the server uri * @param pluginDataSubscriber the plugin data subscriber * @param metaDataSubscribers the meta data subscribers * @param authDataSubscribers the auth data subscribers */ public SoulWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers , final List<AuthDataSubscriber> authDataSubscribers) { super(serverUri); this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber , metaDataSubscribers, authDataSubscribers); }
public class WebsocketDataHandler { private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class); /** * Instantiates a new Websocket data handler. * * @param pluginDataSubscriber the plugin data subscriber * @param metaDataSubscribers the meta data subscribers * @param authDataSubscribers the auth data subscribers */ public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) { ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber)); ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber)); ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber)); ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers)); ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers)); } /** * Executor. * * @param type the type * @param json the json * @param eventType the event type */ public void executor(final ConfigGroupEnum type, final String json, final String eventType) { ENUM_MAP.get(type).handle(json, eventType); }}
Datahandler
应用了模板办法设计模式,AbstractDataHandler
定义了不同事件的解决办法
public void handle(final String json, final String eventType) { List<T> dataList = convert(json); if (CollectionUtils.isNotEmpty(dataList)) { DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType); switch (eventTypeEnum) { case REFRESH: case MYSELF: doRefresh(dataList); break; case UPDATE: case CREATE: doUpdate(dataList); break; case DELETE: doDelete(dataList); break; default: break; } } }
轻易进入一个事件的解决办法,比方进入SelectorDataHandler
的doUpdate
办法,发现缓存的更新接口PluginDataSubscriber
@Overrideprotected void doUpdate(final List<SelectorData> dataList) { dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);}
持续往下,发现了SelectorDataHandler
接口的实现类CommonPluginDataSubscriber
,上面是对缓存中数据进行变更的具体方法:
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) { Optional.ofNullable(classData).ifPresent(data -> { //插件 if (data instanceof PluginData) { PluginData pluginData = (PluginData) data; //更新 if (dataType == DataEventTypeEnum.UPDATE) { BaseDataCache.getInstance().cachePluginData(pluginData); Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData)); } else if (dataType == DataEventTypeEnum.DELETE) { //删除 BaseDataCache.getInstance().removePluginData(pluginData); Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData)); } } else if (data instanceof SelectorData) { //选择器 SelectorData selectorData = (SelectorData) data; if (dataType == DataEventTypeEnum.UPDATE) { BaseDataCache.getInstance().cacheSelectData(selectorData); Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData)); } else if (dataType == DataEventTypeEnum.DELETE) { BaseDataCache.getInstance().removeSelectData(selectorData); Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData)); } } else if (data instanceof RuleData) { //规定 RuleData ruleData = (RuleData) data; if (dataType == DataEventTypeEnum.UPDATE) { BaseDataCache.getInstance().cacheRuleData(ruleData); Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData)); } else if (dataType == DataEventTypeEnum.DELETE) { BaseDataCache.getInstance().removeRuleData(ruleData); Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData)); } } });}
至此,websocket
数据同步源码剖析实现。