CLOVER🍀

That was when it all began.

ThreadPoolExecutorの設定項目を確認する

これは、なにをしたくて書いたもの?

今までConcurrency Utilitiesで提供されるスレッドプールを使ってきたことはあったのですが、その設定内容をちゃんと
見てきていなかったので今回見てみようかなと。

具体的には、ThreadPoolExecutorを見ていこうと思います。

ThreadPoolExecutor (Java SE 11 & JDK 11 )

Executorsとスレッドプール

Javaでスレッドプールを使うとなると、登場するのがExecutorsです。

Executors (Java SE 11 & JDK 11 )

Executorsが提供する、以下のスレッドプールの中から使うことが多いでしょう。

  • Executors#newFixedThreadPool … 固定サイズのスレッドプール
  • Executors#newSingleThreadExecutor … スレッドサイズがひとつのExecutor
  • Executors#newCachedThreadPool … プールサイズがタスク数に応じて拡張および縮小されるスレッドプール
  • Executors#newScheduledThreadPool … 実行をスケジューリングできるスレッドプール
  • Executors#newWorkStealingPool … work-stealingプール

今回は、Executors#newFixedThreadPool、Executors#newSingleThreadExecutor、Executors#newCachedThreadPoolを
対象にします。

いずれも、スレッド数が固定のスレッドプール、シングルスレッド、タスク数に応じて拡張・縮小が行われるスレッドプール、
くらいのイメージで使っているのですが、これらを構成するために使われているのがThreadPoolExecutorです。

ThreadPoolExecutor (Java SE 11 & JDK 11 )

それぞれ定義を見ていってみましょう。

newFixedThreadPool。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/Executors.java#L91-L95

newSingleThreadExecutor。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/Executors.java#L174-L179

newCachedThreadPool。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/Executors.java#L217-L221

ThreadPoolExecutor

ThreadPoolExecutorのJavadocを見ると、いろいろ設定できることがわかります。

ThreadPoolExecutor (Java SE 11 & JDK 11 )

  • コアプールサイズ(corePoolSize)
  • 最大プールサイズ(maximumPoolSize)
  • タスクを保持するキュー
  • Keep-Aliveタイムアウト(keepAliveTime)
  • コアプールスレッドがタイムアウトするか(allowCoreThreadTimeOut)
  • タスクの拒否

その他、事前にスレッドを開始したり(prestartCoreThread、prestartAllCoreThreads)、タスクを実行する前にフックを
かけたり(beforeExecute、afterExecute)、スレッドの作成方法を指定したり(ThreadFactory)もできます。

コアプールサイズ、最大プールサイズ、キュー、Keep-Aliveタイムアウトにはいろいろ関係がありそうな感じです。

Executors#newFixedThreadPool、Executors#newSingleThreadExecutor、Executors#newCachedThreadPoolで
それぞれどうなっているか、表にしてみます。

スレッドプールの種類 コアプールサイズ 最大プールサイズ キュー Keep-Aliveタイムアウト
Executors#newFixedThreadPool 指定サイズ 指定サイズ(コアプールサイズと同じ) LinkedBlockingQueue(サイズ制限なしのBlockingQueue) 0(Keep-Aliveしない)
Executors#newSingleThreadExecutor 1 1 LinkedBlockingQueue(サイズ制限なしのBlockingQueue) 0(Keep-Aliveしない)
Executors#newCachedThreadPool 1 Integer#MAX_VALUE SynchronousQueue(キャパシティのないBlockingQueue) 60秒

コアプールサイズが下限、最大プールサイズが上限、といった印象を受けるのですが、キューの性質も影響します。

ThreadPoolExecutorのキューイングの部分を見ると、こんなことが書いてあります。

  • corePoolSizeよりも少ない数のスレッドが実行されている場合、executorはキューイングよりも新しいスレッドの追加を常に優先します。
  • corePoolSize以上の数のスレッドが実行されている場合、executorは新しいスレッドの追加よりも要求のキューイングを常に優先します。
  • 要求をキューに入れることができない場合、新しいスレッドを作成することによりmaximumPoolSizeを超えない場合は新しいスレッドが作成され、超える場合はタスクが拒否されます。

ThreadPoolExecutor (Java SE 11 & JDK 11 )

