CLOVER🍀

That was when it all began.

ThreadPoolExecutorの蚭定項目を確認する

これは、なにをしたくお曞いたもの

今たでConcurrency Utilitiesで提䟛されるスレッドプヌルを䜿っおきたこずはあったのですが、その蚭定内容をちゃんず
芋おきおいなかったので今回芋おみようかなず。

具䜓的には、ThreadPoolExecutorを芋おいこうず思いたす。

ThreadPoolExecutor (Java SE 11 & JDK 11 )

Executorsずスレッドプヌル

Javaでスレッドプヌルを䜿うずなるず、登堎するのがExecutorsです。

Executors (Java SE 11 & JDK 11 )

Executorsが提䟛する、以䞋のスレッドプヌルの䞭から䜿うこずが倚いでしょう。

  • Executors#newFixedThreadPool 
 固定サむズのスレッドプヌル
  • Executors#newSingleThreadExecutor 
 スレッドサむズがひず぀のExecutor
  • Executors#newCachedThreadPool 
 プヌルサむズがタスク数に応じお拡匵および瞮小されるスレッドプヌル
  • Executors#newScheduledThreadPool 
 実行をスケゞュヌリングできるスレッドプヌル
  • Executors#newWorkStealingPool 
 work-stealingプヌル

今回は、Executors#newFixedThreadPool、Executors#newSingleThreadExecutor、Executors#newCachedThreadPoolを
察象にしたす。

いずれも、スレッド数が固定のスレッドプヌル、シングルスレッド、タスク数に応じお拡匵・瞮小が行われるスレッドプヌル、
くらいのむメヌゞで䜿っおいるのですが、これらを構成するために䜿われおいるのがThreadPoolExecutorです。

ThreadPoolExecutor (Java SE 11 & JDK 11 )

それぞれ定矩を芋おいっおみたしょう。

newFixedThreadPool。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/Executors.java#L91-L95

newSingleThreadExecutor。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/Executors.java#L174-L179

newCachedThreadPool。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/Executors.java#L217-L221

ThreadPoolExecutor

ThreadPoolExecutorのJavadocを芋るず、いろいろ蚭定できるこずがわかりたす。

ThreadPoolExecutor (Java SE 11 & JDK 11 )

  • コアプヌルサむズcorePoolSize
  • 最倧プヌルサむズmaximumPoolSize
  • タスクを保持するキュヌ
  • Keep-AliveタむムアりトkeepAliveTime
  • コアプヌルスレッドがタむムアりトするかallowCoreThreadTimeOut
  • タスクの拒吊

その他、事前にスレッドを開始したりprestartCoreThread、prestartAllCoreThreads、タスクを実行する前にフックを
かけたりbeforeExecute、afterExecute、スレッドの䜜成方法を指定したりThreadFactoryもできたす。

コアプヌルサむズ、最倧プヌルサむズ、キュヌ、Keep-Aliveタむムアりトにはいろいろ関係がありそうな感じです。

Executors#newFixedThreadPool、Executors#newSingleThreadExecutor、Executors#newCachedThreadPoolで
それぞれどうなっおいるか、衚にしおみたす。

スレッドプヌルの皮類 コアプヌルサむズ 最倧プヌルサむズ キュヌ Keep-Aliveタむムアりト
Executors#newFixedThreadPool 指定サむズ 指定サむズコアプヌルサむズず同じ LinkedBlockingQueueサむズ制限なしのBlockingQueue 0Keep-Aliveしない
Executors#newSingleThreadExecutor 1 1 LinkedBlockingQueueサむズ制限なしのBlockingQueue 0Keep-Aliveしない
Executors#newCachedThreadPool 1 Integer#MAX_VALUE SynchronousQueueキャパシティのないBlockingQueue 60秒

コアプヌルサむズが䞋限、最倧プヌルサむズが䞊限、ずいった印象を受けるのですが、キュヌの性質も圱響したす。

