乐趣区

聊聊jdk http的HeaderFilter


本文主要研究一下 JDK java.net.http 模块(HttpClient)中的 HeaderFilter。
FilterFactory
java.net.http/jdk/internal/net/http/FilterFactory.java
/**
 * Keeps an ordered list of {@link HeaderFilter} classes and turns it into a
 * chain of freshly constructed filter instances on request.
 */
class FilterFactory {

    // Registered filter types, in the order they were added.
    final LinkedList<Class<? extends HeaderFilter>> filterClasses = new LinkedList<>();

    /**
     * Appends a filter type to the end of the ordered list.
     *
     * @param type the filter class to register
     */
    public void addFilter(Class<? extends HeaderFilter> type) {
        filterClasses.add(type);
    }

    /**
     * Builds a new list holding one new instance of every registered filter
     * class, in registration order.
     *
     * @return the newly built chain
     * @throws InternalError if a filter class cannot be instantiated reflectively
     */
    LinkedList<HeaderFilter> getFilterChain() {
        LinkedList<HeaderFilter> chain = new LinkedList<>();
        for (Class<? extends HeaderFilter> filterType : filterClasses) {
            chain.add(instantiate(filterType));
        }
        return chain;
    }

    /** Creates one filter through its public no-arg constructor. */
    private static HeaderFilter instantiate(Class<? extends HeaderFilter> filterType) {
        try {
            return filterType.getConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new InternalError(e);
        }
    }
}
FilterFactory 提供了 addFilter 及 getFilterChain 两个方法:前者按顺序登记 filter 的 class,后者每次调用时通过反射(要求 filter 有 public 无参构造器)把已登记的 class 逐个实例化,返回一条新的 filter 链。
HttpClientImpl
java.net.http/jdk/internal/net/http/HttpClientImpl.java
/**
 * Creates a client from the settings held by {@code builder}, substituting a
 * default for every setting the builder left unset, and registers the
 * default header filters.
 *
 * @param builder       source of the configuration (SSL context and parameters,
 *                      executor, cookie handler, proxy, authenticator, version, ...)
 * @param facadeFactory creates the facade for this client; the facade is only
 *                      kept through a {@link WeakReference}
 * @throws InternalError if the default {@code SSLContext} cannot be obtained
 *                       or the {@code SelectorManager} cannot be created
 */
private HttpClientImpl(HttpClientBuilderImpl builder,
                       SingleFacadeFactory facadeFactory) {
    id = CLIENT_IDS.incrementAndGet();
    dbgTag = "HttpClientImpl(" + id + ")";

    // SSL context: builder value, else the platform default.
    if (builder.sslContext == null) {
        try {
            sslContext = SSLContext.getDefault();
        } catch (NoSuchAlgorithmException ex) {
            throw new InternalError(ex);
        }
    } else {
        sslContext = builder.sslContext;
    }

    // Executor: builder value, else a cached thread pool created here.
    // isDefaultExecutor records which of the two cases applies.
    Executor ex = builder.executor;
    if (ex == null) {
        ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
        isDefaultExecutor = true;
    } else {
        isDefaultExecutor = false;
    }
    delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
    facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
    client2 = new Http2ClientImpl(this);
    cookieHandler = builder.cookieHandler;
    connectTimeout = builder.connectTimeout;
    followRedirects = builder.followRedirects == null ?
            Redirect.NEVER : builder.followRedirects;

    // Proxy selector: user-supplied one if present, else the default.
    this.userProxySelector = Optional.ofNullable(builder.proxy);
    this.proxySelector = userProxySelector
            .orElseGet(HttpClientImpl::getDefaultProxySelector);
    if (debug.on()) {
        debug.log("proxySelector is %s (user-supplied=%s)",
                this.proxySelector, userProxySelector.isPresent());
    }
    authenticator = builder.authenticator;

    // Protocol version defaults to HTTP/2 when the builder did not set one.
    if (builder.version == null) {
        version = HttpClient.Version.HTTP_2;
    } else {
        version = builder.version;
    }
    if (builder.sslParams == null) {
        sslParams = getDefaultParams(sslContext);
    } else {
        sslParams = builder.sslParams;
    }
    connections = new ConnectionPool(id);
    connections.start();
    timeouts = new TreeSet<>();
    try {
        selmgr = new SelectorManager(this);
    } catch (IOException e) {
        // unlikely
        throw new InternalError(e);
    }
    selmgr.setDaemon(true);

    // The filter factory must exist before initFilters() registers into it.
    filters = new FilterFactory();
    initFilters();
    assert facadeRef.get() != null;
}

/**
 * Registers the default header filters: authentication and redirect always,
 * cookie handling only when a cookie handler is configured.
 */
private void initFilters() {
    addFilter(AuthenticationFilter.class);
    addFilter(RedirectFilter.class);
    if (this.cookieHandler == null) {
        return;
    }
    addFilter(CookieFilter.class);
}

/**
 * Registers a filter class with this client's filter factory.
 *
 * @param f the filter class to register
 */
private void addFilter(Class<? extends HeaderFilter> f) {
    this.filters.addFilter(f);
}

/**
 * Asks the filter factory for a filter chain.
 *
 * @return the list returned by {@code FilterFactory.getFilterChain()}
 */
final LinkedList<HeaderFilter> filterChain() {
    LinkedList<HeaderFilter> chain = filters.getFilterChain();
    return chain;
}

HttpClientImpl 的构造器创建了 FilterFactory,并在 initFilters 中调用 addFilter 添加默认的 filter:AuthenticationFilter、RedirectFilter,以及仅在 cookieHandler 不为 null 时才添加的 CookieFilter
filterChain 方法则调用了 FilterFactory 的 getFilterChain() 方法,使用反射实例化这些 filter

MultiExchange
java.net.http/jdk/internal/net/http/MultiExchange.java
/**
 * Creates a MultiExchange that produces one final response.
 *
 * @param userRequest        the request as supplied by the user
 * @param requestImpl        the internal request; it becomes the current request
 * @param client             the owning client, which supplies the filter chain and executor
 * @param responseHandler    body handler for the response
 * @param pushPromiseHandler push promise handler, or null for no push group
 * @param acc                access control context; when non-null the push
 *                           group executor is wrapped in a PrivilegedExecutor
 */
MultiExchange(HttpRequest userRequest,
              HttpRequestImpl requestImpl,
              HttpClientImpl client,
              HttpResponse.BodyHandler<T> responseHandler,
              PushPromiseHandler<T> pushPromiseHandler,
              AccessControlContext acc) {
    // Request bookkeeping: nothing precedes the first request.
    this.userRequest = userRequest;
    this.request = requestImpl;
    this.currentreq = requestImpl;
    this.previous = null;
    this.previousreq = null;

    // Collaborators obtained from the client.
    this.client = client;
    this.acc = acc;
    this.responseHandler = responseHandler;
    this.filters = client.filterChain();
    this.executor = client.theExecutor();

    if (pushPromiseHandler == null) {
        pushGroup = null;
    } else {
        Executor pushExecutor;
        if (acc == null) {
            pushExecutor = this.executor.delegate();
        } else {
            pushExecutor = new PrivilegedExecutor(this.executor.delegate(), acc);
        }
        this.pushGroup = new PushGroup<>(pushPromiseHandler, request, pushExecutor);
    }

    this.exchange = new Exchange<>(request, this);
}

private CompletableFuture<Response> responseAsyncImpl() {
CompletableFuture<Response> cf;
if (attempts.incrementAndGet() > max_attempts) {
cf = failedFuture(new IOException(“Too many retries”, retryCause));
} else {
if (currentreq.timeout().isPresent()) {
responseTimerEvent = ResponseTimerEvent.of(this);
client.registerTimer(responseTimerEvent);
}
try {
// 1. apply request filters
// if currentreq == previousreq the filters have already
// been applied once. Applying them a second time might
// cause some headers values to be added twice: for
// instance, the same cookie might be added again.
if (currentreq != previousreq) {
requestFilters(currentreq);
}
} catch (IOException e) {
return failedFuture(e);
}
Exchange<T> exch = getExchange();
// 2. get response
cf = exch.responseAsync()
.thenCompose((Response response) -> {
HttpRequestImpl newrequest;
try {
// 3. apply response filters
newrequest = responseFilters(response);
} catch (IOException e) {
return failedFuture(e);
}
// 4. check filter result and repeat or continue
if (newrequest == null) {
if (attempts.get() > 1) {
Log.logError(“Succeeded on attempt: ” + attempts);
}
return completedFuture(response);
} else {
this.response =
new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
Exchange<T> oldExch = exch;
return exch.ignoreBody().handle((r,t) -> {
previousreq = currentreq;
currentreq = newrequest;
expiredOnce = false;
setExchange(new Exchange<>(currentreq, this, acc));
return responseAsyncImpl();
}).thenCompose(Function.identity());
} })
.handle((response, ex) -> {
// 5. handle errors and cancel any timer set
cancelTimer();
if (ex == null) {
assert response != null;
return completedFuture(response);
}
// all exceptions thrown are handled here
CompletableFuture<Response> errorCF = getExceptionalCF(ex);
if (errorCF == null) {
return responseAsyncImpl();
} else {
return errorCF;
} })
.thenCompose(Function.identity());
}
return cf;
}

private void requestFilters(HttpRequestImpl r) throws IOException {
Log.logTrace(“Applying request filters”);
for (HeaderFilter filter : filters) {
Log.logTrace(“Applying {0}”, filter);
filter.request(r, this);
}
Log.logTrace(“All filters applied”);
}

private HttpRequestImpl responseFilters(Response response) throws IOException
{
Log.logTrace(“Applying response filters”);
Iterator<HeaderFilter> reverseItr = filters.descendingIterator();
while (reverseItr.hasNext()) {
HeaderFilter filter = reverseItr.next();
Log.logTrace(“Applying {0}”, filter);
HttpRequestImpl newreq = filter.response(response);
if (newreq != null) {
Log.logTrace(“New request: stopping filters”);
return newreq;
}
}
Log.logTrace(“All filters applied”);
return null;
}

MultiExchange 在构造器里头调用了 client.filterChain(),完成 filters 的初始化
在 responseAsyncImpl 方法里头,执行请求之前调用 requestFilters,得到 response 之后调用 responseFilters
requestFilters 是按顺序执行,而 responseFilters 则取的是 descendingIterator,逆序执行

HeaderFilter
java.net.http/jdk/internal/net/http/HeaderFilter.java
/**
 * A header filter that can examine or modify, typically system headers for
 * requests before they are sent, and responses before they are returned to the
 * user. Some ability to resend requests is provided.
 */
interface HeaderFilter {

/**
 * Request-side hook. MultiExchange calls it on each filter of its chain,
 * in chain order, before it obtains the response for the request.
 *
 * @param r the request being processed
 * @param e the MultiExchange the request belongs to
 * @throws IOException if the filter fails; MultiExchange turns it into a
 *         failed future
 */
void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException;

/**
 * Response-side hook. MultiExchange calls it on the filters in reverse
 * chain order and stops at the first non-null result.
 *
 * Returns null if response ok to be given to user. Non null is a request
 * that must be resent and its response given to user. If impl throws an
 * exception that is returned to user instead.
 *
 * @param r the response to examine
 * @return null to accept the response, or the request to send next
 * @throws IOException if the filter fails
 */
HttpRequestImpl response(Response r) throws IOException;
}

可以看到 HeaderFilter 接口定义了 request 以及 response 方法
对于 response 方法,如果对 header 处理没问题就返回 null,有异常抛异常,需要重新发送的则会返回 HttpRequestImpl
HeaderFilter 有三个实现类,分别是 AuthenticationFilter、RedirectFilter、CookieFilter

AuthenticationFilter
java.net.http/jdk/internal/net/http/AuthenticationFilter.java
/**
 * Applies preemptive authentication: when the credential cache already has
 * an entry for the proxy and/or the server, attaches the matching auth info
 * to the exchange and adds Basic credentials to the request.
 *
 * @param r the request that may receive credential headers
 * @param e the exchange whose proxyauth / serverauth state is consulted and set
 */
@Override
public void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException {
    Cache credentialCache = getCache(e);
    this.exchange = e;

    // Proxy: only when no proxy auth is attached yet and getProxyURI yields a URI.
    if (exchange.proxyauth == null) {
        URI proxyURI = getProxyURI(r);
        CacheEntry proxyEntry = (proxyURI == null)
                ? null
                : credentialCache.get(proxyURI, true);
        if (proxyEntry != null) {
            exchange.proxyauth = new AuthInfo(true, proxyEntry.scheme, null, proxyEntry);
            addBasicCredentials(r, true, proxyEntry.value);
        }
    }

    // Server: only when no server auth is attached yet.
    if (exchange.serverauth == null) {
        CacheEntry serverEntry = credentialCache.get(r.uri(), false);
        if (serverEntry != null) {
            exchange.serverauth = new AuthInfo(true, serverEntry.scheme, null, serverEntry);
            addBasicCredentials(r, false, serverEntry.value);
        }
    }
}

// TODO: refactor into per auth scheme class
private static void addBasicCredentials(HttpRequestImpl r,
boolean proxy,
PasswordAuthentication pw) {
String hdrname = proxy ? “Proxy-Authorization” : “Authorization”;
StringBuilder sb = new StringBuilder(128);
sb.append(pw.getUserName()).append(‘:’).append(pw.getPassword());
String s = encoder.encodeToString(sb.toString().getBytes(ISO_8859_1));
String value = “Basic ” + s;
if (proxy) {
if (r.isConnect()) {
if (!Utils.PROXY_TUNNEL_FILTER.test(hdrname, value)) {
Log.logError(“{0} disabled”, hdrname);
return;
}
} else if (r.proxy() != null) {
if (!Utils.PROXY_FILTER.test(hdrname, value)) {
Log.logError(“{0} disabled”, hdrname);
return;
}
}
}
r.setSystemHeader(hdrname, value);
}

/**
 * Response-side hook of the authentication filter (excerpt; the tail of the
 * method is elided).
 *
 * For any status other than UNAUTHORIZED / PROXY_UNAUTHORIZED, credentials
 * attached to the exchange that did not come from the cache are stored in
 * the cache, and null is returned.
 *
 * @param r the response to examine
 * @return null when the status is neither UNAUTHORIZED nor PROXY_UNAUTHORIZED
 */
@Override
public HttpRequestImpl response(Response r) throws IOException {
Cache cache = getCache(exchange);
int status = r.statusCode();
HttpHeaders hdrs = r.headers();
HttpRequestImpl req = r.request();

if (status != UNAUTHORIZED && status != PROXY_UNAUTHORIZED) {
// check if any authentication succeeded for first time
if (exchange.serverauth != null && !exchange.serverauth.fromcache) {
AuthInfo au = exchange.serverauth;
cache.store(au.scheme, req.uri(), false, au.credentials);
}
if (exchange.proxyauth != null && !exchange.proxyauth.fromcache) {
AuthInfo au = exchange.proxyauth;
URI proxyURI = getProxyURI(req);
// Proxy credentials are only stored when a proxy URI is available.
if (proxyURI != null) {
cache.store(au.scheme, proxyURI, true, au.credentials);
}
}
return null;
}
// ... remainder of the method is elided in this excerpt.
// NOTE(review): presumably handles the UNAUTHORIZED / PROXY_UNAUTHORIZED case - confirm against the full source.
}
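request 方法在缓存中已有凭证时,抢先(preemptive)为请求添加 Basic 认证的 header(Authorization 或 Proxy-Authorization);response 方法在状态码不是 UNAUTHORIZED / PROXY_UNAUTHORIZED 时,把首次认证成功(非来自缓存)的凭证存入缓存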
RedirectFilter
java.net.http/jdk/internal/net/http/RedirectFilter.java
/**
 * Records the request, its method and URI, the owning client with its
 * redirect policy, and the exchange, for later use by the response hook.
 *
 * @param r the request being sent
 * @param e the exchange the request belongs to
 */
@Override
public synchronized void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException {
    this.request = r;

    HttpClientImpl owner = e.client();
    this.client = owner;
    this.policy = owner.followRedirects();

    this.method = r.method();
    this.uri = r.uri();
    this.exchange = e;
}

/**
 * Delegates to {@code handleResponse} to decide whether a follow-up request
 * is needed.
 *
 * @param r the response to examine
 * @return whatever {@code handleResponse} returns for {@code r}
 */
@Override
public synchronized HttpRequestImpl response(Response r) throws IOException {
    HttpRequestImpl followUp = handleResponse(r);
    return followUp;
}

/**
* Checks to see if a new request is needed and returns it.
* Null means response is ok to return to user.
*/
private HttpRequestImpl handleResponse(Response r) {
int rcode = r.statusCode();
if (rcode == 200 || policy == HttpClient.Redirect.NEVER) {
return null;
}

if (rcode == HTTP_NOT_MODIFIED)
return null;

if (rcode >= 300 && rcode <= 399) {
URI redir = getRedirectedURI(r.headers());
String newMethod = redirectedMethod(rcode, method);
Log.logTrace(“response code: {0}, redirected URI: {1}”, rcode, redir);
if (canRedirect(redir) && ++exchange.numberOfRedirects < max_redirects) {
Log.logTrace(“redirect to: {0} with method: {1}”, redir, newMethod);
return HttpRequestImpl.newInstanceForRedirection(redir, newMethod, request);
} else {
Log.logTrace(“not redirecting”);
return null;
}
}
return null;
}
主要用于处理 3xx 跳转,这个时候满足条件的话会返回新的 HttpRequestImpl 实例
CookieFilter
java.net.http/jdk/internal/net/http/CookieFilter.java
@Override
public void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException {
HttpClientImpl client = e.client();
Optional<CookieHandler> cookieHandlerOpt = client.cookieHandler();
if (cookieHandlerOpt.isPresent()) {
CookieHandler cookieHandler = cookieHandlerOpt.get();
Map<String,List<String>> userheaders = r.getUserHeaders().map();
Map<String,List<String>> cookies = cookieHandler.get(r.uri(), userheaders);

// add the returned cookies
HttpHeadersBuilder systemHeadersBuilder = r.getSystemHeadersBuilder();
if (cookies.isEmpty()) {
Log.logTrace(“Request: no cookie to add for {0}”, r.uri());
} else {
Log.logTrace(“Request: adding cookies for {0}”, r.uri());
}
for (Map.Entry<String,List<String>> entry : cookies.entrySet()) {
final String hdrname = entry.getKey();
if (!hdrname.equalsIgnoreCase(“Cookie”)
&& !hdrname.equalsIgnoreCase(“Cookie2”))
continue;
List<String> values = entry.getValue();
if (values == null || values.isEmpty()) continue;
for (String val : values) {
if (Utils.isValidValue(val)) {
systemHeadersBuilder.addHeader(hdrname, val);
}
}
}
} else {
Log.logTrace(“Request: No cookie manager found for {0}”, r.uri());
}
}

@Override
public HttpRequestImpl response(Response r) throws IOException {
HttpHeaders hdrs = r.headers();
HttpRequestImpl request = r.request();
Exchange<?> e = r.exchange;
Log.logTrace(“Response: processing cookies for {0}”, request.uri());
Optional<CookieHandler> cookieHandlerOpt = e.client().cookieHandler();
if (cookieHandlerOpt.isPresent()) {
CookieHandler cookieHandler = cookieHandlerOpt.get();
Log.logTrace(“Response: parsing cookies from {0}”, hdrs.map());
cookieHandler.put(request.uri(), hdrs.map());
} else {
Log.logTrace(“Response: No cookie manager found for {0}”,
request.uri());
}
return null;
}
用于请求以及响应的 cookie 相关的处理
小结

FilterFactory 使用了简单的责任链模式,getFilterChain 方法使用反射实例化各种 filter
HeaderFilter 定义了 request 及 response 两个方法,分别作用于请求前及获得响应之后
HeaderFilter 有三个实现类,分别是 AuthenticationFilter、RedirectFilter、CookieFilter
MultiExchange 在 responseAsyncImpl 方法里头,执行请求之前调用 requestFilters,得到 response 之后调用 responseFilters。其中 requestFilters 是按顺序执行,而 responseFilters 则取的是 descendingIterator,逆序执行

doc
java.net.http javadoc

退出移动版