これだけ見ると、プールサイズの関係は先述のように(単なる下限、上限)見えなくもないのですが、キューに使用する性質に
よって動きが変わります。

キューに指定するのは、BlockingQueueインターフェースの実装ですね。

BlockingQueue (Java SE 11 & JDK 11 )

  • LinkedBlockingQueueのようなサイズのないキューを使う場合
    • コアスレッド数で指定したスレッドを使い切っている場合、タスクはキュー内に待機する
    • このようなキューを使う場合、最大プールサイズは効果がなくなるため、コアスレッド数と同じ値を指定するのがよい
    • Executors#newFixedThreadPoolとExecutors#newSingleThreadExecutorがこのパターン
  • SynchronousQueueのような、容量を持たないキューを使う場合
    • タスクを実行するために利用できるスレッドがない場合、スレッドを新たに追加しようとする
    • このようなキューを使う場合、十分に大きな最大プールサイズが必要
    • Executors#newCachedThreadPoolがこのパターンであり、コアプールサイズが1、最大プールサイズがInteger#MAX_VALUE
  • ArrayBlockingQueueのような、容量制限のあるキューを使う場合
    • コアプールサイズのスレッドを使いきっていて、キューの容量も使いきっている場合、新しいスレッドが追加される
    • 追加されるスレッドは、最大プールサイズまで拡張できるが、それを超えると拒否される

この内容を見た後に、あらためて各スレッドプールの設定(ThreadPoolExecutorのコンストラクタに渡している引数)を
見ると、なるほど、という感じがします。

キューにサイズ制限がない場合はスレッド数を固定し、キューに容量がない場合はスレッド数が拡張されるようにする、
というように、どちらかを固定した方がわかりやすいということですね。

ところで、キューを使うか最大プールサイズまで拡張するかですが、コアプールサイズのスレッド数を超えた時に
先に使うのはキューになるんですよね。キューがいっぱいになったら、最大プールサイズまでスレッドが追加できます。

ソースコードでは、このあたりですね。

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1347

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1354

あと、Keep-Aliveタイムアウトの効果についてですが。コアプールサイズより多くのスレッドを保持している場合、アイドル状態の
スレッドはKeep-Aliveタイムアウトに設定した時間が経過すると終了していきます。コアプールサイズのスレッド数は、
維持されます。

Keep-Aliveタイムアウトを設定したうえで、さらにallowCoreThreadTimeOutをtrueにした場合、現在のスレッド数が
コアプールサイズ以下であってもKeep-Aliveタイムアウトで指定した時間以上にアイドル状態が続くとスレッド終了の
対象となります。

このあたりの話は、以下に記述されています。

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1029-L1062

ちなみにKeep-Aliveタイムアウトを0に指定すると、コアプールサイズより多いスレッドはすぐに終了します。

説明はこれくらいにして、動作確認をしていきましょう。ThreadPoolExecutorを使って、ここまで書いてきたことを
試していきたいと思います。

環境

今回の環境は、こちらです。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-72-generic", arch: "amd64", family: "unix"

準備

pom.xmlの主要な部分。

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.7.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.7.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.19.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

動作確認は、テストコードで行います。テストコードの雛形は、以下のように用意しました。

src/test/java/org/littlewings/concurrent/ThreadPoolExecutorTest.java

package org.littlewings.concurrent;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class ThreadPoolExecutorTest {
    private void sleep(Duration duration) {
        try {
            TimeUnit.SECONDS.sleep(duration.getSeconds());
        } catch (InterruptedException e) {
            // ignore
        }
    }

    private void log(String format, Object... args) {
        System.out.printf("[%s] %s - %s%n", LocalDateTime.now(), Thread.currentThread().getName(), String.format(format, args));
    }

    private <T> T getFuture(Future<T> future) {
        try {
            return future.get();
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // ここにテストを書く!!
}

コメントを記述している部分に、テストを書いてきます。

テストコード内で使う、スリープ、ログ出力、Future用のメソッドも用意。

基本パターン

まずは基本的なパターンということで、Executorsが提供しているスレッドプールと同じものを定義してみましょう。

Executors#newSingleThreadExecutorと同等のもの。

    @Test
    public void singleThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });

        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });

        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));

        assertThat(executor.getCorePoolSize()).isEqualTo(1);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(1);
        assertThat(executor.getPoolSize()).isEqualTo(1);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