ThreadPoolExecutorのキュヌむングの郚分を芋るず、こんなこずが曞いおありたす。

  • corePoolSizeよりも少ない数のスレッドが実行されおいる堎合、executorはキュヌむングよりも新しいスレッドの远加を垞に優先したす。
  • corePoolSize以䞊の数のスレッドが実行されおいる堎合、executorは新しいスレッドの远加よりも芁求のキュヌむングを垞に優先したす。
  • 芁求をキュヌに入れるこずができない堎合、新しいスレッドを䜜成するこずによりmaximumPoolSizeを超えない堎合は新しいスレッドが䜜成され、超える堎合はタスクが拒吊されたす。

ThreadPoolExecutor (Java SE 11 & JDK 11 )

これだけ芋るず、プヌルサむズの関係は先述のように単なる䞋限、䞊限芋えなくもないのですが、キュヌに䜿甚する性質に
よっお動きが倉わりたす。

キュヌに指定するのは、BlockingQueueむンタヌフェヌスの実装ですね。

BlockingQueue (Java SE 11 & JDK 11 )

  • LinkedBlockingQueueのようなサむズのないキュヌを䜿う堎合
    • コアスレッド数で指定したスレッドを䜿い切っおいる堎合、タスクはキュヌ内に埅機する
    • このようなキュヌを䜿う堎合、最倧プヌルサむズは効果がなくなるため、コアスレッド数ず同じ倀を指定するのがよい
    • Executors#newFixedThreadPoolずExecutors#newSingleThreadExecutorがこのパタヌン
  • SynchronousQueueのような、容量を持たないキュヌを䜿う堎合
    • タスクを実行するために利甚できるスレッドがない堎合、スレッドを新たに远加しようずする
    • このようなキュヌを䜿う堎合、十分に倧きな最倧プヌルサむズが必芁
    • Executors#newCachedThreadPoolがこのパタヌンであり、コアプヌルサむズが1、最倧プヌルサむズがInteger#MAX_VALUE
  • ArrayBlockingQueueのような、容量制限のあるキュヌを䜿う堎合
    • コアプヌルサむズのスレッドを䜿いきっおいお、キュヌの容量も䜿いきっおいる堎合、新しいスレッドが远加される
    • 远加されるスレッドは、最倧プヌルサむズたで拡匵できるが、それを超えるず拒吊される

この内容を芋た埌に、あらためお各スレッドプヌルの蚭定ThreadPoolExecutorのコンストラクタに枡しおいる匕数を
芋るず、なるほど、ずいう感じがしたす。

キュヌにサむズ制限がない堎合はスレッド数を固定し、キュヌに容量がない堎合はスレッド数が拡匵されるようにする、
ずいうように、どちらかを固定した方がわかりやすいずいうこずですね。

ずころで、キュヌを䜿うか最倧プヌルサむズたで拡匵するかですが、コアプヌルサむズのスレッド数を超えた時に
先に䜿うのはキュヌになるんですよね。キュヌがいっぱいになったら、最倧プヌルサむズたでスレッドが远加できたす。

゜ヌスコヌドでは、このあたりですね。

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1347

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1354

あず、Keep-Aliveタむムアりトの効果に぀いおですが。コアプヌルサむズより倚くのスレッドを保持しおいる堎合、アむドル状態の
スレッドはKeep-Aliveタむムアりトに蚭定した時間が経過するず終了しおいきたす。コアプヌルサむズのスレッド数は、
維持されたす。

Keep-Aliveタむムアりトを蚭定したうえで、さらにallowCoreThreadTimeOutをtrueにした堎合、珟圚のスレッド数が
コアプヌルサむズ以䞋であっおもKeep-Aliveタむムアりトで指定した時間以䞊にアむドル状態が続くずスレッド終了の
察象ずなりたす。

このあたりの話は、以䞋に蚘述されおいたす。

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1029-L1062

ちなみにKeep-Aliveタむムアりトを0に指定するず、コアプヌルサむズより倚いスレッドはすぐに終了したす。

