CLOVER🍀

That was when it all began.

Infinispan 15.0でVirtual Threadsへの対応が入っていたという話

これは、なにをしたくて書いたもの?

Infinispanのソースコードを眺めていたら、Virtual Threadsへの対応が入っている雰囲気を見つけたので少し試してみることにしました。

Infinispan 15.0のVirtual Threadsへの対応

今回の内容は、特にInfinispanのブログなどにも登場していません。

強いて言えば、Infinispan 15.0.0.Dev01のリリース時に「Virtual Threadsへの対応を含む、JDK 21のサポート」というのが登場していますが。

Java 17 and 21
Starting with Infinispan 15, we will baseline on JDK 17 and also offer support for JDK 21, including virtual threads.

Infinispan 15.0.0.Dev01

では、Infinispan 15.0でのVirtual Threadsへの対応のissue、Pull Requestを紹介します。

まずは以下のissue/Pull Requestで、システムプロパティorg.infinispan.threads.virtualtrueにするとInfinispanが作成するスレッドを
Virtual Threadsに切り替えられるようになりました。

ISPN-14690 / Rework virtual thread detection and make it optional

Rework virtual threads to work with JDK 21 and make it optional by tristantarrant · Pull Request #10741 · infinispan/infinispan · GitHub

次に、以下のissue/Pull Requestでシステムプロパティorg.infinispan.threads.virtualまたはjgroups.thread.virtualtrueにすると
JGroupsでもVirtual Threadsを使うようになります。

ISPN-15304 / Virtual Threads improvements

ISPN-15304 Virtual Threads improvements by pruivo · Pull Request #11334 · infinispan/infinispan · GitHub

というか、JGroupsもVirtual Threadsに対応していたんですね。

なお、Infinispan内部で使用しているNettyに関するスレッドにはVirtual Threadsを使うようになっていません。

Virtual Threads are disabled for Netty, they are not supported at the moment.

つまり、システムプロパティorg.infinispan.threads.virtualtrueにするとInfinispanとJGroupsのスレッドプールで使うスレッドを
Virtual Threadsに、jgroups.thread.virtualtrueにするとJGroupsが使うスレッドプールのみをVirtual Threadsにすることができます。
デフォルトはどちらもfalseです。

ソースコードとしては、このあたりですね。

   private static final org.infinispan.commons.spi.ThreadCreator INSTANCE = getInstance();

   public static boolean useVirtualThreads() {
      return Boolean.getBoolean("org.infinispan.threads.virtual");
   }

   private static org.infinispan.commons.spi.ThreadCreator getInstance() {
      try {
         if (useVirtualThreads()) {
            org.infinispan.commons.spi.ThreadCreator instance = Util.getInstance("org.infinispan.commons.jdk21.ThreadCreatorImpl", ThreadCreator.class.getClassLoader());
            Log.CONTAINER.infof("Virtual threads support enabled");
            return instance;
         }
      } catch (Throwable t) {
         Log.CONTAINER.debugf("Could not initialize virtual threads", t);
      }
      return new ThreadCreatorImpl();
   }

https://github.com/infinispan/infinispan/blob/15.0.4.Final/commons/all/src/main/java/org/infinispan/commons/jdkspecific/ThreadCreator.java#L14-L31

スレッドやExecutorServiceを作成するクラスはこちら。

https://github.com/infinispan/infinispan/blob/15.0.4.Final/commons/jdk21/src/main/java/org/infinispan/commons/jdk21/ThreadCreatorImpl.java

JGroupsの設定ファイルは、TCPUDPを載せておきます。

   <TCP bind_addr="${jgroups.bind.address,jgroups.tcp.address:SITE_LOCAL}"
        bind_port="${jgroups.bind.port,jgroups.tcp.port:7800}"
        diag.enabled="${jgroups.diag.enabled:false}"
        thread_naming_pattern="pl"
        send_buf_size="640k"
        sock_conn_timeout="300"
        linger="${jgroups.tcp.linger:-1}"
        bundler_type="${jgroups.bundler.type:transfer-queue}"
        bundler.max_size="${jgroups.bundler.max_size:64000}"
        non_blocking_sends="${jgroups.non_blocking_sends:false}"

        thread_pool.min_threads="${jgroups.thread_pool.min_threads:0}"
        thread_pool.max_threads="${jgroups.thread_pool.max_threads:200}"
        thread_pool.keep_alive_time="60000"

        thread_pool.thread_dumps_threshold="${jgroups.thread_dumps_threshold:10000}"
        use_virtual_threads="${jgroups.thread.virtual,org.infinispan.threads.virtual:false}"
   />

