CLOVER🍀

That was when it all began.

JBoss NettyでHTTPサーバ

先週触っていた、Netty ExampleのScala写経の続きです。今回のお題は、HTTPサーバ。
http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/package-summary.html

まずは、サーバ側。
HttpServer.scala

import java.net.InetSocketAddress
import java.util.concurrent.Executors

import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory

/**
 * Entry point of the sample HTTP server.
 *
 * Creates a server bootstrap on top of an NIO server socket channel factory,
 * installs [[HttpServerPipelineFactory]] as the pipeline factory and binds
 * the server to TCP port 8080.
 */
object HttpServer {
  def main(args: Array[String]): Unit = {
    // Two independent cached thread pools are handed to the channel factory,
    // in the same order as before (first argument, then second argument).
    val firstPool = Executors.newCachedThreadPool
    val secondPool = Executors.newCachedThreadPool
    val channelFactory = new NioServerSocketChannelFactory(firstPool, secondPool)

    val server = new ServerBootstrap(channelFactory)
    server.setPipelineFactory(new HttpServerPipelineFactory)

    // Start accepting connections on port 8080.
    val listenAddress = new InetSocketAddress(8080)
    server.bind(listenAddress)
  }
}

HttpServerPipelineFactory.scala

import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.handler.codec.http.{HttpContentCompressor, HttpRequestDecoder, HttpResponseEncoder}

/**
 * Builds the channel pipeline used by [[HttpServer]].
 *
 * Handlers are registered in this order: "decoder", "encoder", "deflater",
 * and finally "handler" (a fresh [[HttpRequestHandler]] per pipeline).
 */
class HttpServerPipelineFactory extends ChannelPipelineFactory {
  /**
   * Creates a new pipeline with a new instance of every handler.
   *
   * @return the fully populated pipeline
   */
  @throws(classOf[Exception])
  def getPipeline: ChannelPipeline = {
    val p = Channels.pipeline()

    // HTTP codec handlers.
    p.addLast("decoder", new HttpRequestDecoder)
    p.addLast("encoder", new HttpResponseEncoder)
    // Content compressor, registered under the name "deflater".
    p.addLast("deflater", new HttpContentCompressor)
    // Application logic; the handler keeps mutable state, so one per pipeline.
    p.addLast("handler", new HttpRequestHandler)

    p
  }
}

HttpRequestHandler.scala

import scala.collection.JavaConverters._

import java.util.Map.Entry

import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import org.jboss.netty.channel.ChannelFuture
import org.jboss.netty.channel.ChannelFutureListener
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.MessageEvent
import org.jboss.netty.channel.SimpleChannelUpstreamHandler
import org.jboss.netty.handler.codec.http.Cookie
import org.jboss.netty.handler.codec.http.CookieDecoder
import org.jboss.netty.handler.codec.http.CookieEncoder
import org.jboss.netty.handler.codec.http.DefaultHttpResponse
import org.jboss.netty.handler.codec.http.HttpChunk
import org.jboss.netty.handler.codec.http.HttpChunkTrailer
import org.jboss.netty.handler.codec.http.HttpHeaders
import org.jboss.netty.handler.codec.http.HttpHeaders.Names
import org.jboss.netty.handler.codec.http.HttpRequest
import org.jboss.netty.handler.codec.http.HttpResponse
import org.jboss.netty.handler.codec.http.HttpResponseStatus
import org.jboss.netty.handler.codec.http.HttpVersion
import org.jboss.netty.handler.codec.http.QueryStringDecoder
import org.jboss.netty.util.CharsetUtil

/**
 * Upstream handler that answers every HTTP request with a plain-text dump of
 * the request itself (protocol version, host, URI, headers, query parameters,
 * body / chunks and trailing headers).
 *
 * The handler is stateful: it remembers the current request, whether it is in
 * the middle of a chunked body, and the response text accumulated so far.
 */
class HttpRequestHandler extends SimpleChannelUpstreamHandler {
  // Request currently being processed (set when a new request message arrives).
  private var request: HttpRequest = _
  // True while the chunks of a chunked request body are still arriving.
  private var readingChunks: Boolean = false
  // Response text accumulated for the current request.
  private val buf: StringBuilder = new StringBuilder

  /**
   * Dispatches an incoming message: a new request when no chunked body is in
   * progress, otherwise the next chunk of the current request.
   */
  @throws(classOf[Exception])
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit =
    if (readingChunks) handleChunk(e) else handleRequest(e)

