CLOVER🍀

That was when it all began.

Spring FrameworkのTask Executionを試す

これは、なにをしたくて書いたもの?

Spring Frameworkには、タスク実行とスケジューリングの機能があります。

Task Execution and Scheduling

今回は、タスク実行にフォーカスして見ていきたいと思います。

具体的には、TaskExecutorとThreadPoolTaskExecutor、@Asyncアノテーションを扱います。

Task Execution

Spring FrameworkのTask Execution…タスクの実行に関するドキュメントは、こちらです。

Task Execution and Scheduling

Spring Frameworkでは、JavaのExecutorを抽象化したTaskExecutorを提供しています。

The Spring TaskExecutor Abstraction

Java SE環境であってもJava EE環境であっても、こちらを使うことでタスク実行に関する実装の詳細が隠蔽されます。

TaskExecutorはExecutorを継承したインターフェースであり、その実装はSpring Frameworkが提供しています。
Spring Frameworkの利用者自身が、TaskExecutorの実装を作成することはほとんどないでしょう。

TaskExecutor (Spring Framework 5.3.6 API)

TaskExecutor Types

TaskExecutorの実装には、以下の種類があります。

  • SyncTaskExecutor … タスクの実行は別スレッドで行うものの、タスクを非同期では実行しない実装。テストなど、マルチスレッドが不要な状況下で使用する
  • SimpleAsyncTaskExecutor … 呼び出しごとに、新しいスレッドを作成して実行する実装。同時に実行できるタスクの数は制限できる
  • ConcurrentTaskExecutor … java.util.concurrent.Executorのアダプター。通常は、ThreadPoolTaskExecutorを使用すればよい
  • ThreadPoolTaskExecutor … 最も一般的な実装。java.util.concurrent.ThreadPoolExecutorをTaskExecutorとしてラップしている。他のExecutorを使いたい場合は、ConcurrentTaskExecutorの利用を検討する
  • WorkManagerTaskExecutor … CommonJ WorkManager(WebSphere、WebLogic)をバックエンドに使用する実装
  • DefaultManagedTaskExecutor … Java EE環境におけるManagedExecutorServiceをバックエンドに使用する実装

いずれにせよ、TaskExecutorはJavaのExecutorインターフェースの拡張なので、使い方も同じになります。

Executor (Java SE 11 & JDK 11 )

よく使われそうなThreadPoolTaskExecutorはThreadPoolExecutorをラップしているので、こちらも参照すると
よいでしょう。

ThreadPoolExecutor (Java SE 11 & JDK 11 )

ThreadPoolTaskExecutor (Spring Framework 5.3.6 API)

Spring Bootを使った場合は、ThreadPoolTaskExecutorが自動構成されます。

Task Execution and Scheduling

TaskExecutorの使用例はこちら。

Using a TaskExecutor

また、@Asyncはスケジュール実行に関する機能なのですが、今回使ってみます。

The @Async annotation

@EnableAsyncを使って有効化したうえで、処理を行うBeanのメソッドに@Asyncアノテーションを付与すると、
指定された(またはデフォルトの)TaskExecutorを使って非同期実行が行われます。

Enable Scheduling Annotations

今回は、TaskExecutorとThreadPoolTaskExecutor、@Asyncアノテーションを使っていきます。

環境

今回の環境は、こちらです。