https://github.com/infinispan/infinispan/blob/15.0.4.Final/core/src/main/resources/default-configs/default-jgroups-tcp.xml#L21

   <UDP bind_addr="${jgroups.bind.address,jgroups.udp.address:SITE_LOCAL}"
        bind_port="${jgroups.bind.port,jgroups.udp.port:0}"
        mcast_addr="${jgroups.mcast_addr:239.6.7.8}"
        mcast_port="${jgroups.mcast_port:46655}"
        tos="0"
        ucast_send_buf_size="1m"
        mcast_send_buf_size="1m"
        ucast_recv_buf_size="20m"
        mcast_recv_buf_size="25m"
        ip_ttl="${jgroups.ip_ttl:2}"
        thread_naming_pattern="pl"
        diag.enabled="${jgroups.diag.enabled:false}"
        bundler_type="${jgroups.bundler.type:transfer-queue}"
        bundler.max_size="${jgroups.bundler.max_size:64000}"

        thread_pool.min_threads="${jgroups.thread_pool.min_threads:0}"
        thread_pool.max_threads="${jgroups.thread_pool.max_threads:200}"
        thread_pool.keep_alive_time="60000"

        thread_pool.thread_dumps_threshold="${jgroups.thread_dumps_threshold:10000}"
        use_virtual_threads="${jgroups.thread.virtual,org.infinispan.threads.virtual:false}"
   />

https://github.com/infinispan/infinispan/blob/15.0.4.Final/core/src/main/resources/default-configs/default-jgroups-udp.xml#L25

この部分ですね。

        use_virtual_threads="${jgroups.thread.virtual,org.infinispan.threads.virtual:false}"

他の設定ファイルの場合は、以下のディレクトリ内のファイルを参照してください。

https://github.com/infinispan/infinispan/tree/15.0.4.Final/core/src/main/resources/default-configs

Nettyに対してVirtual Threadsを無効にしているのはこちら。

      if (threadFactory instanceof DefaultThreadFactory) {
         //not supported at the moment
         ((DefaultThreadFactory) threadFactory).useVirtualThread(false);
      }

https://github.com/infinispan/infinispan/blob/15.0.4.Final/server/core/src/main/java/org/infinispan/server/core/factories/NettyEventLoopFactory.java#L33-L36

対象はInfinispan Serverのようですね。

今回は、Virtual Threadsへ対応している動作などを細かく見るつもりはないのですが、せっかく見つけたのでメモを兼ねてということと、
Virtual Threadsを有効にするとログが出力されるようなのでそれをもって確認にしたいと思います。

環境

今回の環境は、こちら。

$ java --version
openjdk 21.0.2 2024-01-16
OpenJDK Runtime Environment (build 21.0.2+13-Ubuntu-122.04.1)
OpenJDK 64-Bit Server VM (build 21.0.2+13-Ubuntu-122.04.1, mixed mode, sharing)


