CLOVER🍀

That was when it all began.

Clojureの並列処理関連の関数

Clojureで並列・並行処理といえばSTMやエージェントが有名だと思いますが、お手軽な関数も用意されているみたいなので、調べてみました。

pmap

map関数の並列版です。与えたシーケンスに対して、関数を並列に適用してくれます。

(time
 (println
  (let [sleep-and-double (fn [v]
                           (Thread/sleep 3000)
                           (* 2 v))]
    (pmap sleep-and-double
          (range 1 6)))))  ;; => (2 4 6 8 10) "Elapsed time: 3058.024539 msecs"
(shutdown-agents)

少し前にこの関数を使ったエントリを書きましたが、どうもシーケンスの数だけスレッドが起動するっぽいので、その点は注意が必要そうです。

あと、裏でエージェントを使っているようなので、shutdown-agentsを入れておかないとJavaVMが終了してくれません。この点も注意です。

pcalls

引数として、引数を取らない関数を複数渡すと、並列に実行してくれます。

(time
 (println
  (let [fn1 (fn [] (Thread/sleep 3000) "Hello")
        fn2 (fn [] (Thread/sleep 3000) "World")]
    (pcalls fn1 fn2))))  ;; => (Hello World) "Elapsed time: 3032.286167 msecs"
(shutdown-agents)

関数の参照自体を渡すので、渡す関数には引数は取れません。

なお、pcallsはpmap関数を使って実装されています。こちらも、shutdown-agents関数は必要なのでご注意ください。

pvalues

引数に渡した関数呼び出しを、複数渡すと並列に呼び出し処理を実行してくれます。

(time
 (println
  (let [fn1 (fn [pre post] (Thread/sleep 3000) (str pre "Hello" post))
        fn2 (fn [pre post] (Thread/sleep 3000) (str pre "World" post))]
    (pvalues (fn1 "[" "]") (fn2 "[" "]")))))  ;; => ([Hello] [World]) "Elapsed time: 3062.566762 msecs"
(shutdown-agents)

pcallsとの違いは、関数呼び出しを渡すので引数がある関数も結果的に渡すことができます。

なんですけど、pvalues関数はpcalls関数で実装されているという…。
こちらも、要shutdown-agents。

seque

ちょっとユニークな関数です。別スレッドでシーケンスを作成し、キューに次々と入れていきます。

引数は1つまたは2つで、1つの場合はシーケンスを作成する関数を、2つの場合はバッファサイズとシーケンスを作成する関数を渡します。バッファサイズを省略した場合は、100として扱われるようで。

では、サンプル。元ネタはClojureDocsです。

バッファサイズ10で、iterateでシーケンスを作成するように指定。seque関数の呼び出し後、メインスレッドは5秒待機します。その後、キューから20個要素を取得して表示。

(let [start-time (System/nanoTime)
      q (seque 10
         (iterate
          #(do
             (println "sleep 400 msec...")
             (Thread/sleep 400)
             (inc %))
          0))]
  (println "sleep 5 seconds...")
  (Thread/sleep 5000)
  (doseq [i (take 20 q)]
    (println (int (/ (- (System/nanoTime) start-time) 1e7))
             ":" i)))
(shutdown-agents)

これも、要shutdown-agentsです。

実行すると、こうなります。

sleep 5 seconds...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
501 :sleep 400 msec... 
0
503 : 1
503 : 2
503 : 3
503 : 4
503 : 5
503 : 6
503 : 7
503 : 8
503 : 9
503 : 10
sleep 400 msec...
542 : 11
sleep 400 msec...
582 : 12
sleep 400 msec...
622 : 13
sleep 400 msec...
662 : 14
sleep 400 msec...
702 : 15
sleep 400 msec...
742 : 16
sleep 400 msec...
782 : 17
sleep 400 msec...
823 : 18
sleep 400 msec...
863 : 19
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...
sleep 400 msec...

メインスレッドが待機している間に、せっせとキューに要素が入れられていきます。メインスレッドが目覚めたあとは、キューから値を抜いていきますが10個抜いたところで追いついてしまうので、それからはシーケンスが補充されてるまで待つという動きになっています。

メインスレッドは20個の要素しか要求しないのですが、キューの方は余分に10個作成するところまで裏で進んでいきます。

future、future-call

JavaのFutureによく似た使い方をします。というか、裏はJavaのFutureを使ってますので…。

(def f (future (println "start") (Thread/sleep 5000) (println "done") 100))
(Thread/sleep 500)
(println "eval future")
(println (str "result => " @f))  ;; もしくは、(deref f)
(shutdown-agents)

要shutdown-agents。

実行すると、こうなります。

start
eval future
done
result => 100

future関数を評価したところから、別スレッドでの動作が始まります。futureの値は、derefまたは@で取得できますが、今回はfuture側で5秒待機しているので評価してもすぐには値を取得できず、処理が終わるまで待機します。

