CLOVER🍀

That was when it all began.

Clojureで学ぶ、UDPネットワークプログラミング - 基礎

前回、突然TCPソケットを使ったEcho Serverを書きましたが、冒頭にちょこっと述べていましたがホントはUDPソケットを使ってみたかったのでした。

なにかと言いますと、最近Infinispan(JGroups)みたいなマルチキャストを使ったノードディスカバリを行うミドルウェアとかを使っているところもあり、UDPを知っていた方がよくない?と思い始めていたことがきっかけです。

これまでにもいくつかネットワークプログラミングの勉強をしたり本を買ったりはしていましたが、だいたいUDPはあんまり興味なくて、さらっと流してたり飛ばしたりしてたりしていたことが多かったので…。

ちょっと更新ペースはイマイチかもしれませんが、ちょっとずつ勉強していこうと思います。

で、プログラムはClojureで書きます。そして、前回同様、チェックの意味も込めてScalaでのリライト版も掲載する感じで。

では、まず簡単なUDPサーバ。動きとしては、前のEcho Serverの簡易版的なもので、

  • 入力された文字に「You Say => 」をくっつけて、クライアントに送信する
  • クライアントの接続ごとに、マルチスレッドで動作する

ところだけをやるようにします。

ここで、いらんこと文字を付け足して返却するというところを入れたおかげで、だ〜いぶてこずりました。

では、プログラムの方を。
udp_server.clj