$ mvn --version
Apache Maven 3.9.7 (8b094c9513efc1b9ce2d952b3b9c8eaedaf8cbf0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 21.0.2, vendor: Private Build, runtime: /usr/lib/jvm/java-21-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-107-generic", arch: "amd64", family: "unix"

こちらは、Hot Rod Clientを使ったプログラムを書くのに使用します。

Infinispan Server。

$ java --version
openjdk 21.0.3 2024-04-16 LTS
OpenJDK Runtime Environment Temurin-21.0.3+9 (build 21.0.3+9-LTS)
OpenJDK 64-Bit Server VM Temurin-21.0.3+9 (build 21.0.3+9-LTS, mixed mode, sharing)


$ bin/server.sh --version

Infinispan Server 15.0.4.Final (I'm Still Standing)
Copyright (C) Red Hat Inc. and/or its affiliates and other contributors
License Apache License, v. 2.0. http://www.apache.org/licenses/LICENSE-2.0

Infinispan ServerをVirtual Threadsを有効にして起動してみる

まずはInfinispan Serverを起動して確認します。

以下の流れでやってみたいと思います。

  • ユーザー作成
  • デフォルト設定でInfinispan Serverを起動、キャッシュ作成および操作
  • スレッドダンプを取得、確認
  • Infinispan Serverを停止
  • -Dorg.infinispan.threads.virtual=trueを指定してInfinispan Serverを起動、キャッシュ操作
    • キャッシュ定義は最初のものを利用
  • スレッドダンプを取得、確認
  • Infinispan Serverを停止
  • -Djgroups.thread.virtual=trueを指定してInfinispan Serverを起動、キャッシュ操作
  • スレッドダンプを取得、確認
  • Infinispan Serverを停止

Infninispan Serverは3ノードのクラスター構成とします。少し処理を動かした方がいいと思うので、キャッシュ操作まで行ってから
スレッドダンプを取得し、その中にVirtual Threadsが含まれているかどうかを見てみたいと思います。

全ノードでユーザーの作成。管理者グループとアプリケーショングループにそれぞれユーザーを作成しますが、ここでは管理者グループの
ユーザーを使用します。アプリケーション用のユーザーは、Hot Rod Clientからの接続時に利用することにします。

$ bin/cli.sh user create -g admin -p password ispn-admin
$ bin/cli.sh user create -g application -p password ispn-user

Infinispan Serverを起動。この時のJGroupsのプロトコルスタックは、Infinispan ServerのデフォルトのTCPです。

$ bin/server.sh \
    -b 0.0.0.0 \
    -Djgroups.tcp.address=$(hostname -i)

Infinispan Serverのうちの1ノードに接続してキャッシュを定義。

$ bin/cli.sh -c http://ispn-admin:password@localhost:11222


> create cache myCache --template=org.infinispan.DIST_SYNC
> describe caches/myCache
{
  "myCache" : {
    "distributed-cache" : {
      "configuration" : "org.infinispan.LOCAL",
      "mode" : "SYNC",
      "remote-timeout" : "17500",
      "statistics" : true,
      "locking" : {
        "concurrency-level" : "1000",
        "acquire-timeout" : "15000",
        "striping" : false
      },
      "state-transfer" : {
        "timeout" : "60000"
      }
    }
  }
}


> put --cache=myCache key1 value1
> get --cache=myCache key1
value1

この時のスレッドダンプを取得してみます。

$ jcmd $(jcmd -l | grep org.infinispan.server.Bootstrap | cut -d' ' -f1) Thread.dump_to_file -format=plain -overwrite thread_dump.txt

Virtual Threadsがあるかどうか確認。もちろん含まれていません。

$ grep 'virtual$' thread_dump.txt

1度Infinispan Serverを停止して、-Dorg.infinispan.threads.virtual=trueを付与して起動してみます。

$ bin/server.sh \
    -b 0.0.0.0 \
    -Djgroups.tcp.address=$(hostname -i) \
    -Dorg.infinispan.threads.virtual=true

すると、起動時にこんなログが現れます。Virtual Threadsが有効になったようです。

2024-05-28 15:22:41,416 INFO  [o.i.CONTAINER] Virtual threads support enabled

Infinispan Serverに接続して、キャッシュを操作します。

$ bin/cli.sh -c http://ispn-admin:password@localhost:11222


> put --cache=myCache key1 value1
> get --cache=myCache key1
value1

スレッドダンプを取得します。

$ jcmd $(jcmd -l | grep org.infinispan.server.Bootstrap | cut -d' ' -f1) Thread.dump_to_file -format=plain -overwrite thread_dump.txt

Virtual Threadsがあるかどうか確認。

$ grep 'virtual$' thread_dump.txt
#32 "TcpServer.Acceptor[7800]-1,d16f4816172a-48647" virtual
#33 "Timer runner-2,d16f4816172a-48647" virtual
#35 "TQ-Bundler-6,d16f4816172a-48647" virtual
#38 "MPING-4,d16f4816172a-48647" virtual
#39 "NioServer.Selector [/0.0.0.0:57800]-5,d16f4816172a-48647" virtual
#1532 "Connection.Receiver [172.17.0.2:47757 - 172.17.0.4:7800]-8,d16f4816172a-48647" virtual
#77 "Connection.Receiver [172.17.0.2:38495 - 172.17.0.3:7800]-7,d16f4816172a-48647" virtual
#82 "timeout-thread--p4-t1" virtual
#106 "expiration-thread--p5-t1" virtual

現れました。それぞれInfinispan Serverのもの、JGroupsのものと思われるものがあります。アクセスを継続しながら取得した方が
もっといろんなスレッドが記録されると思うのですが、今回はこれくらいにしておきます。

参考までに後続のスタックトレースも出力しておきましょう。

$ grep -A 10 'virtual$' thread_dump.txt
#32 "TcpServer.Acceptor[7800]-1,d16f4816172a-48647" virtual
      java.base/java.lang.VirtualThread.park(VirtualThread.java:582)
      java.base/java.lang.System$2.parkVirtualThread(System.java:2643)
      java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54)
      java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:369)
      java.base/sun.nio.ch.Poller.pollIndirect(Poller.java:139)
      java.base/sun.nio.ch.Poller.poll(Poller.java:102)
      java.base/sun.nio.ch.Poller.poll(Poller.java:87)
      java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:175)
      java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:201)
      java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:750)