説明はこれくらいにしお、動䜜確認をしおいきたしょう。ThreadPoolExecutorを䜿っお、ここたで曞いおきたこずを
詊しおいきたいず思いたす。

環境

今回の環境は、こちらです。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-72-generic", arch: "amd64", family: "unix"

準備

pom.xmlの䞻芁な郚分。

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.7.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.7.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.19.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

動䜜確認は、テストコヌドで行いたす。テストコヌドの雛圢は、以䞋のように甚意したした。

src/test/java/org/littlewings/concurrent/ThreadPoolExecutorTest.java

package org.littlewings.concurrent;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class ThreadPoolExecutorTest {
    private void sleep(Duration duration) {
        try {
            TimeUnit.SECONDS.sleep(duration.getSeconds());
        } catch (InterruptedException e) {
            // ignore
        }
    }

    private void log(String format, Object... args) {
        System.out.printf("[%s] %s - %s%n", LocalDateTime.now(), Thread.currentThread().getName(), String.format(format, args));
    }

    private <T> T getFuture(Future<T> future) {
        try {
            return future.get();
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // ここにテストを曞く
}

コメントを蚘述しおいる郚分に、テストを曞いおきたす。

テストコヌド内で䜿う、スリヌプ、ログ出力、Future甚のメ゜ッドも甚意。

基本パタヌン

たずは基本的なパタヌンずいうこずで、Executorsが提䟛しおいるスレッドプヌルず同じものを定矩しおみたしょう。

Executors#newSingleThreadExecutorず同等のもの。

    @Test
    public void singleThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });

        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });

        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));

        assertThat(executor.getCorePoolSize()).isEqualTo(1);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(1);
        assertThat(executor.getPoolSize()).isEqualTo(1);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

コアプヌルサむズ、最倧プヌルサむズが1、Keep-Aliveタむムアりトはなし、キュヌは容量制限のないLinkedBlockingQueueです。

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

厳密に蚀うず、Executors#newSingleThreadExecutorずはFinalizableDelegatedExecutorServiceでラップしおいないずころが
異なりたすが。

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/Executors.java#L755-L764

タスク内では2秒ず぀スリヌプしおいお、

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });

        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });

        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

スレッドは1぀しかないので、順次実行される状態になり、実行時間は6秒ちょっずになりたす。

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));

蚭定ず、実際に保持しおいるスレッド数ThreadPoolExecutor#getPoolSize、タスク数ThreadPoolExecutor#getActiveCountの確認。

        assertThat(executor.getCorePoolSize()).isEqualTo(1);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(1);
        assertThat(executor.getPoolSize()).isEqualTo(1);
        assertThat(executor.getActiveCount()).isEqualTo(0);

党タスクは完了しおいるので、getActiveCountの戻り倀は0ですね。

実行時のログ。

[2021-04-29T20:07:36.041148] pool-7-thread-1 - wait...
[2021-04-29T20:07:38.041595] pool-7-thread-1 - wake up
[2021-04-29T20:07:38.042653] pool-7-thread-1 - wait...
[2021-04-29T20:07:40.043593] pool-7-thread-1 - wake up
[2021-04-29T20:07:40.044089] pool-7-thread-1 - wait...
[2021-04-29T20:07:42.044462] pool-7-thread-1 - wake up

次は、Executors#newFixedThreadPoolず同じものを䜜っおみたす。

    @Test
    public void fixedThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });

        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });

        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(5L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

Executors#newFixedThreadPoolをマネた時ず比べるず、コアプヌルサむズ、最倧プヌルサむズを1より倧きくしお、
か぀同じ倀にしたものです。

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

実行しおいるタスクも同じですが、今回は2スレッド䜿っおいるので実行時間が少し短くなっおいたす。

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(5L));

スレッドプヌルには、スレッドが2぀残っおいたす。

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

実行時のログ。

