久々にNettyです。以前はUser Guideの写経でしたが、今度はもうちょっと踏み込んだ内容に手を出していこうと思います。
まずは、Pipelineとそれに組み込まれているChannelHandlerを理解していくところから。
ChannelHandlerインターフェースは、そのサブクラスとしてChannelDownstreamHandlerインターフェースとChannelUpstreamHandlerインターフェースが存在します。それぞれ、どう使われるのか?
それを理解するには、ChannelPipelineインターフェースのAPIドキュメントにかかれた図が役に立ちます。
http://netty.io/docs/stable/api/org/jboss/netty/channel/ChannelPipeline.html
これと、以前参考にさせていただいたサイトの内容を合わせると、
- 送信はDownstream
- 受信はUpstream
ということになります。つまり、
- クライアントの場合はDownstream→Upstream
- サーバの場合はUpstream→Downstream
の順番で発生したイベントが伝播していきます。ここで言っているイベントと、各Pipeline内に組み込まれたChannelHandlerがどう利用されているのかを確認するために、例題のTelnetサーバをちょっとカスタマイズしてみました。
まずは、メインクラス。サンプルをScalaで写経したものにすぎません。
TelnetServer.scala
import java.net.InetSocketAddress import java.util.concurrent.Executors import org.jboss.netty.bootstrap.ServerBootstrap import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory object TelnetServer { def main(args: Array[String]): Unit = { val port = args size match { case 0 => 8080 case _ => args(0).toInt } new TelnetServer(port).run() } } class TelnetServer(port: Int) { def run(): Unit = { val bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool, Executors.newCachedThreadPool )) bootstrap.setPipelineFactory(new TelnetServerPipelineFactory) bootstrap.bind(new InetSocketAddress(port)) } }
続いて、PipelineFactoryです。基本はサンプルと同じですが、Codecを自作のものに置き換えています。
TelnetServerPipelineFactory.scala
import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels} import org.jboss.netty.handler.codec.frame.Delimiters class TelnetServerPipelineFactory extends ChannelPipelineFactory { @throws(classOf[Exception]) def getPipeline: ChannelPipeline = { val pipeline = Channels.pipeline pipeline.addLast("framer", new MyDelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter)) pipeline.addLast("decoder", new MyStringDecoder) pipeline.addLast("encoder", new MyStringEncoder) pipeline.addLast("handler", new TelnetServerHandler) pipeline } }
自作のCodecは、オリジナルのCodecを継承した上で、ちょっとしたトレイトをMix-inしているだけです。
// MyDelimiterBasedFrameDecoder.scala import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.channel.{Channel, ChannelHandlerContext} import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder class MyDelimiterBasedFrameDecoder(maxFrameLength: Int, delimiters: Array[ChannelBuffer]) extends DelimiterBasedFrameDecoder(maxFrameLength, delimiters: _*) with UpstreamHandlerLogger { @throws(classOf[Exception]) override protected def decode(ctx: ChannelHandlerContext, channel: Channel, channelBuffer: ChannelBuffer): AnyRef = { called("decode") super.decode(ctx, channel, channelBuffer) } } // MyStringDecoder.scala import java.nio.charset.Charset import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.channel.{Channel, ChannelHandlerContext} import org.jboss.netty.channel.ChannelHandler.Sharable import org.jboss.netty.handler.codec.string.StringDecoder @Sharable class MyStringDecoder(charset: Charset) extends StringDecoder(charset) with UpstreamHandlerLogger { def this() = this(Charset.defaultCharset) @throws(classOf[Exception]) override protected def decode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = { called("decode") super.decode(ctx, channel, msg) } } // MyStringEncoder.scala import java.nio.charset.Charset import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.channel.{Channel, ChannelHandlerContext} import org.jboss.netty.channel.ChannelHandler.Sharable import org.jboss.netty.handler.codec.string.StringEncoder @Sharable class MyStringEncoder(charset: Charset) extends StringEncoder(charset) with DownstreamHandlerLogger { def this() = this(Charset.defaultCharset) @throws(classOf[Exception]) override protected def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = { called("encode") super.encode(ctx, channel, msg) } }
続いて、ServerHandler。
TelnetServerHandler.scala
import scala.collection.JavaConverters._ import java.net.InetAddress import java.util.Date import org.jboss.netty.channel.ChannelEvent import org.jboss.netty.channel.ChannelFuture import org.jboss.netty.channel.ChannelFutureListener import org.jboss.netty.channel.ChannelHandlerContext import org.jboss.netty.channel.ChannelStateEvent import org.jboss.netty.channel.ExceptionEvent import org.jboss.netty.channel.MessageEvent import org.jboss.netty.channel.SimpleChannelUpstreamHandler class TelnetServerHandler extends SimpleChannelUpstreamHandler with UpstreamHandlerLogger { @throws(classOf[Exception]) override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = { /* if (e.isInstanceOf[ChannelStateEvent]) { info(" ChannelStateEvent = " + e.toString) } */ super.handleUpstream(ctx, e) } override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { e.getChannel.write("Welcome to %s !\r\n".format(InetAddress.getLocalHost.getHostName)) e.getChannel.write("It is %s now.\r\n".format(new Date)) } override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = e getMessage match { case request: String => info(" received message[%s]".format(request)) val pipeline = ctx.getChannel.getPipeline info(pipeline.getNames.asScala.mkString(" current pipelines[", ", ", "]")) val (response, close) = request match { case "" => ("Please type something.\r\n", false) case _ if request.toLowerCase == "bye" => ("Have a good day!.\r\n", true) case _ => ("Did you say '%s'?\r\n".format(request), false) } val future = e.getChannel.write(response) if (close) { future.addListener(ChannelFutureListener.CLOSE) } } override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = { warnLog("Unexcepted exception from downstream.", e.getCause) e.getChannel.close() } }
んで、各ChannelHandler(Codec含む)がMix-inしているトレイト。
Logger.scala
import org.jboss.netty.channel.ChannelDownstreamHandler import org.jboss.netty.channel.ChannelHandlerContext import org.jboss.netty.channel.ChannelEvent import org.jboss.netty.channel.ChannelStateEvent import org.jboss.netty.channel.ChannelUpstreamHandler import org.jboss.netty.channel.ChildChannelStateEvent import org.jboss.netty.channel.DownstreamMessageEvent import org.jboss.netty.channel.ExceptionEvent import org.jboss.netty.channel.UpstreamMessageEvent import org.jboss.netty.channel.WriteCompletionEvent import org.jboss.netty.handler.timeout.IdleStateEvent trait Logger { info("クラス[%s]のインスタンスを作成しました".format(getClass.getName)) protected def info(msg: String): Unit = println(msg) protected def warnLog(msg: String, thrown: Throwable): Unit = println("WARN:%s %s".format(msg, thrown)) protected def called(methodName: String, params: AnyRef*): Unit = params size match { case 0 => info("%s#%s called.".format(getClass.getName, methodName)) case n => info("%s#%s, [%s] called".format(getClass.getName, methodName, params.mkString(","))) } } trait HandlerLoggerSupport { def eventName(e: ChannelEvent): String = e match { case event: ChannelStateEvent => event.getState.name case event: DownstreamMessageEvent => "RECEIVED" case event: ExceptionEvent => "EXCEPTION" case event: ChildChannelStateEvent => if (event.getChildChannel.isOpen) "CHILD_OPEN" else "CHILD_CLOSED" case event: IdleStateEvent => event.getState.name case event: UpstreamMessageEvent => "WRITE" case event: WriteCompletionEvent => "WRITTEN_AMOUNT" } } trait UpstreamHandlerLogger extends Logger with HandlerLoggerSupport with ChannelUpstreamHandler { abstract override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = { called("hanleUpstream", "eventName = " + eventName(e)) super.handleUpstream(ctx, e) } } trait DownstreamHandlerLogger extends Logger with HandlerLoggerSupport with ChannelDownstreamHandler { abstract override def handleDownstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = { called("handleDownstream", "eventName = " + eventName(e)) super.handleDownstream(ctx, e) } } trait HandlerLogger extends DownstreamHandlerLogger with UpstreamHandlerLogger
トレイトにChannelUpstreamHandlerインターフェースまたはChannelDownstreamHandlerインターフェースをMix-inした上で、各インターフェースの抽象メソッドをabstract defでオーバーライドします。ChannelUpstreamHandlerの場合は、こんな感じ。
abstract override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = { called("hanleUpstream", "eventName = " + eventName(e)) super.handleUpstream(ctx, e) }
メソッドeventNameは、ChannelEventの内容に応じて、イベントを表す文字列を返すメソッドです。実装は、この部分。
def eventName(e: ChannelEvent): String = e match { case event: ChannelStateEvent => event.getState.name case event: DownstreamMessageEvent => "RECEIVED" case event: ExceptionEvent => "EXCEPTION" case event: ChildChannelStateEvent => if (event.getChildChannel.isOpen) "CHILD_OPEN" else "CHILD_CLOSED" case event: IdleStateEvent => event.getState.name case event: UpstreamMessageEvent => "WRITE" case event: WriteCompletionEvent => "WRITTEN_AMOUNT" }
これで、hadleUpstreamメソッドとhandleDownstreamメソッドに伝播しているイベントをトレースしよう、という目論見です。
ではでは、Telnetサーバを起動してみます。
$ sbt run [info] Set current project to netty-custom-telnet (in build file:/xxxxx/netty-custom-telnet/) [info] Running TelnetServer
このサーバに、telnetコマンドでアクセスしてみます。
$ telnet localhost 8080 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. Welcome to ubuntu ! It is Sun Mar 25 20:28:33 JST 2012 now.
すると、サーバ側のコンソールには、こんなログが出ます。
クラス[MyDelimiterBasedFrameDecoder]のインスタンスを作成しました クラス[MyStringDecoder]のインスタンスを作成しました クラス[MyStringEncoder]のインスタンスを作成しました クラス[TelnetServerHandler]のインスタンスを作成しました MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = OPEN] called MyStringDecoder#hanleUpstream, [eventName = OPEN] called TelnetServerHandler#hanleUpstream, [eventName = OPEN] called MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = BOUND] called MyStringDecoder#hanleUpstream, [eventName = BOUND] called TelnetServerHandler#hanleUpstream, [eventName = BOUND] called MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = CONNECTED] called MyStringDecoder#hanleUpstream, [eventName = CONNECTED] called TelnetServerHandler#hanleUpstream, [eventName = CONNECTED] called MyStringEncoder#handleDownstream, [eventName = RECEIVED] called MyStringEncoder#encode called. MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called MyStringEncoder#handleDownstream, [eventName = RECEIVED] called MyStringEncoder#encode called. MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
インスタンス生成のログが出ているのは、Handlerが作成されるタイミングを見たいためです。なお、encodeメソッドについては自分でログ出力をしています…。
では、クライアント側からメッセージを送ってみます。
Hello World Did you say 'Hello World'?
この時、サーバ側にはこんなログが出ます。
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called MyDelimiterBasedFrameDecoder#decode called. MyStringDecoder#hanleUpstream, [eventName = WRITE] called MyStringDecoder#decode called. TelnetServerHandler#hanleUpstream, [eventName = WRITE] called received message[Hello World] current pipelines[framer, decoder, encoder, handler] MyStringEncoder#handleDownstream, [eventName = RECEIVED] called MyStringEncoder#encode called. MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
なんとなく、同じイベントがPipeline内のHandlerに伝播していっているのがわかりますね。
細かく見ていくと、例えばCONNECTEDイベントの時は
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = CONNECTED] called MyStringDecoder#hanleUpstream, [eventName = CONNECTED] called TelnetServerHandler#hanleUpstream, [eventName = CONNECTED] called
といった感じですが、RECEIVEDイベントの時は
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
とChannelDownstreamHandlerしか表示されていませんし、WRITEイベントの時は
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called MyStringDecoder#hanleUpstream, [eventName = WRITE] called TelnetServerHandler#hanleUpstream, [eventName = WRITE] called
ChannelUpstreamHandlerしか表示されていません。
Pipeline中の、Upstream系のイベントなりDownstream系のイベントに関係のあるChannelHandlerしか呼ばれない、ということでしょうね。SimpleChannelHandlerクラスみたいなChannelUpstreamHandlerでもChannelDownstreamHandlerでもあるようなクラスは、場合によっては両方のイベントに顔を出すこともあるのでしょう。
各ChannelHandlerの実行順ですが、Upstreamの場合はPipelineの前から実行されていきますが、Downstreamの場合はPipelineの後ろから実行されていくようです。ただ、今回はサーバ側で確認しただけなので、後でクライアント側の動作も見ておこうと思います。
→クライアント側も確認しました。
なお、このサーバには「bye」と打つと、接続を切られます。
bye Have a good day!. Connection closed by foreign host.
ところで、接続後の各種操作を行っても、「インスタンスを生成しました」というログは接続時の1度しか表示されませんでした。ということは、Handlerのインスタンスは1つしか生成されていないわけですが、ず〜っと1つかというとそうでもないようです。それは、次回に。