--
#33 "Timer runner-2,d16f4816172a-48647" virtual
      java.base/java.lang.VirtualThread.parkNanos(VirtualThread.java:621)
      java.base/java.lang.System$2.parkVirtualThread(System.java:2652)
      java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:67)
      java.base/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:267)
      java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1758)
      java.base/java.util.concurrent.DelayQueue.take(DelayQueue.java:254)
      java.base/java.util.concurrent.DelayQueue.take(DelayQueue.java:100)
      org.jgroups.util.TimeScheduler3.run(TimeScheduler3.java:197)
      java.base/java.lang.VirtualThread.run(VirtualThread.java:309)

#35 "TQ-Bundler-6,d16f4816172a-48647" virtual
      java.base/java.lang.VirtualThread.park(VirtualThread.java:582)
      java.base/java.lang.System$2.parkVirtualThread(System.java:2643)
      java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54)
      java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:369)
      java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(AbstractQueuedSynchronizer.java:519)
      java.base/java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3780)
      java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3725)
      java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1707)
      java.base/java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:420)
      org.jgroups.protocols.TransferQueueBundler.run(TransferQueueBundler.java:120)
--
#38 "MPING-4,d16f4816172a-48647" virtual
      java.base/java.lang.VirtualThread.parkNanos(VirtualThread.java:621)
      java.base/java.lang.System$2.parkVirtualThread(System.java:2652)
      java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:67)
      java.base/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:408)
      java.base/sun.nio.ch.Poller.pollIndirect(Poller.java:137)
      java.base/sun.nio.ch.Poller.poll(Poller.java:102)
      java.base/sun.nio.ch.Poller.poll(Poller.java:87)
      java.base/sun.nio.ch.DatagramChannelImpl.park(DatagramChannelImpl.java:494)
      java.base/sun.nio.ch.DatagramChannelImpl.tryBlockingReceive(DatagramChannelImpl.java:762)
      java.base/sun.nio.ch.DatagramChannelImpl.blockingReceive(DatagramChannelImpl.java:692)
--
#39 "NioServer.Selector [/0.0.0.0:57800]-5,d16f4816172a-48647" virtual
      java.base/sun.nio.ch.EPoll.wait(Native Method)
      java.base/sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:121)
      java.base/sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:130)
      java.base/sun.nio.ch.SelectorImpl.select(SelectorImpl.java:147)
      org.jgroups.blocks.cs.NioBaseServer$Acceptor.doSelect(NioBaseServer.java:164)
      org.jgroups.blocks.cs.NioBaseServer$Acceptor.run(NioBaseServer.java:119)
      java.base/java.lang.VirtualThread.run(VirtualThread.java:309)

