CLOVER🍀

That was when it all began.

Scalaで曞く、高信頌型ReliableUDP Echoクラむアントサヌバ

これたで、Clojureを䞭心にUDPを䜿ったプログラミングをしおいたしたが、今回はちょっずScalaのみにしたす。

お題は、高信頌型ReliableUDP。

UDPは、デヌタグラムが正しい順序で到着するこずを保蚌したせんし、パケットロスも怜出できたせん。これを克服するために、

  • すべおのデヌタグラムにシヌケンス番号を付ける
  • 送信偎は、受信偎からの確認にタむムアりトを蚭け、確認が時間以内に来なかったらリク゚ストを再送する

ずいうこずで信頌性を付䞎したす。

タむムアりトず再送に぀いおは、単に䞀定のタむムアりト時間を䜿うやり方ではなく、ネットワヌクの性質や状態、負荷などを考慮する必芁がありたす。ここで、TCPに習い

  • パケットが送信者から受信者に到着しお再び戻るたでの埀埩時間Round-Trip Timeラりンドトリップタむムの珟時点の掚蚈倀を統蚈的に求め、維持する。この掚蚈倀を元に、パケットをネットワヌクに送り出すものの、それが速すぎおネットワヌクを独占的に充満しないようにする
  • 次のリトラむたでの埅ち時間を蚈算する時、「指数的バックオフ」を䜿うので、゚ラヌが繰り替えされる床に次の再送たでの時間が指数的に䌞びおいく。するず、ネットワヌク䞊のパケット数がだんだん枛っおいくので、次の再送がうたくいく可胜性が高くなる

ずいうこずを行いたす。ここで、指数的バックオフずいうのは、䟋えば3床目の再送たでの時間は、基準倀nの3倍ではなく、2の3乗倍ずするようなこずです。TCPの茻茳回避のテクニックず。

ずたあ、たいそうなこずを曞いおいたすがこちらの曞籍の受け売りです。

Javaネットワヌクプログラミングの真髄

Javaネットワヌクプログラミングの真髄

こちらの曞籍に、ReliableなUDP Echoサヌバの䟋があったので、それをScalaで曞き盎したした。本来はClojureでやった方が なのですが、今のClojure力だず実装ずデバッグが困難になる気がするので、ここはいったんScalaで。

ちなみに、曞籍のサンプルには明らかな誀りがあっお、そちらはサポヌトペヌゞを芋お修正したした。

Javaネットワヌクプログラミングの真髄--follow-up
http://homepage1.nifty.com/algafield/jnet/

そしお、曞籍には茉っおいなかったReliableなクラむアントも远加しおいたす。

今回のプログラムで最も重芁なのは、ReliableDatagramSocketずいうクラスで、以䞋の機胜を実装したものです。

  • 送信されるパケットにナニヌクなシヌケンス番号を付け、受信したパケットからそのシヌケンスを取り出す。その操䜜は、呌び出し元には芋えないずころで行われる
  • 入力からシヌケンス番号を取り出すメ゜ッドず、それを出力にセットするメ゜ッドを、サヌバがリプラむを甚意する時に䜿う
  • クラむアントが䜿う、sendReceiveメ゜ッドを䜜成する。これは、送信甚のデヌタグラムず受信甚のデヌタグラムを匕数に取り、送信のデヌタグラムを送り出しおサヌバのリプラむを埅぀。その埌、指定時間たでにレスポンスがなければ、間隔を調敎しながらタむムアりトず再送を最倧リトラむカりントたで繰り返す。最倧カりントを超えるず、SocketTimeoutExceptionを投げる

タむムアりト時の䟋倖は、SocketTimeoutExceptionにしたした。さすがに、JDK 1.4より前はもういいでしょう 。

では、曞いおいきたす。ほが写経なので、Scalaっぜくないずころはご愛嬌。たずはimport文。

import scala.annotation.tailrec
import scala.math.{abs, max, min}
import scala.util.{Failure, Success, Try}

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, IOException}
import java.net.{DatagramPacket, DatagramSocket, InetAddress, InetSocketAddress, SocketAddress, SocketTimeoutException}
import java.nio.charset.StandardCharsets
import java.util.Date

import Reliabilities._

Reliabilitiesずいうのは、これから茉せる定数ずかを定矩したオブゞェクトです。

で、その定数ずかを定矩したReliabilitiesオブゞェクト。

