指标

  • soul zookeeper 形式数据同步原理及源码剖析

上一篇咱们对Soul网关的 webscoket 数据同步形式做了简略的剖析,理解了一下 websocket 同步的根本流程。接下来咱们看一下Soul网关的zookeeper数据同步形式。

同步原理

Soul网关 zookeeper 同步原理:

zookeeper 同步次要是依赖 zookeeper 的 watch 机制,soul-web 会监听配置的节点,soul-admin 在启动的时候,会将数据全量写入 zookeeper,后续数据产生变更时,会增量更新 zookeeper 的节点,与此同时,soul-web 会监听配置信息的节点,一旦有信息变更时,会更新本地缓存。如更新了Selector的数据同步流程图:

Soul网关开启 zookeeper 同步:

  • soul-bootstrap新增如下依赖:

    <!--soul data sync start use zookeeper--><dependency>    <groupId>org.dromara</groupId>    <artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>    <version>2.2.1</version></dependency>
  • application.yml增加相干配置

     soul :     sync:         zookeeper:              url: localhost:2181              sessionTimeout: 5000              connectionTimeout: 2000 #url: 配置成你的zk地址,集群环境请应用(,)分隔

soul-admin 配置,或在 soul-admin 启动参数中设置 --soul.sync.zookeeper='',而后重启服务

soul:  sync:    zookeeper:        url: localhost:2181        sessionTimeout: 5000        connectionTimeout: 2000

源码剖析

soul-admin 数据同步

soul-admin 在用户产生数据变更之后,会通过 spring 的 ApplicationEventPublisher 收回数据变更告诉,由 DataChangedEventDispatcher 解决该变更告诉,而后依据配置的 zookeeper同步策略,将配置发送给对应的事件处理器。

soul-admin 的数据变更告诉,Soul 网关的三种数据同步形式webscoket、zookeeper、http长轮询原理都是一样的,只是不同的数据同步配置对应的事件处理器不一样。

  • 数据变更告诉源码剖析

如果咱们在soul-admin后盾治理做了配置的创立和更新后,都会触发publishEvent事件

private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());        List<ConditionData> conditionDataList =                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());        // publish change event.        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));    }

publishEvent事件办法,通过DataChangedEvent中的groupKey来解决不同组件的相干事件。