futureは、裏でfuture-callという関数で実装されているようですが、こっちを直接使うことはなさそう…?

future-cancell

futureの処理をキャンセルします。

(def f (future (println "start") (Thread/sleep 5000) (println "done") 100))
(Thread/sleep 500)
(future-cancel f)
(try
  (println (str "result => " @f))
  (catch Exception e (.printStackTrace e)))  ;; => java.util.concurrent.CancellationException
(shutdown-agents)

キャンセルしたfutureを評価しようとすると、java.util.concurrent.CancellationExceptionが飛んできます。

future-cancelled?

futureがキャンセルされたかどうかを、確認する関数です。

(def f (future (println "start") (Thread/sleep 5000) (println "done") 100))
(Thread/sleep 500)
(println (future-cancelled? f))  ;; => false
(future-cancel f)
(println (future-cancelled? f))  ;; => true
(shutdown-agents)

future-done?

futureが終了したかどうかを返却する関数です。

(def f (future (println "start") (Thread/sleep 5000) (println "done") 100))
(Thread/sleep 500)
(println (future-done? f))  ;; => false
(println @f)
(println (future-done? f))  ;; => true

(def f (future (println "start") (Thread/sleep 5000) (println "done") 100))
(Thread/sleep 500)
(println (future-done? f))  ;; => false
(future-cancel f)
(println (future-done? f))  ;; => true
(shutdown-agents)

キャンセルも、終了とみなされます。

future?

引数がfutureかどうかを返却します。

(def f (future (println "start") (Thread/sleep 5000) (println "done") 100))
(println (future? f))  ;; => true
(println (future? "Clojure"))  ;; => false
(shutdown-agents)

shutdown-agents

エージェントが使用している、スレッドプールをシャットダウンします。pmap、pcalls、pvaluesやfutureは、裏でエージェントを使っているのでこの関数の呼び出しが必要です。

(shutdown-agents)

ただ、1回シャットダウンしてしまうと、pmapやfutureを呼ぼうとするとリジェクトされてしまうのでご注意を。

promise & deliver

promise関数、deliver関数は、ほぼセットで使用すると思います。promise関数で、値が未設定のオブジェクトを作成し、deliver関数でオブジェクトに値を設定します。

(def x (promise))
(deliver x 100)
(println @x)  ;; => 100
(deliver x 200)
(println @x)  ;; => 100

deliver関数で値が設定できるのは、1度だけです。なお、この関数はCountDownLatchで実装されているようで、shutdown-agents関数は必要ありません。

promiseの値は、derefまたは@で取得することができますが、deliver関数でpromiseに値が設定されるまではブロックされます。スレッドを2つ使って、片方がpromiseを評価し、もう片方がpromiseに値を設定するサンプル。

(def x (promise))
(pcalls
 #(do (println "Waiting deliver...") (println @x))
 #(do (Thread/sleep 2000) (println "push promise") (deliver x 100)))
(shutdown-agents)

shutdown-agents関数があるのは、pcalls関数を使っているからです…。

動かすと、こんな感じ。

Waiting deliver...
push promise
100

promiseを評価する方が、ブロックされてますね。

realized?

promiseに値が設定されているかどうかを返却する関数です。

(def x (promise))
(println (realized? x)) ;; => false
(deliver x 100)
(println (realized? x)) ;; => true

ちなみに、deliverもrealized?も、promiseが持っているメソッドを呼んでいるだけのラッパー関数です。

(def x (promise))
(deliver x 100)
(println @x)  ;; => 100

(def x (promise))
(x 500)
(println @x)  ;; => 500

(def x (promise))
(println (realized? x)) ;; => false
(deliver x 100)
(println (realized? x)) ;; => true

(def x (promise))
(println (.isRealized x)) ;; => false
(deliver x 100)
(println (.isRealized x)) ;; => true

deliver関数はpromiseへの関数適用で、realized?はisRealizedメソッドに置き換えが可能です。まあ、使わないでしょうが。

locking

Javaのsynchronizedキーワードのようなものだ、とドキュメントには書かれています。

(let [obj (Object.)
      func (fn [] (let [name (-> (Thread/currentThread) .getName)]
                    (println (str "enter locking " name))
                    (Thread/sleep 1000)
                    (println (str "leave locking " name))))]
  (pcalls #(do (locking obj (func)))
          #(do (locking obj (func)))))
(shutdown-agents)

実行すると、こうなります。

enter locking clojure-agent-send-off-pool-0
leave locking clojure-agent-send-off-pool-0
enter locking clojure-agent-send-off-pool-1
leave locking clojure-agent-send-off-pool-1

確かに動き的にはsynchronizedなんですけど、雰囲気的にjava.util.concurrent.locks.ReentrantLockとかを使って実装されていそうな気がするのですが…違うのかな?

お手軽に並列処理などをしたい場合は、この辺りを使うといいのかな?