前回の続きです。今度は、Futureコンパニオンオブジェクトに定義されているメソッドを試していってみます。
今回も、ソースには以下のimport文があるものとし
import scala.concurrent._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.util.{Failure, Success}
実行時間の計測と、待ち時間を指定して値を戻すメソッドとして、以下を使うものとします。
// stop-watch def sw[A](body: => A): Unit = { val start = System.currentTimeMillis try { println(s"got [$body], elapsed time [${(System.currentTimeMillis - start) / 1000.0}]msec") } catch { case th: Throwable => println(s"got Exception[$th], elapsed time [${(System.currentTimeMillis - start) / 1000.0}]msec") } } // wait-and-get def wag(waitSec: Int, n: Int): Int = { Thread.sleep(waitSec * 1000L) n }
では、いってみましょう。
Future.successful、Future.failed
それぞれ引数に指定した値を元に、成功するFutureと失敗するFutureを作成します。
sw { val f = Future.successful("Hello World") f.value } // => got [Some(Success(Hello World))], elapsed time [0.003]msec sw { val f = Future.failed(new Exception) f.value } // => got [Some(Failure(java.lang.Exception))], elapsed time [0.001]msec
渡すのは確定した値なので、特に待たずとも結果が取れます。
Future.sequence、Future.traverse
sequenceはFutureのシーケンスからFutureを作成するメソッド、traverseはシーケンスとシーケンスの中身をFutureに変換する関数を与えてFutureを作成するメソッドです。
sw { val xs = List( future { wag(3, 1) }, future { wag(3, 2) }, future { wag(3, 3) }, future { wag(3, 4) }, future { wag(3, 5) }, future { wag(3, 6) } ) val futures = Future.sequence(xs) Await.result(futures, Duration.Inf) } // => got [List(1, 2, 3, 4, 5, 6)], elapsed time [9.022]msec sw { val xs = List(1, 2, 3, 4, 5, 6) val futures = Future.traverse(xs)(n => future { wag(3, n) }) Await.result(futures, Duration.Inf) } // => got [List(1, 2, 3, 4, 5, 6)], elapsed time [9.007]msec
実行時間は半分。これは、うちのマシンが2コアだからだと思います…。
Future.fold、Future.reduce
畳み込みですね。共に、Futureのシーケンスを畳み込みます。foldには初期値が必要なのは、Listなどのfoldと同じです。結果はFutureとして得られます。
sw { val xs = List( future { wag(3, 1) }, future { wag(3, 2) }, future { wag(3, 3) }, future { wag(3, 4) }, future { wag(3, 5) }, future { wag(3, 6) } ) val f = Future.fold(xs)(0) { (acc, n) => acc + n } Await.result(f, Duration.Inf) } // => got [21], elapsed time [9.032]msec sw { val xs = List( future { wag(3, 1) }, future { wag(3, 2) }, future { wag(3, 3) }, future { wag(3, 4) }, future { wag(3, 5) }, future { wag(3, 6) } ) val f = Future.reduce(xs) { (acc, n) => acc + n } Await.result(f, Duration.Inf) } // => got [21], elapsed time [9.004]msec
ちなみに、畳み込みは左から行われます(foldLeft、reduceLeft)。Futureの処理が終了した順に畳み込まれていく、ということもなさそうです。
畳み込みのうち、ひとつでも失敗すると全体が失敗します。
Future.find
引数に渡したFutureのシーケンスの中で、Futureの計算結果が条件を満たす、最初に終了したFutureを返却します。
sw { val xs = List( future { wag(3, 1) }, future { wag(3, 2) }, future { wag(5, 3) }, future { wag(3, 4) } ) val f = Future.find(xs) { n => n > 2 } Await.result(f, Duration.Inf) } // => got [Some(4)], elapsed time [6.027]msec
返却されるFutureは、Future[Option[T]]となります。
だいぶわざとらしいですが、3番目の要素より4番目の要素の方が先に終了するので、結果としてSome(4)が得られます。
*最初の2要素の計算処理がほぼ同時に終わるのと、実行環境のCPU数が2コアだから、という前提となっていますが
Future.firstCompletedOf
引数に渡したFutureのシーケンスのうち、最初に結果を返すFutureを返却します。
sw { val xs = List( future { wag(3, 1) }, future { wag(1, 2) }, future { wag(3, 3) }, future { wag(1, 4) }, future { wag(3, 5) }, future { wag(1, 6) } ) val f = Future.firstCompletedOf(xs) Await.result(f, Duration.Inf) } // => got [2], elapsed time [1.027]msec
最初に終了した要素が、計算結果に失敗した場合は返却されたFutureも失敗します(このコード例だと、Await#resultが例外を投げます)。
Future.apply
ずっと使っている
future { ... }
の実体みたいです。事実、ソース上ではscala.concurrentパッケージオブジェクトに定義されているfuture関数は、Future#applyを呼び出しているだけです。
以上、Futureコンパニオンオブジェクトでした。