コアプールサイズ、最大プールサイズが1、Keep-Aliveタイムアウトはなし、キューは容量制限のないLinkedBlockingQueueです。

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

厳密に言うと、Executors#newSingleThreadExecutorとはFinalizableDelegatedExecutorServiceでラップしていないところが
異なりますが。

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/Executors.java#L755-L764

タスク内では2秒ずつスリープしていて、

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });

        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });

        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

スレッドは1つしかないので、順次実行される状態になり、実行時間は6秒ちょっとになります。

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));

設定と、実際に保持しているスレッド数(ThreadPoolExecutor#getPoolSize)、タスク数(ThreadPoolExecutor#getActiveCount)の確認。

        assertThat(executor.getCorePoolSize()).isEqualTo(1);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(1);
        assertThat(executor.getPoolSize()).isEqualTo(1);
        assertThat(executor.getActiveCount()).isEqualTo(0);

全タスクは完了しているので、getActiveCountの戻り値は0ですね。

実行時のログ。

[2021-04-29T20:07:36.041148] pool-7-thread-1 - wait...
[2021-04-29T20:07:38.041595] pool-7-thread-1 - wake up
[2021-04-29T20:07:38.042653] pool-7-thread-1 - wait...
[2021-04-29T20:07:40.043593] pool-7-thread-1 - wake up
[2021-04-29T20:07:40.044089] pool-7-thread-1 - wait...
[2021-04-29T20:07:42.044462] pool-7-thread-1 - wake up

次は、Executors#newFixedThreadPoolと同じものを作ってみます。

    @Test
    public void fixedThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });

        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });

        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(5L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

Executors#newFixedThreadPoolをマネた時と比べると、コアプールサイズ、最大プールサイズを1より大きくして、
かつ同じ値にしたものです。

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

実行しているタスクも同じですが、今回は2スレッド使っているので実行時間が少し短くなっています。

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(5L));

スレッドプールには、スレッドが2つ残っています。

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

実行時のログ。

[2021-04-29T20:07:01.948101] pool-2-thread-1 - wait...
[2021-04-29T20:07:01.948795] pool-2-thread-2 - wait...
[2021-04-29T20:07:03.948678] pool-2-thread-1 - wake up
[2021-04-29T20:07:03.949356] pool-2-thread-1 - wait...
[2021-04-29T20:07:03.950086] pool-2-thread-2 - wake up
[2021-04-29T20:07:05.950024] pool-2-thread-1 - wake up

最後は、Executors#newCachedThreadPoolと同等のもので。

    @Test
    public void cachedThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                        60L, TimeUnit.SECONDS,
                        new SynchronousQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });

        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });

        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(2L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(3L));

        assertThat(executor.getCorePoolSize()).isEqualTo(0);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(Integer.MAX_VALUE);
        assertThat(executor.getPoolSize()).isEqualTo(3);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

コアプールサイズが0、最大プールサイズがInteger#MAX_VALUE、Keep-Aliveタイムアウトが60秒、キューが容量のない
SynchronousQueueです。

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                        60L, TimeUnit.SECONDS,
                        new SynchronousQueue<>());

アイドル状態のスレッドがない限りは、タスクが追加されるとスレッドが増えるので、今回のパターンだと3スレッドになって
1番実行時間が短くなります。

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(2L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(3L));

スレッド数は、3になりました。

        assertThat(executor.getCorePoolSize()).isEqualTo(0);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(Integer.MAX_VALUE);
        assertThat(executor.getPoolSize()).isEqualTo(3);
        assertThat(executor.getActiveCount()).isEqualTo(0);

実行時のログ。

[2021-04-29T20:07:34.028435] pool-6-thread-1 - wait...
[2021-04-29T20:07:34.029653] pool-6-thread-2 - wait...
[2021-04-29T20:07:34.030378] pool-6-thread-3 - wait...
[2021-04-29T20:07:36.028956] pool-6-thread-1 - wake up
[2021-04-29T20:07:36.035749] pool-6-thread-2 - wake up
[2021-04-29T20:07:36.036370] pool-6-thread-3 - wake up

