Spring 非同期処理とリトライ処理

お仕事でSpringの非同期処理とリトライ処理に触れることがあったので、まとめておきます。

Spring MVCの非同期処理の細かい話については、こちらを参照してください。

今回の環境

  • Java 11
  • Spring Boot 2.5.2

@Asyncを使った非同期処理

@org.springframework.scheduling.annotation.Asyncメソッドに付与することで非同期処理を行うことができます。 Spring Boot 2.1より前のバージョンだと、taskExecutorというBean名でThreadPoolTaskExecutort.javaをBean登録することが普通でしたが、Spring Boot 2.1からはデフォルトでBean登録されるようになりました。参考

実装

1.@Asyncによる非同期処理を有効にする

設定用のクラスを準備して@EnableAsyncを付与します。

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration(proxyBeanMethods = false)
@EnableAsync
public class WebMvcConfig implements WebMvcConfigurer {}

2.非同期処理したいさせたいメソッドに@Asyncを付与する。

非同期処理させたいメソッドに@Asyncを付与してください。AOPを使っているので、DIしてこのメソッドを呼ぶ使い方をしないと非同期処理が行われません。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import com.b1a9idps.springasyncdemo.dto.request.AsyncRequest;
import com.b1a9idps.springasyncdemo.exception.FailedFileUploadException;
import com.b1a9idps.springasyncdemo.service.AsyncService;

@Service
public class AsyncServiceImpl implements AsyncService {

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncServiceImpl.class);

    @Override
    @Async
    public void save(AsyncRequest request) {
        LOGGER.info("Start Async processing.(number = " + request.getNumber() + ")");

        try {
            Thread.sleep(5000);
            LOGGER.info("Hi!.(number = " + request.getNumber() + ")");
        } catch (InterruptedException e) {
            LOGGER.error("thrown InterruptedException.");
        }

        LOGGER.info("End Async processing.(number = " + request.getNumber() + ")");
}

挙動を見てみる

今回は、こんな感じの設定で試しています。

最小スレッド数2(spring.task.execution.pool.core-size=2)
最大スレッド数3(spring.task.execution.pool.max-size=3)
キュー数4(spring.task.execution.pool.queue-copacity=4)

スクリプトで連続的にリクエストしてみます。

#!/bin/bash

number=1

while true;
do
  curl -X POST -H "Content-Type: application/json" -d '{"number" : "'$number'"}'  http://localhost:8080/async
  echo ''
  number=$(expr $number + 1)
  sleep 1s;
done

スクリプトの実行ログ

{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"too busy..."}
{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"too busy..."}

キューがいっぱいでタスクの実行を受け入れられないときにRejectedExecutionException.javaが投げられるので、 AsyncControllerExceptionHandler#handleRejectedExecutionException でハンドリングしています。

アプリケーションログ

