これは、なにをしたくて書いたもの?
Infinispanのソースコードを眺めていたら、Virtual Threadsへの対応が入っている雰囲気を見つけたので少し試してみることにしました。
Infinispan 15.0のVirtual Threadsへの対応
今回の内容は、特にInfinispanのブログなどにも登場していません。
強いて言えば、Infinispan 15.0.0.Dev01のリリース時に「Virtual Threadsへの対応を含む、JDK 21のサポート」というのが登場していますが。
Java 17 and 21
Starting with Infinispan 15, we will baseline on JDK 17 and also offer support for JDK 21, including virtual threads.
では、Infinispan 15.0でのVirtual Threadsへの対応のissue、Pull Requestを紹介します。
まずは以下のissue/Pull Requestで、システムプロパティorg.infinispan.threads.virtual
をtrue
にするとInfinispanが作成するスレッドを
Virtual Threadsに切り替えられるようになりました。
ISPN-14690 / Rework virtual thread detection and make it optional
次に、以下のissue/Pull Requestでシステムプロパティorg.infinispan.threads.virtual
またはjgroups.thread.virtual
をtrue
にすると
JGroupsでもVirtual Threadsを使うようになります。
ISPN-15304 / Virtual Threads improvements
というか、JGroupsもVirtual Threadsに対応していたんですね。
なお、Infinispan内部で使用しているNettyに関するスレッドにはVirtual Threadsを使うようになっていません。
Virtual Threads are disabled for Netty, they are not supported at the moment.
つまり、システムプロパティorg.infinispan.threads.virtual
をtrue
にするとInfinispanとJGroupsのスレッドプールで使うスレッドを
Virtual Threadsに、jgroups.thread.virtual
をtrue
にするとJGroupsが使うスレッドプールのみをVirtual Threadsにすることができます。
デフォルトはどちらもfalse
です。
ソースコードとしては、このあたりですね。
private static final org.infinispan.commons.spi.ThreadCreator INSTANCE = getInstance(); public static boolean useVirtualThreads() { return Boolean.getBoolean("org.infinispan.threads.virtual"); } private static org.infinispan.commons.spi.ThreadCreator getInstance() { try { if (useVirtualThreads()) { org.infinispan.commons.spi.ThreadCreator instance = Util.getInstance("org.infinispan.commons.jdk21.ThreadCreatorImpl", ThreadCreator.class.getClassLoader()); Log.CONTAINER.infof("Virtual threads support enabled"); return instance; } } catch (Throwable t) { Log.CONTAINER.debugf("Could not initialize virtual threads", t); } return new ThreadCreatorImpl(); }
スレッドやExecutorService
を作成するクラスはこちら。
JGroupsの設定ファイルは、TCPとUDPを載せておきます。
<TCP bind_addr="${jgroups.bind.address,jgroups.tcp.address:SITE_LOCAL}" bind_port="${jgroups.bind.port,jgroups.tcp.port:7800}" diag.enabled="${jgroups.diag.enabled:false}" thread_naming_pattern="pl" send_buf_size="640k" sock_conn_timeout="300" linger="${jgroups.tcp.linger:-1}" bundler_type="${jgroups.bundler.type:transfer-queue}" bundler.max_size="${jgroups.bundler.max_size:64000}" non_blocking_sends="${jgroups.non_blocking_sends:false}" thread_pool.min_threads="${jgroups.thread_pool.min_threads:0}" thread_pool.max_threads="${jgroups.thread_pool.max_threads:200}" thread_pool.keep_alive_time="60000" thread_pool.thread_dumps_threshold="${jgroups.thread_dumps_threshold:10000}" use_virtual_threads="${jgroups.thread.virtual,org.infinispan.threads.virtual:false}" />
<UDP bind_addr="${jgroups.bind.address,jgroups.udp.address:SITE_LOCAL}" bind_port="${jgroups.bind.port,jgroups.udp.port:0}" mcast_addr="${jgroups.mcast_addr:239.6.7.8}" mcast_port="${jgroups.mcast_port:46655}" tos="0" ucast_send_buf_size="1m" mcast_send_buf_size="1m" ucast_recv_buf_size="20m" mcast_recv_buf_size="25m" ip_ttl="${jgroups.ip_ttl:2}" thread_naming_pattern="pl" diag.enabled="${jgroups.diag.enabled:false}" bundler_type="${jgroups.bundler.type:transfer-queue}" bundler.max_size="${jgroups.bundler.max_size:64000}" thread_pool.min_threads="${jgroups.thread_pool.min_threads:0}" thread_pool.max_threads="${jgroups.thread_pool.max_threads:200}" thread_pool.keep_alive_time="60000" thread_pool.thread_dumps_threshold="${jgroups.thread_dumps_threshold:10000}" use_virtual_threads="${jgroups.thread.virtual,org.infinispan.threads.virtual:false}" />
この部分ですね。
use_virtual_threads="${jgroups.thread.virtual,org.infinispan.threads.virtual:false}"
他の設定ファイルの場合は、以下のディレクトリ内のファイルを参照してください。
https://github.com/infinispan/infinispan/tree/15.0.4.Final/core/src/main/resources/default-configs
Nettyに対してVirtual Threadsを無効にしているのはこちら。
if (threadFactory instanceof DefaultThreadFactory) { //not supported at the moment ((DefaultThreadFactory) threadFactory).useVirtualThread(false); }
対象はInfinispan Serverのようですね。
今回は、Virtual Threadsへ対応している動作などを細かく見るつもりはないのですが、せっかく見つけたのでメモを兼ねてということと、
Virtual Threadsを有効にするとログが出力されるようなのでそれをもって確認にしたいと思います。
環境
今回の環境は、こちら。
$ java --version openjdk 21.0.2 2024-01-16 OpenJDK Runtime Environment (build 21.0.2+13-Ubuntu-122.04.1) OpenJDK 64-Bit Server VM (build 21.0.2+13-Ubuntu-122.04.1, mixed mode, sharing) $ mvn --version Apache Maven 3.9.7 (8b094c9513efc1b9ce2d952b3b9c8eaedaf8cbf0) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 21.0.2, vendor: Private Build, runtime: /usr/lib/jvm/java-21-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.15.0-107-generic", arch: "amd64", family: "unix"
こちらは、Hot Rod Clientを使ったプログラムを書くのに使用します。
Infinispan Server。
$ java --version openjdk 21.0.3 2024-04-16 LTS OpenJDK Runtime Environment Temurin-21.0.3+9 (build 21.0.3+9-LTS) OpenJDK 64-Bit Server VM Temurin-21.0.3+9 (build 21.0.3+9-LTS, mixed mode, sharing) $ bin/server.sh --version Infinispan Server 15.0.4.Final (I'm Still Standing) Copyright (C) Red Hat Inc. and/or its affiliates and other contributors License Apache License, v. 2.0. http://www.apache.org/licenses/LICENSE-2.0
Infinispan ServerをVirtual Threadsを有効にして起動してみる
まずはInfinispan Serverを起動して確認します。
以下の流れでやってみたいと思います。
- ユーザー作成
- デフォルト設定でInfinispan Serverを起動、キャッシュ作成および操作
- スレッドダンプを取得、確認
- Infinispan Serverを停止
-Dorg.infinispan.threads.virtual=true
を指定してInfinispan Serverを起動、キャッシュ操作- キャッシュ定義は最初のものを利用
- スレッドダンプを取得、確認
- Infinispan Serverを停止
-Djgroups.thread.virtual=true
を指定してInfinispan Serverを起動、キャッシュ操作- スレッドダンプを取得、確認
- Infinispan Serverを停止
Infninispan Serverは3ノードのクラスター構成とします。少し処理を動かした方がいいと思うので、キャッシュ操作まで行ってから
スレッドダンプを取得し、その中にVirtual Threadsが含まれているかどうかを見てみたいと思います。
全ノードでユーザーの作成。管理者グループとアプリケーショングループにそれぞれユーザーを作成しますが、ここでは管理者グループの
ユーザーを使用します。アプリケーション用のユーザーは、Hot Rod Clientからの接続時に利用することにします。
$ bin/cli.sh user create -g admin -p password ispn-admin $ bin/cli.sh user create -g application -p password ispn-user
Infinispan Serverを起動。この時のJGroupsのプロトコルスタックは、Infinispan ServerのデフォルトのTCPです。
$ bin/server.sh \ -b 0.0.0.0 \ -Djgroups.tcp.address=$(hostname -i)
Infinispan Serverのうちの1ノードに接続してキャッシュを定義。
$ bin/cli.sh -c http://ispn-admin:password@localhost:11222 > create cache myCache --template=org.infinispan.DIST_SYNC > describe caches/myCache { "myCache" : { "distributed-cache" : { "configuration" : "org.infinispan.LOCAL", "mode" : "SYNC", "remote-timeout" : "17500", "statistics" : true, "locking" : { "concurrency-level" : "1000", "acquire-timeout" : "15000", "striping" : false }, "state-transfer" : { "timeout" : "60000" } } } } > put --cache=myCache key1 value1 > get --cache=myCache key1 value1
この時のスレッドダンプを取得してみます。
$ jcmd $(jcmd -l | grep org.infinispan.server.Bootstrap | cut -d' ' -f1) Thread.dump_to_file -format=plain -overwrite thread_dump.txt
Virtual Threadsがあるかどうか確認。もちろん含まれていません。
$ grep 'virtual$' thread_dump.txt
1度Infinispan Serverを停止して、-Dorg.infinispan.threads.virtual=true
を付与して起動してみます。
$ bin/server.sh \ -b 0.0.0.0 \ -Djgroups.tcp.address=$(hostname -i) \ -Dorg.infinispan.threads.virtual=true
すると、起動時にこんなログが現れます。Virtual Threadsが有効になったようです。
2024-05-28 15:22:41,416 INFO [o.i.CONTAINER] Virtual threads support enabled
Infinispan Serverに接続して、キャッシュを操作します。
$ bin/cli.sh -c http://ispn-admin:password@localhost:11222 > put --cache=myCache key1 value1 > get --cache=myCache key1 value1
スレッドダンプを取得します。
$ jcmd $(jcmd -l | grep org.infinispan.server.Bootstrap | cut -d' ' -f1) Thread.dump_to_file -format=plain -overwrite thread_dump.txt
Virtual Threadsがあるかどうか確認。
$ grep 'virtual$' thread_dump.txt #32 "TcpServer.Acceptor[7800]-1,d16f4816172a-48647" virtual #33 "Timer runner-2,d16f4816172a-48647" virtual #35 "TQ-Bundler-6,d16f4816172a-48647" virtual #38 "MPING-4,d16f4816172a-48647" virtual #39 "NioServer.Selector [/0.0.0.0:57800]-5,d16f4816172a-48647" virtual #1532 "Connection.Receiver [172.17.0.2:47757 - 172.17.0.4:7800]-8,d16f4816172a-48647" virtual #77 "Connection.Receiver [172.17.0.2:38495 - 172.17.0.3:7800]-7,d16f4816172a-48647" virtual #82 "timeout-thread--p4-t1" virtual #106 "expiration-thread--p5-t1" virtual
現れました。それぞれInfinispan Serverのもの、JGroupsのものと思われるものがあります。アクセスを継続しながら取得した方が
もっといろんなスレッドが記録されると思うのですが、今回はこれくらいにしておきます。
参考までに後続のスタックトレースも出力しておきましょう。
$ grep -A 10 'virtual$' thread_dump.txt #32 "TcpServer.Acceptor[7800]-1,d16f4816172a-48647" virtual java.base/java.lang.VirtualThread.park(VirtualThread.java:582) java.base/java.lang.System$2.parkVirtualThread(System.java:2643) java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54) java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:369) java.base/sun.nio.ch.Poller.pollIndirect(Poller.java:139) java.base/sun.nio.ch.Poller.poll(Poller.java:102) java.base/sun.nio.ch.Poller.poll(Poller.java:87) java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:175) java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:201) java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:750) -- #33 "Timer runner-2,d16f4816172a-48647" virtual java.base/java.lang.VirtualThread.parkNanos(VirtualThread.java:621) java.base/java.lang.System$2.parkVirtualThread(System.java:2652) java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:67) java.base/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:267) java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1758) java.base/java.util.concurrent.DelayQueue.take(DelayQueue.java:254) java.base/java.util.concurrent.DelayQueue.take(DelayQueue.java:100) org.jgroups.util.TimeScheduler3.run(TimeScheduler3.java:197) java.base/java.lang.VirtualThread.run(VirtualThread.java:309) #35 "TQ-Bundler-6,d16f4816172a-48647" virtual java.base/java.lang.VirtualThread.park(VirtualThread.java:582) java.base/java.lang.System$2.parkVirtualThread(System.java:2643) java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54) java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:369) java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(AbstractQueuedSynchronizer.java:519) java.base/java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3780) java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3725) java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1707) java.base/java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:420) org.jgroups.protocols.TransferQueueBundler.run(TransferQueueBundler.java:120) -- #38 "MPING-4,d16f4816172a-48647" virtual java.base/java.lang.VirtualThread.parkNanos(VirtualThread.java:621) java.base/java.lang.System$2.parkVirtualThread(System.java:2652) java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:67) java.base/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:408) java.base/sun.nio.ch.Poller.pollIndirect(Poller.java:137) java.base/sun.nio.ch.Poller.poll(Poller.java:102) java.base/sun.nio.ch.Poller.poll(Poller.java:87) java.base/sun.nio.ch.DatagramChannelImpl.park(DatagramChannelImpl.java:494) java.base/sun.nio.ch.DatagramChannelImpl.tryBlockingReceive(DatagramChannelImpl.java:762) java.base/sun.nio.ch.DatagramChannelImpl.blockingReceive(DatagramChannelImpl.java:692) -- #39 "NioServer.Selector [/0.0.0.0:57800]-5,d16f4816172a-48647" virtual java.base/sun.nio.ch.EPoll.wait(Native Method) java.base/sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:121) java.base/sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:130) java.base/sun.nio.ch.SelectorImpl.select(SelectorImpl.java:147) org.jgroups.blocks.cs.NioBaseServer$Acceptor.doSelect(NioBaseServer.java:164) org.jgroups.blocks.cs.NioBaseServer$Acceptor.run(NioBaseServer.java:119) java.base/java.lang.VirtualThread.run(VirtualThread.java:309) #1532 "Connection.Receiver [172.17.0.2:47757 - 172.17.0.4:7800]-8,d16f4816172a-48647" virtual java.base/java.lang.VirtualThread.park(VirtualThread.java:582) java.base/java.lang.System$2.parkVirtualThread(System.java:2643) java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54) java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:369) java.base/sun.nio.ch.Poller.pollIndirect(Poller.java:139) java.base/sun.nio.ch.Poller.poll(Poller.java:102) java.base/sun.nio.ch.Poller.poll(Poller.java:87) java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:175) java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:201) java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309) -- #77 "Connection.Receiver [172.17.0.2:38495 - 172.17.0.3:7800]-7,d16f4816172a-48647" virtual java.base/java.lang.VirtualThread.park(VirtualThread.java:582) java.base/java.lang.System$2.parkVirtualThread(System.java:2643) java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54) java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:369) java.base/sun.nio.ch.Poller.pollIndirect(Poller.java:139) java.base/sun.nio.ch.Poller.poll(Poller.java:102) java.base/sun.nio.ch.Poller.poll(Poller.java:87) java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:175) java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:201) java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309) -- #82 "timeout-thread--p4-t1" virtual java.base/java.lang.VirtualThread.parkNanos(VirtualThread.java:621) java.base/java.lang.System$2.parkVirtualThread(System.java:2652) java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:67) java.base/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:267) java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1758) java.base/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1182) java.base/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899) java.base/java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1070) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) -- #106 "expiration-thread--p5-t1" virtual java.base/java.lang.VirtualThread.parkNanos(VirtualThread.java:621) java.base/java.lang.System$2.parkVirtualThread(System.java:2652) java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:67) java.base/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:267) java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1758) java.base/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1182) java.base/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899) java.base/java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1070) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
もう1度Infinispan Serverを停止して、最後に-Djgroups.thread.virtual=true
を付与して起動してみます。
$ bin/server.sh \ -b 0.0.0.0 \ -Djgroups.tcp.address=$(hostname -i) \ -Djgroups.thread.virtual=true
今度は[o.i.CONTAINER] Virtual threads support enabled
というログは出力されません。Infinispan Server側のVirtual Threadsは有効にして
いませんからね。JGroupsのみです。
Infinispan Serverに接続して、キャッシュを操作します。
$ bin/cli.sh -c http://ispn-admin:password@localhost:11222 > put --cache=myCache key1 value1 > get --cache=myCache key1 value1
スレッドダンプを取得します。
$ jcmd $(jcmd -l | grep org.infinispan.server.Bootstrap | cut -d' ' -f1) Thread.dump_to_file -format=plain -overwrite thread_dump.txt
Virtual Threadsの存在を確認。
$ grep 'virtual$' thread_dump.txt #32 "TcpServer.Acceptor[7800]-1,d16f4816172a-28811" virtual #33 "Timer runner-2,d16f4816172a-28811" virtual #35 "TQ-Bundler-6,d16f4816172a-28811" virtual #99 "Connection.Receiver [172.17.0.2:43953 - 172.17.0.3:7800]-7,d16f4816172a-28811" virtual #38 "MPING-4,d16f4816172a-28811" virtual #41 "NioServer.Selector [/0.0.0.0:57800]-5,d16f4816172a-28811" virtual #201 "Connection.Receiver [172.17.0.2:39577 - 172.17.0.4:7800]-8,d16f4816172a-28811" virtual
先ほどと比べると、timeout-thread--〜
、expiration-thread--〜
というVirtual Threadsがなくなっていますね。このあたりが
Infinispan Serverのものだったようです。
というわけで、Infinispan ServerおよびJGroups、もしくはJGroupsのみに対してVirtual Threadsを有効する方法を確認できました。
ところで、-Djdk.tracePinnedThreads
を付けたらどうなるんだろうと思って試してみると
$ bin/server.sh \ -b 0.0.0.0 \ -Djgroups.tcp.address=$(hostname -i) \ -Dorg.infinispan.threads.virtual=true \ -Djdk.tracePinnedThreads
今回の操作だとJGroupsのみがpinning(ピン留め)された状態として検出されました。
Thread[#48,ForkJoinPool-1-worker-4,5,CarrierThreads] java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:183) java.base/jdk.internal.vm.Continuation.onPinned0(Continuation.java:393) java.base/java.lang.VirtualThread.parkNanos(VirtualThread.java:621) java.base/java.lang.System$2.parkVirtualThread(System.java:2652) java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:67) java.base/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:267) java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1758) org.jgroups.util.CondVar.waitFor(CondVar.java:64) org.jgroups.util.Promise._getResultWithTimeout(Promise.java:146) org.jgroups.util.Promise.getResultWithTimeout(Promise.java:38) org.jgroups.util.Promise.getResult(Promise.java:73) org.jgroups.util.Promise.getResult(Promise.java:68) org.jgroups.protocols.FD_SOCK2$PingDest.waitForConnect(FD_SOCK2.java:560) org.jgroups.protocols.FD_SOCK2.connectTo(FD_SOCK2.java:433) org.jgroups.protocols.FD_SOCK2.connectTo(FD_SOCK2.java:404) org.jgroups.protocols.FD_SOCK2.connectToNextPingDest(FD_SOCK2.java:380) org.jgroups.protocols.FD_SOCK2.handle(FD_SOCK2.java:351) org.jgroups.protocols.FD_SOCK2.handle(FD_SOCK2.java:33) org.jgroups.util.ProcessingQueue.process(ProcessingQueue.java:55) org.jgroups.util.ProcessingQueue.add(ProcessingQueue.java:35) org.jgroups.protocols.FD_SOCK2.handleView(FD_SOCK2.java:368) org.jgroups.protocols.FD_SOCK2.down(FD_SOCK2.java:229) org.jgroups.protocols.FailureDetection.down(FailureDetection.java:149) org.jgroups.protocols.VERIFY_SUSPECT2.down(VERIFY_SUSPECT2.java:84) <== monitors:1 org.jgroups.protocols.pbcast.NAKACK2.down(NAKACK2.java:632) org.jgroups.protocols.UNICAST3.down(UNICAST3.java:652) org.jgroups.protocols.pbcast.STABLE.down(STABLE.java:260) org.jgroups.protocols.pbcast.GMS.installView(GMS.java:682) <== monitors:2 org.jgroups.protocols.pbcast.ServerGmsImpl.handleViewChange(ServerGmsImpl.java:66) org.jgroups.protocols.pbcast.GMS.castViewChangeAndSendJoinRsps(GMS.java:558) org.jgroups.protocols.pbcast.CoordGmsImpl.handleMembershipChange(CoordGmsImpl.java:198) org.jgroups.protocols.pbcast.GMS.process(GMS.java:1281) org.jgroups.protocols.pbcast.ViewHandler.process(ViewHandler.java:239) org.jgroups.protocols.pbcast.ViewHandler.add(ViewHandler.java:63) org.jgroups.protocols.pbcast.GMS.handle(GMS.java:952) org.jgroups.protocols.pbcast.GMS.up(GMS.java:854) org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:226) org.jgroups.protocols.UNICAST3.deliverMessage(UNICAST3.java:1135) org.jgroups.protocols.UNICAST3.addMessage(UNICAST3.java:871) org.jgroups.protocols.UNICAST3.handleDataReceived(UNICAST3.java:853) org.jgroups.protocols.UNICAST3.up(UNICAST3.java:465) org.jgroups.protocols.pbcast.NAKACK2.up(NAKACK2.java:669) org.jgroups.protocols.VERIFY_SUSPECT2.up(VERIFY_SUSPECT2.java:105) org.jgroups.protocols.FailureDetection.up(FailureDetection.java:180) org.jgroups.protocols.FD_SOCK2.up(FD_SOCK2.java:190) org.jgroups.protocols.MERGE3.up(MERGE3.java:274) org.jgroups.protocols.Discovery.up(Discovery.java:296) org.jgroups.stack.Protocol.up(Protocol.java:360) org.jgroups.protocols.TP.passMessageUp(TP.java:1188) org.jgroups.util.SubmitToThreadPool$SingleMessageHandler.run(SubmitToThreadPool.java:95) java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314) java.base/java.lang.VirtualThread.run(VirtualThread.java:309)
Hot Rod Clientで試してみる
最後は、Hot Rod Clientからアクセスしてみましょう。
Infinispan Serverは、以下の設定で3ノード起動しているものとします。
$ bin/server.sh \ -b 0.0.0.0 \ -Djgroups.tcp.address=$(hostname -i) \ -Dorg.infinispan.threads.virtual=true \ -Djdk.tracePinnedThreads
Infinispan Serverのユーザーおよびキャッシュは作成済みとします。
$ bin/cli.sh user create -g admin -p password ispn-admin $ bin/cli.sh user create -g application -p password ispn-user $ bin/cli.sh -c http://ispn-admin:password@localhost:11222 > create cache myCache --template=org.infinispan.DIST_SYNC
簡単なHot Rod Clientを使ったアプリケーションを作成。
Maven依存関係など。
<properties> <maven.compiler.release>21</maven.compiler.release> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencies> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-client-hotrod</artifactId> <version>15.0.4.Final</version> </dependency> </dependencies>
Hot Rod Clientを使ったソースコード。
src/main/java/org/littlewings/infinispan/remote/virtualthreads/App.java
package org.littlewings.infinispan.remote.virtualthreads; import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCacheManager; public class App { public static void main(String... args) { System.out.printf("org.infinispan.threads.virtual = %s\n", System.getProperty("org.infinispan.threads.virtual")); String uri = "hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222"; try (RemoteCacheManager cacheManager = new RemoteCacheManager(uri)) { RemoteCache<String, String> cache = cacheManager.getCache("myCache"); cache.put("key1", "value1"); System.out.printf("get(key1) = %s%n", cache.get("key1")); cache.stop(); } } }
以下のコマンドで起動してみます。
$ mvn compile exec:java \ -Dexec.mainClass=org.littlewings.infinispan.remote.virtualthreads.App \ -Dorg.infinispan.threads.virtual=true \ -Djdk.tracePinnedThreads
この時のログはこうなりました。
org.infinispan.threads.virtual = true 5月 29, 2024 11:09:29 午後 org.infinispan.client.hotrod.impl.transport.netty.NativeTransport useNativeIOUring INFO: ISPN004108: Native IOUring transport not available, using NIO instead: io.netty.incubator.channel.uring.IOUring 5月 29, 2024 11:09:29 午後 org.infinispan.client.hotrod.RemoteCacheManager actualStart INFO: ISPN004021: Infinispan version: Infinispan 'I'm Still Standing' 15.0.4.Final 5月 29, 2024 11:09:29 午後 org.infinispan.client.hotrod.impl.transport.netty.ChannelFactory receiveTopology INFO: ISPN004006: Server sent new topology view (id=1690332234, age=0) containing 3 addresses: [172.18.0.4/<unresolved>:11222, 172.18.0.3/<unresolved>:11222, 172.18.0.2/<unresolved>:11222] 5月 29, 2024 11:09:29 午後 org.infinispan.client.hotrod.impl.RemoteCacheImpl init INFO: ISPN004113: Query module not found. Queries are disabled. get(key1) = value1
Infinispan Serverのように、「Virtual threads support enabled」というログが出力されませんね。
ソースコードを確認してみると、Hot Rod Clientで使うスレッドプールはVirtual Threadsを意識していなさそうです。
こちらで作成されたスレッドプールが、Nettyで使われます。
というか、Nettyを使うのであればそもそもVirtual Threadsは関係なさそうですね。というわけで、Virtual ThreadsはInfinispan Serverで
有効になるもののようです。
※Embedded Modeでも試してみましたが、有効になったことを示すログは確認できました
io_uringを使うとどうなるのかな?とも思いましたが、結局こちらもNettyを使うので同じですね。
実際どうなのか
Virtual Threadsを使うように設定できるInfinispanですが、ソースコードを確認するとまだまだたくさんのsynchronized
ブロックが
あったりします。
ですが、Java 21以上で動作させた時にはデフォルトでVirtual Threadsを有効にするかどうかといったissueもあって、状況がよくわかりません。
ISPN-16105 / Enable virtual threads by default
リリース時のブログにも書かれていなかったですし、今は対応が進んでいることを把握しておく、くらいがいいんでしょうか。
少しソースコードを見る
ExecutorService
を作成するのはこちらなのですが、
デフォルトだとなにも返しません。
@Override public Optional<ExecutorService> newVirtualThreadPerTaskExecutor() { return Optional.empty(); }
Virtual Threadsを有効にした時にExecutors#newVirtualThreadPerTaskExecutor
を返すようになっています。
@Override public Optional<ExecutorService> newVirtualThreadPerTaskExecutor() { return Optional.of(Executors.newVirtualThreadPerTaskExecutor()); }
Infinispanのスレッドプールの設定にはcore-threads
やmax-threads
などいろいろあったと思いますが、このあたりの整合性はどうなって
いるのだろうと思ったのですが、ThreadCreator#createBlockingExecutorService
が値を返すとそれがそのまま使われ、そうでない場合は
設定値を元にスレッドプールが作成されるようです。
return ThreadCreator.createBlockingExecutorService().orElseGet(() -> { EnhancedQueueExecutor.Builder builder = new EnhancedQueueExecutor.Builder(); builder.setThreadFactory(factory); builder.setCorePoolSize(coreThreads); builder.setMaximumPoolSize(maxThreads); builder.setGrowthResistance(0.0f); builder.setMaximumQueueSize(queueLength); builder.setKeepAliveTime(Duration.of(keepAlive, ChronoUnit.MILLIS)); EnhancedQueueExecutor enhancedQueueExecutor = builder.build(); enhancedQueueExecutor.setHandoffExecutor(task -> BlockingRejectedExecutionHandler.getInstance().rejectedExecution(task, enhancedQueueExecutor)); return enhancedQueueExecutor; });
つまり、ThreadCreator#createBlockingExecutorService
がExecutors#newVirtualThreadPerTaskExecutor
を返した場合はそのまま
使われるということですね。
ではこの処理がどのスレッドプールで使われるのかまで確認しようと思ったのですが、今回はちょっと諦めました。
おわりに
Infinispan 15.0でVirtual Threadsへの対応が入っていたので、ちょっと確認していました。
といっても、有効にする方法とInfninispan Serverでちょっとスレッドダンプを見ただけですが。
現時点では、Hot Rod Clientには縁がなさそうなこともわかりました。
どのスレッドプールで使われるかまでは把握できなかったのがちょっと残念ですが、これについてはまた機会を改めて確認できたらいいなとは
思います…。
やらない気もしますが…。
リリース時のブログとかに載ったらまた、でしょうかね。