久々のNettyネタです。まあ、写経ですが…なかなかNettyネタだけで突っ走るのも難しいのですが、知識が衰えないように時々書いておかないと(笑)。
Nettyのサンプルとして、SecureChatというのがあるのですが、今回はこれを書いてみようと思います。ただ、今回はSSLはいらんよなーと思うことと、サーバのシャットダウンをちゃんと書くようにすることを念頭に置いていきます。
最初に、build.sbtを用意。
name := "netty-chat" version := "0.0.1" scalaVersion := "2.9.2" organization := "littlewings" libraryDependencies += "io.netty" % "netty" % "3.4.6.Final"
もう3.4.6.Finalになっています…。すでに3.5.0がBeta1…。開発ペースが早いなぁ…。
では、書いたソースをサーバ側から。
ChatServer.scala
import java.net.InetSocketAddress import java.util.concurrent.Executors import java.util.logging.{Level, Logger} import org.jboss.netty.bootstrap.ServerBootstrap import org.jboss.netty.channel.ChannelFuture import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory object ChatServer { val logger: Logger = Logger.getLogger(classOf[ChatServer].getName) def main(args: Array[String]): Unit = { val port = args size match { case n if n > 0 => args(0).toInt case _ => 8080 } new ChatServer(port).run() } } class ChatServer(port: Int) { import ChatServer._ def run(): Unit = { val bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool, Executors.newCachedThreadPool)) bootstrap.setPipelineFactory(new ChatServerPipelineFactory) val channel = bootstrap.bind(new InetSocketAddress(port)) ChatServerHandler.channels.add(channel) waitForShutdown() val future = ChatServerHandler.channels.close() future.awaitUninterruptibly() bootstrap.releaseExternalResources() } private def waitForShutdown(): Unit = { readLine() logger.info("Shutdown Chat Server...") } }
サンプルからちょっと加えて、コンソールを残したまま起動して、Enterを打たれた時点でシャットダウンするように変更しています。ところで、User Guideではシャットダウン時にはChannelFactory#releaseExternalResourcesを呼び出してリソース開放をしていましたが、Bootstrap#releaseExternalResourcesで代替でもよさそうですね。というか、APIドキュメントにChannelFactory#releaseExternalResourcesに委譲してるよ、と書いてあります…。
続いて、ChannelPipelineFactory。
ChatServerPipelineFactory.scala
import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels} import org.jboss.netty.handler.codec.frame.{DelimiterBasedFrameDecoder, Delimiters} import org.jboss.netty.handler.codec.string.{StringDecoder, StringEncoder} class ChatServerPipelineFactory extends ChannelPipelineFactory { def getPipeline(): ChannelPipeline = { val pipeline = Channels.pipeline pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter(): _*)) pipeline.addLast("decoder", new StringDecoder) pipeline.addLast("encoder", new StringEncoder) pipeline.addLast("handler", new ChatServerHandler) pipeline } }
こちらは、特に面白いところはありません。が、Scalaで書いているとDelimiterBasedFrameDecoderのコンストラクタ呼び出しが、ちょっとうまくいきませんでした。なんか、コンパイラがどのコンストラクタを呼ぶのか迷っていたので、
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter(): _*))
と明示的に書くことに。
サーバ側のHandlerクラス。
ChatServerHandler.scala
import scala.collection.JavaConverters._ import java.net.InetAddress import java.util.logging.{Level, Logger} import org.jboss.netty.channel.Channel import org.jboss.netty.channel.ChannelEvent import org.jboss.netty.channel.ChannelFuture import org.jboss.netty.channel.ChannelFutureListener import org.jboss.netty.channel.ChannelHandlerContext import org.jboss.netty.channel.ChannelStateEvent import org.jboss.netty.channel.ExceptionEvent import org.jboss.netty.channel.MessageEvent import org.jboss.netty.channel.SimpleChannelUpstreamHandler import org.jboss.netty.channel.group.{ChannelGroup, DefaultChannelGroup} import ChatServerHandler._ object ChatServerHandler { private val logger: Logger = Logger.getLogger(classOf[ChatServerHandler].getName) val channels: ChannelGroup = new DefaultChannelGroup class Greeter extends ChannelFutureListener { @throws(classOf[Exception]) def operationComplete(future: ChannelFuture): Unit = future isSuccess match { case true => val ch = future.getChannel ch.write("Welcome to %s chat server!\n".format(InetAddress.getLocalHost.getHostName)) channels.add(ch) case false => future.getChannel.close() } } } class ChatServerHandler extends SimpleChannelUpstreamHandler { @throws(classOf[Exception]) override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = { if (e.isInstanceOf[ChannelStateEvent]) logger.info(e.toString) super.handleUpstream(ctx, e) } @throws(classOf[Exception]) override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = e.getFuture.addListener(new Greeter) @throws(classOf[Exception]) override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = channels.remove(e.getChannel) @throws(classOf[Exception]) override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { val request = e.getMessage.asInstanceOf[String] for (c <- channels.asScala) { if (c != e.getChannel) { c.write("[%s] %s\n".format(e.getChannel.getRemoteAddress, request)) } else { c.write("[you] %s\n".format(request)) } } if (request.toLowerCase == "bye") { e.getChannel.close() } } @throws(classOf[Exception]) override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = { logger.log(Level.WARNING, "Unexcepted exception from downstream.", e.getCause) e.getChannel.close() } }
SSLがなくなった分だけ単純になりましたが、channelConnected時にChannelFutureListenerをくっつけるHandlerがなくなってしまったので、代わりにChannelStateEvent#getFutureで取得できるChannelFutureに対してListenerを追加しました。
今度は、クライアント側。こちらはあまり面白いところがないので、さらさらと書きます。
ChatClient.scala
import scala.annotation.tailrec import java.io.IOException import java.net.InetSocketAddress import java.util.concurrent.Executors import org.jboss.netty.bootstrap.ClientBootstrap import org.jboss.netty.channel.{Channel, ChannelFuture} import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory object ChatClient { @throws(classOf[Exception]) def main(args: Array[String]): Unit = { val (host, port) = args size match { case 0 => ("localhost", 8080) case 1 => ("localhost", args(0).toInt) case 2 => (args(0), args(1).toInt) case _ => printf("Usage: %s <host> <port>%n", classOf[ChatClient].getSimpleName) sys.exit(1) } new ChatClient(host, port).run() } } class ChatClient(host: String, port: Int) { @throws(classOf[IOException]) def run(): Unit = { val bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( Executors.newCachedThreadPool, Executors.newCachedThreadPool)) bootstrap.setPipelineFactory(new ChatClientPipelineFactory) val future = bootstrap.connect(new InetSocketAddress(host, port)) val channel = future.awaitUninterruptibly().getChannel future isSuccess match { case true => readLineWhile(channel).foreach(_.awaitUninterruptibly()) channel.close().awaitUninterruptibly() case false => future.getCause.printStackTrace() } bootstrap.releaseExternalResources() } @tailrec private def readLineWhile(channel: Channel, lastWriteFuture: Option[ChannelFuture] = None): Option[ChannelFuture] = readLine() match { case null => lastWriteFuture case line if line == "bye" => val future = Some(channel.write(line + "\r\n")) channel.getCloseFuture.awaitUninterruptibly() future case line => val future = Some(channel.write(line + "\r\n")) readLineWhile(channel, future) } }
ChatClientPipelineFactory.scala
import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels} import org.jboss.netty.handler.codec.frame.{DelimiterBasedFrameDecoder, Delimiters} import org.jboss.netty.handler.codec.string.{StringDecoder, StringEncoder} class ChatClientPipelineFactory extends ChannelPipelineFactory { @throws(classOf[Exception]) def getPipeline(): ChannelPipeline = { val pipeline = Channels.pipeline pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter(): _*)) pipeline.addLast("decoder", new StringDecoder) pipeline.addLast("encoder", new StringEncoder) pipeline.addLast("handler", new ChatClientHandler) pipeline } }
ChatClientHandler.scala
import java.util.logging.{Level, Logger} import org.jboss.netty.channel.ChannelEvent import org.jboss.netty.channel.ChannelHandlerContext import org.jboss.netty.channel.ChannelStateEvent import org.jboss.netty.channel.ExceptionEvent import org.jboss.netty.channel.MessageEvent import org.jboss.netty.channel.SimpleChannelUpstreamHandler import ChatClientHandler._ object ChatClientHandler { private val logger: Logger = Logger.getLogger(classOf[ChatClientHandler].getName) } class ChatClientHandler extends SimpleChannelUpstreamHandler { @throws(classOf[Exception]) override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = { if (e.isInstanceOf[ChannelEvent]) logger.info(e.toString) super.handleUpstream(ctx, e) } @throws(classOf[Exception]) override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = println(e.getMessage) @throws(classOf[Exception]) override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = { logger.log(Level.WARNING, "Unexcepted exception from dowstream.", e.getCause) e.getChannel.close() } }
動作結果はこちら。まずはサーバ側。sbtコンソール上で実行しています。
> run-main ChatServer [info] Running ChatServer
クライアントから接続があると、こんな感じでログが出ます。
6 03, 2012 6:39:15 午後 ChatServerHandler handleUpstream 情報: [id: 0x6d2a4623, /127.0.0.1:51970 => /127.0.0.1:8080] OPEN 6 03, 2012 6:39:15 午後 ChatServerHandler handleUpstream 情報: [id: 0x6d2a4623, /127.0.0.1:51970 => /127.0.0.1:8080] BOUND: /127.0.0.1:8080 6 03, 2012 6:39:15 午後 ChatServerHandler handleUpstream 情報: [id: 0x6d2a4623, /127.0.0.1:51970 => /127.0.0.1:8080] CONNECTED: /127.0.0.1:51970
クライアントから「bye」と送信されると、接続を切るのでこうなります。
6 03, 2012 6:39:55 午後 ChatServerHandler handleUpstream 情報: [id: 0x6d2a4623, /127.0.0.1:51970 :> /127.0.0.1:8080] DISCONNECTED 6 03, 2012 6:39:55 午後 ChatServerHandler handleUpstream 情報: [id: 0x6d2a4623, /127.0.0.1:51970 :> /127.0.0.1:8080] UNBOUND 6 03, 2012 6:39:55 午後 ChatServerHandler handleUpstream 情報: [id: 0x6d2a4623, /127.0.0.1:51970 :> /127.0.0.1:8080] CLOSED
クライアント側は、こんな感じ。こちらは、シェルからsbtコマンド経由で起動しています。
$ sbt "run-main ChatClient" [info] Set current project to netty-chat (in build file:/xxxxx/netty-chat/) [info] Running ChatClient 6 03, 2012 6:41:37 午後 ChatClientHandler handleUpstream 情報: [id: 0x6c46708a] OPEN 6 03, 2012 6:41:37 午後 ChatClientHandler handleUpstream 情報: [id: 0x6c46708a, /127.0.0.1:51972 => localhost/127.0.0.1:8080] BOUND: /127.0.0.1:51972 6 03, 2012 6:41:37 午後 ChatClientHandler handleUpstream 情報: [id: 0x6c46708a, /127.0.0.1:51972 => localhost/127.0.0.1:8080] CONNECTED: localhost/127.0.0.1:8080 6 03, 2012 6:41:37 午後 ChatClientHandler handleUpstream 情報: [id: 0x6c46708a, /127.0.0.1:51972 => localhost/127.0.0.1:8080] RECEIVED: Welcome to ubuntu chat server! Welcome to ubuntu chat server!
発言すると、自身のコンソールにはこんな感じで出力されます。
6 03, 2012 6:43:13 午後 ChatClientHandler handleUpstream 情報: [id: 0x6c46708a, /127.0.0.1:51972 => localhost/127.0.0.1:8080] WRITTEN_AMOUNT: 7 6 03, 2012 6:43:13 午後 ChatClientHandler handleUpstream 情報: [id: 0x6c46708a, /127.0.0.1:51972 => localhost/127.0.0.1:8080] RECEIVED: [you] Hello [you] Hello
他人が発言した場合は、接続元情報と共にメッセージが出力されます。
6 03, 2012 6:44:12 午後 ChatClientHandler handleUpstream 情報: [id: 0x6c46708a, /127.0.0.1:51972 => localhost/127.0.0.1:8080] RECEIVED: [/127.0.0.1:51977] Enter [/127.0.0.1:51977] Enter
あと、「bye」と入力すると接続が切れます。その発言自体も、相手側に出力されますが…。
とりあえず、まだ普通に書けそうです(笑)。でも、シャットダウン周りのコーディングはちょっと怪しいかなぁ。もうちょっと現実味のあるものを書き始めると、もっと面白くなるかも?