2021-07-09 15:38:19.033  INFO 79541 --- [nio-8080-exec-1] c.b.s.controller.AsyncController         : Start.(number = 1)
2021-07-09 15:38:19.037  INFO 79541 --- [         task-1] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 1)
2021-07-09 15:38:19.174  INFO 79541 --- [nio-8080-exec-2] c.b.s.controller.AsyncController         : Start.(number = 2)
2021-07-09 15:38:19.174  INFO 79541 --- [         task-2] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 2)
2021-07-09 15:38:19.300  INFO 79541 --- [nio-8080-exec-4] c.b.s.controller.AsyncController         : Start.(number = 3)
2021-07-09 15:38:19.421  INFO 79541 --- [nio-8080-exec-6] c.b.s.controller.AsyncController         : Start.(number = 4)
2021-07-09 15:38:19.542  INFO 79541 --- [nio-8080-exec-8] c.b.s.controller.AsyncController         : Start.(number = 5)
2021-07-09 15:38:19.667  INFO 79541 --- [io-8080-exec-10] c.b.s.controller.AsyncController         : Start.(number = 6)
2021-07-09 15:38:19.793  INFO 79541 --- [nio-8080-exec-2] c.b.s.controller.AsyncController         : Start.(number = 7)
2021-07-09 15:38:19.793  INFO 79541 --- [         task-3] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 7)
2021-07-09 15:38:19.913  INFO 79541 --- [nio-8080-exec-4] c.b.s.controller.AsyncController         : Start.(number = 8)
2021-07-09 15:38:20.040  INFO 79541 --- [nio-8080-exec-5] c.b.s.controller.AsyncController         : Start.(number = 9)
2021-07-09 15:38:20.163  INFO 79541 --- [nio-8080-exec-6] c.b.s.controller.AsyncController         : Start.(number = 10)
2021-07-09 15:38:20.285  INFO 79541 --- [nio-8080-exec-7] c.b.s.controller.AsyncController         : Start.(number = 11)
2021-07-09 15:38:24.038  INFO 79541 --- [         task-1] c.b.s.service.impl.AsyncServiceImpl      : Hi!.(number = 1)
2021-07-09 15:38:24.038  INFO 79541 --- [         task-1] c.b.s.service.impl.AsyncServiceImpl      : End Async processing.(number = 1)
2021-07-09 15:38:24.038  INFO 79541 --- [         task-1] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 3)
2021-07-09 15:38:24.177  INFO 79541 --- [         task-2] c.b.s.service.impl.AsyncServiceImpl      : Hi!.(number = 2)
2021-07-09 15:38:24.177  INFO 79541 --- [         task-2] c.b.s.service.impl.AsyncServiceImpl      : End Async processing.(number = 2)
2021-07-09 15:38:24.178  INFO 79541 --- [         task-2] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 4)
2021-07-09 15:38:24.794  INFO 79541 --- [         task-3] c.b.s.service.impl.AsyncServiceImpl      : Hi!.(number = 7)
2021-07-09 15:38:24.794  INFO 79541 --- [         task-3] c.b.s.service.impl.AsyncServiceImpl      : End Async processing.(number = 7)
2021-07-09 15:38:24.794  INFO 79541 --- [         task-3] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 5)
2021-07-09 15:38:29.044  INFO 79541 --- [         task-1] c.b.s.service.impl.AsyncServiceImpl      : Hi!.(number = 3)
2021-07-09 15:38:29.044  INFO 79541 --- [         task-1] c.b.s.service.impl.AsyncServiceImpl      : End Async processing.(number = 3)
2021-07-09 15:38:29.044  INFO 79541 --- [         task-1] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 6)
2021-07-09 15:38:29.181  INFO 79541 --- [         task-2] c.b.s.service.impl.AsyncServiceImpl      : Hi!.(number = 4)
2021-07-09 15:38:29.181  INFO 79541 --- [         task-2] c.b.s.service.impl.AsyncServiceImpl      : End Async processing.(number = 4)
2021-07-09 15:38:29.799  INFO 79541 --- [         task-3] c.b.s.service.impl.AsyncServiceImpl      : Hi!.(number = 5)
2021-07-09 15:38:29.800  INFO 79541 --- [         task-3] c.b.s.service.impl.AsyncServiceImpl      : End Async processing.(number = 5)
2021-07-09 15:38:34.046  INFO 79541 --- [         task-1] c.b.s.service.impl.AsyncServiceImpl      : Hi!.(number = 6)
2021-07-09 15:38:34.046  INFO 79541 --- [         task-1] c.b.s.service.impl.AsyncServiceImpl      : End Async processing.(number = 6)

1.task-番号がSpring MVCが用意したスレッドで、2.nio-8080-exec-番号がTomcatのスレッドを表しています。最大スレッド3で非同期処理が行われていることがわかります。

Spring Retryを使ったリトライ処理

Springが作っている、 Spring Retry を利用すると簡単にリトライ処理を行うことができます。

実装

1.Spring Retryによるリトライ処理を有効にする

設定用のクラスに@EnableRetryを付与します。

import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;

@Configuration(proxyBeanMethods = false)
@EnableRetry
public class RetryConfig {}

2.リトライ処理したいさせたいメソッドに@Retryableを付与する。

リトライ処理させたいメソッドに@Retryableを付与してください。今回の例だと、「FailedFileUploadException.javaが投げられたらリトライ処理を行い、リトライ全部失敗したらsaveRecoverメソッドを呼ぶ」という風になっています。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import com.b1a9idps.springasyncdemo.dto.request.AsyncRequest;
import com.b1a9idps.springasyncdemo.exception.FailedFileUploadException;
import com.b1a9idps.springasyncdemo.infrastructure.FileService;
import com.b1a9idps.springasyncdemo.service.AsyncService;

@Service
public class AsyncServiceImpl implements AsyncService {

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncServiceImpl.class);

    private final FileService fileService;

    public AsyncServiceImpl(FileService fileService) {
        this.fileService = fileService;
    }

    @Override
    @Retryable(value = FailedFileUploadException.class, recover = "saveRecover")
    @Async
    public void save(AsyncRequest request) {
        LOGGER.info("Start Async processing.(number = " + request.getNumber() + ")");

        try {
            Thread.sleep(500);
            fileService.upload();
        } catch (InterruptedException e) {
            LOGGER.error("thrown InterruptedException.");
        }

        LOGGER.info("End Async processing.(number = " + request.getNumber() + ")");
    }

    @Recover
    private void saveRecover(FailedFileUploadException e, AsyncRequest request) {
        LOGGER.error("failed to upload file(number = " + request.getNumber() + ")", e);
    }
}

挙動を見てみる

非同期の例で使ったスクリプトを使って、リクエストしてみます。

スクリプトで連続的にリクエストしてみます。

#!/bin/bash

