在Spring Batch中配置重试逻辑

2023/05/11

1. 概述

默认情况下,Spring Batch作业会因执行期间出现的任何错误而失败。但是,有时我们可能希望提高应用程序的弹性以处理间歇性故障。

在本快速教程中,我们将探讨如何在Spring Batch框架中配置重试逻辑

2. 示例用例

假设我们有一个读取输入CSV文件的批处理作业:

username, userid, transaction_date, transaction_amount
sammy, 1234, 31/10/2015, 10000
john, 9999, 3/12/2015, 12321

然后,它通过访问REST端点以获取用户的年龄和邮政编码属性来处理每条记录:

public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> {

    @Override
    public Transaction process(Transaction transaction) throws IOException {
        log.info("RetryItemProcessor, attempting to process: {}", transaction);
        HttpResponse response = fetchMoreUserDetails(transaction.getUserId());
        // parse user's age and postCode from response and update transaction
        // ...
        return transaction;
    }
    // ...
}

最后,它生成一个合并的输出XML:

<transactionRecord>
    <transactionRecord>
        <amount>10000.0</amount>
        <transactionDate>2015-10-31 00:00:00</transactionDate>
        <userId>1234</userId>
        <username>sammy</username>
        <age>10</age>
        <postCode>430222</postCode>
    </transactionRecord>
    <!--...-->
</transactionRecord>

3. 向ItemProcessor添加重试

现在,如果由于某些网络缓慢而导致与REST端点的连接超时怎么办?如果是这样,我们的批处理作业就会失败。

在这种情况下,我们希望将失败的项目处理重试几次。因此,让我们将批处理作业配置为在失败的情况下最多执行三次重试

@Bean
public Step retryStep(ItemProcessor<Transaction, Transaction> processor, ItemWriter<Transaction> writer) throws ParseException {
    return stepBuilderFactory
        .get("retryStep")
        .<Transaction, Transaction>chunk(10)
        .reader(itemReader(inputCsv))
        .processor(processor)
        .writer(writer)
        .faultTolerant()
        .retryLimit(3)
        .retry(ConnectTimeoutException.class)
        .retry(DeadlockLoserDataAccessException.class)
        .build();
}

在这里,我们调用了faultTolerant()以启用重试功能。此外,我们使用retry和retryLimit分别定义符合重试条件的异常和项目的最大重试次数

4. 测试重试

假设我们有一个测试场景,其中REST端点返回age和postCode只是暂时关闭。在此测试场景中,我们将仅针对前两次API调用获得ConnectTimeoutException,第三次调用将成功:

@Test
public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception {
    FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT);
    FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT);

    when(httpResponse.getEntity())
        .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }"));
 
    //fails for first two calls and passes third time onwards
    when(httpClient.execute(any()))
        .thenThrow(new ConnectTimeoutException("Timeout count 1"))
        .thenThrow(new ConnectTimeoutException("Timeout count 2"))
        .thenReturn(httpResponse);

    JobExecution jobExecution = jobLauncherTestUtils
        .launchJob(defaultJobParameters());
    JobInstance actualJobInstance = jobExecution.getJobInstance();
    ExitStatus actualJobExitStatus = jobExecution.getExitStatus();

    assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
    assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED"));
    AssertFile.assertFileEquals(expectedResult, actualResult);
}

在这里,我们的作业成功完成了。此外,从日志中可以明显看出,id=1234的第一条记录失败了两次,最后在第三次重试时成功了

19:06:57.742 [main] INFO  o.s.batch.core.job.SimpleStepHandler - Executing step: [retryStep]
19:06:57.758 [main] INFO  c.t.t.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  c.t.t.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  c.t.t.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  c.t.t.batch.service.RetryItemProcessor - Attempting to process user with id=9999
19:06:57.773 [main] INFO  o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms

同样,让我们再来一个测试用例,看看当所有重试都用完时会发生什么

@Test
public void whenEndpointAlwaysFail_thenJobFails() throws Exception {
    when(httpClient.execute(any()))
        .thenThrow(new ConnectTimeoutException("Endpoint is down"));

    JobExecution jobExecution = jobLauncherTestUtils
        .launchJob(defaultJobParameters());
    JobInstance actualJobInstance = jobExecution.getJobInstance();
    ExitStatus actualJobExitStatus = jobExecution.getExitStatus();

    assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
    assertThat(actualJobExitStatus.getExitCode(), is("FAILED"));
    assertThat(actualJobExitStatus.getExitDescription(),
        containsString("org.apache.http.conn.ConnectTimeoutException"));
}

在这种情况下,在作业最终因ConnectTimeoutException而失败之前,对第一条记录进行了三次重试

5. 使用XML配置重试

最后,让我们看一下上述配置的XML等价物:

<batch:job id="retryBatchJob">
    <batch:step id="retryStep">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
                         processor="retryItemProcessor" commit-interval="10"
                         retry-limit="3">
                <batch:retryable-exception-classes>
                    <batch:include class="org.apache.http.conn.ConnectTimeoutException"/>
                    <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
                </batch:retryable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

6. 总结

在本文中,我们学习了如何在Spring Batch中配置重试逻辑。我们查看了Java和XML配置。

我们还使用单元测试来查看重试在实践中的效果。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章