CLOVER🍀

That was when it all began.

NettyでChat Server/Clientを書く

久々のNettyネタです。まあ、写経ですが…なかなかNettyネタだけで突っ走るのも難しいのですが、知識が衰えないように時々書いておかないと(笑)。

Nettyのサンプルとして、SecureChatというのがあるのですが、今回はこれを書いてみようと思います。ただ、今回はSSLはいらんよなーと思うことと、サーバのシャットダウンをちゃんと書くようにすることを念頭に置いていきます。

最初に、build.sbtを用意。

name := "netty-chat"

version := "0.0.1"

scalaVersion := "2.9.2"

organization := "littlewings"

libraryDependencies += "io.netty" % "netty" % "3.4.6.Final"

もう3.4.6.Finalになっています…。すでに3.5.0がBeta1…。開発ペースが早いなぁ…。

では、書いたソースをサーバ側から。
ChatServer.scala

import java.net.InetSocketAddress
import java.util.concurrent.Executors
import java.util.logging.{Level, Logger}

import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel.ChannelFuture
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory

object ChatServer {
  val logger: Logger = Logger.getLogger(classOf[ChatServer].getName)

  def main(args: Array[String]): Unit = {
    val port = args size match {
      case n if n > 0 => args(0).toInt
      case _ => 8080
    }

    new ChatServer(port).run()
  }
}

class ChatServer(port: Int) {
  import ChatServer._

  def run(): Unit = {
    val bootstrap = new ServerBootstrap(
                        new NioServerSocketChannelFactory(
                                Executors.newCachedThreadPool,
                                Executors.newCachedThreadPool))

    bootstrap.setPipelineFactory(new ChatServerPipelineFactory)
    val channel = bootstrap.bind(new InetSocketAddress(port))

    ChatServerHandler.channels.add(channel)
    waitForShutdown()
    val future = ChatServerHandler.channels.close()
    future.awaitUninterruptibly()
    bootstrap.releaseExternalResources()
  }

  private def waitForShutdown(): Unit = {
    readLine()
    logger.info("Shutdown Chat Server...")
  }
}

サンプルからちょっと加えて、コンソールを残したまま起動して、Enterを打たれた時点でシャットダウンするように変更しています。ところで、User Guideではシャットダウン時にはChannelFactory#releaseExternalResourcesを呼び出してリソース開放をしていましたが、Bootstrap#releaseExternalResourcesで代替でもよさそうですね。というか、APIドキュメントにChannelFactory#releaseExternalResourcesに委譲してるよ、と書いてあります…。

続いて、ChannelPipelineFactory。
ChatServerPipelineFactory.scala

import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.handler.codec.frame.{DelimiterBasedFrameDecoder, Delimiters}
import org.jboss.netty.handler.codec.string.{StringDecoder, StringEncoder}

class ChatServerPipelineFactory extends ChannelPipelineFactory {
  def getPipeline(): ChannelPipeline = {
    val pipeline = Channels.pipeline
    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter(): _*))
    pipeline.addLast("decoder", new StringDecoder)
    pipeline.addLast("encoder", new StringEncoder)
    pipeline.addLast("handler", new ChatServerHandler)
    pipeline
  }
}

こちらは、特に面白いところはありません。が、Scalaで書いているとDelimiterBasedFrameDecoderのコンストラクタ呼び出しが、ちょっとうまくいきませんでした。なんか、コンパイラがどのコンストラクタを呼ぶのか迷っていたので、

    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter(): _*))

と明示的に書くことに。

サーバ側のHandlerクラス。
ChatServerHandler.scala

import scala.collection.JavaConverters._

import java.net.InetAddress
import java.util.logging.{Level, Logger}

import org.jboss.netty.channel.Channel
import org.jboss.netty.channel.ChannelEvent
import org.jboss.netty.channel.ChannelFuture
import org.jboss.netty.channel.ChannelFutureListener
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelStateEvent
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.MessageEvent
import org.jboss.netty.channel.SimpleChannelUpstreamHandler
import org.jboss.netty.channel.group.{ChannelGroup, DefaultChannelGroup}

import ChatServerHandler._

object ChatServerHandler {
  private val logger: Logger = Logger.getLogger(classOf[ChatServerHandler].getName)
  val channels: ChannelGroup = new DefaultChannelGroup

