乐趣区

Elasticsearch-Java-High-Level-REST-ClientBulk-API

Bulk API

Java High Level REST Client 提供了 Bulk 处理器来帮助处理批量请求。

Bulk 请求

BulkRequest可以使用一个请求执行多个索引、更新和 / 或删除操作。

它需要在批量请求中添加至少一个操作:

BulkRequest request = new BulkRequest(); 
request.add(new IndexRequest("posts").id("1")  
        .source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts").id("2")  
        .source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts").id("3")  
        .source(XContentType.JSON,"field", "baz"));
  • 创建BulkRequest
  • IndexRequest 添加到 Bulk 请求。

Bulk API 只支持 JSONSMILE编码的文档,提供任何其他格式的文档都会导致错误。

不同的操作类型可以添加到同一个BulkRequest

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "3")); 
request.add(new UpdateRequest("posts", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4")  
        .source(XContentType.JSON,"field", "baz"));
  • BulkRequest 添加DeleteRequest
  • BulkRequest 添加UpdateRequest
  • 使用 JSON 格式添加IndexRequest

可选参数

可以选择提供以下参数:

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m");
  • 作为 TimeValue 等待 bulk 请求执行的超时。
  • 作为 String 等待 bulk 请求执行的超时。
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");
  • 作为 WriteRequest.RefreshPolicy 实例的刷新策略。
  • 作为 String 的刷新策略。
request.waitForActiveShards(2); 
request.waitForActiveShards(ActiveShardCount.ALL);
  • 设置在继续执行索引 / 更新 / 删除操作之前必须活动的碎片副本的数量。
  • 作为 ActiveShardCount 提供的碎片副本的数量:可以是ActiveShardCount.ALLActiveShardCount.ONEActiveShardCount.DEFAULT(默认)。
request.pipeline("pipelineId");
  • 全局 pipelineId 用于所有子请求,除非在子请求上重写。
request.routing("routingId");
  • 全局 routingId 用于所有子请求,除非在子请求上重写。
BulkRequest defaulted = new BulkRequest("posts");
  • 在所有子请求上使用全局索引的 bulk 请求,除非在子请求上重写,这个参数是 @Nullable,并只能在创建BulkRequest 时设置。

同步执行

当以以下方式执行 BulkRequest 时,客户端等待 BulkResponse 返回,然后继续执行代码:

BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

在高级别 REST 客户端中解析 REST 响应失败、请求超时或类似的情况,其中没有来自服务器的响应的情况下,同步调用可能引发IOException

在服务器返回 4xx5xx错误代码的情况下,高级别客户端尝试解析响应体错误细节,然后抛出一个通用的 ElasticsearchException 并将原始的 ResponseException 作为一个被抑制的异常添加到它。

异步执行

还可以以异步方式执行BulkRequest,以便客户端可以直接返回,用户需要指定如何通过将请求和侦听器传递给异步块方法来处理响应或潜在故障:

client.bulkAsync(request, RequestOptions.DEFAULT, listener);
  • 要执行的 BulkRequest 和在执行完成时使用的ActionListener

异步方法不会阻塞并立即返回,一旦执行完成,ActionListener将使用 onResponse 方法(如果执行成功)被调用,或者使用 onFailure 方法(如果执行失败)被调用,失败情况和预期的异常与同步执行情况相同。

一个典型的 bulk 监听器是这样的:

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) { }

    @Override
    public void onFailure(Exception e) {}};
  • onResponse当执行成功完成时调用。
  • onFailure当整个 BulkRequest 失败时调用。

Bulk 响应

返回的 BulkResponse 包含执行操作的信息,允许对每个结果进行如下迭代:

