CLOVER🍀

That was when it all began.

Java 21で正式版になったJEP 444(Virtual Threads)に関するAPIを試す

これは、なにをしたくて書いたもの?

Java 21で正式版になった、JEP 444(Virtual Threads)を試しておきたいなということで。

スレッドに関するAPIも変わっているようなので、こちらも合わせて。

なお、スレッドダンプの取得やHTTPサーバー/クライアントを使った例、そしてpinning(ピン留め)が発生した時にスタックトレースを
出力する方法は別エントリーに書いています。

Virtual Threadsを使ってHTTPサーバー/クライアントを書いて、スレッドまわりの動きを確認してみる(スレッドダンプの取得付き) - CLOVER🍀

JEP 444(Virtual Threads)のpinning(ピン留め)をシステムプロパティjdk.tracePinnedThreadsによるスタックトレースの出力で確認する - CLOVER🍀

JEP 444: Virtual Threads

Virtual ThreadsはJEP 444で導入されました。

JEP 444: Virtual Threads

Java 21のドキュメントでは、こちらに記載があります。

コア・ライブラリ / 並行処理 / 仮想スレッド

主な内容

JEP 444の内容を見ていってみます。ちょっとわかりにくいですが、Virtual Threadsの特定のスレッドインスタンスを指す場合は
「仮想スレッド」と表記するようにしたつもりです。

  • 目標
    • リクエストごとに各スレッドで扱う形式で記述されたサーバーアプリケーションを、ハードウェア使用率をより最適化できるように拡張する
    • java.lang.Threadを使う既存のコードに対して最小限の変更でVirtual Threadsを導入する
    • Virtual Threadsに対するトラブルシュート、デバッグ、プロファイリングなどを既存のJDKツールを使用して簡単にできるようにする
  • 目標としていないもの
    • 従来のスレッド実装を削除したり、既存のアプリケーションをVirtual Threadsを使うようにサイレントに移行移行したりしない
    • Javaの基本的な並行モデルを変更しない
    • Java言語やJavaライブラリーで新しいデータ並列構造を提供しない(Stream APIがある)

Virtual Threadsが導入されたモチベーションとしては、

  • 既存のスレッドはOSスレッドのラッパーとして実装されているため(プラットフォームスレッド)使用可能なスレッドが制限されており、
    またコストも高いため、スレッド数がリクエストをスレッドに割り当てて処理を行うサーバーアプリケーションの場合、CPUなどのリソースを使い切る前にスレッド数が制限要因になってしまう
  • 非同期スタイルはリクエストを異なるスレッドがインターリーブ方式で扱うため、スタックトレースが扱いづらい、プログラミングスタイルがJavaプラットフォームと相容れないという問題がある
  • Virtual Threadsを導入することで、ブロッキングIO操作を呼び出すと仮想スレッドを再開するまで一時停止することができるようになり、高いスループットを実現できるようになる

といったところのようです。IOでブロックしている間に、別の仮想スレッドを動かせる、というものみたいですね。

また、仮想スレッドはプールする必要はないそうです。

デバッガー、JDK Flight Recorderは仮想スレッドを扱うことができ、スレッドダンプも新しい形式になったようです。

動作に関する記述を少し見ていきます。

  • プロセッサーへの割当
    • OSスレッドとして実装されるプラットフォームスレッドの場合はOSのスケジューラーに依存するが、Virtual Threadsの場合はJDK独自のスケジューラーに依存し、仮想スレッドをプラットフォームスレッドに割り当てる
    • 仮想スレッドをプラットフォームスレッドに割り当てることをマウントと呼び、実行終了または停止中にはアンマウントされる
    • この状態でプラットフォームスレッドは従来どおりOSによってスケジューリングされる
  • Virtual Threadsのスケジューラー
    • ForkJoinPoolがスケジューラーとして使われている
    • スケジューラーの並列数はデフォルトで使用可能なプロセッサー数だが、jdk.virtualThreadScheduler.parallelismシステムプロパティで調整可能
    • スケジューラーが使用するプラットフォームスレッドの数はjdk.virtualThreadScheduler.maxPoolSizeシステムプロパティで調整可能
    • Parallel Streamなどで使われるForkJoinPool#commonPoolとは管理が別
  • Thread#currentThread
    • 仮想スレッドで実行されているものであれば、仮想スレッドを返す
  • マウント/アンマウント
    • 仮想スレッドは、IOまたはブロック操作などによりブロックされるとアンマウントされ、その後再度マウントされ再開される
    • synchronizedメソッドまたはブロック内の処理はアンマウントできない
      • 代わりにReentrantLockを使うこと
      • 将来的に改善される可能性がある
    • nativeメソッドまたはForeign Functionを実行中はアンマウントできない
  • ピン留め
    • ある仮想スレッドがプラットフォームスレッドに固定されること
    • 発生すると、JDK Flight Recorderイベントが生成される
    • jdk.tracePinnedThreadsシステムプロパティでスタックトレースをトリガー可能
