Soul: Soul Source Code Reading 07, Data Synchronization with zookeeper


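Goal

  • How the Soul zookeeper data synchronization mode works, with a source code walkthrough

The previous article gave a brief analysis of the websocket data synchronization mode of the Soul gateway and walked through the basic websocket synchronization flow. This article looks at the zookeeper data synchronization mode of the Soul gateway.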

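How synchronization works

How zookeeper synchronization works in the Soul gateway:

zookeeper synchronization relies mainly on the zookeeper watch mechanism. When soul-admin starts, it writes the full data set to zookeeper; when data changes later, it updates the affected zookeeper nodes incrementally. Meanwhile, soul-web watches the configured nodes, and as soon as a node changes it updates its local cache. (Figure: data synchronization flow after a Selector is updated.)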
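The watch-based flow described above can be modeled without a running zookeeper server. What follows is a minimal in-memory sketch, not Soul's implementation: `InMemoryNodeStore`, `WatchFlowSketch`, and the node path are hypothetical names made up for illustration. It shows the admin side writing a node at startup and then updating it incrementally, while the gateway side keeps a local cache current through a listener.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stand-in for a zookeeper tree: path -> data, plus per-path listeners.
final class InMemoryNodeStore {
    private final Map<String, String> nodes = new HashMap<>();
    private final Map<String, List<BiConsumer<String, String>>> listeners = new HashMap<>();

    // Register a listener that is called on every later write to the given path.
    void subscribe(final String path, final BiConsumer<String, String> listener) {
        listeners.computeIfAbsent(path, key -> new ArrayList<>()).add(listener);
    }

    // Create or overwrite a node, then notify the listeners of that path.
    void write(final String path, final String data) {
        nodes.put(path, data);
        List<BiConsumer<String, String>> registered =
                listeners.getOrDefault(path, Collections.emptyList());
        for (BiConsumer<String, String> listener : registered) {
            listener.accept(path, data);
        }
    }

    String read(final String path) {
        return nodes.get(path);
    }
}

final class WatchFlowSketch {
    // Runs the whole flow and returns the gateway's local cache at the end.
    static Map<String, String> run() {
        InMemoryNodeStore store = new InMemoryNodeStore();
        Map<String, String> gatewayCache = new HashMap<>();
        String path = "/soul/selector/divide/1"; // hypothetical node path

        // Admin startup: full write of the existing data.
        store.write(path, "selector-v1");

        // Gateway startup: read the current value, then watch the node.
        gatewayCache.put(path, store.read(path));
        store.subscribe(path, gatewayCache::put);

        // Admin later: incremental update of the one node that changed.
        store.write(path, "selector-v2");
        return gatewayCache;
    }

    public static void main(final String[] args) {
        System.out.println(run());
    }
}
```

The gateway never polls in this model; its cache changes only because the store pushed the second write to the registered listener, which is the role the watch plays in the real setup.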

Enabling zookeeper synchronization in the Soul gateway:

  • Add the following dependency to soul-bootstrap:

    <!--soul data sync start use zookeeper-->
    <dependency>
        <groupId>org.dromara</groupId>
        <artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>
        <version>2.2.1</version>
    </dependency>
  • Add the relevant configuration to application.yml:

     soul:
         sync:
             zookeeper:
                  url: localhost:2181
                  sessionTimeout: 5000
                  connectionTimeout: 2000
     # url: set this to your zk address; in a cluster environment, separate the addresses with commas (,)

soul-admin configuration (alternatively, set --soul.sync.zookeeper.url='your zk address' in the soul-admin startup parameters), then restart the service:

soul:
  sync:
    zookeeper:
        url: localhost:2181
        sessionTimeout: 5000
        connectionTimeout: 2000

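Source code analysis

soul-admin data synchronization

After a user changes data, soul-admin publishes a data change notification through Spring's ApplicationEventPublisher. DataChangedEventDispatcher handles the notification and, according to the configured zookeeper synchronization strategy, passes the configuration to the corresponding event handler.

The data change notification in soul-admin works the same way for all three Soul gateway synchronization modes (websocket, zookeeper, and http long polling); only the event handler that corresponds to the configured mode differs.

  • Source analysis of the data change notification

Whenever a configuration is created or updated in the soul-admin management console, the publishEvent method is called: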

private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
        List<ConditionData> conditionDataList =
                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
        // publish change event.
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
    }

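publishEvent builds a DataChangedEvent, and the groupKey carried by that event determines which component's handler the event is routed to.

DataChangedEvent is handled by the listener class DataChangedEventDispatcher: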

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private ApplicationContext applicationContext;

    private List<DataChangedListener> listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * Dispatches data change events.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            // Call the listener method that matches the event's group
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value:" + event.getGroupKey());
            }
        }
    }

    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }

}
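To see the dispatch pattern in isolation, here is a minimal sketch without Spring. `GroupKey`, `ChangeEvent`, `ChangeListener`, and `DispatcherSketch` are hypothetical names invented for this illustration, and payloads are simplified to strings. Only the idea of switching on the group key and fanning out to every registered listener is taken from the class above. Two listeners are registered purely to show the fan-out and to make the point that a synchronization mode differs only in which listener implementation it supplies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for ConfigGroupEnum and DataChangedEvent.
enum GroupKey { PLUGIN, SELECTOR, RULE }

final class ChangeEvent {
    final GroupKey groupKey;
    final List<String> payload;

    ChangeEvent(final GroupKey groupKey, final List<String> payload) {
        this.groupKey = groupKey;
        this.payload = payload;
    }
}

// Each synchronization mode would supply its own implementation of this interface.
interface ChangeListener {
    default void onPluginChanged(final List<String> changed) { }

    default void onSelectorChanged(final List<String> changed) { }

    default void onRuleChanged(final List<String> changed) { }
}

final class DispatcherSketch {
    private final List<ChangeListener> listeners;

    DispatcherSketch(final List<ChangeListener> listeners) {
        this.listeners = new ArrayList<>(listeners);
    }

    // Fan the event out to every listener, choosing the callback by group key.
    void dispatch(final ChangeEvent event) {
        for (ChangeListener listener : listeners) {
            switch (event.groupKey) {
                case PLUGIN:
                    listener.onPluginChanged(event.payload);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(event.payload);
                    break;
                case RULE:
                    listener.onRuleChanged(event.payload);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.groupKey);
            }
        }
    }

    public static void main(final String[] args) {
        List<String> log = new ArrayList<>();
        ChangeListener zkStyle = new ChangeListener() {
            @Override
            public void onSelectorChanged(final List<String> changed) {
                log.add("zookeeper-style listener: " + changed);
            }
        };
        ChangeListener wsStyle = new ChangeListener() {
            @Override
            public void onSelectorChanged(final List<String> changed) {
                log.add("websocket-style listener: " + changed);
            }
        };
        new DispatcherSketch(Arrays.asList(zkStyle, wsStyle))
                .dispatch(new ChangeEvent(GroupKey.SELECTOR, Arrays.asList("selector-1")));
        log.forEach(System.out::println);
    }
}
```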
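  • Source analysis of the zookeeper listener

Earlier we set soul.sync.zookeeper.url='address', so somewhere in the project that configuration must be read. Searching for soul.sync.zookeeper leads to the data synchronization configuration class DataSyncConfiguration. Its zookeeper part is shown below: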

   /**
     * The type Zookeeper listener.
     */
    @Configuration
    @ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
    @Import(ZookeeperConfiguration.class)
    static class ZookeeperListener {

        /**
         * Config event listener data changed listener.
         *
         * @param zkClient the zk client
         * @return the data changed listener
         */
        @Bean
        @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
        public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
            return new ZookeeperDataChangedListener(zkClient);
        }

        /**
         * Zookeeper data init zookeeper data init.
         *
         * @param zkClient        the zk client
         * @param syncDataService the sync data service
         * @return the zookeeper data init
         */
        @Bean
        @ConditionalOnMissingBean(ZookeeperDataInit.class)
        public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
            return new ZookeeperDataInit(zkClient, syncDataService);
        }
    }

ZookeeperDataChangedListener is the concrete implementation of the DataChangedListener interface; it sends data change information through zkClient:

    @Override
    public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
        if (eventType == DataEventTypeEnum.REFRESH) {
            // refresh
            final String selectorParentPath = ZkPathConstants.buildSelectorParentPath(changed.get(0).getPluginName());
            deleteZkPathRecursive(selectorParentPath);
        }
        for (SelectorData data : changed) {
            final String selectorRealPath = ZkPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId());
            if (eventType == DataEventTypeEnum.DELETE) {
                // delete
                deleteZkPath(selectorRealPath);
                continue;
            }
            final String selectorParentPath = ZkPathConstants.buildSelectorParentPath(data.getPluginName());
            createZkNode(selectorParentPath);
            //create or update
            upsertZkNode(selectorRealPath, data);
        }
    }

    /**
     * create or update zookeeper node.
     * @param path node path
     * @param data node data
     */
    private void upsertZkNode(final String path, final Object data) {
        if (!zkClient.exists(path)) {
            zkClient.createPersistent(path, true);
        }
        // Update the node
        zkClient.writeData(path, data);
    }

    private void deleteZkPath(final String path) {
        if (zkClient.exists(path)) {
            // Plain delete
            zkClient.delete(path);
        }
    }

    private void deleteZkPathRecursive(final String path) {
        if (zkClient.exists(path)) {
            // Recursive delete
            zkClient.deleteRecursive(path);
        }
    }

At this point, soul-admin has finished sending the data.
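The three branches of onSelectorChanged (refresh, delete, create-or-update) can be traced on an in-memory tree. This is a sketch under assumptions, not Soul's code: the `/soul/selector/{plugin}/{id}` layout is assumed for illustration because the article does not show what ZkPathConstants produces, and `SelectorNodeSketch` with its methods is hypothetical. The parent node itself is not stored in this model; only the selector nodes are.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

final class SelectorNodeSketch {
    enum EventType { CREATE, UPDATE, DELETE, REFRESH }

    // path -> serialized selector; a TreeMap keeps the paths sorted for readable output.
    private final Map<String, String> tree = new TreeMap<>();

    // Assumed path layout, for illustration only.
    static String parentPath(final String plugin) {
        return "/soul/selector/" + plugin;
    }

    static String realPath(final String plugin, final String id) {
        return parentPath(plugin) + "/" + id;
    }

    // changed maps selector id -> data; all entries belong to the given plugin.
    void onSelectorChanged(final String plugin, final Map<String, String> changed, final EventType type) {
        if (type == EventType.REFRESH) {
            // Refresh: drop every node below the plugin's parent path first.
            String prefix = parentPath(plugin) + "/";
            tree.keySet().removeIf(path -> path.startsWith(prefix));
        }
        for (Map.Entry<String, String> entry : changed.entrySet()) {
            String path = realPath(plugin, entry.getKey());
            if (type == EventType.DELETE) {
                tree.remove(path);
                continue;
            }
            // Create or update: put() covers both cases in this model.
            tree.put(path, entry.getValue());
        }
    }

    Map<String, String> snapshot() {
        return new TreeMap<>(tree);
    }

    public static void main(final String[] args) {
        SelectorNodeSketch sketch = new SelectorNodeSketch();
        Map<String, String> initial = new LinkedHashMap<>();
        initial.put("1", "selector-1");
        initial.put("2", "selector-2");
        sketch.onSelectorChanged("divide", initial, EventType.CREATE);

        Map<String, String> removed = new LinkedHashMap<>();
        removed.put("1", "selector-1");
        sketch.onSelectorChanged("divide", removed, EventType.DELETE);

        Map<String, String> refreshed = new LinkedHashMap<>();
        refreshed.put("3", "selector-3");
        sketch.onSelectorChanged("divide", refreshed, EventType.REFRESH);

        System.out.println(sketch.snapshot());
    }
}
```

A refresh therefore behaves like "replace everything for this plugin", while create, update, and delete touch only the listed nodes.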

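Gateway data synchronization in soul-bootstrap

Enabling zookeeper synchronization requires adding soul-spring-boot-starter-sync-data-zookeeper to soul-bootstrap. Looking up that custom spring-boot-starter in the project leads to the ZookeeperSyncDataConfiguration configuration class.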

@Configuration
@ConditionalOnClass(ZookeeperSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@EnableConfigurationProperties(ZookeeperConfig.class)
@Slf4j
public class ZookeeperSyncDataConfiguration {

    /**
     * Sync data service sync data service.
     * zk data synchronization
     * @param zkClient          the zk client
     * @param pluginSubscriber the plugin subscriber
     * @param metaSubscribers   the meta subscribers
     * @param authSubscribers   the auth subscribers
     * @return the sync data service
     */
    @Bean
    public SyncDataService syncDataService(final ObjectProvider<ZkClient> zkClient, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use zookeeper sync soul data.......");
        return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

    /**
     * register zkClient in spring ioc.
     * Uses the zookeeper settings from the yml file.
     * @param zookeeperConfig the zookeeper configuration
     * @return ZkClient {@linkplain ZkClient}
     */
    @Bean
    public ZkClient zkClient(final ZookeeperConfig zookeeperConfig) {
        return new ZkClient(zookeeperConfig.getUrl(), zookeeperConfig.getSessionTimeout(), zookeeperConfig.getConnectionTimeout());
    }

}

Taking Selector as an example, here is how the ZookeeperSyncDataService class watches for Selector data changes:

// Watch selectors
private void watcherSelector(final String pluginName) {
        // Build the parent node path
        String selectorParentPath = ZkPathConstants.buildSelectorParentPath(pluginName);
        // Get the child nodes
        List<String> childrenList = zkClientGetChildren(selectorParentPath);
        if (CollectionUtils.isNotEmpty(childrenList)) {
            childrenList.forEach(children -> {
                // Build the real node path
                String realPath = buildRealPath(selectorParentPath, children);
                // Read the node's value and update the cached data
                cacheSelectorData(zkClient.readData(realPath));
                // Watch only for data changes on this node
                subscribeSelectorDataChanges(realPath);
            });
        }
        // Watch the parent node for child changes (triggered only when a child is added or removed, or the node is deleted)
        subscribeChildChanges(ConfigGroupEnum.SELECTOR, selectorParentPath, childrenList);
    }

  // Watch only for changes to the list of child nodes
  private void subscribeChildChanges(final ConfigGroupEnum groupKey, final String groupParentPath,
                                     final List<String> childrenList) {
        switch (groupKey) {
            case SELECTOR:
                // handleChildChanges(String parentPath, List<String> currentChilds)
                zkClient.subscribeChildChanges(groupParentPath, (parentPath, currentChildren) -> {
                    if (CollectionUtils.isNotEmpty(currentChildren)) {
                        // Filter the already handled childrenList out of currentChildren, leaving the nodes that are new
                        List<String> addSubscribePath = addSubscribePath(childrenList, currentChildren);
                        addSubscribePath.stream().map(addPath -> {
                            String realPath = buildRealPath(parentPath, addPath);
                            cacheSelectorData(zkClient.readData(realPath));
                            return realPath;
                        }).forEach(this::subscribeSelectorDataChanges);
                    }
                });
                break;
                ...
             }
    }

    // Watch only for data changes on the node
    private void subscribeSelectorDataChanges(final String path) {
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            // Triggered when the node's data changes
            @Override
            public void handleDataChange(final String dataPath, final Object data) {
                cacheSelectorData((SelectorData) data);
            }

            // Triggered when the node is deleted
            @Override
            public void handleDataDeleted(final String dataPath) {
                unCacheSelectorData(dataPath);
            }
        });
    }

    // Get the child nodes, creating the parent node first if it does not exist
    private List<String> zkClientGetChildren(final String parent) {
        if (!zkClient.exists(parent)) {
            zkClient.createPersistent(parent, true);
        }
        return zkClient.getChildren(parent);
    }

    // Update the cached data
    private void cacheSelectorData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData)
                .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data)));
    }

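cacheSelectorData above is the method that updates the cached data. The concrete implementation class is CommonPluginDataSubscriber, the same call that updated the cache in the websocket mode covered in the previous article.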
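The child-change handling above hinges on working out which children are new, so that each new node is read and subscribed to. A minimal sketch of that step follows, assuming a plain set difference. It is not a copy of Soul's addSubscribePath, whose body the article does not show, and `ChildDiffSketch`, `newChildren`, and `idFromPath` are hypothetical names. The second helper illustrates one plausible way a delete callback, which receives only a path, could recover the selector id from the last path segment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ChildDiffSketch {
    // Returns the children present now that were not subscribed before, in their current order.
    static List<String> newChildren(final List<String> alreadySubscribed, final List<String> currentChildren) {
        Set<String> known = new HashSet<>(alreadySubscribed);
        List<String> added = new ArrayList<>();
        for (String child : currentChildren) {
            if (!known.contains(child)) {
                added.add(child);
            }
        }
        return added;
    }

    // Takes the last path segment as the selector id (an assumption about the path layout).
    static String idFromPath(final String dataPath) {
        return dataPath.substring(dataPath.lastIndexOf('/') + 1);
    }

    public static void main(final String[] args) {
        List<String> before = Arrays.asList("1", "2");
        List<String> now = Arrays.asList("1", "2", "3", "4");
        // Only the children that appeared since the last look need a read and a data watch.
        System.out.println(newChildren(before, now));
        System.out.println(idFromPath("/soul/selector/divide/3"));
    }
}
```

Using a set for the known children keeps the comparison linear in the number of nodes, which matters little for a handful of selectors but avoids a nested scan when a plugin has many.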