  class Greeter extends ChannelFutureListener {
    @throws(classOf[Exception])
    def operationComplete(future: ChannelFuture): Unit = future isSuccess match {
      case true =>
        val ch = future.getChannel
        ch.write("Welcome to %s chat server!\n".format(InetAddress.getLocalHost.getHostName))
        channels.add(ch)
      case false =>
        future.getChannel.close()
    }
  }
}

class ChatServerHandler extends SimpleChannelUpstreamHandler {
  @throws(classOf[Exception])
  override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = {
    if (e.isInstanceOf[ChannelStateEvent]) logger.info(e.toString)

    super.handleUpstream(ctx, e)
  }

  @throws(classOf[Exception])
  override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit =
    e.getFuture.addListener(new Greeter)

  @throws(classOf[Exception])
  override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit =
    channels.remove(e.getChannel)

  @throws(classOf[Exception])
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
    val request = e.getMessage.asInstanceOf[String]

    for (c <- channels.asScala) {
      if (c != e.getChannel) {
        c.write("[%s] %s\n".format(e.getChannel.getRemoteAddress, request))
      } else {
        c.write("[you] %s\n".format(request))
      }
    }

    if (request.toLowerCase == "bye") {
      e.getChannel.close()
    }
  }

  @throws(classOf[Exception])
  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    logger.log(Level.WARNING, "Unexcepted exception from downstream.", e.getCause)
    e.getChannel.close()
  }
}

SSLがなくなった分だけ単純になりましたが、channelConnected時にChannelFutureListenerをくっつけるHandlerがなくなってしまったので、代わりにChannelStateEvent#getFutureで取得できるChannelFutureに対してListenerを追加しました。

今度は、クライアント側。こちらはあまり面白いところがないので、さらさらと書きます。
ChatClient.scala

import scala.annotation.tailrec

import java.io.IOException
import java.net.InetSocketAddress
import java.util.concurrent.Executors

import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel.{Channel, ChannelFuture}
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory

object ChatClient {
  @throws(classOf[Exception])
  def main(args: Array[String]): Unit = {
    val (host, port) = args size match {
      case 0 => ("localhost", 8080)
      case 1 => ("localhost", args(0).toInt)
      case 2 => (args(0), args(1).toInt)
      case _ =>
        printf("Usage: %s <host> <port>%n", classOf[ChatClient].getSimpleName)
        sys.exit(1)
    }

    new ChatClient(host, port).run()
  }
}

class ChatClient(host: String, port: Int) {
  @throws(classOf[IOException])
  def run(): Unit = {
    val bootstrap = new ClientBootstrap(
                          new NioClientSocketChannelFactory(
                                Executors.newCachedThreadPool,
                                Executors.newCachedThreadPool))

    bootstrap.setPipelineFactory(new ChatClientPipelineFactory)

    val future = bootstrap.connect(new InetSocketAddress(host, port))
    val channel = future.awaitUninterruptibly().getChannel

    future isSuccess match {
      case true =>
        readLineWhile(channel).foreach(_.awaitUninterruptibly())
        channel.close().awaitUninterruptibly()
      case false => future.getCause.printStackTrace()
    }

    bootstrap.releaseExternalResources()
  }

  @tailrec
  private def readLineWhile(channel: Channel, lastWriteFuture: Option[ChannelFuture] = None): Option[ChannelFuture] =
    readLine() match {
      case null => lastWriteFuture
      case line if line == "bye" =>
        val future = Some(channel.write(line + "\r\n"))
        channel.getCloseFuture.awaitUninterruptibly()
        future
      case line =>
        val future = Some(channel.write(line + "\r\n"))
        readLineWhile(channel, future)
    }
}

ChatClientPipelineFactory.scala

import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.handler.codec.frame.{DelimiterBasedFrameDecoder, Delimiters}
import org.jboss.netty.handler.codec.string.{StringDecoder, StringEncoder}

class ChatClientPipelineFactory extends ChannelPipelineFactory {
  @throws(classOf[Exception])
  def getPipeline(): ChannelPipeline = {
    val pipeline = Channels.pipeline
    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter(): _*))
    pipeline.addLast("decoder", new StringDecoder)
    pipeline.addLast("encoder", new StringEncoder)
    pipeline.addLast("handler", new ChatClientHandler)
    pipeline
  }
}

ChatClientHandler.scala

import java.util.logging.{Level, Logger}

import org.jboss.netty.channel.ChannelEvent
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelStateEvent
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.MessageEvent
import org.jboss.netty.channel.SimpleChannelUpstreamHandler

import ChatClientHandler._

