CLOVER🍀

That was when it all began.

NettyのPipelineとChannelHandlerの関係を学ぶ

久々にNettyです。以前はUser Guideの写経でしたが、今度はもうちょっと踏み込んだ内容に手を出していこうと思います。

まずは、Pipelineとそれに組み込まれているChannelHandlerを理解していくところから。

ChannelHandlerインターフェースは、そのサブクラスとしてChannelDownstreamHandlerインターフェースとChannelUpstreamHandlerインターフェースが存在します。それぞれ、どう使われるのか?

それを理解するには、ChannelPipelineインターフェースのAPIドキュメントにかかれた図が役に立ちます。
http://netty.io/docs/stable/api/org/jboss/netty/channel/ChannelPipeline.html

これと、以前参考にさせていただいたサイトの内容を合わせると、

  • 送信はDownstream
  • 受信はUpstream

ということになります。つまり、

  • クライアントの場合はDownstream→Upstream
  • サーバの場合はUpstream→Downstream

の順番で発生したイベントが伝播していきます。ここで言っているイベントと、各Pipeline内に組み込まれたChannelHandlerがどう利用されているのかを確認するために、例題のTelnetサーバをちょっとカスタマイズしてみました。

まずは、メインクラス。サンプルをScalaで写経したものにすぎません。
TelnetServer.scala

import java.net.InetSocketAddress
import java.util.concurrent.Executors

import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory

object TelnetServer {
  def main(args: Array[String]): Unit = {
    val port = args size match {
      case 0 => 8080
      case _ => args(0).toInt
    }

    new TelnetServer(port).run()
  }
}

class TelnetServer(port: Int) {
  def run(): Unit = {
    val bootstrap = new ServerBootstrap(
                                        new NioServerSocketChannelFactory(
                                          Executors.newCachedThreadPool,
                                          Executors.newCachedThreadPool
                                        ))
    bootstrap.setPipelineFactory(new TelnetServerPipelineFactory)
    bootstrap.bind(new InetSocketAddress(port))
  }
}

続いて、PipelineFactoryです。基本はサンプルと同じですが、Codecを自作のものに置き換えています。
TelnetServerPipelineFactory.scala

import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.handler.codec.frame.Delimiters

class TelnetServerPipelineFactory extends ChannelPipelineFactory {
  @throws(classOf[Exception])
  def getPipeline: ChannelPipeline = {
    val pipeline = Channels.pipeline

    pipeline.addLast("framer", new MyDelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter))
    pipeline.addLast("decoder", new MyStringDecoder)
    pipeline.addLast("encoder", new MyStringEncoder)

    pipeline.addLast("handler", new TelnetServerHandler)

    pipeline
  }
}

自作のCodecは、オリジナルのCodecを継承した上で、ちょっとしたトレイトをMix-inしているだけです。

// MyDelimiterBasedFrameDecoder.scala
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder

class MyDelimiterBasedFrameDecoder(maxFrameLength: Int, delimiters: Array[ChannelBuffer])
  extends DelimiterBasedFrameDecoder(maxFrameLength, delimiters: _*) with UpstreamHandlerLogger {

  @throws(classOf[Exception])
  override protected def decode(ctx: ChannelHandlerContext, channel: Channel, channelBuffer: ChannelBuffer): AnyRef = {
    called("decode")
    super.decode(ctx, channel, channelBuffer)
  }
}

// MyStringDecoder.scala
import java.nio.charset.Charset

import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.channel.ChannelHandler.Sharable
import org.jboss.netty.handler.codec.string.StringDecoder

@Sharable
class MyStringDecoder(charset: Charset) extends StringDecoder(charset) with UpstreamHandlerLogger {
  def this() = this(Charset.defaultCharset)

  @throws(classOf[Exception])
  override protected def decode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
    called("decode")
    super.decode(ctx, channel, msg)
  }
}

// MyStringEncoder.scala
import java.nio.charset.Charset

import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.channel.ChannelHandler.Sharable
import org.jboss.netty.handler.codec.string.StringEncoder

@Sharable
class MyStringEncoder(charset: Charset) extends StringEncoder(charset) with DownstreamHandlerLogger {
  def this() = this(Charset.defaultCharset)

