GroovyのGParsとかを見ていて、そういえばClojureって並列コレクションとか持ってないのかなぁと思って見てみたら、なんか少しはありそうな感じです。
とりあえず、map関数の並列版である、pmap関数を見つけたのでちょっと試してみました。
ClojureDocsでの例を参考に…pmapとmapにそれぞれSleepしつつ引数を3倍する関数を渡すようにしてみました。
pmap_sample.clj
(defn wait-and-triple [n] (Thread/sleep 3000) (* n 3)) (time (println (pmap wait-and-triple (range 1 4)))) (time (println (map wait-and-triple (range 1 4)))) (shutdown-agents)
ちなみに、shutdown-agents関数入れないと、プログラムが終わりません…。
では、実行。
$ clj pmap_sample.clj (3 6 9) "Elapsed time: 3051.287682 msecs" (3 6 9) "Elapsed time: 9003.196502 msecs"
map版は要素数に比例して実行時間が延びていますが、pmap版は3秒になっています。
では、ちょっと要素数を増やしてみます。
pmap_sample.clj (defn wait-and-triple [n] (Thread/sleep 3000) (* n 3)) (time (println (pmap wait-and-triple (range 1 11)))) (time (println (map wait-and-triple (range 1 11)))) (shutdown-agents)
実行。
$ clj pmap_sample.clj (3 6 9 12 15 18 21 24 27 30) "Elapsed time: 3047.479419 msecs" (3 6 9 12 15 18 21 24 27 30) "Elapsed time: 30011.919161 msecs"
…pmap版、3秒??
もしかして、全部の要素にスレッド割り当ててる??
このスクリプトを実行中に、ちょっとスレッドダンプを見てみましょう。
$ clj pmap_sample.clj (3 6 9 12 15 18 21 24 27 30) "Elapsed time: 3052.164623 msecs" 〜別ターミナルで…〜 $ jps 5951 Jps 5939 main $ jstack 5939 2012-10-18 23:57:31 Full thread dump Java HotSpot(TM) 64-Bit Server VM (23.3-b01 mixed mode): "Attach Listener" daemon prio=10 tid=0x00007f2250001000 nid=0x1762 runnable [0x0000000000000000] java.lang.Thread.State: RUNNABLE "clojure-agent-send-off-pool-9" prio=10 tid=0x00007f227c5c5000 nid=0x1755 waiting on condition [0x00007f227598c000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000ebedb598> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460) at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359) at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1043) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1103) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) "clojure-agent-send-off-pool-8" prio=10 tid=0x00007f227c5c3000 nid=0x1754 waiting on condition [0x00007f2275a8d000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000ebedb598> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) 〜省略〜 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) "clojure-agent-send-off-pool-7" prio=10 tid=0x00007f227c5c1000 nid=0x1753 waiting on condition [0x00007f2275b8e000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000ebedb598> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) 〜省略〜 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) "clojure-agent-send-off-pool-6" prio=10 tid=0x00007f227c5be800 nid=0x1752 waiting on condition [0x00007f2275c8f000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000ebedb598> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) 〜省略〜 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) "clojure-agent-send-off-pool-5" prio=10 tid=0x00007f227c5bc800 nid=0x1751 waiting on condition [0x00007f2275d90000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000ebedb598> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) 〜省略〜 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) "clojure-agent-send-off-pool-4" prio=10 tid=0x00007f227c5ba800 nid=0x1750 waiting on condition [0x00007f2275e91000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000ebedb598> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) 〜省略〜 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) "clojure-agent-send-off-pool-3" prio=10 tid=0x00007f227c5b8800 nid=0x174f waiting on condition [0x00007f2275f92000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000ebedb598> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) at 〜省略〜 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) "clojure-agent-send-off-pool-2" prio=10 tid=0x00007f227c5b6800 nid=0x174e waiting on condition [0x00007f2276093000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000ebedb598> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) 〜省略〜 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) "clojure-agent-send-off-pool-1" prio=10 tid=0x00007f227c5b5000 nid=0x174d waiting on condition [0x00007f2276194000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000ebedb598> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) at 〜省略〜 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) "clojure-agent-send-off-pool-0" prio=10 tid=0x00007f227c5b3000 nid=0x174c waiting on condition [0x00007f2276295000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000ebedb598> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) at 〜省略〜 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) "Service Thread" daemon prio=10 tid=0x00007f227c0ec000 nid=0x173d runnable [0x0000000000000000] java.lang.Thread.State: RUNNABLE "C2 CompilerThread1" daemon prio=10 tid=0x00007f227c0e9800 nid=0x173c waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE "C2 CompilerThread0" daemon prio=10 tid=0x00007f227c0e6800 nid=0x173b waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE "Signal Dispatcher" daemon prio=10 tid=0x00007f227c0e4800 nid=0x173a runnable [0x0000000000000000] java.lang.Thread.State: RUNNABLE "Finalizer" daemon prio=10 tid=0x00007f227c097000 nid=0x1739 in Object.wait() [0x00007f22802e2000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x00000000ec570d50> (a java.lang.ref.ReferenceQueue$Lock) at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135) - locked <0x00000000ec570d50> (a java.lang.ref.ReferenceQueue$Lock) at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151) at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:177) "Reference Handler" daemon prio=10 tid=0x00007f227c094800 nid=0x1738 in Object.wait() [0x00007f22803e3000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x00000000ec5708d8> (a java.lang.ref.Reference$Lock) at java.lang.Object.wait(Object.java:503) at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133) - locked <0x00000000ec5708d8> (a java.lang.ref.Reference$Lock) "main" prio=10 tid=0x00007f227c008800 nid=0x1734 waiting on condition [0x00007f2284b41000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at user$wait_and_triple.invoke(pmap_sample.clj:2) at clojure.core$map$fn__4087.invoke(core.clj:2432) 〜省略〜 at clojure.main.main(main.java:37) "VM Thread" prio=10 tid=0x00007f227c08d000 nid=0x1737 runnable "GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f227c016800 nid=0x1735 runnable "GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f227c018800 nid=0x1736 runnable "VM Periodic Task Thread" prio=10 tid=0x00007f227c0f6800 nid=0x173e waiting on condition JNI global references: 269
「clojure-agent-send-off-pool」で始まるスレッド名を持つスレッドが、10本ありますけど…。そして、Fork/Join Poolじゃなくて、普通にExecutorフレームワークが動いてる感じですね。
他にもfuture関数とかあるみたいですね、今度調べてみましょう。