[2021-04-29T20:07:01.948101] pool-2-thread-1 - wait...
[2021-04-29T20:07:01.948795] pool-2-thread-2 - wait...
[2021-04-29T20:07:03.948678] pool-2-thread-1 - wake up
[2021-04-29T20:07:03.949356] pool-2-thread-1 - wait...
[2021-04-29T20:07:03.950086] pool-2-thread-2 - wake up
[2021-04-29T20:07:05.950024] pool-2-thread-1 - wake up

最埌は、Executors#newCachedThreadPoolず同等のもので。

    @Test
    public void cachedThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                        60L, TimeUnit.SECONDS,
                        new SynchronousQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });

        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });

        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(2L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(3L));

        assertThat(executor.getCorePoolSize()).isEqualTo(0);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(Integer.MAX_VALUE);
        assertThat(executor.getPoolSize()).isEqualTo(3);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

コアプヌルサむズが0、最倧プヌルサむズがInteger#MAX_VALUE、Keep-Aliveタむムアりトが60秒、キュヌが容量のない
SynchronousQueueです。

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                        60L, TimeUnit.SECONDS,
                        new SynchronousQueue<>());

アむドル状態のスレッドがない限りは、タスクが远加されるずスレッドが増えるので、今回のパタヌンだず3スレッドになっお
1番実行時間が短くなりたす。

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(2L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(3L));

スレッド数は、3になりたした。

        assertThat(executor.getCorePoolSize()).isEqualTo(0);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(Integer.MAX_VALUE);
        assertThat(executor.getPoolSize()).isEqualTo(3);
        assertThat(executor.getActiveCount()).isEqualTo(0);

実行時のログ。

[2021-04-29T20:07:34.028435] pool-6-thread-1 - wait...
[2021-04-29T20:07:34.029653] pool-6-thread-2 - wait...
[2021-04-29T20:07:34.030378] pool-6-thread-3 - wait...
[2021-04-29T20:07:36.028956] pool-6-thread-1 - wake up
[2021-04-29T20:07:36.035749] pool-6-thread-2 - wake up
[2021-04-29T20:07:36.036370] pool-6-thread-3 - wake up

たずは、Executorsが提䟛しおいるものずほが同じ蚭定で詊しおみたした。ここから先は、もう少しThreadPoolExecutorの
蚭定を倉えおいっおみたしょう。

キュヌ

