CLOVER🍀

That was when it all began.

Scala 2.10.0 Futures and Promises - 1

今度は、Futures and Promisesです。
http://docs.scala-lang.org/overviews/core/futures.html

日本語訳。
http://docs.scala-lang.org/ja/overviews/core/futures.html

そこそこ量が多くなりそうなので、分割していきます。まずはFutureから。

Futureって?

ある計算処理を非同期(並列)に実行しておいて、結果は後から取得しようといったものです。マルチスレッド系のFutureパターンや、Javaだとjava.util.concurrent.Futureでお馴染みかもですね。そういえば、Scala自身のActorパッケージにもあった気がしますが…。

まあ、2.10.0で追加されたFutureということで。

サンプルはこんな感じです。

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

val session = ...
val f: Future[List[String]] = future {
  session.getRecentPosts
}

f onComplete {
  case Success(posts) => for (post <- posts) println(post)
  case Failure(t) => println("エラーが発生した: " + t.getMessage)
}

future関数で、Futureを作成することができます。また、Futureの計算結果には成功と失敗があります。計算処理で例外がスローされると、失敗となるみたいですね。

以下のimport文は、デフォルトの実行コンテキストを使用することを示しています。

import ExecutionContext.Implicits.global

この値が、Futureを作成したり実行したりする時の実行コンテキストとしてImplicit Parameterという形で与えられます。デフォルトの実行コンテキスト以外をどうやって作るのかは…ですが。とりあえず、今はこれのimportが必要ですよ、と。

また、future関数の結果として返ってくるFutureトレイトのインスタンスですが、それ自身に待ち合わせの機能はありません。
Javaは、Future#getで計算結果が得られるまで待ち合わせることができます
http://www.scala-lang.org/api/current/index.html#scala.concurrent.Future

