CLOVER🍀

That was when it all began.

NettyでPipelineの構成を動的に変更する

前回は、Pipelineに組み込まれたChannelHandlerがどう呼ばれているのかをトレースしました。今度は、Pipeline内のChannelHandlerを動的に変更してみたいと思います。

これ自体は、実は前にWebSocketの例を写経した時に書いているんですよね。まあ、当時は「何かPipelineのメンバーを置換してるけど、どういうことだろ?」くらいにしか思ってませんでしたが…。
*Netty 3.2.4.Finalの頃の話です。Netty 3.3.1.Finalのサンプルでは、WebSocketでもPipelineの変更は登場しません

前回のログでは、

MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyDelimiterBasedFrameDecoder#decode called.
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#decode called.
TelnetServerHandler#hanleUpstream, [eventName = WRITE] called
     received message[Hello World]
     current pipelines[framer, decoder, encoder, handler]
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called

の以下の部分

     current pipelines[framer, decoder, encoder, handler]

のように、現在のPipelineの内容を出力していましたが、これはその伏線だったりします。

では、このTelnetサーバのPipelineに動的にChannelHandlerを追加してみましょう。しょうもない例ですが、クライアントから「decoration」と送信された後は、クライアントの送信メッセージに「***」を前後に付けてみることにします。

というわけで、Decoderとしてこんなものを作ってみます。
StringDecorator.scala

import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder

class StringDecorator extends OneToOneDecoder with UpstreamHandlerLogger {
  override protected def decode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef =
    msg match {
      case request: String =>
        called("decode")
        "***" + request + "***"
    }
}

これを、先ほどのHandlerを修正してPipelineに加えてみます。
TelnetServerHandler.scala

/** import文省略 **/

class TelnetServerHandler extends SimpleChannelUpstreamHandler with UpstreamHandlerLogger {
  /** 省略 **/

  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = e getMessage match {
    case request: String =>
      info("     received message[%s]".format(request))

      val pipeline = ctx.getChannel.getPipeline
      info(pipeline.getNames.asScala.mkString("     current pipelines[", ", ", "]"))

      val (response, close) =
          request match {
            case "" => ("Please type something.\r\n", false)
            case "decoration" =>
              pipeline.addAfter("decoder", "decorator", new StringDecorator)
              ("Apply Decoration Mode!\r\n", false)
            case _ if request.toLowerCase == "bye" => ("Have a good day!.\r\n", true)
            case _ => ("Did you say '%s'?\r\n".format(request), false)
          }

      val future = e.getChannel.write(response)

      if (close) {
        future.addListener(ChannelFutureListener.CLOSE)
      }
  }

  /** 省略 **/
}

現在のPipelineは、ChannelHandlerContextから取得できるChannelから取得できます。遠いですねー。

      val pipeline = ctx.getChannel.getPipeline

このPipelineに、先ほど作成したDecoratorを加えます。

              pipeline.addAfter("decoder", "decorator", new StringDecorator)

StringDecoderの後に動いて欲しいので、「decoder」の後ろにくるように書いています。

前回はPipelineFactoryに、MyStringDecoderを「decoder」という名前でPipelineに登録していました。

class TelnetServerPipelineFactory extends ChannelPipelineFactory {
  @throws(classOf[Exception])
  def getPipeline: ChannelPipeline = {
    val pipeline = Channels.pipeline

    pipeline.addLast("framer", new MyDelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter))
    pipeline.addLast("decoder", new MyStringDecoder)
    pipeline.addLast("encoder", new MyStringEncoder)

    pipeline.addLast("handler", new TelnetServerHandler)

    pipeline
  }
}

Pipelineへの追加は、これまで見てきたメソッドを合わせて以下のものがあるようです。

メソッド 内容
addAfter(String baseName, String name, ChannelHandler handler) 既に存在する、baseNameを持つChannelHandlerの後に追加
addBefore(String baseName, String name, ChannelHandler handler) 既に存在する、baseNameを持つChannelHandlerの前に追加
addFirst(String name, ChannelHandler handler) Pipelineの先頭に追加
addLast(String name, ChannelHandler handler) Pipelineの最後に追加

