共计 6651 个字符,预计需要花费 17 分钟才能阅读完成。
Bulk API
Java High Level REST Client 提供了 Bulk 处理器来帮助处理批量请求。
Bulk 请求
BulkRequest
可以使用一个请求执行多个索引、更新和 / 或删除操作。
它需要在批量请求中添加至少一个操作:
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("posts").id("1")
.source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts").id("2")
.source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts").id("3")
.source(XContentType.JSON,"field", "baz"));
- 创建
BulkRequest
。 - 将
IndexRequest
添加到Bulk
请求。
Bulk API 只支持 JSON 或SMILE编码的文档,提供任何其他格式的文档都会导致错误。
不同的操作类型可以添加到同一个BulkRequest
:
BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "3"));
request.add(new UpdateRequest("posts", "2")
.doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4")
.source(XContentType.JSON,"field", "baz"));
- 向
BulkRequest
添加DeleteRequest
。 - 向
BulkRequest
添加UpdateRequest
。 - 使用 JSON 格式添加
IndexRequest
。
可选参数
可以选择提供以下参数:
request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");
- 作为
TimeValue
等待 bulk 请求执行的超时。 - 作为
String
等待 bulk 请求执行的超时。
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
- 作为
WriteRequest.RefreshPolicy
实例的刷新策略。 - 作为
String
的刷新策略。
request.waitForActiveShards(2);
request.waitForActiveShards(ActiveShardCount.ALL);
- 设置在继续执行索引 / 更新 / 删除操作之前必须活动的碎片副本的数量。
- 作为
ActiveShardCount
提供的碎片副本的数量:可以是ActiveShardCount.ALL
、ActiveShardCount.ONE
、ActiveShardCount.DEFAULT
(默认)。
request.pipeline("pipelineId");
- 全局
pipelineId
用于所有子请求,除非在子请求上重写。
request.routing("routingId");
- 全局
routingId
用于所有子请求,除非在子请求上重写。
BulkRequest defaulted = new BulkRequest("posts");
- 在所有子请求上使用全局索引的 bulk 请求,除非在子请求上重写,这个参数是
@Nullable
,并只能在创建BulkRequest
时设置。
同步执行
当以以下方式执行 BulkRequest
时,客户端等待 BulkResponse
返回,然后继续执行代码:
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
在高级别 REST 客户端中解析 REST 响应失败、请求超时或类似的情况,其中没有来自服务器的响应的情况下,同步调用可能引发IOException
。
在服务器返回 4xx
或5xx
错误代码的情况下,高级别客户端尝试解析响应体错误细节,然后抛出一个通用的 ElasticsearchException
并将原始的 ResponseException
作为一个被抑制的异常添加到它。
异步执行
还可以以异步方式执行BulkRequest
,以便客户端可以直接返回,用户需要指定如何通过将请求和侦听器传递给异步块方法来处理响应或潜在故障:
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
- 要执行的
BulkRequest
和在执行完成时使用的ActionListener
。
异步方法不会阻塞并立即返回,一旦执行完成,ActionListener
将使用 onResponse
方法(如果执行成功)被调用,或者使用 onFailure
方法(如果执行失败)被调用,失败情况和预期的异常与同步执行情况相同。
一个典型的 bulk 监听器是这样的:
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) { }
@Override
public void onFailure(Exception e) {}};
-
onResponse
当执行成功完成时调用。 -
onFailure
当整个BulkRequest
失败时调用。
Bulk 响应
返回的 BulkResponse
包含执行操作的信息,允许对每个结果进行如下迭代:
for (BulkItemResponse bulkItemResponse : bulkResponse) {DocWriteResponse itemResponse = bulkItemResponse.getResponse();
switch (bulkItemResponse.getOpType()) {
case INDEX:
case CREATE:
IndexResponse indexResponse = (IndexResponse) itemResponse;
break;
case UPDATE:
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
break;
case DELETE:
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}
- 遍历所有操作的结果。
- 检索操作的响应(成功与否),可以是
IndexResponse
、UpdateResponse
或DeleteResponse
,它们都可以看作DocWriteResponse
实例。 - 处理索引操作的响应。
- 处理更新操作的响应。
- 处理删除操作的响应。
Bulk响应提供了一种方法来快速检查一个或多个操作是否失败:
if (bulkResponse.hasFailures()) {}
- 如果至少有一个操作失败,此方法将返回
true
。
在这种情况下,需要对所有的操作结果进行迭代,以检查操作是否失败,如果失败,则检索相应的失败:
for (BulkItemResponse bulkItemResponse : bulkResponse) {if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure =
bulkItemResponse.getFailure();}
}
- 指示给定操作是否失败。
- 检索失败操作的失败。
Bulk 处理器
BulkProcessor
提供了一个实用程序类,允许索引 / 更新 / 删除操作在添加到处理器时透明地执行,从而简化了Bulk API 的使用。
为了执行请求,BulkProcessor
需要以下组件:
RestHighLevelClient
- 此客户端用于执行
BulkRequest
并检索BulkResponse
。
BulkProcessor.Listener
- 在每次执行
BulkRequest
之前和之后,或者当一个BulkRequest
失败时,都会调用这个侦听器。
然后 BulkProcessor.builder
方法可以用来构建一个新的BulkProcessor
:
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) { }
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) { }
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {}};
BulkProcessor bulkProcessor = BulkProcessor.builder((request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener).build();
- 创建
BulkProcessor.Listener
。 -
beforeBulk
方法在每次执行BulkRequest
之前调用。 -
afterBulk
方法在每次执行BulkRequest
之后调用。 - 带
failure
参数的afterBulk
方法在BulkRequest
失败时调用。 - 通过从
BulkProcessor.builder
调用build()
方法创建BulkProcessor
,RestHighLevelClient.bulkAsync()
方法将用于在后台执行BulkRequest
。
BulkProcessor.Builder
提供了一些方法来配置 BulkProcessor
应该如何处理请求执行:
BulkProcessor.Builder builder = BulkProcessor.builder((request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener);
builder.setBulkActions(500);
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
builder.setConcurrentRequests(0);
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
builder.setBackoffPolicy(BackoffPolicy
.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
- 根据当前添加的操作数量设置刷新新 bulk 请求的时间(默认为
1000
,使用-1
禁用它)。 - 根据当前添加的操作大小设置刷新新 bulk 请求的时间(默认为
5Mb
,使用-1
禁用)。 - 设置允许执行的并发请求数量(默认为
1
,使用0
只允许执行单个请求)。 - 设置刷新间隔,如果间隔通过,则刷新任何挂起的
BulkRequest
(默认为未设置)。 - 设置一个常量后退策略,该策略最初等待 1 秒并重试最多 3 次,有关更多选项,请参见
BackoffPolicy.noBackoff()
、BackoffPolicy.constantBackoff()
和BackoffPolicy.exponentialBackoff()
。
一旦创建了BulkProcessor
,就可以向它添加请求:
IndexRequest one = new IndexRequest("posts").id("1")
.source(XContentType.JSON, "title",
"In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts").id("2")
.source(XContentType.JSON, "title",
"Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts").id("3")
.source(XContentType.JSON, "title",
"The Future of Federated Search in Elasticsearch");
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);
请求将由 BulkProcessor
执行,它负责为每个 bulk 请求调用BulkProcessor.Listener
。
监听器提供访问 BulkRequest
和BulkResponse
的方法:
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests",
executionId, numberOfActions);
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {if (response.hasFailures()) {logger.warn("Bulk [{}] executed with failures", executionId);
} else {logger.debug("Bulk [{}] completed in {} milliseconds",
executionId, response.getTook().getMillis());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {logger.error("Failed to execute bulk", failure);
}
};
-
beforeBulk
在执行BulkRequest
的每次执行之前调用,这个方法允许知道将要在BulkRequest
中执行的操作的数量。 -
afterBulk
在每次执行BulkRequest
之后调用,这个方法允许知道BulkResponse
是否包含错误。 - 如果
BulkRequest
失败,则调用带failure
参数的afterBulk
方法,该方法允许知道失败。
将所有请求添加到 BulkProcessor
之后,需要使用两种可用的关闭方法之一关闭它的实例。
awaitClose()
方法可以用来等待,直到所有的请求都被处理完毕或者指定的等待时间过去:
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
- 如果所有 bulk 请求都已完成,则该方法返回
true
,如果在所有bulk 请求完成之前的等待时间已经过去,则返回false
。
close()
方法可用于立即关闭BulkProcessor
:
这两种方法都在关闭处理器之前刷新添加到处理器的请求,并且禁止向处理器添加任何新请求。