APIの変更

JEP 444を見る限りは、Virtual Threadsの導入にあたり新しいクラスが追加されたわけではなさそうですね。
Thread.Builderが微妙なところです。

いくつか挙げておきましょう。

  • スレッドの作成
    • Thread.Builder、Thread#ofVirtual、Thread#ofPlatformを使う
    • Thread#startVirtualThreadで仮想スレッドを作成、開始できる
  • 仮想スレッドかどうかの判定
    • Thread#isVirtual
  • Thread#getAllStackTracesで返すのは、すべてのプラットフォームスレッドのMap
  • API上の仮想スレッドとプラットフォームスレッドの違い
    • Threadのパブリックなコンストラクターでは、仮想スレッドを作成できない
    • 仮想スレッドは常にデーモンスレッドで、Thread#setDaemonで非デーモンスレッドに変更できない
    • 仮想スレッドの優先度は常に固定でThread.NORM_PRIORITY
      • 将来的にこの制限を再検討する可能性はある
    • 仮想スレッドはスレッドグループのアクティブなメンバーではなく、仮想スレッドに対してThread#getThreadGroupを呼び出すとVirtualThreadsという名前のスレッドグループが返される
    • 仮想スレッドに対するSecurityManagerの権限はない
  • 仮想スレッドはThreadLocalを利用可能
    • ただしThreadLocalは多数のスレッドが作成される仮想スレッドからのアクセスを想定すると重いので、Scoped Valuesが導入予定
  • java.util.concurrent.LockSupportはVirtual Threadsをサポートする
    • LockSupportを使用するAPI(Lock、Semaphore、BlockingQueueなど)が仮想スレッドから呼び出された時に機能するようになった
  • タスクごとにスレッドを作成するExecutors#newThreadPerTaskExecutorとExecutors#newVirtualThreadPerTaskExecutorによりスレッドプールを使う既存のコードからの移行と相互運用が可能に
  • ネットワーク
    • java.netおよびjava.nio.channelsパッケージのネットワークAPIの実装はVirtual Threadsをサポートし、たとえば仮想スレッド上で読み取りをブロックする操作を行うとプラットフォームスレッドをアンマウント、開放する
    • java.net.Socket、ServerSocket、DatagramSocketは仮想スレッドで呼び出された時にインタラプト可能になり、インタラプトおよびキャンセルができるようになった
    • ソケット上でブロックされている仮想スレッドを中断すると、スレッドが解除されソケットもクローズされる
  • java.io
    • java.ioパッケージはsynchronizedが多用されており、Virtual Threadsの導入のために変更が必要だった
    • BufferedInputStream、BufferedOutputStream、BufferedReader、BufferedWriter、PrintStream、PrintWriterはsynchronizedではなく明示的なロックを使うようになった
      • これらのクラスをサブクラス化しても問題ない
    • InputStreamReaderおよびOutputStreamWriterから利用されるデコーダー、エンコーダーはInputStreamReader、OutputStreamWriterと同じロックを使うようになった
  • JMX
    • ThreadMXBeanがサポートするのはプラットフォームスレッドのみ
    • ThreadMXBean#findDeadlockedThreadsが検出するのはデッドロック状態のプラットフォームスレッドであり、仮想スレッドは検出されない

JNI、デバッグ、JDK Flight Recorderについては今回は省略します。

あとは、Java 21のドキュメントのこのあたりを見ていくのもよいと思います。書かれている内容のイメージが付きやすいように、
含まれているセクションのタイトルも記載しておきます。

とりあえず、今回はスレッドまわりのAPIを簡単に試してみたいと思います。

スレッドダンプの取得や、HTTPサーバー/クライアントを使った仮想スレッドのマウント/アンマウントあたりの切り替え、
pinning(ピン留め)が発生した時にスタックトレースを出力する方法は別のエントリーで確認しています。

Virtual Threadsを使ってHTTPサーバー/クライアントを書いて、スレッドまわりの動きを確認してみる(スレッドダンプの取得付き) - CLOVER🍀

JEP 444(Virtual Threads)のpinning(ピン留め)をシステムプロパティjdk.tracePinnedThreadsによるスタックトレースの出力で確認する - CLOVER🍀