#1532 "Connection.Receiver [172.17.0.2:47757 - 172.17.0.4:7800]-8,d16f4816172a-48647" virtual
      java.base/java.lang.VirtualThread.park(VirtualThread.java:582)
      java.base/java.lang.System$2.parkVirtualThread(System.java:2643)
      java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54)
      java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:369)
      java.base/sun.nio.ch.Poller.pollIndirect(Poller.java:139)
      java.base/sun.nio.ch.Poller.poll(Poller.java:102)
      java.base/sun.nio.ch.Poller.poll(Poller.java:87)
      java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:175)
      java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:201)
      java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
--
#77 "Connection.Receiver [172.17.0.2:38495 - 172.17.0.3:7800]-7,d16f4816172a-48647" virtual
      java.base/java.lang.VirtualThread.park(VirtualThread.java:582)
      java.base/java.lang.System$2.parkVirtualThread(System.java:2643)
      java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54)
      java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:369)
      java.base/sun.nio.ch.Poller.pollIndirect(Poller.java:139)
      java.base/sun.nio.ch.Poller.poll(Poller.java:102)
      java.base/sun.nio.ch.Poller.poll(Poller.java:87)
      java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:175)
      java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:201)
      java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
--
#82 "timeout-thread--p4-t1" virtual
      java.base/java.lang.VirtualThread.parkNanos(VirtualThread.java:621)
      java.base/java.lang.System$2.parkVirtualThread(System.java:2652)
      java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:67)
      java.base/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:267)
      java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1758)
      java.base/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1182)
      java.base/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
      java.base/java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1070)
      java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
      java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
--
#106 "expiration-thread--p5-t1" virtual
      java.base/java.lang.VirtualThread.parkNanos(VirtualThread.java:621)
      java.base/java.lang.System$2.parkVirtualThread(System.java:2652)
      java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:67)
      java.base/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:267)
      java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1758)
      java.base/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1182)
      java.base/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
      java.base/java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1070)
      java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
      java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)

もう1度Infinispan Serverを停止して、最後に-Djgroups.thread.virtual=trueを付与して起動してみます。

$ bin/server.sh \
    -b 0.0.0.0 \
    -Djgroups.tcp.address=$(hostname -i) \
    -Djgroups.thread.virtual=true

今度は[o.i.CONTAINER] Virtual threads support enabledというログは出力されません。Infinispan Server側のVirtual Threadsは有効にして
いませんからね。JGroupsのみです。

Infinispan Serverに接続して、キャッシュを操作します。

$ bin/cli.sh -c http://ispn-admin:password@localhost:11222


> put --cache=myCache key1 value1
> get --cache=myCache key1
value1

スレッドダンプを取得します。

$ jcmd $(jcmd -l | grep org.infinispan.server.Bootstrap | cut -d' ' -f1) Thread.dump_to_file -format=plain -overwrite thread_dump.txt

Virtual Threadsの存在を確認。

$ grep 'virtual$' thread_dump.txt
#32 "TcpServer.Acceptor[7800]-1,d16f4816172a-28811" virtual
#33 "Timer runner-2,d16f4816172a-28811" virtual
#35 "TQ-Bundler-6,d16f4816172a-28811" virtual
#99 "Connection.Receiver [172.17.0.2:43953 - 172.17.0.3:7800]-7,d16f4816172a-28811" virtual
#38 "MPING-4,d16f4816172a-28811" virtual
#41 "NioServer.Selector [/0.0.0.0:57800]-5,d16f4816172a-28811" virtual
#201 "Connection.Receiver [172.17.0.2:39577 - 172.17.0.4:7800]-8,d16f4816172a-28811" virtual

先ほどと比べると、timeout-thread--〜expiration-thread--〜というVirtual Threadsがなくなっていますね。このあたりが
Infinispan Serverのものだったようです。

というわけで、Infinispan ServerおよびJGroups、もしくはJGroupsのみに対してVirtual Threadsを有効する方法を確認できました。

ところで、-Djdk.tracePinnedThreadsを付けたらどうなるんだろうと思って試してみると

