並列コレクションの、スレッドプールのカスタマイズができるようになったらしいですよ。
って、
http://www.scala-lang.org/node/27499
のポストが案内している
http://docs.scala-lang.org/overviews/parallel-collections/overview.html
は、並列コレクションの概要なので、実際に見るべきはこちら。
http://docs.scala-lang.org/overviews/parallel-collections/configuration.html
日本語訳もできています。
http://docs.scala-lang.org/ja/overviews/parallel-collections/configuration.html
それで、どうやって並列コレクションのスレッドプールをカスタマイズするかなのですが、scala.collection.parallel.ParIterableLikeトレイトにtasksupportというメソッドができているようなので、こちらにscala.collection.parallel.TaskSupportトレイトの実装を設定することで行うようです。
まずは、デフォルトの実装を確認してみます。
$ scala Welcome to Scala version 2.10.0 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_11). Type in expressions to have them evaluated. Type :help for more information. scala> val pxs = List(1, 2, 3).par pxs: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3) scala> pxs.tasksupport.getClass res0: Class[_ <: scala.collection.parallel.TaskSupport] = class scala.collection.parallel.ForkJoinTaskSupport
デフォルトは、scala.collection.parallel.ForkJoinTaskSupportのようですね。
まあ、スレッドプールの設定が変えられて何が嬉しいのかって、並列コレクションごとにスレッドプールを割り当てられたりできるということでしょうか。
例えば、
scala> val pxs = (1 to 100).toList.par pxs: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100) scala> val pxs2 = (1 to 100).toList.par pxs2: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100) scala> pxs.sum res0: Int = 5050 scala> pxs2.sum res1: Int = 5050
というコードを実行している時に、スレッドダンプでWorkerThreadの数を見てみると
$ jstack 23835 | grep -i pool | grep worker "ForkJoinPool-1-worker-2" daemon prio=10 tid=0x00007f56d0001800 nid=0x5d4f waiting on condition [0x00007f5708ef1000] "ForkJoinPool-1-worker-1" daemon prio=10 tid=0x00007f570c50e800 nid=0x5d4e waiting on condition [0x00007f5708ff2000]
となり、WorkerThreadが共有されていそうな感じになっています。
*当PCはCPUコア数が2なので、並列コレクションごとにThreadPoolがあるのなら、4つスレッドがいないとおかしい
というわけで、ここで2つ目のコレクションの設定を変えてみます。
scala> import scala.collection.parallel._ import scala.collection.parallel._ scala> import scala.concurrent.forkjoin._ import scala.concurrent.forkjoin._ scala> pxs2.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool) pxs2.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@1204b6e9
では、各コレクションをもう1度合算してみます。
scala> pxs.sum res2: Int = 5050 scala> pxs2.sum res3: Int = 5050
この時のWorkerThreadは
$ jstack 23835 | grep -i pool | grep worker "ForkJoinPool-2-worker-2" daemon prio=10 tid=0x00007f56d8452800 nid=0x5dc4 waiting on condition [0x00007f5708ef1000] "ForkJoinPool-2-worker-1" daemon prio=10 tid=0x00007f570c4fc000 nid=0x5dc3 waiting on condition [0x00007f5710240000] "ForkJoinPool-1-worker-3" daemon prio=10 tid=0x00007f56d0001800 nid=0x5dc0 waiting on condition [0x00007f571013f000] "ForkJoinPool-1-worker-1" daemon prio=10 tid=0x00007f570c50e800 nid=0x5d4e waiting on condition [0x00007f5708ff2000]
となり、ThreadPool自体が増えていることが分かります。
並列度を変えたりしたければ、そのように設定すればOKです。
scala> pxs2.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4)) pxs2.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@4cde17e5
java.util.concurrent.ThreadPoolExecutorを使用したい場合は、ThreadPoolTaskSupportを設定すればよいみたいです。
scala> pxs.tasksupport = new ThreadPoolTaskSupport
pxs.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ThreadPoolTaskSupport@1c7f4a17
ThreadPoolExecutorのコンストラクタの引数を指定しない場合、、Scalaで用意しているデフォルトのThreadPoolExecutorが使用され各ThreadPoolTaskSupportのインスタンスで共有されることになります。カスタマイズしたい場合は、ThreadPoolTaskSupportのコンストラクタの引数に、ThreadPoolExecutorのインスタンスを渡しましょう。
また合算してみると
scala> pxs.sum res4: Int = 5050 scala> pxs2.sum res5: Int = 5050
「pc-thread」という名前のスレッドが増えています。
$ jstack 24253 | grep '^"' "ForkJoinPool-2-worker-7" daemon prio=10 tid=0x00007f4bf83e0000 nid=0x5f69 waiting on condition [0x00007f4c2d41c000] "ForkJoinPool-2-worker-6" daemon prio=10 tid=0x000000000125c800 nid=0x5f68 waiting on condition [0x00007f4c2d71f000] "ForkJoinPool-2-worker-5" daemon prio=10 tid=0x000000000125e800 nid=0x5f67 waiting on condition [0x00007f4c2d51d000] "pc-thread-2" daemon prio=10 tid=0x00007f4bf83df800 nid=0x5f64 waiting on condition [0x00007f4c2d61e000] "pc-thread-1" daemon prio=10 tid=0x00007f4c3451d000 nid=0x5f63 waiting on condition [0x00007f4c2db23000] "ForkJoinPool-2-worker-2" daemon prio=10 tid=0x00007f4bf83de000 nid=0x5f39 waiting on condition [0x00007f4c2d820000] "ForkJoinPool-1-worker-3" daemon prio=10 tid=0x00007f4bf0001000 nid=0x5f33 waiting on condition [0x00007f4c2da22000]
TaskSupportトレイトの実装としては、以下の3種類が提供されています。
- scala.collection.parallel.ExecutionContextTaskSupport
- scala.collection.parallel.ForkJoinTaskSupport
- scala.collection.parallel.ThreadPoolTaskSupport
デフォルトは、scala.collection.packageオブジェクトに登録されているdefaultTaskSupportが使用されます。
private[parallel] def getTaskSupport: TaskSupport = if (scala.util.Properties.isJavaAtLeast("1.6")) { val vendor = scala.util.Properties.javaVmVendor if ((vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple")) new ForkJoinTaskSupport else new ThreadPoolTaskSupport } else new ThreadPoolTaskSupport val defaultTaskSupport: TaskSupport = getTaskSupport
見ての通り、Java 1.6以上かつ特定ベンダの実装であればForkJoinTaskSupportが、それ以外であればThreadPoolTaskSupportが選ばれるようですね。また、この定義のため並列コレクションはデフォルトだと同じThreadPoolを共有していると思われます。
ForkJoinTaskSupportであれば細粒度並列処理を相手に、ThreadPoolTaskSupportであれば少し粒度の大きい処理(IOとか?)を相手にしろってことかな…。
じゃあ、ExecutionContextTaskSupportって何でしょうね?
これの実体は、以前Futureで使用したExecutionContext.globalみたいです。
class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.ExecutionContext.global) extends TaskSupport with ExecutionContextTasks
*scala.concurrent.ExecutionContext.globalとscala.concurrent.ExecutionContext.Implicits.globalは、ソース上の定義は同じものを指しています
ExecutionContextTaskSupportが使用するThreadPoolもForkJoinPoolですが、グローバルなプールを使用します。なので、Futureと並列コレクションを使用すると別々のForkJoinPoolから作られたWorkerThreadを見ることができます。
よって、並列コレクションで
scala> pxs.tasksupport = new ExecutionContextTaskSupport
pxs.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ExecutionContextTaskSupport@2dc0820b
のように設定すると、Futureと同じThreadPoolを使用するようになります。
ちなみに、自分でTaskSupportを自作する場合は、生成するWorkerThreadをDaemonにするかどうかとか、ThreadPoolをシャットダウンさせるかどうかとかを注意した方がいいんでしょうね。