環境

今回の環境はこちら。

$ java --version
openjdk 21.0.1 2023-10-17
OpenJDK Runtime Environment (build 21.0.1+12-Ubuntu-222.04)
OpenJDK 64-Bit Server VM (build 21.0.1+12-Ubuntu-222.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.9.6 (bc0240f3c744dd6b6ec2920b3cd08dcc295161ae)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 21.0.1, vendor: Private Build, runtime: /usr/lib/jvm/java-21-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-91-generic", arch: "amd64", family: "unix"

準備

Maven依存関係などはこちら。

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.10.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.24.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.1.2</version>
            </plugin>
        </plugins>
    </build>

確認は、テストコードで行うことにします。

新しいスレッドに関するAPIを使う

それでは、今回は新しいスレッドに関するAPIを使っていってみましょう。

プラットフォームスレッド

まずは既存のプラットフォームスレッドから。このあたりを直接使うことはないと思いますが。

src/test/java/org/littlewings/virtualthreads/ThreadApiForPlatformThreadTest.java

package org.littlewings.virtualthreads;

import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

class ThreadApiForPlatformThreadTest {
    @Test
    void createSimply() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread thread = Thread.ofPlatform().start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void createFromConstructor() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread thread = new Thread(latch::countDown);
        thread.start();

        assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void createFromBuilder() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofPlatform();
        builder.name("my-thread");
        Thread thread = builder.start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void createFromBuilderUnstarted() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofPlatform();
        builder.name("my-thread");

        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

        thread.start();
        assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE);

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void registerThreadGroup() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread thread = Thread.ofPlatform().group(new ThreadGroup("my-group")).start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("my-group");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }
}

以下で、スレッドを作成して開始できます。

        Thread thread = Thread.ofPlatform().start(latch::countDown);

Threadのコンストラクタを使ってスレッドを作成すると、プラットフォームスレッドになります。

        Thread thread = new Thread(latch::countDown);
        thread.start();

        ...

        assertThat(thread.isVirtual()).isFalse();

Thread.Builderというものが導入されたということでしたが、これはThread#ofPlatform(Virtual Threadsの場合はThread#ofVirtual)で
返ってくるものがThread.Builderのインスタンスみたいですね。

        Thread.Builder builder = Thread.ofPlatform();
        builder.name("my-thread");
        Thread thread = builder.start(latch::countDown);

Thread.Builder#unstartedで、まだ開始していないスレッドを作成することもできます。

        Thread.Builder builder = Thread.ofPlatform();
        builder.name("my-thread");

        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("main");

        thread.start();
        assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE);

Virtual Threads

続いて、Virtual Threads。こちらも、これらのAPIを直接使うことはないと思います。

src/test/java/org/littlewings/virtualthreads/ThreadApiForVirtualThreadTest.java

package org.littlewings.virtualthreads;

import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class ThreadApiForVirtualThreadTest {
    @Test
    void createSimply() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread thread = Thread.ofVirtual().start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isTrue();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void createFromBuilder() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        builder.name("my-thread");
        Thread thread = builder.start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isTrue();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void createFromBuilderUnstarted() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        builder.name("my-thread");

        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

        thread.start();
        assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE);

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isTrue();
        assertThat(thread.getThreadGroup()).isNull();
    }

    /*
    @Test
    void registerThreadGroup() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        // Thread.Builder.OfVirtualにgroupがない
        Thread thread = Thread.ofVirtual().group(new ThreadGroup("my-group")).start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("my-group");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }
    */

    @Test
    void currentThreadIsVirtualThread() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.ofVirtual().name("my-virtual-thread").start(() -> {
            Thread currentThread = Thread.currentThread();
            assertThat(currentThread.getName()).isEqualTo("my-virtual-thread");
            assertThat(currentThread.isVirtual()).isTrue();

            latch.countDown();
        });

        latch.await();
    }

    @Test
    void cannotSetNonDaemon() {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.isDaemon()).isTrue();

        assertThatThrownBy(() -> thread.setDaemon(false))
                .isExactlyInstanceOf(IllegalArgumentException.class)
                .hasMessage("'false' not legal for virtual threads");
    }

    @Test
    void cannotChangePriority() {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY);

        thread.setPriority(Thread.MAX_PRIORITY);

        assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY);
    }

    @Test
    void useThreadLocal() throws InterruptedException {
        CountDownLatch setLatch = new CountDownLatch(3);
        CountDownLatch finishLatch = new CountDownLatch(3);

        ThreadLocal<String> names = new ThreadLocal<>();

        Thread thread1 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread1");

            setLatch.countDown();
            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread1");
            names.remove();

            finishLatch.countDown();
        });

        Thread thread2 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread2");

            setLatch.countDown();
            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread2");
            names.remove();

            finishLatch.countDown();
        });

        Thread thread3 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread3");

            setLatch.countDown();
            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread3");
            names.remove();

            finishLatch.countDown();
        });

        thread1.start();
        thread2.start();
        thread3.start();

        finishLatch.await();
    }
}

