CLOVER🍀

That was when it all began.

Scala 2.10.0 Futures and Promises - 2

前回の続きです。今度は、Futureコンパニオンオブジェクトに定義されているメソッドを試していってみます。

今回も、ソースには以下のimport文があるものとし

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

実行時間の計測と、待ち時間を指定して値を戻すメソッドとして、以下を使うものとします。

  // stop-watch
  def sw[A](body: => A): Unit = {
    val start = System.currentTimeMillis
    try {
      println(s"got [$body], elapsed time [${(System.currentTimeMillis - start) / 1000.0}]msec")
    } catch {
      case th: Throwable =>
        println(s"got Exception[$th], elapsed time [${(System.currentTimeMillis - start) / 1000.0}]msec")
    }
  }

  // wait-and-get
  def wag(waitSec: Int, n: Int): Int = {
    Thread.sleep(waitSec * 1000L)
    n
  }

では、いってみましょう。

Future.successful、Future.failed

それぞれ引数に指定した値を元に、成功するFutureと失敗するFutureを作成します。

    sw {
      val f = Future.successful("Hello World")
      f.value
    }
    // => got [Some(Success(Hello World))], elapsed time [0.003]msec

    sw {
      val f = Future.failed(new Exception)
      f.value
    }
    // => got [Some(Failure(java.lang.Exception))], elapsed time [0.001]msec

渡すのは確定した値なので、特に待たずとも結果が取れます。

Future.sequence、Future.traverse

sequenceはFutureのシーケンスからFutureを作成するメソッド、traverseはシーケンスとシーケンスの中身をFutureに変換する関数を与えてFutureを作成するメソッドです。

    sw {
      val xs = List(
        future { wag(3, 1) },
        future { wag(3, 2) },
        future { wag(3, 3) },
        future { wag(3, 4) },
        future { wag(3, 5) },
        future { wag(3, 6) }
      )
      val futures = Future.sequence(xs)
      Await.result(futures, Duration.Inf)
    }
    // => got [List(1, 2, 3, 4, 5, 6)], elapsed time [9.022]msec

    sw {
      val xs = List(1, 2, 3, 4, 5, 6)
      val futures = Future.traverse(xs)(n => future { wag(3, n) })
      Await.result(futures, Duration.Inf)
    }
    // => got [List(1, 2, 3, 4, 5, 6)], elapsed time [9.007]msec

実行時間は半分。これは、うちのマシンが2コアだからだと思います…。

Future.fold、Future.reduce

畳み込みですね。共に、Futureのシーケンスを畳み込みます。foldには初期値が必要なのは、Listなどのfoldと同じです。結果はFutureとして得られます。

    sw {
      val xs = List(
        future { wag(3, 1) },
        future { wag(3, 2) },
        future { wag(3, 3) },
        future { wag(3, 4) },
        future { wag(3, 5) },
        future { wag(3, 6) }
      )
      val f = Future.fold(xs)(0) { (acc, n) => acc + n }
      Await.result(f, Duration.Inf)
    }
    // => got [21], elapsed time [9.032]msec

    sw {
      val xs = List(
        future { wag(3, 1) },
        future { wag(3, 2) },
        future { wag(3, 3) },
        future { wag(3, 4) },
        future { wag(3, 5) },
        future { wag(3, 6) }
      )
      val f = Future.reduce(xs) { (acc, n) => acc + n }
      Await.result(f, Duration.Inf)
    }
    // => got [21], elapsed time [9.004]msec

ちなみに、畳み込みは左から行われます(foldLeft、reduceLeft)。Futureの処理が終了した順に畳み込まれていく、ということもなさそうです。
畳み込みのうち、ひとつでも失敗すると全体が失敗します。

Future.find

引数に渡したFutureのシーケンスの中で、Futureの計算結果が条件を満たす、最初に終了したFutureを返却します。

    sw {
      val xs = List(
        future { wag(3, 1) },
        future { wag(3, 2) },
        future { wag(5, 3) },
        future { wag(3, 4) }
      )
      val f = Future.find(xs) { n => n > 2 }
      Await.result(f, Duration.Inf)
    }
    // => got [Some(4)], elapsed time [6.027]msec

返却されるFutureは、Future[Option[T]]となります。
だいぶわざとらしいですが、3番目の要素より4番目の要素の方が先に終了するので、結果としてSome(4)が得られます。
*最初の2要素の計算処理がほぼ同時に終わるのと、実行環境のCPU数が2コアだから、という前提となっていますが

Future.firstCompletedOf

引数に渡したFutureのシーケンスのうち、最初に結果を返すFutureを返却します。

    sw {
      val xs = List(
        future { wag(3, 1) },
        future { wag(1, 2) },
        future { wag(3, 3) },
        future { wag(1, 4) },
        future { wag(3, 5) },
        future { wag(1, 6) }
      )
      val f = Future.firstCompletedOf(xs)
      Await.result(f, Duration.Inf)
    }
    // => got [2], elapsed time [1.027]msec

最初に終了した要素が、計算結果に失敗した場合は返却されたFutureも失敗します(このコード例だと、Await#resultが例外を投げます)。

Future.apply

ずっと使っている

future { ... }

の実体みたいです。事実、ソース上ではscala.concurrentパッケージオブジェクトに定義されているfuture関数は、Future#applyを呼び出しているだけです。

以上、Futureコンパニオンオブジェクトでした。