CLOVER🍀

That was when it all began.

Nettyでストリームベースのプロトコルを扱う

前回のエントリから、ちょっと時間が空いてしまいました…。その間に、世間では

  • ScalaがVersion 2.10のMilestone 1と2をリリース
  • Nettyが3.3.1にバージョンアップ

とまあ、ちょっと置いてかれてる感じですね。Scala 2.10はまだMilestoneリリースなのと、Nettyに関する勉強が中途半端なのでまだ置いておきます。

Nettyはバージョンアップしているものの、UserGuideドキュメントは変わっていないので、変わらずこれを続けられそうです。

というわけで、引き続きGetting Startedを進めていくとしましょう。今日のテーマはこちら。

[Dealing with a Stream-based Transport]
http://netty.io/docs/stable/guide/html/#start.10

今回は、訳が中心なりそうです。

ソケットバッファに対する、小さな警告

http://netty.io/docs/stable/guide/html/#start.10.2

TCP/IPのようなストリームベースの輸送では、受信したデータをソケットの受信バッファに保存します。残念ながら、ストリームベースで運ばれたバッファは、パケットのキューではなくバイトのキューです。これは、あなたが2つの独立したパケットで2つのメッセージを送信したとしても、OSは2つのメッセージではなく単なるバイトの束として扱ってしまうことを意味します。そのため、リモートクライアントが書き出したデータを正確に読み取ることは保証されていません。例えば、OSのTCP/IPスタックが、次のような3のパケットを受け取ったと仮定しましょう。

  1 +-----+-----+-----+
  2 | ABC | DEF | GHI |
    +-----+-----+-----+

ストリームベースのプロトコルの一般的な特性のため、あなたのアプリケーションは次のような断片(フラグメント)となった形式を読み込んでしまう可能性が十分にあります。

  1 +----+-------+---+---+
  2 | AB | CDEFG | H | I |
    +----+-------+---+---+

したがって、受信データを、サーバサイド、クライアントサイドに関係なく、アプリケーションロジックにより容易に理解できる意味のある構造に再配置するべきです。

上記の例の場合は、受信データが下記の様な構造となるべきです。

  1 +-----+-----+-----+
  2 | ABC | DEF | GHI |
    +-----+-----+-----+

最初の解答

http://netty.io/docs/stable/guide/html/#start.10.3

TIMEクライアントの例に戻りましょう。私達は同じ問題を抱えています。32ビット整数は非常に少ないデータであり、多くの場合フラグメント化されていないでしょう。しかしながら、問題はフラグメント化されているかもしれない、ということです。そしてトラフィックが増加するにしたがってフラグメント化する可能性も上昇していくでしょう。

極端に単純化した解は、累積できる内部バッファを作成し、すべての4バイトデータが内部バッファに受信されるまで待つことです。下記は、その問題を解決するTimeClientHandlerクラスの実装です。

TimeClientHandler.scala

import java.util.Date

import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import org.jboss.netty.channel.{ChannelHandlerContext, ExceptionEvent, MessageEvent, SimpleChannelHandler}

class TimeClientHandler extends SimpleChannelHandler {
  val buf: ChannelBuffer = ChannelBuffers.dynamicBuffer // (19)

  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
    val m: ChannelBuffer = e.getMessage.asInstanceOf[ChannelBuffer]
    buf.writeBytes(m) // (20)

    if (buf.readableBytes >= 4) { // (21)
      val currentTimeMillis: Long = buf.readInt() * 1000L
      println("Current Time = %s".format(new Date(currentTimeMillis)))
      e.getChannel.close()
    }
  }

  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    e.getCause.printStackTrace()
    e.getChannel.close()
  }
}

(19)
dynamicBufferとは、要求に応じて容量の増加するChannelBufferです。メッセージの長さを知らない場合に、非常に役に立ちます。
(20)
最初に、全ての受信したデータは変数bufへまとめられるべきです。
(21)
ハンドラはbufが十分なデータ(この例では4バイト)を持っているかどうか、チェックしなくてはなりません。そして、実際のビジネスロジックに移ります。

2つ目の解答

http://netty.io/docs/stable/guide/html/#start.10.4

最初の解答では、TIMEクライアントでの問題を解決しましたが、修正されたハンドラは美しくありません。可変長フィールドのように多数のフィールドからできているような、より複雑なプロトコルを思い浮かべます。あなたのChannelHandlerの実装は、早期にメンテナンスができなくなるでしょう。

あなたは気付いているかもしれませんが、あなたは1つ以上のChannelHandlerをChannelPipelineに加えることができます。そのため、あなたは一枚岩であったChannelHandlerを、アプリケーションの複雑さを小さくするため多くのモジュールとして分割することができます。例えば、TimeClientHandlerは2つのハンドラへ分割することができました。

  • TimeDecoderはフラグメントの問題を取扱い、
  • TimeClientHandlerは、最初のシンプルなバージョンにします

幸いNettyは、あなたが自由にアプリケーションを記述をすることを助けてくれる、拡張可能なクラスを提供しています。

TimeDecoder.scala

import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.handler.codec.frame.FrameDecoder

class TimeDecoder extends FrameDecoder { // (22)
  override protected def decode(ctx: ChannelHandlerContext,
                                channel: Channel,
                                buffer: ChannelBuffer): AnyRef = { // (23)
    buffer readableBytes match {
      case readed if readed < 4 => null // (24)
      case _ => buffer.readBytes(4) // (25)
    }
  }
}

(22)
FrameDecoderは、フラグメントの問題を簡単に扱えるようにするための、ChannelHandlerの実装です。
(23)
FrameDecoderは、新しいデータを受信した時、内部に保持された累積バッファを引数にdecodeメソッドを呼び出します。
(24)
戻り値としてnullを戻した時、データがまだ十分に得られていないことを意味します。十分なデータがある場合、FrameDecoderは再度decodeメソッドを呼び出すでしょう。
(25)
null以外の値を戻した場合、それはdecodeメソッドがメッセージの復号に成功したことを意味します。FrameDecoderは、内部の累積バッファの読み取り部を破棄するでしょう。ここで、多数のメッセージを復号する必要はないことを覚えておいてください。FrameDecoderは、nullが返されるまでdecoderメソッドを呼び出し続けるでしょう。

今、ChannelPipelineにもうひとつのハンドラを加えます。TimeClientのChannelPipelineFactoryの実装を修正しましょう。

      bootstrap.setPipelineFactory(new ChannelPipelineFactory {
        def getPipeline: ChannelPipeline = Channels.pipeline(new TimeDecoder, new TimeClientHandler)
      })

もしあなたが冒険好きなら、デコーダをもっと単純化するReplayingDecoderを試してみたいと思うかもしれません。詳しい情報は、APIリファレンスを参照する必要があります。

import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.handler.codec.replay.{ReplayingDecoder, VoidEnum}

class TimeDecoder extends ReplayingDecoder[VoidEnum] {
  override protected def decode(ctx: ChannelHandlerContext,
                                channel: Channel,
                                buffer: ChannelBuffer,
                                state: VoidEnum): AnyRef = {
    buffer.readBytes(4)
  }
}

さらに、Nettyはほとんどのプロトコルを容易に実装することを可能にし、一枚岩でメンテナンスの難しいハンドラを実装してしまうことを避けるられるような、独創的なデコーダ群を提供しています。より多くの詳細な例については、次のパッケージを参照してください。


ん〜、なかなか疲れましたわ〜。