What is this post about?
I have used the thread pools provided by the Concurrency Utilities before, but I never looked closely at how they are configured, so I decided to do that this time.
Specifically, I'll look at ThreadPoolExecutor.
ThreadPoolExecutor (Java SE 11 & JDK 11)
Executors and thread pools
When you use a thread pool in Java, the class you usually reach for is Executors.
Executors (Java SE 11 & JDK 11)
Most of the time you pick one of the following thread pools that Executors provides.
- Executors#newFixedThreadPool … a fixed-size thread pool
- Executors#newSingleThreadExecutor … an Executor with a single thread
- Executors#newCachedThreadPool … a thread pool whose size grows and shrinks with the number of tasks
- Executors#newScheduledThreadPool … a thread pool that can schedule execution
- Executors#newWorkStealingPool … a work-stealing pool
This post covers Executors#newFixedThreadPool, Executors#newSingleThreadExecutor, and Executors#newCachedThreadPool.
I had been using them with a rough mental picture of "a pool with a fixed number of threads", "a single thread", and "a pool that grows and shrinks with the number of tasks". All three are built on ThreadPoolExecutor.
ThreadPoolExecutor (Java SE 11 & JDK 11)
Let's look at how each one is defined.
newFixedThreadPool:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newSingleThreadExecutor:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newCachedThreadPool:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

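As a quick cross-check of these definitions, here is a small sketch that reads the settings back from the pools that Executors returns. It assumes the returned object can be cast to ThreadPoolExecutor, which matches the OpenJDK 11 source quoted here but is an implementation detail rather than a documented guarantee. The class PoolConfigDemo and its method names are made up for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolConfigDemo {
    // Summarizes the settings of a pool, or reports that they are not accessible.
    static String describe(ExecutorService service) {
        if (!(service instanceof ThreadPoolExecutor)) {
            return "not a ThreadPoolExecutor";
        }
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        return String.format("core=%d, max=%d, keepAlive=%ds, queue=%s",
                pool.getCorePoolSize(),
                pool.getMaximumPoolSize(),
                pool.getKeepAliveTime(TimeUnit.SECONDS),
                pool.getQueue().getClass().getSimpleName());
    }

    static String describeFixed(int nThreads) {
        ExecutorService service = Executors.newFixedThreadPool(nThreads);
        try {
            return describe(service);
        } finally {
            service.shutdown(); // nothing was submitted, so no thread was ever started
        }
    }

    static String describeCached() {
        ExecutorService service = Executors.newCachedThreadPool();
        try {
            return describe(service);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFixed(3));
        System.out.println(describeCached());
    }
}
```

On the JDK used in this post, I would expect the fixed pool to report equal core and maximum sizes with a LinkedBlockingQueue, and the cached pool to report a core size of 0 with a SynchronousQueue.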
ThreadPoolExecutor
The Javadoc for ThreadPoolExecutor shows that quite a lot can be configured.
ThreadPoolExecutor (Java SE 11 & JDK 11)
- Core pool size (corePoolSize)
- Maximum pool size (maximumPoolSize)
- The queue that holds tasks
- Keep-alive timeout (keepAliveTime)
- Whether core pool threads may time out (allowCoreThreadTimeOut)
- Task rejection
You can also start threads in advance (prestartCoreThread, prestartAllCoreThreads), hook in before and after a task runs (beforeExecute, afterExecute), and specify how threads are created (ThreadFactory).
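The rest of this post does not exercise these extra knobs, so here is a minimal sketch of them in one place. It combines a ThreadFactory that names threads, prestartAllCoreThreads, and counters in beforeExecute and afterExecute. The names HooksDemo, CountingPool, and the "worker-" prefix are my own choices for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class HooksDemo {
    // A pool that counts how often the before/after hooks are invoked.
    static final class CountingPool extends ThreadPoolExecutor {
        final AtomicInteger before = new AtomicInteger();
        final AtomicInteger after = new AtomicInteger();

        CountingPool(int size, ThreadFactory factory) {
            super(size, size, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            before.incrementAndGet();
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            after.incrementAndGet();
        }
    }

    // Returns "prestarted/poolSizeBeforeSubmit/beforeCalls/afterCalls/customThreadName".
    static String run() {
        AtomicInteger sequence = new AtomicInteger();
        ThreadFactory factory = r -> new Thread(r, "worker-" + sequence.incrementAndGet());
        CountingPool pool = new CountingPool(2, factory);

        int prestarted = pool.prestartAllCoreThreads(); // starts both core threads up front
        int sizeBeforeSubmit = pool.getPoolSize();

        AtomicReference<String> threadName = new AtomicReference<>("");
        pool.execute(() -> threadName.set(Thread.currentThread().getName()));

        pool.shutdown(); // already submitted tasks still run
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return prestarted + "/" + sizeBeforeSubmit + "/" + pool.before.get() + "/"
                + pool.after.get() + "/" + threadName.get().startsWith("worker-");
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The sketch waits for termination before reading the counters because afterExecute runs on the worker thread after the task body has finished.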
The core pool size, maximum pool size, queue, and keep-alive timeout all interact with one another.
Here is a table of how Executors#newFixedThreadPool, Executors#newSingleThreadExecutor, and Executors#newCachedThreadPool set each of them.
Thread pool | Core pool size | Maximum pool size | Queue | Keep-alive timeout |
---|---|---|---|---|
Executors#newFixedThreadPool | specified size | specified size (same as the core pool size) | LinkedBlockingQueue (a BlockingQueue with no size limit) | 0 (no keep-alive) |
Executors#newSingleThreadExecutor | 1 | 1 | LinkedBlockingQueue (a BlockingQueue with no size limit) | 0 (no keep-alive) |
Executors#newCachedThreadPool | 0 | Integer#MAX_VALUE | SynchronousQueue (a BlockingQueue with no capacity) | 60 seconds |
It is tempting to read the core pool size as a lower bound and the maximum pool size as an upper bound, but the nature of the queue also matters.
The queuing section of the ThreadPoolExecutor Javadoc says the following.
- If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
- If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
- If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
ThreadPoolExecutor (Java SE 11 & JDK 11)
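To make the three rules easier to follow, here is a toy model of the decision. It is not the JDK implementation, only a simplified sketch that assumes no task finishes while tasks are being submitted and that the queue is described by a plain capacity. AdmissionModel, Decision, decide, and simulate are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class AdmissionModel {
    enum Decision { NEW_THREAD, QUEUE, REJECT }

    // Applies the three Javadoc rules in order.
    static Decision decide(int runningThreads, int corePoolSize, int maximumPoolSize, boolean queueAccepts) {
        if (runningThreads < corePoolSize) {
            return Decision.NEW_THREAD; // rule 1: below core size, always add a thread
        }
        if (queueAccepts) {
            return Decision.QUEUE;      // rule 2: at or above core size, prefer queuing
        }
        if (runningThreads < maximumPoolSize) {
            return Decision.NEW_THREAD; // rule 3: queue refused, grow up to the maximum
        }
        return Decision.REJECT;         // rule 3: queue refused and the pool is at its maximum
    }

    // Simulates a burst of submissions during which no task completes.
    static List<Decision> simulate(int corePoolSize, int maximumPoolSize, int queueCapacity, int submissions) {
        int threads = 0;
        int queued = 0;
        List<Decision> decisions = new ArrayList<>();
        for (int i = 0; i < submissions; i++) {
            Decision decision = decide(threads, corePoolSize, maximumPoolSize, queued < queueCapacity);
            if (decision == Decision.NEW_THREAD) {
                threads++;
            } else if (decision == Decision.QUEUE) {
                queued++;
            }
            decisions.add(decision);
        }
        return decisions;
    }

    public static void main(String[] args) {
        // core 2, max 4, queue capacity 2, eight submissions
        System.out.println(simulate(2, 4, 2, 8));
    }
}
```

With a core size of 2, a maximum of 4, and a queue capacity of 2, the model starts two threads, queues two tasks, starts two more threads, and rejects everything after that.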
From this alone, the pool sizes could still be read as a simple lower and upper bound, as described above, but the behavior changes with the kind of queue used.
The queue is an implementation of the BlockingQueue interface.
BlockingQueue (Java SE 11 & JDK 11)
- Using a queue with no size limit, such as LinkedBlockingQueue
  - When all core threads are busy, tasks wait in the queue
  - With this kind of queue the maximum pool size has no effect, so it is best set to the same value as the core pool size
  - Executors#newFixedThreadPool and Executors#newSingleThreadExecutor follow this pattern
- Using a queue with no capacity, such as SynchronousQueue
  - When no thread is available to run a task, the pool tries to add a new thread
  - This kind of queue needs a sufficiently large maximum pool size
  - Executors#newCachedThreadPool follows this pattern: core pool size 0, maximum pool size Integer#MAX_VALUE
- Using a queue with a capacity limit, such as ArrayBlockingQueue
  - When all core threads are busy and the queue is full, a new thread is added
  - Threads can be added up to the maximum pool size; beyond that, tasks are rejected
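The SynchronousQueue case is the one the later tests only touch with an effectively unlimited maximum, so here is a small sketch of why a "sufficiently large" maximum matters. It uses a deliberately small maximum pool size of 2 and tasks that block on a latch. DirectHandoffDemo is a made-up name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class DirectHandoffDemo {
    // Returns "poolSize/queueSize/thirdTaskRejected".
    static String run() {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(0, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocked); // no idle thread to hand off to, so a thread is created
            pool.execute(blocked); // same again, the pool is now at its maximum
            boolean rejected = false;
            try {
                pool.execute(blocked); // nothing can take the task and no thread may be added
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
            return pool.getPoolSize() + "/" + pool.getQueue().size() + "/" + rejected;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A SynchronousQueue never holds elements, so its size is always 0 and every task either goes straight to a thread or is rejected.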
With this in mind, the settings of each thread pool (the arguments passed to the ThreadPoolExecutor constructor) make sense.
If the queue has no size limit, fix the number of threads; if the queue has no capacity, let the number of threads grow. Pinning one side or the other keeps the behavior easy to reason about.
Note that when a queue is combined with a maximum pool size larger than the core pool size, the queue is used first once the core pool size is reached. Only when the queue is full are threads added, up to the maximum pool size.
In the source code, this logic lives in ThreadPoolExecutor#execute.
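The "queue first, then grow" order can be observed directly. The sketch below uses a core pool size of 1 and a maximum of 4, submits five tasks that block on a latch, and compares an unbounded queue with a queue of capacity 2. QueueFirstDemo and its method names are invented for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class QueueFirstDemo {
    // Submits five blocking tasks to a pool with core 1 and max 4; returns "poolSize/queueSize".
    static String run(BlockingQueue<Runnable> queue) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 4, 0L, TimeUnit.MILLISECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the thread so that nothing completes early
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize() + "/" + pool.getQueue().size();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    // Unbounded queue: the maximum pool size never comes into play.
    static String unbounded() {
        return run(new LinkedBlockingQueue<>());
    }

    // Capacity 2: the queue fills first, then two extra threads are added.
    static String bounded() {
        return run(new ArrayBlockingQueue<>(2));
    }

    public static void main(String[] args) {
        System.out.println("unbounded: " + unbounded());
        System.out.println("bounded:   " + bounded());
    }
}
```

With the unbounded queue the pool stays at one thread and four tasks wait. With the bounded queue two tasks wait and the pool grows to three threads.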
As for the keep-alive timeout: when the pool holds more threads than the core pool size, idle threads are terminated once they have been idle for the keep-alive time. The core pool size threads are kept.
If allowCoreThreadTimeOut is set to true in addition to a keep-alive timeout, threads become candidates for termination after being idle for the keep-alive time even when the current thread count is at or below the core pool size.
This is covered in the ThreadPoolExecutor Javadoc.
Incidentally, with a keep-alive timeout of 0, threads beyond the core pool size are terminated as soon as they become idle.
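Here is a compact sketch of both effects with a short keep-alive time, so it runs in well under a second in the usual case. It polls the pool size with a generous deadline instead of relying on exact timing. KeepAliveDemo and awaitPoolSize are hypothetical names, and the 200 ms keep-alive is an arbitrary choice.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class KeepAliveDemo {
    // Polls until the pool reaches the expected size or five seconds pass; returns the last size seen.
    static int awaitPoolSize(ThreadPoolExecutor pool, int expected) {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() != expected && System.nanoTime() < deadline) {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return pool.getPoolSize();
    }

    // Returns "expandedSize/sizeAfterKeepAlive/sizeAfterCoreTimeout".
    static String run() {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(1, 3, 200L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(blocked); // each task needs its own thread
            }
            int expanded = pool.getPoolSize();

            release.countDown(); // all three threads become idle
            int afterKeepAlive = awaitPoolSize(pool, 1); // the extra threads time out, the core thread stays

            pool.allowCoreThreadTimeOut(true); // from now on the core thread may time out too
            int afterCoreTimeout = awaitPoolSize(pool, 0);

            return expanded + "/" + afterKeepAlive + "/" + afterCoreTimeout;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```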
That's enough explanation; let's verify the behavior. I'll use ThreadPoolExecutor directly to try out everything described so far.
Environment
This is the environment used for this post.

    $ java --version
    openjdk 11.0.11 2021-04-20
    OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
    OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)

    $ mvn --version
    Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
    Maven home: $HOME/.sdkman/candidates/maven/current
    Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
    Default locale: ja_JP, platform encoding: UTF-8
    OS name: "linux", version: "5.4.0-72-generic", arch: "amd64", family: "unix"

Preparation
The main parts of pom.xml:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.7.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.7.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.19.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

Behavior is verified with test code. I prepared the following skeleton for the tests.
src/test/java/org/littlewings/concurrent/ThreadPoolExecutorTest.java

    package org.littlewings.concurrent;

    import java.time.Duration;
    import java.time.LocalDateTime;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    import org.junit.jupiter.api.Test;

    import static org.assertj.core.api.Assertions.assertThat;
    import static org.assertj.core.api.Assertions.assertThatThrownBy;

    public class ThreadPoolExecutorTest {
        // Sleeps for the given duration (whole seconds only).
        private void sleep(Duration duration) {
            try {
                TimeUnit.SECONDS.sleep(duration.getSeconds());
            } catch (InterruptedException e) {
                // ignore
            }
        }

        // Prints a message prefixed with the current time and thread name.
        private void log(String format, Object... args) {
            System.out.printf("[%s] %s - %s%n", LocalDateTime.now(), Thread.currentThread().getName(), String.format(format, args));
        }

        // Waits for a Future's result, wrapping checked exceptions.
        private <T> T getFuture(Future<T> future) {
            try {
                return future.get();
            } catch (ExecutionException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        // Write the tests here!
    }

Tests go where the comment is.
The skeleton also includes helper methods for sleeping, logging, and getting a Future's result.
Basic patterns
Let's start with the basics and define the same thread pools that Executors provides.
First, the equivalent of Executors#newSingleThreadExecutor.
@Test public void singleThreadPool() { ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); long startTime = System.currentTimeMillis(); Future<String> f1 = executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return "hello"; }); Future<String> f2 = executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return "world"; }); Future<String> f3 = executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return "test"; }); assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello"); assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world"); assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test"); long elapsedTime = System.currentTimeMillis() - startTime; assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L)); assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L)); assertThat(executor.getCorePoolSize()).isEqualTo(1); assertThat(executor.getMaximumPoolSize()).isEqualTo(1); assertThat(executor.getPoolSize()).isEqualTo(1); assertThat(executor.getActiveCount()).isEqualTo(0); executor.shutdown(); }
The core pool size and maximum pool size are both 1, there is no keep-alive timeout, and the queue is a LinkedBlockingQueue with no capacity limit.
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
Strictly speaking, this differs from Executors#newSingleThreadExecutor in that it is not wrapped in FinalizableDelegatedExecutorService.
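One practical consequence of that wrapper, as far as I understand it, is that the executor returned by Executors#newSingleThreadExecutor cannot be cast to ThreadPoolExecutor and reconfigured, whereas the one from Executors#newFixedThreadPool(1) can on this JDK. The sketch below checks this. WrapperDemo is a made-up name, and the cast relies on an implementation detail.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

final class WrapperDemo {
    // Returns "singleIsThreadPoolExecutor/fixedIsThreadPoolExecutor/fixedMaxAfterResize".
    static String run() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        ExecutorService fixed = Executors.newFixedThreadPool(1);
        try {
            boolean singleExposed = single instanceof ThreadPoolExecutor;
            boolean fixedExposed = fixed instanceof ThreadPoolExecutor;

            int resizedMax = -1;
            if (fixedExposed) {
                ThreadPoolExecutor pool = (ThreadPoolExecutor) fixed;
                pool.setMaximumPoolSize(2); // raise the maximum first so that core <= max always holds
                pool.setCorePoolSize(2);
                resizedMax = pool.getMaximumPoolSize();
            }
            return singleExposed + "/" + fixedExposed + "/" + resizedMax;
        } finally {
            single.shutdown();
            fixed.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The pool built by hand in this test behaves like the fixed(1) case: it is a plain ThreadPoolExecutor, so its settings remain adjustable.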
Each task sleeps for 2 seconds.
Future<String> f1 = executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return "hello"; }); Future<String> f2 = executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return "world"; }); Future<String> f3 = executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return "test"; });
There is only one thread, so the tasks run sequentially and the total run time is a little over 6 seconds.
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L)); assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));
Then check the settings, the number of threads the pool actually holds (ThreadPoolExecutor#getPoolSize), and the number of threads actively running tasks (ThreadPoolExecutor#getActiveCount).
assertThat(executor.getCorePoolSize()).isEqualTo(1); assertThat(executor.getMaximumPoolSize()).isEqualTo(1); assertThat(executor.getPoolSize()).isEqualTo(1); assertThat(executor.getActiveCount()).isEqualTo(0);
All tasks have completed, so getActiveCount returns 0.
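To see getActiveCount report something other than 0, the task has to be observed while it is still running. The sketch below does that with two latches. The Javadoc describes the value as approximate, so the sketch polls briefly after completion instead of assuming it drops to 0 the instant Future#get returns. ActiveCountDemo is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ActiveCountDemo {
    // Returns "activeWhileRunning/activeAfterCompletion/poolSizeAfterCompletion".
    static String run() {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<String> future = pool.submit(() -> {
                started.countDown(); // signal that the task body is running
                release.await();
                return "done";
            });

            started.await();
            int whileRunning = pool.getActiveCount();

            release.countDown();
            future.get(5, TimeUnit.SECONDS);

            // The worker may still be wrapping up right after get() returns, so poll for a moment.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getActiveCount() != 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return whileRunning + "/" + pool.getActiveCount() + "/" + pool.getPoolSize();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The pool size stays at 1 after completion because the single core thread is kept, while the active count returns to 0.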
Log output from the run:
[2021-04-29T20:07:36.041148] pool-7-thread-1 - wait... [2021-04-29T20:07:38.041595] pool-7-thread-1 - wake up [2021-04-29T20:07:38.042653] pool-7-thread-1 - wait... [2021-04-29T20:07:40.043593] pool-7-thread-1 - wake up [2021-04-29T20:07:40.044089] pool-7-thread-1 - wait... [2021-04-29T20:07:42.044462] pool-7-thread-1 - wake up
Next, let's build the equivalent of Executors#newFixedThreadPool.
@Test public void fixedThreadPool() { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); long startTime = System.currentTimeMillis(); Future<String> f1 = executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return "hello"; }); Future<String> f2 = executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return "world"; }); Future<String> f3 = executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return "test"; }); assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello"); assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world"); assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test"); long elapsedTime = System.currentTimeMillis() - startTime; assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L)); assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(5L)); assertThat(executor.getCorePoolSize()).isEqualTo(2); assertThat(executor.getMaximumPoolSize()).isEqualTo(2); assertThat(executor.getPoolSize()).isEqualTo(2); assertThat(executor.getActiveCount()).isEqualTo(0); executor.shutdown(); }
Compared with the previous pool, the Executors#newFixedThreadPool equivalent uses a core pool size and maximum pool size greater than 1, both set to the same value.
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
The tasks are the same as before, but this time two threads are used, so the run time is somewhat shorter.
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L)); assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(5L));
Two threads remain in the thread pool.
assertThat(executor.getCorePoolSize()).isEqualTo(2); assertThat(executor.getMaximumPoolSize()).isEqualTo(2); assertThat(executor.getPoolSize()).isEqualTo(2); assertThat(executor.getActiveCount()).isEqualTo(0);
Log output from the run:
[2021-04-29T20:07:01.948101] pool-2-thread-1 - wait... [2021-04-29T20:07:01.948795] pool-2-thread-2 - wait... [2021-04-29T20:07:03.948678] pool-2-thread-1 - wake up [2021-04-29T20:07:03.949356] pool-2-thread-1 - wait... [2021-04-29T20:07:03.950086] pool-2-thread-2 - wake up [2021-04-29T20:07:05.950024] pool-2-thread-1 - wake up
Last is the equivalent of Executors#newCachedThreadPool.
@Test public void cachedThreadPool() { ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); long startTime = System.currentTimeMillis(); Future<String> f1 = executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return "hello"; }); Future<String> f2 = executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return "world"; }); Future<String> f3 = executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return "test"; }); assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello"); assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world"); assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test"); long elapsedTime = System.currentTimeMillis() - startTime; assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(2L)); assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(3L)); assertThat(executor.getCorePoolSize()).isEqualTo(0); assertThat(executor.getMaximumPoolSize()).isEqualTo(Integer.MAX_VALUE); assertThat(executor.getPoolSize()).isEqualTo(3); assertThat(executor.getActiveCount()).isEqualTo(0); executor.shutdown(); }
The core pool size is 0, the maximum pool size is Integer#MAX_VALUE, the keep-alive timeout is 60 seconds, and the queue is a SynchronousQueue, which has no capacity.
ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
Unless an idle thread is available, each added task creates a new thread, so this case ends up with 3 threads and the shortest run time of the three pools.
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(2L)); assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(3L));
The thread count came to 3.
assertThat(executor.getCorePoolSize()).isEqualTo(0); assertThat(executor.getMaximumPoolSize()).isEqualTo(Integer.MAX_VALUE); assertThat(executor.getPoolSize()).isEqualTo(3); assertThat(executor.getActiveCount()).isEqualTo(0);
Log output from the run:
[2021-04-29T20:07:34.028435] pool-6-thread-1 - wait... [2021-04-29T20:07:34.029653] pool-6-thread-2 - wait... [2021-04-29T20:07:34.030378] pool-6-thread-3 - wait... [2021-04-29T20:07:36.028956] pool-6-thread-1 - wake up [2021-04-29T20:07:36.035749] pool-6-thread-2 - wake up [2021-04-29T20:07:36.036370] pool-6-thread-3 - wake up
So far the settings were nearly identical to what Executors provides. From here on, let's vary the ThreadPoolExecutor settings a bit more.
Queue
First, the queue. That said, this test keeps using LinkedBlockingQueue and only observes tasks sitting in the queue.
@Test public void queueing() { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); long startTime = System.currentTimeMillis(); List<Future<Integer>> futures = IntStream .rangeClosed(1, 20) .mapToObj(i -> executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return i; })) .collect(Collectors.toList()); assertThat(executor.getQueue().size()).isEqualTo(18); assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(210); long elapsedTime = System.currentTimeMillis() - startTime; assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(20L)); assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(21L)); assertThat(executor.getCorePoolSize()).isEqualTo(2); assertThat(executor.getMaximumPoolSize()).isEqualTo(2); assertThat(executor.getPoolSize()).isEqualTo(2); assertThat(executor.getActiveCount()).isEqualTo(0); executor.shutdown(); }
The core pool size and maximum pool size are both 2.
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
Then 20 tasks are submitted.
List<Future<Integer>> futures = IntStream .rangeClosed(1, 20) .mapToObj(i -> executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return i; })) .collect(Collectors.toList());
There are only 2 threads, so right after submission 18 tasks are in the queue.
assertThat(executor.getQueue().size()).isEqualTo(18);
With 20 tasks running on 2 threads, the run time came out as follows.
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(20L)); assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(21L));
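The lower bounds asserted in these tests follow from simple arithmetic: the tasks run in rounds, and each round takes as long as one task. The helper below is only a back-of-the-envelope sketch. It assumes every task takes the same time, every task is accepted, and the pool really reaches the given number of threads. BatchTimeEstimate and expectedSeconds are made-up names.

```java
final class BatchTimeEstimate {
    // Estimated wall-clock seconds when `threads` workers each run one task at a time.
    static long expectedSeconds(int tasks, int threads, long secondsPerTask) {
        int rounds = (tasks + threads - 1) / threads; // ceiling of tasks / threads
        return rounds * secondsPerTask;
    }

    public static void main(String[] args) {
        System.out.println(expectedSeconds(3, 1, 2));  // single thread, 3 tasks
        System.out.println(expectedSeconds(3, 2, 2));  // two threads, 3 tasks
        System.out.println(expectedSeconds(20, 2, 2)); // two threads, 20 tasks
    }
}
```

For 20 tasks of 2 seconds each on 2 threads this gives 10 rounds, which is where the 20 second lower bound in the assertion comes from.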
Log output from the run:
[2021-04-29T20:07:09.993991] pool-4-thread-1 - wait... [2021-04-29T20:07:09.994222] pool-4-thread-2 - wait... [2021-04-29T20:07:11.994487] pool-4-thread-1 - wake up [2021-04-29T20:07:11.994760] pool-4-thread-2 - wake up [2021-04-29T20:07:11.995325] pool-4-thread-1 - wait... [2021-04-29T20:07:11.995693] pool-4-thread-2 - wait... [2021-04-29T20:07:13.996178] pool-4-thread-1 - wake up [2021-04-29T20:07:13.996881] pool-4-thread-1 - wait... [2021-04-29T20:07:13.996547] pool-4-thread-2 - wake up [2021-04-29T20:07:13.997756] pool-4-thread-2 - wait... [2021-04-29T20:07:15.997477] pool-4-thread-1 - wake up [2021-04-29T20:07:15.998256] pool-4-thread-1 - wait... [2021-04-29T20:07:15.998240] pool-4-thread-2 - wake up [2021-04-29T20:07:15.998992] pool-4-thread-2 - wait... [2021-04-29T20:07:17.998718] pool-4-thread-1 - wake up [2021-04-29T20:07:17.999219] pool-4-thread-1 - wait... [2021-04-29T20:07:17.999574] pool-4-thread-2 - wake up [2021-04-29T20:07:18.000390] pool-4-thread-2 - wait... [2021-04-29T20:07:20.000014] pool-4-thread-1 - wake up [2021-04-29T20:07:20.000971] pool-4-thread-2 - wake up [2021-04-29T20:07:20.001085] pool-4-thread-1 - wait... [2021-04-29T20:07:20.001895] pool-4-thread-2 - wait... [2021-04-29T20:07:22.002559] pool-4-thread-1 - wake up [2021-04-29T20:07:22.003006] pool-4-thread-2 - wake up [2021-04-29T20:07:22.003315] pool-4-thread-1 - wait... [2021-04-29T20:07:22.003510] pool-4-thread-2 - wait... [2021-04-29T20:07:24.003844] pool-4-thread-1 - wake up [2021-04-29T20:07:24.004008] pool-4-thread-2 - wake up [2021-04-29T20:07:24.004724] pool-4-thread-1 - wait... [2021-04-29T20:07:24.004957] pool-4-thread-2 - wait... [2021-04-29T20:07:26.005308] pool-4-thread-1 - wake up [2021-04-29T20:07:26.005540] pool-4-thread-2 - wake up [2021-04-29T20:07:26.006187] pool-4-thread-2 - wait... [2021-04-29T20:07:26.006003] pool-4-thread-1 - wait... 
[2021-04-29T20:07:28.006544] pool-4-thread-2 - wake up [2021-04-29T20:07:28.007051] pool-4-thread-1 - wake up [2021-04-29T20:07:28.007251] pool-4-thread-2 - wait... [2021-04-29T20:07:28.007647] pool-4-thread-1 - wait... [2021-04-29T20:07:30.007880] pool-4-thread-2 - wake up [2021-04-29T20:07:30.008026] pool-4-thread-1 - wake up
Expanding the thread count to the maximum pool size
Next, set the core pool size and maximum pool size to different values and watch the thread count grow.
The queue matters here.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
In other words, you need to choose either a queue with no capacity or a queue with a capacity limit.
ThreadPoolExecutor (Java SE 11 & JDK 11)
This time I chose ArrayBlockingQueue, which has a capacity limit. SynchronousQueue would be enough to see the pool expand, but I also wanted to see tasks being held in the queue.
@Test public void expand() { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2)); long startTime = System.currentTimeMillis(); List<Future<Integer>> futures = IntStream .rangeClosed(1, 6) .mapToObj(i -> executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return i; })) .collect(Collectors.toList()); assertThat(executor.getQueue().size()).isEqualTo(2); assertThat(executor.getPoolSize()).isEqualTo(4); assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21); long elapsedTime = System.currentTimeMillis() - startTime; assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L)); assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L)); assertThat(executor.getCorePoolSize()).isEqualTo(2); assertThat(executor.getMaximumPoolSize()).isEqualTo(4); assertThat(executor.getPoolSize()).isEqualTo(2); assertThat(executor.getActiveCount()).isEqualTo(0); executor.shutdown(); }
The core pool size is 2, the maximum pool size is 4, and the queue capacity is 2.
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2));
Six tasks are submitted.
List<Future<Integer>> futures = IntStream .rangeClosed(1, 6) .mapToObj(i -> executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return i; })) .collect(Collectors.toList());
Two of them go into the queue, and the pool expands to 4 threads.
assertThat(executor.getQueue().size()).isEqualTo(2); assertThat(executor.getPoolSize()).isEqualTo(4);
Threads were added because the remaining tasks could no longer be queued.
With 4 threads handling 6 tasks, the run time is as follows.
assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L)); assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));
Log output:
[2021-04-29T20:07:05.979826] pool-3-thread-1 - wait... [2021-04-29T20:07:05.979981] pool-3-thread-2 - wait... [2021-04-29T20:07:05.986304] pool-3-thread-3 - wait... [2021-04-29T20:07:05.988164] pool-3-thread-4 - wait... [2021-04-29T20:07:07.982240] pool-3-thread-1 - wake up [2021-04-29T20:07:07.983124] pool-3-thread-1 - wait... [2021-04-29T20:07:07.984288] pool-3-thread-2 - wake up [2021-04-29T20:07:07.985241] pool-3-thread-2 - wait... [2021-04-29T20:07:07.989875] pool-3-thread-3 - wake up [2021-04-29T20:07:07.990235] pool-3-thread-4 - wake up [2021-04-29T20:07:09.984001] pool-3-thread-1 - wake up [2021-04-29T20:07:09.986035] pool-3-thread-2 - wake up
Incidentally, the threads added beyond the core pool size go away once the tasks complete, because the keep-alive timeout here is 0.
assertThat(executor.getCorePoolSize()).isEqualTo(2); assertThat(executor.getMaximumPoolSize()).isEqualTo(4); assertThat(executor.getPoolSize()).isEqualTo(2); assertThat(executor.getActiveCount()).isEqualTo(0);
To keep them around for a while, a keep-alive timeout must be specified.
Task rejection
When the queue is full and the pool has already expanded to the maximum pool size, any further task is rejected.
The code above that expanded the pool to its maximum size was already at the limit for both queue capacity and pool size, so let's add more tasks to it.
@Test public void reject() { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2)); long startTime = System.currentTimeMillis(); List<Future<Integer>> futures = IntStream .rangeClosed(1, 6) .mapToObj(i -> executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return i; })) .collect(Collectors.toList()); for (int i = 7; i <= 8; i++) { int counter = i; assertThatThrownBy(() -> executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return counter; }) ).isInstanceOf(RejectedExecutionException.class); } assertThat(executor.getQueue().size()).isEqualTo(2); assertThat(executor.getPoolSize()).isEqualTo(4); assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21); long elapsedTime = System.currentTimeMillis() - startTime; assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L)); assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L)); assertThat(executor.getCorePoolSize()).isEqualTo(2); assertThat(executor.getMaximumPoolSize()).isEqualTo(4); assertThat(executor.getPoolSize()).isEqualTo(2); assertThat(executor.getActiveCount()).isEqualTo(0); executor.shutdown(); }
The extra tasks are rejected like this.
for (int i = 7; i <= 8; i++) { int counter = i; assertThatThrownBy(() -> executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return counter; }) ).isInstanceOf(RejectedExecutionException.class); }
The behavior at this point can be specified with an implementation of the RejectedExecutionHandler interface. By default, ThreadPoolExecutor.AbortPolicy is used.
RejectedExecutionHandler (Java SE 11 & JDK 11)
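For comparison with the default AbortPolicy, here is a minimal sketch of two alternatives: a custom handler that merely counts rejected tasks, and the built-in ThreadPoolExecutor.CallerRunsPolicy, which runs the rejected task on the submitting thread. The pool is kept tiny (one thread, queue capacity 1) so that the third task is always rejected. RejectionDemo is a made-up name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class RejectionDemo {
    // Returns "rejectedCount/callerRunsUsedSubmittingThread".
    static String run() {
        AtomicInteger rejectedCount = new AtomicInteger();
        // Custom handler: count the rejection instead of throwing.
        RejectedExecutionHandler counting = (task, executor) -> rejectedCount.incrementAndGet();

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), counting);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocked); // runs on the only thread
            pool.execute(blocked); // waits in the queue
            pool.execute(blocked); // queue full and pool at maximum: handed to the handler

            // Swap in CallerRunsPolicy while the pool is still saturated.
            pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            AtomicReference<String> ranOn = new AtomicReference<>();
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
            boolean ranOnCaller = Thread.currentThread().getName().equals(ranOn.get());

            return rejectedCount.get() + "/" + ranOnCaller;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that a handler which silently counts or drops tasks leaves any associated Future incomplete when submit is used, so this sketch sticks to execute.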
The log is much the same as before, since only the extra tasks were rejected, but here it is anyway.
[2021-04-29T20:07:30.015131] pool-5-thread-1 - wait... [2021-04-29T20:07:30.018122] pool-5-thread-2 - wait... [2021-04-29T20:07:30.031335] pool-5-thread-3 - wait... [2021-04-29T20:07:30.037615] pool-5-thread-4 - wait... [2021-04-29T20:07:32.016207] pool-5-thread-1 - wake up [2021-04-29T20:07:32.016742] pool-5-thread-1 - wait... [2021-04-29T20:07:32.018562] pool-5-thread-2 - wake up [2021-04-29T20:07:32.018948] pool-5-thread-2 - wait... [2021-04-29T20:07:32.032788] pool-5-thread-3 - wake up [2021-04-29T20:07:32.039120] pool-5-thread-4 - wake up [2021-04-29T20:07:34.017244] pool-5-thread-1 - wake up [2021-04-29T20:07:34.019358] pool-5-thread-2 - wake up
Keep-alive timeout
Next is the keep-alive timeout. With a keep-alive timeout specified, when the thread count exceeds the core pool size, threads that stay idle for at least the keep-alive time are terminated.
@Test public void keepAlive() { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2)); long startTime = System.currentTimeMillis(); List<Future<Integer>> futures = IntStream .rangeClosed(1, 6) .mapToObj(i -> executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return i; })) .collect(Collectors.toList()); assertThat(executor.getQueue().size()).isEqualTo(2); assertThat(executor.getPoolSize()).isEqualTo(4); assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21); long elapsedTime = System.currentTimeMillis() - startTime; assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L)); assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L)); assertThat(executor.getCorePoolSize()).isEqualTo(2); assertThat(executor.getMaximumPoolSize()).isEqualTo(4); assertThat(executor.getPoolSize()).isEqualTo(4); assertThat(executor.getActiveCount()).isEqualTo(0); log("sleeping..."); sleep(Duration.ofSeconds(6L)); assertThat(executor.getPoolSize()).isEqualTo(2); executor.shutdown(); }
This time the keep-alive timeout is 5 seconds. The core pool size is 2 and the maximum pool size is 4.
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2));
Submitting the tasks fills the queue and expands the thread pool.
List<Future<Integer>> futures = IntStream .rangeClosed(1, 6) .mapToObj(i -> executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return i; })) .collect(Collectors.toList()); assertThat(executor.getQueue().size()).isEqualTo(2); assertThat(executor.getPoolSize()).isEqualTo(4);
After the tasks finish, waiting a while confirms that the thread count drops back to the core pool size.
log("sleeping..."); sleep(Duration.ofSeconds(6L)); assertThat(executor.getPoolSize()).isEqualTo(2);
Log output from the run:
[2021-04-29T21:20:12.075690] pool-1-thread-1 - wait... [2021-04-29T21:20:12.079657] pool-1-thread-2 - wait... [2021-04-29T21:20:12.079740] pool-1-thread-4 - wait... [2021-04-29T21:20:12.079747] pool-1-thread-3 - wait... [2021-04-29T21:20:14.087106] pool-1-thread-1 - wake up [2021-04-29T21:20:14.087656] pool-1-thread-2 - wake up [2021-04-29T21:20:14.088526] pool-1-thread-1 - wait... [2021-04-29T21:20:14.089013] pool-1-thread-3 - wake up [2021-04-29T21:20:14.088705] pool-1-thread-2 - wait... [2021-04-29T21:20:14.088548] pool-1-thread-4 - wake up [2021-04-29T21:20:16.089248] pool-1-thread-1 - wake up [2021-04-29T21:20:16.091406] pool-1-thread-2 - wake up [2021-04-29T21:20:16.097731] main - sleeping...
Letting core pool threads time out too
This is the last one. With a keep-alive timeout specified, idle threads are reduced down to the core pool size.
However, when allowCoreThreadTimeOut is set to true, the core pool threads are terminated as well.
I tried this with both ArrayBlockingQueue and LinkedBlockingQueue.
First, a keep-alive timeout of 5 seconds, an ArrayBlockingQueue, a core pool size of 2, and a maximum pool size of 4.
@Test public void keepAliveAllowCoreThreadTimeOut1() { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2)); executor.allowCoreThreadTimeOut(true); long startTime = System.currentTimeMillis(); List<Future<Integer>> futures = IntStream .rangeClosed(1, 6) .mapToObj(i -> executor.submit(() -> { log("wait..."); sleep(Duration.ofSeconds(2L)); log("wake up"); return i; })) .collect(Collectors.toList()); assertThat(executor.getQueue().size()).isEqualTo(2); assertThat(executor.getPoolSize()).isEqualTo(4); assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21); long elapsedTime = System.currentTimeMillis() - startTime; assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L)); assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L)); assertThat(executor.getCorePoolSize()).isEqualTo(2); assertThat(executor.getMaximumPoolSize()).isEqualTo(4); assertThat(executor.getPoolSize()).isEqualTo(4); assertThat(executor.getActiveCount()).isEqualTo(0); log("sleeping..."); sleep(Duration.ofSeconds(6L)); assertThat(executor.getPoolSize()).isEqualTo(0); executor.shutdown(); }
The key point is setting allowCoreThreadTimeOut to true. Note that the keep-alive timeout must be greater than 0 when doing so.
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2)); executor.allowCoreThreadTimeOut(true);
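The requirement that the keep-alive timeout be greater than 0 is enforced at the call site: as far as I know, allowCoreThreadTimeOut(true) throws IllegalArgumentException when the keep-alive time is 0. The following sketch checks both cases without running any task. CoreTimeoutValidationDemo and its method names are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CoreTimeoutValidationDemo {
    // True when enabling core thread timeout is refused for a keep-alive time of 0.
    static boolean zeroKeepAliveIsRefused() {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            pool.allowCoreThreadTimeOut(true);
            return false; // no exception: the setting was accepted
        } catch (IllegalArgumentException e) {
            return true;
        } finally {
            pool.shutdown();
        }
    }

    // True when the setting is accepted for a positive keep-alive time.
    static boolean positiveKeepAliveIsAccepted() {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(2, 2, 5000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            pool.allowCoreThreadTimeOut(true);
            return pool.allowsCoreThreadTimeOut();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("refused with 0 ms:     " + zeroKeepAliveIsRefused());
        System.out.println("accepted with 5000 ms: " + positiveKeepAliveIsAccepted());
    }
}
```

This is also why the Executors#newFixedThreadPool settings, which use a keep-alive time of 0, cannot simply have core thread timeout switched on without changing the keep-alive time first.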
Right after the tasks finish, the thread count is still expanded to the maximum pool size.
assertThat(executor.getCorePoolSize()).isEqualTo(2); assertThat(executor.getMaximumPoolSize()).isEqualTo(4); assertThat(executor.getPoolSize()).isEqualTo(4); assertThat(executor.getActiveCount()).isEqualTo(0);
When the idle state continues, all threads are terminated, including the core pool threads.
log("sleeping..."); sleep(Duration.ofSeconds(6L)); assertThat(executor.getPoolSize()).isEqualTo(0);
Log output:
[2021-04-29T21:21:02.180550] pool-8-thread-1 - wait... [2021-04-29T21:21:02.180915] pool-8-thread-3 - wait... [2021-04-29T21:21:02.180639] pool-8-thread-2 - wait... [2021-04-29T21:21:02.181316] pool-8-thread-4 - wait... [2021-04-29T21:21:04.181189] pool-8-thread-1 - wake up [2021-04-29T21:21:04.181289] pool-8-thread-3 - wake up [2021-04-29T21:21:04.181584] pool-8-thread-1 - wait... [2021-04-29T21:21:04.181571] pool-8-thread-2 - wake up [2021-04-29T21:21:04.181791] pool-8-thread-4 - wake up [2021-04-29T21:21:04.181729] pool-8-thread-3 - wait... [2021-04-29T21:21:06.181957] pool-8-thread-1 - wake up [2021-04-29T21:21:06.182476] pool-8-thread-3 - wake up [2021-04-29T21:21:06.183003] main - sleeping...
Next, I changed the queue to LinkedBlockingQueue and set both pool sizes to 2. This is effectively a version of Executors#newFixedThreadPool whose threads terminate when idle.
@Test
public void keepAliveAllowCoreThreadTimeOut2() {
    ThreadPoolExecutor executor =
            new ThreadPoolExecutor(2, 2, 5000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    executor.allowCoreThreadTimeOut(true);

    long startTime = System.currentTimeMillis();

    List<Future<Integer>> futures =
            IntStream
                    .rangeClosed(1, 6)
                    .mapToObj(i -> executor.submit(() -> {
                        log("wait...");
                        sleep(Duration.ofSeconds(2L));
                        log("wake up");
                        return i;
                    }))
                    .collect(Collectors.toList());

    assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

    long elapsedTime = System.currentTimeMillis() - startTime;

    assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
    assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));

    assertThat(executor.getCorePoolSize()).isEqualTo(2);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
    assertThat(executor.getPoolSize()).isEqualTo(2);
    assertThat(executor.getActiveCount()).isEqualTo(0);

    log("sleeping...");
    sleep(Duration.ofSeconds(6L));

    assertThat(executor.getPoolSize()).isEqualTo(0);

    executor.shutdown();
}
Here too, once the tasks have run and the time specified by the Keep-Alive timeout has elapsed, all threads terminate.
log("sleeping...");
sleep(Duration.ofSeconds(6L));

assertThat(executor.getPoolSize()).isEqualTo(0);
Log:
[2021-04-29T21:21:12.187598] pool-9-thread-1 - wait...
[2021-04-29T21:21:12.187732] pool-9-thread-2 - wait...
[2021-04-29T21:21:14.188006] pool-9-thread-1 - wake up
[2021-04-29T21:21:14.188197] pool-9-thread-2 - wake up
[2021-04-29T21:21:14.188389] pool-9-thread-1 - wait...
[2021-04-29T21:21:14.188424] pool-9-thread-2 - wait...
[2021-04-29T21:21:16.188723] pool-9-thread-1 - wake up
[2021-04-29T21:21:16.188837] pool-9-thread-2 - wake up
[2021-04-29T21:21:16.189002] pool-9-thread-1 - wait...
[2021-04-29T21:21:16.189090] pool-9-thread-2 - wait...
[2021-04-29T21:21:18.189300] pool-9-thread-1 - wake up
[2021-04-29T21:21:18.189366] pool-9-thread-2 - wake up
[2021-04-29T21:21:18.190522] main - sleeping...
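If you wanted to reuse this "fixed pool whose threads time out" configuration, one option would be to wrap it in a small factory method. The sketch below is only an illustration: TimedFixedThreadPools, newFixedThreadPoolWithTimeout, and poolSizeAfterIdle are names I made up, and nothing like them exists in Executors.

```java
import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the JDK or of the original test code.
class TimedFixedThreadPools {

    // Same shape as Executors#newFixedThreadPool, but idle threads (core included) are allowed to time out.
    // keepAlive must be greater than zero, otherwise allowCoreThreadTimeOut(true) throws IllegalArgumentException.
    static ThreadPoolExecutor newFixedThreadPoolWithTimeout(int nThreads, Duration keepAlive) {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(
                        nThreads, nThreads,
                        keepAlive.toMillis(), TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>());
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }

    // Runs one trivial task, idles for the given duration, and returns the remaining pool size.
    static int poolSizeAfterIdle(Duration keepAlive, Duration idle) {
        ThreadPoolExecutor executor = newFixedThreadPoolWithTimeout(2, keepAlive);
        try {
            executor.submit(() -> "hello").get();
            Thread.sleep(idle.toMillis());
            return executor.getPoolSize();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int remaining = poolSizeAfterIdle(Duration.ofMillis(200L), Duration.ofMillis(1500L));
        System.out.println("pool size after idling: " + remaining);
    }
}
```

The helper returns ThreadPoolExecutor rather than ExecutorService so that getPoolSize stays available to the caller. That is a choice made for this sketch; Executors#newFixedThreadPool itself returns ExecutorService.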
For now, I think I've roughly checked everything I was curious about.
Summary
I went through the various configuration options of ThreadPoolExecutor. They are usually hidden behind Executors, so you may rarely need to think about them, but understanding what they mean may come in handy when you want to set up a thread pool by configuring or extending ThreadPoolExecutor yourself.
Finally, here is the full source code I wrote for this post.
src/test/java/org/littlewings/concurrent/ThreadPoolExecutorTest.java
package org.littlewings.concurrent;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class ThreadPoolExecutorTest {
    private void sleep(Duration duration) {
        try {
            TimeUnit.SECONDS.sleep(duration.getSeconds());
        } catch (InterruptedException e) {
            // ignore
        }
    }

    private void log(String format, Object... args) {
        System.out.printf("[%s] %s - %s%n", LocalDateTime.now(), Thread.currentThread().getName(), String.format(format, args));
    }

    private <T> T getFuture(Future<T> future) {
        try {
            return future.get();
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    public void singleThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });
        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });
        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));

        assertThat(executor.getCorePoolSize()).isEqualTo(1);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(1);
        assertThat(executor.getPoolSize()).isEqualTo(1);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void fixedThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });
        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });
        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(5L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void cachedThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

        long startTime = System.currentTimeMillis();

        Future<String> f1 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "hello";
        });
        Future<String> f2 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "world";
        });
        Future<String> f3 = executor.submit(() -> {
            log("wait...");
            sleep(Duration.ofSeconds(2L));
            log("wake up");
            return "test";
        });

        assertThat(f1).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("hello");
        assertThat(f2).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("world");
        assertThat(f3).succeedsWithin(Duration.ofSeconds(5L)).isEqualTo("test");

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(2L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(3L));

        assertThat(executor.getCorePoolSize()).isEqualTo(0);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(Integer.MAX_VALUE);
        assertThat(executor.getPoolSize()).isEqualTo(3);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void queueing() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 20)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(18);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(210);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(20L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(21L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void expand() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void keepAlive() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(4);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(2);

        executor.shutdown();
    }

    @Test
    public void keepAliveAllowCoreThreadTimeOut1() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2));
        executor.allowCoreThreadTimeOut(true);

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(executor.getQueue().size()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(4);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void keepAliveAllowCoreThreadTimeOut2() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2, 5000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        executor.allowCoreThreadTimeOut(true);

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(6L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(7L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        log("sleeping...");
        sleep(Duration.ofSeconds(6L));

        assertThat(executor.getPoolSize()).isEqualTo(0);

        executor.shutdown();
    }

    @Test
    public void reject() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2));

        long startTime = System.currentTimeMillis();

        List<Future<Integer>> futures =
                IntStream
                        .rangeClosed(1, 6)
                        .mapToObj(i -> executor.submit(() -> {
                            log("wait...");
                            sleep(Duration.ofSeconds(2L));
                            log("wake up");
                            return i;
                        }))
                        .collect(Collectors.toList());

        for (int i = 7; i <= 8; i++) {
            int counter = i;

            assertThatThrownBy(() ->
                    executor.submit(() -> {
                        log("wait...");
                        sleep(Duration.ofSeconds(2L));
                        log("wake up");
                        return counter;
                    })
            ).isInstanceOf(RejectedExecutionException.class);
        }

        assertThat(executor.getQueue().size()).isEqualTo(2);
        assertThat(executor.getPoolSize()).isEqualTo(4);

        assertThat(futures.stream().mapToInt(this::getFuture).sum()).isEqualTo(21);

        long elapsedTime = System.currentTimeMillis() - startTime;

        assertThat(Duration.ofMillis(elapsedTime)).isGreaterThan(Duration.ofSeconds(4L));
        assertThat(Duration.ofMillis(elapsedTime)).isLessThan(Duration.ofSeconds(6L));

        assertThat(executor.getCorePoolSize()).isEqualTo(2);
        assertThat(executor.getMaximumPoolSize()).isEqualTo(4);
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getActiveCount()).isEqualTo(0);

        executor.shutdown();
    }
}