  @throws(classOf[Exception])
  override protected def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
    called("encode")
    super.encode(ctx, channel, msg)
  }
}

続いて、ServerHandler。
TelnetServerHandler.scala

import scala.collection.JavaConverters._

import java.net.InetAddress
import java.util.Date

import org.jboss.netty.channel.ChannelEvent
import org.jboss.netty.channel.ChannelFuture
import org.jboss.netty.channel.ChannelFutureListener
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelStateEvent
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.MessageEvent
import org.jboss.netty.channel.SimpleChannelUpstreamHandler

class TelnetServerHandler extends SimpleChannelUpstreamHandler with UpstreamHandlerLogger {
  @throws(classOf[Exception])
  override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = {
    /*
    if (e.isInstanceOf[ChannelStateEvent]) {
      info("     ChannelStateEvent = " + e.toString)
    }
    */

    super.handleUpstream(ctx, e)
  }

  override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    e.getChannel.write("Welcome to %s !\r\n".format(InetAddress.getLocalHost.getHostName))
    e.getChannel.write("It is %s now.\r\n".format(new Date))
  }

  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = e getMessage match {
    case request: String =>
      info("     received message[%s]".format(request))

      val pipeline = ctx.getChannel.getPipeline
      info(pipeline.getNames.asScala.mkString("     current pipelines[", ", ", "]"))

      val (response, close) =
          request match {
            case "" => ("Please type something.\r\n", false)
            case _ if request.toLowerCase == "bye" => ("Have a good day!.\r\n", true)
            case _ => ("Did you say '%s'?\r\n".format(request), false)
          }

      val future = e.getChannel.write(response)

      if (close) {
        future.addListener(ChannelFutureListener.CLOSE)
      }
  }

  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    warnLog("Unexcepted exception from downstream.", e.getCause)
    e.getChannel.close()
  }
}

んで、各ChannelHandler(Codec含む)がMix-inしているトレイト。
Logger.scala

import org.jboss.netty.channel.ChannelDownstreamHandler
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelEvent
import org.jboss.netty.channel.ChannelStateEvent
import org.jboss.netty.channel.ChannelUpstreamHandler
import org.jboss.netty.channel.ChildChannelStateEvent
import org.jboss.netty.channel.DownstreamMessageEvent
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.UpstreamMessageEvent
import org.jboss.netty.channel.WriteCompletionEvent
import org.jboss.netty.handler.timeout.IdleStateEvent

trait Logger {
  info("クラス[%s]のインスタンスを作成しました".format(getClass.getName))

  protected def info(msg: String): Unit = println(msg)
  protected def warnLog(msg: String, thrown: Throwable): Unit =
    println("WARN:%s %s".format(msg, thrown))

  protected def called(methodName: String, params: AnyRef*): Unit = params size match {
    case 0 => info("%s#%s called.".format(getClass.getName, methodName))
    case n => info("%s#%s, [%s] called".format(getClass.getName, methodName, params.mkString(",")))
  }
}

trait HandlerLoggerSupport {
  def eventName(e: ChannelEvent): String = e match {
    case event: ChannelStateEvent => event.getState.name
    case event: DownstreamMessageEvent => "RECEIVED"
    case event: ExceptionEvent => "EXCEPTION"
    case event: ChildChannelStateEvent =>
      if (event.getChildChannel.isOpen) "CHILD_OPEN"
      else "CHILD_CLOSED"
    case event: IdleStateEvent => event.getState.name
    case event: UpstreamMessageEvent => "WRITE"
    case event: WriteCompletionEvent => "WRITTEN_AMOUNT"
  }
}

trait UpstreamHandlerLogger extends Logger with HandlerLoggerSupport with ChannelUpstreamHandler {
  abstract override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = {
    called("hanleUpstream", "eventName = " + eventName(e))
    super.handleUpstream(ctx, e)
  }
}

trait DownstreamHandlerLogger extends Logger with HandlerLoggerSupport with ChannelDownstreamHandler {
  abstract override def handleDownstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = {
    called("handleDownstream", "eventName = " + eventName(e))
    super.handleDownstream(ctx, e)
  }
}

trait HandlerLogger extends DownstreamHandlerLogger with UpstreamHandlerLogger

