1. 引言
默认状况下,Spring批处理作业在执行过程中呈现任何谬误都会失败。然而有些时候,为了进步应用程序的弹性,咱们就须要解决这类间歇性的故障。
在这篇短文中,咱们就来一起探讨 如何在Spring批处理框架中配置重试逻辑。
2. 简略举例
假如有一个批处理作业,它读取一个CSV文件作为输出:
username, userid, transaction_date, transaction_amountsammy, 1234, 31/10/2015, 10000john, 9999, 3/12/2015, 12321
而后,它通过拜访REST端点来解决每条记录,获取用户的 age 和 postCode 属性:
public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> { @Override public Transaction process(Transaction transaction) throws IOException { log.info("RetryItemProcessor, attempting to process: {}", transaction); HttpResponse response = fetchMoreUserDetails(transaction.getUserId()); //parse user's age and postCode from response and update transaction ... return transaction; } ...}
最初,它生成并输入一个合并的XML:
<transactionRecord> <transactionRecord> <amount>10000.0</amount> <transactionDate>2015-10-31 00:00:00</transactionDate> <userId>1234</userId> <username>sammy</username> <age>10</age> <postCode>430222</postCode> </transactionRecord> ...</transactionRecord>
3. ItemProcessor 中增加重试
当初假如,如果到REST端点的连贯因为某些网络速度慢而超时,该怎么办?如果产生这种状况,则咱们的批处理工作将失败。
在这种状况下,咱们心愿失败的 item 解决重试几次。因而,接下来我将批处理作业配置为:在呈现故障时执行最多三次重试:
@Beanpublic Step retryStep( ItemProcessor<Transaction, Transaction> processor, ItemWriter<Transaction> writer) throws ParseException { return stepBuilderFactory .get("retryStep") .<Transaction, Transaction>chunk(10) .reader(itemReader(inputCsv)) .processor(processor) .writer(writer) .faultTolerant() .retryLimit(3) .retry(ConnectTimeoutException.class) .retry(DeadlockLoserDataAccessException.class) .build();}
这里调用了 faultTolerant() 来启用重试性能。另外,咱们应用 retry 和 retryLimit 别离定义合乎重试条件的异样和 item 的最大重试次数。
4. 测试重试次数
假如咱们有一个测试场景,其中返回 age 和 postCode 的REST端点敞开了一段时间。在这个测试场景中,咱们只对前两个 API 调用获取一个 ConnectTimeoutException ,而第三个调用将胜利:
@Testpublic void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception { FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT); FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT); when(httpResponse.getEntity()) .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }")); //fails for first two calls and passes third time onwards when(httpClient.execute(any())) .thenThrow(new ConnectTimeoutException("Timeout count 1")) .thenThrow(new ConnectTimeoutException("Timeout count 2")) .thenReturn(httpResponse); JobExecution jobExecution = jobLauncherTestUtils .launchJob(defaultJobParameters()); JobInstance actualJobInstance = jobExecution.getJobInstance(); ExitStatus actualJobExitStatus = jobExecution.getExitStatus(); assertThat(actualJobInstance.getJobName(), is("retryBatchJob")); assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED")); AssertFile.assertFileEquals(expectedResult, actualResult);}
在这里,咱们的工作胜利地实现了。另外,从日志中能够显著看出 第一条记录 id=1234 失败了两次,最初在第三次重试时胜利了:
19:06:57.742 [main] INFO o.s.batch.core.job.SimpleStepHandler - Executing step: [retryStep]19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=123419:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=123419:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=123419:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=999919:06:57.773 [main] INFO o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms
同样,看下另一个测试用例,当所有重试次数都用完时会产生什么:
@Testpublic void whenEndpointAlwaysFail_thenJobFails() throws Exception { when(httpClient.execute(any())) .thenThrow(new ConnectTimeoutException("Endpoint is down")); JobExecution jobExecution = jobLauncherTestUtils .launchJob(defaultJobParameters()); JobInstance actualJobInstance = jobExecution.getJobInstance(); ExitStatus actualJobExitStatus = jobExecution.getExitStatus(); assertThat(actualJobInstance.getJobName(), is("retryBatchJob")); assertThat(actualJobExitStatus.getExitCode(), is("FAILED")); assertThat(actualJobExitStatus.getExitDescription(), containsString("org.apache.http.conn.ConnectTimeoutException"));}
在这个测试用例中,在作业因 ConnectTimeoutException
而失败之前,会尝试对第一条记录重试三次。
5. 应用XML配置重试
最初,让咱们看一下与上述配置等价的XML:
<batch:job id="retryBatchJob"> <batch:step id="retryStep"> <batch:tasklet> <batch:chunk reader="itemReader" writer="itemWriter" processor="retryItemProcessor" commit-interval="10" retry-limit="3"> <batch:retryable-exception-classes> <batch:include class="org.apache.http.conn.ConnectTimeoutException"/> <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/> </batch:retryable-exception-classes> </batch:chunk> </batch:tasklet> </batch:step></batch:job>
6. 简略总结
在本文中,咱们学习了如何在Spring批处理中配置重试逻辑,其中包含应用Java和XML配置。以及应用单元测试来察看重试在实践中是如何工作的。
如果你感觉文章还不错,记得关注公众号: 锅外的大佬
锅外的大佬博客