$ bin/server.sh \
    -b 0.0.0.0 \
    -Djgroups.tcp.address=$(hostname -i) \
    -Dorg.infinispan.threads.virtual=true \
    -Djdk.tracePinnedThreads

今回の操作だとJGroupsのみがpinning(ピン留め)された状態として検出されました。

Thread[#48,ForkJoinPool-1-worker-4,5,CarrierThreads]
    java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:183)
    java.base/jdk.internal.vm.Continuation.onPinned0(Continuation.java:393)
    java.base/java.lang.VirtualThread.parkNanos(VirtualThread.java:621)
    java.base/java.lang.System$2.parkVirtualThread(System.java:2652)
    java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:67)
    java.base/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:267)
    java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1758)
    org.jgroups.util.CondVar.waitFor(CondVar.java:64)
    org.jgroups.util.Promise._getResultWithTimeout(Promise.java:146)
    org.jgroups.util.Promise.getResultWithTimeout(Promise.java:38)
    org.jgroups.util.Promise.getResult(Promise.java:73)
    org.jgroups.util.Promise.getResult(Promise.java:68)
    org.jgroups.protocols.FD_SOCK2$PingDest.waitForConnect(FD_SOCK2.java:560)
    org.jgroups.protocols.FD_SOCK2.connectTo(FD_SOCK2.java:433)
    org.jgroups.protocols.FD_SOCK2.connectTo(FD_SOCK2.java:404)
    org.jgroups.protocols.FD_SOCK2.connectToNextPingDest(FD_SOCK2.java:380)
    org.jgroups.protocols.FD_SOCK2.handle(FD_SOCK2.java:351)
    org.jgroups.protocols.FD_SOCK2.handle(FD_SOCK2.java:33)
    org.jgroups.util.ProcessingQueue.process(ProcessingQueue.java:55)
    org.jgroups.util.ProcessingQueue.add(ProcessingQueue.java:35)
    org.jgroups.protocols.FD_SOCK2.handleView(FD_SOCK2.java:368)
    org.jgroups.protocols.FD_SOCK2.down(FD_SOCK2.java:229)
    org.jgroups.protocols.FailureDetection.down(FailureDetection.java:149)
    org.jgroups.protocols.VERIFY_SUSPECT2.down(VERIFY_SUSPECT2.java:84) <== monitors:1
    org.jgroups.protocols.pbcast.NAKACK2.down(NAKACK2.java:632)
    org.jgroups.protocols.UNICAST3.down(UNICAST3.java:652)
    org.jgroups.protocols.pbcast.STABLE.down(STABLE.java:260)
    org.jgroups.protocols.pbcast.GMS.installView(GMS.java:682) <== monitors:2
    org.jgroups.protocols.pbcast.ServerGmsImpl.handleViewChange(ServerGmsImpl.java:66)
    org.jgroups.protocols.pbcast.GMS.castViewChangeAndSendJoinRsps(GMS.java:558)
    org.jgroups.protocols.pbcast.CoordGmsImpl.handleMembershipChange(CoordGmsImpl.java:198)
    org.jgroups.protocols.pbcast.GMS.process(GMS.java:1281)
    org.jgroups.protocols.pbcast.ViewHandler.process(ViewHandler.java:239)
    org.jgroups.protocols.pbcast.ViewHandler.add(ViewHandler.java:63)
    org.jgroups.protocols.pbcast.GMS.handle(GMS.java:952)
    org.jgroups.protocols.pbcast.GMS.up(GMS.java:854)
    org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:226)
    org.jgroups.protocols.UNICAST3.deliverMessage(UNICAST3.java:1135)
    org.jgroups.protocols.UNICAST3.addMessage(UNICAST3.java:871)
    org.jgroups.protocols.UNICAST3.handleDataReceived(UNICAST3.java:853)
    org.jgroups.protocols.UNICAST3.up(UNICAST3.java:465)
    org.jgroups.protocols.pbcast.NAKACK2.up(NAKACK2.java:669)
    org.jgroups.protocols.VERIFY_SUSPECT2.up(VERIFY_SUSPECT2.java:105)
    org.jgroups.protocols.FailureDetection.up(FailureDetection.java:180)
    org.jgroups.protocols.FD_SOCK2.up(FD_SOCK2.java:190)
    org.jgroups.protocols.MERGE3.up(MERGE3.java:274)
    org.jgroups.protocols.Discovery.up(Discovery.java:296)
    org.jgroups.stack.Protocol.up(Protocol.java:360)
    org.jgroups.protocols.TP.passMessageUp(TP.java:1188)
    org.jgroups.util.SubmitToThreadPool$SingleMessageHandler.run(SubmitToThreadPool.java:95)
    java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314)
    java.base/java.lang.VirtualThread.run(VirtualThread.java:309)

