Configuring Retry Logic in Spring Batch

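1. Introduction

By default, a Spring Batch job fails if any error occurs during its execution. Sometimes, however, we want to make the application more resilient by handling such intermittent failures.
In this short article, we'll look at how to configure retry logic in the Spring Batch framework.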

2. A Simple Example

Suppose we have a batch job that reads a CSV file as input:

username, userid, transaction_date, transaction_amount
sammy, 1234, 31/10/2015, 10000
john, 9999, 3/12/2015, 12321
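
To make the input format concrete, here is a minimal plain-Java sketch of turning one such line into a typed value. It is only an illustration: the TransactionRow class and the parseLine helper are hypothetical names, not the article's Transaction class or the reader the job actually uses, and the d/M/yyyy date pattern is an assumption inferred from the two sample rows.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class CsvLineSketch {

    // Hypothetical value type mirroring the four CSV columns.
    static final class TransactionRow {
        final String username;
        final int userId;
        final LocalDate transactionDate;
        final double amount;

        TransactionRow(String username, int userId, LocalDate transactionDate, double amount) {
            this.username = username;
            this.userId = userId;
            this.transactionDate = transactionDate;
            this.amount = amount;
        }
    }

    // Assumed pattern: day/month/year, where day and month may have one or two digits.
    static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("d/M/yyyy");

    // Splits one data line on commas and converts each trimmed field.
    static TransactionRow parseLine(String line) {
        String[] fields = line.split(",");
        if (fields.length != 4) {
            throw new IllegalArgumentException(
                "Expected 4 fields but got " + fields.length + ": " + line);
        }
        return new TransactionRow(
            fields[0].trim(),
            Integer.parseInt(fields[1].trim()),
            LocalDate.parse(fields[2].trim(), DATE_FORMAT),
            Double.parseDouble(fields[3].trim()));
    }

    public static void main(String[] args) {
        TransactionRow row = parseLine("sammy, 1234, 31/10/2015, 10000");
        System.out.println(row.username + " " + row.userId + " "
            + row.transactionDate + " " + row.amount);
    }
}
```

In a real job, this mapping would normally be left to the item reader rather than written by hand.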

It then processes each record by calling a REST endpoint to fetch the user's age and postCode attributes:

public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> {
    
    @Override
    public Transaction process(Transaction transaction) throws IOException {
        log.info("RetryItemProcessor, attempting to process: {}", transaction);
        HttpResponse response = fetchMoreUserDetails(transaction.getUserId());
        //parse user's age and postCode from response and update transaction
        ...
        return transaction;
    }
    ...
}

Finally, it generates a consolidated XML output:

<transactionRecord>
    <transactionRecord>
        <amount>10000.0</amount>
        <transactionDate>2015-10-31 00:00:00</transactionDate>
        <userId>1234</userId>
        <username>sammy</username>
        <age>10</age>
        <postCode>430222</postCode>
    </transactionRecord>
    ...
</transactionRecord>

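3. Adding Retries to the ItemProcessor

Now, what if the connection to the REST endpoint times out because the network is slow? If that happens, our batch job fails.

In this case, we'd prefer the processing of the failed item to be retried a few times. So, let's configure the batch job to try a failing item up to three times: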

@Bean
public Step retryStep(
  ItemProcessor<Transaction, Transaction> processor,
  ItemWriter<Transaction> writer) throws ParseException {
    return stepBuilderFactory
      .get("retryStep")
      .<Transaction, Transaction>chunk(10)
      .reader(itemReader(inputCsv))
      .processor(processor)
      .writer(writer)
      .faultTolerant()
      .retryLimit(3)
      .retry(ConnectTimeoutException.class)
      .retry(DeadlockLoserDataAccessException.class)
      .build();
}

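Here, the call to faultTolerant() enables the retry functionality. We then use retry and retryLimit to define, respectively, the exceptions that qualify for a retry and the maximum number of attempts for an item.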
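
To make these semantics concrete, here is a minimal plain-Java sketch of a retry loop. It is not Spring Batch's implementation, only an illustration of the idea: callWithRetry is a hypothetical helper, java.net.SocketTimeoutException stands in for ConnectTimeoutException so the example needs no external library, and the limit is assumed to count total attempts including the first one.

```java
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.concurrent.Callable;

public class RetrySketch {

    /**
     * Calls the action until it succeeds, throws a non-retryable exception,
     * or has been called maxAttempts times (the first call is attempt 1).
     */
    static <T> T callWithRetry(Callable<T> action, int maxAttempts,
            List<? extends Class<? extends Exception>> retryable) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                // An exception qualifies if it is an instance of any listed class.
                boolean canRetry = retryable.stream().anyMatch(type -> type.isInstance(e));
                if (!canRetry || attempt >= maxAttempts) {
                    throw e; // not retryable, or the attempts are used up
                }
                // Otherwise fall through and run the next attempt.
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated endpoint: times out on the first two calls, then answers.
        String result = callWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new SocketTimeoutException("Timeout count " + calls[0]);
            }
            return "ok";
        }, 3, List.of(SocketTimeoutException.class));
        System.out.println(result + " after " + calls[0] + " attempts"); // prints "ok after 3 attempts"
    }
}
```

The sketch retries immediately and only around a single call. A framework-level retry also has to deal with transactions and chunk boundaries, which is why the step configuration above is the better place for this logic.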

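4. Testing the Retries

Let's consider a test scenario in which the REST endpoint that returns age and postCode is down for a while. In this scenario, only the first two API calls fail with a ConnectTimeoutException, and the third call succeeds: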

@Test
public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception {
    FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT);
    FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT);

    when(httpResponse.getEntity())
      .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\"}"));
 
    //fails for first two calls and passes third time onwards
    when(httpClient.execute(any()))
      .thenThrow(new ConnectTimeoutException("Timeout count 1"))
      .thenThrow(new ConnectTimeoutException("Timeout count 2"))
      .thenReturn(httpResponse);

    JobExecution jobExecution = jobLauncherTestUtils
      .launchJob(defaultJobParameters());
    JobInstance actualJobInstance = jobExecution.getJobInstance();
    ExitStatus actualJobExitStatus = jobExecution.getExitStatus();

    assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
    assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED"));
    AssertFile.assertFileEquals(expectedResult, actualResult);
}

Here, our job completed successfully. The logs also show that the first record, with id=1234, failed twice and finally succeeded on the third attempt:

19:06:57.742 [main] INFO  o.s.batch.core.job.SimpleStepHandler - Executing step: [retryStep]
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=9999
19:06:57.773 [main] INFO  o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms

Similarly, let's look at another test case to see what happens when all the retries are exhausted:

@Test
public void whenEndpointAlwaysFail_thenJobFails() throws Exception {
    when(httpClient.execute(any()))
      .thenThrow(new ConnectTimeoutException("Endpoint is down"));

    JobExecution jobExecution = jobLauncherTestUtils
      .launchJob(defaultJobParameters());
    JobInstance actualJobInstance = jobExecution.getJobInstance();
    ExitStatus actualJobExitStatus = jobExecution.getExitStatus();

    assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
    assertThat(actualJobExitStatus.getExitCode(), is("FAILED"));
    assertThat(actualJobExitStatus.getExitDescription(),
      containsString("org.apache.http.conn.ConnectTimeoutException"));
}

In this test case, the first record is attempted three times before the job fails with a ConnectTimeoutException.

5. Configuring Retries in XML

Finally, let's look at the XML equivalent of the above configuration:

<batch:job id="retryBatchJob">
    <batch:step id="retryStep">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
              processor="retryItemProcessor" commit-interval="10"
              retry-limit="3">
                <batch:retryable-exception-classes>
                    <batch:include class="org.apache.http.conn.ConnectTimeoutException"/>
                    <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
                </batch:retryable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

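6. Conclusion

In this article, we learned how to configure retry logic in Spring Batch, using both Java and XML configuration. We also used unit tests to see how retries work in practice.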