for (BulkItemResponse bulkItemResponse : bulkResponse) {DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    switch (bulkItemResponse.getOpType()) {
    case INDEX:    
    case CREATE:
        IndexResponse indexResponse = (IndexResponse) itemResponse;
        break;
    case UPDATE:   
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
        break;
    case DELETE:   
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}
  • 遍历所有操作的结果。
  • 检索操作的响应(成功与否),可以是 IndexResponseUpdateResponseDeleteResponse,它们都可以看作 DocWriteResponse 实例。
  • 处理索引操作的响应。
  • 处理更新操作的响应。
  • 处理删除操作的响应。

Bulk响应提供了一种方法来快速检查一个或多个操作是否失败:

if (bulkResponse.hasFailures()) {}
  • 如果至少有一个操作失败,此方法将返回true

在这种情况下,需要对所有的操作结果进行迭代,以检查操作是否失败,如果失败,则检索相应的失败:

for (BulkItemResponse bulkItemResponse : bulkResponse) {if (bulkItemResponse.isFailed()) { 
        BulkItemResponse.Failure failure =
                bulkItemResponse.getFailure();}
}
  • 指示给定操作是否失败。
  • 检索失败操作的失败。

Bulk 处理器

BulkProcessor提供了一个实用程序类,允许索引 / 更新 / 删除操作在添加到处理器时透明地执行,从而简化了Bulk API 的使用。

为了执行请求,BulkProcessor需要以下组件:

RestHighLevelClient

  • 此客户端用于执行 BulkRequest 并检索BulkResponse

BulkProcessor.Listener

  • 在每次执行 BulkRequest 之前和之后,或者当一个 BulkRequest 失败时,都会调用这个侦听器。

然后 BulkProcessor.builder 方法可以用来构建一个新的BulkProcessor

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) { }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) { }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {}};

BulkProcessor bulkProcessor = BulkProcessor.builder((request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener).build();
  • 创建BulkProcessor.Listener
  • beforeBulk方法在每次执行 BulkRequest 之前调用。
  • afterBulk方法在每次执行 BulkRequest 之后调用。
  • failure 参数的 afterBulk 方法在 BulkRequest 失败时调用。
  • 通过从 BulkProcessor.builder 调用 build() 方法创建 BulkProcessorRestHighLevelClient.bulkAsync() 方法将用于在后台执行BulkRequest

BulkProcessor.Builder提供了一些方法来配置 BulkProcessor 应该如何处理请求执行:

BulkProcessor.Builder builder = BulkProcessor.builder((request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener);
builder.setBulkActions(500); 
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); 
builder.setConcurrentRequests(0); 
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); 
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3));
  • 根据当前添加的操作数量设置刷新新 bulk 请求的时间(默认为 1000,使用-1 禁用它)。
  • 根据当前添加的操作大小设置刷新新 bulk 请求的时间(默认为 5Mb,使用-1 禁用)。
  • 设置允许执行的并发请求数量(默认为 1,使用0 只允许执行单个请求)。
  • 设置刷新间隔,如果间隔通过,则刷新任何挂起的BulkRequest(默认为未设置)。
  • 设置一个常量后退策略,该策略最初等待 1 秒并重试最多 3 次,有关更多选项,请参见 BackoffPolicy.noBackoff()BackoffPolicy.constantBackoff()BackoffPolicy.exponentialBackoff()

一旦创建了BulkProcessor,就可以向它添加请求:

IndexRequest one = new IndexRequest("posts").id("1")
        .source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts").id("2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts").id("3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

请求将由 BulkProcessor 执行,它负责为每个 bulk 请求调用BulkProcessor.Listener

监听器提供访问 BulkRequestBulkResponse的方法:

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {int numberOfActions = request.numberOfActions(); 
        logger.debug("Executing bulk [{}] with {} requests",
                executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {if (response.hasFailures()) {logger.warn("Bulk [{}] executed with failures", executionId);
        } else {logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {logger.error("Failed to execute bulk", failure); 
    }
};
  • beforeBulk在执行 BulkRequest 的每次执行之前调用,这个方法允许知道将要在 BulkRequest 中执行的操作的数量。
  • afterBulk在每次执行 BulkRequest 之后调用,这个方法允许知道 BulkResponse 是否包含错误。
  • 如果 BulkRequest 失败,则调用带 failure 参数的 afterBulk 方法,该方法允许知道失败。

将所有请求添加到 BulkProcessor 之后,需要使用两种可用的关闭方法之一关闭它的实例。

awaitClose()方法可以用来等待,直到所有的请求都被处理完毕或者指定的等待时间过去:

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
  • 如果所有 bulk 请求都已完成,则该方法返回 true,如果在所有bulk 请求完成之前的等待时间已经过去,则返回false

close()方法可用于立即关闭BulkProcessor

这两种方法都在关闭处理器之前刷新添加到处理器的请求,并且禁止向处理器添加任何新请求。


退出移动版