CLOVER🍀

That was when it all began.

Futureを使う時にJavaのExecutorServiceを利用する

ScalaのFutureといえば、

    import scala.concurrent._
    import scala.concurrent.ExecutionContext.Implicits.global

をimportしてfuture関数を使うんだよみたいな話だと思いますが、Javajava.util.concurrent.ExecutorServiceやjava.util.concurrent.Executorも使用することができます。

まあ、そんなに面白い話しでもないですし、知ってはいましたが試してみたくなったので。

とりあえず、普通にScalaのFutureを使う簡単な例は、こんな感じ。

    import scala.concurrent._
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration
    import java.util.concurrent.TimeUnit

    val f = future {
      TimeUnit.SECONDS.sleep(3)
      "Hello Future!!"
    }
    
    val result = Await.result(f, Duration.Inf)
    println(result)
    require(result == "Hello Future!!")

スリープ入れているには、あんまり意味がないですが。ExecutionContext.Implicits.globalをimportしているところが、ポイントですね。

では、続いてJavaのExecutorServiceからFutureを利用するようにしてみましょう。Executor版は、ExecutorServiceを使って入れば実質直接使用することはないかな…?
ExecutorService自体、Executorのサブインターフェースですし。

    import scala.concurrent._
    import scala.concurrent.duration.Duration
    import java.util.concurrent.{Executors, TimeUnit}

    val es = Executors.newCachedThreadPool
    implicit val context = ExecutionContext.fromExecutorService(es)

    val f = future {
      TimeUnit.SECONDS.sleep(3)
      "Hello Future!!"
    }

    val result = Await.result(f, Duration.Inf)
    println(result)
    require(result == "Hello Future!!")

    es.shutdown()
    es.awaitTermination(5, TimeUnit.SECONDS)

ExecutionContextコンパニオンオブジェクトのfromExecutorServiceメソッドから、ExecutionContextを作成することができます。この時、implicitを付与しておくことがポイントです。

これで、future関数のImplicit ParameterでこのExecutionContextを使ってくれます。というか、むしろ付けないとエラーになります。

ExecutorServiceが使えるということは、ForkJoinPoolも使用できます。

    import scala.concurrent._
    import scala.concurrent.duration.Duration
    import java.util.concurrent.{ForkJoinPool, TimeUnit}

    val pool = new ForkJoinPool
    implicit val context = ExecutionContext.fromExecutorService(pool)

    val f = future {
      TimeUnit.SECONDS.sleep(3)
      "Hello Future!!"
    }

    val result = Await.result(f, Duration.Inf)
    println(result)
    require(result == "Hello Future!!")

    // ForkJoinPoolで作成されるスレッドは、
    // Daemonスレッドなので待たなくても終了できる

使い方に応じて、ExecutionContext.Implicits.global、JavaのExecutor/ExecutorServiceを使い分けたいですねー。

ちなみに、ExecutionContext.Implicits.globalは実質JSR-166のForkJoinPoolと同じです。