CLOVER🍀

That was when it all began.

Pipelineに組み込まれたChannelHandlerの実行順を確認する[クライアント編&まとめ]

前に、NettyのTelnetサーバのサンプルを写経して、Pipelineに組み込まれたChannelHandlerがどういう順番に呼び出されていくのかを確認しました。

今度は、クライアント側を見ていこうと思います。

というわけで、TelnetクライアントのサンプルをScalaでアレンジしつつ写経。
TelnetClient.scala

import scala.annotation.tailrec

import java.net.InetSocketAddress
import java.util.concurrent.Executors

import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel.{Channel, ChannelFuture}
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory

object TelnetClient {
  def main(args: Array[String]): Unit = args toList match {
    case host :: portAsString :: Nil => new TelnetClient(host, portAsString.toInt).run()
    case _ =>
      println("Usage: %s <host> <port>".format(classOf[TelnetClient].getSimpleName))
      sys.exit(1)
  }
}

class TelnetClient(host: String, port: Int) {
  def run(): Unit = {
    val bootstrap = new ClientBootstrap(
                        new NioClientSocketChannelFactory(
                              Executors.newCachedThreadPool, Executors.newCachedThreadPool
                        ))

    bootstrap.setPipelineFactory(new TelnetClientPipelineFactory)

    val future = bootstrap.connect(new InetSocketAddress(host, port))
    val channel = future.awaitUninterruptibly().getChannel

    if (!future.isSuccess) {
      future.getCause.printStackTrace()
      bootstrap.releaseExternalResources()
    }

    speak(channel).foreach(_.awaitUninterruptibly())

    channel.close().awaitUninterruptibly()
    bootstrap.releaseExternalResources()
  }

  def speak(channel: Channel): Option[ChannelFuture] = {
    @tailrec
    def speakInner(channel: Channel, beforeWriteFuture: Option[ChannelFuture]): Option[ChannelFuture] =
      readLine() match {
        case null => beforeWriteFuture.orElse(None)
        case line if line == "bye" =>
          val lastWriteFuture = channel.write(line + "\r\n")
          channel.getCloseFuture().awaitUninterruptibly()
          Some(lastWriteFuture)
        case line =>
          val lastWriteFuture = channel.write(line + "\r\n")
          speakInner(channel, Some(lastWriteFuture))
      }

    speakInner(channel, None)
  }
}

TelnetClientPipelineFactory.scala

import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.handler.codec.frame.Delimiters

class TelnetClientPipelineFactory extends ChannelPipelineFactory {
  def getPipeline: ChannelPipeline = {
    val pipeline = Channels.pipeline

    pipeline.addLast("framer", new MyDelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter))
    pipeline.addLast("decoder", new MyStringDecoder)
    pipeline.addLast("encoder", new MyStringEncoder)

    pipeline.addLast("handler", new TelnetClientHandler)

    pipeline
  }
}

Pipelineに組み込んでいるCodecは、標準のCodecを自作トレイトをMix-inしてログ出力するようにしたものです。

TelnetClientHandler.scala

import org.jboss.netty.channel.ChannelEvent
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelStateEvent
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.MessageEvent
import org.jboss.netty.channel.SimpleChannelUpstreamHandler

class TelnetClientHandler extends SimpleChannelUpstreamHandler with UpstreamHandlerLogger {
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit =
    println(e.getMessage)

  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    warnLog("Unexcepted exception from downstream.", e.getCause)
    e.getChannel.close()
  }
}

TelnetClientHandlerクラスがMix-inしているトレイトは、Codecと同様のロギングを行うトレイトです。

では、起動してみましょう。
*サーバ側は、Listen Port 8080で起動済みの前提です

> run-main TelnetClient localhost 8080
[info] Running TelnetClient localhost 8080
クラス[MyDelimiterBasedFrameDecoder]のインスタンスを作成しました
クラス[MyStringDecoder]のインスタンスを作成しました
クラス[MyStringEncoder]のインスタンスを作成しました
クラス[TelnetClientHandler]のインスタンスを作成しました
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = OPEN] called
MyStringDecoder#hanleUpstream, [eventName = OPEN] called
TelnetClientHandler#hanleUpstream, [eventName = OPEN] called
MyStringEncoder#handleDownstream, [eventName = CONNECTED] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = BOUND] called
MyStringDecoder#hanleUpstream, [eventName = BOUND] called
TelnetClientHandler#hanleUpstream, [eventName = BOUND] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = CONNECTED] called
MyStringDecoder#hanleUpstream, [eventName = CONNECTED] called
TelnetClientHandler#hanleUpstream, [eventName = CONNECTED] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyDelimiterBasedFrameDecoder#decode called.
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#decode called.
TelnetClientHandler#hanleUpstream, [eventName = WRITE] called
Welcome to ubuntu !
MyDelimiterBasedFrameDecoder#decode called.
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#decode called.
TelnetClientHandler#hanleUpstream, [eventName = WRITE] called
It is Sun Apr 01 00:47:44 JST 2012 now.

前回、

  • 送信はDownstream
  • 受信はUpstream

と書きましたが、接続時はあんまりDownstreamが出てきませんね…。

とりあえず、Hello Worldと。

Hello World
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetClientHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyDelimiterBasedFrameDecoder#decode called.
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#decode called.
TelnetClientHandler#hanleUpstream, [eventName = WRITE] called
Did you say 'Hello World'?

Downstream側のChannelHandlerが現れましたね。まあ、RECEIVEDイベントだけですが。

最後に、接続を切ってみます。

bye
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetClientHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyDelimiterBasedFrameDecoder#decode called.
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#decode called.
TelnetClientHandler#hanleUpstream, [eventName = WRITE] called
Have a good day!.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = CONNECTED] called
MyStringDecoder#hanleUpstream, [eventName = CONNECTED] called
TelnetClientHandler#hanleUpstream, [eventName = CONNECTED] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = BOUND] called
MyStringDecoder#hanleUpstream, [eventName = BOUND] called
TelnetClientHandler#hanleUpstream, [eventName = BOUND] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = OPEN] called
MyStringDecoder#hanleUpstream, [eventName = OPEN] called
TelnetClientHandler#hanleUpstream, [eventName = OPEN] called
MyStringEncoder#handleDownstream, [eventName = OPEN] called
[success] Total time: 517 s, completed Apr 1, 2012 12:59:16 AM

ちなみに、前回Upstreamの場合はPipelineの前から実行されていきますが、Downstreamの場合はPipelineの後ろから実行されていくようです、と書きましたが、今回クライアントで試してみた結果もやっぱり同じでした。
*ここには書いていませんが、Encoderを1つ既存のEncoderの後ろに追加すると、追加されたEncoder→既存のEncoderの順に動作しました

というわけで、再度まとめます。

Nettyでは、

  • 受信はUpstream
  • 送信はDownstream

として扱います。よって、

  • クライアントがリクエスト送る場合はDownstream→Upstream
  • サーバがリクエストを受ける場合はUpstream→Downstream

の順にChannelHandlerにイベントが伝播していきます。
*ただ、クライアントの接続の確立/切断時にはUpstream→Downstreamの順に流れているような…。
そして、Pipeline内のChannelHandlerは

  • Upstreamの場合、Pipelineの前から実行されていく
  • Downstreamの場合、Pipelineの後ろから実行されていく

ということが、これで確認できました。

…まあ、ある程度前回の結果とChannelPipelineのAPIの絵から、なんとなく結果は見えていた気がしないでもないですが。