まずは、Executorsが提供しているものとほぼ同じ設定で試してみました。ここから先は、もう少しThreadPoolExecutorの
設定を変えていってみましょう。

キュー

最初は、キューに関して見ていってみましょう。といっても、こちらはLinkedBlockingQueueを使ったままで、タスクが
キューに入っているところを見るだけですが。

    @Test
    public void queueing() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 20)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(18);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(210);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(20L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(21L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

コアプールサイズ、最大プールサイズを2にして、

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

タスクを20個詰め込みます。

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 20)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

スレッドは2つしかないので、実行直後はキューに18個のタスクが入っています。

        assertThat(executor.getQueue().size()).isEqualTo(18);

20個のタスクを2スレッドで実行していくので、実行時間はこうなりますね。

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(20L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(21L));

実行時のログ。

[2021-04-29T20:07:09.993991] pool-4-thread-1 - wait...
[2021-04-29T20:07:09.994222] pool-4-thread-2 - wait...
[2021-04-29T20:07:11.994487] pool-4-thread-1 - wake up
[2021-04-29T20:07:11.994760] pool-4-thread-2 - wake up
[2021-04-29T20:07:11.995325] pool-4-thread-1 - wait...
[2021-04-29T20:07:11.995693] pool-4-thread-2 - wait...
[2021-04-29T20:07:13.996178] pool-4-thread-1 - wake up
[2021-04-29T20:07:13.996881] pool-4-thread-1 - wait...
[2021-04-29T20:07:13.996547] pool-4-thread-2 - wake up
[2021-04-29T20:07:13.997756] pool-4-thread-2 - wait...
[2021-04-29T20:07:15.997477] pool-4-thread-1 - wake up
[2021-04-29T20:07:15.998256] pool-4-thread-1 - wait...
[2021-04-29T20:07:15.998240] pool-4-thread-2 - wake up
[2021-04-29T20:07:15.998992] pool-4-thread-2 - wait...
[2021-04-29T20:07:17.998718] pool-4-thread-1 - wake up
[2021-04-29T20:07:17.999219] pool-4-thread-1 - wait...
[2021-04-29T20:07:17.999574] pool-4-thread-2 - wake up
[2021-04-29T20:07:18.000390] pool-4-thread-2 - wait...
[2021-04-29T20:07:20.000014] pool-4-thread-1 - wake up
[2021-04-29T20:07:20.000971] pool-4-thread-2 - wake up
[2021-04-29T20:07:20.001085] pool-4-thread-1 - wait...
[2021-04-29T20:07:20.001895] pool-4-thread-2 - wait...
[2021-04-29T20:07:22.002559] pool-4-thread-1 - wake up
[2021-04-29T20:07:22.003006] pool-4-thread-2 - wake up
[2021-04-29T20:07:22.003315] pool-4-thread-1 - wait...
[2021-04-29T20:07:22.003510] pool-4-thread-2 - wait...
[2021-04-29T20:07:24.003844] pool-4-thread-1 - wake up
[2021-04-29T20:07:24.004008] pool-4-thread-2 - wake up
[2021-04-29T20:07:24.004724] pool-4-thread-1 - wait...
[2021-04-29T20:07:24.004957] pool-4-thread-2 - wait...
[2021-04-29T20:07:26.005308] pool-4-thread-1 - wake up
[2021-04-29T20:07:26.005540] pool-4-thread-2 - wake up
[2021-04-29T20:07:26.006187] pool-4-thread-2 - wait...
[2021-04-29T20:07:26.006003] pool-4-thread-1 - wait...
[2021-04-29T20:07:28.006544] pool-4-thread-2 - wake up
[2021-04-29T20:07:28.007051] pool-4-thread-1 - wake up
[2021-04-29T20:07:28.007251] pool-4-thread-2 - wait...
[2021-04-29T20:07:28.007647] pool-4-thread-1 - wait...
[2021-04-29T20:07:30.007880] pool-4-thread-2 - wake up
[2021-04-29T20:07:30.008026] pool-4-thread-1 - wake up

スレッド数を最大プールサイズまで広げる

次は、コアプールサイズと最大プールサイズを異なる値にして、スレッド数が広がるところを見てみます。

これには、キューも関係します。

要求をキューに入れることができない場合、新しいスレッドを作成することによりmaximumPoolSizeを超えない場合は新しいスレッドが作成され、超える場合はタスクが拒否されます。

容量のないキュー、または容量制限のあるキューを選択する必要があるようです。

ThreadPoolExecutor (Java SE 11 & JDK 11 )

今回は、容量制限のあるArrayBlockingQueueを選択してみました。プールが拡張される様子を見るのであれば
SynchronousQueueでもいいのですが、キューにタスクが詰め込まれているところも見たかったので。

    @Test
    public void expand() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        0L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

コアプールサイズを2、最大プールサイズを4、キューのサイズを2にしています。

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        0L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

タスクを6つ登録すると

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

キューに2つ入り、スレッドも4まで拡張されます。

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

キューに入りきれなかったので、スレッドが追加された状態ですね。

4スレッドで6つのタスクを扱うので、実行時間はこうなります。

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

ログ。

[2021-04-29T20:07:05.979826] pool-3-thread-1 - wait...
[2021-04-29T20:07:05.979981] pool-3-thread-2 - wait...
[2021-04-29T20:07:05.986304] pool-3-thread-3 - wait...
[2021-04-29T20:07:05.988164] pool-3-thread-4 - wait...
[2021-04-29T20:07:07.982240] pool-3-thread-1 - wake up
[2021-04-29T20:07:07.983124] pool-3-thread-1 - wait...
[2021-04-29T20:07:07.984288] pool-3-thread-2 - wake up
[2021-04-29T20:07:07.985241] pool-3-thread-2 - wait...
[2021-04-29T20:07:07.989875] pool-3-thread-3 - wake up
[2021-04-29T20:07:07.990235] pool-3-thread-4 - wake up
[2021-04-29T20:07:09.984001] pool-3-thread-1 - wake up
[2021-04-29T20:07:09.986035] pool-3-thread-2 - wake up

ちなみに、コアプールサイズ以上に拡張された分のスレッドは、タスクが完了するといなくなります。

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1044-L1049

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1052-L1057

これを維持するには、Keep-Aliveタイムアウトの指定が必要になります。

タスクの拒否

キューの容量を使い切り、最大プールサイズまで拡張された状態でタスクを追加しようとすると、そのタスクは拒否されます。

前述の最大プールサイズまで拡張されるコードがすでにキューの容量、プールサイズの両方でギリギリだったので、ここに
さらにタスクを追加してみます。

    @Test
    public void reject() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        0L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        for (int i = 7; i <= 8; i++) {
            int counter = i;
            assertThatThrownBy(() ->
                    executor.submit(() -> {
                        log("wait...");
                        sleep(Duration.ofSeconds(2L));
                        log("wake up");
                        return counter;
                    })
            ).isInstanceOf(RejectedExecutionException.class);
        }

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

すると、こんな感じで拒否されます。

        for (int i = 7; i <= 8; i++) {
            int counter = i;
            assertThatThrownBy(() ->
                    executor.submit(() -> {
                        log("wait...");
                        sleep(Duration.ofSeconds(2L));
                        log("wake up");
                        return counter;
                    })
            ).isInstanceOf(RejectedExecutionException.class);
        }

この時の動作は、RejectedExecutionHandlerインターフェースの実装により指定できます。デフォルトでは、
ThreadPoolExecutor.AbortPolicyが指定されたことになっています

RejectedExecutionHandler (Java SE 11 & JDK 11 )

ログは先ほどと同じような内容になりますが(追加分のタスクが拒否されただけだから)、一応載せておきます。

[2021-04-29T20:07:30.015131] pool-5-thread-1 - wait...
[2021-04-29T20:07:30.018122] pool-5-thread-2 - wait...
[2021-04-29T20:07:30.031335] pool-5-thread-3 - wait...
[2021-04-29T20:07:30.037615] pool-5-thread-4 - wait...
[2021-04-29T20:07:32.016207] pool-5-thread-1 - wake up
[2021-04-29T20:07:32.016742] pool-5-thread-1 - wait...
[2021-04-29T20:07:32.018562] pool-5-thread-2 - wake up
[2021-04-29T20:07:32.018948] pool-5-thread-2 - wait...
[2021-04-29T20:07:32.032788] pool-5-thread-3 - wake up
[2021-04-29T20:07:32.039120] pool-5-thread-4 - wake up
[2021-04-29T20:07:34.017244] pool-5-thread-1 - wake up
[2021-04-29T20:07:34.019358] pool-5-thread-2 - wake up

Keep-Aliveタイムアウト

最後は、Keep-Aliveタイムアウトです。Keep-Aliveタイムアウトを指定することで、スレッド数がコアプールサイズよりも
多くなっている場合は、Keep-Aliveタイムアウトで指定した時間以上にアイドル状態が続くとスレッドが終了します。

    @Test
    public void keepAlive() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        5000L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(4);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(2);

        executor.shutdown();
    }