$ java --version
openjdk 11.0.10 2021-01-19
OpenJDK Runtime Environment (build 11.0.10+9-Ubuntu-0ubuntu1.20.04)
OpenJDK 64-Bit Server VM (build 11.0.10+9-Ubuntu-0ubuntu1.20.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.10, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-72-generic", arch: "amd64", family: "unix"

Spring Bootは2.4.5を使い、依存するSpring Frameworkは5.3.6となります。

Spring Bootプロジェクトの作成

Spring Initializrを使って、Spring Bootプロジェクトを作成します。

今回のアプリケーションはWebではなく、CommandLineRunnerで確認するくらいで良いかな、と思ったので依存関係は
特に指定していません。

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.4.5 \
  -d javaVersion=11 \
  -d name=task-execution \
  -d groupId=org.littlewings \
  -d artifactId=task-execution \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.task \
  -d baseDir=task-execution | tar zxvf -


$ cd task-execution

dependenciesをなにも指定しない場合は、依存関係がこれくらいのpom.xmlになります。

 <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

起動クラスは、こちらで。

src/main/java/org/littlewings/spring/task/App.java

package org.littlewings.spring.task;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
//@EnableAsync
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

コメントアウトしている@EnableAsyncアノテーションは、後述します。

実行は、以下のコマンドで行っているものとします。

$ mvn spring-boot:run

あと、CommandLineRunnerインターフェースの実装クラスは試したい機能ごとに作っていくのですが、そのまま増やすと
全部実行していってしまうので、紹介が終わったら以下のように@Componentアノテーションをコメントアウトしていく
ものとします。

// @Component
public class SimpleRunner implements CommandLineRunner {

まずは使ってみる

では、使っていってみます。

こちらを参考に

Using a TaskExecutor

こんなクラスを作成。

src/main/java/org/littlewings/spring/task/SimpleRunner.java

package org.littlewings.spring.task;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
public class SimpleRunner implements CommandLineRunner {
    Logger logger = LoggerFactory.getLogger(SimpleRunner.class);

    TaskExecutor taskExecutor;

    public SimpleRunner(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @Override
    public void run(String... args) throws Exception {
        taskExecutor.execute(() -> {
            logger.info("Hello World!!");
            logger.info("TaskExecutor name = {}", taskExecutor.getClass().getName());
        });

        taskExecutor.execute(() -> logger.info("Oops!!"));

        ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) taskExecutor;
        logger.info("core pool size = {}, max pool size = {}, keep-alive seconds = {}", threadPoolTaskExecutor.getCorePoolSize(), threadPoolTaskExecutor.getMaxPoolSize(), threadPoolTaskExecutor.getKeepAliveSeconds());
    }
}

DIする型は、TaskExecutorインターフェースでも大丈夫です。

    TaskExecutor taskExecutor;

    public SimpleRunner(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

TaskExecutor#executeメソッドに、Runnableを渡すことでタスクを実行できます。今回は、ログ出力をしているだけです。
TaskExecutorの実装クラス名も出力しています。

        taskExecutor.execute(() -> {
            logger.info("Hello World!!");
            logger.info("TaskExecutor name = {}", taskExecutor.getClass().getName());
        });

        taskExecutor.execute(() -> logger.info("Oops!!"));

TaskExecutorインターフェースができるのは、これだけです。

TaskExecutor (Spring Framework 5.3.6 API)

タスクからの戻り値も取得できません。

実体はThreadPoolTaskExecutorなので、キャストして設定を見てみましょう。

        ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) taskExecutor;
        logger.info("core pool size = {}, max pool size = {}, keep-alive seconds = {}", threadPoolTaskExecutor.getCorePoolSize(), threadPoolTaskExecutor.getMaxPoolSize(), threadPoolTaskExecutor.getKeepAliveSeconds());

実行結果。

2021-04-25 22:36:55.048  INFO 43283 --- [         task-1] o.littlewings.spring.task.SimpleRunner   : Hello World!!
2021-04-25 22:36:55.049  INFO 43283 --- [         task-2] o.littlewings.spring.task.SimpleRunner   : Oops!!
2021-04-25 22:36:55.049  INFO 43283 --- [         task-1] o.littlewings.spring.task.SimpleRunner   : TaskExecutor name = org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
2021-04-25 22:36:55.048  INFO 43283 --- [           main] o.littlewings.spring.task.SimpleRunner   : core pool size = 8, max pool size = 2147483647, keep-alive seconds = 60
2021-04-25 22:37:55.052  INFO 43283 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

ログのスレッド名を見ると、task-となっていて、別スレッドで実行されていることがわかります。

使用されているThreadPoolTaskExecutorがAutoConfigureされているのは、こちらですね。

https://github.com/spring-projects/spring-boot/blob/v2.4.5/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/task/TaskExecutionAutoConfiguration.java

プロパティ設定は、こちらを。

Core Properties

デフォルトでは8スレッドをコアサイズとして持ち、最大スレッド数はInteger.MAX_VALUEまで広がるスレッドプールが
構成されます。

https://github.com/spring-projects/spring-boot/blob/v2.4.5/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/task/TaskExecutionProperties.java

ところでこのプログラム、実行するとしばらく終了しません。

停止まで1分かかっています。

2021-04-25 22:36:55.048  INFO 43283 --- [           main] o.littlewings.spring.task.SimpleRunner   : core pool size = 8, max pool size = 2147483647, keep-alive seconds = 60
2021-04-25 22:37:55.052  INFO 43283 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

spring.task.execution.pool.allow-core-thread-timeoutがデフォルトでtrueになっていてタスクを実行したスレッドはタイムアウト
する設定になっているのですが、spring.task.execution.pool.keep-aliveがデフォルトで60秒なので、この間は待ち続けます。

ちょっと長いので、今回は3秒にしておきましょう。

src/main/resources/application.properties

spring.task.execution.pool.keep-alive=3s

今度は、3秒で終了するようになりました。

2021-04-25 22:44:36.363  INFO 43800 --- [         task-1] o.littlewings.spring.task.SimpleRunner   : Hello World!!
2021-04-25 22:44:36.364  INFO 43800 --- [         task-2] o.littlewings.spring.task.SimpleRunner   : Oops!!
2021-04-25 22:44:36.364  INFO 43800 --- [         task-1] o.littlewings.spring.task.SimpleRunner   : TaskExecutor name = org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
2021-04-25 22:44:36.364  INFO 43800 --- [           main] o.littlewings.spring.task.SimpleRunner   : core pool size = 8, max pool size = 2147483647, keep-alive seconds = 3
2021-04-25 22:44:39.369  INFO 43800 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

ThreadPoolTaskExecutorを使う

先ほどはTaskExecutorインターフェースのままタスクを実行しましたが、TaskExecutor#executeだとRunnableしか
実行できないのでちょっと不便です…。

もう、ThreadPoolTaskExecutorでDIしていいのではないのでしょうか?という気分になります。

というわけで、そのように書いてみました。

src/main/java/org/littlewings/spring/task/ThreadPoolTaskExecutorRunner.java

package org.littlewings.spring.task;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
public class ThreadPoolTaskExecutorRunner implements CommandLineRunner {
    Logger logger = LoggerFactory.getLogger(ThreadPoolTaskExecutorRunner.class);

    ThreadPoolTaskExecutor taskExecutor;

    public ThreadPoolTaskExecutorRunner(ThreadPoolTaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @Override
    public void run(String... args) throws Exception {
        List<Future<?>> futures = new ArrayList<>();

        futures.add(taskExecutor.submit(() -> {
            logger.info("calc task start");

            try {
                TimeUnit.SECONDS.sleep(3L);
            } catch (InterruptedException e) {
                // ignore
            }

            return 1 + 2;
        }));

        futures.add(taskExecutor.submit(() -> {
            logger.info("message task start");

            try {
                TimeUnit.SECONDS.sleep(5L);
            } catch (InterruptedException e) {
                // ignore
            }

            return "Hello World!!";
        }));

        for (Future<?> future : futures) {
            logger.info("future get = {}", future.get());
        }
    }
}

やっていることは、Callableを使って結果を返すようにしてFutureを受け取り、

        futures.add(taskExecutor.submit(() -> {
            logger.info("calc task start");

            try {
                TimeUnit.SECONDS.sleep(3L);
            } catch (InterruptedException e) {
                // ignore
            }

            return 1 + 2;
        }));

        futures.add(taskExecutor.submit(() -> {
            logger.info("message task start");

            try {
                TimeUnit.SECONDS.sleep(5L);
            } catch (InterruptedException e) {
                // ignore
            }

            return "Hello World!!";
        }));

最後に待ち合わせているだけです。

        for (Future<?> future : futures) {
            logger.info("future get = {}", future.get());
        }

実行結果。

2021-04-25 22:53:40.708  INFO 44289 --- [         task-1] o.l.s.task.ThreadPoolTaskExecutorRunner  : calc task start
2021-04-25 22:53:40.709  INFO 44289 --- [         task-2] o.l.s.task.ThreadPoolTaskExecutorRunner  : message task start
2021-04-25 22:53:43.710  INFO 44289 --- [           main] o.l.s.task.ThreadPoolTaskExecutorRunner  : future get = 3
2021-04-25 22:53:45.710  INFO 44289 --- [           main] o.l.s.task.ThreadPoolTaskExecutorRunner  : future get = Hello World!!

ログを見ても、別スレッドで実行されていることが確認できますし、結果も取得できています。

@Asyncを使って非同期実行する

次は、@Asyncを使って非同期実行してみましょう。@Asyncアノテーションは、スケジュール実行側の機能です。

The @Async annotation

最初に、うまくいかない例を書いてみます。

src/main/java/org/littlewings/spring/task/MessageService.java

package org.littlewings.spring.task;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

@Service
public class MessageService {
    Logger logger = LoggerFactory.getLogger(MessageService.class);

    @Async
    public String getMessage() {
        logger.info("get message start");

        Thread.dumpStack();

        try {
            TimeUnit.SECONDS.sleep(3L);
        } catch (InterruptedException e) {
            // ignore
        }

        return "Hello World";
    }
}

こんな感じで、非同期実行したいメソッドに@Asyncアノテーションを付与します。スタックトレースの様子も見たいので、
Thread#dumpStackも含めておきました。

    @Async
    public String getMessage() {
        logger.info("get message start");

        Thread.dumpStack();

        try {
            TimeUnit.SECONDS.sleep(3L);
        } catch (InterruptedException e) {
            // ignore
        }

        return "Hello World";
    }

こちらを使うクラスを作成。このパターンでは、TaskExecutorやThreadPoolTaskExecutorは、最後までソースコードに現れません。

src/main/java/org/littlewings/spring/task/UseAsyncRunner.java

package org.littlewings.spring.task;

import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class UseAsyncRunner implements CommandLineRunner {
    Logger logger = LoggerFactory.getLogger(UseAsyncRunner.class);

    MessageService messageService;

    public UseAsyncRunner(MessageService messageService) {
        this.messageService = messageService;
    }

    @Override
    public void run(String... args) throws Exception {
        String message = messageService.getMessage();

        logger.info("return message = {}", message);
    }
}

実行してログをよく見ると、単に同じスレッドで動いているだけです。

2021-04-25 23:06:59.317  INFO 45078 --- [           main] o.l.spring.task.MessageService           : get message start
java.lang.Exception: Stack trace
    at java.base/java.lang.Thread.dumpStack(Thread.java:1388)
    at org.littlewings.spring.task.MessageService.getMessage(MessageService.java:22)
    at org.littlewings.spring.task.UseAsyncRunner.run(UseAsyncRunner.java:22)
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:819)
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:803)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:346)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1340)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1329)
    at org.littlewings.spring.task.App.main(App.java:11)