  /** Starts processing a new request and, unless it is chunked, responds immediately. */
  private def handleRequest(e: MessageEvent): Unit = {
    request = e.getMessage.asInstanceOf[HttpRequest]

    if (HttpHeaders.is100ContinueExpected(request)) send100Continue(e)

    // Reset the buffer and write the banner followed by the request line details.
    buf.setLength(0)
    buf.append("WELCOME TO THE WILD WILD WEB SERVER\r\n")
    buf.append("===================================\r\n")

    buf.append("VERSION: ").append(request.getProtocolVersion()).append("\r\n")
    buf.append("HOSTNAME: ").append(HttpHeaders.getHost(request, "unknown")).append("\r\n")
    buf.append("REQUEST_URI: ").append(request.getUri()).append("\r\n\r\n")

    request.getHeaders.asScala.foreach { header =>
      buf.append("HEADER: ").append(header.getKey).append(" = ").append(header.getValue).append("\r\n")
    }

    // Query parameters: one line per (name, value) pair, then a blank line.
    val params = new QueryStringDecoder(request.getUri).getParameters
    if (!params.isEmpty) {
      params.asScala.foreach { case (key, values) =>
        values.asScala.foreach { value =>
          buf.append("PARAM: ").append(key).append(" = ").append(value).append("\r\n")
        }
      }

      buf.append("\r\n")
    }

    if (request.isChunked) {
      // The body follows as separate chunk messages; respond after the last one.
      readingChunks = true
    } else {
      val body = request.getContent
      if (body.readable) {
        buf.append("CONTENT: ").append(body.toString(CharsetUtil.UTF_8)).append("\r\n")
      }
      writeResponse(e)
    }
  }

  /** Appends one chunk of a chunked body; on the last chunk, dumps trailers and responds. */
  private def handleChunk(e: MessageEvent): Unit = {
    val chunk = e.getMessage.asInstanceOf[HttpChunk]

    if (!chunk.isLast) {
      buf.append("CHUNK: ").append(chunk.getContent.toString(CharsetUtil.UTF_8)).append("\r\n")
    } else {
      readingChunks = false
      buf.append("END OF CONTENT\r\n")

      // The final chunk is treated as a trailer that may carry extra headers.
      val trailer = chunk.asInstanceOf[HttpChunkTrailer]
      val trailerNames = trailer.getHeaderNames
      if (!trailerNames.isEmpty) {
        buf.append("\r\n")

        trailerNames.asScala.foreach { name =>
          trailer.getHeaders(name).asScala.foreach { value =>
            buf.append("TRAILING HEADER: ").append(name).append(" = ").append(value).append("\r\n")
          }
        }

        buf.append("\r\n")
      }

      writeResponse(e)
    }
  }

  /**
   * Sends the accumulated text as a 200 OK response, echoing back any cookies
   * found in the request, and closes the channel afterwards unless the
   * request asked for keep-alive.
   */
  private def writeResponse(e: MessageEvent): Unit = {
    val keepAlive = HttpHeaders.isKeepAlive(request)

    val response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
    response.setContent(ChannelBuffers.copiedBuffer(buf.toString, CharsetUtil.UTF_8))
    response.setHeader(Names.CONTENT_TYPE, "text/plain; charset=UTF-8")

    // Content-Length is only added for keep-alive connections.
    if (keepAlive) response.setHeader(Names.CONTENT_LENGTH, response.getContent.readableBytes)

    // Echo the request cookies back, if a Cookie header was sent at all.
    Option(request.getHeader(Names.COOKIE)).foreach { cookieString =>
      val cookies = (new CookieDecoder).decode(cookieString)

      if (!cookies.isEmpty) {
        val cookieEncoder = new CookieEncoder(true)
        cookies.asScala.foreach(cookie => cookieEncoder.addCookie(cookie))
        response.addHeader(Names.SET_COOKIE, cookieEncoder.encode)
      }
    }

    val writeFuture = e.getChannel.write(response)

    if (!keepAlive) writeFuture.addListener(ChannelFutureListener.CLOSE)
  }

  /** Writes an interim 100 Continue response to the channel of the given event. */
  private def send100Continue(e: MessageEvent): Unit = {
    val continueResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE)
    e.getChannel.write(continueResponse)
  }

  /** Prints the stack trace of the failure and closes the affected channel. */
  @throws(classOf[Exception])
  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    e.getCause.printStackTrace()
    e.getChannel.close()
  }
}

