乐趣区

Java11 HttpClient小试牛刀


本文主要研究一下 Java11 的 HttpClient 的基本使用。
变化

从 java9 的 jdk.incubator.httpclient 模块迁移到 java.net.http 模块,包名由 jdk.incubator.http 改为 java.net.http
原来的诸如 HttpResponse.BodyHandler.asString() 方法变更为 HttpResponse.BodyHandlers.ofString(),变化一为 BodyHandler 改为 BodyHandlers,变化二为 asXXX() 之类的方法改为 ofXXX(),由 as 改为 of

实例
设置超时时间
@Test
public void testTimeout() throws IOException, InterruptedException {
//1.set connect timeout
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(5000))
.followRedirects(HttpClient.Redirect.NORMAL)
.build();

//2.set read timeout
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(“http://openjdk.java.net/”))
.timeout(Duration.ofMillis(5009))
.build();

HttpResponse<String> response =
client.send(request, HttpResponse.BodyHandlers.ofString());

System.out.println(response.body());

}
HttpConnectTimeoutException 实例
Caused by: java.net.http.HttpConnectTimeoutException: HTTP connect timed out
at java.net.http/jdk.internal.net.http.ResponseTimerEvent.handle(ResponseTimerEvent.java:68)
at java.net.http/jdk.internal.net.http.HttpClientImpl.purgeTimeoutsAndReturnNextDeadline(HttpClientImpl.java:1248)
at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.run(HttpClientImpl.java:877)
Caused by: java.net.ConnectException: HTTP connect timed out
at java.net.http/jdk.internal.net.http.ResponseTimerEvent.handle(ResponseTimerEvent.java:69)
… 2 more
HttpTimeoutException 实例
java.net.http.HttpTimeoutException: request timed out

at java.net.http/jdk.internal.net.http.HttpClientImpl.send(HttpClientImpl.java:559)
at java.net.http/jdk.internal.net.http.HttpClientFacade.send(HttpClientFacade.java:119)
at com.example.HttpClientTest.testTimeout(HttpClientTest.java:40)
设置 authenticator
@Test
public void testBasicAuth() throws IOException, InterruptedException {
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(5000))
.authenticator(new Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(“admin”,”password”.toCharArray());
}
})
.build();

HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(“http://localhost:8080/json/info”))
.timeout(Duration.ofMillis(5009))
.build();

HttpResponse<String> response =
client.send(request, HttpResponse.BodyHandlers.ofString());

System.out.println(response.statusCode());
System.out.println(response.body());
}

authenticator 可以用来设置 HTTP authentication,比如 Basic authentication
虽然 Basic authentication 也可以自己设置 header,不过通过 authenticator 省得自己去构造 header