今回は、Keep-Aliveタイムアウトを5秒にしてみました。コアプールサイズは2、最大プールサイズは4です。

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        5000L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

タスクを追加することで、キューにタスクが入り、スレッドプールも拡張されます。

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

タスクの実行後、しばらく待っているとスレッド数がコアプールサイズまで減少することが確認できます。

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(2);

実行時のログ。

[2021-04-29T21:20:12.075690] pool-1-thread-1 - wait...
[2021-04-29T21:20:12.079657] pool-1-thread-2 - wait...
[2021-04-29T21:20:12.079740] pool-1-thread-4 - wait...
[2021-04-29T21:20:12.079747] pool-1-thread-3 - wait...
[2021-04-29T21:20:14.087106] pool-1-thread-1 - wake up
[2021-04-29T21:20:14.087656] pool-1-thread-2 - wake up
[2021-04-29T21:20:14.088526] pool-1-thread-1 - wait...
[2021-04-29T21:20:14.089013] pool-1-thread-3 - wake up
[2021-04-29T21:20:14.088705] pool-1-thread-2 - wait...
[2021-04-29T21:20:14.088548] pool-1-thread-4 - wake up
[2021-04-29T21:20:16.089248] pool-1-thread-1 - wake up
[2021-04-29T21:20:16.091406] pool-1-thread-2 - wake up
[2021-04-29T21:20:16.097731] main - sleeping...