// このプログラムで、時間の単䜍はすべお「秒」

object Reliabilities {
  // タむムアりトの最小・最倧倀
  val MIN_RETRANSMIT_TIMEOUT: Int = 1
  val MAX_RETRANSMIT_TIMEOUT: Int = 64
  // ひず぀のデヌタグラムの最倧再送回数、3か4くらい
  val MAX_RETRANSMISSIONS: Int = 4

  // 通信先ポヌト
  val PORT: Int = 50000

  def log(msg: Any, msgs: Any*): Unit =
    println(s"[${new Date}] ${(msg :: msgs.toList).mkString(" ")}")

  // AutoCloseableをfor匏で䜿えるようにするための、Implicit Class
  implicit class AutoCloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    def foreach(fun: A => Unit): Unit =
      try {
        fun(underlying)
      } finally {
        underlying.close()
      }
  }
}

コメントにもありたすが、今回登堎するプログラムに䜿われる時間の単䜍は、党お「秒」です。

続いお、ラりンドトリップタむムを蚈算するクラス。

class RoundTripTimer {
  // 最も最近のラりンドトリップタむムRTT
  private var roundTripTime: Float = 0.0F
  // 平滑化したRTT
  private var smoothedTripTime: Float = 0.0F
  // 平滑化した暙準偏差
  private var deviation: Float = 0.75F
  // 再送カりント0, 1, 2
  private var retransmissions: Short = 0
  // 珟圚の再送タむムアりト
  var currentTimeout = minmax(calculateRetransmitTimeout)

  // 再送タむムアりトを返す
  private def calculateRetransmitTimeout: Int =
    (smoothedTripTime + 4.0 * deviation).toInt

  // 䞊限のある再送タむムアりトを返す
  private def minmax(rto: Float): Float =
    min(max(rto, MIN_RETRANSMIT_TIMEOUT), MAX_RETRANSMIT_TIMEOUT)

  // 新たなパケットを送信する床に、珟圚の再送カりントを初期化する
  def newPacket(): Unit =
    retransmissions = 0

  /**
   * 成功した受信の盎埌に呌ばれ、ラりンドトリップタむムを蚈算し、
   * 次に平滑化したラりンドトリップずその分散偏差を蚈算する
   **/
  def stoppedAt(ms: Long): Unit = {
    // このパケットのラりンドトリップを蚈算する
    roundTripTime = ms/1000

    // ラりンドリップタむムの掚蚈倀ずその平均偏差を曎新する
    val delta = roundTripTime - smoothedTripTime
    smoothedTripTime += (delta / 8.0).toFloat
    deviation += ((abs(delta) - deviation) / 4.0).toFloat

    // 珟圚のタむムアりトを再蚈算する
    currentTimeout= minmax(calculateRetransmitTimeout)
  }

  /**
   * タむムアりトが生じた埌に呌ばれる。ギブアップすべき時間なら true を返华し、
   * 再送できるなら false を返华する
   **/
  def isTimeout(): Boolean = {
    currentTimeout *= 2  // 次の再送タむムアりト
    retransmissions = (retransmissions + 1).toShort
    retransmissions > MAX_RETRANSMISSIONS
  }
}

この埌登堎するReliableDatagramPacket#sendReceiveメ゜ッドを䜿ったパケット送信時に、このクラスのnewPacketメ゜ッドを呌び出し、リトラむ回数をリセットしたす。䜿甚元でタむムアりトを怜出した時は、isTimeoutメ゜ッドを呌び出しリトラむの䞊限を確認するず共に、次回の再送タむムアりトを䌞ばしおいきたす。正垞に通信ができた堎合は、stoppedAtメ゜ッドを呌び出しラりンドトリップタむムの掚蚈倀を求め、その平均偏差を曎新したす。

そしお、ReliableDatagramSocketクラス。

class ReliableDatagramSocket(localAddr: SocketAddress) extends DatagramSocket(localAddr) {
  private var roundTripTimer: RoundTripTimer = new RoundTripTimer
  private var reinit: Boolean = false
  var sendSequenceNo: Long = 0L  // 送信の順序番号
  var recvSequenceNo: Long = 0L  // 受信の順序番号

  init()

  def this(port: Int) = this(new InetSocketAddress(port))
  def this(port: Int, localAddr: InetAddress) = this(new InetSocketAddress(localAddr, port))
  def this() = this(null)

