CLOVER🍀

That was when it all began.

Scala 2.10.0 Futures and Promises - 3

最後、Promiseです。

Promiseって?

とりあえず、以下のドキュメントにあるPromiseの説明を読んでみます。
http://docs.scala-lang.org/overviews/core/futures.html
http://docs.scala-lang.org/ja/overviews/core/futures.html

Future がリードオンリーのまだ存在しない値に対するプレースホルダ・オブジェクトの一種だと定義されるのに対して、Promise は書き込み可能で、1度だけ代入できるコンテナで Future を完了させるものだと考えることができる。 つまり、Promise は success メソッドを用いて (約束を「完了させる」ことで) Future を値とともに成功させることができる。 逆に、Promise は failure メソッドを用いて Future を例外とともに失敗させることもできる。

要は、Futureがfuture関数(またはFuture.apply)で指定した結果を返すのに対して、Promiseは外部から結果を設定できる(結果を完了する)ということですね。その結果は、Futureを経由して取得することになりますが。

今回は、ソースに以下のimport文があるものとして書いていきます。

import scala.concurrent.{Await, future, promise, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Success, Failure}

基本的な使い方ですが、まずはscala.concurrentパッケージオブジェクトに定義されているpromise関数を使ってPromiseを作成します。次いで作成したPromiseから取得できるFutureを使用して結果の取得と、Promiseに対しては結果自体の設定を行います。

val p = promise[Int]  // Promiseの作成
val f = p.future  // Promiseに対するFuture

future { p.success(...) }  // Promiseの結果の確定
future { f onSunccess { ... } )  // Futureに対するコールバック関数の登録

最後の2行は、別にfuture関数内で実行しなくてはいけないわけではないですが…。

値を設定する方(Promiseを完了させる方)がPromiseのインスタンスを使用し、結果を取得する方がPromiseから取得できるFutureを使用するわけですね。

では、いってみましょう。今回も

  // stop-watch
  def sw[A](body: => A): Unit = {
    val start = System.currentTimeMillis
    try {
      println(s"got [$body], elapsed time [${(System.currentTimeMillis - start) / 1000.0}]msec")
    } catch {
      case th: Throwable =>
        println(s"got Exception[$th], elapsed time [${(System.currentTimeMillis - start) / 1000.0}]msec")
    }
  }

は利用します。

Promise#success、Promise#failure、Promise#complete

1番基本的な使い方になりますね。ガイドのサンプルを、ちょっと書き換えてみました。わざとらしい、Sleepを使った処理がいくつか入っています。

  def produceSomething: String = {
    Thread.sleep(5 * 1000L)
    "Produced! [Producer]"
  }

  def continueDoingSomethingUnrelated: String = {
    Thread.sleep(5 * 1000L)
    "Completed Do Something! [Producer]"
  }

  def startDoingSomething() {
    Thread.sleep(8 * 1000L)
    println("Completed Do Something! [Consumer]")
  }

    sw {
      val p = promise[String]
      val f = p.future

      val producer = future {
        val result = produceSomething
        p success result
        // これは、以下と同義
        // p complete Success(result)
        println(continueDoingSomethingUnrelated)
      }

      val consumer = future {
        startDoingSomething()
        f onSuccess { case r => println(s"Received Producer Result[$r]") }
        println("End Consumer")
      }

      Await.result(producer, Duration.Inf)
    }