コアプールもタイムアウトさせる

最後です。Keep-Aliveタイムアウトを指定すると、アイドル状態のスレッドはコアプールサイズまで減少していきます。

ですが、allowCoreThreadTimeOutをtrueにすると、コアプールサイズ分のスレッドも終了します。

こちらは、ArrayBlockingQueueとLinkedBlockingQueueの2つを使って試してみました。

Keep-Aliveタイムアウトを5秒、キューをArrayBlockingQueueにして、コアプールサイズを2、最大プールサイズを4にしてみます。

    @Test
    public void keepAliveAllowCoreThreadTimeOut1() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        5000L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));
        executor.allowCoreThreadTimeOut(true);

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(4);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(0);

        executor.shutdown();
    }

allowCoreThreadTimeOutをtrueにしているのがポイントであり、Keep-Aliveタイムアウトが0より大きくしたうえで
設定する必要があります。

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        5000L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));
        executor.allowCoreThreadTimeOut(true);

タスクの終了後はスレッド数は最大プールサイズまで拡張されていますが

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(4);
        assertThat(executor.getActiveCount()).isEqualTo(0);

アイドル状態が続くと、コアプールサイズ分のスレッドも含めて終了します。

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(0);

ログ。

[2021-04-29T21:21:02.180550] pool-8-thread-1 - wait...
[2021-04-29T21:21:02.180915] pool-8-thread-3 - wait...
[2021-04-29T21:21:02.180639] pool-8-thread-2 - wait...
[2021-04-29T21:21:02.181316] pool-8-thread-4 - wait...
[2021-04-29T21:21:04.181189] pool-8-thread-1 - wake up
[2021-04-29T21:21:04.181289] pool-8-thread-3 - wake up
[2021-04-29T21:21:04.181584] pool-8-thread-1 - wait...
[2021-04-29T21:21:04.181571] pool-8-thread-2 - wake up
[2021-04-29T21:21:04.181791] pool-8-thread-4 - wake up
[2021-04-29T21:21:04.181729] pool-8-thread-3 - wait...
[2021-04-29T21:21:06.181957] pool-8-thread-1 - wake up
[2021-04-29T21:21:06.182476] pool-8-thread-3 - wake up
[2021-04-29T21:21:06.183003] main - sleeping...