Thread#ofVirtualで作成したスレッドのインスタンスは、仮想スレッドになります。

        Thread thread = Thread.ofVirtual().start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isTrue();

この時のスレッドグループの名前は、確かにVirtualThreadsですね。

Thread.Builderの使い方は、プラットフォームスレッドと大差ないですね。

    @Test
    void createFromBuilder() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        builder.name("my-thread");
        Thread thread = builder.start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isTrue();
        assertThat(thread.getThreadGroup()).isNull();
    }

    @Test
    void createFromBuilderUnstarted() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        builder.name("my-thread");

        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");

        thread.start();
        assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE);

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isTrue();
        assertThat(thread.getThreadGroup()).isNull();
    }

一方で、仮想スレッドに対してはスレッドグループは設定できません。Thread#ofVirtualで返されるThread.Builderではスレッドグループを
指定できません。

    /*
    @Test
    void registerThreadGroup() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        // Thread.Builder.OfVirtualにgroupがない
        Thread thread = Thread.ofVirtual().group(new ThreadGroup("my-group")).start(latch::countDown);

        assertThat(thread.getThreadGroup().getName()).isEqualTo("my-group");

        latch.await();
        TimeUnit.MILLISECONDS.sleep(300L);

        assertThat(thread.isVirtual()).isFalse();
        assertThat(thread.getThreadGroup()).isNull();
    }
    */

Thread.Builderと言っていますが、厳密にはプラットフォームスレッドの時はThread.Builder.OfPlatform、仮想スレッドの時は
Thread.Builder.OfVirtualが返ってきています。

Thread#currentThreadは、現在の仮想スレッドのインスタンスを返します。

    @Test
    void currentThreadIsVirtualThread() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.ofVirtual().name("my-virtual-thread").start(() -> {
            Thread currentThread = Thread.currentThread();
            assertThat(currentThread.getName()).isEqualTo("my-virtual-thread");
            assertThat(currentThread.isVirtual()).isTrue();

            latch.countDown();
        });

        latch.await();
    }

非デーモンスレッドに変更しようとすると、例外がスローされます。

    @Test
    void cannotSetNonDaemon() {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.isDaemon()).isTrue();

        assertThatThrownBy(() -> thread.setDaemon(false))
                .isExactlyInstanceOf(IllegalArgumentException.class)
                .hasMessage("'false' not legal for virtual threads");
    }

実装箇所。

https://github.com/openjdk/jdk21u/blob/jdk-21.0.1%2B12/src/java.base/share/classes/java/lang/Thread.java#L2236-L2237

優先度を変更しようとすると無視されます。

    @Test
    void cannotChangePriority() {
        CountDownLatch latch = new CountDownLatch(1);

        Thread.Builder builder = Thread.ofVirtual();
        Thread thread = builder.unstarted(latch::countDown);
        assertThat(thread.getState()).isEqualTo(Thread.State.NEW);

        assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY);

        thread.setPriority(Thread.MAX_PRIORITY);

        assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY);
    }

実装箇所。

https://github.com/openjdk/jdk21u/blob/jdk-21.0.1%2B12/src/java.base/share/classes/java/lang/Thread.java#L1870-L1872

ThreadLocalも使えますね。

    @Test
    void useThreadLocal() throws InterruptedException {
        CountDownLatch setLatch = new CountDownLatch(3);
        CountDownLatch finishLatch = new CountDownLatch(3);

        ThreadLocal<String> names = new ThreadLocal<>();

        Thread thread1 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread1");

            setLatch.countDown();
            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread1");
            names.remove();

            finishLatch.countDown();
        });

        Thread thread2 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread2");

            setLatch.countDown();
            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread2");
            names.remove();

            finishLatch.countDown();
        });

        Thread thread3 = Thread.ofVirtual().unstarted(() -> {
            names.set("thread3");

            setLatch.countDown();
            try {
                setLatch.await();
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(names.get()).isEqualTo("thread3");
            names.remove();

            finishLatch.countDown();
        });

        thread1.start();
        thread2.start();
        thread3.start();

        finishLatch.await();
    }