  // 初期化
  private def init(): Unit =
    roundTripTimer = new RoundTripTimer

  // コネクトした埌、接続甚の統蚈を再初期化する
  override def connect(dest: InetAddress, port: Int): Unit = {
    super.connect(dest, port)
    init()
  }

  // コネクトした埌、接続甚の統蚈を再初期化する
  override def connect(dest: SocketAddress): Unit = {
    super.connect(dest)
    init()
  }

  @throws(classOf[IOException])
  def sendReceive(sendPacket: DatagramPacket, recvPacket: DatagramPacket): Unit = synchronized {
    // タむムアりト埌に再初期化する
    if (reinit) {
      init()
      reinit = false
    }

    roundTripTimer.newPacket()

    val start = System.currentTimeMillis
    val sequenceNumber = sendSequenceNo

    // 最埌のタむムアりト、たたは予期しない䟋倖が起きるたで繰り返し
    // リトラむ䞭は、同じsequenceNumberを䜿甚し続ける
    Iterator.continually {
      Try {
        sendSequenceNo = sequenceNumber
        send(sendPacket)  // 䟋倖を投げおも良い

        val timeout = (roundTripTimer.currentTimeout * 1000.0 + 0.5).toInt
        val soTimeoutStart = System.currentTimeMillis

        @tailrec
        def receiveRetries(): Long = {
          // ゜ケットのタむムアりト倀を、すでに経過した時間で調敎する
          val soTimeout = (timeout - (System.currentTimeMillis.toInt - soTimeoutStart)).toInt
          setSoTimeout(soTimeout)
          receive(recvPacket)
          recvSequenceNo match {
            case `sequenceNumber` => recvSequenceNo // シヌケンスが䞀臎しおいれば、ルヌプをストップ
            case _ => receiveRetries()
          }
        }

        receiveRetries()
      }
    }.takeWhile {
      case Success(_) => false  // シヌケンスが䞀臎しおいれば、ルヌプをストップ
      case Failure(e: SocketTimeoutException) =>
        // タむムアりトで、リトラむするかどうか
        if (roundTripTimer.isTimeout()) {
          reinit = true
          throw e
        } else {
          // リトラむする
          true
        }
      case Failure(e) => throw e
    }.foreach { retry => } // シヌケンスの䞍䞀臎、たたはタむムアりトのためリトラむ

    // 正しいリプラむを埗た
    // タむマヌを停止し、新たなRTTの倀を蚈算する
    val ms = System.currentTimeMillis - start
    roundTripTimer.stoppedAt(ms)
  }

  // 順序番号を凊理する
  @throws(classOf[IOException])
  override def receive(packet: DatagramPacket): Unit = {
    super.receive(packet)

    // 順序番号を読み、それをパケットから削陀する
    val bais = new ByteArrayInputStream(packet.getData,
                                        packet.getOffset,
                                        packet.getLength)

    val dis = new DataInputStream(bais)
    recvSequenceNo = dis.readLong()
    val buffer = Array.ofDim[Byte](dis.available)
    dis.read(buffer)
    packet.setData(buffer, 0, buffer.size)
  }

  // 順序番号を凊理する
  @throws(classOf[IOException])
  override def send(packet: DatagramPacket): Unit = {
    val baos = new ByteArrayOutputStream
    val dos = new DataOutputStream(baos)

    // 順序番号を曞き出し、次にナヌザデヌタを曞き出す
    dos.writeLong(sendSequenceNo)
    sendSequenceNo += 1
    dos.write(packet.getData, packet.getOffset, packet.getLength)
    dos.flush()

    // この新しいデヌタで新たなパケットをコンストラクトし、送信する
    val data = baos.toByteArray
    val newPacket = new DatagramPacket(data, data.size, packet.getSocketAddress)
    super.send(newPacket)
  }
}

このクラスは、先ほど䜜成したRoundTripTimerクラスのむンスタンスを保持したす。継承元のDatagramSocketクラスのいく぀かのメ゜ッドをオヌバヌラむドしおいお、receiveメ゜ッドでは受け取ったレスポンスからシヌケンス番号を先に読み出し、呌び出し元が䜿うDatagramPacketのむンスタンスからはシヌケンス番号を取り陀いおいたす。sendメ゜ッドでは、送信デヌタの前にシヌケンス番号を挿入し、DatagramPacketを新しく構築しおいたす。