2021-04-25 23:07:02.318  INFO 45078 --- [           main] o.l.spring.task.UseAsyncRunner           : return message = Hello World

この機能を使うには、@EnableAsyncアノテーションを使う必要があるからです。

Enable Scheduling Annotations

これで、@Asyncアノテーションが機能するようになります。

@SpringBootApplication
@EnableAsync
public class App {

再度実行。

2021-04-25 23:08:43.523  INFO 45212 --- [           main] o.l.spring.task.UseAsyncRunner           : return message = null
2021-04-25 23:08:43.534  INFO 45212 --- [         task-1] o.l.spring.task.MessageService           : get message start
java.lang.Exception: Stack trace
    at java.base/java.lang.Thread.dumpStack(Thread.java:1388)
    at org.littlewings.spring.task.MessageService.getMessage(MessageService.java:22)
    at org.littlewings.spring.task.MessageService$$FastClassBySpringCGLIB$$ace3701c.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

今度は、別スレッドで動くようになりました。スタックトレースを見ると、インターセプターが入っていますね。

ですが、メソッドの戻り値が取得できていません。

2021-04-25 23:08:43.523  INFO 45212 --- [           main] o.l.spring.task.UseAsyncRunner           : return message = null

ドキュメントをちゃんと読むと、値を返す場合はFutureを使う必要があるようです。

Even methods that return a value can be invoked asynchronously. However, such methods are required to have a Future-typed return value. This still provides the benefit of asynchronous execution so that the caller can perform other tasks prior to calling get() on that Future.

The @Async annotation

ドキュメントを見ていると戻り値がvoidの例ばかりですが、voidの場合だと結果を待たない(待てない)非同期実行になります。
メソッドの戻り値が不要であれば、voidにして実行してもよいでしょう。

今回は戻り値を使いたいので、メソッドの戻り値がFutureとなるように変更してみます。

src/main/java/org/littlewings/spring/task/MessageService.java

package org.littlewings.spring.task;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

@Service
public class MessageService {
    Logger logger = LoggerFactory.getLogger(MessageService.class);

