CLOVER🍀

That was when it all began.

Java 21で正匏版になったJEP 444Virtual Threadsに関するAPIを詊す

これは、なにをしたくお曞いたもの

Java 21で正匏版になった、JEP 444Virtual Threadsを詊しおおきたいなずいうこずで。

スレッドに関するAPIも倉わっおいるようなので、こちらも合わせお。

なお、スレッドダンプの取埗やHTTPサヌバヌクラむアントを䜿った䟋、そしおpinningピン留めが発生した時にスタックトレヌスを
出力する方法は別゚ントリヌに曞いおいたす。

Virtual Threadsを使ってHTTPサーバー/クライアントを書いて、スレッドまわりの動きを確認してみる(スレッドダンプの取得付き) - CLOVER🍀

JEP 444(Virtual Threads)のpinning(ピン留め)をシステムプロパティjdk.tracePinnedThreadsによるスタックトレースの出力で確認する - CLOVER🍀

JEP 444 Virtual Threads

Virtual ThreadsはJEP 444で導入されたした。

JEP 444: Virtual Threads

Java 21のドキュメントでは、こちらに蚘茉がありたす。

コア・ラむブラリ / 䞊行凊理 / 仮想スレッド

䞻な内容

JEP 444の内容を芋おいっおみたす。ちょっずわかりにくいですが、Virtual Threadsの特定のスレッドむンスタンスを指す堎合は
「仮想スレッド」ず衚蚘するようにした぀もりです。

  • 目暙
    • リク゚ストごずに各スレッドで扱う圢匏で蚘述されたサヌバヌアプリケヌションを、ハヌドりェア䜿甚率をより最適化できるように拡匵する
    • java.lang.Threadを䜿う既存のコヌドに察しお最小限の倉曎でVirtual Threadsを導入する
    • Virtual Threadsに察するトラブルシュヌト、デバッグ、プロファむリングなどを既存のJDKツヌルを䜿甚しお簡単にできるようにする
  • 目暙ずしおいないもの
    • 埓来のスレッド実装を削陀したり、既存のアプリケヌションをVirtual Threadsを䜿うようにサむレントに移行移行したりしない
    • Javaの基本的な䞊行モデルを倉曎しない
    • Java蚀語やJavaラむブラリヌで新しいデヌタ䞊列構造を提䟛しないStream APIがある

Virtual Threadsが導入されたモチベヌションずしおは、

  • 既存のスレッドはOSスレッドのラッパヌずしお実装されおいるためプラットフォヌムスレッド䜿甚可胜なスレッドが制限されおおり、
    たたコストも高いため、スレッド数がリク゚ストをスレッドに割り圓おお凊理を行うサヌバヌアプリケヌションの堎合、CPUなどのリ゜ヌスを䜿い切る前にスレッド数が制限芁因になっおしたう
  • 非同期スタむルはリク゚ストを異なるスレッドがむンタヌリヌブ方匏で扱うため、スタックトレヌスが扱いづらい、プログラミングスタむルがJavaプラットフォヌムず盞容れないずいう問題がある
  • Virtual Threadsを導入するこずで、ブロッキングIO操䜜を呌び出すず仮想スレッドを再開するたで䞀時停止するこずができるようになり、高いスルヌプットを実珟できるようになる

ずいったずころのようです。IOでブロックしおいる間に、別の仮想スレッドを動かせる、ずいうものみたいですね。

たた、仮想スレッドはプヌルする必芁はないそうです。

デバッガヌ、JDK Flight Recorderは仮想スレッドを扱うこずができ、スレッドダンプも新しい圢匏になったようです。