完全に忠実な移植ではありません。match式を使っていたり、scala.collection.JavaConvertersを使ってJavaのコレクションをScalaのコレクションに変換したりしています。JavaConversionsではないのは、暗黙の型変換が入っていることを明記したかったので…。

続いて、クライアント側。
HttpClient.scala

import java.net.{InetSocketAddress, URI}
import java.util.concurrent.Executors

import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel.{Channel, ChannelFuture}
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.handler.codec.http.CookieEncoder
import org.jboss.netty.handler.codec.http.DefaultHttpRequest
import org.jboss.netty.handler.codec.http.HttpHeaders
import org.jboss.netty.handler.codec.http.HttpMethod
import org.jboss.netty.handler.codec.http.HttpRequest
import org.jboss.netty.handler.codec.http.HttpVersion

/**
 * Command-line HTTP client: sends a single GET request for the URL given as
 * the only argument and waits until the server closes the connection. The
 * response itself is printed by the handler installed through
 * [[HttpClientPipelineFactory]].
 *
 * Only the `http` scheme is supported. A missing scheme defaults to `http`,
 * a missing host to `localhost` and a missing port to 80.
 */
object HttpClient {
  /**
   * @param args exactly one element: the URL to request
   */
  @throws(classOf[Exception])
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      // The runtime class of a Scala object is named "HttpClient$"; strip the
      // trailing '$' so the usage line shows the name the user actually types.
      Console.err.println("Usage: %s <URL>".format(this.getClass.getSimpleName.stripSuffix("$")))
      sys.exit(1)
    }

    val uri = new URI(args(0))
    val scheme = Option(uri.getScheme).getOrElse("http")
    val host = Option(uri.getHost).getOrElse("localhost")

    // Validate the scheme before deriving the port. URI schemes are
    // case-insensitive and java.net.URI does not normalize their case, so the
    // comparison must ignore case.
    if (!scheme.equalsIgnoreCase("http")) {
      Console.err.println("Only HTTP is supported.")
      sys.exit(1)
    }

    // The scheme is known to be HTTP here, so an unspecified port (-1) always
    // means 80. Previously "HTTP://host/" kept port -1, which made
    // InetSocketAddress throw IllegalArgumentException.
    val port = if (uri.getPort == -1) 80 else uri.getPort

    val bootstrap = new ClientBootstrap(
      new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))

    bootstrap.setPipelineFactory(new HttpClientPipelineFactory)

    val future = bootstrap.connect(new InetSocketAddress(host, port))

    // Block until the connection attempt finishes; on failure, report the
    // cause, release the bootstrap's resources and exit.
    val channel = future.awaitUninterruptibly().getChannel
    if (!future.isSuccess) {
      future.getCause.printStackTrace()
      bootstrap.releaseExternalResources()
      sys.exit(1)
    }

    val request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString)
    request.setHeader(HttpHeaders.Names.HOST, host)
    request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE)
    request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP)

    // Attach two sample cookies to the request.
    val httpCookieEncoder = new CookieEncoder(false)
    httpCookieEncoder.addCookie("my-cookie", "foo")
    httpCookieEncoder.addCookie("anothor-cookie", "bar")
    request.setHeader(HttpHeaders.Names.COOKIE, httpCookieEncoder.encode)

    channel.write(request)

    // "Connection: close" was requested, so wait for the channel to close
    // before releasing the bootstrap's resources.
    channel.getCloseFuture.awaitUninterruptibly()

    bootstrap.releaseExternalResources()
  }
}

HttpClientPipelineFactory.scala

import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.handler.codec.http.{HttpClientCodec, HttpContentDecompressor}

/**
 * Builds the channel pipeline used by [[HttpClient]].
 *
 * Handlers are registered in this order: "codec", "inflater", and finally
 * "handler" (a fresh [[HttpResponseHandler]] per pipeline).
 */
class HttpClientPipelineFactory extends ChannelPipelineFactory {
  /**
   * Creates a new pipeline with a new instance of every handler.
   *
   * @return the fully populated pipeline
   */
  @throws(classOf[Exception])
  def getPipeline: ChannelPipeline = {
    val p = Channels.pipeline()

    // Client-side HTTP codec.
    p.addLast("codec", new HttpClientCodec)
    // Content decompressor, registered under the name "inflater".
    p.addLast("inflater", new HttpContentDecompressor)
    // Prints the response; it keeps mutable state, so one per pipeline.
    p.addLast("handler", new HttpResponseHandler)

    p
  }
}

