CLOVER🍀

That was when it all began.

NettyのChannelBufferをPOJOに置き換える

今度は、Timeクライアント/サーバをChannelBufferからPOJOを使うように変更してきます。

User Guide的には、こちらの内容です。

Speaking in POJO instead of ChannelBuffer
http://netty.io/docs/stable/guide/html/#start.pojo

ではでは、いってみましょう。


これまで説明してきた例は、プロトコル・メッセージの主なデータ構造として全てChannelBufferを使用してきました。この節では、ChannelBufferの代わりにPOJOを使用するように、TIMEクライアント/サーバを改善してみましょう。

ChannelHandlerの中で、POJOを使うことの利点は明らかです。ChannelHandlerは、ChannelBufferから情報を抽出するコードを分離することにより、メンテナンスしやすくなり、また再利用性しやすくなります。TIMEクライアント/サーバの例では、私達は32ビット整数を読み出すだけですが、それはChannelBufferを直接使用する主な関心事ではありません。しかしながら、あなたは現実世界のプロトコルを実装するには、分離が必要なことを知るでしょう。

最初に、UnixTimeと呼ばれる新しい型を定義しましょう。
UnixTime.scala

import java.util.Date

class UnixTime(val value: Int) {
  override def toString(): String = new Date(value * 1000L).toString
}

TimeDecoderを、ChannelBufferに代わってUnixTimeを返すように修正しましょう。
TimeDecoder.scala

import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.handler.codec.frame.FrameDecoder

class TimeDecoder extends FrameDecoder {
  override protected def decode(ctx: ChannelHandlerContext,
                                channel: Channel,
                                buffer: ChannelBuffer): AnyRef = {
    buffer readableBytes match {
      case readed if readed < 4 => null
      case _ => new UnixTime(buffer.readInt()) // (26)
    }
  }
}

(26)
FrameDecoderとReplayingDecoderは、あらゆる型のオブジェクトを返すことができます。もし、これらがChannelBufferのみを返すように制限されていれば、ChannelBufferをUnixTimeに変換する別のChannelHandlerを挿入しなくてはならないでしょう。

最新のDecoderでは、TimeClientHandlerはもはやChannelBufferを使用しません。
TimeClientHandler.scala

import java.util.Date

import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.{ChannelHandlerContext, ExceptionEvent, MessageEvent, SimpleChannelHandler}

class TimeClientHandler extends SimpleChannelHandler {
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
    val m: UnixTime = e.getMessage.asInstanceOf[UnixTime]

    println("Current Time = %s".format(m))

    e.getChannel.close()
  }

  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    e.getCause.printStackTrace()
    e.getChannel.close()
  }
}

とてもシンプルで、エレガントではないでしょうか?同じテクニックを、サーバサイドにも適用することができます。今回は、TimeServerHandlerを最初に修正しましょう。
TimeServerHandler.scala

import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import org.jboss.netty.channel.Channel
import org.jboss.netty.channel.ChannelFuture
import org.jboss.netty.channel.ChannelFutureListener
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelStateEvent
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.SimpleChannelHandler

class TimeServerHandler extends SimpleChannelHandler {
  override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    val time: UnixTime = new UnixTime((System.currentTimeMillis() / 1000).toInt)
    val f: ChannelFuture = e.getChannel.write(time)
    f.addListener(ChannelFutureListener.CLOSE)
  }

  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    e.getCause.printStackTrace()
    e.getChannel.close()
  }
}

さて、唯一見当たらないピースはEncoderで、それはUnixTimeをChannelBufferに翻訳するChannelHandlerの実装です。メッセージをエンコードする時には、パケットのフラグメントを集める必要がないので、Decoderを書くよりとても単純です。
TimeEncoder.scala

import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.buffer.ChannelBuffers._
import org.jboss.netty.channel.{Channels, ChannelHandlerContext, MessageEvent, SimpleChannelHandler}

class TimeEncoder extends SimpleChannelHandler {
  override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { // (27)
    val time: UnixTime = e.getMessage.asInstanceOf[UnixTime]

    val buf: ChannelBuffer = buffer(4)
    buf.writeInt(time.value)

    Channels.write(ctx, e.getFuture, buf) // (28)
  }
}

(27)
Encoderは、writeRequestedメソッドをオーバーライドすることで、書き込みのリクエストを横取りします。パラメータであるMessageEventはmessageReceivedで指定されたものと同じ型ですが、これらが異なる解釈をされることに注意してください。ChannelEventは、upstreamイベントまたはdownstreamイベントといった、イベントフローの方向に依存します。例えば、MessageEventは、messageReceivedメソッドが呼び出された時はupstreamイベントであり、またwriteRequestedメソッドが呼び出された時はdownstreamイベントとなります。upstreamイベントとdownstreamイベントの違いについてこれ以上学びたければ、APIリファレンスを参照してください。
(28)
1度ChannelBufferをPOJOに変換すれば、ChannelPipelineの中の前のChannelDownstreamHandlerへ新しいバッファを転送すべきです。Channelsクラスは、ChannelEventの生成や送信に関する、様々なヘルパーメソッドを提供しています。この例では、Channels.write(...)メソッドは、新しいMessageEventを作成し、ChannelPipeline中の前のChannelDownstreamHandlerに送信します。

一方で、ChannelsをStatic Importするのはよい考えです。

import org.jboss.netty.channel.Channels._

val pipeline = pipeline()
write(ctx, e.getFuture(), buf)
fireChannelDisconnected(ctx)

残った最後のタスクは、サーバ側のChannelPipelineにTimeEncoderを挿入することです。そしてそれは、ちょっとした課題として残しています。


訳はここまで。残った課題は、TimeServerのパイプラインを作ってる部分を、こういう風に変えたらいいのかな?

    bootstrap.setPipelineFactory(new ChannelPipelineFactory {
      def getPipeline: ChannelPipeline = Channels.pipeline(new TimeEncoder, new TimeServerHandler)
    })