CLOVER🍀

That was when it all began.

Clojureで学ぶ、UDPネットワークプログラミング - ソケットオプション

前回の続きで、今回は書籍に習って前回のUDPサーバ/クライアントの改良型を書きます。

Javaネットワークプログラミングの真髄

Javaネットワークプログラミングの真髄

まずは、できあがったプログラムを。サーバ側からいきます。
udp_server.clj

(import '(java.net DatagramPacket DatagramSocket InetSocketAddress)
        '(java.util Date))
(require '[clojure.string :as str])

(defn log [word & more-words]
  (println (str \[ (Date.) \] \  (str/join \  (conj more-words word)))))

(def max-datagram-bytes 512)
(def max-send-queue 4)
(def max-recv-queue 40)

(def port 50000)

(let [address (InetSocketAddress. port)]
  (with-open [socket (DatagramSocket. address)]
    (.setSendBufferSize socket (* max-datagram-bytes max-send-queue))
    (.setReceiveBufferSize socket (* max-datagram-bytes max-recv-queue))

    (log "DatagramSocket SendBufferSize = " (.getSendBufferSize socket))
    (log "DatagramSocket ReceiveBufferSize = " (.getReceiveBufferSize socket))

    (log "More Better Clojure UDP Server" address "Startup.")
    (letfn [(receive []
              (let [buffer (byte-array 8192)
                    packet (DatagramPacket. buffer (alength buffer))]
                (.receive socket packet)
                packet))
            (accepted-client [packet]
              (let [buffer (.getData packet)
                    offset (.getOffset packet)
                    length (.getLength packet)
                    reply-word (str "More Better Server Reply => ["
                                    (String. buffer offset length "UTF-8")
                                    "]")
                    reply-word-binary (.getBytes reply-word "UTF-8")]
                (.setData packet reply-word-binary 0 (alength reply-word-binary))
                (.send socket packet)))]

      (doseq [packet (repeatedly receive)]
        (log "Accept Client" "=>" (.getSocketAddress packet))
        (.start (Thread. #(accepted-client packet))))))))

まあ、細かいところでちょこっと改良してあるのですが、今回のネタはまずソケットのバッファサイズ。

    (.setSendBufferSize socket (* max-datagram-bytes max-send-queue))
    (.setReceiveBufferSize socket (* max-datagram-bytes max-recv-queue))

それぞれ、送信と受信のバッファサイズを指定しています。今回参考にしている書籍によると、受信バッファのサイズはデータグラムの最大期待サイズの倍数+1、送信バッファのサイズはデータグラムの最大サイズの倍数がよいらしいです。

…今回は、けっこうデタラメですが。

あと、やっぱりDatagramSocketはクローズした方がよいそうです。そりゃあそうですよね…。

続いて、クライアント側。
udp_client.clj

(import '(java.net DatagramPacket DatagramSocket InetSocketAddress SocketTimeoutException)
        '(java.util Date))
(require '[clojure.string :as str])

(defn log [word & more-words]
  (println (str \[ (Date.) \] \  (str/join (conj more-words word)))))

(def max-datagram-bytes 512)
(def max-send-queue 4)
(def max-recv-queue 4)
(def max-retries 4)
(def initial-timeout (* 2 1000))

(def port 50000)

(with-open [socket (DatagramSocket.)]
  (let [address (InetSocketAddress. port)
        initial-buffer (byte-array 8192)
        packet (DatagramPacket. initial-buffer (alength initial-buffer) address)]
    (.setSendBufferSize socket (* max-datagram-bytes max-send-queue))
    (.setReceiveBufferSize socket (* max-datagram-bytes max-recv-queue))

    (log "Start More Better Clojure UDP Client, connect to [" (.getSocketAddress packet) "]")

    (letfn [(send-receive-retries [buffer offset length cnt timeout exception]
              (if (< cnt max-retries)
                (do
                  (System/arraycopy buffer
                                    offset
                                    (.getData packet)
                                    offset
                                    length)
                  (.setData packet (.getData packet) offset length)
                  (.send socket packet)

                  (try
                    (.setSoTimeout socket timeout)
                    (.setLength packet 8192)
                    (.receive socket packet)

                    (println (String. (.getData packet)
                                      (.getOffset packet)
                                      (.getLength packet)
                                      "UTF-8"))
                    (catch SocketTimeoutException e
                      (send-receive-retries buffer
                                            offset
                                            length
                                            (inc cnt)
                                            (* timeout 2) e))))
                (throw exception)))
            (send-receive [buffer offset length]
              (send-receive-retries buffer offset length 0 initial-timeout nil))]

      (doseq [word (take-while #(not (or (nil? %) (empty? %) (= % "exit")))
                               (repeatedly read-line))]
        (let [buffer (.getBytes word "UTF-8")]
          (send-receive buffer 0 (alength buffer)))))))

サーバ側と同様に送信・受信バッファを設定していますが、もう1点異なるのはタイムアウトを設定していることです。

                    (.setSoTimeout socket timeout)

ここは、TCPソケットと同じくSoTimeoutですね。

このクライアントは、SoTimeoutで設定した受信時間をオーバーすると、3回リトライを行うように実装されています。

あと、書籍とは関係ないですけど、何も入力せずEnterを打つか、「exit」と入力してEnterを打つまでは入力を受け付け続けるようにしました。

今回は、ちょっと動かしてみましょうか。

サーバを起動。

$ clj udp_server.clj 
[Fri Sep 20 00:06:28 JST 2013] DatagramSocket SendBufferSize =  2048
[Fri Sep 20 00:06:28 JST 2013] DatagramSocket ReceiveBufferSize =  20480
[Fri Sep 20 00:06:28 JST 2013] More Better Clojure UDP Server 0.0.0.0/0.0.0.0:50000 Startup.

クライアントを実行。

$ clj udp_client.clj 
[Fri Sep 20 00:06:56 JST 2013] Start More Better Clojure UDP Client, connect to [0.0.0.0/0.0.0.0:50000]
Hello World
More Better Server Reply => [Hello World]
こんにちは、世界
More Better Server Reply => [こんにちは、世界]
exit

サーバ側には、こんなログが出ています。

[Fri Sep 20 00:07:01 JST 2013] Accept Client => /0:0:0:0:0:0:0:1:51917
[Fri Sep 20 00:07:06 JST 2013] Accept Client => /0:0:0:0:0:0:0:1:51917

今回は、バッファとタイムアウトの設定、クライアントのリトライを実装しました。

終わりに、Scalaでのリライト版です。
UdpClientServer.scala

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress, SocketTimeoutException}
import java.nio.charset.StandardCharsets
import java.util.Date

import UdpHelper._

object UdpHelper {
  implicit class AutoCloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    def foreach(fun: A => Unit): Unit =
      try {
        fun(underlying)
      } finally {
        underlying.close()
      }
  }

  def log(msg: Any, msgs: Any*): Unit =
    println(s"[${new Date}] ${(msg :: msgs.toList).mkString(" ")}")
}

object UdpServer {
  val MAX_DATAGRAM_BYTES: Int = 512
  val MAX_SEND_QUEUE: Int = 4
  val MAX_RECV_QUEUE: Int = 40

  val BUFFER_SIZE: Int = 8192

  def main(args: Array[String]): Unit = {
    val address = new InetSocketAddress(50000)
    for (socket <- new DatagramSocket(address)) {
      socket.setSendBufferSize(MAX_DATAGRAM_BYTES * MAX_SEND_QUEUE)
      socket.setReceiveBufferSize(MAX_DATAGRAM_BYTES * MAX_RECV_QUEUE)

      log(s"DatagramSocket SendBufferSize = ${socket.getSendBufferSize}")
      log(s"DatagramSocket ReceiveBufferSize = ${socket.getReceiveBufferSize}")

      log(s"More Better Scala UDP Server $address Startup.")

      Iterator
        .continually {
          val buffer = Array.ofDim[Byte](BUFFER_SIZE)
          val packet = new DatagramPacket(buffer, 0, buffer.size)
          socket.receive(packet)
          packet
        }.foreach { packet =>
          log(s"Accept Client", "=>", packet.getSocketAddress)
          new Thread {
            override def run(): Unit = acceptedClient(socket, packet)
          }.start()
        }
    }
  }

  private def acceptedClient(socket: DatagramSocket, packet: DatagramPacket): Unit = {
    val word = new String(packet.getData,
                          packet.getOffset,
                          packet.getLength,
                          StandardCharsets.UTF_8)
    val replyWord = s"More Better Server Reply => [$word]"
    val replyWordBinary = replyWord.getBytes(StandardCharsets.UTF_8)

    packet.setData(replyWordBinary, 0, replyWordBinary.size)
    socket.send(packet)
  }
}

object UdpClient {
  val MAX_DATAGRAM_BYTES: Int = 512
  val MAX_SEND_QUEUE: Int = 4
  val MAX_RECV_QUEUE: Int = 4
  val MAX_RETRIES: Int = 4
  val TIMEOUT: Int = 2 * 1000
  val BUFFER_SIZE: Int = 8192

  def main(args: Array[String]): Unit = {
    val address = new InetSocketAddress(50000)
    for (socket <- new DatagramSocket) {
      val buffer = Array.ofDim[Byte](BUFFER_SIZE)
      val packet = new DatagramPacket(buffer, 0, buffer.size, address)

      socket.setSendBufferSize(MAX_DATAGRAM_BYTES * MAX_SEND_QUEUE)
      socket.setReceiveBufferSize(MAX_DATAGRAM_BYTES * MAX_RECV_QUEUE)

      log(s"Start More Better UDP Scala Client, connect to [$address]")

      Iterator
        .continually(readLine())
        .takeWhile(line => line != null && !line.isEmpty && line != "exit")
        .foreach(line => sendReceive(socket, packet, line))
    }
  }

  private def sendReceive(socket: DatagramSocket, packet: DatagramPacket, word: String): Unit = {
    val wordBinary = word.getBytes(StandardCharsets.UTF_8)
    System.arraycopy(wordBinary,
                     0,
                     packet.getData,
                     0,
                     wordBinary.size)
        packet.setData(packet.getData,
                       0,
                       wordBinary.size)

    @tailrec
    def sendReceiveInner(timeout: Int, n: Int): Unit =
      Try {
        packet.setLength(wordBinary.size)

        socket.send(packet)

        socket.setSoTimeout(timeout)
        packet.setLength(BUFFER_SIZE)
        socket.receive(packet)

        new String(packet.getData,
                   packet.getOffset,
                   packet.getLength,
                   StandardCharsets.UTF_8)
      } match {
        case Success(msg) => println(msg)
        case Failure(e: SocketTimeoutException) =>
          val next = n + 1
          if (next < MAX_RETRIES)
            sendReceiveInner(timeout * 2, n + 1)
          else
            throw e
        case Failure(e) =>
          throw e
      }

    sendReceiveInner(TIMEOUT, 0)
  }
}