指标
- soul
zookeeper
形式数据同步原理及源码剖析
上一篇咱们对Soul
网关的 webscoket 数据同步形式做了简略的剖析,理解了一下 websocket
同步的根本流程。接下来咱们看一下Soul
网关的zookeeper
数据同步形式。
同步原理
Soul
网关 zookeeper
同步原理:
zookeeper
同步次要是依赖 zookeeper
的 watch 机制,soul-web
会监听配置的节点,soul-admin
在启动的时候,会将数据全量写入 zookeeper
,后续数据产生变更时,会增量更新 zookeeper
的节点,与此同时,soul-web
会监听配置信息的节点,一旦有信息变更时,会更新本地缓存。如更新了Selector
的数据同步流程图:
Soul
网关开启 zookeeper
同步:
soul-bootstrap
新增如下依赖:<!--soul data sync start use zookeeper--><dependency> <groupId>org.dromara</groupId> <artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId> <version>2.2.1</version></dependency>
application.yml
增加相干配置soul : sync: zookeeper: url: localhost:2181 sessionTimeout: 5000 connectionTimeout: 2000 #url: 配置成你的zk地址,集群环境请应用(,)分隔
soul-admin
配置,或在 soul-admin 启动参数中设置 --soul.sync.zookeeper=''
,而后重启服务
soul: sync: zookeeper: url: localhost:2181 sessionTimeout: 5000 connectionTimeout: 2000
源码剖析
soul-admin 数据同步
soul-admin
在用户产生数据变更之后,会通过 spring 的 ApplicationEventPublisher
收回数据变更告诉,由 DataChangedEventDispatcher
解决该变更告诉,而后依据配置的 zookeeper
同步策略,将配置发送给对应的事件处理器。
soul-admin 的数据变更告诉,Soul 网关的三种数据同步形式webscoket、zookeeper、http长轮询
原理都是一样的,只是不同的数据同步配置对应的事件处理器不一样。
- 数据变更告诉源码剖析
如果咱们在soul-admin
后盾治理做了配置的创立和更新后,都会触发publishEvent
事件
private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) { PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId()); List<ConditionData> conditionDataList = selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList()); // publish change event. eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList)))); }
publishEvent
事件办法,通过DataChangedEvent
中的groupKey
来解决不同组件的相干事件。
DataChangedEvent
的具体实现类为DataChangedEventDispatcher
:
@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean { private ApplicationContext applicationContext; private List<DataChangedListener> listeners; public DataChangedEventDispatcher(final ApplicationContext applicationContext) { this.applicationContext = applicationContext; } /** * 数据变更事件散发 */ @Override @SuppressWarnings("unchecked") public void onApplicationEvent(final DataChangedEvent event) { for (DataChangedListener listener : listeners) { // 解决不同组件的监听器 switch (event.getGroupKey()) { case APP_AUTH: listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType()); break; case PLUGIN: listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType()); break; case RULE: listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType()); break; case SELECTOR: listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); break; case META_DATA: listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType()); break; default: throw new IllegalStateException("Unexpected value: " + event.getGroupKey()); } } } @Override public void afterPropertiesSet() { Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values(); this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans)); }}
zookeeper
监听器源码剖析
咱们后面开启了soul.sync.zookeeper.url='地址'
,那么在我的项目中必定会有读取配置的中央。通过`soul.sync.zookeeper
搜寻发现数据同步的配置类DataSyncConfiguration
,上面是zookeeper
的配置代码:
/** * The type Zookeeper listener. */ @Configuration @ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url") @Import(ZookeeperConfiguration.class) static class ZookeeperListener { /** * Config event listener data changed listener. * * @param zkClient the zk client * @return the data changed listener */ @Bean @ConditionalOnMissingBean(ZookeeperDataChangedListener.class) public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) { return new ZookeeperDataChangedListener(zkClient); } /** * Zookeeper data init zookeeper data init. * * @param zkClient the zk client * @param syncDataService the sync data service * @return the zookeeper data init */ @Bean @ConditionalOnMissingBean(ZookeeperDataInit.class) public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) { return new ZookeeperDataInit(zkClient, syncDataService); } }
zookeeperDataChangedListener
类为DataChangedListener
接口的具体实现,通过zkClient
发送数据变更信息
@Override public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) { if (eventType == DataEventTypeEnum.REFRESH) { // refresh final String selectorParentPath = ZkPathConstants.buildSelectorParentPath(changed.get(0).getPluginName()); deleteZkPathRecursive(selectorParentPath); } for (SelectorData data : changed) { final String selectorRealPath = ZkPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId()); if (eventType == DataEventTypeEnum.DELETE) { // delete deleteZkPath(selectorRealPath); continue; } final String selectorParentPath = ZkPathConstants.buildSelectorParentPath(data.getPluginName()); createZkNode(selectorParentPath); //create or update upsertZkNode(selectorRealPath, data); } } /** * create or update zookeeper node. * @param path node path * @param data node data */ private void upsertZkNode(final String path, final Object data) { if (!zkClient.exists(path)) { zkClient.createPersistent(path, true); } // 更新节点 zkClient.writeData(path, data); } private void deleteZkPath(final String path) { if (zkClient.exists(path)) { //一般删除 zkClient.delete(path); } } private void deleteZkPathRecursive(final String path) { if (zkClient.exists(path)) { //递归删除 zkClient.deleteRecursive(path); } }
至此,soul-admin
曾经实现了数据发送。
soul-bootstrap 网关数据同步
开启zookeeper
同步,须要在soul-bootstrap
中引入soul-spring-boot-starter-sync-data-zookeeper
,在我的项目中找到对应的自定义spring-boot-starter,发现了ZookeeperSyncDataConfiguration
配置类。
@Configuration@ConditionalOnClass(ZookeeperSyncDataService.class)@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")@EnableConfigurationProperties(ZookeeperConfig.class)@Slf4jpublic class ZookeeperSyncDataConfiguration { /** * Sync data service sync data service. * zk 数据同步 * @param zkClient the zk client * @param pluginSubscriber the plugin subscriber * @param metaSubscribers the meta subscribers * @param authSubscribers the auth subscribers * @return the sync data service */ @Bean public SyncDataService syncDataService(final ObjectProvider<ZkClient> zkClient, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) { log.info("you use zookeeper sync soul data......."); return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList)); } /** * register zkClient in spring ioc. * yml文件中的 zookeeper 配置 * @param zookeeperConfig the zookeeper configuration * @return ZkClient {@linkplain ZkClient} */ @Bean public ZkClient zkClient(final ZookeeperConfig zookeeperConfig) { return new ZkClient(zookeeperConfig.getUrl(), zookeeperConfig.getSessionTimeout(), zookeeperConfig.getConnectionTimeout()); }}
以Selector
为例,看一下ZookeeperSyncDataService
类监听Selector
数据变动的逻辑:
//监听选择器 private void watcherSelector(final String pluginName) { //组装父节点地址 String selectorParentPath = ZkPathConstants.buildSelectorParentPath(pluginName); //获取子节点 List<String> childrenList = zkClientGetChildren(selectorParentPath); if (CollectionUtils.isNotEmpty(childrenList)) { childrenList.forEach(children -> { //组装实在节点 String realPath = buildRealPath(selectorParentPath, children); //读取指定节点的值并更新缓存数据 cacheSelectorData(zkClient.readData(realPath)); //只监听节点数据的变动 subscribeSelectorDataChanges(realPath); }); } //对父节点增加监听子节点变动(只针对新增子节点、缩小子节点、删除节点事件触发) subscribeChildChanges(ConfigGroupEnum.SELECTOR, selectorParentPath, childrenList); } //只监听节点的变动 private void subscribeChildChanges(final ConfigGroupEnum groupKey , final String groupParentPath, final List<String> childrenList) { switch (groupKey) { case SELECTOR: // handleChildChanges(String parentPath, List<String> currentChilds) zkClient.subscribeChildChanges(groupParentPath, (parentPath, currentChildren) -> { if (CollectionUtils.isNotEmpty(currentChildren)) { //从 currentChildren 中过滤掉之前曾经解决过的childrenList(数据变动的节点) List<String> addSubscribePath = addSubscribePath(childrenList, currentChildren); addSubscribePath.stream().map(addPath -> { String realPath = buildRealPath(parentPath, addPath); cacheSelectorData(zkClient.readData(realPath)); return realPath; }).forEach(this::subscribeSelectorDataChanges); } }); break; ... } } //只监听节点数据的变动 private void subscribeSelectorDataChanges(final String path) { zkClient.subscribeDataChanges(path, new IZkDataListener() { @Override //节点数据扭转时触发 public void handleDataChange(final String dataPath, final Object data) { cacheSelectorData((SelectorData) data); } //节点删除时触发 @Override public void handleDataDeleted(final String dataPath) { unCacheSelectorData(dataPath); } }); } //获取子节点 private List<String> zkClientGetChildren(final String parent) { if (!zkClient.exists(parent)) { zkClient.createPersistent(parent, true); } return zkClient.getChildren(parent); } //更新缓存数据 private void cacheSelectorData(final SelectorData selectorData) { Optional.ofNullable(selectorData) .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data))); }
下面cacheSelectorData
为更新缓存数据的办法,具体的实现类为CommonPluginDataSubscriber
,这和上一篇webscoket更新缓存数据的调用是一样的。