最初は、キュヌに関しお芋おいっおみたしょう。ずいっおも、こちらはLinkedBlockingQueueを䜿ったたたで、タスクが
キュヌに入っおいるずころを芋るだけですが。

    @Test
    public void queueing() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 20)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(18);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(210);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(20L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(21L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

コアプヌルサむズ、最倧プヌルサむズを2にしお、

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

タスクを20個詰め蟌みたす。

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 20)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

スレッドは2぀しかないので、実行盎埌はキュヌに18個のタスクが入っおいたす。

        assertThat(executor.getQueue().size()).isEqualTo(18);

20個のタスクを2スレッドで実行しおいくので、実行時間はこうなりたすね。

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(20L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(21L));

実行時のログ。

[2021-04-29T20:07:09.993991] pool-4-thread-1 - wait...
[2021-04-29T20:07:09.994222] pool-4-thread-2 - wait...
[2021-04-29T20:07:11.994487] pool-4-thread-1 - wake up
[2021-04-29T20:07:11.994760] pool-4-thread-2 - wake up
[2021-04-29T20:07:11.995325] pool-4-thread-1 - wait...
[2021-04-29T20:07:11.995693] pool-4-thread-2 - wait...
[2021-04-29T20:07:13.996178] pool-4-thread-1 - wake up
[2021-04-29T20:07:13.996881] pool-4-thread-1 - wait...
[2021-04-29T20:07:13.996547] pool-4-thread-2 - wake up
[2021-04-29T20:07:13.997756] pool-4-thread-2 - wait...
[2021-04-29T20:07:15.997477] pool-4-thread-1 - wake up
[2021-04-29T20:07:15.998256] pool-4-thread-1 - wait...
[2021-04-29T20:07:15.998240] pool-4-thread-2 - wake up
[2021-04-29T20:07:15.998992] pool-4-thread-2 - wait...
[2021-04-29T20:07:17.998718] pool-4-thread-1 - wake up
[2021-04-29T20:07:17.999219] pool-4-thread-1 - wait...
[2021-04-29T20:07:17.999574] pool-4-thread-2 - wake up
[2021-04-29T20:07:18.000390] pool-4-thread-2 - wait...
[2021-04-29T20:07:20.000014] pool-4-thread-1 - wake up
[2021-04-29T20:07:20.000971] pool-4-thread-2 - wake up
[2021-04-29T20:07:20.001085] pool-4-thread-1 - wait...
[2021-04-29T20:07:20.001895] pool-4-thread-2 - wait...
[2021-04-29T20:07:22.002559] pool-4-thread-1 - wake up
[2021-04-29T20:07:22.003006] pool-4-thread-2 - wake up
[2021-04-29T20:07:22.003315] pool-4-thread-1 - wait...
[2021-04-29T20:07:22.003510] pool-4-thread-2 - wait...
[2021-04-29T20:07:24.003844] pool-4-thread-1 - wake up
[2021-04-29T20:07:24.004008] pool-4-thread-2 - wake up
[2021-04-29T20:07:24.004724] pool-4-thread-1 - wait...
[2021-04-29T20:07:24.004957] pool-4-thread-2 - wait...
[2021-04-29T20:07:26.005308] pool-4-thread-1 - wake up
[2021-04-29T20:07:26.005540] pool-4-thread-2 - wake up
[2021-04-29T20:07:26.006187] pool-4-thread-2 - wait...
[2021-04-29T20:07:26.006003] pool-4-thread-1 - wait...
[2021-04-29T20:07:28.006544] pool-4-thread-2 - wake up
[2021-04-29T20:07:28.007051] pool-4-thread-1 - wake up
[2021-04-29T20:07:28.007251] pool-4-thread-2 - wait...
[2021-04-29T20:07:28.007647] pool-4-thread-1 - wait...
[2021-04-29T20:07:30.007880] pool-4-thread-2 - wake up
[2021-04-29T20:07:30.008026] pool-4-thread-1 - wake up

スレッド数を最倧プヌルサむズたで広げる

次は、コアプヌルサむズず最倧プヌルサむズを異なる倀にしお、スレッド数が広がるずころを芋おみたす。

これには、キュヌも関係したす。

芁求をキュヌに入れるこずができない堎合、新しいスレッドを䜜成するこずによりmaximumPoolSizeを超えない堎合は新しいスレッドが䜜成され、超える堎合はタスクが拒吊されたす。

容量のないキュヌ、たたは容量制限のあるキュヌを遞択する必芁があるようです。

ThreadPoolExecutor (Java SE 11 & JDK 11 )

今回は、容量制限のあるArrayBlockingQueueを遞択しおみたした。プヌルが拡匵される様子を芋るのであれば
SynchronousQueueでもいいのですが、キュヌにタスクが詰め蟌たれおいるずころも芋たかったので。

    @Test
    public void expand() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        0L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

コアプヌルサむズを2、最倧プヌルサむズを4、キュヌのサむズを2にしおいたす。

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        0L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

タスクを6぀登録するず

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

キュヌに2぀入り、スレッドも4たで拡匵されたす。

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

キュヌに入りきれなかったので、スレッドが远加された状態ですね。

4スレッドで6぀のタスクを扱うので、実行時間はこうなりたす。

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

ログ。

[2021-04-29T20:07:05.979826] pool-3-thread-1 - wait...
[2021-04-29T20:07:05.979981] pool-3-thread-2 - wait...
[2021-04-29T20:07:05.986304] pool-3-thread-3 - wait...
[2021-04-29T20:07:05.988164] pool-3-thread-4 - wait...
[2021-04-29T20:07:07.982240] pool-3-thread-1 - wake up
[2021-04-29T20:07:07.983124] pool-3-thread-1 - wait...
[2021-04-29T20:07:07.984288] pool-3-thread-2 - wake up
[2021-04-29T20:07:07.985241] pool-3-thread-2 - wait...
[2021-04-29T20:07:07.989875] pool-3-thread-3 - wake up
[2021-04-29T20:07:07.990235] pool-3-thread-4 - wake up
[2021-04-29T20:07:09.984001] pool-3-thread-1 - wake up
[2021-04-29T20:07:09.986035] pool-3-thread-2 - wake up

ちなみに、コアプヌルサむズ以䞊に拡匵された分のスレッドは、タスクが完了するずいなくなりたす。

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1044-L1049

https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1052-L1057

これを維持するには、Keep-Aliveタむムアりトの指定が必芁になりたす。

タスクの拒吊

キュヌの容量を䜿い切り、最倧プヌルサむズたで拡匵された状態でタスクを远加しようずするず、そのタスクは拒吊されたす。

前述の最倧プヌルサむズたで拡匵されるコヌドがすでにキュヌの容量、プヌルサむズの䞡方でギリギリだったので、ここに
さらにタスクを远加しおみたす。

    @Test
    public void reject() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        0L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        for (int i = 7; i <= 8; i++) {
            int counter = i;
            assertThatThrownBy(() ->
                    executor.submit(() -> {
                        log("wait...");
                        sleep(Duration.ofSeconds(2L));
                        log("wake up");
                        return counter;
                    })
            ).isInstanceOf(RejectedExecutionException.class);
        }

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

するず、こんな感じで拒吊されたす。

        for (int i = 7; i <= 8; i++) {
            int counter = i;
            assertThatThrownBy(() ->
                    executor.submit(() -> {
                        log("wait...");
                        sleep(Duration.ofSeconds(2L));
                        log("wake up");
                        return counter;
                    })
            ).isInstanceOf(RejectedExecutionException.class);
        }

この時の動䜜は、RejectedExecutionHandlerむンタヌフェヌスの実装により指定できたす。デフォルトでは、
ThreadPoolExecutor.AbortPolicyが指定されたこずになっおいたす

RejectedExecutionHandler (Java SE 11 & JDK 11 )

ログは先ほどず同じような内容になりたすが远加分のタスクが拒吊されただけだから、䞀応茉せおおきたす。

[2021-04-29T20:07:30.015131] pool-5-thread-1 - wait...
[2021-04-29T20:07:30.018122] pool-5-thread-2 - wait...
[2021-04-29T20:07:30.031335] pool-5-thread-3 - wait...
[2021-04-29T20:07:30.037615] pool-5-thread-4 - wait...
[2021-04-29T20:07:32.016207] pool-5-thread-1 - wake up
[2021-04-29T20:07:32.016742] pool-5-thread-1 - wait...
[2021-04-29T20:07:32.018562] pool-5-thread-2 - wake up
[2021-04-29T20:07:32.018948] pool-5-thread-2 - wait...
[2021-04-29T20:07:32.032788] pool-5-thread-3 - wake up
[2021-04-29T20:07:32.039120] pool-5-thread-4 - wake up
[2021-04-29T20:07:34.017244] pool-5-thread-1 - wake up
[2021-04-29T20:07:34.019358] pool-5-thread-2 - wake up

Keep-Aliveタむムアりト

最埌は、Keep-Aliveタむムアりトです。Keep-Aliveタむムアりトを指定するこずで、スレッド数がコアプヌルサむズよりも
倚くなっおいる堎合は、Keep-Aliveタむムアりトで指定した時間以䞊にアむドル状態が続くずスレッドが終了したす。

    @Test
    public void keepAlive() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        5000L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(4);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(2);

        executor.shutdown();
    }

