CLOVER🍀

That was when it all began.

Parallel Collectionsの裏にいる人

Scala 2.9.0で追加されたParallel Collectionsを利用すると、Collectionに対する各種操作が並列実行されます。ただ、並列とはいいますが、どの程度の並列度で実行されるんでしょうねぇ?確認してみましょうか。

この手のライブラリは、利用者にこっそり裏でスレッドを使っているもんですが、各操作ごとにスレッドを生成するとオーバーヘッドにしかならないので、スレッドプールを利用しているはずです。これを調べてみましょう。

まずはREPLでList作成。

$ scala
Welcome to Scala version 2.9.0.final (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_24).
Type in expressions to have them evaluated.
Type :help for more information.

scala> val xs = List(1, 2, 3, 4, 5)
xs: List[Int] = List(1, 2, 3, 4, 5)

ここで、現在REPLで利用されているスレッドを見てみましょう。別のターミナルで、スレッドダンプを出力するように指示してみます。

$ ps -ef | grep java
xxxxx     7579  7571 23 18:39 pts/4    00:00:14 /usr/java/default/bin/java -Xmx256M -Xms32M -Xbootclasspath/a:/usr/local/scala/scala-2.9.0.final/lib/jline.jar:/usr/local/scala/scala-2.9.0.final/lib/scala-compiler.jar:/usr/local/scala/scala-2.9.0.final/lib/scala-dbc.jar:/usr/local/scala/scala-2.9.0.final/lib/scala-library.jar:/usr/local/scala/scala-2.9.0.final/lib/scala-swing.jar:/usr/local/scala/scala-2.9.0.final/lib/scalap.jar -Dscala.usejavacp=true -Dscala.home=/usr/local/scala/scala-2.9.0.final -Denv.emacs= scala.tools.nsc.MainGenericRunner
xxxxx     7655 18249  0 18:40 pts/5    00:00:00 grep java
$ kill -3 7579

REPLのPIDを調べて、kill -3を該当のPIDに送ると、REPL側にこんなのが出ます。

2011-05-15 18:41:04
Full thread dump Java HotSpot(TM) 64-Bit Server VM (19.1-b02 mixed mode):

"Low Memory Detector" daemon prio=10 tid=0x00000000508c1800 nid=0x1da5 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread1" daemon prio=10 tid=0x00000000508bf800 nid=0x1da4 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread0" daemon prio=10 tid=0x00000000508ba000 nid=0x1da3 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=10 tid=0x00000000508b8000 nid=0x1da2 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=10 tid=0x0000000050896000 nid=0x1da1 in Object.wait() [0x00000000422e5000]
   java.lang.Thread.State: WAITING (on object monitor)

"Reference Handler" daemon prio=10 tid=0x0000000050894000 nid=0x1da0 in Object.wait() [0x00000000421e4000]
   java.lang.Thread.State: WAITING (on object monitor)

"main" prio=10 tid=0x0000000050833800 nid=0x1d9c runnable [0x00000000412d1000]
   java.lang.Thread.State: RUNNABLE

スレッド以外のものも貼ると長いので、ちょっと端折っています。

んで、さっきのListをParallel化して、スレッドダンプをもう1回取得してみます。

scala> xs.par
res0: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3, 4, 5)

scala> 2011-05-15 18:44:04
Full thread dump Java HotSpot(TM) 64-Bit Server VM (19.1-b02 mixed mode):

"Low Memory Detector" daemon prio=10 tid=0x00000000508c1800 nid=0x1da5 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread1" daemon prio=10 tid=0x00000000508bf800 nid=0x1da4 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread0" daemon prio=10 tid=0x00000000508ba000 nid=0x1da3 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=10 tid=0x00000000508b8000 nid=0x1da2 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=10 tid=0x0000000050896000 nid=0x1da1 in Object.wait() [0x00000000422e5000]
   java.lang.Thread.State: WAITING (on object monitor)