number=1

while true;
do
  curl -X POST -H "Content-Type: application/json" -d '{"number" : "'$number'"}'  http://localhost:8080/async
  echo ''
  number=$(expr $number + 1)
  sleep 1s;
done

スクリプトの実行ログ

{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"success"}
{"message":"too busy..."}
{"message":"too busy..."}
{"message":"too busy..."}

アプリケーションログ

2021-07-09 18:55:03.083  INFO 82164 --- [nio-8080-exec-1] c.b.s.controller.AsyncController         : Start.(number = 1)
2021-07-09 18:55:03.098  INFO 82164 --- [         task-1] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 1)
2021-07-09 18:55:03.217  INFO 82164 --- [nio-8080-exec-2] c.b.s.controller.AsyncController         : Start.(number = 2)
2021-07-09 18:55:03.218  INFO 82164 --- [         task-2] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 2)
2021-07-09 18:55:03.338  INFO 82164 --- [nio-8080-exec-4] c.b.s.controller.AsyncController         : Start.(number = 3)
2021-07-09 18:55:03.458  INFO 82164 --- [nio-8080-exec-6] c.b.s.controller.AsyncController         : Start.(number = 4)
2021-07-09 18:55:03.582  INFO 82164 --- [nio-8080-exec-8] c.b.s.controller.AsyncController         : Start.(number = 5)
2021-07-09 18:55:03.603  INFO 82164 --- [         task-1] c.b.s.i.impl.FileServiceImpl             : try upload.
2021-07-09 18:55:03.701  INFO 82164 --- [io-8080-exec-10] c.b.s.controller.AsyncController         : Start.(number = 6)
2021-07-09 18:55:03.722  INFO 82164 --- [         task-2] c.b.s.i.impl.FileServiceImpl             : try upload.
2021-07-09 18:55:03.817  INFO 82164 --- [nio-8080-exec-2] c.b.s.controller.AsyncController         : Start.(number = 7)
2021-07-09 18:55:03.818  INFO 82164 --- [         task-3] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 7)
2021-07-09 18:55:03.936  INFO 82164 --- [nio-8080-exec-4] c.b.s.controller.AsyncController         : Start.(number = 8)
2021-07-09 18:55:04.060  INFO 82164 --- [nio-8080-exec-5] c.b.s.controller.AsyncController         : Start.(number = 9)
2021-07-09 18:55:04.182  INFO 82164 --- [nio-8080-exec-6] c.b.s.controller.AsyncController         : Start.(number = 10)
2021-07-09 18:55:04.319  INFO 82164 --- [         task-3] c.b.s.i.impl.FileServiceImpl             : try upload.
2021-07-09 18:55:04.608  INFO 82164 --- [         task-1] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 1)
2021-07-09 18:55:04.727  INFO 82164 --- [         task-2] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 2)
2021-07-09 18:55:05.109  INFO 82164 --- [         task-1] c.b.s.i.impl.FileServiceImpl             : try upload.
2021-07-09 18:55:05.228  INFO 82164 --- [         task-2] c.b.s.i.impl.FileServiceImpl             : try upload.
2021-07-09 18:55:05.322  INFO 82164 --- [         task-3] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 7)
2021-07-09 18:55:05.822  INFO 82164 --- [         task-3] c.b.s.i.impl.FileServiceImpl             : try upload.
2021-07-09 18:55:06.109  INFO 82164 --- [         task-1] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 1)
2021-07-09 18:55:06.231  INFO 82164 --- [         task-2] c.b.s.service.impl.AsyncServiceImpl      : Start Async processing.(number = 2)
2021-07-09 18:55:06.610  INFO 82164 --- [         task-1] c.b.s.i.impl.FileServiceImpl             : try upload.
2021-07-09 18:55:06.615 ERROR 82164 --- [         task-1] c.b.s.service.impl.AsyncServiceImpl      : failed to upload file(number = 1)

com.b1a9idps.springasyncdemo.exception.FailedFileUploadException: file upload failed.
    at com.b1a9idps.springasyncdemo.infrastructure.impl.FileServiceImpl.upload(FileServiceImpl.java:19) ~[main/:na]
    at com.b1a9idps.springasyncdemo.service.impl.AsyncServiceImpl.save(AsyncServiceImpl.java:34) ~[main/:na]
    at com.b1a9idps.springasyncdemo.service.impl.AsyncServiceImpl$$FastClassBySpringCGLIB$$7a0ef216.invoke(<generated>) ~[main/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.8.jar:5.3.8]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:93) ~[spring-retry-1.3.1.jar:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.1.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.1.jar:na]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:116) ~[spring-retry-1.3.1.jar:na]
    at org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:163) ~[spring-retry-1.3.1.jar:na]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.3.8.jar:5.3.8]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

ログを見ると、リトライ処理が行われていることがわかると思います。