トレイトにChannelUpstreamHandlerインターフェースまたはChannelDownstreamHandlerインターフェースをMix-inした上で、各インターフェースの抽象メソッドをabstract defでオーバーライドします。ChannelUpstreamHandlerの場合は、こんな感じ。

  abstract override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = {
    called("hanleUpstream", "eventName = " + eventName(e))
    super.handleUpstream(ctx, e)
  }

メソッドeventNameは、ChannelEventの内容に応じて、イベントを表す文字列を返すメソッドです。実装は、この部分。

  def eventName(e: ChannelEvent): String = e match {
    case event: ChannelStateEvent => event.getState.name
    case event: DownstreamMessageEvent => "RECEIVED"
    case event: ExceptionEvent => "EXCEPTION"
    case event: ChildChannelStateEvent =>
      if (event.getChildChannel.isOpen) "CHILD_OPEN"
      else "CHILD_CLOSED"
    case event: IdleStateEvent => event.getState.name
    case event: UpstreamMessageEvent => "WRITE"
    case event: WriteCompletionEvent => "WRITTEN_AMOUNT"
  }

これで、hadleUpstreamメソッドとhandleDownstreamメソッドに伝播しているイベントをトレースしよう、という目論見です。

ではでは、Telnetサーバを起動してみます。

$ sbt run
[info] Set current project to netty-custom-telnet (in build file:/xxxxx/netty-custom-telnet/)
[info] Running TelnetServer 

このサーバに、telnetコマンドでアクセスしてみます。

$ telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome to ubuntu !
It is Sun Mar 25 20:28:33 JST 2012 now.

すると、サーバ側のコンソールには、こんなログが出ます。

クラス[MyDelimiterBasedFrameDecoder]のインスタンスを作成しました
クラス[MyStringDecoder]のインスタンスを作成しました
クラス[MyStringEncoder]のインスタンスを作成しました
クラス[TelnetServerHandler]のインスタンスを作成しました
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = OPEN] called
MyStringDecoder#hanleUpstream, [eventName = OPEN] called
TelnetServerHandler#hanleUpstream, [eventName = OPEN] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = BOUND] called
MyStringDecoder#hanleUpstream, [eventName = BOUND] called
TelnetServerHandler#hanleUpstream, [eventName = BOUND] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = CONNECTED] called
MyStringDecoder#hanleUpstream, [eventName = CONNECTED] called
TelnetServerHandler#hanleUpstream, [eventName = CONNECTED] called
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called

インスタンス生成のログが出ているのは、Handlerが作成されるタイミングを見たいためです。なお、encodeメソッドについては自分でログ出力をしています…。

では、クライアント側からメッセージを送ってみます。

Hello World
Did you say 'Hello World'?

この時、サーバ側にはこんなログが出ます。

MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyDelimiterBasedFrameDecoder#decode called.
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#decode called.
TelnetServerHandler#hanleUpstream, [eventName = WRITE] called
     received message[Hello World]
     current pipelines[framer, decoder, encoder, handler]
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called

なんとなく、同じイベントがPipeline内のHandlerに伝播していっているのがわかりますね。

細かく見ていくと、例えばCONNECTEDイベントの時は

MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = CONNECTED] called
MyStringDecoder#hanleUpstream, [eventName = CONNECTED] called
TelnetServerHandler#hanleUpstream, [eventName = CONNECTED] called

といった感じですが、RECEIVEDイベントの時は

MyStringEncoder#handleDownstream, [eventName = RECEIVED] called

とChannelDownstreamHandlerしか表示されていませんし、WRITEイベントの時は

MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
TelnetServerHandler#hanleUpstream, [eventName = WRITE] called

ChannelUpstreamHandlerしか表示されていません。

Pipeline中の、Upstream系のイベントなりDownstream系のイベントに関係のあるChannelHandlerしか呼ばれない、ということでしょうね。SimpleChannelHandlerクラスみたいなChannelUpstreamHandlerでもChannelDownstreamHandlerでもあるようなクラスは、場合によっては両方のイベントに顔を出すこともあるのでしょう。

各ChannelHandlerの実行順ですが、Upstreamの場合はPipelineの前から実行されていきますが、Downstreamの場合はPipelineの後ろから実行されていくようです。ただ、今回はサーバ側で確認しただけなので、後でクライアント側の動作も見ておこうと思います。
クライアント側も確認しました。