    @Async
    public Future<String> getMessage() {
        logger.info("get message start");

        Thread.dumpStack();

        try {
            TimeUnit.SECONDS.sleep(3L);
        } catch (InterruptedException e) {
            // ignore
        }

        // return "Hello World";
        return new AsyncResult<>("Hello World");
    }
}

ドキュメントがvoidの例ばかりだったので、Futureを返すにはどうしたら…?という気分になりましたが、AsyncResultを
使うのが良さそうです。

AsyncResult (Spring Framework 5.3.6 API)

呼び出し元も変更。

src/main/java/org/littlewings/spring/task/UseAsyncRunner.java

package org.littlewings.spring.task;

import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class UseAsyncRunner implements CommandLineRunner {
    Logger logger = LoggerFactory.getLogger(UseAsyncRunner.class);

    MessageService messageService;

    public UseAsyncRunner(MessageService messageService) {
        this.messageService = messageService;
    }

    @Override
    public void run(String... args) throws Exception {
        Future<String> message = messageService.getMessage();

        logger.info("return message = {}", message.get());
    }
}

では、実行。

2021-04-25 23:43:14.929  INFO 47028 --- [         task-1] o.l.spring.task.MessageService           : get message start
java.lang.Exception: Stack trace
    at java.base/java.lang.Thread.dumpStack(Thread.java:1388)
    at org.littlewings.spring.task.MessageService.getMessage(MessageService.java:37)
    at org.littlewings.spring.task.MessageService$$FastClassBySpringCGLIB$$ace3701c.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
2021-04-25 23:43:17.931  INFO 47028 --- [           main] o.l.spring.task.UseAsyncRunner           : return message = Hello World

OKですね。

ちなみに、Future以外にもSpringが提供するListenableFuture、そしてJava 8以降のCompletableFutureも戻り値として
使えます。

ドキュメントに記載がありますし、

@Async methods may not only declare a regular java.util.concurrent.Future return type but also Spring’s org.springframework.util.concurrent.ListenableFuture or, as of Spring 4.2, JDK 8’s java.util.concurrent.CompletableFuture, for richer interaction with the asynchronous task and for immediate composition with further processing steps.

The @Async annotation

このあたりを見ればわかります。

https://github.com/spring-projects/spring-framework/blob/v5.3.6/spring-aop/src/main/java/org/springframework/aop/interceptor/AsyncExecutionAspectSupport.java#L271-L293

呼び出し元は、こちら。メソッドをCallableでラップして実行します。

https://github.com/spring-projects/spring-framework/blob/v5.3.6/spring-aop/src/main/java/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java#L100-L130

この時使われるTaskExecutorは、これまで使ってきたSpring Bootでデフォルト構成されたThreadPoolTaskExecutorです。

また、今回は扱いませんが、例外処理についてはこちらのようです。

Exception Management with @Async

自分でThreadPoolTaskExecutorのBeanを作成する

最後に、自分でThreadPoolTaskExecutorのBeanを作成してみましょう。

こんな感じで、2つのThreadPoolTaskExecutorのBeanを登録。

src/main/java/org/littlewings/spring/task/MyTaskExecutorConfiguration.java

package org.littlewings.spring.task;

import java.time.Duration;

import org.springframework.boot.task.TaskExecutorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class MyTaskExecutorConfiguration {
    @Bean("largePoolTaskExecutor")
    public ThreadPoolTaskExecutor largePoolTaskExecutor() {
        TaskExecutorBuilder builder = new TaskExecutorBuilder();

        return builder
                .corePoolSize(16)
                .maxPoolSize(16)
                .keepAlive(Duration.ofSeconds(2L))
                .queueCapacity(Integer.MAX_VALUE)
                .threadNamePrefix("large-pool-task-")
                .allowCoreThreadTimeOut(true)
                .build();
    }

    @Bean("smallPoolTaskExecutor")
    public ThreadPoolTaskExecutor smallPoolTaskExecutor() {
        TaskExecutorBuilder builder = new TaskExecutorBuilder();

        return builder
                .corePoolSize(2)
                .maxPoolSize(2)
                .keepAlive(Duration.ofSeconds(2L))
                .queueCapacity(Integer.MAX_VALUE)
                .threadNamePrefix("small-pool-task-")
                .allowCoreThreadTimeOut(true)
                .build();
    }
}

ThreadPoolTaskExecutorの作成には、TaskExecutorBuilderを使うのが良いでしょう。

The auto-configured TaskExecutorBuilder allows you to easily create instances that reproduce what the auto-configuration does by default.

Both a TaskExecutorBuilder bean and a TaskSchedulerBuilder bean are made available in the context if a custom executor or scheduler needs to be created.

Task Execution and Scheduling

TaskExecutorBuilder (Spring Boot 2.4.5 API)

使い方は、Spring Boot自身が参考になります。

https://github.com/spring-projects/spring-boot/blob/v2.4.5/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/task/TaskExecutionAutoConfiguration.java

今回は、サイズ固定のスレッドプールを2つ用意しました。

