CLOVER🍀

That was when it all began.

Scala 2.10.0 Parallel Collections are now configurable with custom thread pools

並列コレクションの、スレッドプールのカスタマイズができるようになったらしいですよ。

って、
http://www.scala-lang.org/node/27499
のポストが案内している
http://docs.scala-lang.org/overviews/parallel-collections/overview.html
は、並列コレクションの概要なので、実際に見るべきはこちら。
http://docs.scala-lang.org/overviews/parallel-collections/configuration.html

日本語訳もできています。
http://docs.scala-lang.org/ja/overviews/parallel-collections/configuration.html

それで、どうやって並列コレクションのスレッドプールをカスタマイズするかなのですが、scala.collection.parallel.ParIterableLikeトレイトにtasksupportというメソッドができているようなので、こちらにscala.collection.parallel.TaskSupportトレイトの実装を設定することで行うようです。

まずは、デフォルトの実装を確認してみます。

$ scala
Welcome to Scala version 2.10.0 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_11).
Type in expressions to have them evaluated.
Type :help for more information.

scala> val pxs = List(1, 2, 3).par
pxs: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3)

scala> pxs.tasksupport.getClass
res0: Class[_ <: scala.collection.parallel.TaskSupport] = class scala.collection.parallel.ForkJoinTaskSupport

デフォルトは、scala.collection.parallel.ForkJoinTaskSupportのようですね。

まあ、スレッドプールの設定が変えられて何が嬉しいのかって、並列コレクションごとにスレッドプールを割り当てられたりできるということでしょうか。

例えば、

scala> val pxs = (1 to 100).toList.par
pxs: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

scala> val pxs2 = (1 to 100).toList.par
pxs2: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

scala> pxs.sum
res0: Int = 5050

scala> pxs2.sum
res1: Int = 5050

というコードを実行している時に、スレッドダンプでWorkerThreadの数を見てみると

$ jstack 23835 | grep -i pool | grep worker
"ForkJoinPool-1-worker-2" daemon prio=10 tid=0x00007f56d0001800 nid=0x5d4f waiting on condition [0x00007f5708ef1000]
"ForkJoinPool-1-worker-1" daemon prio=10 tid=0x00007f570c50e800 nid=0x5d4e waiting on condition [0x00007f5708ff2000]

となり、WorkerThreadが共有されていそうな感じになっています。
*当PCはCPUコア数が2なので、並列コレクションごとにThreadPoolがあるのなら、4つスレッドがいないとおかしい

というわけで、ここで2つ目のコレクションの設定を変えてみます。

scala> import scala.collection.parallel._
import scala.collection.parallel._

scala> import scala.concurrent.forkjoin._
import scala.concurrent.forkjoin._

scala> pxs2.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool)
pxs2.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@1204b6e9

では、各コレクションをもう1度合算してみます。

scala> pxs.sum
res2: Int = 5050

scala> pxs2.sum
res3: Int = 5050

この時のWorkerThreadは

$ jstack 23835 | grep -i pool | grep worker
"ForkJoinPool-2-worker-2" daemon prio=10 tid=0x00007f56d8452800 nid=0x5dc4 waiting on condition [0x00007f5708ef1000]
"ForkJoinPool-2-worker-1" daemon prio=10 tid=0x00007f570c4fc000 nid=0x5dc3 waiting on condition [0x00007f5710240000]
"ForkJoinPool-1-worker-3" daemon prio=10 tid=0x00007f56d0001800 nid=0x5dc0 waiting on condition [0x00007f571013f000]
"ForkJoinPool-1-worker-1" daemon prio=10 tid=0x00007f570c50e800 nid=0x5d4e waiting on condition [0x00007f5708ff2000]

となり、ThreadPool自体が増えていることが分かります。

並列度を変えたりしたければ、そのように設定すればOKです。

scala> pxs2.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))
pxs2.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@4cde17e5

java.util.concurrent.ThreadPoolExecutorを使用したい場合は、ThreadPoolTaskSupportを設定すればよいみたいです。

scala> pxs.tasksupport = new ThreadPoolTaskSupport
pxs.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ThreadPoolTaskSupport@1c7f4a17