なお、このサーバには「bye」と打つと、接続を切られます。

bye
Have a good day!.
Connection closed by foreign host.

ところで、接続後の各種操作を行っても、「インスタンスを生成しました」というログは接続時の1度しか表示されませんでした。ということは、Handlerのインスタンスは1つしか生成されていないわけですが、ず〜っと1つかというとそうでもないようです。それは、次回に。

NettyでPipelineの構成を動的に変更する

前回は、Pipelineに組み込まれたChannelHandlerがどう呼ばれているのかをトレースしました。今度は、Pipeline内のChannelHandlerを動的に変更してみたいと思います。

これ自体は、実は前にWebSocketの例を写経した時に書いているんですよね。まあ、当時は「何かPipelineのメンバーを置換してるけど、どういうことだろ?」くらいにしか思ってませんでしたが…。
*Netty 3.2.4.Finalの頃の話です。Netty 3.3.1.Finalのサンプルでは、WebSocketでもPipelineの変更は登場しません

前回のログでは、

MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyDelimiterBasedFrameDecoder#decode called.
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#decode called.
TelnetServerHandler#hanleUpstream, [eventName = WRITE] called
     received message[Hello World]
     current pipelines[framer, decoder, encoder, handler]
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called

の以下の部分

     current pipelines[framer, decoder, encoder, handler]

のように、現在のPipelineの内容を出力していましたが、これはその伏線だったりします。

では、このTelnetサーバのPipelineに動的にChannelHandlerを追加してみましょう。しょうもない例ですが、クライアントから「decoration」と送信された後は、クライアントの送信メッセージに「***」を前後に付けてみることにします。

というわけで、Decoderとしてこんなものを作ってみます。
StringDecorator.scala

import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder

class StringDecorator extends OneToOneDecoder with UpstreamHandlerLogger {
  override protected def decode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef =
    msg match {
      case request: String =>
        called("decode")
        "***" + request + "***"
    }
}

これを、先ほどのHandlerを修正してPipelineに加えてみます。
TelnetServerHandler.scala

/** import文省略 **/

class TelnetServerHandler extends SimpleChannelUpstreamHandler with UpstreamHandlerLogger {
  /** 省略 **/

  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = e getMessage match {
    case request: String =>
      info("     received message[%s]".format(request))

      val pipeline = ctx.getChannel.getPipeline
      info(pipeline.getNames.asScala.mkString("     current pipelines[", ", ", "]"))

      val (response, close) =
          request match {
            case "" => ("Please type something.\r\n", false)
            case "decoration" =>
              pipeline.addAfter("decoder", "decorator", new StringDecorator)
              ("Apply Decoration Mode!\r\n", false)
            case _ if request.toLowerCase == "bye" => ("Have a good day!.\r\n", true)
            case _ => ("Did you say '%s'?\r\n".format(request), false)
          }

      val future = e.getChannel.write(response)

      if (close) {
        future.addListener(ChannelFutureListener.CLOSE)
      }
  }

  /** 省略 **/
}

現在のPipelineは、ChannelHandlerContextから取得できるChannelから取得できます。遠いですねー。

      val pipeline = ctx.getChannel.getPipeline

このPipelineに、先ほど作成したDecoratorを加えます。

              pipeline.addAfter("decoder", "decorator", new StringDecorator)

StringDecoderの後に動いて欲しいので、「decoder」の後ろにくるように書いています。

前回はPipelineFactoryに、MyStringDecoderを「decoder」という名前でPipelineに登録していました。

class TelnetServerPipelineFactory extends ChannelPipelineFactory {
  @throws(classOf[Exception])
  def getPipeline: ChannelPipeline = {
    val pipeline = Channels.pipeline

    pipeline.addLast("framer", new MyDelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter))
    pipeline.addLast("decoder", new MyStringDecoder)
    pipeline.addLast("encoder", new MyStringEncoder)

    pipeline.addLast("handler", new TelnetServerHandler)

    pipeline
  }
}

Pipelineへの追加は、これまで見てきたメソッドを合わせて以下のものがあるようです。