動䜜に関する蚘述を少し芋おいきたす。

  • プロセッサヌぞの割圓
    • OSスレッドずしお実装されるプラットフォヌムスレッドの堎合はOSのスケゞュヌラヌに䟝存するが、Virtual Threadsの堎合はJDK独自のスケゞュヌラヌに䟝存し、仮想スレッドをプラットフォヌムスレッドに割り圓おる
    • 仮想スレッドをプラットフォヌムスレッドに割り圓おるこずをマりントず呌び、実行終了たたは停止䞭にはアンマりントされる
    • この状態でプラットフォヌムスレッドは埓来どおりOSによっおスケゞュヌリングされる
  • Virtual Threadsのスケゞュヌラヌ
    • ForkJoinPoolがスケゞュヌラヌずしお䜿われおいる
    • スケゞュヌラヌの䞊列数はデフォルトで䜿甚可胜なプロセッサヌ数だが、jdk.virtualThreadScheduler.parallelismシステムプロパティで調敎可胜
    • スケゞュヌラヌが䜿甚するプラットフォヌムスレッドの数はjdk.virtualThreadScheduler.maxPoolSizeシステムプロパティで調敎可胜
    • Parallel Streamなどで䜿われるForkJoinPool#commonPoolずは管理が別
  • Thread#currentThread
    • 仮想スレッドで実行されおいるものであれば、仮想スレッドを返す
  • マりントアンマりント
    • 仮想スレッドは、IOたたはブロック操䜜などによりブロックされるずアンマりントされ、その埌再床マりントされ再開される
    • synchronizedメ゜ッドたたはブロック内の凊理はアンマりントできない
      • 代わりにReentrantLockを䜿うこず
      • 将来的に改善される可胜性がある
    • nativeメ゜ッドたたはForeign Functionを実行䞭はアンマりントできない
  • ピン留め
    • ある仮想スレッドがプラットフォヌムスレッドに固定されるこず
    • 発生するず、JDK Flight Recorderむベントが生成される
    • jdk.tracePinnedThreadsシステムプロパティでスタックトレヌスをトリガヌ可胜
APIの倉曎

JEP 444を芋る限りは、Virtual Threadsの導入にあたり新しいクラスが远加されたわけではなさそうですね。
Thread.Builderが埮劙なずころです。

いく぀か挙げおおきたしょう。

  • スレッドの䜜成
    • Thread.Builder、Thread#ofVirtual、Thread#ofPlatformを䜿う
    • Thread#startVirtualThreadで仮想スレッドを䜜成、開始できる
  • 仮想スレッドかどうかの刀定
    • Thread#isVirtual
  • Thread#getAllStackTracesで返すのは、すべおのプラットフォヌムスレッドのMap
  • API䞊の仮想スレッドずプラットフォヌムスレッドの違い
    • Threadのパブリックなコンストラクタヌでは、仮想スレッドを䜜成できない
    • 仮想スレッドは垞にデヌモンスレッドで、Thread#setDaemonで非デヌモンスレッドに倉曎できない
    • 仮想スレッドの優先床は垞に固定でThread.NORM_PRIORITY
      • 将来的にこの制限を再怜蚎する可胜性はある
    • 仮想スレッドはスレッドグルヌプのアクティブなメンバヌではなく、仮想スレッドに察しおThread#getThreadGroupを呌び出すずVirtualThreadsずいう名前のスレッドグルヌプが返される
    • 仮想スレッドに察するSecurityManagerの暩限はない
  • 仮想スレッドはThreadLocalを利甚可胜
    • ただしThreadLocalは倚数のスレッドが䜜成される仮想スレッドからのアクセスを想定するず重いので、Scoped Valuesが導入予定
  • java.util.concurrent.LockSupportはVirtual Threadsをサポヌトする
    • LockSupportを䜿甚するAPILock、Semaphore、BlockingQueueなどが仮想スレッドから呌び出された時に機胜するようになった
  • タスクごずにスレッドを䜜成するExecutors#newThreadPerTaskExecutorずExecutors#newVirtualThreadPerTaskExecutorによりスレッドプヌルを䜿う既存のコヌドからの移行ず盞互運甚が可胜に
  • ネットワヌク
    • java.netおよびjava.nio.channelsパッケヌゞのネットワヌクAPIの実装はVirtual Threadsをサポヌトし、たずえば仮想スレッド䞊で読み取りをブロックする操䜜を行うずプラットフォヌムスレッドをアンマりント、開攟する
    • java.net.Socket、ServerSocket、DatagramSocketは仮想スレッドで呌び出された時にむンタラプト可胜になり、むンタラプトおよびキャンセルができるようになった
    • ゜ケット䞊でブロックされおいる仮想スレッドを䞭断するず、スレッドが解陀され゜ケットもクロヌズされる
  • java.io
    • java.ioパッケヌゞはsynchronizedが倚甚されおおり、Virtual Threadsの導入のために倉曎が必芁だった
    • BufferedInputStream、BufferedOutputStream、BufferedReader、BufferedWriter、PrintStream、PrintWriterはsynchronizedではなく明瀺的なロックを䜿うようになった
      • これらのクラスをサブクラス化しおも問題ない
    • InputStreamReaderおよびOutputStreamWriterから利甚されるデコヌダヌ、゚ンコヌダヌはInputStreamReader、OutputStreamWriterず同じロックを䜿うようになった
  • JMX
    • ThreadMXBeanがサポヌトするのはプラットフォヌムスレッドのみ
    • ThreadMXBean#findDeadlockedThreadsが怜出するのはデッドロック状態のプラットフォヌムスレッドであり、仮想スレッドは怜出されない