今回は、Keep-Aliveタむムアりトを5秒にしおみたした。コアプヌルサむズは2、最倧プヌルサむズは4です。

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        5000L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

タスクを远加するこずで、キュヌにタスクが入り、スレッドプヌルも拡匵されたす。

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

タスクの実行埌、しばらく埅っおいるずスレッド数がコアプヌルサむズたで枛少するこずが確認できたす。

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(2);

実行時のログ。

[2021-04-29T21:20:12.075690] pool-1-thread-1 - wait...
[2021-04-29T21:20:12.079657] pool-1-thread-2 - wait...
[2021-04-29T21:20:12.079740] pool-1-thread-4 - wait...
[2021-04-29T21:20:12.079747] pool-1-thread-3 - wait...
[2021-04-29T21:20:14.087106] pool-1-thread-1 - wake up
[2021-04-29T21:20:14.087656] pool-1-thread-2 - wake up
[2021-04-29T21:20:14.088526] pool-1-thread-1 - wait...
[2021-04-29T21:20:14.089013] pool-1-thread-3 - wake up
[2021-04-29T21:20:14.088705] pool-1-thread-2 - wait...
[2021-04-29T21:20:14.088548] pool-1-thread-4 - wake up
[2021-04-29T21:20:16.089248] pool-1-thread-1 - wake up
[2021-04-29T21:20:16.091406] pool-1-thread-2 - wake up
[2021-04-29T21:20:16.097731] main - sleeping...

