What is this, and why did I write it?
I wanted to try out JEP 444 (Virtual Threads), which became a final feature in Java 21.
The thread-related APIs seem to have changed as well, so I will look at those at the same time.
Note that taking thread dumps, an example using an HTTP server/client, and how to print a stack trace when pinning occurs are covered in separate entries.
Writing an HTTP server/client with Virtual Threads and checking how the threads behave (including taking thread dumps) - CLOVER🍀
Checking JEP 444 (Virtual Threads) pinning via the stack traces printed by the system property jdk.tracePinnedThreads - CLOVER🍀
JEP 444: Virtual Threads
Virtual Threads were introduced by JEP 444.
In the Java 21 documentation, they are described here.
Core Libraries / Concurrency / Virtual Threads
Main points
Let's go through the contents of JEP 444. It is a little confusing, but I will write "Virtual Threads" for the feature as a whole and "virtual thread" when I mean a specific thread instance.
- Goals
  - Enable server applications written in the thread-per-request style to scale with near-optimal hardware utilization
  - Let existing code that uses java.lang.Thread adopt Virtual Threads with minimal change
  - Make troubleshooting, debugging, and profiling of Virtual Threads easy with existing JDK tools
- Non-goals
  - Removing the traditional thread implementation, or silently migrating existing applications to Virtual Threads
  - Changing Java's basic concurrency model
  - Offering a new data-parallelism construct in the Java language or the Java libraries (the Stream API already exists)
The motivation for introducing Virtual Threads is as follows.
- Existing threads (platform threads) are implemented as wrappers around OS threads, so the number of available threads is limited and each one is expensive. In a server application that assigns each request to a thread, the thread count becomes the limiting factor before CPU and other resources are used up
- The asynchronous style handles a request by interleaving it across different threads, so stack traces are hard to work with and the programming style is at odds with the Java platform
- With Virtual Threads, a virtual thread that calls a blocking I/O operation can be suspended until it is ready to resume, which makes high throughput achievable
That seems to be the gist. The key point appears to be that other virtual threads can run while one is blocked on I/O.
Also, virtual threads apparently do not need to be pooled.
Debuggers and JDK Flight Recorder can handle virtual threads, and thread dumps have a new format as well.
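To get a feel for "other virtual threads run while one is blocked", here is a minimal sketch. The class name BlockingTasksSketch and the task counts are my own placeholders, not something from the JEP. It starts one virtual thread per task, has each one block in Thread.sleep, and counts how many tasks complete.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: many blocking tasks, one virtual thread each.
final class BlockingTasksSketch {

    // Runs `tasks` tasks that each block for `sleepMillis` and returns how many completed.
    static int runBlockingTasks(int tasks, long sleepMillis) {
        AtomicInteger completed = new AtomicInteger();
        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                es.execute(() -> {
                    try {
                        // Blocking call: the virtual thread is suspended here,
                        // so its carrier can run other virtual threads.
                        Thread.sleep(Duration.ofMillis(sleepMillis));
                        completed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } // close() waits until every submitted task has finished
        return completed.get();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int done = runBlockingTasks(10_000, 100);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(done + " tasks finished in about " + elapsedMillis + " ms");
    }
}
```

If the tasks really overlap, the elapsed time should be much closer to one sleep interval than to the sum of all of them, but the exact figure depends on the machine.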
Let's also look at a few of the notes on behavior.
- Assignment to processors
  - Platform threads, which are implemented as OS threads, rely on the OS scheduler. Virtual Threads rely on the JDK's own scheduler, which assigns virtual threads to platform threads
  - Assigning a virtual thread to a platform thread is called mounting. The virtual thread is unmounted when it finishes or while it is suspended
  - The platform thread itself is still scheduled by the OS as before
- The Virtual Threads scheduler
  - A ForkJoinPool is used as the scheduler
  - The scheduler's parallelism defaults to the number of available processors and can be tuned with the jdk.virtualThreadScheduler.parallelism system property
  - The maximum number of platform threads available to the scheduler can be tuned with the jdk.virtualThreadScheduler.maxPoolSize system property
  - It is managed separately from ForkJoinPool#commonPool, which is used by parallel streams and the like
- Thread#currentThread
  - When the code is running in a virtual thread, it returns that virtual thread
- Mounting / unmounting
  - A virtual thread is unmounted when it blocks on I/O or another blocking operation, and is later mounted again and resumed
  - It cannot be unmounted while executing code inside a synchronized method or block
    - Use ReentrantLock instead
    - This may be improved in the future
  - It cannot be unmounted while executing a native method or a foreign function
- Pinning
  - This means a virtual thread is fixed to its platform thread
  - When it happens, a JDK Flight Recorder event is emitted
  - The jdk.tracePinnedThreads system property can trigger a stack trace
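The "use ReentrantLock instead of synchronized" advice above can be sketched like this. It is only an illustration under my own assumptions: the class LockInsteadOfSynchronizedSketch and its methods are hypothetical names, and the 1 ms sleep stands in for any blocking call made while the lock is held.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: guard a blocking section with ReentrantLock rather than synchronized.
final class LockInsteadOfSynchronizedSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private int counter;

    // Blocks while holding the lock; with ReentrantLock the virtual thread can still unmount.
    void incrementSlowly() {
        lock.lock();
        try {
            Thread.sleep(1); // stand-in for blocking I/O inside the critical section
            counter++;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    // Reads the counter under the same lock.
    int current() {
        lock.lock();
        try {
            return counter;
        } finally {
            lock.unlock();
        }
    }

    // Runs `tasks` increments, one virtual thread per task, and returns the final count.
    static int run(int tasks) {
        LockInsteadOfSynchronizedSketch sketch = new LockInsteadOfSynchronizedSketch();
        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                es.execute(sketch::incrementSlowly);
            }
        }
        return sketch.current();
    }

    public static void main(String[] args) {
        System.out.println(run(50));
    }
}
```

Writing the same method as a synchronized method would compute the same result, but according to the JEP the virtual thread would be pinned to its platform thread while it sleeps.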
API changes
As far as I can tell from JEP 444, introducing Virtual Threads did not really add new classes, although Thread.Builder is a borderline case.
Here are some of the changes.
- Creating threads
  - Use Thread.Builder, Thread#ofVirtual, and Thread#ofPlatform
  - Thread#startVirtualThread creates and starts a virtual thread
- Checking whether a thread is a virtual thread
  - Thread#isVirtual
  - Thread#getAllStackTraces returns a Map of all platform threads (virtual threads are not included)
- Differences between virtual threads and platform threads in the API
  - The public Thread constructors cannot create virtual threads
  - Virtual threads are always daemon threads, and Thread#setDaemon cannot change them to non-daemon threads
  - The priority of a virtual thread is always fixed at Thread.NORM_PRIORITY
    - This restriction may be revisited in the future
  - Virtual threads are not active members of thread groups. Calling Thread#getThreadGroup on a virtual thread returns a thread group named VirtualThreads
  - Virtual threads have no permissions under a SecurityManager
- Virtual threads can use ThreadLocal
  - However, ThreadLocal is heavy when you consider access from virtual threads, which are created in large numbers, so Scoped Values are planned
- java.util.concurrent.LockSupport supports Virtual Threads, so the APIs that use LockSupport (Lock, Semaphore, BlockingQueue, and so on) now work when called from a virtual thread
- Executors#newThreadPerTaskExecutor and Executors#newVirtualThreadPerTaskExecutor create a thread per task, which allows migration from, and interoperability with, existing code that uses thread pools
- Networking
  - The implementations of the networking APIs in the java.net and java.nio.channels packages support Virtual Threads. For example, a blocking read on a virtual thread unmounts it and releases the platform thread
  - java.net.Socket, ServerSocket, and DatagramSocket are now interruptible when called from a virtual thread, so blocking calls can be interrupted and cancelled
    - Interrupting a virtual thread that is blocked on a socket wakes the thread and closes the socket
- java.io
  - The java.io package made heavy use of synchronized and needed changes for the introduction of Virtual Threads
  - BufferedInputStream, BufferedOutputStream, BufferedReader, BufferedWriter, PrintStream, and PrintWriter now use an explicit lock rather than synchronized when used directly
    - Subclassing these classes still works; subclasses synchronize as before
  - The decoders and encoders used by InputStreamReader and OutputStreamWriter now use the same lock as the enclosing InputStreamReader or OutputStreamWriter
- JMX
  - ThreadMXBean supports platform threads only
  - ThreadMXBean#findDeadlockedThreads detects platform threads that are deadlocked; virtual threads are not detected
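A couple of the points above are easy to check by hand. The following is a minimal sketch (VirtualThreadVisibilitySketch is a name I made up) that starts a virtual thread with Thread#startVirtualThread, keeps it alive, and checks whether it appears in the Map returned by Thread#getAllStackTraces.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: a live virtual thread is not part of Thread.getAllStackTraces().
final class VirtualThreadVisibilitySketch {

    // Returns true if a running virtual thread is listed by Thread.getAllStackTraces().
    static boolean virtualThreadListed() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // Creates and starts the virtual thread in one call.
        Thread virtualThread = Thread.startVirtualThread(() -> {
            started.countDown();
            try {
                release.await(); // stay alive until the check below is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            started.await();
            return Thread.getAllStackTraces().containsKey(virtualThread);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
        }
    }

    public static void main(String[] args) {
        System.out.println("listed in getAllStackTraces: " + virtualThreadListed());
    }
}
```

Since the Map only covers platform threads, I would expect this to report false on Java 21.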
I will skip JNI, debugging, and JDK Flight Recorder this time.
Beyond that, I think the following parts of the Java 21 documentation are worth reading. To give an idea of what they cover, I have listed the titles of the sections they contain.
- Core Libraries / Concurrency / Virtual Threads
  - Platform Threads
  - What Is a Virtual Thread?
  - Why Use Virtual Threads?
- Core Libraries / Concurrency / Virtual Threads / Creating and Running a Virtual Thread
  - Creating a Virtual Thread with the Thread Class and the Thread.Builder Interface
  - Creating and Running a Virtual Thread with the Executors.newVirtualThreadPerTaskExecutor() Method
  - Multithreaded Client Server Example
- Core Libraries / Concurrency / Virtual Threads / Scheduling Virtual Threads and Pinned Virtual Threads
- Core Libraries / Concurrency / Virtual Threads / Debugging Virtual Threads
  - Java Flight Recorder Events for Virtual Threads
  - Viewing Virtual Threads in jcmd Thread Dumps
- Core Libraries / Concurrency / Virtual Threads / Virtual Threads: An Adoption Guide
  - Write Simple, Synchronous Code Employing Blocking I/O APIs in the Thread-Per-Request Style
  - Represent Every Concurrent Task as a Virtual Thread; Never Pool Virtual Threads
  - Use Semaphores to Limit Concurrency
  - Don't Cache Expensive Reusable Objects in Thread-Local Variables
  - Avoid Lengthy and Frequent Pinning
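The adoption guide's advice to limit concurrency with a semaphore rather than with a pool can be sketched roughly as follows. This is my own illustration, not code from the documentation: SemaphoreLimitSketch, the permit count, and the 5 ms sleep are all placeholders.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one virtual thread per task, with a Semaphore capping concurrent access.
final class SemaphoreLimitSketch {

    // Returns the highest number of tasks observed inside the guarded section at the same time.
    static int maxConcurrent(int tasks, int permits) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger current = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();

        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                es.execute(() -> {
                    try {
                        semaphore.acquire(); // waits here when all permits are taken
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        int now = current.incrementAndGet();
                        max.accumulateAndGet(now, Math::max);
                        Thread.sleep(5); // stand-in for a call to a limited resource
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        current.decrementAndGet();
                        semaphore.release();
                    }
                });
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent: " + maxConcurrent(100, 3));
    }
}
```

The number of virtual threads stays unbounded here; only the guarded section is limited to the number of permits.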
For now, I will just try out the thread-related APIs.
Taking thread dumps, watching virtual threads switch through mounting and unmounting with an HTTP server/client, and printing a stack trace when pinning occurs are covered in separate entries.
Writing an HTTP server/client with Virtual Threads and checking how the threads behave (including taking thread dumps) - CLOVER🍀
Checking JEP 444 (Virtual Threads) pinning via the stack traces printed by the system property jdk.tracePinnedThreads - CLOVER🍀
Environment
This is the environment I used.

    $ java --version
    openjdk 21.0.1 2023-10-17
    OpenJDK Runtime Environment (build 21.0.1+12-Ubuntu-222.04)
    OpenJDK 64-Bit Server VM (build 21.0.1+12-Ubuntu-222.04, mixed mode, sharing)

    $ mvn --version
    Apache Maven 3.9.6 (bc0240f3c744dd6b6ec2920b3cd08dcc295161ae)
    Maven home: $HOME/.sdkman/candidates/maven/current
    Java version: 21.0.1, vendor: Private Build, runtime: /usr/lib/jvm/java-21-openjdk-amd64
    Default locale: ja_JP, platform encoding: UTF-8
    OS name: "linux", version: "5.15.0-91-generic", arch: "amd64", family: "unix"

Preparation
Here are the Maven dependencies and related settings.

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.10.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.24.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.1.2</version>
            </plugin>
        </plugins>
    </build>

I will do the checks in test code.
Using the new thread-related APIs
Now let's try out the new thread-related APIs.
Platform threads
First, the existing platform threads, although I don't expect to use these APIs directly very often.
src/test/java/org/littlewings/virtualthreads/ThreadApiForPlatformThreadTest.java

    package org.littlewings.virtualthreads;

    import org.junit.jupiter.api.Test;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import static org.assertj.core.api.Assertions.assertThat;

    class ThreadApiForPlatformThreadTest {
        @Test
        void createSimply() throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);

            Thread thread = Thread.ofPlatform().start(latch::countDown);
            assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

            latch.await();
            TimeUnit.MILLISECONDS.sleep(300L);

            assertThat(thread.isVirtual()).isFalse();
            // a terminated thread no longer belongs to a thread group
            assertThat(thread.getThreadGroup()).isNull();
        }

        @Test
        void createFromConstructor() throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);

            Thread thread = new Thread(latch::countDown);
            thread.start();
            assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

            latch.await();
            TimeUnit.MILLISECONDS.sleep(300L);

            assertThat(thread.isVirtual()).isFalse();
            assertThat(thread.getThreadGroup()).isNull();
        }

        @Test
        void createFromBuilder() throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);

            Thread.Builder builder = Thread.ofPlatform();
            builder.name("my-thread");

            Thread thread = builder.start(latch::countDown);
            assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

            latch.await();
            TimeUnit.MILLISECONDS.sleep(300L);

            assertThat(thread.isVirtual()).isFalse();
            assertThat(thread.getThreadGroup()).isNull();
        }

        @Test
        void createFromBuilderUnstarted() throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);

            Thread.Builder builder = Thread.ofPlatform();
            builder.name("my-thread");

            Thread thread = builder.unstarted(latch::countDown);
            assertThat(thread.getState()).isEqualTo(Thread.State.NEW);
            assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

            thread.start();
            // timing-dependent: the task is tiny, so the thread may already have finished
            assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE);

            latch.await();
            TimeUnit.MILLISECONDS.sleep(300L);

            assertThat(thread.isVirtual()).isFalse();
            assertThat(thread.getThreadGroup()).isNull();
        }

        @Test
        void registerThreadGroup() throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);

            Thread thread = Thread.ofPlatform().group(new ThreadGroup("my-group")).start(latch::countDown);
            assertThat(thread.getThreadGroup().getName()).isEqualTo("my-group");

            latch.await();
            TimeUnit.MILLISECONDS.sleep(300L);

            assertThat(thread.isVirtual()).isFalse();
            assertThat(thread.getThreadGroup()).isNull();
        }
    }

The following creates and starts a thread.

    Thread thread = Thread.ofPlatform().start(latch::countDown);

A thread created with a Thread constructor is a platform thread.

    Thread thread = new Thread(latch::countDown);
    thread.start();

    ...

    assertThat(thread.isVirtual()).isFalse();

Thread.Builder is one of the things that was introduced. It looks like what Thread#ofPlatform (or Thread#ofVirtual for Virtual Threads) returns is an instance of Thread.Builder.

    Thread.Builder builder = Thread.ofPlatform();
    builder.name("my-thread");

    Thread thread = builder.start(latch::countDown);

Thread.Builder#unstarted creates a thread that has not been started yet.

    Thread.Builder builder = Thread.ofPlatform();
    builder.name("my-thread");

    Thread thread = builder.unstarted(latch::countDown);
    assertThat(thread.getState()).isEqualTo(Thread.State.NEW);
    assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

    thread.start();
    assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE);

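As a small extra, Thread.Builder also has a name(String prefix, long start) variant that numbers the threads it creates. Here is a minimal sketch of that, combined with unstarted; the class name BuilderNamingSketch and the "worker-" prefix are placeholders of mine.

```java
// Hypothetical sketch: numbered thread names from a single Thread.Builder.
final class BuilderNamingSketch {

    // Creates two unstarted platform threads from one builder and returns their names.
    static String[] names() {
        Thread.Builder builder = Thread.ofPlatform()
                .daemon(true)
                .name("worker-", 0); // the counter starts at 0 and increases per created thread

        Thread first = builder.unstarted(() -> { });
        Thread second = builder.unstarted(() -> { });

        return new String[] {first.getName(), second.getName()};
    }

    public static void main(String[] args) {
        for (String name : names()) {
            System.out.println(name);
        }
    }
}
```

The threads are never started here; the sketch only shows how the builder assigns names.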
Virtual Threads
Next, Virtual Threads. Again, I don't expect to use these APIs directly very often.
src/test/java/org/littlewings/virtualthreads/ThreadApiForVirtualThreadTest.java

    package org.littlewings.virtualthreads;

    import org.junit.jupiter.api.Test;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import static org.assertj.core.api.Assertions.assertThat;
    import static org.assertj.core.api.Assertions.assertThatThrownBy;

    class ThreadApiForVirtualThreadTest {
        @Test
        void createSimply() throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);

            Thread thread = Thread.ofVirtual().start(latch::countDown);
            assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

            latch.await();
            TimeUnit.MILLISECONDS.sleep(300L);

            assertThat(thread.isVirtual()).isTrue();
            assertThat(thread.getThreadGroup()).isNull();
        }

        @Test
        void createFromBuilder() throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);

            Thread.Builder builder = Thread.ofVirtual();
            builder.name("my-thread");

            Thread thread = builder.start(latch::countDown);
            assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

            latch.await();
            TimeUnit.MILLISECONDS.sleep(300L);

            assertThat(thread.isVirtual()).isTrue();
            assertThat(thread.getThreadGroup()).isNull();
        }

        @Test
        void createFromBuilderUnstarted() throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);

            Thread.Builder builder = Thread.ofVirtual();
            builder.name("my-thread");

            Thread thread = builder.unstarted(latch::countDown);
            assertThat(thread.getState()).isEqualTo(Thread.State.NEW);
            assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

            thread.start();
            // timing-dependent: the task is tiny, so the thread may already have finished
            assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE);

            latch.await();
            TimeUnit.MILLISECONDS.sleep(300L);

            assertThat(thread.isVirtual()).isTrue();
            assertThat(thread.getThreadGroup()).isNull();
        }

        /*
        @Test
        void registerThreadGroup() throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);

            // Thread.Builder.OfVirtual has no group method
            Thread thread = Thread.ofVirtual().group(new ThreadGroup("my-group")).start(latch::countDown);
            assertThat(thread.getThreadGroup().getName()).isEqualTo("my-group");

            latch.await();
            TimeUnit.MILLISECONDS.sleep(300L);

            assertThat(thread.isVirtual()).isFalse();
            assertThat(thread.getThreadGroup()).isNull();
        }
        */

        @Test
        void currentThreadIsVirtualThread() throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);

            Thread.ofVirtual().name("my-virtual-thread").start(() -> {
                Thread currentThread = Thread.currentThread();

                assertThat(currentThread.getName()).isEqualTo("my-virtual-thread");
                assertThat(currentThread.isVirtual()).isTrue();

                latch.countDown();
            });

            latch.await();
        }

        @Test
        void cannotSetNonDaemon() {
            CountDownLatch latch = new CountDownLatch(1);

            Thread.Builder builder = Thread.ofVirtual();

            Thread thread = builder.unstarted(latch::countDown);
            assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

            assertThat(thread.isDaemon()).isTrue();
            assertThatThrownBy(() -> thread.setDaemon(false))
                    .isExactlyInstanceOf(IllegalArgumentException.class)
                    .hasMessage("'false' not legal for virtual threads");
        }

        @Test
        void cannotChangePriority() {
            CountDownLatch latch = new CountDownLatch(1);

            Thread.Builder builder = Thread.ofVirtual();

            Thread thread = builder.unstarted(latch::countDown);
            assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

            assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY);
            thread.setPriority(Thread.MAX_PRIORITY);
            assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY);
        }

        @Test
        void useThreadLocal() throws InterruptedException {
            CountDownLatch setLatch = new CountDownLatch(3);
            CountDownLatch finishLatch = new CountDownLatch(3);

            ThreadLocal<String> names = new ThreadLocal<>();

            Thread thread1 = Thread.ofVirtual().unstarted(() -> {
                names.set("thread1");
                setLatch.countDown();

                try {
                    setLatch.await();
                } catch (InterruptedException e) {
                    // ignore
                }

                assertThat(names.get()).isEqualTo("thread1");
                names.remove();

                finishLatch.countDown();
            });

            Thread thread2 = Thread.ofVirtual().unstarted(() -> {
                names.set("thread2");
                setLatch.countDown();

                try {
                    setLatch.await();
                } catch (InterruptedException e) {
                    // ignore
                }

                assertThat(names.get()).isEqualTo("thread2");
                names.remove();

                finishLatch.countDown();
            });

            Thread thread3 = Thread.ofVirtual().unstarted(() -> {
                names.set("thread3");
                setLatch.countDown();

                try {
                    setLatch.await();
                } catch (InterruptedException e) {
                    // ignore
                }

                assertThat(names.get()).isEqualTo("thread3");
                names.remove();

                finishLatch.countDown();
            });

            thread1.start();
            thread2.start();
            thread3.start();

            finishLatch.await();
        }
    }

A thread instance created via Thread#ofVirtual is a virtual thread.

    Thread thread = Thread.ofVirtual().start(latch::countDown);
    assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

    latch.await();
    TimeUnit.MILLISECONDS.sleep(300L);

    assertThat(thread.isVirtual()).isTrue();

The thread group name at this point was indeed VirtualThreads.
Using Thread.Builder is not much different from the platform thread case.

    @Test
    void createFromBuilder() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        builder.name("my-thread");

        Thread thread = builder.start(latch::countDown);
        assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isTrue();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void createFromBuilderUnstarted() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        builder.name("my-thread");

        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);
        assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

        thread.start();
        assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE);

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isTrue();
        assertThat(thread.getThreadGroup()).isNull();
    }

On the other hand, you cannot set a thread group on a virtual thread. The Thread.Builder returned by Thread#ofVirtual has no way to specify one, so the following does not compile and is commented out.

    /*
    @Test
    void registerThreadGroup() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        // Thread.Builder.OfVirtual has no group method
        Thread thread = Thread.ofVirtual().group(new ThreadGroup("my-group")).start(latch::countDown);
        assertThat(thread.getThreadGroup().getName()).isEqualTo("my-group");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }
    */

I have been saying Thread.Builder, but strictly speaking Thread#ofPlatform returns a Thread.Builder.OfPlatform and Thread#ofVirtual returns a Thread.Builder.OfVirtual.
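Because both sub-interfaces extend Thread.Builder, code that only needs the common operations can accept either kind. A minimal sketch under that assumption (BuilderKindsSketch and its method are hypothetical names):

```java
// Hypothetical sketch: one helper that works with both OfPlatform and OfVirtual builders.
final class BuilderKindsSketch {

    // Starts an empty task with the given builder, waits for it, and reports Thread#isVirtual.
    static boolean runAndReportVirtual(Thread.Builder builder) {
        Thread thread = builder.start(() -> { });
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return thread.isVirtual();
    }

    public static void main(String[] args) {
        Thread.Builder.OfPlatform platform = Thread.ofPlatform();
        Thread.Builder.OfVirtual virtual = Thread.ofVirtual();

        System.out.println("platform builder -> virtual? " + runAndReportVirtual(platform));
        System.out.println("virtual builder  -> virtual? " + runAndReportVirtual(virtual));
    }
}
```

Methods such as group only exist on Thread.Builder.OfPlatform, so a helper typed against Thread.Builder cannot call them.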
Thread#currentThread returns the instance of the current virtual thread.

    @Test
    void currentThreadIsVirtualThread() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.ofVirtual().name("my-virtual-thread").start(() -> {
            Thread currentThread = Thread.currentThread();

            assertThat(currentThread.getName()).isEqualTo("my-virtual-thread");
            assertThat(currentThread.isVirtual()).isTrue();

            latch.countDown();
        });

        latch.await();
    }

Trying to change a virtual thread to a non-daemon thread throws an exception.

    @Test
    void cannotSetNonDaemon() {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();

        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.isDaemon()).isTrue();
        assertThatThrownBy(() -> thread.setDaemon(false))
                .isExactlyInstanceOf(IllegalArgumentException.class)
                .hasMessage("'false' not legal for virtual threads");
    }

The check is implemented in Thread#setDaemon.
Trying to change the priority is silently ignored.

    @Test
    void cannotChangePriority() {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();

        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY);
        thread.setPriority(Thread.MAX_PRIORITY);
        assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY);
    }

This is implemented in Thread#setPriority.
ThreadLocal works as well.

    @Test
    void useThreadLocal() throws InterruptedException {
        CountDownLatch setLatch = new CountDownLatch(3);
        CountDownLatch finishLatch = new CountDownLatch(3);

        ThreadLocal<String> names = new ThreadLocal<>();

        Thread thread1 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread1");
            setLatch.countDown();

            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread1");
            names.remove();

            finishLatch.countDown();
        });

        Thread thread2 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread2");
            setLatch.countDown();

            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread2");
            names.remove();

            finishLatch.countDown();
        });

        Thread thread3 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread3");
            setLatch.countDown();

            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread3");
            names.remove();

            finishLatch.countDown();
        });

        thread1.start();
        thread2.start();
        thread3.start();

        finishLatch.await();
    }

Using Executors#newVirtualThreadPerTaskExecutor
Finally, let's try Executors#newVirtualThreadPerTaskExecutor. I suspect this is what you would normally use when working with Virtual Threads.
src/test/java/org/littlewings/virtualthreads/ExecutorForVirtualThreadTest.java

    package org.littlewings.virtualthreads;

    import org.junit.jupiter.api.Test;

    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.IntStream;

    import static org.assertj.core.api.Assertions.assertThat;

    class ExecutorForVirtualThreadTest {
        @Test
        void virtualThreadPerTask() throws ExecutionException, InterruptedException {
            AtomicInteger sum = new AtomicInteger();

            try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
                // one virtual thread per submitted task
                List<Future<Integer>> futures =
                        IntStream
                                .rangeClosed(1, 10000)
                                .mapToObj(i -> es.submit(() -> sum.addAndGet(i)))
                                .toList();

                for (Future<Integer> f : futures) {
                    f.get();
                }
            }

            assertThat(sum.get()).isEqualTo(50005000);
        }
    }

With Executors#newVirtualThreadPerTaskExecutor, a virtual thread instance is created for each task.
Behind the scenes it uses the ThreadFactory returned by Thread.ofVirtual().factory().

    public static ExecutorService newVirtualThreadPerTaskExecutor() {
        ThreadFactory factory = Thread.ofVirtual().factory();
        return newThreadPerTaskExecutor(factory);
    }

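Since the method above is just Executors#newThreadPerTaskExecutor with a virtual thread factory, you can presumably pass your own factory when you want named virtual threads. A minimal sketch of that idea (NamedVirtualExecutorSketch and the "vt-" prefix are placeholders I chose):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical sketch: thread-per-task executor backed by a naming virtual thread factory.
final class NamedVirtualExecutorSketch {

    // Returns the name of the thread that ran the first (and only) task.
    static String nameSeenByFirstTask() {
        ThreadFactory factory = Thread.ofVirtual().name("vt-", 0).factory();

        try (ExecutorService es = Executors.newThreadPerTaskExecutor(factory)) {
            Callable<String> task = () -> {
                Thread current = Thread.currentThread();
                if (!current.isVirtual()) {
                    throw new IllegalStateException("expected a virtual thread");
                }
                return current.getName();
            };
            return es.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(nameSeenByFirstTask());
    }
}
```

Named threads are mainly useful for reading thread dumps and logs; the tasks themselves behave the same as with newVirtualThreadPerTaskExecutor.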
By the way, ExecutorService became AutoCloseable in Java 19.

    try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {

I was curious how this is implemented. It first calls ExecutorService#shutdown to begin shutting down (new tasks are rejected), and then, if tasks are still running, waits for them by calling awaitTermination with a one-day timeout in a loop until the executor has terminated. If the waiting thread is interrupted, it calls shutdownNow and restores the interrupt status at the end.

    @Override
    default void close() {
        boolean terminated = isTerminated();
        if (!terminated) {
            shutdown();
            boolean interrupted = false;
            while (!terminated) {
                try {
                    terminated = awaitTermination(1L, TimeUnit.DAYS);
                } catch (InterruptedException e) {
                    if (!interrupted) {
                        shutdownNow();
                        interrupted = true;
                    }
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

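The practical effect of this close() implementation is that leaving the try-with-resources block waits for the submitted tasks. A minimal sketch to observe that (CloseWaitsSketch and the 200 ms delay are my own placeholders):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: ExecutorService#close waits for running tasks before returning.
final class CloseWaitsSketch {

    // Returns {task finished before close() returned, executor terminated after close()}.
    static boolean[] closeWaitsForTasks() {
        AtomicBoolean finished = new AtomicBoolean();
        ExecutorService es = Executors.newVirtualThreadPerTaskExecutor();

        try (es) {
            es.execute(() -> {
                try {
                    Thread.sleep(200); // still running when the try block ends
                    finished.set(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        } // close(): shutdown() followed by awaitTermination in a loop

        return new boolean[] {finished.get(), es.isTerminated()};
    }

    public static void main(String[] args) {
        boolean[] result = closeWaitsForTasks();
        System.out.println("task finished: " + result[0] + ", terminated: " + result[1]);
    }
}
```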
I will leave it at that for this time.
Bonus
The scheduler for Virtual Threads is a ForkJoinPool whose parameters can be tuned with system properties. This appears to be where that happens.

    @SuppressWarnings("removal")
    private static ForkJoinPool createDefaultScheduler() {
        ForkJoinWorkerThreadFactory factory = pool -> {
            PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
            return AccessController.doPrivileged(pa);
        };
        PrivilegedAction<ForkJoinPool> pa = () -> {
            int parallelism, maxPoolSize, minRunnable;
            String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
            String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
            String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
            if (parallelismValue != null) {
                parallelism = Integer.parseInt(parallelismValue);
            } else {
                parallelism = Runtime.getRuntime().availableProcessors();
            }
            if (maxPoolSizeValue != null) {
                maxPoolSize = Integer.parseInt(maxPoolSizeValue);
                parallelism = Integer.min(parallelism, maxPoolSize);
            } else {
                maxPoolSize = Integer.max(parallelism, 256);
            }
            if (minRunnableValue != null) {
                minRunnable = Integer.parseInt(minRunnableValue);
            } else {
                minRunnable = Integer.max(parallelism / 2, 1);
            }
            Thread.UncaughtExceptionHandler handler = (t, e) -> { };
            boolean asyncMode = true; // FIFO
            return new ForkJoinPool(parallelism, factory, handler, asyncMode,
                         0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
        };
        return AccessController.doPrivileged(pa);
    }

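Looking at this, the default parallelism is indeed the number of available processors. The maximum pool size defaults to the larger of the parallelism and 256, and minRunnable (set with jdk.virtualThreadScheduler.minRunnable) defaults to the larger of half the parallelism and 1. Note also that when maxPoolSize is given explicitly, the parallelism is capped at that value.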
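To make those defaults concrete, here is a small sketch that repeats only the "no system property set" branches of the code above as a pure function. The class and record names are my own; this is not a JDK API, just the same arithmetic.

```java
// Hypothetical sketch: the default scheduler parameters when no system property is set.
final class SchedulerDefaultsSketch {

    // Simple holder for the three values computed by createDefaultScheduler.
    record SchedulerDefaults(int parallelism, int maxPoolSize, int minRunnable) { }

    // Mirrors the else-branches of the JDK code for a given processor count.
    static SchedulerDefaults defaultsFor(int availableProcessors) {
        int parallelism = availableProcessors;
        int maxPoolSize = Integer.max(parallelism, 256);
        int minRunnable = Integer.max(parallelism / 2, 1);
        return new SchedulerDefaults(parallelism, maxPoolSize, minRunnable);
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println(defaultsFor(processors));
    }
}
```

For example, with 8 processors this gives parallelism 8, maxPoolSize 256, and minRunnable 4.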
Conclusion
I tried out JEP 444 (Virtual Threads), which became a final feature in Java 21.
This time I only looked at the thread-related APIs. Thread dumps and similar topics are still on my mind, so I will look at them another time.