Goal
- How the Soul gateway's zookeeper data synchronization works, with a source-code walkthrough
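The previous article gave a brief analysis of the Soul gateway's websocket data synchronization and walked through its basic flow. This article looks at the Soul gateway's zookeeper data synchronization.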
Synchronization principle
How zookeeper synchronization works in the Soul gateway:
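zookeeper synchronization relies mainly on zookeeper's watch mechanism. When soul-admin starts, it writes the full data set to zookeeper; when data changes later, it updates the affected zookeeper nodes incrementally. Meanwhile, soul-web watches the configuration nodes and updates its local cache as soon as any of them changes.
[Figure: data synchronization flow after a Selector update]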
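The write-then-notify flow described above can be sketched without a zookeeper server. The example below is only an in-memory model: `WatchedStore`, `WatchSyncDemo` and the node path are hypothetical names invented for illustration, and none of this is the zookeeper or Soul API. It shows the three steps in order: a full write by the admin side, an initial load plus watch registration by the gateway side, and an incremental update that reaches the gateway's local cache through the watch callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// In-memory stand-in for a zookeeper tree; NOT the zookeeper API.
class WatchedStore {
    private final Map<String, String> nodes = new HashMap<>();
    private final Map<String, List<BiConsumer<String, String>>> watchers = new HashMap<>();

    // Register a callback that fires whenever the node at `path` is written.
    void watch(String path, BiConsumer<String, String> callback) {
        watchers.computeIfAbsent(path, p -> new ArrayList<>()).add(callback);
    }

    // Create or overwrite a node, then notify the watchers of that path.
    void write(String path, String data) {
        nodes.put(path, data);
        for (BiConsumer<String, String> watcher : watchers.getOrDefault(path, List.of())) {
            watcher.accept(path, data);
        }
    }

    String read(String path) {
        return nodes.get(path);
    }
}

class WatchSyncDemo {
    // Plays the admin and gateway roles against one store and returns the gateway cache.
    static Map<String, String> run() {
        WatchedStore store = new WatchedStore();
        Map<String, String> gatewayCache = new HashMap<>();
        String path = "/soul/selector/divide/1"; // hypothetical node path

        // Admin side: full write at startup.
        store.write(path, "selector-v1");

        // Gateway side: load the current value, then watch for later changes.
        gatewayCache.put(path, store.read(path));
        store.watch(path, gatewayCache::put);

        // Admin side: incremental update after a user edits the selector.
        store.write(path, "selector-v2");
        return gatewayCache;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

After the second write the gateway cache holds `selector-v2` for that path, even though the gateway never polled the store.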
Enabling zookeeper synchronization in the Soul gateway:
- Add the following dependency to soul-bootstrap:
<!--soul data sync start use zookeeper-->
<dependency>
    <groupId>org.dromara</groupId>
    <artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>
    <version>2.2.1</version>
</dependency>
- Add the related configuration to application.yml:
soul:
  sync:
    zookeeper:
      url: localhost:2181
      sessionTimeout: 5000
      connectionTimeout: 2000
# url: set this to your zk address; in a cluster, separate the addresses with commas (,)
- soul-admin: add the configuration below, or set --soul.sync.zookeeper.url='<your zk address>' in the soul-admin startup arguments, then restart the service:
soul:
  sync:
    zookeeper:
      url: localhost:2181
      sessionTimeout: 5000
      connectionTimeout: 2000
Source code analysis
soul-admin data synchronization
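After a user changes data, soul-admin publishes a data-change notification through Spring's ApplicationEventPublisher. DataChangedEventDispatcher handles the notification and, according to the configured zookeeper synchronization strategy, passes the configuration to the corresponding event handler.
The data-change notification in soul-admin works the same way for all three Soul gateway synchronization modes (websocket, zookeeper, http long polling); only the event handler bound to each synchronization configuration differs.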
- Source analysis of the data-change notification
Whenever configuration is created or updated in the soul-admin console, the publishEvent method is called:
private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
    PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
    List<ConditionData> conditionDataList =
            selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
    // publish change event.
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
            Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}
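publishEvent wraps the change in a DataChangedEvent, whose groupKey identifies which kind of component data changed so that the event can be routed accordingly.
DataChangedEvent is handled by DataChangedEventDispatcher, a Spring ApplicationListener: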
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private ApplicationContext applicationContext;

    private List<DataChangedListener> listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * Dispatches data-change events.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            // Call the listener method that matches the changed component type
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value:" + event.getGroupKey());
            }
        }
    }

    // Collect every DataChangedListener bean registered in the Spring context
    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }
}
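To make the fan-out concrete, here is a minimal sketch of the same dispatch pattern with Spring removed. `DispatchDemo`, `Group` and `Listener` are hypothetical, trimmed-down stand-ins for the Soul types, and plain strings replace the data classes. It illustrates why the three synchronization modes can share one notification path: each mode only contributes its own listener, and the dispatcher picks the callback by group key.

```java
import java.util.ArrayList;
import java.util.List;

class DispatchDemo {
    // Hypothetical, reduced version of the group key enum.
    enum Group { PLUGIN, SELECTOR, RULE }

    // Hypothetical counterpart of DataChangedListener; callbacks do nothing unless overridden.
    interface Listener {
        default void onPluginChanged(List<String> data) { }
        default void onSelectorChanged(List<String> data) { }
        default void onRuleChanged(List<String> data) { }
    }

    // Route one event to every listener, choosing the callback by group key.
    static void dispatch(Group group, List<String> data, List<Listener> listeners) {
        for (Listener listener : listeners) {
            switch (group) {
                case PLUGIN:
                    listener.onPluginChanged(data);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(data);
                    break;
                case RULE:
                    listener.onRuleChanged(data);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + group);
            }
        }
    }

    // Registers two listeners that only react to selector changes and records what they receive.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Listener zookeeperListener = new Listener() {
            @Override
            public void onSelectorChanged(List<String> data) {
                log.add("zk:selector:" + data);
            }
        };
        Listener websocketListener = new Listener() {
            @Override
            public void onSelectorChanged(List<String> data) {
                log.add("ws:selector:" + data);
            }
        };
        List<Listener> listeners = List.of(zookeeperListener, websocketListener);
        dispatch(Group.SELECTOR, List.of("s1"), listeners); // both listeners record it
        dispatch(Group.PLUGIN, List.of("p1"), listeners);   // neither overrides this callback
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```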
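Source analysis of the zookeeper listener
Earlier we enabled soul.sync.zookeeper.url='<address>', so somewhere in the project this configuration must be read. Searching for `soul.sync.zookeeper` leads to the data synchronization configuration class DataSyncConfiguration. Its zookeeper section is shown below: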
/**
 * The type Zookeeper listener.
 */
@Configuration
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@Import(ZookeeperConfiguration.class)
static class ZookeeperListener {

    /**
     * Config event listener data changed listener.
     *
     * @param zkClient the zk client
     * @return the data changed listener
     */
    @Bean
    @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
    public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
        return new ZookeeperDataChangedListener(zkClient);
    }

    /**
     * Zookeeper data init zookeeper data init.
     *
     * @param zkClient        the zk client
     * @param syncDataService the sync data service
     * @return the zookeeper data init
     */
    @Bean
    @ConditionalOnMissingBean(ZookeeperDataInit.class)
    public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
        return new ZookeeperDataInit(zkClient, syncDataService);
    }
}
ZookeeperDataChangedListener is the concrete implementation of the DataChangedListener interface. It writes data changes to zookeeper through zkClient:
@Override
public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
    if (eventType == DataEventTypeEnum.REFRESH) {
        // refresh: drop every selector node under this plugin before rewriting
        final String selectorParentPath = ZkPathConstants.buildSelectorParentPath(changed.get(0).getPluginName());
        deleteZkPathRecursive(selectorParentPath);
    }
    for (SelectorData data : changed) {
        final String selectorRealPath = ZkPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId());
        if (eventType == DataEventTypeEnum.DELETE) {
            // delete
            deleteZkPath(selectorRealPath);
            continue;
        }
        final String selectorParentPath = ZkPathConstants.buildSelectorParentPath(data.getPluginName());
        createZkNode(selectorParentPath);
        // create or update
        upsertZkNode(selectorRealPath, data);
    }
}

/**
 * create or update zookeeper node.
 * @param path node path
 * @param data node data
 */
private void upsertZkNode(final String path, final Object data) {
    if (!zkClient.exists(path)) {
        zkClient.createPersistent(path, true);
    }
    // Update the node
    zkClient.writeData(path, data);
}

private void deleteZkPath(final String path) {
    if (zkClient.exists(path)) {
        // Plain delete
        zkClient.delete(path);
    }
}

private void deleteZkPathRecursive(final String path) {
    if (zkClient.exists(path)) {
        // Recursive delete
        zkClient.deleteRecursive(path);
    }
}
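The three event branches above (REFRESH, DELETE, create-or-update) are easier to follow when replayed against a plain map. The sketch below is a simplified model rather than the Soul implementation: `SelectorNodeDemo`, its `Selector` class and the `/soul/selector/{pluginName}/{id}` path layout are assumptions made for illustration, since the excerpt does not show what ZkPathConstants returns. The sketch also returns early on an empty list, because `changed.get(0)` would otherwise throw on REFRESH; whether callers can pass an empty list is not visible in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class SelectorNodeDemo {
    enum EventType { UPDATE, DELETE, REFRESH }

    // Hypothetical, reduced stand-in for SelectorData.
    static final class Selector {
        final String pluginName;
        final String id;
        final String payload;

        Selector(String pluginName, String id, String payload) {
            this.pluginName = pluginName;
            this.id = id;
            this.payload = payload;
        }
    }

    // In-memory stand-in for the zookeeper tree: node path -> node data.
    final TreeMap<String, String> nodes = new TreeMap<>();

    // Assumed path layout; the real one is defined by ZkPathConstants.
    static String parentPath(String pluginName) {
        return "/soul/selector/" + pluginName;
    }

    static String realPath(String pluginName, String id) {
        return parentPath(pluginName) + "/" + id;
    }

    void onSelectorChanged(List<Selector> changed, EventType eventType) {
        if (changed.isEmpty()) {
            return; // guard added in this sketch only
        }
        if (eventType == EventType.REFRESH) {
            // Recursive delete: remove every node under the plugin's parent path.
            String prefix = parentPath(changed.get(0).pluginName) + "/";
            nodes.keySet().removeIf(path -> path.startsWith(prefix));
        }
        for (Selector data : changed) {
            String path = realPath(data.pluginName, data.id);
            if (eventType == EventType.DELETE) {
                nodes.remove(path);
                continue;
            }
            nodes.put(path, data.payload); // create or update
        }
    }

    // Replays an update, a delete and a refresh, then returns the surviving node paths.
    static List<String> run() {
        SelectorNodeDemo demo = new SelectorNodeDemo();
        demo.onSelectorChanged(List.of(new Selector("divide", "1", "a"), new Selector("divide", "2", "b")), EventType.UPDATE);
        demo.onSelectorChanged(List.of(new Selector("divide", "1", "a")), EventType.DELETE);
        demo.onSelectorChanged(List.of(new Selector("divide", "3", "c")), EventType.REFRESH);
        return new ArrayList<>(demo.nodes.keySet());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this run only `/soul/selector/divide/3` remains: the delete removes node 1, and the refresh clears node 2 before writing node 3.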
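At this point soul-admin has finished sending the data.
soul-bootstrap gateway data synchronization
To enable zookeeper synchronization, soul-bootstrap must include soul-spring-boot-starter-sync-data-zookeeper. Looking up that custom spring-boot-starter in the project leads to the ZookeeperSyncDataConfiguration configuration class.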
@Configuration
@ConditionalOnClass(ZookeeperSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@EnableConfigurationProperties(ZookeeperConfig.class)
@Slf4j
public class ZookeeperSyncDataConfiguration {

    /**
     * Sync data service sync data service.
     * zk data synchronization
     * @param zkClient         the zk client
     * @param pluginSubscriber the plugin subscriber
     * @param metaSubscribers  the meta subscribers
     * @param authSubscribers  the auth subscribers
     * @return the sync data service
     */
    @Bean
    public SyncDataService syncDataService(final ObjectProvider<ZkClient> zkClient, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use zookeeper sync soul data.......");
        return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

    /**
     * register zkClient in spring ioc.
     * Built from the zookeeper settings in the yml file
     * @param zookeeperConfig the zookeeper configuration
     * @return ZkClient {@linkplain ZkClient}
     */
    @Bean
    public ZkClient zkClient(final ZookeeperConfig zookeeperConfig) {
        return new ZkClient(zookeeperConfig.getUrl(), zookeeperConfig.getSessionTimeout(), zookeeperConfig.getConnectionTimeout());
    }
}
Taking Selector as an example, here is how ZookeeperSyncDataService watches for Selector data changes:
// Watch selectors
private void watcherSelector(final String pluginName) {
    // Build the parent node path
    String selectorParentPath = ZkPathConstants.buildSelectorParentPath(pluginName);
    // Fetch the child nodes
    List<String> childrenList = zkClientGetChildren(selectorParentPath);
    if (CollectionUtils.isNotEmpty(childrenList)) {
        childrenList.forEach(children -> {
            // Build the real node path
            String realPath = buildRealPath(selectorParentPath, children);
            // Read the node's value and update the cache
            cacheSelectorData(zkClient.readData(realPath));
            // Watch only data changes on this node
            subscribeSelectorDataChanges(realPath);
        });
    }
    // Watch the parent node for child changes (fires only when a child is added, a child is removed, or the node is deleted)
    subscribeChildChanges(ConfigGroupEnum.SELECTOR, selectorParentPath, childrenList);
}

// Watch only child-node changes
private void subscribeChildChanges(final ConfigGroupEnum groupKey, final String groupParentPath, final List<String> childrenList) {
    switch (groupKey) {
        case SELECTOR:
            // handleChildChanges(String parentPath, List<String> currentChilds)
            zkClient.subscribeChildChanges(groupParentPath, (parentPath, currentChildren) -> {
                if (CollectionUtils.isNotEmpty(currentChildren)) {
                    // Drop the entries of childrenList that were already handled, leaving only the newly added nodes
                    List<String> addSubscribePath = addSubscribePath(childrenList, currentChildren);
                    addSubscribePath.stream().map(addPath -> {
                        String realPath = buildRealPath(parentPath, addPath);
                        cacheSelectorData(zkClient.readData(realPath));
                        return realPath;
                    }).forEach(this::subscribeSelectorDataChanges);
                }
            });
            break;
        ...
    }
}

// Watch only data changes on a node
private void subscribeSelectorDataChanges(final String path) {
    zkClient.subscribeDataChanges(path, new IZkDataListener() {
        // Fired when the node's data changes
        @Override
        public void handleDataChange(final String dataPath, final Object data) {
            cacheSelectorData((SelectorData) data);
        }

        // Fired when the node is deleted
        @Override
        public void handleDataDeleted(final String dataPath) {
            unCacheSelectorData(dataPath);
        }
    });
}

// Fetch the child nodes, creating the parent first if it does not exist
private List<String> zkClientGetChildren(final String parent) {
    if (!zkClient.exists(parent)) {
        zkClient.createPersistent(parent, true);
    }
    return zkClient.getChildren(parent);
}

// Update the cached data
private void cacheSelectorData(final SelectorData selectorData) {
    Optional.ofNullable(selectorData)
            .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data)));
}
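The addSubscribePath helper called in subscribeChildChanges is not part of the excerpt. Going by the comment at its call site, it appears to return the children that are not yet being watched. A minimal sketch of such a helper follows, assuming it is a plain list difference; `SubscribeDiff` and `newChildren` are hypothetical names, and the real method may differ.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class SubscribeDiff {
    // Return the entries of `currentChildren` that are not in `alreadyWatched`, keeping their order.
    static List<String> newChildren(List<String> alreadyWatched, List<String> currentChildren) {
        if (alreadyWatched == null || alreadyWatched.isEmpty()) {
            return currentChildren; // nothing watched yet, so every child is new
        }
        Set<String> seen = new HashSet<>(alreadyWatched);
        return currentChildren.stream()
                .filter(child -> !seen.contains(child))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Children "1" and "2" were watched at startup; "3" was added later.
        System.out.println(newChildren(List.of("1", "2"), List.of("1", "2", "3")));
    }
}
```

Only the returned paths need a first read into the cache and a new data-change subscription; nodes that were already watched keep their existing listeners.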
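The cacheSelectorData method shown earlier updates the cached data. The concrete implementation class is CommonPluginDataSubscriber, and the call is the same one the websocket synchronization in the previous article uses to update the cache.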