最後、Promiseです。
Promiseって?
とりあえず、以下のドキュメントにあるPromiseの説明を読んでみます。
http://docs.scala-lang.org/overviews/core/futures.html
http://docs.scala-lang.org/ja/overviews/core/futures.html
Future がリードオンリーのまだ存在しない値に対するプレースホルダ・オブジェクトの一種だと定義されるのに対して、Promise は書き込み可能で、1度だけ代入できるコンテナで Future を完了させるものだと考えることができる。 つまり、Promise は success メソッドを用いて (約束を「完了させる」ことで) Future を値とともに成功させることができる。 逆に、Promise は failure メソッドを用いて Future を例外とともに失敗させることもできる。
要は、Futureがfuture関数(またはFuture.apply)で指定した結果を返すのに対して、Promiseは外部から結果を設定できる(結果を完了する)ということですね。その結果は、Futureを経由して取得することになりますが。
今回は、ソースに以下のimport文があるものとして書いていきます。
import scala.concurrent.{Await, future, promise, Promise} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.util.{Success, Failure}
基本的な使い方ですが、まずはscala.concurrentパッケージオブジェクトに定義されているpromise関数を使ってPromiseを作成します。次いで作成したPromiseから取得できるFutureを使用して結果の取得と、Promiseに対しては結果自体の設定を行います。
val p = promise[Int] // Promiseの作成 val f = p.future // Promiseに対するFuture future { p.success(...) } // Promiseの結果の確定 future { f onSunccess { ... } ) // Futureに対するコールバック関数の登録
最後の2行は、別にfuture関数内で実行しなくてはいけないわけではないですが…。
値を設定する方(Promiseを完了させる方)がPromiseのインスタンスを使用し、結果を取得する方がPromiseから取得できるFutureを使用するわけですね。
では、いってみましょう。今回も
// stop-watch def sw[A](body: => A): Unit = { val start = System.currentTimeMillis try { println(s"got [$body], elapsed time [${(System.currentTimeMillis - start) / 1000.0}]msec") } catch { case th: Throwable => println(s"got Exception[$th], elapsed time [${(System.currentTimeMillis - start) / 1000.0}]msec") } }
は利用します。
Promise#success、Promise#failure、Promise#complete
1番基本的な使い方になりますね。ガイドのサンプルを、ちょっと書き換えてみました。わざとらしい、Sleepを使った処理がいくつか入っています。
def produceSomething: String = { Thread.sleep(5 * 1000L) "Produced! [Producer]" } def continueDoingSomethingUnrelated: String = { Thread.sleep(5 * 1000L) "Completed Do Something! [Producer]" } def startDoingSomething() { Thread.sleep(8 * 1000L) println("Completed Do Something! [Consumer]") } sw { val p = promise[String] val f = p.future val producer = future { val result = produceSomething p success result // これは、以下と同義 // p complete Success(result) println(continueDoingSomethingUnrelated) } val consumer = future { startDoingSomething() f onSuccess { case r => println(s"Received Producer Result[$r]") } println("End Consumer") } Await.result(producer, Duration.Inf) }
producer側では、produceSomething関数が実行し終わったところでPromiseを完了させます(Promise#success)。その後、また時間のかかる処理を行うようにしています。
これを実行すると、コンソールには以下のように出力されます。
Completed Do Something! [Consumer] End Consumer Received Producer Result[Produced! [Producer]] Completed Do Something! [Producer] got [()], elapsed time [10.016]msec
Procuderの仕事は完了していませんが、Consumer側にはFutureのコールバック関数が起動され、結果が渡っていますね。
Promiseを失敗させるには、Promise#failureメソッドを使用します。
sw { val p = promise[String] val f = p.future val producer = future { p failure new Exception("Oops!") // これは、以下と同義 // p complete Failure(new Exception("Oops!")) } val consumer = future { f onComplete { case Success(r) => println(s"got [$r]") case Failure(th) => println(s"failed reason[$th]") } } Await.result(f, Duration.Inf) } // => got Exception[java.lang.Exception: Oops!], elapsed time [0.034]msec
Consumer側のFuture#onCompleteに登録した部分関数でのパターンマッチには、もちろんFailureが引っかかります。
なお、コメントにも書いていますが、
p success result // これは、以下と同義 // p complete Success(result) p failure new Exception("Oops!") // これは、以下と同義 // p complete Failure(new Exception("Oops!"))
というように、Promise#successはPromise#complete(Success(...))、Promise#failureはPromise#complete(Failure(...))のショートカットです。
Promise#completeWith
Promiseの結果を、別のFutureで完了させるメソッドです。
sw { val other = future { Thread.sleep(3 * 1000L); 10 } val p = promise[Int] p completeWith other val f = p.future f onSuccess { case n => println(n) } // => 10 Await.ready(f, Duration.Inf) }
この場合、別のFutureである変数otherの内容がPromiseのコールバックに渡っていますね。
## Promise#trySuccess、Promise#tryFailure、Promise#tryComplete
Promiseに対して値を設定して完了させますが、実際に反映されるのは1番最初に設定された値です。設定できたかどうかは戻り値がBooleanなので、それで判定します。
sw { val p = promise[Int] println(p.trySuccess(10)) // => true println(p.trySuccess(5)) // => false println(p.tryFailure(new Exception("Oops!"))) // => false p.future.value } // => got [Some(Success(10))], elapsed time [0.006]msec sw { val p = promise[Int] println(p.tryFailure(new Exception("Oops!"))) // => true println(p.trySuccess(10)) // => false println(p.trySuccess(5)) // => false p.future.value } // => got [Some(Failure(java.lang.Exception: Oops!))], elapsed time [0.001]msec
複数のFutureなどでPromiseを共有し、最初に完了したFutureで値を設定するようなシーンを想定しているみたいです。
val p = promise[...] future { // 何か処理 p.trySuccess(...) } future { // 何か処理 p.trySuccess(...) } future { // 何か処理 p.tryFailure(...) }
こうすると、1番早く終わったFutureの結果でPromiseが完了するわけですね。
なお、trySuccess、tryFailure、tryCompleteの関係は、success、failure、completeに似ているので、先の例は以下のように書き直せます。
sw { val p = promise[Int] println(p.tryComplete(Success(10))) // => true println(p.tryComplete(Success(5))) // => false println(p.tryComplete(Failure(new Exception("Oops!")))) // => false p.future.value } sw { val p = promise[Int] println(p.tryComplete(Failure(new Exception("Oops!")))) // => true println(p.tryComplete(Success(10))) // => false println(p.tryComplete(Success(5))) // => false p.future.value }
Promise.successful、Promise.failed
Promiseコンパニオンオブジェクトです。まずは、successfulとfailed。割と直感的に分かりそうですが、すでに完了済みのPromiseを作成します。
sw { val p = Promise.successful(10) p.future.value } // => got [Some(Success(10))], elapsed time [0.003]msec sw { val p = Promise.failed(new Exception("Oops!")) p.future.value } // => got [Some(Failure(java.lang.Exception: Oops!))], elapsed time [0.0]msec
Promise.apply
これまで使用していた、promise関数の実体です。
これで、Futures and Promisesは終了です。長かった〜。