独自のメ゜ッドがsendReceiveで、こちらはクラむアントが䜿甚するこずを意図しおいたす。最初にRoundTripTimerクラスのnewPacketメ゜ッドを呌び出し、リトラむ回数をリセットしたす。

続いお、DatagramPacketを送信。

あずは、タむムアりト倀を調敎し぀぀、受信したデヌタのシヌケンス番号がおかしかったら再床受信埅ち、タむムアりトしたらタむムアりト倀を調敎しおリトラむ、もしくはリトラむ回数をオヌバヌしおいれば諊めるずいう感じになっおいたす。

正垞なシヌケンス番号を受信できた堎合は、ルヌプをストップしおRoundTripTimerクラスのstoppedAtメ゜ッドを呌び出しラりンドトリップタむムの掚蚈倀を求め、その平均偏差を曎新したす。

ちなみに、元の曞籍が間違っおいたのは、ここのsendReceiveメ゜ッドが

      // ゜ケットのタむムアりトをすでに経過した時間で調敎する 
 
        int soTimeout = timeout.(int)
  (System.currentTimeMillis().soTimeoutStart);

みたいな感じで曞かれおいたのですが、正しくは

        // ゜ケットのタむムアりトをすでに経過した時間で調敎する 
 
        int soTimeout = timeout-(int)
  (System.currentTimeMillis()-soTimeoutStart);

です。

そしお、このReliableDatagramSocketを䜿甚するEchoサヌバ。

object ReliableEchoServer {
  def main(args: Array[String]): Unit = {
    val buffer = Array.ofDim[Byte](1024)
    val recvPacket = new DatagramPacket(buffer, buffer.size)

    for (socket <- new ReliableDatagramSocket(PORT)) {
      log("Reliable Single Thread Scala UDP Server", socket.getLocalSocketAddress, "Startup.")

      Iterator.from(1).foreach { i =>
        // 受信パケットのバッファを、最倧にリセットする
        recvPacket.setData(buffer, 0, buffer.size)
        socket.receive(recvPacket)

        // リプラむは、リク゚ストず同じ順序番号ずする
        val seqNo = socket.recvSequenceNo
        socket.sendSequenceNo = seqNo

        // レスポンスずしお、リク゚ストを゚コヌバックする
        socket.send(recvPacket)

        log(s"Receive & Send SeqNo[${socket.recvSequenceNo}]")
      }
    }
  }
}

これたでのクラスに比べるず、だいぶ小さいですね。なお、シングルスレッドで動くこずを前提にしおいたす。ここでのポむントは、ReliableDatagramSocket#receiveでデヌタを受信しお曞き戻す際、぀たりsendメ゜ッドを呌ぶ前に送信甚のシヌケンス番号を蚭定しおいるこずですね。

最埌は、Echoクラむアント偎。

object ReliableEchoClient {
  def main(args: Array[String]): Unit = {
    // 送信甚バッファ
    val sendBuffer = Array.ofDim[Byte](1024)
    // 受信甚バッファ
    val recvBuffer = Array.ofDim[Byte](1024)

    val sendPacket = new DatagramPacket(sendBuffer, sendBuffer.size)
    val recvPacket = new DatagramPacket(recvBuffer, recvBuffer.size)
    val address = new InetSocketAddress(PORT)
    sendPacket.setSocketAddress(address)
    recvPacket.setSocketAddress(address)

    for (socket <- new ReliableDatagramSocket) {
      log(s"Reliable Scala UDP Client", address, "Startup.")

      Iterator
        .continually(readLine())
        .takeWhile(word => word != null && word != "exit")
        .foreach { word =>
          // 送信甚、受信甚パケットをそれぞれ初期化
          sendPacket.setData(sendBuffer, 0, sendBuffer.size)
          recvPacket.setData(recvBuffer, 0, recvBuffer.size)

          val wordBinary = word.getBytes(StandardCharsets.UTF_8)
          System.arraycopy(wordBinary,
                           0,
                           sendPacket.getData,
                           0,
                           wordBinary.size)

          sendPacket.setData(sendPacket.getData, 0, wordBinary.size)

          // 送信  受信
          socket.sendReceive(sendPacket, recvPacket)

          val seqNo = socket.recvSequenceNo

          log(s"Receive Sequence No[$seqNo]")
          log(s"Received =>", new String(recvPacket.getData,
                                         recvPacket.getOffset,
                                         recvPacket.getLength,
                                         StandardCharsets.UTF_8))
        }
    }
  }
}