object ChatClientHandler {
  private val logger: Logger = Logger.getLogger(classOf[ChatClientHandler].getName)
}

class ChatClientHandler extends SimpleChannelUpstreamHandler {
  @throws(classOf[Exception])
  override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = {
    if (e.isInstanceOf[ChannelEvent]) logger.info(e.toString)

    super.handleUpstream(ctx, e)
  }

  @throws(classOf[Exception])
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit =
    println(e.getMessage)

  @throws(classOf[Exception])
  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    logger.log(Level.WARNING, "Unexcepted exception from dowstream.", e.getCause)
    e.getChannel.close()
  }
}

動作結果はこちら。まずはサーバ側。sbtコンソール上で実行しています。

> run-main ChatServer
[info] Running ChatServer 

クライアントから接続があると、こんな感じでログが出ます。

6 03, 2012 6:39:15 午後 ChatServerHandler handleUpstream
情報: [id: 0x6d2a4623, /127.0.0.1:51970 => /127.0.0.1:8080] OPEN
6 03, 2012 6:39:15 午後 ChatServerHandler handleUpstream
情報: [id: 0x6d2a4623, /127.0.0.1:51970 => /127.0.0.1:8080] BOUND: /127.0.0.1:8080
6 03, 2012 6:39:15 午後 ChatServerHandler handleUpstream
情報: [id: 0x6d2a4623, /127.0.0.1:51970 => /127.0.0.1:8080] CONNECTED: /127.0.0.1:51970

クライアントから「bye」と送信されると、接続を切るのでこうなります。

6 03, 2012 6:39:55 午後 ChatServerHandler handleUpstream
情報: [id: 0x6d2a4623, /127.0.0.1:51970 :> /127.0.0.1:8080] DISCONNECTED
6 03, 2012 6:39:55 午後 ChatServerHandler handleUpstream
情報: [id: 0x6d2a4623, /127.0.0.1:51970 :> /127.0.0.1:8080] UNBOUND
6 03, 2012 6:39:55 午後 ChatServerHandler handleUpstream
情報: [id: 0x6d2a4623, /127.0.0.1:51970 :> /127.0.0.1:8080] CLOSED

クライアント側は、こんな感じ。こちらは、シェルからsbtコマンド経由で起動しています。

$ sbt "run-main ChatClient"
[info] Set current project to netty-chat (in build file:/xxxxx/netty-chat/)
[info] Running ChatClient 
6 03, 2012 6:41:37 午後 ChatClientHandler handleUpstream
情報: [id: 0x6c46708a] OPEN
6 03, 2012 6:41:37 午後 ChatClientHandler handleUpstream
情報: [id: 0x6c46708a, /127.0.0.1:51972 => localhost/127.0.0.1:8080] BOUND: /127.0.0.1:51972
6 03, 2012 6:41:37 午後 ChatClientHandler handleUpstream
情報: [id: 0x6c46708a, /127.0.0.1:51972 => localhost/127.0.0.1:8080] CONNECTED: localhost/127.0.0.1:8080
6 03, 2012 6:41:37 午後 ChatClientHandler handleUpstream
情報: [id: 0x6c46708a, /127.0.0.1:51972 => localhost/127.0.0.1:8080] RECEIVED: Welcome to ubuntu chat server!
Welcome to ubuntu chat server!

発言すると、自身のコンソールにはこんな感じで出力されます。

6 03, 2012 6:43:13 午後 ChatClientHandler handleUpstream
情報: [id: 0x6c46708a, /127.0.0.1:51972 => localhost/127.0.0.1:8080] WRITTEN_AMOUNT: 7
6 03, 2012 6:43:13 午後 ChatClientHandler handleUpstream
情報: [id: 0x6c46708a, /127.0.0.1:51972 => localhost/127.0.0.1:8080] RECEIVED: [you] Hello
[you] Hello

他人が発言した場合は、接続元情報と共にメッセージが出力されます。

6 03, 2012 6:44:12 午後 ChatClientHandler handleUpstream
情報: [id: 0x6c46708a, /127.0.0.1:51972 => localhost/127.0.0.1:8080] RECEIVED: [/127.0.0.1:51977] Enter
[/127.0.0.1:51977] Enter

あと、「bye」と入力すると接続が切れます。その発言自体も、相手側に出力されますが…。

とりあえず、まだ普通に書けそうです(笑)。でも、シャットダウン周りのコーディングはちょっと怪しいかなぁ。もうちょっと現実味のあるものを書き始めると、もっと面白くなるかも?