今度は、Timeクライアント/サーバをChannelBufferからPOJOを使うように変更してきます。
User Guide的には、こちらの内容です。
Speaking in POJO instead of ChannelBuffer
http://netty.io/docs/stable/guide/html/#start.pojo
ではでは、いってみましょう。
これまで説明してきた例は、プロトコル・メッセージの主なデータ構造として全てChannelBufferを使用してきました。この節では、ChannelBufferの代わりにPOJOを使用するように、TIMEクライアント/サーバを改善してみましょう。
ChannelHandlerの中で、POJOを使うことの利点は明らかです。ChannelHandlerは、ChannelBufferから情報を抽出するコードを分離することにより、メンテナンスしやすくなり、また再利用性しやすくなります。TIMEクライアント/サーバの例では、私達は32ビット整数を読み出すだけですが、それはChannelBufferを直接使用する主な関心事ではありません。しかしながら、あなたは現実世界のプロトコルを実装するには、分離が必要なことを知るでしょう。
最初に、UnixTimeと呼ばれる新しい型を定義しましょう。
UnixTime.scala
import java.util.Date class UnixTime(val value: Int) { override def toString(): String = new Date(value * 1000L).toString }
TimeDecoderを、ChannelBufferに代わってUnixTimeを返すように修正しましょう。
TimeDecoder.scala
import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.channel.{Channel, ChannelHandlerContext} import org.jboss.netty.handler.codec.frame.FrameDecoder class TimeDecoder extends FrameDecoder { override protected def decode(ctx: ChannelHandlerContext, channel: Channel, buffer: ChannelBuffer): AnyRef = { buffer readableBytes match { case readed if readed < 4 => null case _ => new UnixTime(buffer.readInt()) // (26) } } }
(26)
FrameDecoderとReplayingDecoderは、あらゆる型のオブジェクトを返すことができます。もし、これらがChannelBufferのみを返すように制限されていれば、ChannelBufferをUnixTimeに変換する別のChannelHandlerを挿入しなくてはならないでしょう。
最新のDecoderでは、TimeClientHandlerはもはやChannelBufferを使用しません。
TimeClientHandler.scala
import java.util.Date import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.channel.{ChannelHandlerContext, ExceptionEvent, MessageEvent, SimpleChannelHandler} class TimeClientHandler extends SimpleChannelHandler { override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { val m: UnixTime = e.getMessage.asInstanceOf[UnixTime] println("Current Time = %s".format(m)) e.getChannel.close() } override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = { e.getCause.printStackTrace() e.getChannel.close() } }
とてもシンプルで、エレガントではないでしょうか?同じテクニックを、サーバサイドにも適用することができます。今回は、TimeServerHandlerを最初に修正しましょう。
TimeServerHandler.scala
import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers} import org.jboss.netty.channel.Channel import org.jboss.netty.channel.ChannelFuture import org.jboss.netty.channel.ChannelFutureListener import org.jboss.netty.channel.ChannelHandlerContext import org.jboss.netty.channel.ChannelStateEvent import org.jboss.netty.channel.ExceptionEvent import org.jboss.netty.channel.SimpleChannelHandler class TimeServerHandler extends SimpleChannelHandler { override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { val time: UnixTime = new UnixTime((System.currentTimeMillis() / 1000).toInt) val f: ChannelFuture = e.getChannel.write(time) f.addListener(ChannelFutureListener.CLOSE) } override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = { e.getCause.printStackTrace() e.getChannel.close() } }
さて、唯一見当たらないピースはEncoderで、それはUnixTimeをChannelBufferに翻訳するChannelHandlerの実装です。メッセージをエンコードする時には、パケットのフラグメントを集める必要がないので、Decoderを書くよりとても単純です。
TimeEncoder.scala
import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.buffer.ChannelBuffers._ import org.jboss.netty.channel.{Channels, ChannelHandlerContext, MessageEvent, SimpleChannelHandler} class TimeEncoder extends SimpleChannelHandler { override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { // (27) val time: UnixTime = e.getMessage.asInstanceOf[UnixTime] val buf: ChannelBuffer = buffer(4) buf.writeInt(time.value) Channels.write(ctx, e.getFuture, buf) // (28) } }
(27)
Encoderは、writeRequestedメソッドをオーバーライドすることで、書き込みのリクエストを横取りします。パラメータであるMessageEventはmessageReceivedで指定されたものと同じ型ですが、これらが異なる解釈をされることに注意してください。ChannelEventは、upstreamイベントまたはdownstreamイベントといった、イベントフローの方向に依存します。例えば、MessageEventは、messageReceivedメソッドが呼び出された時はupstreamイベントであり、またwriteRequestedメソッドが呼び出された時はdownstreamイベントとなります。upstreamイベントとdownstreamイベントの違いについてこれ以上学びたければ、APIリファレンスを参照してください。
(28)
1度ChannelBufferをPOJOに変換すれば、ChannelPipelineの中の前のChannelDownstreamHandlerへ新しいバッファを転送すべきです。Channelsクラスは、ChannelEventの生成や送信に関する、様々なヘルパーメソッドを提供しています。この例では、Channels.write(...)メソッドは、新しいMessageEventを作成し、ChannelPipeline中の前のChannelDownstreamHandlerに送信します。
一方で、ChannelsをStatic Importするのはよい考えです。
import org.jboss.netty.channel.Channels._ val pipeline = pipeline() write(ctx, e.getFuture(), buf) fireChannelDisconnected(ctx)
残った最後のタスクは、サーバ側のChannelPipelineにTimeEncoderを挿入することです。そしてそれは、ちょっとした課題として残しています。
訳はここまで。残った課題は、TimeServerのパイプラインを作ってる部分を、こういう風に変えたらいいのかな?
bootstrap.setPipelineFactory(new ChannelPipelineFactory { def getPipeline: ChannelPipeline = Channels.pipeline(new TimeEncoder, new TimeServerHandler) })