Executors#newVirtualThreadPerTaskExecutorを使う

最後に、Executors#newVirtualThreadPerTaskExecutorを使ってみます。通常、Virtual Threadsを使う時はこちらではないかなと
思います。

src/test/java/org/littlewings/virtualthreads/ExecutorForVirtualThreadTest.java

package org.littlewings.virtualthreads;

import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

class ExecutorForVirtualThreadTest {
    @Test
    void virtualThreadPerTask() throws ExecutionException, InterruptedException {
        AtomicInteger sum = new AtomicInteger();

        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<Integer>> futures =
                    IntStream
                            .rangeClosed(1, 10000)
                            .mapToObj(i -> es.submit(() -> sum.addAndGet(i)))
                            .toList();

            for (Future<Integer> f : futures) {
                f.get();
            }
        }

        assertThat(sum.get()).isEqualTo(50005000);
    }
}

Executors#newVirtualThreadPerTaskExecutorで、タスクごとに仮想スレッドのインスタンスが生成されます。

背後で使われているのは、Thread#ofVirtual#factoryですね。

    public static ExecutorService newVirtualThreadPerTaskExecutor() {
        ThreadFactory factory = Thread.ofVirtual().factory();
        return newThreadPerTaskExecutor(factory);
    }

https://github.com/openjdk/jdk21u/blob/jdk-21.0.1%2B12/src/java.base/share/classes/java/util/concurrent/Executors.java#L268-L271

ところで、Java 19でExecutorServiceがAutoCloseableになったようです。

        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {

これ、どういう実装になっているか気になるところですね。

まずExecutorService#shutdownを呼び出しシャットダウン処理を開始(新規のタスクは受付拒否)して、実行中のタスクがある場合は
1日待つようです。

    @Override
    default void close() {
        boolean terminated = isTerminated();
        if (!terminated) {
            shutdown();
            boolean interrupted = false;
            while (!terminated) {
                try {
                    terminated = awaitTermination(1L, TimeUnit.DAYS);
                } catch (InterruptedException e) {
                    if (!interrupted) {
                        shutdownNow();
                        interrupted = true;
                    }
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

https://github.com/openjdk/jdk21u/blob/jdk-21.0.1%2B12/src/java.base/share/classes/java/lang/Thread.java#L1870-L1872

今回は、このくらいにしておきます。

オマケ

Virtual ThreadsのスケジューラーはForkJoinPoolであり、そのパラメーターはシステムプロパティで調整可能だということでしたが、
それはこのあたりのようです。

    @SuppressWarnings("removal")
    private static ForkJoinPool createDefaultScheduler() {
        ForkJoinWorkerThreadFactory factory = pool -> {
            PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
            return AccessController.doPrivileged(pa);
        };
        PrivilegedAction<ForkJoinPool> pa = () -> {
            int parallelism, maxPoolSize, minRunnable;
            String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
            String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
            String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
            if (parallelismValue != null) {
                parallelism = Integer.parseInt(parallelismValue);
            } else {
                parallelism = Runtime.getRuntime().availableProcessors();
            }
            if (maxPoolSizeValue != null) {
                maxPoolSize = Integer.parseInt(maxPoolSizeValue);
                parallelism = Integer.min(parallelism, maxPoolSize);
            } else {
                maxPoolSize = Integer.max(parallelism, 256);
            }
            if (minRunnableValue != null) {
                minRunnable = Integer.parseInt(minRunnableValue);
            } else {
                minRunnable = Integer.max(parallelism / 2, 1);
            }
            Thread.UncaughtExceptionHandler handler = (t, e) -> { };
            boolean asyncMode = true; // FIFO
            return new ForkJoinPool(parallelism, factory, handler, asyncMode,
                         0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
        };
        return AccessController.doPrivileged(pa);
    }

https://github.com/openjdk/jdk21u/blob/jdk-21.0.1%2B12/src/java.base/share/classes/java/lang/VirtualThread.java#L1102-L1135

これを見ると、デフォルトの並列度は確かに使用できるプロセッサー数ですね。スレッドプールの最大値はデフォルトでは
並列度と256の大きい方、最小値は並列度と1の大きい方のようです。

おわりに

Java 21で正式版になったJEP 444(Virtual Threads)を試してみました。

今回はとりあえずスレッドに関するAPIまわりを見ていってみましたが、スレッドダンプなどはまた気になるところなので、それは
別の機会に見てみようと思います。