HttpResponseHandler.scala

import scala.collection.JavaConverters._

import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.{ChannelHandlerContext, MessageEvent, SimpleChannelUpstreamHandler}
import org.jboss.netty.handler.codec.http.{HttpChunk, HttpResponse}
import org.jboss.netty.util.CharsetUtil

/**
 * Upstream handler that prints an HTTP response to standard output: status
 * line, protocol version, headers and then the body, either as a single
 * content block or chunk by chunk.
 */
class HttpResponseHandler extends SimpleChannelUpstreamHandler {
  // True while the chunks of a chunked response body are still arriving.
  private var readingChunks: Boolean = false

  /**
   * Handles the next message of the response stream: the response head when
   * no chunked body is in progress, otherwise the next body chunk.
   */
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit =
    if (readingChunks) printChunk(e.getMessage.asInstanceOf[HttpChunk])
    else printResponse(e.getMessage.asInstanceOf[HttpResponse])

  /** Prints status, version and headers, then the body or the chunked-body opener. */
  private def printResponse(response: HttpResponse): Unit = {
    println("STATUS: " + response.getStatus)
    println("VERSION: " + response.getProtocolVersion)
    println()

    if (!response.getHeaderNames.isEmpty) {
      for {
        name <- response.getHeaderNames.asScala
        value <- response.getHeaders(name).asScala
      } println("HEADER: %s = %s".format(name, value))

      println()
    }

    // Enter chunk mode for ANY chunked response, not only for status 200.
    // Otherwise a chunked non-200 response leaves readingChunks false and the
    // following HttpChunk message is cast to HttpResponse, which fails with a
    // ClassCastException.
    if (response.isChunked) {
      readingChunks = true
      println("CHUNKED CONTENT {")
    } else {
      val content = response.getContent
      if (content.readable) {
        println("CONTENT {")
        println(content.toString(CharsetUtil.UTF_8))
        println("} END OF CONTENT")
      }
    }
  }

  /** Prints one body chunk, or the closing marker when the last chunk arrives. */
  private def printChunk(chunk: HttpChunk): Unit =
    if (chunk.isLast) {
      readingChunks = false
      println("} END OF CHUNKED CONTENT")
    } else {
      println(chunk.getContent.toString(CharsetUtil.UTF_8))
      // Flush so each chunk becomes visible as soon as it is received.
      Console.out.flush()
    }
}

SSLのサポートは外しました。違うExampleに依存しておりましたので。

それでは動作確認。まずはサーバを起動。

$ sbt "run-main HttpServer"
[info] Set current project to default (in build file:/xxxxx/netty-http/)
[info] Running HttpServer 

違うコンソールで、クライアントを実行。

$ sbt "run-main HttpClient http://localhost:8080/index.html?param1=foo&param2=かずひら"
[info] Set current project to default (in build file:/xxxxx/netty-http/)
[info] Running HttpClient http://localhost:8080/index.html?param1=foo&param2=かずひら
STATUS: 200 OK
VERSION: HTTP/1.1

HEADER: Content-Encoding = identity
HEADER: Content-Type = text/plain; charset=UTF-8
HEADER: Set-Cookie = anothor-cookie=bar;my-cookie=foo

CHUNKED CONTENT {
WELCOME TO THE WILD WILD WEB SERVER
===================================
VERSION: HTTP/1.1
HOSTNAME: localhost
REQUEST_URI: http://localhost:8080/index.html?param1=foo&param2=%E3%81%8B%E3%81%9A%E3%81%B2%E3%82%89

HEADER: Host = localhost
HEADER: Connection = close
HEADER: Accept-Encoding = gzip
HEADER: Cookie = anothor-cookie=bar;my-cookie=foo
PARAM: param1 = foo
PARAM: param2 = かずひら


} END OF CHUNKED CONTENT
[success] Total time: 1 s, completed Jul 31, 2011 7:19:18 AM

うん、当たり前といえば当たり前ですが、動いております。

3つほどExampleを写してきたので、少しだけNettyの書き方がなんとなくわかるようになりました。これで実アプリケーションが書けるか?と聞かれると、う〜んって感じですけどね…。