DataChangedEvent的具体实现类为DataChangedEventDispatcher

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {    private ApplicationContext applicationContext;    private List<DataChangedListener> listeners;    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {        this.applicationContext = applicationContext;    }    /**     * 数据变更事件散发     */    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        for (DataChangedListener listener : listeners) {            // 解决不同组件的监听器            switch (event.getGroupKey()) {                case APP_AUTH:                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());                    break;                case PLUGIN:                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());                    break;                case RULE:                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());                    break;                case SELECTOR:                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    break;                case META_DATA:                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());                    break;                default:                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());            }        }    }    @Override    public void afterPropertiesSet() {        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));    }}
  • zookeeper 监听器源码剖析

咱们后面开启了soul.sync.zookeeper.url='地址',那么在我的项目中必定会有读取配置的中央。通过`soul.sync.zookeeper搜寻发现数据同步的配置类DataSyncConfiguration,上面是zookeeper的配置代码:

   /**     * The type Zookeeper listener.     */    @Configuration    @ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")    @Import(ZookeeperConfiguration.class)    static class ZookeeperListener {        /**         * Config event listener data changed listener.         *         * @param zkClient the zk client         * @return the data changed listener         */        @Bean        @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)        public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {            return new ZookeeperDataChangedListener(zkClient);        }        /**         * Zookeeper data init zookeeper data init.         *         * @param zkClient        the zk client         * @param syncDataService the sync data service         * @return the zookeeper data init         */        @Bean        @ConditionalOnMissingBean(ZookeeperDataInit.class)        public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {            return new ZookeeperDataInit(zkClient, syncDataService);        }    }

zookeeperDataChangedListener类为DataChangedListener接口的具体实现,通过zkClient发送数据变更信息

  @Override    public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {        if (eventType == DataEventTypeEnum.REFRESH) {            // refresh            final String selectorParentPath = ZkPathConstants.buildSelectorParentPath(changed.get(0).getPluginName());            deleteZkPathRecursive(selectorParentPath);        }        for (SelectorData data : changed) {            final String selectorRealPath = ZkPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId());            if (eventType == DataEventTypeEnum.DELETE) {                // delete                deleteZkPath(selectorRealPath);                continue;            }            final String selectorParentPath = ZkPathConstants.buildSelectorParentPath(data.getPluginName());            createZkNode(selectorParentPath);            //create or update            upsertZkNode(selectorRealPath, data);        }    }   /**     * create or update zookeeper node.     * @param path node path     * @param data node data      */    private void upsertZkNode(final String path, final Object data) {        if (!zkClient.exists(path)) {            zkClient.createPersistent(path, true);        }        // 更新节点        zkClient.writeData(path, data);    }        private void deleteZkPath(final String path) {        if (zkClient.exists(path)) {            //一般删除            zkClient.delete(path);        }    }        private void deleteZkPathRecursive(final String path) {         if (zkClient.exists(path)) {            //递归删除            zkClient.deleteRecursive(path);        }    }

至此,soul-admin曾经实现了数据发送。

soul-bootstrap 网关数据同步

开启zookeeper同步,须要在soul-bootstrap中引入soul-spring-boot-starter-sync-data-zookeeper,在我的项目中找到对应的自定义spring-boot-starter,发现了ZookeeperSyncDataConfiguration配置类。

@Configuration@ConditionalOnClass(ZookeeperSyncDataService.class)@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")@EnableConfigurationProperties(ZookeeperConfig.class)@Slf4jpublic class ZookeeperSyncDataConfiguration {    /**     * Sync data service sync data service.     * zk 数据同步     * @param zkClient          the zk client     * @param pluginSubscriber the plugin subscriber     * @param metaSubscribers   the meta subscribers     * @param authSubscribers   the auth subscribers     * @return the sync data service     */    @Bean    public SyncDataService syncDataService(final ObjectProvider<ZkClient> zkClient, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {        log.info("you use zookeeper sync soul data.......");        return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(),                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));    }    /**     * register zkClient in spring ioc.     * yml文件中的 zookeeper 配置     * @param zookeeperConfig the zookeeper configuration     * @return ZkClient {@linkplain ZkClient}     */    @Bean    public ZkClient zkClient(final ZookeeperConfig zookeeperConfig) {        return new ZkClient(zookeeperConfig.getUrl(), zookeeperConfig.getSessionTimeout(), zookeeperConfig.getConnectionTimeout());    }}

Selector为例,看一下ZookeeperSyncDataService类监听Selector数据变动的逻辑:

//监听选择器    private void watcherSelector(final String pluginName) {        //组装父节点地址        String selectorParentPath = ZkPathConstants.buildSelectorParentPath(pluginName);        //获取子节点        List<String> childrenList = zkClientGetChildren(selectorParentPath);        if (CollectionUtils.isNotEmpty(childrenList)) {            childrenList.forEach(children -> {                //组装实在节点                String realPath = buildRealPath(selectorParentPath, children);                //读取指定节点的值并更新缓存数据                cacheSelectorData(zkClient.readData(realPath));                //只监听节点数据的变动                subscribeSelectorDataChanges(realPath);            });        }        //对父节点增加监听子节点变动(只针对新增子节点、缩小子节点、删除节点事件触发)        subscribeChildChanges(ConfigGroupEnum.SELECTOR, selectorParentPath, childrenList);    }  //只监听节点的变动  private void subscribeChildChanges(final ConfigGroupEnum groupKey                                     , final String groupParentPath, final List<String> childrenList) {        switch (groupKey) {            case SELECTOR:                // handleChildChanges(String parentPath, List<String> currentChilds)                zkClient.subscribeChildChanges(groupParentPath, (parentPath, currentChildren) -> {                    if (CollectionUtils.isNotEmpty(currentChildren)) {                        //从 currentChildren 中过滤掉之前曾经解决过的childrenList(数据变动的节点)                         List<String> addSubscribePath = addSubscribePath(childrenList, currentChildren);                        addSubscribePath.stream().map(addPath -> {                            String realPath = buildRealPath(parentPath, addPath);                            cacheSelectorData(zkClient.readData(realPath));                            return realPath;                        }).forEach(this::subscribeSelectorDataChanges);                    }                });                break;                ...               }    }        //只监听节点数据的变动    private void subscribeSelectorDataChanges(final String path) {        zkClient.subscribeDataChanges(path, new IZkDataListener() {            @Override            //节点数据扭转时触发            public void handleDataChange(final String dataPath, final Object data) {                cacheSelectorData((SelectorData) data);            }            //节点删除时触发            @Override            public void handleDataDeleted(final String dataPath) {                unCacheSelectorData(dataPath);            }        });    }   //获取子节点   private List<String> zkClientGetChildren(final String parent) {        if (!zkClient.exists(parent)) {            zkClient.createPersistent(parent, true);        }        return zkClient.getChildren(parent);    }    //更新缓存数据       private void cacheSelectorData(final SelectorData selectorData) {        Optional.ofNullable(selectorData)                .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data)));    }

下面cacheSelectorData为更新缓存数据的办法,具体的实现类为CommonPluginDataSubscriber,这和上一篇webscoket更新缓存数据的调用是一样的。