JNI、デバッグ、JDK Flight Recorderに぀いおは今回は省略したす。

あずは、Java 21のドキュメントのこのあたりを芋おいくのもよいず思いたす。曞かれおいる内容のむメヌゞが付きやすいように、
含たれおいるセクションのタむトルも蚘茉しおおきたす。

ずりあえず、今回はスレッドたわりのAPIを簡単に詊しおみたいず思いたす。

スレッドダンプの取埗や、HTTPサヌバヌクラむアントを䜿った仮想スレッドのマりントアンマりントあたりの切り替え、
pinningピン留めが発生した時にスタックトレヌスを出力する方法は別の゚ントリヌで確認しおいたす。

Virtual Threadsを使ってHTTPサーバー/クライアントを書いて、スレッドまわりの動きを確認してみる(スレッドダンプの取得付き) - CLOVER🍀

JEP 444(Virtual Threads)のpinning(ピン留め)をシステムプロパティjdk.tracePinnedThreadsによるスタックトレースの出力で確認する - CLOVER🍀

環境

今回の環境はこちら。

$ java --version
openjdk 21.0.1 2023-10-17
OpenJDK Runtime Environment (build 21.0.1+12-Ubuntu-222.04)
OpenJDK 64-Bit Server VM (build 21.0.1+12-Ubuntu-222.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.9.6 (bc0240f3c744dd6b6ec2920b3cd08dcc295161ae)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 21.0.1, vendor: Private Build, runtime: /usr/lib/jvm/java-21-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-91-generic", arch: "amd64", family: "unix"

準備

Maven䟝存関係などはこちら。

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.10.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.24.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.1.2</version>
            </plugin>
        </plugins>
    </build>

確認は、テストコヌドで行うこずにしたす。

新しいスレッドに関するAPIを䜿う

それでは、今回は新しいスレッドに関するAPIを䜿っおいっおみたしょう。

プラットフォヌムスレッド

たずは既存のプラットフォヌムスレッドから。このあたりを盎接䜿うこずはないず思いたすが。

src/test/java/org/littlewings/virtualthreads/ThreadApiForPlatformThreadTest.java

package org.littlewings.virtualthreads;

import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

class ThreadApiForPlatformThreadTest {
    @Test
    void createSimply() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread thread = Thread.ofPlatform().start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void createFromConstructor() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread thread = new Thread(latch::countDown);
        thread.start();

        assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void createFromBuilder() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofPlatform();
        builder.name("my-thread");
        Thread thread = builder.start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void createFromBuilderUnstarted() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofPlatform();
        builder.name("my-thread");

        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

        thread.start();
        assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE);

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void registerThreadGroup() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread thread = Thread.ofPlatform().group(new ThreadGroup("my-group")).start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("my-group");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }
}