Hot Rod Clientで試してみる

最後は、Hot Rod Clientからアクセスしてみましょう。

Infinispan Serverは、以下の設定で3ノード起動しているものとします。

$ bin/server.sh \
    -b 0.0.0.0 \
    -Djgroups.tcp.address=$(hostname -i) \
    -Dorg.infinispan.threads.virtual=true \
    -Djdk.tracePinnedThreads

Infinispan Serverのユーザーおよびキャッシュは作成済みとします。

$ bin/cli.sh user create -g admin -p password ispn-admin
$ bin/cli.sh user create -g application -p password ispn-user


$ bin/cli.sh -c http://ispn-admin:password@localhost:11222


> create cache myCache --template=org.infinispan.DIST_SYNC

簡単なHot Rod Clientを使ったアプリケーションを作成。

Maven依存関係など。

    <properties>
        <maven.compiler.release>21</maven.compiler.release>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-client-hotrod</artifactId>
            <version>15.0.4.Final</version>
        </dependency>
    </dependencies>

Hot Rod Clientを使ったソースコード

src/main/java/org/littlewings/infinispan/remote/virtualthreads/App.java

package org.littlewings.infinispan.remote.virtualthreads;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;

public class App {
    public static void main(String... args) {
        System.out.printf("org.infinispan.threads.virtual = %s\n", System.getProperty("org.infinispan.threads.virtual"));

        String uri = "hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222";

        try (RemoteCacheManager cacheManager = new RemoteCacheManager(uri)) {
            RemoteCache<String, String> cache = cacheManager.getCache("myCache");

            cache.put("key1", "value1");
            System.out.printf("get(key1) = %s%n", cache.get("key1"));

            cache.stop();
        }
    }
}

以下のコマンドで起動してみます。

$ mvn compile exec:java \
    -Dexec.mainClass=org.littlewings.infinispan.remote.virtualthreads.App \
    -Dorg.infinispan.threads.virtual=true \
    -Djdk.tracePinnedThreads

この時のログはこうなりました。

org.infinispan.threads.virtual = true
5月 29, 2024 11:09:29 午後 org.infinispan.client.hotrod.impl.transport.netty.NativeTransport useNativeIOUring
INFO: ISPN004108: Native IOUring transport not available, using NIO instead: io.netty.incubator.channel.uring.IOUring
5月 29, 2024 11:09:29 午後 org.infinispan.client.hotrod.RemoteCacheManager actualStart
INFO: ISPN004021: Infinispan version: Infinispan 'I'm Still Standing' 15.0.4.Final
5月 29, 2024 11:09:29 午後 org.infinispan.client.hotrod.impl.transport.netty.ChannelFactory receiveTopology
INFO: ISPN004006: Server sent new topology view (id=1690332234, age=0) containing 3 addresses: [172.18.0.4/<unresolved>:11222, 172.18.0.3/<unresolved>:11222, 172.18.0.2/<unresolved>:11222]
5月 29, 2024 11:09:29 午後 org.infinispan.client.hotrod.impl.RemoteCacheImpl init
INFO: ISPN004113: Query module not found. Queries are disabled.
get(key1) = value1

Infinispan Serverのように、「Virtual threads support enabled」というログが出力されませんね。

ソースコードを確認してみると、Hot Rod Clientで使うスレッドプールはVirtual Threadsを意識していなさそうです。

https://github.com/infinispan/infinispan/blob/15.0.4.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java#L33-L54

こちらで作成されたスレッドプールが、Nettyで使われます。

https://github.com/infinispan/infinispan/blob/15.0.4.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/ChannelFactory.java#L125