    @Bean("largePoolTaskExecutor")
    public ThreadPoolTaskExecutor largePoolTaskExecutor() {
        TaskExecutorBuilder builder = new TaskExecutorBuilder();

        return builder
                .corePoolSize(16)
                .maxPoolSize(16)
                .keepAlive(Duration.ofSeconds(2L))
                .queueCapacity(Integer.MAX_VALUE)
                .threadNamePrefix("large-pool-task-")
                .allowCoreThreadTimeOut(true)
                .build();
    }

    @Bean("smallPoolTaskExecutor")
    public ThreadPoolTaskExecutor smallPoolTaskExecutor() {
        TaskExecutorBuilder builder = new TaskExecutorBuilder();

        return builder
                .corePoolSize(2)
                .maxPoolSize(2)
                .keepAlive(Duration.ofSeconds(2L))
                .queueCapacity(Integer.MAX_VALUE)
                .threadNamePrefix("small-pool-task-")
                .allowCoreThreadTimeOut(true)
                .build();
    }

定義したThreadPoolTaskExecutorを使う側のクラス。

src/main/java/org/littlewings/spring/task/CustomTaskExecutorRunner.java

package org.littlewings.spring.task;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
public class CustomTaskExecutorRunner implements CommandLineRunner {
    Logger logger = LoggerFactory.getLogger(CustomTaskExecutorRunner.class);