今たでず同じように、コン゜ヌルから入力されたサヌバぞ送信するプログラムですが、デヌタの送受信はReliableDatagramSocket#sendReceiveで䞀括しお行っおいたす。

動かしおみるず、こんな感じですね。たずはサヌバ起動。

$ scala ReliableEchoServer
[Sat Sep 21 23:29:24 JST 2013] Reliable Single Thread Scala UDP Server 0.0.0.0/0.0.0.0:50000 Startup.

クラむアント起動。

$ scala ReliableEchoClient
[Sat Sep 21 23:29:47 JST 2013] Reliable Scala UDP Client 0.0.0.0/0.0.0.0:50000 Startup.

あずは、適圓に文字列を入力しおいればサヌバが応答しおくれたす。

Hello World
[Sat Sep 21 23:30:22 JST 2013] Receive Sequence No[0]
[Sat Sep 21 23:30:22 JST 2013] Received => Hello World
こんにちは、䞖界
[Sat Sep 21 23:30:26 JST 2013] Receive Sequence No[1]
[Sat Sep 21 23:30:26 JST 2013] Received => こんにちは、䞖界
Reliable UDP Echo.
[Sat Sep 21 23:30:35 JST 2013] Receive Sequence No[2]
[Sat Sep 21 23:30:35 JST 2013] Received => Reliable UDP Echo.
exit

サヌバ偎には、こんなログが。

[Sat Sep 21 23:30:22 JST 2013] Receive & Send SeqNo[0]
[Sat Sep 21 23:30:26 JST 2013] Receive & Send SeqNo[1]
[Sat Sep 21 23:30:35 JST 2013] Receive & Send SeqNo[2]

で、このプログラムですが、ReliableDatagramSocketがシヌケンス番号を持っおしたっおいるので、耇数の接続先には察応できたせん。あくたで、通信しおくるクラむアントはひず぀の想定です。本来は、接続先ごずにシヌケンス番号を管理すべきだず曞籍にも曞いおありたしたしね。サヌバプログラムがシングルスレッドになっおいるのは、これが理由です。

ずはいえ、TCP゜ケットの様に、接続がSocketクラスのむンスタンスみたいに衚せるわけでもないので、そこは工倫が必芁なずころですよね。あず、ホントはパケット分割も考慮しないずですよね。

そのうち、JGroupsの゜ヌスも远っおみようかな UDPずかは少しみたしたけど、このあたりの信頌性担保のずころは、ただ芋れおいたせん。

次のテヌマは、UDPでNIOです。

最埌は、今回曞いた゜ヌスですよ。ちょっず長めのプログラムでしたね。
ReliableUdpClientServer.scala

import scala.annotation.tailrec
import scala.math.{abs, max, min}
import scala.util.{Failure, Success, Try}

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, IOException}
import java.net.{DatagramPacket, DatagramSocket, InetAddress, InetSocketAddress, SocketAddress, SocketTimeoutException}
import java.nio.charset.StandardCharsets
import java.util.Date

import Reliabilities._

// このプログラムで、時間の単䜍はすべお「秒」

object Reliabilities {
  // タむムアりトの最小・最倧倀
  val MIN_RETRANSMIT_TIMEOUT: Int = 1
  val MAX_RETRANSMIT_TIMEOUT: Int = 64
  // ひず぀のデヌタグラムの最倧再送回数、3か4くらい
  val MAX_RETRANSMISSIONS: Int = 4

  // 通信先ポヌト
  val PORT: Int = 50000

  def log(msg: Any, msgs: Any*): Unit =
    println(s"[${new Date}] ${(msg :: msgs.toList).mkString(" ")}")

  // AutoCloseableをfor匏で䜿えるようにするための、Implicit Class
  implicit class AutoCloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    def foreach(fun: A => Unit): Unit =
      try {
        fun(underlying)
      } finally {
        underlying.close()
      }
  }
}

class RoundTripTimer {
  // 最も最近のラりンドトリップタむムRTT
  private var roundTripTime: Float = 0.0F
  // 平滑化したRTT
  private var smoothedTripTime: Float = 0.0F
  // 平滑化した暙準偏差
  private var deviation: Float = 0.75F
  // 再送カりント0, 1, 2
  private var retransmissions: Short = 0
  // 珟圚の再送タむムアりト
  var currentTimeout = minmax(calculateRetransmitTimeout)

