CLOVER🍀

That was when it all began.

Actorのlinkとrestart

よく忘れるので、自分のためにメモ。ScalaのActorで、linkとrestartの使い方。

ScalaのActorには、Erlangのプロセスと同じようにリンクと再起動の概念があるようです。Actorトレイトのメソッドとして、linkとrestartがあるのですがこの使い方を本当によく忘れるんですよねー。

まずは、サンプルとして以下のようなコードを用意。

import scala.actors.{Actor, Exit}
import scala.actors.Actor._

case class Message(value: String)
case class Eval(fun: () => Unit)
case object Kill

class LinkableActor(val name: String, val parent: Option[Actor]) extends Actor {
  trapExit = true

  def act(): Unit = {
    parent.foreach(p => self.link(p))

    loop {
      react {
        case Message(value) =>
          println("%s: Receive Message[%s]".format(name, value))
        case Kill =>
          println("%s: Received Kill Message".format(name))
          exit("Killed")
        case Eval(fun) =>
          println("%s: Received Eval Message".format(name))
          fun()
        case Exit(from, reason) =>
          println("%s: Receive Exit Reason[%s]".format(name, reason))

          from match {
            case actor: LinkableActor =>
              println("%s: Restart Exited LinkableActor[%s]".format(name, actor.name))
              actor.restart()
              self.link(actor)
            case actor: Actor =>
              println("%s: Restart Exited Actor".format(name))
              actor.restart()
              self.link(actor)
            case _ => println("%s: Exited AbstractActor".format(name))
          }
      }
    }
  }
}

Case ClassのMessageは、今回作成したLinkableActorに文字列入りのメッセージを送るための、Evalは関数を送るための、そしてKillはLinkableActorを停止するためのクラスです。

このファイルを、REPLでロードします。

$ scala
Welcome to Scala version 2.9.0.1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_24).
Type in expressions to have them evaluated.
Type :help for more information.

scala> :load LinkableActor.scala
Loading LinkableActor.scala...
import scala.actors.{Actor, Exit}
import scala.actors.Actor._
defined class Message
defined class Eval
defined module Kill
defined class LinkableActor

では、まず最初のActorを開始。

scala> val first = new LinkableActor("First Actor", None)
first: LinkableActor = LinkableActor@63bc19e

scala> first.start()
res0: scala.actors.Actor = LinkableActor@63bc19e

続いて、2つ目のActorを作成・開始しつつ、最初のActorとリンクします。

scala> val second = new LinkableActor("Second Actor", Some(first))
second: LinkableActor = LinkableActor@111de95a

scala> second.start()
res1: scala.actors.Actor = LinkableActor@111de95a

この状態で、2つのActorは相互にリンクされ、メッセージを待っている状態になります。では、メッセージを送ってみましょう。

scala> first ! Message("Hello First Actor")

scala> First Actor: Receive Message[Hello First Actor]


scala> second ! Message("Hello Second Actor")
Second Actor: Receive Message[Hello Second Actor]

Actorが出力するメッセージが入力待ち状態のコンソールと重なっているのは、Actorが非同期に実行されているからですね。

では、Killを送信してActorを止めてみましょう。

scala> first ! Kill
First Actor: Received Kill Message

scala> Second Actor: Receive Exit Reason[Killed]
Second Actor: Restart Exited LinkableActor[First Actor]

最初のActorにKillが送信されるとexitするのですが、それを2つ目のActorが検知して再起動をしています。

この後、最初のActorは何事もなかったようにメッセージを受け取れます。

scala> first ! Message("Hello")
First Actor: Receive Message[Hello]

Evalで実行時例外で死んでもらっても、やっぱり再起動しようとします。

scala> first ! Eval(() => 1 / 0)
First Actor: Received Eval Message

scala> Second Actor: Receive Exit Reason[UncaughtException($line6.$read$$iw$$iw$$iw$$iw$LinkableActor@63bc19e,Some(Eval(<function0>)),Some(scala.actors.ActorProxy@5e1f5fc7),java.lang.ArithmeticException: / by zero)]
Second Actor: Restart Exited LinkableActor[First Actor]

scala> first ! Eval(() => println("Eval!"))
First Actor: Received Eval Message
Eval!

なお、リンクは双方向なので、2つ目のActorを終了すると最初のActorがそれを検知して再起動します。

scala> second ! Kill
Second Actor: Received Kill Message
First Actor: Receive Exit Reason[Killed]
First Actor: Restart Exited LinkableActor[Second Actor]

scala> second ! Message("Hello")
Second Actor: Receive Message[Hello]

とまあ、結果だけ見るとすごい単純っぽいのですが、実はけっこうハマりました…。

まず、trapExitにtrueを設定しないとExitメッセージは受け取れません。これは前提です。しかも、linkだけしてtrapExitをtrueにしなかった場合、片方のActorが死ぬともう片方のActorも共づれて死にます。あと、問題なのは、linkを書く場所です…。

  def act(): Unit = {
    parent.foreach(p => self.link(p))

    loop {

と、actのすぐ次に書いていますが、これ、コンストラクタなどで書いても全く機能しません。また、Actorインスタンスの外で書くのもNGです。linkはstart後でないとダメなようですね。

さらに、ActorがExitするとlinkが切れてしまうようなので、下記のようにExitメッセージを受け取ったら再度linkし直すことが必要です。
※注意点
ExitしたActorをrestartすると、再度actメソッドから開始されるので2つ目のActorに限って言えば再起動したタイミングで再度最初のActorとlinkするでしょう。が、最初のActorは2つ目のActorのことを直接知らないので、Exit時にlinkするしかない…と思っています。もしくは、最初のstart前に相互のActorのフィールドなりに予め設定しておくんでしょうね。気になるのは、Erlangのspawn_linkのようなアトミックな操作って厳密にはできないんじゃあ…?

        case Exit(from, reason) =>
          println("%s: Receive Exit Reason[%s]".format(name, reason))

          from match {
            case actor: LinkableActor =>
              println("%s: Restart Exited LinkableActor[%s]".format(name, actor.name))
              actor.restart()
              self.link(actor)
            case actor: Actor =>
              println("%s: Restart Exited Actor".format(name))
              actor.restart()
              self.link(actor)

ここになかなか気付かず、前もよくハマっていましたが、今回も久しぶりにやったらキレイにハマりました…。

あと、再起動前にメッセージを受け取っても、一応メールボックスには溜まるっぽい?

ちょっと修正。

          from match {
            case actor: LinkableActor =>
              reactWithin(5000L) {
                case scala.actors.TIMEOUT =>
                  println("%s: Restart Exited LinkableActor[%s]".format(name, actor.name))
                  actor.restart()
                  self.link(actor)
              }

実験。

scala> first ! Kill
First Actor: Received Kill Message

scala> Second Actor: Receive Exit Reason[Killed]


scala> first ! Message("Hello1")

scala> first ! Message("Hello2")

scala> Second Actor: Restart Exited LinkableActor[First Actor]
First Actor: Receive Message[Hello1]
First Actor: Receive Message[Hello2]