というか、Nettyを使うのであればそもそもVirtual Threadsは関係なさそうですね。というわけで、Virtual ThreadsはInfinispan Serverで
有効になるもののようです。
※Embedded Modeでも試してみましたが、有効になったことを示すログは確認できました

io_uringを使うとどうなるのかな?とも思いましたが、結局こちらもNettyを使うので同じですね。

実際どうなのか

Virtual Threadsを使うように設定できるInfinispanですが、ソースコードを確認するとまだまだたくさんのsynchronizedブロックが
あったりします。

ですが、Java 21以上で動作させた時にはデフォルトでVirtual Threadsを有効にするかどうかといったissueもあって、状況がよくわかりません。

ISPN-16105 / Enable virtual threads by default

リリース時のブログにも書かれていなかったですし、今は対応が進んでいることを把握しておく、くらいがいいんでしょうか。

少しソースコードを見る

ExecutorServiceを作成するのはこちらなのですが、

https://github.com/infinispan/infinispan/blob/15.0.4.Final/commons/all/src/main/java/org/infinispan/commons/jdkspecific/ThreadCreator.java#L38-L40

デフォルトだとなにも返しません。

      @Override
      public Optional<ExecutorService> newVirtualThreadPerTaskExecutor() {
         return Optional.empty();
      }

https://github.com/infinispan/infinispan/blob/15.0.4.Final/commons/all/src/main/java/org/infinispan/commons/jdkspecific/ThreadCreator.java#L49-L52

Virtual Threadsを有効にした時にExecutors#newVirtualThreadPerTaskExecutorを返すようになっています。

   @Override
   public Optional<ExecutorService> newVirtualThreadPerTaskExecutor() {
      return Optional.of(Executors.newVirtualThreadPerTaskExecutor());
   }

https://github.com/infinispan/infinispan/blob/15.0.4.Final/commons/jdk21/src/main/java/org/infinispan/commons/jdk21/ThreadCreatorImpl.java#L21-L24

Infinispanのスレッドプールの設定にはcore-threadsmax-threadsなどいろいろあったと思いますが、このあたりの整合性はどうなって
いるのだろうと思ったのですが、ThreadCreator#createBlockingExecutorServiceが値を返すとそれがそのまま使われ、そうでない場合は
設定値を元にスレッドプールが作成されるようです。

      return ThreadCreator.createBlockingExecutorService().orElseGet(() -> {
         EnhancedQueueExecutor.Builder builder = new EnhancedQueueExecutor.Builder();
         builder.setThreadFactory(factory);
         builder.setCorePoolSize(coreThreads);
         builder.setMaximumPoolSize(maxThreads);
         builder.setGrowthResistance(0.0f);
         builder.setMaximumQueueSize(queueLength);
         builder.setKeepAliveTime(Duration.of(keepAlive, ChronoUnit.MILLIS));

         EnhancedQueueExecutor enhancedQueueExecutor = builder.build();
         enhancedQueueExecutor.setHandoffExecutor(task ->
               BlockingRejectedExecutionHandler.getInstance().rejectedExecution(task, enhancedQueueExecutor));
         return enhancedQueueExecutor;
      });

https://github.com/infinispan/infinispan/blob/15.0.4.Final/core/src/main/java/org/infinispan/factories/threads/EnhancedQueueExecutorFactory.java#L36-L49

つまり、ThreadCreator#createBlockingExecutorServiceExecutors#newVirtualThreadPerTaskExecutorを返した場合はそのまま
使われるということですね。

ではこの処理がどのスレッドプールで使われるのかまで確認しようと思ったのですが、今回はちょっと諦めました。

おわりに

Infinispan 15.0でVirtual Threadsへの対応が入っていたので、ちょっと確認していました。

といっても、有効にする方法とInfninispan Serverでちょっとスレッドダンプを見ただけですが。
現時点では、Hot Rod Clientには縁がなさそうなこともわかりました。

どのスレッドプールで使われるかまでは把握できなかったのがちょっと残念ですが、これについてはまた機会を改めて確認できたらいいなとは
思います…。
やらない気もしますが…。

リリース時のブログとかに載ったらまた、でしょうかね。