スタンスとしては、Futureを使う時は結果を同期的に待つのではなくて、Futureトレイトのインスタンスにコールバック関数を登録して(先のサンプルでは、Future#onComplete)、非同期に結果を受け取って処理しなさい、というスタンスなようです。
*その方が、性能的に好ましいということだそうで

AwaitとDuration

とはいっても、Futureのバックグラウンドで使われているスレッドはForkJoinPoolのWorkerスレッド(Daemonスレッド)なので、普通にプログラムを実行してそれ以降の処理がないと、JavaVMが終了してしまいます…。
*この辺り、Clojureと違いますよね

REPLでやっているならともかく、サンプルコードを書いているとやりづらくて仕方がないので、先に待ち合わせの方法から紹介します。

待ち合わせるためにはscala.concurrent.Awaitを、待ち合わせの時間を指定するにはscala.concurrent.duration.Durationを使用します。

以下、サンプルです。

val f = future { Thread.sleep(3000L); 10 }
println(Await.result(f, Duration.Inf)) // => 10

このサンプルを実行すると、3秒後にコメントで記載してあるような内容がコンソールに出力されます。

Await#result

def result[T](awaitable: Awaitable[T], atMost: Duration): T

で、時間を表すDurationを指定してFutureの結果を取得します。結果は、future関数で作成した計算結果です。計算処理が失敗した(例外がスローされた)場合は、このメソッドから例外が飛んできます。

Await#ready

def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type

Futureの計算結果を待ち合わせるだけのメソッドです。結果は、成功/失敗に関わらず、計算結果が終了したFutureが返ってきます。もちろん、引数に与えたFutureを見てもよいですが。

両メソッドとも、指定したDurationまでに終了しなかった場合にはjava.util.concurrent.TimeoutExceptionがスローされます。

Durationは、時間を長さを表す基底クラスで、有限も無限も表現できます。先の例

Duration.Inf

は無限でした。

Durationのインスタンスは、下記のような方法で作成することができます。まあ、これはガイドの写しですが。

import scala.concurrent.Duration
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit._
// 作成
val d1 = Duration(100, MILLISECONDS) // from Long and TimeUnit
val d2 = Duration(100, "millis") // from Long and String
val d3 = 100 millis // implicitly from Long, Int or Double
val d4 = Duration("1.2 &#181;s") // from String

パターンマッチも可能。

// パターンマッチング
val Duration(length, unit) = 5 millis

以降の例では、すべてimportとして

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

が指定してあるものとし、待ち合わせを行う時間は

Duration.Inf

で行うものとします。

また、Futureを作成する時には

  // wait-and-get
  def wag(waitSec: Int, n: Int): Int = {
    Thread.sleep(waitSec * 1000L)
    n
  }

と指定時間待機するような関数を使い、時間の計測があった方が並列に動いていることがわかりやすいので

  // stop-watch
  def sw[A](body: => A): Unit = {
    val start = System.currentTimeMillis
    try {
      println(s"got [$body], elapsed time [${(System.currentTimeMillis - start) / 1000.0}]msec")
    } catch {
      case th: Throwable =>
        println(s"got Exception[$th], elapsed time [${(System.currentTimeMillis - start) / 1000.0}]msec")
    }
  }

という関数で包んで実行するものとします。

例えば、こんな感じです。

    sw {
      val f = future { wag(3, 5) }
      Await.result(f, Duration.Inf)
    }
    // => got [5], elapsed time [3.019]msec

Future#value

Futureの値が取得できます。

    sw {
      val f = future { wag(3, 5) }
      f.value
    }
    // => got [None], elapsed time [0.012]msec

    sw {
      val f = future { wag(3, 5) }
      Await.ready(f, Duration.Inf)
      f.value
    }
    // => got [Some(Success(5))], elapsed time [3.003]msec

    sw {
      val f = future { throw new Exception }
      Await.ready(f, Duration.Inf)
      f.value
    }
    // => got [Some(Failure(java.lang.Exception))], elapsed time [0.001]msec

別に結果の待ち合わせをするわけでもないので、計算処理が終了していない時にこのメソッドを呼ぶとNoneが返ります。計算処理が終了していた場合は、Someに包まれて結果が返ります。

Future#onComplete

Futureの計算の成功、失敗に関わらず、終了時に呼び出されるコールバック関数を登録します。結果はscala.util.Tryのインスタンスとして渡ってくるので、パターンマッチで判定します。
成功はSuccess、失敗はFailureとなります。

    sw {
      val f = future { wag(3, 5) }
      f onComplete {
        case Success(n) => println(s"got success = $n")  // => got success = 5
        case Failure(e) => println(s"got error = $e")
      }
      Await.result(f, Duration.Inf)
    }
    // => got [5], elapsed time [3.018]msec

    sw {
      val f = future { throw new Exception }
      f onComplete {
        case Success(n) => println(s"got success = $n")
        case Failure(e) => println(s"got error = $e")  // => got error = java.lang.Exception
      }
      Await.result(f, Duration.Inf)
    }
    // => got Exception[java.lang.Exception], elapsed time [0.002]msec

2つ目の方は、Futureが例外を投げているのでsw関数のcatch節に捕まったことになります。なお、コールバック関数は(ドキュメントの書き方は曖昧ですが)非同期に実行されているようです。

コールバック関数は、複数登録することもできます。

    sw {
      val f = future { wag(3, 5) }
      f onComplete {
        case Success(n) => println(s"1. got success = $n")
        case Failure(e) => println(s"1. got error = $e")
      }

      f onComplete {
        case Success(n) => println(s"2. got success = $n")
        case Failure(e) => println(s"2. got error = $e")
      }

      Await.result(f, Duration.Inf)
    }

この場合も、登録した関数は登録順に関係なく非同期に実行されるようです。

Future#onSuccess、Future#onFailure

それぞれ、onCompleteのようにコールバック関数を登録するメソッドですが、それぞれ成功時または失敗時のみにしか呼び出されないことが異なります。

    sw {
      val f = future { wag(3, 5) }
      f onSuccess {
        case n => println(s"got success $n")  // => got success 5
      }
      f onFailure {
        case t => println(s"got failure $t")
      }
      Await.result(f, Duration.Inf)
    }
    // => got [5], elapsed time [3.017]msec

    sw {
      val f = future { throw new Exception }
      f onSuccess {
        case n => println(s"got success $n")
      }
      f onFailure {
        case t => println(s"got failure $t")  // => got failure java.lang.Exception
      }
      Await.result(f, Duration.Inf)
    }
    // => got Exception[java.lang.Exception], elapsed time [0.008]msec

成功時にはonFailureに登録した関数が、失敗時にはonSuccessに登録した関数が無視されることになります。

こちらも、複数のコールバック関数を登録することが可能です。

Future#map、Future#flatMap、Future#foreach、Future#withFilter、Future#collect

お馴染みの変換、反復メソッドですね。

    sw {
      val f = future { wag(3, 5) } map { n => wag(5, 10) }
      f foreach println  // => 10
      Await.result(f, Duration.Inf)
    }
    // => got [10], elapsed time [8.022]msec

    sw {
      val f1 = future { wag(3, 5) }
      val f2 = future { wag(5, 10) }
      val fs = f1 flatMap { n1 => f2 map { n2 => n1 + n2 } }
      Await.result(fs, Duration.Inf)
    }
    // => got [15], elapsed time [5.003]msec

最初のmapの例では、Futureの計算結果に対してさらにFutureを使った計算をしています。よって、2つ目のFutureが最初のFutureの結果に依存しているため、逐次処理となり実行時間は8秒となっています。
対してflatMapの例では、2つのFutureの結果が揃わなければ結果は得られませんが、2つのFutureは別々に計算できるため5秒で実行が完了しています。2つのFutureは、並列に実行されたということですね。

Futureにおいてforeachは、onSuccessと同じと考えてよいみたいです。また、mapとflatMapに渡した関数も、Futureの計算が成功しなければ実行されません。この辺りは、OptionのSome、Noneの関係と同じと考えてよさそうですね。

map、flatMapが使えるので、先ほどのflatMapの例はfor式で書き直すことができます。

    sw {
      val f1 = future { wag(3, 5) }
      val f2 = future { wag(5, 10) }
      val fs = for {
        n1 <- f1
        n2 <- f2
      } yield n1 + n2
      Await.result(fs, Duration.Inf)
    }

withFilterとfilterもあるので、for式内でifを使用することもできます。ただ、withFilter(またはfilter)の結果がfalseとなった場合は、Futureの結果としてはjava.util.NoSuchElementExceptionがスローされることになります。

    sw {
      val f1 = future { wag(3, 5) }
      val f2 = future { wag(5, 10) }
      val f = for {
        n1 <- f1
        if n1 > 5
        n2 <- f2
      } yield n1 + n2
      Await.result(f, Duration.Inf)
    }
    // => got Exception[java.util.NoSuchElementException: Future.filter predicate is not satisfied], elapsed time [3.002]msec

collectは割愛〜。

Future#recover、Future#recoverWith

Futureの計算が失敗した場合に、例外の結果から新たなFutureを作成して継続するためのメソッドです。

    sw {
      val f = future {
        1 / 0
      } recover {
        case th: ArithmeticException => 5
      }
      Await.result(f, Duration.Inf)
    }
    // => got [5], elapsed time [0.035]msec

    sw {
      val f = future {
        1 / 0
      } recoverWith {
        case th: ArithmeticException => future { wag(3, 5) }
      }
      Await.result(f, Duration.Inf)
    }
    // => got [5], elapsed time [3.004]msec

ガイドにも書いていますが、それぞれmapとflatMapによく似ていますね。なお、引数はPartialFunctionなので、部分関数に渡ってくる引数の例外に対してパターンマッチが成功しなかった場合は、recoverおよびrecoverWithの結果もまた失敗します。

Future#fallbackTo

recover、recoverWithに似ていますが、失敗時の代替となるFutureを直接設定します。

    sw {
      val f = future { 1 / 0 } fallbackTo future { wag(3, 5) }
      Await.result(f, Duration.Inf)
    }
    // => got [5], elapsed time [3.017]msec

Future#andThen

副作用を起こすことを目的として、PartialFunctionを指定します。結果はFutureが返りますが、andThenで呼び出される関数が何を返そうと、元のFutureの値が呼び出し元には返ります。

    sw {
      val f = future { wag(3, 5) }
                .andThen { case Success(n) => println(n); 20 }  // 5
                .andThen { case Success(n) => println(n); 10 }  // 5
      Await.result(f, Duration.Inf)
    }
    // => got [5], elapsed time [3.065]msec

この例では、PartialFunctionで最初のFutureと全然関係ない値を返していますが、andThenに渡される引数もAwait#resultで得られる結果も、最初のFutureの結果で得られるものです。

Future#zip

2つのFutureをまとめることができます。

    sw {
      val f = future { wag(3, 5) } zip future { wag(5, 10) }
      Await.result(f, Duration.Inf)
    }
    // => got [(5,10)], elapsed time [5.027]msec

並列実行されているようなので、実行時間は5秒となっています。

普通にタプルにした場合と、何が違うの?というところですが

    sw {
      val f = future { throw new Exception } zip future { wag(5, 10) }
      Await.result(f, Duration.Inf)
    }
    // => got Exception[java.lang.Exception], elapsed time [0.026]msec

    sw {
      val f = future { wag(3, 5) } zip future { throw new Exception }
      Await.result(f, Duration.Inf)
    }
    // => got Exception[java.lang.Exception], elapsed time [3.003]msec

どちらか片方でも失敗した場合は、その例外が得られることになります。

Future#failed

完了したFutureが失敗した場合に、Future[Throwable]が返りforeach(もしくはfor内包表記)で使うことができます。

    sw {
      val f = future { wag(3, 5) }
      f.failed.foreach(th => println(s"got Exception[${th}]"))
      Await.result(f, Duration.Inf)
    }

    sw {
      val f = future { Thread.sleep(3000L); 1 / 0 }
      f.failed.foreach(th => println(s"got Exception[${th}]"))  // => got Exception[java.lang.ArithmeticException: / by zero]
      Await.result(f, Duration.Inf)
    }

    sw {
      val f = future { Thread.sleep(3000L); 1 / 0 }
      for (th <- f.failed) println(s"got Exception[${th}]")  // => got Exception[java.lang.ArithmeticException: / by zero]
      Await.result(f, Duration.Inf)
    }

最初のコードはforeachで何も出力しません。

Future#transform

Futureの結果を変換することができます。変換するための関数は、成功時と失敗時に対してそれぞれ設定します。

    sw {
      val f = future { wag(3, 5) } transform ({ n => n * 10}, { th => new RuntimeException(th) })
      Await.result(f, Duration.Inf)
    }
    // => got [50], elapsed time [3.016]msec

    sw {
      val f = future { 1 / 0 } transform ({ n => n * 10}, { th => new RuntimeException(th) })
      Await.result(f, Duration.Inf)
    }
    // => got Exception[java.lang.RuntimeException: java.lang.ArithmeticException: / by zero], elapsed time [0.004]msec

見た目的にそんなに変わらないのですが、Future#transformの宣言が

def transform[S](s: (T) ⇒ S, f: (Throwable) ⇒ Throwable)(implicit executor: ExecutionContext): Future[S]

と、例外を受けた場合は例外を返却すればいいので、別にスローしなくていいんでしょうね。Await#resultでは、変換した例外が飛んできているようですが。

いやー、けっこう面白いです。まだFutureオブジェクトとPromiseが残っているので、これは次回以降に。