CLOVER🍀

That was when it all began.

Nettyアプリケーションをシャットダウンする

Netty User Guidの最後のチュートリアルです。アプリケーションのシャットダウン方法を学びます。

9. Shutting Down Your Application
http://netty.io/docs/stable/guide/html/#start.12

ではでは、訳していってみましょう。


TimeClientを実行したのであれば、アプリケーションがしばらく終了せずに実行中になっていることに気付いたでしょう。すべてのスタックトレースを見ると、2つ3つのI/Oスレッドが動作し続けているのを見つけるでしょう。I/Oスレッドとアプリケーションを優雅に終了させるには、ChannelFactoryによって確保されたリソースを解放する必要があります。

典型的なネットワークアプリケーションのシャットダウンプロセスは、次の3ステップから成ります。

  1. 全てのサーバ側のソケットをクローズします
  2. サーバ側でないソケットも、全てクローズします(例えば、クライアントサイドソケット、受け入れたソケット)
  3. ChannelFactoryが使用しているリソースを解放します

この3ステップをTimeClientとTimeClient.main()に適用することで、ただ1つのクライアント側のコネクションを閉じ、ChannelFactoryが使用しているリソースを解放することで優雅にシャットダウンすることができました。
TimeClient.scala

import java.net.InetSocketAddress
import java.util.concurrent.Executors

import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel.{ChannelFactory, ChannelFuture, ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory

object TimeClient {
  def main(args: Array[String]): Unit = args toList match {
    case host :: portAsString :: Nil =>
      val port = tryOption(portAsString.toInt).getOrElse(usageWithExit())

      val factory: ChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool,
                                                                      Executors.newCachedThreadPool)
      val bootstrap: ClientBootstrap = new ClientBootstrap(factory)

      bootstrap.setPipelineFactory(new ChannelPipelineFactory {
        def getPipeline: ChannelPipeline = Channels.pipeline(new TimeDecoder, new TimeClientHandler)
      })

      bootstrap.setOption("tcpNoDelay", true)
      bootstrap.setOption("keepAlive", true)

      val future: ChannelFuture = bootstrap.connect(new InetSocketAddress(host, port)) // (29)
      future.awaitUninterruptibly() // (30)
      if (!future.isSuccess) {
        future.getCause.printStackTrace() // (31)
      }
      future.getChannel.getCloseFuture.awaitUninterruptibly() // (32)
      factory.releaseExternalResources() // (33)
    case _ => usageWithExit() 
  }

  def usageWithExit(): Nothing = {
    val message = """Required 2 Arguments
                    |  host: target host
                    |  port: target port as Integer
                    |
                    | example) TimeClient localhost 8080""".stripMargin
    println(message)
    sys.exit(1)
  }

  def tryOption[T](fun: => T): Option[T] = try {
    Some(fun)
  } catch {
    case _ => None
  }
}

(29)
ClientBootstrapのconnectメソッドは、接続が成功するか失敗した時に通知を行う、ChannelFutureを返します。そしてChannelFutureは、接続しようとしたChannelへの参照を保持しています。
(30)
返されたChannelFutureが接続に成功したのか失敗したのか、判断するのを待ちましょう。
(31)
もし失敗した場合は、なぜ失敗したのかを知るために、失敗した原因を表示します。接続が成功したのでもキャンセルされたのでもなければ、ChannelFutureのgetCause()メソッドは失敗した原因を返すでしょう。
(32)
接続の試みが終わったので、コネクションがChannelのcloseFutureを待つことにより閉じられるまで、待機しておく必要があります。あなたに通知を行い、閉じられた領域であるアクションを実行できるように、全てのChannelには自分のcloseFutureがあります。
接続が失敗したとしても、接続が失敗した時にChannelは自動的にクローズされるので、closeFutureは通知されるでしょう。
(33)
この場所で全てのコネクションがクローズされます。ここで残っているタスクは、ChannelFutureが使用しているリソースを解放するだけです。それは、releaseExternalResources()メソッドを呼び出す単純なことです。NIO Selectorとスレッドプールを含む全てのリソースは、シャットダウンし自動的に終了するでしょう。

クライアントのシャットダウンは美しくて簡単でしたが、サーバのシャットダウンはどのようにしたらよいのでしょう?ポートからアンバインドし、全ての受け入れているコネクションをクローズする必要があります。これを行うためには、アクティブなコネクションのリストを追跡し続けるデータ構造が必要です。そしてそれは、ささいなタスクではありません。幸運なことに、ChannelGroupという解決策があります。

ChannelGroupは、オープンしているChannelの集合(Set)を表現している、JavaのコレクションAPIの特別な拡張です。ChannelがChannelGroupに追加され、そして追加されたChannelがクローズされた場合、クローズしたChannelは自動的にChannelGroupから削除されます。さらに同じグループ内の全てのChannelに対して、操作を行うことができます。例えば、サーバをシャットダウンする時に、ChannelGroup中の全てのChannelをクローズすることができます。

開いているソケットを追跡し続けるために、グローバルなChannelGroup、TimeServer.allChannelsに新しく開いたChannelを追加するように修正する必要があります。
TimeServerHandler.scala

import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import org.jboss.netty.channel.Channel
import org.jboss.netty.channel.ChannelFuture
import org.jboss.netty.channel.ChannelFutureListener
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelStateEvent
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.SimpleChannelHandler

class TimeServerHandler extends SimpleChannelHandler {
  override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    TimeServer.allChannels.add(e.getChannel) // (34)