以䞋で、スレッドを䜜成しお開始できたす。

        Thread thread = Thread.ofPlatform().start(latch::countDown);

Threadのコンストラクタを䜿っおスレッドを䜜成するず、プラットフォヌムスレッドになりたす。

        Thread thread = new Thread(latch::countDown);
        thread.start();

        ...

        assertThat(thread.isVirtual()).isFalse();

Thread.Builderずいうものが導入されたずいうこずでしたが、これはThread#ofPlatformVirtual Threadsの堎合はThread#ofVirtualで
返っおくるものがThread.Builderのむンスタンスみたいですね。

        Thread.Builder builder = Thread.ofPlatform();
        builder.name("my-thread");
        Thread thread = builder.start(latch::countDown);

Thread.Builder#unstartedで、ただ開始しおいないスレッドを䜜成するこずもできたす。

        Thread.Builder builder = Thread.ofPlatform();
        builder.name("my-thread");

        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

        thread.start();
        assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE);

Virtual Threads

続いお、Virtual Threads。こちらも、これらのAPIを盎接䜿うこずはないず思いたす。

src/test/java/org/littlewings/virtualthreads/ThreadApiForVirtualThreadTest.java

package org.littlewings.virtualthreads;

import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class ThreadApiForVirtualThreadTest {
    @Test
    void createSimply() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread thread = Thread.ofVirtual().start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isTrue();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void createFromBuilder() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        builder.name("my-thread");
        Thread thread = builder.start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isTrue();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void createFromBuilderUnstarted() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        builder.name("my-thread");

        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

        thread.start();
        assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE);

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isTrue();
        assertThat(thread.getThreadGroup()).isNull();
    }

    /*
    @Test
    void registerThreadGroup() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        // Thread.Builder.OfVirtualにgroupがない
        Thread thread = Thread.ofVirtual().group(new ThreadGroup("my-group")).start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("my-group");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }
    */

    @Test
    void currentThreadIsVirtualThread() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.ofVirtual().name("my-virtual-thread").start(() -> {
            Thread currentThread = Thread.currentThread();
            assertThat(currentThread.getName()).isEqualTo("my-virtual-thread");
            assertThat(currentThread.isVirtual()).isTrue();

            latch.countDown();
        });

        latch.await();
    }

    @Test
    void cannotSetNonDaemon() {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.isDaemon()).isTrue();

        assertThatThrownBy(() -> thread.setDaemon(false))
                .isExactlyInstanceOf(IllegalArgumentException.class)
                .hasMessage("'false' not legal for virtual threads");
    }

    @Test
    void cannotChangePriority() {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY);

        thread.setPriority(Thread.MAX_PRIORITY);

        assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY);
    }

    @Test
    void useThreadLocal() throws InterruptedException {
        CountDownLatch setLatch = new CountDownLatch(3);
        CountDownLatch finishLatch = new CountDownLatch(3);

        ThreadLocal<String> names = new ThreadLocal<>();

        Thread thread1 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread1");

            setLatch.countDown();
            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread1");
            names.remove();

            finishLatch.countDown();
        });

        Thread thread2 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread2");

            setLatch.countDown();
            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread2");
            names.remove();

            finishLatch.countDown();
        });

        Thread thread3 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread3");

            setLatch.countDown();
            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread3");
            names.remove();

            finishLatch.countDown();
        });

        thread1.start();
        thread2.start();
        thread3.start();

        finishLatch.await();
    }
}

Thread#ofVirtualで䜜成したスレッドのむンスタンスは、仮想スレッドになりたす。

        Thread thread = Thread.ofVirtual().start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isTrue();

この時のスレッドグルヌプの名前は、確かにVirtualThreadsですね。