    ThreadPoolTaskExecutor largePoolTaskExecutor;
    ThreadPoolTaskExecutor smallPoolTaskExecutor;

    public CustomTaskExecutorRunner(@Qualifier("largePoolTaskExecutor") ThreadPoolTaskExecutor taskExecutor1,
                                    @Qualifier("smallPoolTaskExecutor") ThreadPoolTaskExecutor taskExecutor2) {
        this.largePoolTaskExecutor = taskExecutor1;
        this.smallPoolTaskExecutor = taskExecutor2;
    }

    @Override
    public void run(String... args) throws Exception {
        largePoolTaskExecutor.execute(() -> logger.info("hello"));
        smallPoolTaskExecutor.execute(() -> logger.info("world"));

        logger.info("pool prefix = {}, core pool size = {}, max pool size = {}, keep-alive seconds = {}", largePoolTaskExecutor.getThreadNamePrefix(), largePoolTaskExecutor.getCorePoolSize(), largePoolTaskExecutor.getMaxPoolSize(), largePoolTaskExecutor.getKeepAliveSeconds());
        logger.info("pool prefix = {}, core pool size = {}, max pool size = {}, keep-alive seconds = {}", smallPoolTaskExecutor.getThreadNamePrefix(), smallPoolTaskExecutor.getCorePoolSize(), smallPoolTaskExecutor.getMaxPoolSize(), smallPoolTaskExecutor.getKeepAliveSeconds());
    }
}

ThreadPoolTaskExecutorの区別は、@Qualifierアノテーションで行うことにしました。