producer側では、produceSomething関数が実行し終わったところでPromiseを完了させます(Promise#success)。その後、また時間のかかる処理を行うようにしています。

これを実行すると、コンソールには以下のように出力されます。

Completed Do Something! [Consumer]
End Consumer
Received Producer Result[Produced! [Producer]]
Completed Do Something! [Producer]
got [()], elapsed time [10.016]msec

Procuderの仕事は完了していませんが、Consumer側にはFutureのコールバック関数が起動され、結果が渡っていますね。

Promiseを失敗させるには、Promise#failureメソッドを使用します。

    sw {
      val p = promise[String]
      val f = p.future

      val producer = future {
        p failure new Exception("Oops!")
        // これは、以下と同義
        // p complete Failure(new Exception("Oops!"))
      }

      val consumer = future {
        f onComplete {
          case Success(r) => println(s"got [$r]")
          case Failure(th) => println(s"failed reason[$th]")
        }
      }

      Await.result(f, Duration.Inf)
    }
    // => got Exception[java.lang.Exception: Oops!], elapsed time [0.034]msec

Consumer側のFuture#onCompleteに登録した部分関数でのパターンマッチには、もちろんFailureが引っかかります。

なお、コメントにも書いていますが、

p success result
// これは、以下と同義
// p complete Success(result)

p failure new Exception("Oops!")
// これは、以下と同義
// p complete Failure(new Exception("Oops!"))

というように、Promise#successはPromise#complete(Success(...))、Promise#failureはPromise#complete(Failure(...))のショートカットです。

Promise#completeWith

Promiseの結果を、別のFutureで完了させるメソッドです。

    sw {
      val other = future { Thread.sleep(3 * 1000L); 10 }
      val p = promise[Int]

      p completeWith other

      val f = p.future
      f onSuccess { case n => println(n) }  // => 10

      Await.ready(f, Duration.Inf)
    }

この場合、別のFutureである変数otherの内容がPromiseのコールバックに渡っていますね。

## Promise#trySuccess、Promise#tryFailure、Promise#tryComplete
Promiseに対して値を設定して完了させますが、実際に反映されるのは1番最初に設定された値です。設定できたかどうかは戻り値がBooleanなので、それで判定します。

    sw {
      val p = promise[Int]

      println(p.trySuccess(10))  // => true
      println(p.trySuccess(5))  // => false
      println(p.tryFailure(new Exception("Oops!")))  // => false

      p.future.value
    }
    // => got [Some(Success(10))], elapsed time [0.006]msec

    sw {
      val p = promise[Int]

      println(p.tryFailure(new Exception("Oops!")))  // => true
      println(p.trySuccess(10))  // => false
      println(p.trySuccess(5))  // => false

      p.future.value
    }
    // => got [Some(Failure(java.lang.Exception: Oops!))], elapsed time [0.001]msec

複数のFutureなどでPromiseを共有し、最初に完了したFutureで値を設定するようなシーンを想定しているみたいです。

val p = promise[...]
future {
  // 何か処理
  p.trySuccess(...)
}
future {
  // 何か処理
  p.trySuccess(...)
}
future {
  // 何か処理
  p.tryFailure(...)
}

こうすると、1番早く終わったFutureの結果でPromiseが完了するわけですね。

なお、trySuccess、tryFailure、tryCompleteの関係は、success、failure、completeに似ているので、先の例は以下のように書き直せます。

    sw {
      val p = promise[Int]
      
      println(p.tryComplete(Success(10)))  // => true
      println(p.tryComplete(Success(5)))  // => false
      println(p.tryComplete(Failure(new Exception("Oops!"))))  // => false

      p.future.value
    }

    sw {
      val p = promise[Int]

      println(p.tryComplete(Failure(new Exception("Oops!"))))  // => true
      println(p.tryComplete(Success(10)))  // => false
      println(p.tryComplete(Success(5)))  // => false

      p.future.value
    }

Promise.successful、Promise.failed

Promiseコンパニオンオブジェクトです。まずは、successfulとfailed。割と直感的に分かりそうですが、すでに完了済みのPromiseを作成します。

    sw {
      val p = Promise.successful(10)
      p.future.value
    }
    // => got [Some(Success(10))], elapsed time [0.003]msec

    sw {
      val p = Promise.failed(new Exception("Oops!"))
      p.future.value
    }
    // => got [Some(Failure(java.lang.Exception: Oops!))], elapsed time [0.0]msec

Promise.apply

これまで使用していた、promise関数の実体です。

これで、Futures and Promisesは終了です。長かった〜。