  // 再送タむムアりトを返す
  private def calculateRetransmitTimeout: Int =
    (smoothedTripTime + 4.0 * deviation).toInt

  // 䞊限のある再送タむムアりトを返す
  private def minmax(rto: Float): Float =
    min(max(rto, MIN_RETRANSMIT_TIMEOUT), MAX_RETRANSMIT_TIMEOUT)

  // 新たなパケットを送信する床に、珟圚の再送カりントを初期化する
  def newPacket(): Unit =
    retransmissions = 0

  /**
   * 成功した受信の盎埌に呌ばれ、ラりンドトリップタむムを蚈算し、
   * 次に平滑化したラりンドトリップずその分散偏差を蚈算する
   **/
  def stoppedAt(ms: Long): Unit = {
    // このパケットのラりンドトリップを蚈算する
    roundTripTime = ms/1000

    // ラりンドリップタむムの掚蚈倀ずその平均偏差を曎新する
    val delta = roundTripTime - smoothedTripTime
    smoothedTripTime += (delta / 8.0).toFloat
    deviation += ((abs(delta) - deviation) / 4.0).toFloat

    // 珟圚のタむムアりトを再蚈算する
    currentTimeout= minmax(calculateRetransmitTimeout)
  }

  /**
   * タむムアりトが生じた埌に呌ばれる。ギブアップすべき時間なら true を返华し、
   * 再送できるなら false を返华する
   **/
  def isTimeout(): Boolean = {
    currentTimeout *= 2  // 次の再送タむムアりト
    retransmissions = (retransmissions + 1).toShort
    retransmissions > MAX_RETRANSMISSIONS
  }
}

class ReliableDatagramSocket(localAddr: SocketAddress) extends DatagramSocket(localAddr) {
  private var roundTripTimer: RoundTripTimer = new RoundTripTimer
  private var reinit: Boolean = false
  var sendSequenceNo: Long = 0L  // 送信の順序番号
  var recvSequenceNo: Long = 0L  // 受信の順序番号

  init()

  def this(port: Int) = this(new InetSocketAddress(port))
  def this(port: Int, localAddr: InetAddress) = this(new InetSocketAddress(localAddr, port))
  def this() = this(null)

  // 初期化
  private def init(): Unit =
    roundTripTimer = new RoundTripTimer

  // コネクトした埌、接続甚の統蚈を再初期化する
  override def connect(dest: InetAddress, port: Int): Unit = {
    super.connect(dest, port)
    init()
  }

  // コネクトした埌、接続甚の統蚈を再初期化する
  override def connect(dest: SocketAddress): Unit = {
    super.connect(dest)
    init()
  }

  @throws(classOf[IOException])
  def sendReceive(sendPacket: DatagramPacket, recvPacket: DatagramPacket): Unit = synchronized {
    // タむムアりト埌に再初期化する
    if (reinit) {
      init()
      reinit = false
    }

    roundTripTimer.newPacket()

    val start = System.currentTimeMillis
    val sequenceNumber = sendSequenceNo

    // 最埌のタむムアりト、たたは予期しない䟋倖が起きるたで繰り返し
    // リトラむ䞭は、同じsequenceNumberを䜿甚し続ける
    Iterator.continually {
      Try {
        sendSequenceNo = sequenceNumber
        send(sendPacket)  // 䟋倖を投げおも良い

        val timeout = (roundTripTimer.currentTimeout * 1000.0 + 0.5).toInt
        val soTimeoutStart = System.currentTimeMillis

        @tailrec
        def receiveRetries(): Long = {
          // ゜ケットのタむムアりト倀を、すでに経過した時間で調敎する
          val soTimeout = (timeout - (System.currentTimeMillis.toInt - soTimeoutStart)).toInt
          setSoTimeout(soTimeout)
          receive(recvPacket)
          recvSequenceNo match {
            case `sequenceNumber` => recvSequenceNo // シヌケンスが䞀臎しおいれば、ルヌプをストップ
            case _ => receiveRetries()
          }
        }

        receiveRetries()
      }
    }.takeWhile {
      case Success(_) => false  // シヌケンスが䞀臎しおいれば、ルヌプをストップ
      case Failure(e: SocketTimeoutException) =>
        // タむムアりトで、リトラむするかどうか
        if (roundTripTimer.isTimeout()) {
          reinit = true
          throw e
        } else {
          // リトラむする
          true
        }
      case Failure(e) => throw e
    }.foreach { retry => } // シヌケンスの䞍䞀臎、たたはタむムアりトのためリトラむ

    // 正しいリプラむを埗た
    // タむマヌを停止し、新たなRTTの倀を蚈算する
    val ms = System.currentTimeMillis - start
    roundTripTimer.stoppedAt(ms)
  }