"Reference Handler" daemon prio=10 tid=0x0000000050894000 nid=0x1da0 in Object.wait() [0x00000000421e4000]
   java.lang.Thread.State: WAITING (on object monitor)

"main" prio=10 tid=0x0000000050833800 nid=0x1d9c runnable [0x00000000412d1000]
   java.lang.Thread.State: RUNNABLE

特に変わりませんね。んじゃ、今度はforeachを実行した後にスレッドダンプを取ってみます。

scala> xs.par.foreach(println)
3
1
2
4
5

scala> 2011-05-15 18:45:16
Full thread dump Java HotSpot(TM) 64-Bit Server VM (19.1-b02 mixed mode):

"ForkJoinPool-1-worker-1" daemon prio=10 tid=0x00002aaab422f000 nid=0x1eaa waiting on condition [0x00000000423e6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000fe86fca0> (a scala.concurrent.forkjoin.ForkJoinPool$WaitQueueNode)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
        at scala.concurrent.forkjoin.ForkJoinPool$WaitQueueNode.awaitSyncRelease(ForkJoinPool.java:1340)
        at scala.concurrent.forkjoin.ForkJoinPool.sync(ForkJoinPool.java:1417)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:342)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)

"ForkJoinPool-1-worker-0" daemon prio=10 tid=0x00002aaab481e800 nid=0x1ea9 waiting on condition [0x0000000040f2f000]
   java.lang.Thread.State: WAITING (parking)

"Low Memory Detector" daemon prio=10 tid=0x00000000508c1800 nid=0x1da5 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread1" daemon prio=10 tid=0x00000000508bf800 nid=0x1da4 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread0" daemon prio=10 tid=0x00000000508ba000 nid=0x1da3 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=10 tid=0x00000000508b8000 nid=0x1da2 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=10 tid=0x0000000050896000 nid=0x1da1 in Object.wait() [0x00000000422e5000]
   java.lang.Thread.State: WAITING (on object monitor)

"Reference Handler" daemon prio=10 tid=0x0000000050894000 nid=0x1da0 in Object.wait() [0x00000000421e4000]
   java.lang.Thread.State: WAITING (on object monitor)

"main" prio=10 tid=0x0000000050833800 nid=0x1d9c runnable [0x00000000412d1000]
   java.lang.Thread.State: RUNNABLE

お、なんか先頭に「ForkJoinPool-1-worker」と名前の付けられたスレッドが増えましたね。こいつが怪しそうです。んで、増えたスレッドは2本ですね、しかもDaemonスレッド。これを実行している仮想マシンには、CPUを2個割り当てているのでこの数が効いていそうな気がします。

名前的に「scala.concurrent.forkjoin.ForkJoinPool」というクラスが怪しそうですね。このクラスはAPIリファレンスには載っていないので、Scalaライブラリ内で利用しているクラスなのでしょう。ソースを引っ張ってきて、中身を確認してみました。

ちなみに、このクラスはScalaではなくJavaで書かれていて、コンストラクタで並列度を指定できるようなのですが、ここでこんな記述がありました。

public ForkJoinPool() {
    this(Runtime.getRuntime().availableProcessors(),
         defaultForkJoinWorkerThreadFactory);
}

チェインされたコンストラクタの第1引数が、並列度を表しているようで、それにRuntime.getRuntime().availableProcessors()を渡しているので、現在利用可能なCPUの数がワーカーの数になっていそうです。また、一応コードを見ている限りは足りなくなったらスレッドを増やしそうな雰囲気に見えるのですが、そこまではちょっと試してないです…。

まだあんまりこの手の並列ライブラリって、使い倒すノウハウが無いような気がするんですが…どうなんでしょ?とりあえず、ScalaのParallel Collectionsではスレッドプールは「ForkJoin」と言っているくらいなので、基本的にI/Oとかじゃなくて、計算処理メインで使ってもらう前提に立っている気がするのですが。