これは、なにをしたくて書いたもの?
今までConcurrency Utilitiesで提供されるスレッドプールを使ってきたことはあったのですが、その設定内容をちゃんと
見てきていなかったので今回見てみようかなと。
具体的には、ThreadPoolExecutor
を見ていこうと思います。
ThreadPoolExecutor (Java SE 11 & JDK 11 )
Executorsとスレッドプール
Javaでスレッドプールを使うとなると、登場するのがExecutors
です。
Executors (Java SE 11 & JDK 11 )
Executors
が提供する、以下のスレッドプールの中から使うことが多いでしょう。
Executors#newFixedThreadPool
… 固定サイズのスレッドプール
Executors#newSingleThreadExecutor
… スレッドサイズがひとつのExecutor
Executors#newCachedThreadPool
… プールサイズがタスク数に応じて拡張および縮小されるスレッドプール
Executors#newScheduledThreadPool
… 実行をスケジューリングできるスレッドプール
Executors#newWorkStealingPool
… work-stealingプール
今回は、Executors#newFixedThreadPool
、Executors#newSingleThreadExecutor
、Executors#newCachedThreadPool
を
対象にします。
いずれも、スレッド数が固定のスレッドプール、シングルスレッド、タスク数に応じて拡張・縮小が行われるスレッドプール、
くらいのイメージで使っているのですが、これらを構成するために使われているのがThreadPoolExecutor
です。
ThreadPoolExecutor (Java SE 11 & JDK 11 )
それぞれ定義を見ていってみましょう。
newFixedThreadPool
。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/Executors.java#L91-L95
newSingleThreadExecutor
。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/Executors.java#L174-L179
newCachedThreadPool
。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/Executors.java#L217-L221
ThreadPoolExecutor
ThreadPoolExecutor
のJavadocを見ると、いろいろ設定できることがわかります。
ThreadPoolExecutor (Java SE 11 & JDK 11 )
- コアプールサイズ(
corePoolSize
)
- 最大プールサイズ(
maximumPoolSize
)
- タスクを保持するキュー
- Keep-Aliveタイムアウト(
keepAliveTime
)
- コアプールスレッドがタイムアウトするか(
allowCoreThreadTimeOut
)
- タスクの拒否
その他、事前にスレッドを開始したり(prestartCoreThread
、prestartAllCoreThreads
)、タスクを実行する前にフックを
かけたり(beforeExecute
、afterExecute
)、スレッドの作成方法を指定したり(ThreadFactory
)もできます。
コアプールサイズ、最大プールサイズ、キュー、Keep-Aliveタイムアウトにはいろいろ関係がありそうな感じです。
Executors#newFixedThreadPool
、Executors#newSingleThreadExecutor
、Executors#newCachedThreadPool
で
それぞれどうなっているか、表にしてみます。
スレッドプールの種類 |
コアプールサイズ |
最大プールサイズ |
キュー |
Keep-Aliveタイムアウト |
Executors#newFixedThreadPool |
指定サイズ |
指定サイズ(コアプールサイズと同じ) |
LinkedBlockingQueue (サイズ制限なしのBlockingQueue ) |
0 (Keep-Aliveしない) |
Executors#newSingleThreadExecutor |
1 |
1 |
LinkedBlockingQueue (サイズ制限なしのBlockingQueue ) |
0 (Keep-Aliveしない) |
Executors#newCachedThreadPool |
1 |
Integer#MAX_VALUE |
SynchronousQueue (キャパシティのないBlockingQueue ) |
60秒 |
コアプールサイズが下限、最大プールサイズが上限、といった印象を受けるのですが、キューの性質も影響します。
ThreadPoolExecutor
のキューイングの部分を見ると、こんなことが書いてあります。
- corePoolSizeよりも少ない数のスレッドが実行されている場合、executorはキューイングよりも新しいスレッドの追加を常に優先します。
- corePoolSize以上の数のスレッドが実行されている場合、executorは新しいスレッドの追加よりも要求のキューイングを常に優先します。
- 要求をキューに入れることができない場合、新しいスレッドを作成することによりmaximumPoolSizeを超えない場合は新しいスレッドが作成され、超える場合はタスクが拒否されます。
ThreadPoolExecutor (Java SE 11 & JDK 11 )
これだけ見ると、プールサイズの関係は先述のように(単なる下限、上限)見えなくもないのですが、キューに使用する性質に
よって動きが変わります。
キューに指定するのは、BlockingQueue
インターフェースの実装ですね。
BlockingQueue (Java SE 11 & JDK 11 )
LinkedBlockingQueue
のようなサイズのないキューを使う場合
- コアスレッド数で指定したスレッドを使い切っている場合、タスクはキュー内に待機する
- このようなキューを使う場合、最大プールサイズは効果がなくなるため、コアスレッド数と同じ値を指定するのがよい
Executors#newFixedThreadPool
とExecutors#newSingleThreadExecutor
がこのパターン
SynchronousQueue
のような、容量を持たないキューを使う場合
- タスクを実行するために利用できるスレッドがない場合、スレッドを新たに追加しようとする
- このようなキューを使う場合、十分に大きな最大プールサイズが必要
Executors#newCachedThreadPool
がこのパターンであり、コアプールサイズが1
、最大プールサイズがInteger#MAX_VALUE
ArrayBlockingQueue
のような、容量制限のあるキューを使う場合
- コアプールサイズのスレッドを使いきっていて、キューの容量も使いきっている場合、新しいスレッドが追加される
- 追加されるスレッドは、最大プールサイズまで拡張できるが、それを超えると拒否される
この内容を見た後に、あらためて各スレッドプールの設定(ThreadPoolExecutor
のコンストラクタに渡している引数)を
見ると、なるほど、という感じがします。
キューにサイズ制限がない場合はスレッド数を固定し、キューに容量がない場合はスレッド数が拡張されるようにする、
というように、どちらかを固定した方がわかりやすいということですね。
ところで、キューを使うか最大プールサイズまで拡張するかですが、コアプールサイズのスレッド数を超えた時に
先に使うのはキューになるんですよね。キューがいっぱいになったら、最大プールサイズまでスレッドが追加できます。
ソースコードでは、このあたりですね。
https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1347
https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1354
あと、Keep-Aliveタイムアウトの効果についてですが。コアプールサイズより多くのスレッドを保持している場合、アイドル状態の
スレッドはKeep-Aliveタイムアウトに設定した時間が経過すると終了していきます。コアプールサイズのスレッド数は、
維持されます。
Keep-Aliveタイムアウトを設定したうえで、さらにallowCoreThreadTimeOut
をtrue
にした場合、現在のスレッド数が
コアプールサイズ以下であってもKeep-Aliveタイムアウトで指定した時間以上にアイドル状態が続くとスレッド終了の
対象となります。
このあたりの話は、以下に記述されています。
https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1029-L1062
ちなみにKeep-Aliveタイムアウトを0
に指定すると、コアプールサイズより多いスレッドはすぐに終了します。
説明はこれくらいにして、動作確認をしていきましょう。ThreadPoolExecutor
を使って、ここまで書いてきたことを
試していきたいと思います。
環境
今回の環境は、こちらです。
$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)
$ mvn --version
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-72-generic", arch: "amd64", family: "unix"
準備
pom.xml
の主要な部分。
<properties>
<projectbuildsourceEncoding>UTF-8</projectbuildsourceEncoding>
<projectreportingoutputEncoding>UTF-8</projectreportingoutputEncoding>
<mavencompilersource>11</mavencompilersource>
<mavencompilertarget>11</mavencompilertarget>
</properties>
<dependencies>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.7.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.7.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.19.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
</plugin>
</plugins>
</build>
動作確認は、テストコードで行います。テストコードの雛形は、以下のように用意しました。
src/test/java/org/littlewings/concurrent/ThreadPoolExecutorTest.java
package org.littlewings.concurrent;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class ThreadPoolExecutorTest {
private void sleep(Duration duration) {
try {
TimeUnit.SECONDS.sleep(duration.getSeconds());
} catch (InterruptedException e) {
}
}
private void log(String format, Object... args) {
System.out.printf("[%s] %s - %s%n", LocalDateTime.now(), Thread.currentThread().getName(), String.format(format, args));
}
private <T> T getFuture(Future<T> future) {
try {
return future.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
コメントを記述している部分に、テストを書いてきます。
テストコード内で使う、スリープ、ログ出力、Future
用のメソッドも用意。
基本パターン
まずは基本的なパターンということで、Executors
が提供しているスレッドプールと同じものを定義してみましょう。
Executors#newSingleThreadExecutor
と同等のもの。
@Test
public void singleThreadPool() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
long startTime = System.currentTimeMillis();
Future<String> f1 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "hello";
});
Future<String> f2 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "world";
});
Future<String> f3 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "test";
});
assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));
assertThat(executor.getCorePoolSize()).isEqualTo(1);
assertThat(executor.getMaximumPoolSize()).isEqualTo(1);
assertThat(executor.getPoolSize()).isEqualTo(1);
assertThat(executor.getActiveCount()).isEqualTo(0);
executor.shutdown();
}
コアプールサイズ、最大プールサイズが1、Keep-Aliveタイムアウトはなし、キューは容量制限のないLinkedBlockingQueue
です。
ThreadPoolExecutor executor =
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
厳密に言うと、Executors#newSingleThreadExecutor
とはFinalizableDelegatedExecutorService
でラップしていないところが
異なりますが。
https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/Executors.java#L755-L764
タスク内では2秒ずつスリープしていて、
Future<String> f1 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "hello";
});
Future<String> f2 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "world";
});
Future<String> f3 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "test";
});
スレッドは1つしかないので、順次実行される状態になり、実行時間は6秒ちょっとになります。
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));
設定と、実際に保持しているスレッド数(ThreadPoolExecutor#getPoolSize
)、タスク数(ThreadPoolExecutor#getActiveCount
)の確認。
assertThat(executor.getCorePoolSize()).isEqualTo(1);
assertThat(executor.getMaximumPoolSize()).isEqualTo(1);
assertThat(executor.getPoolSize()).isEqualTo(1);
assertThat(executor.getActiveCount()).isEqualTo(0);
全タスクは完了しているので、getActiveCount
の戻り値は0ですね。
実行時のログ。
[2021-04-29T20:07:36.041148] pool-7-thread-1 - wait...
[2021-04-29T20:07:38.041595] pool-7-thread-1 - wake up
[2021-04-29T20:07:38.042653] pool-7-thread-1 - wait...
[2021-04-29T20:07:40.043593] pool-7-thread-1 - wake up
[2021-04-29T20:07:40.044089] pool-7-thread-1 - wait...
[2021-04-29T20:07:42.044462] pool-7-thread-1 - wake up
次は、Executors#newFixedThreadPool
と同じものを作ってみます。
@Test
public void fixedThreadPool() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
long startTime = System.currentTimeMillis();
Future<String> f1 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "hello";
});
Future<String> f2 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "world";
});
Future<String> f3 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "test";
});
assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(5L));
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(2);
assertThat(executor.getActiveCount()).isEqualTo(0);
executor.shutdown();
}
Executors#newFixedThreadPool
をマネた時と比べると、コアプールサイズ、最大プールサイズを1より大きくして、
かつ同じ値にしたものです。
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
実行しているタスクも同じですが、今回は2スレッド使っているので実行時間が少し短くなっています。
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(5L));
スレッドプールには、スレッドが2つ残っています。
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(2);
assertThat(executor.getActiveCount()).isEqualTo(0);
実行時のログ。
[2021-04-29T20:07:01.948101] pool-2-thread-1 - wait...
[2021-04-29T20:07:01.948795] pool-2-thread-2 - wait...
[2021-04-29T20:07:03.948678] pool-2-thread-1 - wake up
[2021-04-29T20:07:03.949356] pool-2-thread-1 - wait...
[2021-04-29T20:07:03.950086] pool-2-thread-2 - wake up
[2021-04-29T20:07:05.950024] pool-2-thread-1 - wake up
最後は、Executors#newCachedThreadPool
と同等のもので。
@Test
public void cachedThreadPool() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>());
long startTime = System.currentTimeMillis();
Future<String> f1 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "hello";
});
Future<String> f2 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "world";
});
Future<String> f3 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "test";
});
assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(2L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(3L));
assertThat(executor.getCorePoolSize()).isEqualTo(0);
assertThat(executor.getMaximumPoolSize()).isEqualTo(Integer.MAX_VALUE);
assertThat(executor.getPoolSize()).isEqualTo(3);
assertThat(executor.getActiveCount()).isEqualTo(0);
executor.shutdown();
}
コアプールサイズが0、最大プールサイズがInteger#MAX_VALUE
、Keep-Aliveタイムアウトが60秒、キューが容量のない
SynchronousQueue
です。
ThreadPoolExecutor executor =
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>());
アイドル状態のスレッドがない限りは、タスクが追加されるとスレッドが増えるので、今回のパターンだと3スレッドになって
1番実行時間が短くなります。
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(2L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(3L));
スレッド数は、3になりました。
assertThat(executor.getCorePoolSize()).isEqualTo(0);
assertThat(executor.getMaximumPoolSize()).isEqualTo(Integer.MAX_VALUE);
assertThat(executor.getPoolSize()).isEqualTo(3);
assertThat(executor.getActiveCount()).isEqualTo(0);
実行時のログ。
[2021-04-29T20:07:34.028435] pool-6-thread-1 - wait...
[2021-04-29T20:07:34.029653] pool-6-thread-2 - wait...
[2021-04-29T20:07:34.030378] pool-6-thread-3 - wait...
[2021-04-29T20:07:36.028956] pool-6-thread-1 - wake up
[2021-04-29T20:07:36.035749] pool-6-thread-2 - wake up
[2021-04-29T20:07:36.036370] pool-6-thread-3 - wake up
まずは、Executors
が提供しているものとほぼ同じ設定で試してみました。ここから先は、もう少しThreadPoolExecutor
の
設定を変えていってみましょう。
キュー
最初は、キューに関して見ていってみましょう。といっても、こちらはLinkedBlockingQueue
を使ったままで、タスクが
キューに入っているところを見るだけですが。
@Test
public void queueing() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
long startTime = System.currentTimeMillis();
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 20)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
assertThat(executor.getQueue().size()).isEqualTo(18);
assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(210);
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(20L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(21L));
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(2);
assertThat(executor.getActiveCount()).isEqualTo(0);
executor.shutdown();
}
コアプールサイズ、最大プールサイズを2にして、
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
タスクを20個詰め込みます。
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 20)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
スレッドは2つしかないので、実行直後はキューに18個のタスクが入っています。
assertThat(executor.getQueue().size()).isEqualTo(18);
20個のタスクを2スレッドで実行していくので、実行時間はこうなりますね。
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(20L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(21L));
実行時のログ。
[2021-04-29T20:07:09.993991] pool-4-thread-1 - wait...
[2021-04-29T20:07:09.994222] pool-4-thread-2 - wait...
[2021-04-29T20:07:11.994487] pool-4-thread-1 - wake up
[2021-04-29T20:07:11.994760] pool-4-thread-2 - wake up
[2021-04-29T20:07:11.995325] pool-4-thread-1 - wait...
[2021-04-29T20:07:11.995693] pool-4-thread-2 - wait...
[2021-04-29T20:07:13.996178] pool-4-thread-1 - wake up
[2021-04-29T20:07:13.996881] pool-4-thread-1 - wait...
[2021-04-29T20:07:13.996547] pool-4-thread-2 - wake up
[2021-04-29T20:07:13.997756] pool-4-thread-2 - wait...
[2021-04-29T20:07:15.997477] pool-4-thread-1 - wake up
[2021-04-29T20:07:15.998256] pool-4-thread-1 - wait...
[2021-04-29T20:07:15.998240] pool-4-thread-2 - wake up
[2021-04-29T20:07:15.998992] pool-4-thread-2 - wait...
[2021-04-29T20:07:17.998718] pool-4-thread-1 - wake up
[2021-04-29T20:07:17.999219] pool-4-thread-1 - wait...
[2021-04-29T20:07:17.999574] pool-4-thread-2 - wake up
[2021-04-29T20:07:18.000390] pool-4-thread-2 - wait...
[2021-04-29T20:07:20.000014] pool-4-thread-1 - wake up
[2021-04-29T20:07:20.000971] pool-4-thread-2 - wake up
[2021-04-29T20:07:20.001085] pool-4-thread-1 - wait...
[2021-04-29T20:07:20.001895] pool-4-thread-2 - wait...
[2021-04-29T20:07:22.002559] pool-4-thread-1 - wake up
[2021-04-29T20:07:22.003006] pool-4-thread-2 - wake up
[2021-04-29T20:07:22.003315] pool-4-thread-1 - wait...
[2021-04-29T20:07:22.003510] pool-4-thread-2 - wait...
[2021-04-29T20:07:24.003844] pool-4-thread-1 - wake up
[2021-04-29T20:07:24.004008] pool-4-thread-2 - wake up
[2021-04-29T20:07:24.004724] pool-4-thread-1 - wait...
[2021-04-29T20:07:24.004957] pool-4-thread-2 - wait...
[2021-04-29T20:07:26.005308] pool-4-thread-1 - wake up
[2021-04-29T20:07:26.005540] pool-4-thread-2 - wake up
[2021-04-29T20:07:26.006187] pool-4-thread-2 - wait...
[2021-04-29T20:07:26.006003] pool-4-thread-1 - wait...
[2021-04-29T20:07:28.006544] pool-4-thread-2 - wake up
[2021-04-29T20:07:28.007051] pool-4-thread-1 - wake up
[2021-04-29T20:07:28.007251] pool-4-thread-2 - wait...
[2021-04-29T20:07:28.007647] pool-4-thread-1 - wait...
[2021-04-29T20:07:30.007880] pool-4-thread-2 - wake up
[2021-04-29T20:07:30.008026] pool-4-thread-1 - wake up
スレッド数を最大プールサイズまで広げる
次は、コアプールサイズと最大プールサイズを異なる値にして、スレッド数が広がるところを見てみます。
これには、キューも関係します。
要求をキューに入れることができない場合、新しいスレッドを作成することによりmaximumPoolSizeを超えない場合は新しいスレッドが作成され、超える場合はタスクが拒否されます。
容量のないキュー、または容量制限のあるキューを選択する必要があるようです。
ThreadPoolExecutor (Java SE 11 & JDK 11 )
今回は、容量制限のあるArrayBlockingQueue
を選択してみました。プールが拡張される様子を見るのであれば
SynchronousQueue
でもいいのですが、キューにタスクが詰め込まれているところも見たかったので。
@Test
public void expand() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 4,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(2));
long startTime = System.currentTimeMillis();
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 6)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
assertThat(executor.getQueue().size()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(4);
assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
assertThat(executor.getPoolSize()).isEqualTo(2);
assertThat(executor.getActiveCount()).isEqualTo(0);
executor.shutdown();
}
コアプールサイズを2、最大プールサイズを4、キューのサイズを2にしています。
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 4,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(2));
タスクを6つ登録すると
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 6)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
キューに2つ入り、スレッドも4まで拡張されます。
assertThat(executor.getQueue().size()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(4);
キューに入りきれなかったので、スレッドが追加された状態ですね。
4スレッドで6つのタスクを扱うので、実行時間はこうなります。
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));
ログ。
[2021-04-29T20:07:05.979826] pool-3-thread-1 - wait...
[2021-04-29T20:07:05.979981] pool-3-thread-2 - wait...
[2021-04-29T20:07:05.986304] pool-3-thread-3 - wait...
[2021-04-29T20:07:05.988164] pool-3-thread-4 - wait...
[2021-04-29T20:07:07.982240] pool-3-thread-1 - wake up
[2021-04-29T20:07:07.983124] pool-3-thread-1 - wait...
[2021-04-29T20:07:07.984288] pool-3-thread-2 - wake up
[2021-04-29T20:07:07.985241] pool-3-thread-2 - wait...
[2021-04-29T20:07:07.989875] pool-3-thread-3 - wake up
[2021-04-29T20:07:07.990235] pool-3-thread-4 - wake up
[2021-04-29T20:07:09.984001] pool-3-thread-1 - wake up
[2021-04-29T20:07:09.986035] pool-3-thread-2 - wake up
ちなみに、コアプールサイズ以上に拡張された分のスレッドは、タスクが完了するといなくなります。
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
assertThat(executor.getPoolSize()).isEqualTo(2);
assertThat(executor.getActiveCount()).isEqualTo(0);
https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1044-L1049
https://github.com/openjdk/jdk11u/blob/jdk-11.0.11%2B9/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1052-L1057
これを維持するには、Keep-Aliveタイムアウトの指定が必要になります。
タスクの拒否
キューの容量を使い切り、最大プールサイズまで拡張された状態でタスクを追加しようとすると、そのタスクは拒否されます。
前述の最大プールサイズまで拡張されるコードがすでにキューの容量、プールサイズの両方でギリギリだったので、ここに
さらにタスクを追加してみます。
@Test
public void reject() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 4,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(2));
long startTime = System.currentTimeMillis();
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 6)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
for (int i = 7; i <= 8; i++) {
int counter = i;
assertThatThrownBy(() ->
executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return counter;
})
).isInstanceOf(RejectedExecutionException.class);
}
assertThat(executor.getQueue().size()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(4);
assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
assertThat(executor.getPoolSize()).isEqualTo(2);
assertThat(executor.getActiveCount()).isEqualTo(0);
executor.shutdown();
}
すると、こんな感じで拒否されます。
for (int i = 7; i <= 8; i++) {
int counter = i;
assertThatThrownBy(() ->
executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return counter;
})
).isInstanceOf(RejectedExecutionException.class);
}
この時の動作は、RejectedExecutionHandler
インターフェースの実装により指定できます。デフォルトでは、
ThreadPoolExecutor.AbortPolicy
が指定されたことになっています
RejectedExecutionHandler (Java SE 11 & JDK 11 )
ログは先ほどと同じような内容になりますが(追加分のタスクが拒否されただけだから)、一応載せておきます。
[2021-04-29T20:07:30.015131] pool-5-thread-1 - wait...
[2021-04-29T20:07:30.018122] pool-5-thread-2 - wait...
[2021-04-29T20:07:30.031335] pool-5-thread-3 - wait...
[2021-04-29T20:07:30.037615] pool-5-thread-4 - wait...
[2021-04-29T20:07:32.016207] pool-5-thread-1 - wake up
[2021-04-29T20:07:32.016742] pool-5-thread-1 - wait...
[2021-04-29T20:07:32.018562] pool-5-thread-2 - wake up
[2021-04-29T20:07:32.018948] pool-5-thread-2 - wait...
[2021-04-29T20:07:32.032788] pool-5-thread-3 - wake up
[2021-04-29T20:07:32.039120] pool-5-thread-4 - wake up
[2021-04-29T20:07:34.017244] pool-5-thread-1 - wake up
[2021-04-29T20:07:34.019358] pool-5-thread-2 - wake up
最後は、Keep-Aliveタイムアウトです。Keep-Aliveタイムアウトを指定することで、スレッド数がコアプールサイズよりも
多くなっている場合は、Keep-Aliveタイムアウトで指定した時間以上にアイドル状態が続くとスレッドが終了します。
@Test
public void keepAlive() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 4,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(2));
long startTime = System.currentTimeMillis();
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 6)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
assertThat(executor.getQueue().size()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(4);
assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
assertThat(executor.getPoolSize()).isEqualTo(4);
assertThat(executor.getActiveCount()).isEqualTo(0);
log("sleeping...");
sleep(Duration.ofSeconds(6L));
assertThat(executor.getPoolSize()).isEqualTo(2);
executor.shutdown();
}
今回は、Keep-Aliveタイムアウトを5秒にしてみました。コアプールサイズは2、最大プールサイズは4です。
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 4,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(2));
タスクを追加することで、キューにタスクが入り、スレッドプールも拡張されます。
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 6)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
assertThat(executor.getQueue().size()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(4);
タスクの実行後、しばらく待っているとスレッド数がコアプールサイズまで減少することが確認できます。
log("sleeping...");
sleep(Duration.ofSeconds(6L));
assertThat(executor.getPoolSize()).isEqualTo(2);
実行時のログ。
[2021-04-29T21:20:12.075690] pool-1-thread-1 - wait...
[2021-04-29T21:20:12.079657] pool-1-thread-2 - wait...
[2021-04-29T21:20:12.079740] pool-1-thread-4 - wait...
[2021-04-29T21:20:12.079747] pool-1-thread-3 - wait...
[2021-04-29T21:20:14.087106] pool-1-thread-1 - wake up
[2021-04-29T21:20:14.087656] pool-1-thread-2 - wake up
[2021-04-29T21:20:14.088526] pool-1-thread-1 - wait...
[2021-04-29T21:20:14.089013] pool-1-thread-3 - wake up
[2021-04-29T21:20:14.088705] pool-1-thread-2 - wait...
[2021-04-29T21:20:14.088548] pool-1-thread-4 - wake up
[2021-04-29T21:20:16.089248] pool-1-thread-1 - wake up
[2021-04-29T21:20:16.091406] pool-1-thread-2 - wake up
[2021-04-29T21:20:16.097731] main - sleeping...
最後です。Keep-Aliveタイムアウトを指定すると、アイドル状態のスレッドはコアプールサイズまで減少していきます。
ですが、allowCoreThreadTimeOut
をtrue
にすると、コアプールサイズ分のスレッドも終了します。
こちらは、ArrayBlockingQueue
とLinkedBlockingQueue
の2つを使って試してみました。
Keep-Aliveタイムアウトを5秒、キューをArrayBlockingQueue
にして、コアプールサイズを2、最大プールサイズを4にしてみます。
@Test
public void keepAliveAllowCoreThreadTimeOut1() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 4,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(2));
executor.allowCoreThreadTimeOut(true);
long startTime = System.currentTimeMillis();
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 6)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
assertThat(executor.getQueue().size()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(4);
assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
assertThat(executor.getPoolSize()).isEqualTo(4);
assertThat(executor.getActiveCount()).isEqualTo(0);
log("sleeping...");
sleep(Duration.ofSeconds(6L));
assertThat(executor.getPoolSize()).isEqualTo(0);
executor.shutdown();
}
allowCoreThreadTimeOut
をtrue
にしているのがポイントであり、Keep-Aliveタイムアウトが0より大きくしたうえで
設定する必要があります。
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 4,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(2));
executor.allowCoreThreadTimeOut(true);
タスクの終了後はスレッド数は最大プールサイズまで拡張されていますが
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
assertThat(executor.getPoolSize()).isEqualTo(4);
assertThat(executor.getActiveCount()).isEqualTo(0);
アイドル状態が続くと、コアプールサイズ分のスレッドも含めて終了します。
log("sleeping...");
sleep(Duration.ofSeconds(6L));
assertThat(executor.getPoolSize()).isEqualTo(0);
ログ。
[2021-04-29T21:21:02.180550] pool-8-thread-1 - wait...
[2021-04-29T21:21:02.180915] pool-8-thread-3 - wait...
[2021-04-29T21:21:02.180639] pool-8-thread-2 - wait...
[2021-04-29T21:21:02.181316] pool-8-thread-4 - wait...
[2021-04-29T21:21:04.181189] pool-8-thread-1 - wake up
[2021-04-29T21:21:04.181289] pool-8-thread-3 - wake up
[2021-04-29T21:21:04.181584] pool-8-thread-1 - wait...
[2021-04-29T21:21:04.181571] pool-8-thread-2 - wake up
[2021-04-29T21:21:04.181791] pool-8-thread-4 - wake up
[2021-04-29T21:21:04.181729] pool-8-thread-3 - wait...
[2021-04-29T21:21:06.181957] pool-8-thread-1 - wake up
[2021-04-29T21:21:06.182476] pool-8-thread-3 - wake up
[2021-04-29T21:21:06.183003] main - sleeping...
次は、キューをLinkedBlockingQueue
にして、プールサイズを2つとも2にしてみました。Executors#newFixedThreadPool
で
スレッドが終了するようなバージョンですね。
@Test
public void keepAliveAllowCoreThreadTimeOut2() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 2,
5000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
executor.allowCoreThreadTimeOut(true);
long startTime = System.currentTimeMillis();
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 6)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(2);
assertThat(executor.getActiveCount()).isEqualTo(0);
log("sleeping...");
sleep(Duration.ofSeconds(6L));
assertThat(executor.getPoolSize()).isEqualTo(0);
executor.shutdown();
}
こちらもタスク実行後はKeep-Aliveタイムアウトで指定した時間が経過した後、スレッドがすべて終了します。
log("sleeping...");
sleep(Duration.ofSeconds(6L));
assertThat(executor.getPoolSize()).isEqualTo(0);
ログ。
[2021-04-29T21:21:12.187598] pool-9-thread-1 - wait...
[2021-04-29T21:21:12.187732] pool-9-thread-2 - wait...
[2021-04-29T21:21:14.188006] pool-9-thread-1 - wake up
[2021-04-29T21:21:14.188197] pool-9-thread-2 - wake up
[2021-04-29T21:21:14.188389] pool-9-thread-1 - wait...
[2021-04-29T21:21:14.188424] pool-9-thread-2 - wait...
[2021-04-29T21:21:16.188723] pool-9-thread-1 - wake up
[2021-04-29T21:21:16.188837] pool-9-thread-2 - wake up
[2021-04-29T21:21:16.189002] pool-9-thread-1 - wait...
[2021-04-29T21:21:16.189090] pool-9-thread-2 - wait...
[2021-04-29T21:21:18.189300] pool-9-thread-1 - wake up
[2021-04-29T21:21:18.189366] pool-9-thread-2 - wake up
[2021-04-29T21:21:18.190522] main - sleeping...
とりあえず、気になるところはざっと確認できた感じです。
まとめ
ThreadPoolExecutor
の設定項目を、いろいろ確認してみました。ふだんはExecutors
の裏に隠れているのであまり気にすることは
ないかもしれませんが、意味を把握しておくとThreadPoolExecutor
を拡張したようなスレッドプールの設定をしたりする時などに
役に立つかもしれません。
最後に、今回作成したソースコードの全体を載せておきます。
src/test/java/org/littlewings/concurrent/ThreadPoolExecutorTest.java
package org.littlewings.concurrent;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class ThreadPoolExecutorTest {
private void sleep(Duration duration) {
try {
TimeUnit.SECONDS.sleep(duration.getSeconds());
} catch (InterruptedException e) {
}
}
private void log(String format, Object... args) {
System.out.printf("[%s] %s - %s%n", LocalDateTime.now(), Thread.currentThread().getName(), String.format(format, args));
}
private <T> T getFuture(Future<T> future) {
try {
return future.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
@Test
public void singleThreadPool() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
long startTime = System.currentTimeMillis();
Future<String> f1 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "hello";
});
Future<String> f2 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "world";
});
Future<String> f3 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "test";
});
assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));
assertThat(executor.getCorePoolSize()).isEqualTo(1);
assertThat(executor.getMaximumPoolSize()).isEqualTo(1);
assertThat(executor.getPoolSize()).isEqualTo(1);
assertThat(executor.getActiveCount()).isEqualTo(0);
executor.shutdown();
}
@Test
public void fixedThreadPool() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
long startTime = System.currentTimeMillis();
Future<String> f1 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "hello";
});
Future<String> f2 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "world";
});
Future<String> f3 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "test";
});
assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(5L));
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(2);
assertThat(executor.getActiveCount()).isEqualTo(0);
executor.shutdown();
}
@Test
public void cachedThreadPool() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>());
long startTime = System.currentTimeMillis();
Future<String> f1 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "hello";
});
Future<String> f2 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "world";
});
Future<String> f3 = executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return "test";
});
assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(2L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(3L));
assertThat(executor.getCorePoolSize()).isEqualTo(0);
assertThat(executor.getMaximumPoolSize()).isEqualTo(Integer.MAX_VALUE);
assertThat(executor.getPoolSize()).isEqualTo(3);
assertThat(executor.getActiveCount()).isEqualTo(0);
executor.shutdown();
}
@Test
public void queueing() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
long startTime = System.currentTimeMillis();
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 20)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
assertThat(executor.getQueue().size()).isEqualTo(18);
assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(210);
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(20L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(21L));
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(2);
assertThat(executor.getActiveCount()).isEqualTo(0);
executor.shutdown();
}
@Test
public void expand() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 4,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(2));
long startTime = System.currentTimeMillis();
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 6)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
assertThat(executor.getQueue().size()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(4);
assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
assertThat(executor.getPoolSize()).isEqualTo(2);
assertThat(executor.getActiveCount()).isEqualTo(0);
executor.shutdown();
}
@Test
public void keepAlive() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 4,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(2));
long startTime = System.currentTimeMillis();
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 6)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
assertThat(executor.getQueue().size()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(4);
assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
assertThat(executor.getPoolSize()).isEqualTo(4);
assertThat(executor.getActiveCount()).isEqualTo(0);
log("sleeping...");
sleep(Duration.ofSeconds(6L));
assertThat(executor.getPoolSize()).isEqualTo(2);
executor.shutdown();
}
@Test
public void keepAliveAllowCoreThreadTimeOut1() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 4,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(2));
executor.allowCoreThreadTimeOut(true);
long startTime = System.currentTimeMillis();
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 6)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
assertThat(executor.getQueue().size()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(4);
assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
assertThat(executor.getPoolSize()).isEqualTo(4);
assertThat(executor.getActiveCount()).isEqualTo(0);
log("sleeping...");
sleep(Duration.ofSeconds(6L));
assertThat(executor.getPoolSize()).isEqualTo(0);
executor.shutdown();
}
@Test
public void keepAliveAllowCoreThreadTimeOut2() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 2,
5000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
executor.allowCoreThreadTimeOut(true);
long startTime = System.currentTimeMillis();
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 6)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(2);
assertThat(executor.getActiveCount()).isEqualTo(0);
log("sleeping...");
sleep(Duration.ofSeconds(6L));
assertThat(executor.getPoolSize()).isEqualTo(0);
executor.shutdown();
}
@Test
public void reject() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 4,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(2));
long startTime = System.currentTimeMillis();
List<Future<Integer>> futures =
IntStream
.rangeClosed(1, 6)
.mapToObj(i -> executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return i;
}))
.collect(Collectors.toList());
for (int i = 7; i <= 8; i++) {
int counter = i;
assertThatThrownBy(() ->
executor.submit(() -> {
log("wait...");
sleep(Duration.ofSeconds(2L));
log("wake up");
return counter;
})
).isInstanceOf(RejectedExecutionException.class);
}
assertThat(executor.getQueue().size()).isEqualTo(2);
assertThat(executor.getPoolSize()).isEqualTo(4);
assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);
long elapsedTime = System.currentTimeMillis() - startTime;
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));
assertThat(executor.getCorePoolSize()).isEqualTo(2);
assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
assertThat(executor.getPoolSize()).isEqualTo(2);
assertThat(executor.getActiveCount()).isEqualTo(0);
executor.shutdown();
}
}