その他、Handlerの取得やPipelineからの削除、置換などができます。まあ、詳しくはAPIリファレンスを…。
http://netty.io/docs/stable/api/org/jboss/netty/channel/ChannelPipeline.html

では、Telnetサーバを起動してみます。

$ sbt run
[info] Set current project to netty-custom-telnet (in build file:/xxxxx/netty-custom-telnet/)
[info] Running TelnetServer 

これにクライアントを繋げます。

$ telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome to ubuntu !
It is Sun Mar 25 21:19:29 JST 2012 now.
Hello World
Did you say 'Hello World'?

とりあえず、「Hello World」と打ってます。では、「decoration」と打ってみましょう。

decoration
Apply Decoration Mode!

サーバ側には、こんな感じでログが出ます。

MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyDelimiterBasedFrameDecoder#decode called.
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#decode called.
TelnetServerHandler#hanleUpstream, [eventName = WRITE] called
     received message[decoration]
     current pipelines[framer, decoder, encoder, handler]
クラス[StringDecorator]のインスタンスを作成しました
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
StringDecorator#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called

すると次回からは、送信した文字列に「***」が付いて返されるようになります。

Hello World              
Did you say '***Hello World***'?

この時、サーバ側ではこういうログになっています。

MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyDelimiterBasedFrameDecoder#decode called.
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#decode called.
StringDecorator#hanleUpstream, [eventName = WRITE] called
StringDecorator#decode called.
TelnetServerHandler#hanleUpstream, [eventName = WRITE] called
     received message[***Hello World***]
     current pipelines[framer, decoder, decorator, encoder, handler]
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
StringDecorator#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called

よ〜く見ると、Pipelineにdecoratorが増えています。

     current pipelines[framer, decoder, decorator, encoder, handler]

ここで、別のターミナルからもう1つ接続を張ってみます。

$ telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome to ubuntu !
It is Sun Mar 25 21:26:25 JST 2012 now.
Hello World
Did you say 'Hello World'?

特に装飾は行われていませんね。また、サーバ側のPipelineにも「decorator」は含まれていません。

MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyDelimiterBasedFrameDecoder#decode called.
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#decode called.
TelnetServerHandler#hanleUpstream, [eventName = WRITE] called
     received message[Hello World]
     current pipelines[framer, decoder, encoder, handler]
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called

そもそも、新規クライアントを接続するとこういうログが出るので、Pipelineは接続ごとに作成されるようですね。

クラス[MyDelimiterBasedFrameDecoder]のインスタンスを作成しました
クラス[MyStringDecoder]のインスタンスを作成しました
クラス[MyStringEncoder]のインスタンスを作成しました
クラス[TelnetServerHandler]のインスタンスを作成しました
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = OPEN] called
MyStringDecoder#hanleUpstream, [eventName = OPEN] called
TelnetServerHandler#hanleUpstream, [eventName = OPEN] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = BOUND] called
MyStringDecoder#hanleUpstream, [eventName = BOUND] called
TelnetServerHandler#hanleUpstream, [eventName = BOUND] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = CONNECTED] called
MyStringDecoder#hanleUpstream, [eventName = CONNECTED] called
TelnetServerHandler#hanleUpstream, [eventName = CONNECTED] called
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called

もちろん、最初に接続したターミナルでは、「***」で装飾が行われるままです。

Hello Netty
Did you say '***Hello Netty***'?

というわけで、PipelineおよびChannelHandlerは、接続ごとに作成されるという理解で正しそうですね。んで、ChannelHandlerは1回の接続の間は同じインスタンスが使われる、と。以前、FrameDecoderの例でフィールドを使っていたりしていた時、マルチスレッド下でどういう共有のされ方をするのかちょっと気になっていたので、これでスッキリしました。

ところで、今回のサンプルはクライアントから送信されてくる文字列を改変してしまうため、例としてはあまりよろしくありません…。このDecoderを適用した後は、「bye」で抜けられなくなりますし…。