ThreadPoolExecutorのコンストラクタの引数を指定しない場合、、Scalaで用意しているデフォルトのThreadPoolExecutorが使用され各ThreadPoolTaskSupportのインスタンスで共有されることになります。カスタマイズしたい場合は、ThreadPoolTaskSupportのコンストラクタの引数に、ThreadPoolExecutorのインスタンスを渡しましょう。

また合算してみると

scala> pxs.sum
res4: Int = 5050

scala> pxs2.sum
res5: Int = 5050

「pc-thread」という名前のスレッドが増えています。

$ jstack 24253 | grep '^"'
"ForkJoinPool-2-worker-7" daemon prio=10 tid=0x00007f4bf83e0000 nid=0x5f69 waiting on condition [0x00007f4c2d41c000]
"ForkJoinPool-2-worker-6" daemon prio=10 tid=0x000000000125c800 nid=0x5f68 waiting on condition [0x00007f4c2d71f000]
"ForkJoinPool-2-worker-5" daemon prio=10 tid=0x000000000125e800 nid=0x5f67 waiting on condition [0x00007f4c2d51d000]
"pc-thread-2" daemon prio=10 tid=0x00007f4bf83df800 nid=0x5f64 waiting on condition [0x00007f4c2d61e000]
"pc-thread-1" daemon prio=10 tid=0x00007f4c3451d000 nid=0x5f63 waiting on condition [0x00007f4c2db23000]
"ForkJoinPool-2-worker-2" daemon prio=10 tid=0x00007f4bf83de000 nid=0x5f39 waiting on condition [0x00007f4c2d820000]
"ForkJoinPool-1-worker-3" daemon prio=10 tid=0x00007f4bf0001000 nid=0x5f33 waiting on condition [0x00007f4c2da22000]

TaskSupportトレイトの実装としては、以下の3種類が提供されています。

  • scala.collection.parallel.ExecutionContextTaskSupport
  • scala.collection.parallel.ForkJoinTaskSupport
  • scala.collection.parallel.ThreadPoolTaskSupport

デフォルトは、scala.collection.packageオブジェクトに登録されているdefaultTaskSupportが使用されます。

  private[parallel] def getTaskSupport: TaskSupport =
    if (scala.util.Properties.isJavaAtLeast("1.6")) {
      val vendor = scala.util.Properties.javaVmVendor
      if ((vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple")) new ForkJoinTaskSupport
      else new ThreadPoolTaskSupport
    } else new ThreadPoolTaskSupport

  val defaultTaskSupport: TaskSupport = getTaskSupport

見ての通り、Java 1.6以上かつ特定ベンダの実装であればForkJoinTaskSupportが、それ以外であればThreadPoolTaskSupportが選ばれるようですね。また、この定義のため並列コレクションはデフォルトだと同じThreadPoolを共有していると思われます。

ForkJoinTaskSupportであれば細粒度並列処理を相手に、ThreadPoolTaskSupportであれば少し粒度の大きい処理(IOとか?)を相手にしろってことかな…。

じゃあ、ExecutionContextTaskSupportって何でしょうね?

これの実体は、以前Futureで使用したExecutionContext.globalみたいです。

class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.ExecutionContext.global)
extends TaskSupport with ExecutionContextTasks

*scala.concurrent.ExecutionContext.globalとscala.concurrent.ExecutionContext.Implicits.globalは、ソース上の定義は同じものを指しています

ExecutionContextTaskSupportが使用するThreadPoolもForkJoinPoolですが、グローバルなプールを使用します。なので、Futureと並列コレクションを使用すると別々のForkJoinPoolから作られたWorkerThreadを見ることができます。

よって、並列コレクションで

scala> pxs.tasksupport = new ExecutionContextTaskSupport
pxs.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ExecutionContextTaskSupport@2dc0820b

のように設定すると、Futureと同じThreadPoolを使用するようになります。

ちなみに、自分でTaskSupportを自作する場合は、生成するWorkerThreadをDaemonにするかどうかとか、ThreadPoolをシャットダウンさせるかどうかとかを注意した方がいいんでしょうね。