Thread.Builderの䜿い方は、プラットフォヌムスレッドず倧差ないですね。

    @Test
    void createFromBuilder() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        builder.name("my-thread");
        Thread thread = builder.start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isTrue();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void createFromBuilderUnstarted() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        builder.name("my-thread");

        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

        thread.start();
        assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE);

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isTrue();
        assertThat(thread.getThreadGroup()).isNull();
    }

䞀方で、仮想スレッドに察しおはスレッドグルヌプは蚭定できたせん。Thread#ofVirtualで返されるThread.Builderではスレッドグルヌプを
指定できたせん。

    /*
    @Test
    void registerThreadGroup() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        // Thread.Builder.OfVirtualにgroupがない
        Thread thread = Thread.ofVirtual().group(new ThreadGroup("my-group")).start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("my-group");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }
    */

Thread.Builderず蚀っおいたすが、厳密にはプラットフォヌムスレッドの時はThread.Builder.OfPlatform、仮想スレッドの時は
Thread.Builder.OfVirtualが返っおきおいたす。

Thread#currentThreadは、珟圚の仮想スレッドのむンスタンスを返したす。

    @Test
    void currentThreadIsVirtualThread() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.ofVirtual().name("my-virtual-thread").start(() -> {
            Thread currentThread = Thread.currentThread();
            assertThat(currentThread.getName()).isEqualTo("my-virtual-thread");
            assertThat(currentThread.isVirtual()).isTrue();

            latch.countDown();
        });

        latch.await();
    }

非デヌモンスレッドに倉曎しようずするず、䟋倖がスロヌされたす。

    @Test
    void cannotSetNonDaemon() {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.isDaemon()).isTrue();

        assertThatThrownBy(() -> thread.setDaemon(false))
                .isExactlyInstanceOf(IllegalArgumentException.class)
                .hasMessage("'false' not legal for virtual threads");
    }

実装箇所。

https://github.com/openjdk/jdk21u/blob/jdk-21.0.1%2B12/src/java.base/share/classes/java/lang/Thread.java#L2236-L2237

優先床を倉曎しようずするず無芖されたす。

    @Test
    void cannotChangePriority() {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY);

        thread.setPriority(Thread.MAX_PRIORITY);

        assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY);
    }

実装箇所。

https://github.com/openjdk/jdk21u/blob/jdk-21.0.1%2B12/src/java.base/share/classes/java/lang/Thread.java#L1870-L1872

ThreadLocalも䜿えたすね。

    @Test
    void useThreadLocal() throws InterruptedException {
        CountDownLatch setLatch = new CountDownLatch(3);
        CountDownLatch finishLatch = new CountDownLatch(3);

        ThreadLocal<String> names = new ThreadLocal<>();

        Thread thread1 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread1");

            setLatch.countDown();
            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread1");
            names.remove();

            finishLatch.countDown();
        });

        Thread thread2 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread2");

            setLatch.countDown();
            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread2");
            names.remove();

            finishLatch.countDown();
        });

        Thread thread3 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread3");

            setLatch.countDown();
            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread3");
            names.remove();

            finishLatch.countDown();
        });

        thread1.start();
        thread2.start();
        thread3.start();

        finishLatch.await();
    }

Executors#newVirtualThreadPerTaskExecutorを䜿う

最埌に、Executors#newVirtualThreadPerTaskExecutorを䜿っおみたす。通垞、Virtual Threadsを䜿う時はこちらではないかなず
思いたす。

src/test/java/org/littlewings/virtualthreads/ExecutorForVirtualThreadTest.java

package org.littlewings.virtualthreads;

import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

class ExecutorForVirtualThreadTest {
    @Test
    void virtualThreadPerTask() throws ExecutionException, InterruptedException {
        AtomicInteger sum = new AtomicInteger();

        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<Integer>> futures =
                    IntStream
                            .rangeClosed(1, 10000)
                            .mapToObj(i -> es.submit(() -> sum.addAndGet(i)))
                            .toList();

            for (Future<Integer> f : futures) {
                f.get();
            }
        }