  // 順序番号を凊理する
  @throws(classOf[IOException])
  override def receive(packet: DatagramPacket): Unit = {
    super.receive(packet)

    // 順序番号を読み、それをパケットから削陀する
    val bais = new ByteArrayInputStream(packet.getData,
                                        packet.getOffset,
                                        packet.getLength)

    val dis = new DataInputStream(bais)
    recvSequenceNo = dis.readLong()
    val buffer = Array.ofDim[Byte](dis.available)
    dis.read(buffer)
    packet.setData(buffer, 0, buffer.size)
  }

  // 順序番号を凊理する
  @throws(classOf[IOException])
  override def send(packet: DatagramPacket): Unit = {
    val baos = new ByteArrayOutputStream
    val dos = new DataOutputStream(baos)

    // 順序番号を曞き出し、次にナヌザデヌタを曞き出す
    dos.writeLong(sendSequenceNo)
    sendSequenceNo += 1
    dos.write(packet.getData, packet.getOffset, packet.getLength)
    dos.flush()

    // この新しいデヌタで新たなパケットをコンストラクトし、送信する
    val data = baos.toByteArray
    val newPacket = new DatagramPacket(data, data.size, packet.getSocketAddress)
    super.send(newPacket)
  }
}

object ReliableEchoServer {
  def main(args: Array[String]): Unit = {
    val buffer = Array.ofDim[Byte](1024)
    val recvPacket = new DatagramPacket(buffer, buffer.size)

    for (socket <- new ReliableDatagramSocket(PORT)) {
      log("Reliable Single Thread Scala UDP Server", socket.getLocalSocketAddress, "Startup.")

      Iterator.from(1).foreach { i =>
        // 受信パケットのバッファを、最倧にリセットする
        recvPacket.setData(buffer, 0, buffer.size)
        socket.receive(recvPacket)

        // リプラむは、リク゚ストず同じ順序番号ずする
        val seqNo = socket.recvSequenceNo
        socket.sendSequenceNo = seqNo

        // レスポンスずしお、リク゚ストを゚コヌバックする
        socket.send(recvPacket)

        log(s"Receive & Send SeqNo[${socket.recvSequenceNo}]")
      }
    }
  }
}

object ReliableEchoClient {
  def main(args: Array[String]): Unit = {
    // 送信甚バッファ
    val sendBuffer = Array.ofDim[Byte](1024)
    // 受信甚バッファ
    val recvBuffer = Array.ofDim[Byte](1024)

    val sendPacket = new DatagramPacket(sendBuffer, sendBuffer.size)
    val recvPacket = new DatagramPacket(recvBuffer, recvBuffer.size)
    val address = new InetSocketAddress(PORT)
    sendPacket.setSocketAddress(address)
    recvPacket.setSocketAddress(address)

    for (socket <- new ReliableDatagramSocket) {
      log(s"Reliable Scala UDP Client", address, "Startup.")

      Iterator
        .continually(readLine())
        .takeWhile(word => word != null && word != "exit")
        .foreach { word =>
          // 送信甚、受信甚パケットをそれぞれ初期化
          sendPacket.setData(sendBuffer, 0, sendBuffer.size)
          recvPacket.setData(recvBuffer, 0, recvBuffer.size)

          val wordBinary = word.getBytes(StandardCharsets.UTF_8)
          System.arraycopy(wordBinary,
                           0,
                           sendPacket.getData,
                           0,
                           wordBinary.size)

          sendPacket.setData(sendPacket.getData, 0, wordBinary.size)

          // 送信  受信
          socket.sendReceive(sendPacket, recvPacket)

          val seqNo = socket.recvSequenceNo

          log(s"Receive Sequence No[$seqNo]")
          log(s"Received =>", new String(recvPacket.getData,
                                         recvPacket.getOffset,
                                         recvPacket.getLength,
                                         StandardCharsets.UTF_8))
        }
    }
  }
}