共计 8607 个字符,预计需要花费 22 分钟才能阅读完成。
指标
- soul
http
长轮询 形式数据同步原理及源码剖析
上一篇咱们对 Soul
网关的 zookeeper 数据同步形式做了简略的剖析,理解了一下 zookeeper
同步的根本流程。接下来咱们看一 下 Soul
网关的 http 长轮询
数据同步形式。
同步原理
Soul
网关 http
同步原理:
Soul 借鉴了 Apollo
、Nacos
的设计思维,取其精华,本人实现了 http
长轮询数据同步性能。留神,这里并非传统的 Ajax 长轮询。http
长轮询机制如下图所示:
soul-web
网关申请 soul-admin
的配置服务,读取超时工夫为 90s,意味着网关层申请配置服务最多会期待 90s,这样便于 admin 配置服务及时响应变更数据,从而实现准实时推送。
soul-web
的 HTTP 申请达到 sou-admin
之后,并非立马响应数据,而是利用 Servlet3.0
的异步机制,将长轮询申请工作扔到 BlocingQueue
中,并且开启调度工作,60s 后执行。这样做的目标是 60s 后将该长轮询申请移除队列,即使是这段时间内没有产生配置数据变更,也得让网关晓得。而且网关申请配置服务时,也有 90s 的超时工夫。
Soul
网关开启 http 长轮询
同步:
-
soul-bootstrap
新增如下依赖:<!--soul data sync start use http--> <dependency> <groupId>org.dromara</groupId> <artifactId>soul-spring-boot-starter-sync-data-http</artifactId> <version>2.2.1</version> </dependency>
-
application.yml
增加相干配置soul : sync: http: url: http://localhost:9095 #url: 配置成你的 zk 地址,集群环境请应用(,)分隔
soul-admin
配置,或在 soul-admin 启动参数中设置 --soul.sync.http.enabled=true
,而后重启服务
sync:
http:
enabled: true
源码剖析
soul-admin 数据同步
soul-admin 的数据变更告诉,Soul 网关的三种数据同步形式 webscoket、zookeeper、http 长轮询
原理都是一样的,只是不同的数据同步配置对应的事件处理器不一样,上一篇 zookeeper
数据同步已做了剖析,这里就不在赘述。
htttp 长轮询
监听器源码剖析
咱们后面开启了 soul.sync.http.enabled=true
,那么在我的项目中必定会有读取配置的中央。通过`soul.sync.http
搜寻发现数据同步的配置类 DataSyncConfiguration
,上面是http 长轮询
的配置代码:
/**
* http long polling.
*/
@Configuration
@ConditionalOnProperty(name = "soul.sync.http.enabled", havingValue = "true")
@EnableConfigurationProperties(HttpSyncProperties.class)
static class HttpLongPollingListener {
@Bean
@ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)
public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {return new HttpLongPollingDataChangedListener(httpSyncProperties);
}
}
HttpLongPollingDataChangedListener
类为 DataChangedListener
接口的具体实现,解决 http 长轮询
的数据推送,外围代码如下:
/**
* If the configuration data changes, the group information for the change is immediately responded.
* Otherwise, the client's request thread is blocked until any data changes or the specified timeout is reached.
*
* @param request the request
* @param response the response
*/
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
// 查看客户端是否须要更新缓存,md5 不统一,立刻响应
// 留神:这里返回的是配置分组信息,网关收到响应信息之后,只晓得是哪个 Group 产生了配置变更,还须要再次申请该 Group 的配置数据
List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
String clientIp = getRemoteIp(request);
// response immediately.
if (CollectionUtils.isNotEmpty(changedGroup)) {this.generateResponse(response, changedGroup);
log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
return;
}
// listen for configuration changed.
// 获取 servlet3.0 的异步解决 HTTP 申请
final AsyncContext asyncContext = request.startAsync();
// AsyncContext.settimeout() does not timeout properly, so you have to control it yourself
asyncContext.setTimeout(0L);
// block client's thread.
// 阻止客户端线程 60 秒
scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}
/******************LongPollingClient 外围代码 *****************************/
public void run() {
// 退出定时工作,如果 60s 之内没有配置变更,则 60s 后执行,响应 http 申请
this.asyncTimeoutFuture = scheduler.schedule(() -> {
// 将以后长轮询从队列中移除(clients 是阻塞队列,保留了来自 soul-web 的申请信息)clients.remove(LongPollingClient.this);
// 获取以后申请中的配置分组信息
List<ConfigGroupEnum> changedGroups
= compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
// 发送响应申请
sendResponse(changedGroups);
}, timeoutTime, TimeUnit.MILLISECONDS);
clients.add(this);
}
/**
* Send response datagram. 发送响应数据包
*
* @param response the response
* @param changedGroups the changed groups
*/
private void generateResponse(final HttpServletResponse response, final List<ConfigGroupEnum> changedGroups) {
try {response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println(GsonUtils.getInstance().toJson(SoulAdminResult.success(SoulResultMessage.SUCCESS, changedGroups)));
} catch (IOException ex) {log.error("Sending response failed.", ex);
至此,soul-admin
曾经实现了数据发送。
soul-bootstrap 网关数据同步
开启 http 长轮询
同步,须要在 soul-bootstrap
中引入 soul-spring-boot-starter-sync-data-http
,在我的项目中找到对应的自定义 spring-boot-starter,发现了HttpSyncDataConfiguration
配置类。
/**
* Http sync data configuration for spring boot.
*
* @author xiaoyu(Myth)
*/
@Configuration
@ConditionalOnClass(HttpSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.http", name = "url")
@Slf4j
public class HttpSyncDataConfiguration {
/**
* Http sync data service.
*
* @param httpConfig the http config
* @param pluginSubscriber the plugin subscriber
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
* @return the sync data service
*/
@Bean
public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {log.info("you use http long pull sync soul data");
return new HttpSyncDataService(Objects.requireNonNull(httpConfig.getIfAvailable()), Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
/**
* Http config http config.
*
* @return the http config
*/
@Bean
@ConfigurationProperties(prefix = "soul.sync.http")
public HttpConfig httpConfig() {return new HttpConfig();
}
}
HttpSyncDataService
为实现 http 长轮询
的外围类,上面是外围代码:
private void start() {// RUNNING = new AtomicBoolean(false),默认为 false
// compareAndSet:如果以后状态值等于预期值,则以原子形式将同步状态设置为给定的更新值
if (RUNNING.compareAndSet(false, true)) {
// fetch all group configs.
// 申请 configs/fetch,获取最新的配置信息
this.fetchGroupConfig(ConfigGroupEnum.values());
int threadSize = serverList.size();
this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
SoulThreadFactory.create("http-long-polling", true));
// start long polling, each server creates a thread to listen for changes.
// 开启轮询,若配置多个 soul-admin 服务器,每个服务器都会开启轮询
this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
} else {log.info("soul http long polling was started, executor=[{}]", executor);
}
}
/****************** HttpLongPollingTask 外围代码 *****************************/
// 分组申请 soul-admin/configs/listener 监听接口
private void doLongPolling(final String server) {
// 每组缓存最初更新工夫
MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {ConfigData<?> cacheConfig = factory.cacheConfigData(group);
String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
params.put(group.name(), Lists.newArrayList(value));
}
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
HttpEntity httpEntity = new HttpEntity(params, headers);
String listenerUrl = server + "/configs/listener";
log.debug("request listener configs: [{}]", listenerUrl);
JsonArray groupJson = null;
try {
// 执行申请
String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
} catch (RestClientException e) {String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
throw new SoulException(message, e);
}
if (groupJson != null) {
// 异步获取组配置信息
ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
if (ArrayUtils.isNotEmpty(changedGroups)) {
// 分组申请 soul-admin/configs/listener 监听接口,获取最新配置信息
this.doFetchGroupConfig(server, changedGroups);
}
}
}
// 分组申请 soul-admin/configs/listener 接口,获取最新配置信息
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {StringBuilder params = new StringBuilder();
for (ConfigGroupEnum groupKey : groups) {params.append("groupKeys").append("=").append(groupKey.name()).append("&");
}
// 组装申请 url 地址
String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
String json = null;
try {
// 获取最新配置信息
json = this.httpClient.getForObject(url, String.class);
} catch (RestClientException e) {log.warn(message);
throw new SoulException(message, e);
}
// 更新本地缓存
boolean updated = this.updateCacheWithJson(json);
ThreadUtils.sleep(TimeUnit.SECONDS, 30);
}
/**
* 更新本地缓存
* @param json the response from config server.
* @return true: the local cache was updated. false: not updated.
*/
private boolean updateCacheWithJson(final String json) {JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
JsonObject data = jsonObject.getAsJsonObject("data");
// if the config cache will be updated?
return factory.executor(data);
}
/**
* 执行更新本地缓存操作
*
* @param data the data
* @return the boolean
*/
public boolean executor(final JsonObject data) {final boolean[] success = {false};
ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
return success[0];
}
为什么先告诉 Group 产生了配置变更,通过 Group 申请 soul-admin 的 /configs/fetch
接口获取具体配置信息,而不是间接将变更的数据写出?
因为
http 长轮询
机制只能保障准实时,如果在网关层解决不及时,或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,平安起见,只告知某个 Group 信息产生了变更。
至此,http 长轮询
数据同步源码剖析实现。