次は、キューをLinkedBlockingQueueにして、プールサイズを2つとも2にしてみました。Executors#newFixedThreadPoolで
スレッドが終了するようなバージョンですね。

    @Test
    public void keepAliveAllowCoreThreadTimeOut2() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        5000L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());
        executor.allowCoreThreadTimeOut(true);

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(0);

        executor.shutdown();
    }

こちらもタスク実行後はKeep-Aliveタイムアウトで指定した時間が経過した後、スレッドがすべて終了します。

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(0);

ログ。

[2021-04-29T21:21:12.187598] pool-9-thread-1 - wait...
[2021-04-29T21:21:12.187732] pool-9-thread-2 - wait...
[2021-04-29T21:21:14.188006] pool-9-thread-1 - wake up
[2021-04-29T21:21:14.188197] pool-9-thread-2 - wake up
[2021-04-29T21:21:14.188389] pool-9-thread-1 - wait...
[2021-04-29T21:21:14.188424] pool-9-thread-2 - wait...
[2021-04-29T21:21:16.188723] pool-9-thread-1 - wake up
[2021-04-29T21:21:16.188837] pool-9-thread-2 - wake up
[2021-04-29T21:21:16.189002] pool-9-thread-1 - wait...
[2021-04-29T21:21:16.189090] pool-9-thread-2 - wait...
[2021-04-29T21:21:18.189300] pool-9-thread-1 - wake up
[2021-04-29T21:21:18.189366] pool-9-thread-2 - wake up
[2021-04-29T21:21:18.190522] main - sleeping...

とりあえず、気になるところはざっと確認できた感じです。

まとめ

ThreadPoolExecutorの設定項目を、いろいろ確認してみました。ふだんはExecutorsの裏に隠れているのであまり気にすることは
ないかもしれませんが、意味を把握しておくとThreadPoolExecutorを拡張したようなスレッドプールの設定をしたりする時などに
役に立つかもしれません。

最後に、今回作成したソースコードの全体を載せておきます。

src/test/java/org/littlewings/concurrent/ThreadPoolExecutorTest.java

package org.littlewings.concurrent;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class ThreadPoolExecutorTest {
    private void sleep(Duration duration) {
        try {
            TimeUnit.SECONDS.sleep(duration.getSeconds());
        } catch (InterruptedException e) {
            // ignore
        }
    }

    private void log(String format, Object... args) {
        System.out.printf("[%s] %s - %s%n", LocalDateTime.now(), Thread.currentThread().getName(), String.format(format, args));
    }

    private <T> T getFuture(Future<T> future) {
        try {
            return future.get();
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    public void singleThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });

        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });

        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));

        assertThat(executor.getCorePoolSize()).isEqualTo(1);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(1);
        assertThat(executor.getPoolSize()).isEqualTo(1);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void fixedThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });

        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });

        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(5L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void cachedThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                        60L, TimeUnit.SECONDS,
                        new SynchronousQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });

        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });

        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(2L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(3L));

        assertThat(executor.getCorePoolSize()).isEqualTo(0);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(Integer.MAX_VALUE);
        assertThat(executor.getPoolSize()).isEqualTo(3);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void queueing() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 20)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(18);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(210);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(20L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(21L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void expand() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        0L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }


    @Test
    public void keepAlive() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        5000L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(4);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(2);

        executor.shutdown();
    }


    @Test
    public void keepAliveAllowCoreThreadTimeOut1() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        5000L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));
        executor.allowCoreThreadTimeOut(true);

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(4);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void keepAliveAllowCoreThreadTimeOut2() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        5000L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());
        executor.allowCoreThreadTimeOut(true);

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void reject() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        0L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        for (int i = 7; i <= 8; i++) {
            int counter = i;
            assertThatThrownBy(() ->
                    executor.submit(() -> {
                        log("wait...");
                        sleep(Duration.ofSeconds(2L));
                        log("wake up");
                        return counter;
                    })
            ).isInstanceOf(RejectedExecutionException.class);
        }

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }
}