    public CustomTaskExecutorRunner(@Qualifier("largePoolTaskExecutor") ThreadPoolTaskExecutor taskExecutor1,
                                    @Qualifier("smallPoolTaskExecutor") ThreadPoolTaskExecutor taskExecutor2) {
        this.largePoolTaskExecutor = taskExecutor1;
        this.smallPoolTaskExecutor = taskExecutor2;
    }

今回のThreadPoolTaskExecutorの使い道は、単にメッセージのログ出力と、設定値の確認のためのログ出力くらいですけどね。

実行結果。

2021-04-26 00:07:16.757  INFO 48670 --- [rge-pool-task-1] o.l.s.task.CustomTaskExecutorRunner      : hello
2021-04-26 00:07:16.758  INFO 48670 --- [all-pool-task-1] o.l.s.task.CustomTaskExecutorRunner      : world
2021-04-26 00:07:16.758  INFO 48670 --- [           main] o.l.s.task.CustomTaskExecutorRunner      : pool prefix = large-pool-task-, core pool size = 16, max pool size = 16, keep-alive seconds = 2
2021-04-26 00:07:16.758  INFO 48670 --- [           main] o.l.s.task.CustomTaskExecutorRunner      : pool prefix = small-pool-task-, core pool size = 2, max pool size = 2, keep-alive seconds = 2

設定した内容が反映されているようです。

OKですね。

このように自分で定義したTaskExecutorを@Asyncアノテーションで指定するには、@Asyncアノテーションのvalueに
指定すればよいみたいです。

Executor Qualification with @Async

今回は、扱いませんが。

まとめ

Spring FrameworkのTask Executionを、TaskExecutorとThreadPoolTaskExecutor、@Asyncアノテーションに絞って
使ってみました。

JavaのExecutorServiceとそう変わらないのだろうと思っていたのですが、それが正解でもあり、固有のこともあった感じですね。

勉強になりました、と。