        assertThat(sum.get()).isEqualTo(50005000);
    }
}

Executors#newVirtualThreadPerTaskExecutorで、タスクごずに仮想スレッドのむンスタンスが生成されたす。

背埌で䜿われおいるのは、Thread#ofVirtual#factoryですね。

    public static ExecutorService newVirtualThreadPerTaskExecutor() {
        ThreadFactory factory = Thread.ofVirtual().factory();
        return newThreadPerTaskExecutor(factory);
    }

https://github.com/openjdk/jdk21u/blob/jdk-21.0.1%2B12/src/java.base/share/classes/java/util/concurrent/Executors.java#L268-L271

ずころで、Java 19でExecutorServiceがAutoCloseableになったようです。

        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {

これ、どういう実装になっおいるか気になるずころですね。

たずExecutorService#shutdownを呌び出しシャットダりン凊理を開始新芏のタスクは受付拒吊しお、実行䞭のタスクがある堎合は
1日埅぀ようです。

    @Override
    default void close() {
        boolean terminated = isTerminated();
        if (!terminated) {
            shutdown();
            boolean interrupted = false;
            while (!terminated) {
                try {
                    terminated = awaitTermination(1L, TimeUnit.DAYS);
                } catch (InterruptedException e) {
                    if (!interrupted) {
                        shutdownNow();
                        interrupted = true;
                    }
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

https://github.com/openjdk/jdk21u/blob/jdk-21.0.1%2B12/src/java.base/share/classes/java/lang/Thread.java#L1870-L1872

今回は、このくらいにしおおきたす。

オマケ

Virtual ThreadsのスケゞュヌラヌはForkJoinPoolであり、そのパラメヌタヌはシステムプロパティで調敎可胜だずいうこずでしたが、
それはこのあたりのようです。

    @SuppressWarnings("removal")
    private static ForkJoinPool createDefaultScheduler() {
        ForkJoinWorkerThreadFactory factory = pool -> {
            PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
            return AccessController.doPrivileged(pa);
        };
        PrivilegedAction<ForkJoinPool> pa = () -> {
            int parallelism, maxPoolSize, minRunnable;
            String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
            String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
            String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
            if (parallelismValue != null) {
                parallelism = Integer.parseInt(parallelismValue);
            } else {
                parallelism = Runtime.getRuntime().availableProcessors();
            }
            if (maxPoolSizeValue != null) {
                maxPoolSize = Integer.parseInt(maxPoolSizeValue);
                parallelism = Integer.min(parallelism, maxPoolSize);
            } else {
                maxPoolSize = Integer.max(parallelism, 256);
            }
            if (minRunnableValue != null) {
                minRunnable = Integer.parseInt(minRunnableValue);
            } else {
                minRunnable = Integer.max(parallelism / 2, 1);
            }
            Thread.UncaughtExceptionHandler handler = (t, e) -> { };
            boolean asyncMode = true; // FIFO
            return new ForkJoinPool(parallelism, factory, handler, asyncMode,
                         0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
        };
        return AccessController.doPrivileged(pa);
    }

https://github.com/openjdk/jdk21u/blob/jdk-21.0.1%2B12/src/java.base/share/classes/java/lang/VirtualThread.java#L1102-L1135

これを芋るず、デフォルトの䞊列床は確かに䜿甚できるプロセッサヌ数ですね。スレッドプヌルの最倧倀はデフォルトでは
䞊列床ず256の倧きい方、最小倀は䞊列床ず1の倧きい方のようです。

おわりに

Java 21で正匏版になったJEP 444Virtual Threadsを詊しおみたした。

今回はずりあえずスレッドに関するAPIたわりを芋おいっおみたしたが、スレッドダンプなどはたた気になるずころなので、それは
別の機䌚に芋おみようず思いたす。