(import '(java.net DatagramPacket DatagramSocket InetSocketAddress)
        '(java.util Date))
(require '[clojure.string :as str])

(defn log [word & more-words]
  (println (str \[ (Date.) \] \  (str/join \  (conj more-words word)))))

(let [address (InetSocketAddress. 50000)]
  (with-open [socket (DatagramSocket. address)]
    (log "Simple Clojure UDP Server" address "Startup.")

    (letfn [(receive []
              (let [buffer (byte-array 8192)
                    packet (DatagramPacket. buffer (alength buffer))]
                (.receive socket packet)
                packet))
            (accepted-client [s p]
              (let [data (.getData p)
                    offset (.getOffset p)
                    length (.getLength p)
                    word (String. data offset length "UTF-8")
                    reply-word (str "You Say => " word)
                    reply-word-binary (.getBytes reply-word "UTF-8")]
                (.setData p
                          reply-word-binary
                          0
                          (alength reply-word-binary))
                (.send s p)))]

      (doseq [packet
              (repeatedly receive)]
        (log "Accept Client" "=>" (.getSocketAddress packet))
        (.start (Thread. #(accepted-client socket packet)))))))

UDPソケットを使った主な登場人物は、

  • DatagramSocket
  • DatagramPacket

になります。また、コネクションレスなので、接続を作ったり終了する必要がないことが特徴だそうな。…一応、DatagramSocketにcloseメソッドがあるので、呼び出すようにはしていますが。

UDPはストリームベースのプロトコルではないので、DatagramPacketを作成する時のバッファサイズが重要ですね。今回は、8,192バイトにしていますが。

              (let [buffer (byte-array 8192)
                    packet (DatagramPacket. buffer (alength buffer))]

クライアントからの接続は、DatagramSocket#receiveで待ち受けます。

                (.receive socket packet)

また、送信されたデータは、DatagramPacket#getDataメソッドでbyte配列で受け取り、そのオフセットと長さはそれぞれgetOffset、getLengthメソッドで取得します。

              (let [data (.getData p)
                    offset (.getOffset p)
                    length (.getLength p)
                    word (String. data offset length "UTF-8")

今回は、文字列として受ける前提なので、その値をそのまま使ってStringを構築しています。

で、そこに追加の文字列をくっつけて

                    reply-word (str "You Say => " word)
                    reply-word-binary (.getBytes reply-word "UTF-8")]

DatagramPacket#setDataにオフセットと長さと一緒に設定。

                (.setData p
                          reply-word-binary
                          0
                          (alength reply-word-binary))

これで、クライアントから見た時のDatagramPacket#getLengthが、サーバから戻されたbyte配列の長さになります。

で、送信。

                (.send s p)))]

続いて、クライアント側。

udp_client.clj 
(import '(java.net DatagramPacket DatagramSocket InetSocketAddress))

(let [address (InetSocketAddress. 50000)]
  (with-open [socket (DatagramSocket.)]
    (let [word "Hello, Simple UDP Client!!"
          word-binary (.getBytes word "UTF-8")
          buffer (byte-array 8192)]
      (System/arraycopy word-binary 0 buffer 0 (alength word-binary))

      (let [packet (DatagramPacket. buffer 0 (alength word-binary) address)]
        (.send socket packet)
        (.setLength packet 8192)
        (.receive socket packet)
        (println (String. (.getData packet)
                          (.getOffset packet)
                          (.getLength packet)
                          "UTF-8"))))))

短いですよー。超ハマりましたけど…。

クライアント側も、DatagramSocketを作成します。

  (with-open [socket (DatagramSocket.)]

が、こちらはアドレス系の情報は指定しません。ちょっとダミー的なところがあります。

とりあえず、送信するデータを作成して

    (let [word "Hello, Simple UDP Client!!"
          word-binary (.getBytes word "UTF-8")
          buffer (byte-array 8192)]
      (System/arraycopy word-binary 0 buffer 0 (alength word-binary))

送信。

      (let [packet (DatagramPacket. buffer 0 (alength word-binary) address)]

ここで、宛先のアドレスを指定しています。

で、サーバから戻ってくるデータは、送信したデータより長い可能性がある(というか、長い)ので、長さを広げておきます。

        (.setLength packet 8192)

あとは、DatagramSocket#receiveするだけですね。

        (.receive socket packet)

ここで、ちゃんと長さを考えないといけないとか、DatagramPacketを作った時のbyte配列の長さにいろいろ依存するとかを最初わかってなくて、いろいろハマりました。まあ、冷静に考えるとそりゃあそうだよなぁって感じですが。

つか、受信と送信でDatagramPacketを別にしてもよかったですかね。

受信したデータは、こちらもStringにしているだけです。

        (println (String. (.getData packet)
                          (.getOffset packet)
                          (.getLength packet)
                          "UTF-8"))))))

UDPソケットの勉強には、こちらの書籍を使っています。

Javaネットワークプログラミングの真髄

Javaネットワークプログラミングの真髄

これから、頑張って勉強しましょう。

で、自分の理解の確認とClojureで書いたところの点検の意味も含めて、Scalaで書き直しているのでこちらも貼っておきます。

行っていることはほぼ同じなので、説明は割愛です。Clojure慣れしていない人は、こちらの方がまだ読みやすいかな…。

あ、クライアントとサーバは同じファイルに書いています。
UdpClientServer.scala

import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress}
import java.nio.charset.StandardCharsets
import java.util.Date

import UdpHelper._

object UdpHelper {
  implicit class AutoCloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    def foreach(fun: A => Unit): Unit =
      try {
        fun(underlying)
      } finally {
        underlying.close()
      }
  }

  def log(msgs: Any*): Unit =
    println(s"[${new Date}] ${msgs.mkString(" ")}")
}

object UdpServer {
  def main(args: Array[String]): Unit = {
    val address = new InetSocketAddress(50000)
    for (socket <- new DatagramSocket(address)) {
      log("Simple Scala UDP Server", address, "Startup.")

      Iterator
        .continually {
          val packet = new DatagramPacket(Array.ofDim[Byte](8192), 8192)
          socket.receive(packet)
          packet
        }.foreach { packet =>
          log("Accept Client", "=>", packet.getSocketAddress)
          new Thread {
            override def run(): Unit = acceptedClient(socket, packet)
          }.start()
        }
    }
  }

  private def acceptedClient(socket: DatagramSocket, packet: DatagramPacket): Unit = {
    val word = new String(packet.getData,
                          packet.getOffset,
                          packet.getLength,
                          StandardCharsets.UTF_8)
    val replyWord = s"You Say => $word"
    val replyWordBinary = replyWord.getBytes(StandardCharsets.UTF_8)

    packet.setData(replyWordBinary, 0, replyWordBinary.size)
    socket.send(packet)
  }
}

object UdpClient {
  def main(args: Array[String]): Unit = {
    val address = new InetSocketAddress(50000)
    for (socket <- new DatagramSocket) {
      val word = "Hello, Simple UDP Client!!"
      val wordBinary = word.getBytes(StandardCharsets.UTF_8)
      val buffer = Array.ofDim[Byte](8192)

      System.arraycopy(wordBinary, 0, buffer, 0, wordBinary.size)

      val packet = new DatagramPacket(buffer, 0, wordBinary.size, address)
      socket.send(packet)
      packet.setLength(8192)
      socket.receive(packet)
      println(new String(packet.getData,
                         packet.getOffset,
                         packet.getLength,
                         StandardCharsets.UTF_8))
    }
  }
}