设置 header
@Test
public void testCookies() throws IOException, InterruptedException {
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(5000))
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(“http://localhost:8080/json/cookie”))
.header(“Cookie”,”JSESSIONID=4f994730-32d7-4e22-a18b-25667ddeb636; userId=java11″)
.timeout(Duration.ofMillis(5009))
.build();
HttpResponse<String> response =
client.send(request, HttpResponse.BodyHandlers.ofString());

System.out.println(response.statusCode());
System.out.println(response.body());
}
通过 request 可以自己设置 header
GET
同步
@Test
public void testSyncGet() throws IOException, InterruptedException {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(“https://www.baidu.com”))
.build();

HttpResponse<String> response =
client.send(request, HttpResponse.BodyHandlers.ofString());

System.out.println(response.body());
}
异步
@Test
public void testAsyncGet() throws ExecutionException, InterruptedException {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(“https://www.baidu.com”))
.build();

CompletableFuture<String> result = client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::body);
System.out.println(result.get());
}
POST 表单
@Test
public void testPostForm() throws IOException, InterruptedException {
HttpClient client = HttpClient.newBuilder().build();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(“http://www.w3school.com.cn/demo/demo_form.asp”))
.header(“Content-Type”,”application/x-www-form-urlencoded”)
.POST(HttpRequest.BodyPublishers.ofString(“name1=value1&name2=value2”))
.build();

HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
System.out.println(response.statusCode());
}
header 指定内容是表单类型,然后通过 BodyPublishers.ofString 传递表单数据,需要自己构建表单参数
POST JSON
@Test
public void testPostJsonGetJson() throws ExecutionException, InterruptedException, JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
StockDto dto = new StockDto();
dto.setName(“hj”);
dto.setSymbol(“hj”);
dto.setType(StockDto.StockType.SH);
String requestBody = objectMapper
.writerWithDefaultPrettyPrinter()
.writeValueAsString(dto);

HttpRequest request = HttpRequest.newBuilder(URI.create(“http://localhost:8080/json/demo”))
.header(“Content-Type”, “application/json”)
.POST(HttpRequest.BodyPublishers.ofString(requestBody))
.build();

CompletableFuture<StockDto> result = HttpClient.newHttpClient()
.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::body)
.thenApply(body -> {
try {
return objectMapper.readValue(body,StockDto.class);
} catch (IOException e) {
return new StockDto();
}
});
System.out.println(result.get());
}
post json 的话,body 自己 json 化为 string,然后 header 指定是 json 格式
文件上传
@Test
public void testUploadFile() throws IOException, InterruptedException, URISyntaxException {
HttpClient client = HttpClient.newHttpClient();
Path path = Path.of(getClass().getClassLoader().getResource(“body.txt”).toURI());
File file = path.toFile();

String multipartFormDataBoundary = “Java11HttpClientFormBoundary”;
org.apache.http.HttpEntity multipartEntity = MultipartEntityBuilder.create()
.addPart(“file”, new FileBody(file, ContentType.DEFAULT_BINARY))
.setBoundary(multipartFormDataBoundary) // 要设置,否则阻塞
.build();

HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(“http://localhost:8080/file/upload”))
.header(“Content-Type”, “multipart/form-data; boundary=” + multipartFormDataBoundary)
.POST(HttpRequest.BodyPublishers.ofInputStream(() -> {
try {
return multipartEntity.getContent();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}))
.build();

HttpResponse<String> response =
client.send(request, HttpResponse.BodyHandlers.ofString());

System.out.println(response.body());
}

官方的 HttpClient 并没有提供类似 WebClient 那种现成的 BodyInserters.fromMultipartData 方法,因此这里需要自己转换
这里使用 org.apache.httpcomponents(httpclient 及 httpmime) 的 MultipartEntityBuilder 构建 multipartEntity,最后通过 HttpRequest.BodyPublishers.ofInputStream 来传递内容
这里 header 要指定 Content-Type 值为 multipart/form-data 以及 boundary 的值,否则服务端可能无法解析

文件下载
@Test
public void testAsyncDownload() throws ExecutionException, InterruptedException {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(“http://localhost:8080/file/download”))
.build();

CompletableFuture<Path> result = client.sendAsync(request, HttpResponse.BodyHandlers.ofFile(Paths.get(“/tmp/body.txt”)))
.thenApply(HttpResponse::body);
System.out.println(result.get());
}
使用 HttpResponse.BodyHandlers.ofFile 来接收文件
并发请求
@Test
public void testConcurrentRequests(){
HttpClient client = HttpClient.newHttpClient();
List<String> urls = List.of(“http://www.baidu.com”,”http://www.alibaba.com/”,”http://www.tencent.com”);
List<HttpRequest> requests = urls.stream()
.map(url -> HttpRequest.newBuilder(URI.create(url)))
.map(reqBuilder -> reqBuilder.build())
.collect(Collectors.toList());

List<CompletableFuture<HttpResponse<String>>> futures = requests.stream()
.map(request -> client.sendAsync(request, HttpResponse.BodyHandlers.ofString()))
.collect(Collectors.toList());
futures.stream()
.forEach(e -> e.whenComplete((resp,err) -> {
if(err != null){
err.printStackTrace();
}else{
System.out.println(resp.body());
System.out.println(resp.statusCode());
}
}));
CompletableFuture.allOf(futures
.toArray(CompletableFuture<?>[]::new))
.join();
}

sendAsync 方法返回的是 CompletableFuture,可以方便地进行转换、组合等操作
这里使用 CompletableFuture.allOf 组合在一起,最后调用 join 等待所有 future 完成

错误处理
@Test
public void testHandleException() throws ExecutionException, InterruptedException {
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(5000))
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(“https://twitter.com”))
.build();

CompletableFuture<String> result = client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
// .whenComplete((resp,err) -> {
// if(err != null){
// err.printStackTrace();
// }else{
// System.out.println(resp.body());
// System.out.println(resp.statusCode());
// }
// })
.thenApply(HttpResponse::body)
.exceptionally(err -> {
err.printStackTrace();
return “fallback”;
});
System.out.println(result.get());
}

HttpClient 异步请求返回的是 CompletableFuture<HttpResponse<T>>,其自带 exceptionally 方法可以用来做 fallback 处理
另外值得注意的是 HttpClient 不像 WebClient 那样,它没有对 4xx 或 5xx 的状态码抛出异常,需要自己根据情况来处理,手动检测状态码抛出异常或者返回其他内容

HTTP2
@Test
public void testHttp2() throws URISyntaxException {
HttpClient.newBuilder()
.followRedirects(HttpClient.Redirect.NEVER)
.version(HttpClient.Version.HTTP_2)
.build()
.sendAsync(HttpRequest.newBuilder()
.uri(new URI(“https://http2.akamai.com/demo”))
.GET()
.build(),
HttpResponse.BodyHandlers.ofString())
.whenComplete((resp,t) -> {
if(t != null){
t.printStackTrace();
}else{
System.out.println(resp.version());
System.out.println(resp.statusCode());
}
}).join();
}
执行之后可以看到返回的 response 的 version 为 HTTP_2
WebSocket
@Test
public void testWebSocket() throws InterruptedException {
HttpClient client = HttpClient.newHttpClient();
WebSocket webSocket = client.newWebSocketBuilder()
.buildAsync(URI.create(“ws://localhost:8080/echo”), new WebSocket.Listener() {

@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
// request one more
webSocket.request(1);

// Print the message when it’s available
return CompletableFuture.completedFuture(data)
.thenAccept(System.out::println);
}
}).join();
webSocket.sendText(“hello “, false);
webSocket.sendText(“world “,true);

TimeUnit.SECONDS.sleep(10);
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, “ok”).join();
}

HttpClient 支持 HTTP2,也包含了 WebSocket,通过 newWebSocketBuilder 去构造 WebSocket
传入 listener 进行接收消息,要发消息的话,使用 WebSocket 来发送,关闭使用 sendClose 方法

reactive streams
HttpClient 本身就是 reactive 的,支持 reactive streams,这里举 ResponseSubscribers.ByteArraySubscriber 的源码看看:java.net.http/jdk/internal/net/http/ResponseSubscribers.java
public static class ByteArraySubscriber<T> implements BodySubscriber<T> {
private final Function<byte[], T> finisher;
private final CompletableFuture<T> result = new MinimalFuture<>();
private final List<ByteBuffer> received = new ArrayList<>();

private volatile Flow.Subscription subscription;

public ByteArraySubscriber(Function<byte[],T> finisher) {
this.finisher = finisher;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
if (this.subscription != null) {
subscription.cancel();
return;
}
this.subscription = subscription;
// We can handle whatever you’ve got
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(List<ByteBuffer> items) {
// incoming buffers are allocated by http client internally,
// and won’t be used anywhere except this place.
// So it’s free simply to store them for further processing.
assert Utils.hasRemaining(items);
received.addAll(items);
}

@Override
public void onError(Throwable throwable) {
received.clear();
result.completeExceptionally(throwable);
}

static private byte[] join(List<ByteBuffer> bytes) {
int size = Utils.remaining(bytes, Integer.MAX_VALUE);
byte[] res = new byte[size];
int from = 0;
for (ByteBuffer b : bytes) {
int l = b.remaining();
b.get(res, from, l);
from += l;
}
return res;
}

@Override
public void onComplete() {
try {
result.complete(finisher.apply(join(received)));
received.clear();
} catch (IllegalArgumentException e) {
result.completeExceptionally(e);
}
}

@Override
public CompletionStage<T> getBody() {
return result;
}
}

BodySubscriber 接口继承了 Flow.Subscriber<List<ByteBuffer>> 接口
这里的 Subscription 来自 Flow 类,该类是 java9 引入的,里头包含了支持 Reactive Streams 的实现

小结
HttpClient 在 Java11 从 incubator 变为正式版,相对于传统的 HttpUrlConnection 其提升可不是一点半点,不仅支持异步,也支持 reactive streams,同时也支持了 HTTP2 以及 WebSocket,非常值得大家使用。
doc

java.net.http javadoc
Examples and Recipes
Java 11: Standardized HTTP Client API
Exploring the New HTTP Client in Java 9
Introduction to the New HTTP Client in Java 9
Getting Started With Java 9’s New HTTP Client
java9 系列 (六)HTTP/2 Client (Incubator)
Java 9 HttpClient send a multipart/form-data request
Java 9: High level HTTP and WebSocket API
WebSocket Client API in Java 9 with Example

退出移动版