今回は、Nettyを使用してTimeプロトコルのクライアント/サーバを書いてみます。といっても、なんか大層なことをするわけではなくUser Guidの以下のセクションの写しです。なお、Timeプロトコルっていうのは、NTP以前の時刻同期プロトコルなんだそうな。
[Writing a Time Server]
http://netty.io/docs/stable/guide/html/#start.8
[Writing a Time Client]
http://netty.io/docs/stable/guide/html/#start.9
ついに、クライアントプログラミングも登場しますね。さ〜、頑張ってみましょう。この例では、メッセージの作成と送信方法、処理完了後のコネクションのクローズ方法を学ぶことが目的らしいです。
では、まずはTime Serverから。今回のプログラムは、Nettyのクラスを見せるためにあんまり型推論を使わないようにしています。
TimeServerHandler.scala
import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers} import org.jboss.netty.channel.Channel import org.jboss.netty.channel.ChannelFuture import org.jboss.netty.channel.ChannelFutureListener import org.jboss.netty.channel.ChannelHandlerContext import org.jboss.netty.channel.ChannelStateEvent import org.jboss.netty.channel.ExceptionEvent import org.jboss.netty.channel.SimpleChannelHandler class TimeServerHandler extends SimpleChannelHandler { override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { // (11) val ch: Channel = e.getChannel val time: ChannelBuffer = ChannelBuffers.buffer(4) // (12) time.writeInt((System.currentTimeMillis / 1000).asInstanceOf[Int]) val f: ChannelFuture = ch.write(time) // (13) f.addListener(new ChannelFutureListener { // (14) def operationComplete(future: ChannelFuture): Unit = { val ch: Channel = future.getChannel ch.close() } }) } override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = { e.getCause.printStackTrace() e.getChannel.close() } }
それでは、訳していってみましょう。
(11)
channelConnectedメソッドは、接続が確立した時に呼び出されます。ここでは、現在時刻(秒)を表す32ビット整数(Integer)を書き込みます。
(12)
新しくメッセージを送るために、メッセージを含むバッファを確保する必要があります。32ビット整数を送るつもりなので、容量4バイトのChannelBufferが必要です。ChannelBuffersというヘルパークラスは、新しいバッファを確保するために使用されています。bufferメソッドに加えて、ChannelBuffersクラスはChannelBufferに関連する多くの便利なメソッドを提供しています。詳細は、APIリファレンスを参照してください。
ChannelBuffersクラスをStatic Importするのは、よい考えです。
import org.jboss.netty.buffer.ChannelBuffers._ val dynamicBuf = dynamicBuffer(256) val ordinaryBuf = buffer(1024)
(13)
あとはいつも通り、作成したメッセージを書き出します。
ところで、flipメソッドはどうしたのでしょうか?NIOでは、メッセージを送る前にByteBuffer.flipメソッドを呼び出す必要があったのではないでしょうか?ChannelBufferには、そのようなメソッドを持たない2つのポイントがあります。
1つは、読み込み操作と書き込み操作は関連しているということです。読み込み用インデックス変更が変更されない間に、ChannelBufferに対して何か書き込んだ時、書き込み用インデックスのインクリメントを行います。読み込み用インデックスと書き込み用インデックスは、それぞれメッセージがどこで始まってどこで終了しているかを表現しています。
対照的に、NIOのバッファはflipメソッドの呼び出しなしでメッセージの開始と終了をきれいに得る方法を提供しません。よって、flipメソッドを呼び出し忘れた時、何も送られない、または正しくないデータが送られてしまうため、あなたは厄介な目に遭うでしょう。
もう1つの注目点は、writeメソッドがChannelFutureインターフェースを返すことです。ChannelFutureインターフェースは、まだ行っていないI/O操作を表します。これは、全てのオペレーションがNettyにより非同期に実行されるため、要求したオペレーションがまだ行われていないかもしれない、ということを意味しています。例えば、次のコードはメッセージを送る前に、コネクションを閉じてしまうかもしれません。
val ch = ...
ch.write(message)
ch.close()
このため、writeメソッドにより返されたChannelFutureが、書き込み操作が終わったことを通知してきた後に、closeメソッドを呼び出す必要があります。ただし、closeメソッドはすぐにコネクションを閉じるのではなく、ChannelFutureを返すことに注意してください。
(14)
書き込み要求が終了したことを、どのようにすれば通知してもらえるのでしょうか?簡単な方法は、戻り値のChannelFutureにChannelFutureListenerを加えることです。ここでは、オペレーションが終了した時にChannelをクローズするChannelFutureListenerの無名クラスを作成しています。
もしくは、あらかじめ定義されたリスナーを使用して、コードを単純にすることもできます。
f.addListener(ChannelFutureListener.CLOSE)
とりあえず、サーバはここまでです。
User GuidにはChannelHandlerしか載っていなかったので、main自体はDiscard Serverを真似て作成しました。
TimeServer.scala
import java.net.InetSocketAddress import java.util.concurrent.Executors import org.jboss.netty.bootstrap.ServerBootstrap import org.jboss.netty.channel.{ChannelFactory, ChannelPipeline, ChannelPipelineFactory, Channels} import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory object TimeServer { def main(args: Array[String]): Unit = { val factory: ChannelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool) val bootstrap: ServerBootstrap = new ServerBootstrap(factory) bootstrap.setPipelineFactory(new ChannelPipelineFactory { def getPipeline: ChannelPipeline = Channels.pipeline(new TimeServerHandler) }) bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.keepAlive", true) bootstrap.bind(new InetSocketAddress(8080)) } }
続いて、クライアント側。DiscardやEchoとは違って、サーバが送信してくるのは32ビットのバイナリデータなので、人が読めるようにするにはクライアントプログラムが必要です。なので、Nettyを使ったクライアントプログラミングを勉強してみましょう、という例です。
TimeClient.scala
import java.net.InetSocketAddress import java.util.concurrent.Executors import org.jboss.netty.bootstrap.ClientBootstrap import org.jboss.netty.channel.{ChannelFactory, ChannelPipeline, ChannelPipelineFactory, Channels} import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory object TimeClient { def main(args: Array[String]): Unit = args toList match { case host :: portAsString :: Nil => val port = tryOption(portAsString.toInt).getOrElse(usageWithExit()) val factory: ChannelFactory = new NioClientSocketChannelFactory( // (15) Executors.newCachedThreadPool, Executors.newCachedThreadPool) val bootstrap: ClientBootstrap = new ClientBootstrap(factory) // (16) bootstrap.setPipelineFactory(new ChannelPipelineFactory { def getPipeline: ChannelPipeline = Channels.pipeline(new TimeClientHandler) }) bootstrap.setOption("tcpNoDelay", true) // (17) bootstrap.setOption("keepAlive", true) bootstrap.connect(new InetSocketAddress(host, port)) // (18) case _ => usageWithExit() } def usageWithExit(): Nothing = { val message = """Required 2 Arguments | host: target host | port: target port as Integer | | example) TimeClient localhost 8080""".stripMargin println(message) sys.exit(1) } def tryOption[T](fun: => T): Option[T] = try { Some(fun) } catch { case _ => None } }
main以外のメソッドは、無視の方向で…。それでは、訳へ。
(15)
クライアントサイドのChannelを作成するので、NioServerSocketChannelFactoryクラスをNioClientSocketChannelFactoryクラスに置き換えています。
(16)
ClientBootstrapクラスは、クライアントサイドでのServerBootstrapクラスの対となるものです。
(17)
bindメソッドの代わりに、connectメソッドを呼び出します。
続いて、受け取った32ビット整数を人が読めるフォーマットで出力して、それからコネクションを閉じるためのChannelHandlerの実装です。
TimeClientHandler.scala
import java.util.Date import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.channel.{ChannelHandlerContext, ExceptionEvent, MessageEvent, SimpleChannelHandler} class TimeClientHandler extends SimpleChannelHandler { override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { val buffer: ChannelBuffer = e.getMessage.asInstanceOf[ChannelBuffer] val currentTimeMillis: Long = buffer.readInt() * 1000L println("Current Time = %s".format(new Date(currentTimeMillis))) e.getChannel.close() } override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = { e.getCause.printStackTrace() e.getChannel.close() } }
サーバサイドの例と、大した違いはありません。が、このChannelHandlerは時々IndexOutOfBoundsExceptionを送出することがあるそうな?これについては、次のセクションで何が起こっているのかを議論する、と書いてあります。
ではでは、動かしてみましょう。
まずはサーバを起動。
$ sbt "run-main TimeServer" [info] Set current project to netty-time (in build file:/xxxxx/netty-time/) [info] Running TimeServer
続いて、クライアントを起動。
$ sbt "run-main TimeClient localhost 8080" [info] Set current project to netty-time (in build file:/xxxxx/netty-time/) [info] Running TimeClient localhost 8080 Current Time = Sun Feb 05 18:43:24 JST 2012
なお、このクライアントは結果を出力してもすぐには終了しません。アプリケーションのシャットダウン方法は、また後ろのセクションで出てくるみたいです。
う〜ん、ちょっと長かったな〜。