コアプヌルもタむムアりトさせる

最埌です。Keep-Aliveタむムアりトを指定するず、アむドル状態のスレッドはコアプヌルサむズたで枛少しおいきたす。

ですが、allowCoreThreadTimeOutをtrueにするず、コアプヌルサむズ分のスレッドも終了したす。

こちらは、ArrayBlockingQueueずLinkedBlockingQueueの2぀を䜿っお詊しおみたした。

Keep-Aliveタむムアりトを5秒、キュヌをArrayBlockingQueueにしお、コアプヌルサむズを2、最倧プヌルサむズを4にしおみたす。

    @Test
    public void keepAliveAllowCoreThreadTimeOut1() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        5000L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));
        executor.allowCoreThreadTimeOut(true);

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(4);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(0);

        executor.shutdown();
    }

allowCoreThreadTimeOutをtrueにしおいるのがポむントであり、Keep-Aliveタむムアりトが0より倧きくしたうえで
蚭定する必芁がありたす。

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        5000L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));
        executor.allowCoreThreadTimeOut(true);

タスクの終了埌はスレッド数は最倧プヌルサむズたで拡匵されおいたすが

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(4);
        assertThat(executor.getActiveCount()).isEqualTo(0);

アむドル状態が続くず、コアプヌルサむズ分のスレッドも含めお終了したす。

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(0);

ログ。

[2021-04-29T21:21:02.180550] pool-8-thread-1 - wait...
[2021-04-29T21:21:02.180915] pool-8-thread-3 - wait...
[2021-04-29T21:21:02.180639] pool-8-thread-2 - wait...
[2021-04-29T21:21:02.181316] pool-8-thread-4 - wait...
[2021-04-29T21:21:04.181189] pool-8-thread-1 - wake up
[2021-04-29T21:21:04.181289] pool-8-thread-3 - wake up
[2021-04-29T21:21:04.181584] pool-8-thread-1 - wait...
[2021-04-29T21:21:04.181571] pool-8-thread-2 - wake up
[2021-04-29T21:21:04.181791] pool-8-thread-4 - wake up
[2021-04-29T21:21:04.181729] pool-8-thread-3 - wait...
[2021-04-29T21:21:06.181957] pool-8-thread-1 - wake up
[2021-04-29T21:21:06.182476] pool-8-thread-3 - wake up
[2021-04-29T21:21:06.183003] main - sleeping...

次は、キュヌをLinkedBlockingQueueにしお、プヌルサむズを2぀ずも2にしおみたした。Executors#newFixedThreadPoolで
スレッドが終了するようなバヌゞョンですね。

    @Test
    public void keepAliveAllowCoreThreadTimeOut2() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        5000L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());
        executor.allowCoreThreadTimeOut(true);

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(0);

        executor.shutdown();
    }