メソッド 内容
addAfter(String baseName, String name, ChannelHandler handler) 既に存在する、baseNameを持つChannelHandlerの後に追加
addBefore(String baseName, String name, ChannelHandler handler) 既に存在する、baseNameを持つChannelHandlerの前に追加
addFirst(String name, ChannelHandler handler) Pipelineの先頭に追加
addLast(String name, ChannelHandler handler) Pipelineの最後に追加

その他、Handlerの取得やPipelineからの削除、置換などができます。まあ、詳しくはAPIリファレンスを…。
http://netty.io/docs/stable/api/org/jboss/netty/channel/ChannelPipeline.html

では、Telnetサーバを起動してみます。

$ sbt run
[info] Set current project to netty-custom-telnet (in build file:/xxxxx/netty-custom-telnet/)
[info] Running TelnetServer 

これにクライアントを繋げます。

$ telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome to ubuntu !
It is Sun Mar 25 21:19:29 JST 2012 now.
Hello World
Did you say 'Hello World'?

とりあえず、「Hello World」と打ってます。では、「decoration」と打ってみましょう。

decoration
Apply Decoration Mode!

サーバ側には、こんな感じでログが出ます。

MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyDelimiterBasedFrameDecoder#decode called.
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#decode called.
TelnetServerHandler#hanleUpstream, [eventName = WRITE] called
     received message[decoration]
     current pipelines[framer, decoder, encoder, handler]
クラス[StringDecorator]のインスタンスを作成しました
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
StringDecorator#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called

すると次回からは、送信した文字列に「***」が付いて返されるようになります。

Hello World              
Did you say '***Hello World***'?

この時、サーバ側ではこういうログになっています。

MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyDelimiterBasedFrameDecoder#decode called.
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#decode called.
StringDecorator#hanleUpstream, [eventName = WRITE] called
StringDecorator#decode called.
TelnetServerHandler#hanleUpstream, [eventName = WRITE] called
     received message[***Hello World***]
     current pipelines[framer, decoder, decorator, encoder, handler]
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
StringDecorator#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called

よ〜く見ると、Pipelineにdecoratorが増えています。

     current pipelines[framer, decoder, decorator, encoder, handler]

ここで、別のターミナルからもう1つ接続を張ってみます。

$ telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome to ubuntu !
It is Sun Mar 25 21:26:25 JST 2012 now.
Hello World
Did you say 'Hello World'?

特に装飾は行われていませんね。また、サーバ側のPipelineにも「decorator」は含まれていません。

MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyDelimiterBasedFrameDecoder#decode called.
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#decode called.
TelnetServerHandler#hanleUpstream, [eventName = WRITE] called
     received message[Hello World]
     current pipelines[framer, decoder, encoder, handler]
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called

そもそも、新規クライアントを接続するとこういうログが出るので、Pipelineは接続ごとに作成されるようですね。

クラス[MyDelimiterBasedFrameDecoder]のインスタンスを作成しました
クラス[MyStringDecoder]のインスタンスを作成しました
クラス[MyStringEncoder]のインスタンスを作成しました
クラス[TelnetServerHandler]のインスタンスを作成しました
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = OPEN] called
MyStringDecoder#hanleUpstream, [eventName = OPEN] called
TelnetServerHandler#hanleUpstream, [eventName = OPEN] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = BOUND] called
MyStringDecoder#hanleUpstream, [eventName = BOUND] called
TelnetServerHandler#hanleUpstream, [eventName = BOUND] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = CONNECTED] called
MyStringDecoder#hanleUpstream, [eventName = CONNECTED] called
TelnetServerHandler#hanleUpstream, [eventName = CONNECTED] called
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called

もちろん、最初に接続したターミナルでは、「***」で装飾が行われるままです。

Hello Netty
Did you say '***Hello Netty***'?

というわけで、PipelineおよびChannelHandlerは、接続ごとに作成されるという理解で正しそうですね。んで、ChannelHandlerは1回の接続の間は同じインスタンスが使われる、と。以前、FrameDecoderの例でフィールドを使っていたりしていた時、マルチスレッド下でどういう共有のされ方をするのかちょっと気になっていたので、これでスッキリしました。

ところで、今回のサンプルはクライアントから送信されてくる文字列を改変してしまうため、例としてはあまりよろしくありません…。このDecoderを適用した後は、「bye」で抜けられなくなりますし…。