    val time: UnixTime = new UnixTime((System.currentTimeMillis() / 1000).toInt)
    val f: ChannelFuture = e.getChannel.write(time)
    f.addListener(ChannelFutureListener.CLOSE)
  }

  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    e.getCause.printStackTrace()
    e.getChannel.close()
  }
}

(34)
しかも、ChannelGroupはスレッドセーフです。

今や、全てのアクティブなChannelのリストは自動的に管理されるので、サーバのシャットダウンはクライアントのシャットダウンのように簡単になりました。
TimeServer.scala

import java.net.InetSocketAddress
import java.util.concurrent.Executors

import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel.{Channel, ChannelFactory, ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.channel.group.{ChannelGroup, ChannelGroupFuture, DefaultChannelGroup}
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory

object TimeServer {
  val allChannels: ChannelGroup = new DefaultChannelGroup("time-server") // (35)

  def main(args: Array[String]): Unit = {
    val factory: ChannelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
    val bootstrap: ServerBootstrap = new ServerBootstrap(factory)

    bootstrap.setPipelineFactory(new ChannelPipelineFactory {
      def getPipeline: ChannelPipeline = Channels.pipeline(new TimeEncoder, new TimeServerHandler)
    })

    bootstrap.setOption("child.tcpNoDelay", true)
    bootstrap.setOption("child.keepAlive", true)

    val channel: Channel = bootstrap.bind(new InetSocketAddress(8080)) // (36)
    allChannels.add(channel) // (37)
    waitForShutdownCommand() // (38)
    val future: ChannelGroupFuture = allChannels.close() // (39)
    future.awaitUninterruptibly()
    factory.releaseExternalResources()
  }
}

(35)
DefaultChannelGroupは、コンストラクタのパラメータとしてグループの名前を要求します。このグループ名は、単に他のグループと区別するために使用されます。
(36)
ServerBootstrapのbindメソッドは、指定したローカル・アドレスにバインドされた、サーバサイドのChannelを返します。返されたChannelのclose()メソッドを呼び出すと、Channelはバインドされたローカル・アドレスからアンバインドされるでしょう。
(37)
サーバサイドやクライアントサイド、受け入れたものに関わらず、任意の型のChannelをかまわずChannelGroupに追加することができます。そのため、サーバをシャットダウンする時に受け入れたChannelと一緒に、1回でバインドしたChannelもクローズすることができます。
(38)
waitForShutdownCommand()は、シャットダウンシグナルを待つ架空のメソッドです。特権のあるクライアントや、JVMシャットダウンフックからのメッセージを待つことができます。
(39)
同じChannelGroup中の全てのChannelに対して、同じ操作を行うことができます。このケースでは、バインドされたサーバサイドのChannelがアンバインドされ、全ての受け入れたChannelは非同期にクローズされることを意味します。全てのコネクションがクローズに成功したことを通知するため、ChannelFutureによく似た役割を持つChannelGroupFutureを返します。


訳は以上です!疲れました〜。

ところで、上の例ではwaitForShutdownCommandメソッドは特に載っていないので、何か書かないとサンプルを動かすことができません。まあ、ずっと浮いたままのサーバでもいいのですが、シャットダウンするのを見るため、3回リクエストを受けたらシャットダウンするサーバにしてみました。

まずは、TimeServerはこんな感じで実装。
TimeServer.scala

import java.net.InetSocketAddress
import java.util.concurrent.Executors

import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel.{Channel, ChannelFactory, ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.channel.group.{ChannelGroup, ChannelGroupFuture, DefaultChannelGroup}
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory

object TimeServer {
  val allChannels: ChannelGroup = new DefaultChannelGroup("time-server") // (35)

  def main(args: Array[String]): Unit = {
    val factory: ChannelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
    val bootstrap: ServerBootstrap = new ServerBootstrap(factory)

    bootstrap.setPipelineFactory(new ChannelPipelineFactory {
      def getPipeline: ChannelPipeline = Channels.pipeline(new TimeEncoder, new TimeServerHandler)
    })

    bootstrap.setOption("child.tcpNoDelay", true)
    bootstrap.setOption("child.keepAlive", true)

    val channel: Channel = bootstrap.bind(new InetSocketAddress(8080)) // (36)
    allChannels.add(channel) // (37)
    waitForShutdownCommand() // (38)
    val future: ChannelGroupFuture = allChannels.close() // (39)
    future.awaitUninterruptibly()
    factory.releaseExternalResources()
  }

  import java.util.concurrent.CountDownLatch
  val countDownLatch: CountDownLatch = new CountDownLatch(3)
  def waitForShutdownCommand(): Unit = {
    countDownLatch.await()
  }
}

CountDownLatchを、カウント3で生成しています。waitForShutdownCommandメソッドは、CountDownLatch#awaitを呼び出しているだけです。

続いて、TimeServerHandler側。

import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import org.jboss.netty.channel.Channel
import org.jboss.netty.channel.ChannelFuture
import org.jboss.netty.channel.ChannelFutureListener
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelStateEvent
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.SimpleChannelHandler

class TimeServerHandler extends SimpleChannelHandler {
  override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    TimeServer.allChannels.add(e.getChannel) // (34)
    TimeServer.countDownLatch.countDown()

    val time: UnixTime = new UnixTime((System.currentTimeMillis() / 1000).toInt)
    val f: ChannelFuture = e.getChannel.write(time)
    f.addListener(ChannelFutureListener.CLOSE)
  }

  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    e.getCause.printStackTrace()
    e.getChannel.close()
  }
}

こちらは、TimeServer.countDownLatch#countDownをchannelConnected内で呼び出しています。これで、3回このサーバを呼び出すとシャットダウンするようになります。

意味は、あまりありませんが…。