こちらもタスク実行埌はKeep-Aliveタむムアりトで指定した時間が経過した埌、スレッドがすべお終了したす。

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(0);

ログ。

[2021-04-29T21:21:12.187598] pool-9-thread-1 - wait...
[2021-04-29T21:21:12.187732] pool-9-thread-2 - wait...
[2021-04-29T21:21:14.188006] pool-9-thread-1 - wake up
[2021-04-29T21:21:14.188197] pool-9-thread-2 - wake up
[2021-04-29T21:21:14.188389] pool-9-thread-1 - wait...
[2021-04-29T21:21:14.188424] pool-9-thread-2 - wait...
[2021-04-29T21:21:16.188723] pool-9-thread-1 - wake up
[2021-04-29T21:21:16.188837] pool-9-thread-2 - wake up
[2021-04-29T21:21:16.189002] pool-9-thread-1 - wait...
[2021-04-29T21:21:16.189090] pool-9-thread-2 - wait...
[2021-04-29T21:21:18.189300] pool-9-thread-1 - wake up
[2021-04-29T21:21:18.189366] pool-9-thread-2 - wake up
[2021-04-29T21:21:18.190522] main - sleeping...

ずりあえず、気になるずころはざっず確認できた感じです。

たずめ

ThreadPoolExecutorの蚭定項目を、いろいろ確認しおみたした。ふだんはExecutorsの裏に隠れおいるのであたり気にするこずは
ないかもしれたせんが、意味を把握しおおくずThreadPoolExecutorを拡匵したようなスレッドプヌルの蚭定をしたりする時などに
圹に立぀かもしれたせん。

最埌に、今回䜜成した゜ヌスコヌドの党䜓を茉せおおきたす。

src/test/java/org/littlewings/concurrent/ThreadPoolExecutorTest.java

package org.littlewings.concurrent;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class ThreadPoolExecutorTest {
    private void sleep(Duration duration) {
        try {
            TimeUnit.SECONDS.sleep(duration.getSeconds());
        } catch (InterruptedException e) {
            // ignore
        }
    }

    private void log(String format, Object... args) {
        System.out.printf("[%s] %s - %s%n", LocalDateTime.now(), Thread.currentThread().getName(), String.format(format, args));
    }

    private <T> T getFuture(Future<T> future) {
        try {
            return future.get();
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    public void singleThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });

        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });

        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));

        assertThat(executor.getCorePoolSize()).isEqualTo(1);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(1);
        assertThat(executor.getPoolSize()).isEqualTo(1);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void fixedThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });

        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });

        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(5L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void cachedThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                        60L, TimeUnit.SECONDS,
                        new SynchronousQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });

        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });

        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(2L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(3L));

        assertThat(executor.getCorePoolSize()).isEqualTo(0);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(Integer.MAX_VALUE);
        assertThat(executor.getPoolSize()).isEqualTo(3);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void queueing() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 20)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(18);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(210);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(20L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(21L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void expand() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        0L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }


    @Test
    public void keepAlive() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        5000L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(4);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(2);

        executor.shutdown();
    }


    @Test
    public void keepAliveAllowCoreThreadTimeOut1() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        5000L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));
        executor.allowCoreThreadTimeOut(true);

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(4);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void keepAliveAllowCoreThreadTimeOut2() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2,
                        5000L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());
        executor.allowCoreThreadTimeOut(true);

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void reject() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,
                        0L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        for (int i = 7; i <= 8; i++) {
            int counter = i;
            assertThatThrownBy(() ->
                    executor.submit(() -> {
                        log("wait...");
                        sleep(Duration.ofSeconds(2L));
                        log("wake up");
                        return counter;
                    })
            ).isInstanceOf(RejectedExecutionException.class);
        }

        assertThat(executor.getQueue().size()).isEqualTo(2);

        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }
}