CLOVER🍀

That was when it all began.

Scalaで書く、高信頼型(Reliable)UDP Echoクライアント/サーバ

ć“ć‚Œć¾ć§ć€Clojure悒äø­åæƒć«UDP悒ä½æć£ćŸćƒ—ćƒ­ć‚°ćƒ©ćƒŸćƒ³ć‚°ć‚’ć—ć¦ć„ć¾ć—ćŸćŒć€ä»Šå›žćÆć”ć‚‡ć£ćØScala恮ćæć«ć—ć¾ć™ć€‚

ćŠé”ŒćÆć€é«˜äæ”頼型ļ¼ˆReliableļ¼‰UDP怂

UDPćÆć€ćƒ‡ćƒ¼ć‚æć‚°ćƒ©ćƒ ćŒę­£ć—ć„é †åŗć§åˆ°ē€ć™ć‚‹ć“ćØ悒äæčØ¼ć—ć¾ć›ć‚“ć—ć€ćƒ‘ć‚±ćƒƒćƒˆćƒ­ć‚¹ć‚‚ę¤œå‡ŗć§ćć¾ć›ć‚“ć€‚ć“ć‚Œć‚’å…‹ęœć™ć‚‹ćŸć‚ć«ć€

  • ć™ć¹ć¦ć®ćƒ‡ćƒ¼ć‚æć‚°ćƒ©ćƒ ć«ć‚·ćƒ¼ć‚±ćƒ³ć‚¹ē•Ŗå·ć‚’ä»˜ć‘ć‚‹
  • 送äæ”偓ćÆ态ļ¼ˆå—äæ”å“ć‹ć‚‰ć®ļ¼‰ē¢ŗčŖć«ć‚æć‚¤ćƒ ć‚¢ć‚¦ćƒˆć‚’čØ­ć‘ć€ē¢ŗčŖćŒę™‚é–“ä»„å†…ć«ę„ćŖć‹ć£ćŸć‚‰ćƒŖć‚Æć‚Øć‚¹ćƒˆć‚’å†é€ć™ć‚‹

ćØ恄恆恓ćØ恧äæ”é ¼ę€§ć‚’ä»˜äøŽć—ć¾ć™ć€‚

ć‚æć‚¤ćƒ ć‚¢ć‚¦ćƒˆćØå†é€ć«ć¤ć„ć¦ćÆć€å˜ć«äø€å®šć®ć‚æć‚¤ćƒ ć‚¢ć‚¦ćƒˆę™‚é–“ć‚’ä½æć†ć‚„ć‚Šę–¹ć§ćÆćŖ恏态惍惃惈ćƒÆćƒ¼ć‚Æ恮ꀧč³Ŗ悄ēŠ¶ę…‹ć€č² č·ćŖć©ć‚’č€ƒę…®ć™ć‚‹åæ…č¦ćŒć‚ć‚Šć¾ć™ć€‚ć“ć“ć§ć€TCP恫ēæ’恄

  • ćƒ‘ć‚±ćƒƒćƒˆćŒé€äæ”č€…ć‹ć‚‰å—äæ”č€…ć«åˆ°ē€ć—ć¦å†ć³ęˆ»ć‚‹ć¾ć§ć®å¾€å¾©ę™‚é–“ļ¼ˆRound-Trip Timeļ¼šćƒ©ć‚¦ćƒ³ćƒ‰ćƒˆćƒŖ惃惗ć‚æ悤惠ļ¼‰ć®ē¾ę™‚ē‚¹ć®ęŽØčØˆå€¤ć‚’ēµ±č؈ēš„恫걂悁态ē¶­ęŒć™ć‚‹ć€‚恓恮ęŽØčØˆå€¤ć‚’å…ƒć«ć€ćƒ‘ć‚±ćƒƒćƒˆć‚’ćƒćƒƒćƒˆćƒÆćƒ¼ć‚Æ恫送悊å‡ŗć™ć‚‚ć®ć®ć€ćć‚ŒćŒé€Ÿć™ćŽć¦ćƒćƒƒćƒˆćƒÆćƒ¼ć‚Æ悒ē‹¬å ēš„ć«å……ęŗ€ć—ćŖć„ć‚ˆć†ć«ć™ć‚‹
  • ꬔ恮ćƒŖćƒˆćƒ©ć‚¤ć¾ć§ć®å¾…ć”ę™‚é–“ć‚’č؈ē®—ć™ć‚‹ę™‚ć€ć€ŒęŒ‡ę•°ēš„ćƒćƒƒć‚Æć‚Ŗ惕怍悒ä½æ恆恮恧态ć‚Øćƒ©ćƒ¼ćŒē¹°ć‚Šę›æ恈恕悌悋åŗ¦ć«ę¬”ć®å†é€ć¾ć§ć®ę™‚é–“ćŒęŒ‡ę•°ēš„恫ä¼øć³ć¦ć„ćć€‚ć™ć‚‹ćØ态惍惃惈ćƒÆćƒ¼ć‚ÆäøŠć®ćƒ‘ć‚±ćƒƒćƒˆę•°ćŒć ć‚“ć ć‚“ęø›ć£ć¦ć„ćć®ć§ć€ę¬”ć®å†é€ćŒć†ć¾ćć„ćåÆčƒ½ę€§ćŒé«˜ććŖ悋

ćØ恄恆恓ćØć‚’č”Œć„ć¾ć™ć€‚ć“ć“ć§ć€ęŒ‡ę•°ēš„ćƒćƒƒć‚Æć‚Ŗ惕ćØ恄恆恮ćÆć€ä¾‹ćˆć°3åŗ¦ē›®ć®å†é€ć¾ć§ć®ę™‚é–“ćÆ态åŸŗęŗ–値n恮3å€ć§ćÆćŖ恏态2恮3乗倍ćØ恙悋悈恆ćŖ恓ćØ恧恙怂TCPć®č¼»č¼³å›žéæć®ćƒ†ć‚Æ惋惃ć‚ÆćØ怂

ćØć¾ć‚ć€ćŸć„ćć†ćŖ恓ćØ悒ę›øć„ć¦ć„ć¾ć™ćŒć“ć”ć‚‰ć®ę›øē±ć®å—ć‘å£²ć‚Šć§ć™ć€‚

Java惍惃惈ćƒÆćƒ¼ć‚Æćƒ—ćƒ­ć‚°ćƒ©ćƒŸćƒ³ć‚°ć®ēœŸé«„

Java惍惃惈ćƒÆćƒ¼ć‚Æćƒ—ćƒ­ć‚°ćƒ©ćƒŸćƒ³ć‚°ć®ēœŸé«„

恓恔悉恮ę›øē±ć«ć€ReliablećŖUDP Echoć‚µćƒ¼ćƒć®ä¾‹ćŒć‚ć£ćŸć®ć§ć€ćć‚Œć‚’Scala恧ę›ø恍ē›“ć—ć¾ć—ćŸć€‚ęœ¬ę„ćÆClojureć§ć‚„ć£ćŸę–¹ćŒā€¦ćŖć®ć§ć™ćŒć€ä»Šć®ClojureåŠ›ć ćØå®Ÿč£…ćØćƒ‡ćƒćƒƒć‚°ćŒå›°é›£ć«ćŖć‚‹ę°—ćŒć™ć‚‹ć®ć§ć€ć“ć“ćÆć„ć£ćŸć‚“Scala恧怂

恔ćŖćæ恫态ę›øē±ć®ć‚µćƒ³ćƒ—ćƒ«ć«ćÆ꘎悉恋ćŖčŖ¤ć‚ŠćŒć‚ć£ć¦ć€ćć”悉ćÆć‚µćƒćƒ¼ćƒˆćƒšćƒ¼ć‚ø悒見恦äæ®ę­£ć—ć¾ć—ćŸć€‚

Java惍惃惈ćƒÆćƒ¼ć‚Æćƒ—ćƒ­ć‚°ćƒ©ćƒŸćƒ³ć‚°ć®ēœŸé«„--follow-up
http://homepage1.nifty.com/algafield/jnet/

ćć—ć¦ć€ę›øē±ć«ćÆč¼‰ć£ć¦ć„ćŖć‹ć£ćŸReliablećŖć‚Æćƒ©ć‚¤ć‚¢ćƒ³ćƒˆć‚‚čæ½åŠ ć—ć¦ć„ć¾ć™ć€‚

ä»Šå›žć®ćƒ—ćƒ­ć‚°ćƒ©ćƒ ć§ęœ€ć‚‚é‡č¦ćŖ恮ćÆ态ReliableDatagramSocketćØ恄恆ć‚Æćƒ©ć‚¹ć§ć€ä»„äø‹ć®ę©Ÿčƒ½ć‚’å®Ÿč£…ć—ćŸć‚‚ć®ć§ć™ć€‚

  • 送äæ”ć•ć‚Œć‚‹ćƒ‘ć‚±ćƒƒćƒˆć«ćƒ¦ćƒ‹ćƒ¼ć‚ÆćŖć‚·ćƒ¼ć‚±ćƒ³ć‚¹ē•Ŗå·ć‚’ä»˜ć‘ć€å—äæ”ć—ćŸćƒ‘ć‚±ćƒƒćƒˆć‹ć‚‰ćć®ć‚·ćƒ¼ć‚±ćƒ³ć‚¹ć‚’å–ć‚Šå‡ŗć™ć€‚ćć®ę“ä½œćÆć€å‘¼ć³å‡ŗć—å…ƒć«ćÆč¦‹ćˆćŖ恄ćØć“ć‚ć§č”Œć‚ć‚Œć‚‹
  • å…„åŠ›ć‹ć‚‰ć‚·ćƒ¼ć‚±ćƒ³ć‚¹ē•Ŗå·ć‚’å–ć‚Šå‡ŗć™ćƒ”ć‚½ćƒƒćƒ‰ćØ态恝悌悒å‡ŗåŠ›ć«ć‚»ćƒƒćƒˆć™ć‚‹ćƒ”ć‚½ćƒƒćƒ‰ć‚’ć€ć‚µćƒ¼ćƒćŒćƒŖćƒ—ćƒ©ć‚¤ć‚’ē”Øę„ć™ć‚‹ę™‚ć«ä½æ恆
  • ć‚Æćƒ©ć‚¤ć‚¢ćƒ³ćƒˆćŒä½æ恆态sendReceivećƒ”ć‚½ćƒƒćƒ‰ć‚’ä½œęˆć™ć‚‹ć€‚ć“ć‚ŒćÆ态送äæ”ē”Øć®ćƒ‡ćƒ¼ć‚æć‚°ćƒ©ćƒ ćØ受äæ”ē”Øć®ćƒ‡ćƒ¼ć‚æć‚°ćƒ©ćƒ ć‚’å¼•ę•°ć«å–ć‚Šć€é€äæ”ć®ćƒ‡ćƒ¼ć‚æć‚°ćƒ©ćƒ ć‚’é€ć‚Šå‡ŗć—ć¦ć‚µćƒ¼ćƒć®ćƒŖćƒ—ćƒ©ć‚¤ć‚’å¾…ć¤ć€‚ćć®å¾Œć€ęŒ‡å®šę™‚é–“ć¾ć§ć«ćƒ¬ć‚¹ćƒćƒ³ć‚¹ćŒćŖ恑悌恰态間隔悒čŖæꕓ恗ćŖ恌悉ć‚æć‚¤ćƒ ć‚¢ć‚¦ćƒˆćØå†é€ć‚’ęœ€å¤§ćƒŖćƒˆćƒ©ć‚¤ć‚«ć‚¦ćƒ³ćƒˆć¾ć§ē¹°ć‚Ščæ”ć™ć€‚ęœ€å¤§ć‚«ć‚¦ćƒ³ćƒˆć‚’č¶…ćˆć‚‹ćØ态SocketTimeoutException悒ꊕ恒悋

ć‚æć‚¤ćƒ ć‚¢ć‚¦ćƒˆę™‚ć®ä¾‹å¤–ćÆ态SocketTimeoutExceptionć«ć—ć¾ć—ćŸć€‚ć•ć™ćŒć«ć€JDK 1.4ć‚ˆć‚Šå‰ćÆ悂恆恄恄恧恗悇恆ā€¦ć€‚

恧ćÆ态ę›øć„ć¦ć„ćć¾ć™ć€‚ć»ć¼å†™ēµŒćŖ恮恧态Scalać£ć½ććŖ恄ćØ恓悍ćÆć”ę„›å¬Œć€‚ć¾ćšćÆimportꖇ怂

import scala.annotation.tailrec
import scala.math.{abs, max, min}
import scala.util.{Failure, Success, Try}

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, IOException}
import java.net.{DatagramPacket, DatagramSocket, InetAddress, InetSocketAddress, SocketAddress, SocketTimeoutException}
import java.nio.charset.StandardCharsets
import java.util.Date

import Reliabilities._

ReliabilitiesćØ恄恆恮ćÆć€ć“ć‚Œć‹ć‚‰č¼‰ć›ć‚‹å®šę•°ćØć‹ć‚’å®šē¾©ć—ćŸć‚Ŗ惖ć‚ø悧ć‚Æ惈恧恙怂

ć§ć€ćć®å®šę•°ćØć‹ć‚’å®šē¾©ć—ćŸReliabilitiesć‚Ŗ惖ć‚ø悧ć‚Æ惈怂

// All times in this program are expressed in seconds

/**
 * Constants and small helpers shared by the reliable-UDP echo client and server.
 * Timeout values are expressed in seconds.
 */
object Reliabilities {
  /** Smallest retransmission timeout that will ever be used. */
  val MIN_RETRANSMIT_TIMEOUT: Int = 1

  /** Largest retransmission timeout that will ever be used. */
  val MAX_RETRANSMIT_TIMEOUT: Int = 64

  /** Maximum number of retransmissions of a single datagram (3 or 4 is typical). */
  val MAX_RETRANSMISSIONS: Int = 4

  /** Port used by both the echo server and the echo client. */
  val PORT: Int = 50000

  /**
   * Prints one line to standard output: the current date in square brackets
   * followed by all arguments joined with single spaces.
   *
   * @param msg  first item of the message
   * @param msgs remaining items of the message
   */
  def log(msg: Any, msgs: Any*): Unit =
    println(s"[${new Date}] ${(msg +: msgs).mkString(" ")}")

  /**
   * Lets any `AutoCloseable` be used as the generator of a `for` loop;
   * the resource is closed when the loop body finishes, normally or not.
   */
  implicit class AutoCloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    /** Runs `fun` with the wrapped resource and always closes the resource afterwards. */
    def foreach(fun: A => Unit): Unit =
      try fun(underlying)
      finally underlying.close()
  }
}

ć‚³ćƒ”ćƒ³ćƒˆć«ć‚‚ć‚ć‚Šć¾ć™ćŒć€ä»Šå›žē™»å “ć™ć‚‹ćƒ—ćƒ­ć‚°ćƒ©ćƒ ć«ä½æć‚ć‚Œć‚‹ę™‚é–“ć®å˜ä½ćÆ态å…Øć¦ć€Œē§’ć€ć§ć™ć€‚

続いて、ラウンドトリップタイムを計算するクラス。

/**
 * Maintains round-trip-time (RTT) statistics and derives the retransmission
 * timeout from them. All times are expressed in seconds.
 *
 * Usage, as driven by the socket class in this file: call `newPacket()` before
 * sending a new datagram, `isTimeout()` after each receive timeout, and
 * `stoppedAt(ms)` after a successful reply.
 */
class RoundTripTimer {
  // Most recent RTT sample
  private var roundTripTime: Float = 0.0F
  // Smoothed RTT
  private var smoothedTripTime: Float = 0.0F
  // Smoothed mean deviation
  private var deviation: Float = 0.75F
  // Number of retransmissions of the current packet so far
  private var retransmissions: Short = 0
  // Current retransmission timeout, always within [MIN, MAX]_RETRANSMIT_TIMEOUT
  var currentTimeout: Float = minmax(calculateRetransmitTimeout)

  // Retransmission timeout derived from the statistics: smoothed RTT + 4 * deviation,
  // truncated to whole seconds
  private def calculateRetransmitTimeout: Int =
    (smoothedTripTime + 4.0 * deviation).toInt

  // Clamps a retransmission timeout to the configured minimum and maximum
  private def minmax(rto: Float): Float =
    min(max(rto, MIN_RETRANSMIT_TIMEOUT), MAX_RETRANSMIT_TIMEOUT)

  /** Resets the retransmission counter; called every time a new packet is sent. */
  def newPacket(): Unit =
    retransmissions = 0

  /**
   * Called right after a successful receive. Records the round-trip sample and
   * updates the smoothed RTT, its mean deviation and the current timeout.
   *
   * @param ms elapsed round-trip time in milliseconds
   */
  def stoppedAt(ms: Long): Unit = {
    // Floating-point division: integer division would turn every sub-second
    // sample into 0 and hide the real latency from the estimator.
    roundTripTime = ms / 1000.0F

    // Update the RTT estimate and its mean deviation
    val delta = roundTripTime - smoothedTripTime
    smoothedTripTime += (delta / 8.0).toFloat
    deviation += ((abs(delta) - deviation) / 4.0).toFloat

    // Recompute the current timeout
    currentTimeout = minmax(calculateRetransmitTimeout)
  }

  /**
   * Called after a receive timeout. Doubles the timeout for the next attempt
   * (exponential backoff, kept within the configured bounds) and counts the
   * retransmission.
   *
   * @return `true` when it is time to give up, `false` when the packet may be
   *         retransmitted
   */
  def isTimeout(): Boolean = {
    // Back off exponentially, but never beyond MAX_RETRANSMIT_TIMEOUT
    currentTimeout = minmax(currentTimeout * 2)
    retransmissions = (retransmissions + 1).toShort
    retransmissions > MAX_RETRANSMISSIONS
  }
}

この後登場するReliableDatagramSocket#sendReceiveメソッドを使ったパケット送信時に、このクラスのnewPacketメソッドを呼び出し、リトライ回数をリセットします。使用元でタイムアウトを検出した時は、isTimeoutメソッドを呼び出しリトライの上限を確認すると共に、次回の再送タイムアウトを伸ばしていきます。正常に通信ができた場合は、stoppedAtメソッドを呼び出しラウンドトリップタイムの推計値を求め、その平均偏差を更新します。

そして、ReliableDatagramSocketクラス。

/**
 * A `DatagramSocket` that adds a simple reliability layer on top of UDP.
 *
 *  - `send` prefixes every outgoing datagram with an 8-byte sequence number
 *    taken from `sendSequenceNo`.
 *  - `receive` strips that prefix from every incoming datagram and stores it
 *    in `recvSequenceNo`.
 *  - `sendReceive` sends a request and waits for the reply with the same
 *    sequence number, retransmitting with a growing timeout.
 *
 * The socket keeps a single pair of sequence counters, so it tracks one
 * conversation at a time.
 *
 * @param localAddr local address to bind to; `null` leaves the socket unbound
 */
class ReliableDatagramSocket(localAddr: SocketAddress) extends DatagramSocket(localAddr) {
  private var roundTripTimer: RoundTripTimer = new RoundTripTimer
  // Set after sendReceive gave up; the statistics are rebuilt on the next call
  private var reinit: Boolean = false
  var sendSequenceNo: Long = 0L  // sequence number of the next datagram to send
  var recvSequenceNo: Long = 0L  // sequence number of the last datagram received

  init()

  def this(port: Int) = this(new InetSocketAddress(port))
  def this(port: Int, localAddr: InetAddress) = this(new InetSocketAddress(localAddr, port))
  def this() = this(null)

  // (Re)initialises the round-trip statistics
  private def init(): Unit =
    roundTripTimer = new RoundTripTimer

  /** Connects the socket, then (re)initialises the per-connection statistics. */
  override def connect(dest: InetAddress, port: Int): Unit = {
    super.connect(dest, port)
    init()
  }

  /** Connects the socket, then (re)initialises the per-connection statistics. */
  override def connect(dest: SocketAddress): Unit = {
    super.connect(dest)
    init()
  }

  /**
   * Sends `sendPacket` and blocks until a reply carrying the same sequence
   * number has been received into `recvPacket`.
   *
   * Replies with a different sequence number are discarded. When no matching
   * reply arrives within the current retransmission timeout the request is sent
   * again with the same sequence number and a longer timeout, until the round-trip
   * timer reports that the retry budget is exhausted.
   *
   * Side effect: the socket's SO_TIMEOUT is left at the value used for the last wait.
   *
   * @param sendPacket request datagram (payload only, without sequence number)
   * @param recvPacket datagram that receives the reply payload
   * @throws SocketTimeoutException when all retransmissions timed out
   * @throws IOException            on any other I/O failure
   */
  @throws(classOf[IOException])
  def sendReceive(sendPacket: DatagramPacket, recvPacket: DatagramPacket): Unit = synchronized {
    // Rebuild the statistics after a previous call gave up
    if (reinit) {
      init()
      reinit = false
    }

    roundTripTimer.newPacket()

    // The RTT sample spans the whole exchange, including any retransmissions
    val start = System.currentTimeMillis
    val sequenceNumber = sendSequenceNo

    // Waits for the reply to `sequenceNumber`, discarding datagrams with another
    // sequence number, for at most `timeoutMillis` counted from `waitStart`.
    @tailrec
    def awaitReply(timeoutMillis: Int, waitStart: Long): Unit = {
      // Shrink the socket timeout by the time already spent waiting. The
      // subtraction is done on Long values; narrowing the clock to Int first
      // would overflow and yield a meaningless timeout.
      val remaining = timeoutMillis - (System.currentTimeMillis - waitStart)
      // setSoTimeout(0) means "block forever" and a negative value is rejected,
      // so an exhausted budget has to be reported as a timeout here.
      if (remaining <= 0L) throw new SocketTimeoutException("Receive timed out")
      setSoTimeout(remaining.toInt)
      receive(recvPacket)
      if (recvSequenceNo != sequenceNumber) awaitReply(timeoutMillis, waitStart)
    }

    // One transmission plus wait; repeats itself on timeout while retries remain.
    @tailrec
    def attempt(): Unit = {
      val outcome = Try {
        // Every retransmission reuses the same sequence number
        sendSequenceNo = sequenceNumber
        send(sendPacket)

        val timeoutMillis = (roundTripTimer.currentTimeout * 1000.0 + 0.5).toInt
        awaitReply(timeoutMillis, System.currentTimeMillis)
      }

      outcome match {
        case Success(_) => () // matching reply received
        case Failure(e: SocketTimeoutException) =>
          if (roundTripTimer.isTimeout()) {
            // Out of retries: give up and start with fresh statistics next time
            reinit = true
            throw e
          } else attempt()
        case Failure(e) => throw e
      }
    }

    attempt()

    // Got the right reply: stop the timer and feed the new RTT sample
    val ms = System.currentTimeMillis - start
    roundTripTimer.stoppedAt(ms)
  }

  /**
   * Receives a datagram, stores its leading 8-byte sequence number in
   * `recvSequenceNo` and replaces the packet's data with the remaining payload
   * (a freshly allocated array).
   *
   * A datagram shorter than 8 bytes makes `readLong` fail with an `EOFException`,
   * which is an `IOException`.
   */
  @throws(classOf[IOException])
  override def receive(packet: DatagramPacket): Unit = {
    super.receive(packet)

    // Read the sequence number and remove it from the packet
    val dis = new DataInputStream(
      new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
    recvSequenceNo = dis.readLong()
    val payload = new Array[Byte](dis.available)
    dis.readFully(payload)
    packet.setData(payload, 0, payload.length)
  }

  /**
   * Sends the packet's payload prefixed with `sendSequenceNo`, then increments
   * `sendSequenceNo`. The caller's packet is not modified; a new datagram is
   * built for the transmission.
   */
  @throws(classOf[IOException])
  override def send(packet: DatagramPacket): Unit = {
    val baos = new ByteArrayOutputStream
    val dos = new DataOutputStream(baos)

    // Write the sequence number first, then the user data
    dos.writeLong(sendSequenceNo)
    sendSequenceNo += 1
    dos.write(packet.getData, packet.getOffset, packet.getLength)
    dos.flush()

    // Build a new datagram from the combined data and send it
    val data = baos.toByteArray
    val outgoing =
      if (packet.getAddress == null) {
        // Unaddressed packet (used with a connected socket): keep it unaddressed
        // instead of deriving a socket address from the missing fields.
        new DatagramPacket(data, data.length)
      } else {
        new DatagramPacket(data, data.length, packet.getSocketAddress)
      }
    super.send(outgoing)
  }
}

恓恮ć‚Æćƒ©ć‚¹ćÆć€å…ˆć»ć©ä½œęˆć—ćŸRoundTripTimerć‚Æćƒ©ć‚¹ć®ć‚¤ćƒ³ć‚¹ć‚æćƒ³ć‚¹ć‚’äæęŒć—ć¾ć™ć€‚ē¶™ę‰æå…ƒć®DatagramSocketć‚Æćƒ©ć‚¹ć®ć„ćć¤ć‹ć®ćƒ”ć‚½ćƒƒćƒ‰ć‚’ć‚Ŗćƒ¼ćƒćƒ¼ćƒ©ć‚¤ćƒ‰ć—ć¦ć„ć¦ć€receivećƒ”ć‚½ćƒƒćƒ‰ć§ćÆå—ć‘å–ć£ćŸćƒ¬ć‚¹ćƒćƒ³ć‚¹ć‹ć‚‰ć‚·ćƒ¼ć‚±ćƒ³ć‚¹ē•Ŗå·ć‚’å…ˆć«čŖ­ćæå‡ŗć—ć€å‘¼ć³å‡ŗć—å…ƒćŒä½æ恆DatagramPacketć®ć‚¤ćƒ³ć‚¹ć‚æćƒ³ć‚¹ć‹ć‚‰ćÆć‚·ćƒ¼ć‚±ćƒ³ć‚¹ē•Ŗå·ć‚’å–ć‚Šé™¤ć„ć¦ć„ć¾ć™ć€‚sendćƒ”ć‚½ćƒƒćƒ‰ć§ćÆ态送äæ”ćƒ‡ćƒ¼ć‚æć®å‰ć«ć‚·ćƒ¼ć‚±ćƒ³ć‚¹ē•Ŗå·ć‚’ęŒæå…„ć—ć€DatagramPacket悒ꖰ恗恏꧋ēÆ‰ć—ć¦ć„ć¾ć™ć€‚

ē‹¬č‡Ŗć®ćƒ”ć‚½ćƒƒćƒ‰ćŒsendReceive恧态恓恔悉ćÆć‚Æćƒ©ć‚¤ć‚¢ćƒ³ćƒˆćŒä½æē”Ø恙悋恓ćØć‚’ę„å›³ć—ć¦ć„ć¾ć™ć€‚ęœ€åˆć«RoundTripTimerć‚Æćƒ©ć‚¹ć®newPacketćƒ”ć‚½ćƒƒćƒ‰ć‚’å‘¼ć³å‡ŗ恗态ćƒŖćƒˆćƒ©ć‚¤å›žę•°ć‚’ćƒŖć‚»ćƒƒćƒˆć—ć¾ć™ć€‚

ē¶šć„恦态DatagramPacket悒送äæ”怂

恂ćØćÆ态ć‚æć‚¤ćƒ ć‚¢ć‚¦ćƒˆå€¤ć‚’čŖæę•“ć—ć¤ć¤ć€å—äæ”ć—ćŸćƒ‡ćƒ¼ć‚æć®ć‚·ćƒ¼ć‚±ćƒ³ć‚¹ē•Ŗå·ćŒćŠć‹ć—ć‹ć£ćŸć‚‰å†åŗ¦å—äæ”å¾…ć”ć€ć‚æć‚¤ćƒ ć‚¢ć‚¦ćƒˆć—ćŸć‚‰ć‚æć‚¤ćƒ ć‚¢ć‚¦ćƒˆå€¤ć‚’čŖæꕓ恗恦ćƒŖćƒˆćƒ©ć‚¤ć€ć‚‚ć—ććÆćƒŖćƒˆćƒ©ć‚¤å›žę•°ć‚’ć‚Ŗćƒ¼ćƒćƒ¼ć—ć¦ć„ć‚Œć°č«¦ć‚ć‚‹ćØć„ć†ę„Ÿć˜ć«ćŖć£ć¦ć„ć¾ć™ć€‚

ę­£åøøćŖć‚·ćƒ¼ć‚±ćƒ³ć‚¹ē•Ŗå·ć‚’å—äæ”ć§ććŸå “åˆćÆć€ćƒ«ćƒ¼ćƒ—ć‚’ć‚¹ćƒˆćƒƒćƒ—ć—ć¦RoundTripTimerć‚Æćƒ©ć‚¹ć®stoppedAtćƒ”ć‚½ćƒƒćƒ‰ć‚’å‘¼ć³å‡ŗć—ćƒ©ć‚¦ćƒ³ćƒ‰ćƒˆćƒŖ惃惗ć‚æć‚¤ćƒ ć®ęŽØčØˆå€¤ć‚’ę±‚ć‚ć€ćć®å¹³å‡åå·®ć‚’ę›“ę–°ć—ć¾ć™ć€‚

恔ćŖćæć«ć€å…ƒć®ę›øē±ćŒé–“é•ć£ć¦ć„ćŸć®ćÆ态恓恓恮sendReceivećƒ”ć‚½ćƒƒćƒ‰ćŒ

      // ć‚½ć‚±ćƒƒćƒˆć®ć‚æć‚¤ćƒ ć‚¢ć‚¦ćƒˆć‚’ć™ć§ć«ēµŒéŽć—ćŸę™‚é–“ć§čŖæꕓ恙悋 
 
        int soTimeout = timeout.(int)
  (System.currentTimeMillis().soTimeoutStart);

ćæ恟恄ćŖꄟ恘恧ę›øć‹ć‚Œć¦ć„ćŸć®ć§ć™ćŒć€ę­£ć—ććÆ

        // ć‚½ć‚±ćƒƒćƒˆć®ć‚æć‚¤ćƒ ć‚¢ć‚¦ćƒˆć‚’ć™ć§ć«ēµŒéŽć—ćŸę™‚é–“ć§čŖæꕓ恙悋 
 
        int soTimeout = timeout-(int)
  (System.currentTimeMillis()-soTimeoutStart);

恧恙怂

ćć—ć¦ć€ć“ć®ReliableDatagramSocket悒ä½æē”Ø恙悋Echoć‚µćƒ¼ćƒć€‚

/**
 * Single-threaded echo server built on `ReliableDatagramSocket`.
 * Every request is echoed back carrying the sequence number it arrived with.
 */
object ReliableEchoServer {
  /** Binds to `PORT` and echoes datagrams forever; the socket is closed on exit. */
  def main(args: Array[String]): Unit = {
    val storage = Array.ofDim[Byte](1024)
    val request = new DatagramPacket(storage, storage.size)

    new ReliableDatagramSocket(PORT).foreach { socket =>
      log("Reliable Single Thread Scala UDP Server", socket.getLocalSocketAddress, "Startup.")

      while (true) {
        // receive() swaps in a payload-sized array, so point the packet back
        // at the full-size buffer before every read
        request.setData(storage, 0, storage.size)
        socket.receive(request)

        // The reply must carry the same sequence number as the request;
        // it has to be set before send() is called
        socket.sendSequenceNo = socket.recvSequenceNo

        // Echo the request back as the response
        socket.send(request)

        log(s"Receive & Send SeqNo[${socket.recvSequenceNo}]")
      }
    }
  }
}

ć“ć‚Œć¾ć§ć®ć‚Æćƒ©ć‚¹ć«ęÆ”ć¹ć‚‹ćØć€ć ć„ć¶å°ć•ć„ć§ć™ć­ć€‚ćŖćŠć€ć‚·ćƒ³ć‚°ćƒ«ć‚¹ćƒ¬ćƒƒćƒ‰ć§å‹•ćć“ćØć‚’å‰ęć«ć—ć¦ć„ć¾ć™ć€‚ć“ć“ć§ć®ćƒć‚¤ćƒ³ćƒˆćÆ态ReliableDatagramSocket#receiveć§ćƒ‡ćƒ¼ć‚æć‚’å—äæ”恗恦ę›øćęˆ»ć™éš›ć€ć¤ć¾ć‚Šsendćƒ”ć‚½ćƒƒćƒ‰ć‚’å‘¼ć¶å‰ć«é€äæ”ē”Øć®ć‚·ćƒ¼ć‚±ćƒ³ć‚¹ē•Ŗå·ć‚’čØ­å®šć—ć¦ć„ć‚‹ć“ćØ恧恙恭怂

最後は、Echoクライアント側。

/**
 * Console echo client built on `ReliableDatagramSocket`.
 * Each line read from standard input is sent to the server on `PORT` and the
 * echoed reply is logged. Reading stops at end of input or at the line "exit".
 */
object ReliableEchoClient {
  def main(args: Array[String]): Unit = {
    // Send buffer
    val sendBuffer = Array.ofDim[Byte](1024)
    // Receive buffer
    val recvBuffer = Array.ofDim[Byte](1024)

    // The socket's receive() reads the 8-byte sequence number plus the payload
    // into recvBuffer, so a larger payload could not be echoed back intact
    // (and would not fit into sendBuffer beyond its size either).
    val maxPayload = recvBuffer.length - 8

    val sendPacket = new DatagramPacket(sendBuffer, sendBuffer.size)
    val recvPacket = new DatagramPacket(recvBuffer, recvBuffer.size)
    // NOTE(review): InetSocketAddress(port) is the wildcard address; using it as a
    // destination is platform dependent — confirm or switch to an explicit host.
    val address = new InetSocketAddress(PORT)
    sendPacket.setSocketAddress(address)
    recvPacket.setSocketAddress(address)

    for (socket <- new ReliableDatagramSocket) {
      log(s"Reliable Scala UDP Client", address, "Startup.")

      Iterator
        .continually(readLine())
        .takeWhile(word => word != null && word != "exit")
        .foreach { word =>
          val wordBinary = word.getBytes(StandardCharsets.UTF_8)

          if (wordBinary.length > maxPayload) {
            // Previously this overflowed the fixed-size buffer and crashed the client
            log(s"Input too long: ${wordBinary.length} bytes (max $maxPayload); skipped")
          } else {
            // Point both packets back at their full-size buffers
            sendPacket.setData(sendBuffer, 0, sendBuffer.size)
            recvPacket.setData(recvBuffer, 0, recvBuffer.size)

            System.arraycopy(wordBinary,
                             0,
                             sendPacket.getData,
                             0,
                             wordBinary.size)

            sendPacket.setData(sendPacket.getData, 0, wordBinary.size)

            // Send the request and wait for the matching reply
            socket.sendReceive(sendPacket, recvPacket)

            val seqNo = socket.recvSequenceNo

            log(s"Receive Sequence No[$seqNo]")
            log(s"Received =>", new String(recvPacket.getData,
                                           recvPacket.getOffset,
                                           recvPacket.getLength,
                                           StandardCharsets.UTF_8))
          }
        }
    }
  }
}

ä»Šć¾ć§ćØåŒć˜ć‚ˆć†ć«ć€ć‚³ćƒ³ć‚½ćƒ¼ćƒ«ć‹ć‚‰å…„åŠ›ć•ć‚ŒćŸć‚µćƒ¼ćƒćø送äæ”ć™ć‚‹ćƒ—ćƒ­ć‚°ćƒ©ćƒ ć§ć™ćŒć€ćƒ‡ćƒ¼ć‚æć®é€å—äæ”ćÆReliableDatagramSocket#sendReceive恧äø€ę‹¬ć—ć¦č”Œć£ć¦ć„ć¾ć™ć€‚

å‹•ć‹ć—ć¦ćæ悋ćØ态恓悓ćŖę„Ÿć˜ć§ć™ć­ć€‚ć¾ćšćÆć‚µćƒ¼ćƒčµ·å‹•ć€‚

$ scala ReliableEchoServer
[Sat Sep 21 23:29:24 JST 2013] Reliable Single Thread Scala UDP Server 0.0.0.0/0.0.0.0:50000 Startup.

ć‚Æćƒ©ć‚¤ć‚¢ćƒ³ćƒˆčµ·å‹•ć€‚

$ scala ReliableEchoClient
[Sat Sep 21 23:29:47 JST 2013] Reliable Scala UDP Client 0.0.0.0/0.0.0.0:50000 Startup.

恂ćØćÆć€é©å½“ć«ę–‡å­—åˆ—ć‚’å…„åŠ›ć—ć¦ć„ć‚Œć°ć‚µćƒ¼ćƒćŒåæœē­”ć—ć¦ćć‚Œć¾ć™ć€‚

Hello World
[Sat Sep 21 23:30:22 JST 2013] Receive Sequence No[0]
[Sat Sep 21 23:30:22 JST 2013] Received => Hello World
恓悓恫恔ćÆ态äø–ē•Œ
[Sat Sep 21 23:30:26 JST 2013] Receive Sequence No[1]
[Sat Sep 21 23:30:26 JST 2013] Received => 恓悓恫恔ćÆ态äø–ē•Œ
Reliable UDP Echo.
[Sat Sep 21 23:30:35 JST 2013] Receive Sequence No[2]
[Sat Sep 21 23:30:35 JST 2013] Received => Reliable UDP Echo.
exit

ć‚µćƒ¼ćƒå“ć«ćÆ态恓悓ćŖ惭悰恌怂

[Sat Sep 21 23:30:22 JST 2013] Receive & Send SeqNo[0]
[Sat Sep 21 23:30:26 JST 2013] Receive & Send SeqNo[1]
[Sat Sep 21 23:30:35 JST 2013] Receive & Send SeqNo[2]

ć§ć€ć“ć®ćƒ—ćƒ­ć‚°ćƒ©ćƒ ć§ć™ćŒć€ReliableDatagramSocketćŒć‚·ćƒ¼ć‚±ćƒ³ć‚¹ē•Ŗå·ć‚’ęŒć£ć¦ć—ć¾ć£ć¦ć„ć‚‹ć®ć§ć€č¤‡ę•°ć®ęŽ„ē¶šå…ˆć«ćÆåƾåæœć§ćć¾ć›ć‚“ć€‚ć‚ćć¾ć§ć€é€šäæ”ć—ć¦ćć‚‹ć‚Æćƒ©ć‚¤ć‚¢ćƒ³ćƒˆćÆć²ćØć¤ć®ęƒ³å®šć§ć™ć€‚ęœ¬ę„ćÆ态ꎄē¶šå…ˆć”ćØć«ć‚·ćƒ¼ć‚±ćƒ³ć‚¹ē•Ŗå·ć‚’ē®”ē†ć™ć¹ćć ćØę›øē±ć«ć‚‚ę›øć„ć¦ć‚ć‚Šć¾ć—ćŸć—ć­ć€‚ć‚µćƒ¼ćƒćƒ—ćƒ­ć‚°ćƒ©ćƒ ćŒć‚·ćƒ³ć‚°ćƒ«ć‚¹ćƒ¬ćƒƒćƒ‰ć«ćŖć£ć¦ć„ć‚‹ć®ćÆ态恓悌恌ē†ē”±ć§ć™ć€‚

ćØćÆ恄恈态TCPć‚½ć‚±ćƒƒćƒˆć®ę§˜ć«ć€ęŽ„ē¶šćŒSocketć‚Æćƒ©ć‚¹ć®ć‚¤ćƒ³ć‚¹ć‚æćƒ³ć‚¹ćæćŸć„ć«č”Ø恛悋悏恑恧悂ćŖć„ć®ć§ć€ćć“ćÆå·„å¤«ćŒåæ…要ćŖćØ恓悍恧恙悈恭怂恂ćØć€ćƒ›ćƒ³ćƒˆćÆćƒ‘ć‚±ćƒƒćƒˆåˆ†å‰²ć‚‚č€ƒę…®ć—ćŖ恄ćØ恧恙悈恭怂

ćć®ć†ć”ć€JGroupsć®ć‚½ćƒ¼ć‚¹ć‚‚čæ½ć£ć¦ćæ悈恆恋ćŖā€¦UDPćØ恋ćÆå°‘ć—ćæć¾ć—ćŸć‘ć©ć€ć“ć®ć‚ćŸć‚Šć®äæ”é ¼ę€§ę‹…äæć®ćØ恓悍ćÆć€ć¾ć č¦‹ć‚Œć¦ć„ć¾ć›ć‚“ć€‚

次のテーマは、UDPでNIOです。

ęœ€å¾ŒćÆć€ä»Šå›žę›øć„ćŸć‚½ćƒ¼ć‚¹ć§ć™ć‚ˆć€‚ć”ć‚‡ć£ćØé•·ć‚ć®ćƒ—ćƒ­ć‚°ćƒ©ćƒ ć§ć—ćŸć­ć€‚
ReliableUdpClientServer.scala

import scala.annotation.tailrec
import scala.math.{abs, max, min}
import scala.util.{Failure, Success, Try}

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, IOException}
import java.net.{DatagramPacket, DatagramSocket, InetAddress, InetSocketAddress, SocketAddress, SocketTimeoutException}
import java.nio.charset.StandardCharsets
import java.util.Date

import Reliabilities._

// All times in this program are expressed in seconds

/**
 * Constants and small helpers shared by the reliable-UDP echo client and server.
 * Timeout values are expressed in seconds.
 */
object Reliabilities {
  /** Smallest retransmission timeout that will ever be used. */
  val MIN_RETRANSMIT_TIMEOUT: Int = 1

  /** Largest retransmission timeout that will ever be used. */
  val MAX_RETRANSMIT_TIMEOUT: Int = 64

  /** Maximum number of retransmissions of a single datagram (3 or 4 is typical). */
  val MAX_RETRANSMISSIONS: Int = 4

  /** Port used by both the echo server and the echo client. */
  val PORT: Int = 50000

  /**
   * Prints one line to standard output: the current date in square brackets
   * followed by all arguments joined with single spaces.
   *
   * @param msg  first item of the message
   * @param msgs remaining items of the message
   */
  def log(msg: Any, msgs: Any*): Unit =
    println(s"[${new Date}] ${(msg +: msgs).mkString(" ")}")

  /**
   * Lets any `AutoCloseable` be used as the generator of a `for` loop;
   * the resource is closed when the loop body finishes, normally or not.
   */
  implicit class AutoCloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    /** Runs `fun` with the wrapped resource and always closes the resource afterwards. */
    def foreach(fun: A => Unit): Unit =
      try fun(underlying)
      finally underlying.close()
  }
}

/**
 * Maintains round-trip-time (RTT) statistics and derives the retransmission
 * timeout from them. All times are expressed in seconds.
 *
 * Usage, as driven by the socket class in this file: call `newPacket()` before
 * sending a new datagram, `isTimeout()` after each receive timeout, and
 * `stoppedAt(ms)` after a successful reply.
 */
class RoundTripTimer {
  // Most recent RTT sample
  private var roundTripTime: Float = 0.0F
  // Smoothed RTT
  private var smoothedTripTime: Float = 0.0F
  // Smoothed mean deviation
  private var deviation: Float = 0.75F
  // Number of retransmissions of the current packet so far
  private var retransmissions: Short = 0
  // Current retransmission timeout, always within [MIN, MAX]_RETRANSMIT_TIMEOUT
  var currentTimeout: Float = minmax(calculateRetransmitTimeout)

  // Retransmission timeout derived from the statistics: smoothed RTT + 4 * deviation,
  // truncated to whole seconds
  private def calculateRetransmitTimeout: Int =
    (smoothedTripTime + 4.0 * deviation).toInt

  // Clamps a retransmission timeout to the configured minimum and maximum
  private def minmax(rto: Float): Float =
    min(max(rto, MIN_RETRANSMIT_TIMEOUT), MAX_RETRANSMIT_TIMEOUT)

  /** Resets the retransmission counter; called every time a new packet is sent. */
  def newPacket(): Unit =
    retransmissions = 0

  /**
   * Called right after a successful receive. Records the round-trip sample and
   * updates the smoothed RTT, its mean deviation and the current timeout.
   *
   * @param ms elapsed round-trip time in milliseconds
   */
  def stoppedAt(ms: Long): Unit = {
    // Floating-point division: integer division would turn every sub-second
    // sample into 0 and hide the real latency from the estimator.
    roundTripTime = ms / 1000.0F

    // Update the RTT estimate and its mean deviation
    val delta = roundTripTime - smoothedTripTime
    smoothedTripTime += (delta / 8.0).toFloat
    deviation += ((abs(delta) - deviation) / 4.0).toFloat

    // Recompute the current timeout
    currentTimeout = minmax(calculateRetransmitTimeout)
  }

  /**
   * Called after a receive timeout. Doubles the timeout for the next attempt
   * (exponential backoff, kept within the configured bounds) and counts the
   * retransmission.
   *
   * @return `true` when it is time to give up, `false` when the packet may be
   *         retransmitted
   */
  def isTimeout(): Boolean = {
    // Back off exponentially, but never beyond MAX_RETRANSMIT_TIMEOUT
    currentTimeout = minmax(currentTimeout * 2)
    retransmissions = (retransmissions + 1).toShort
    retransmissions > MAX_RETRANSMISSIONS
  }
}

/**
 * A `DatagramSocket` that adds a simple reliability layer on top of UDP.
 *
 *  - `send` prefixes every outgoing datagram with an 8-byte sequence number
 *    taken from `sendSequenceNo`.
 *  - `receive` strips that prefix from every incoming datagram and stores it
 *    in `recvSequenceNo`.
 *  - `sendReceive` sends a request and waits for the reply with the same
 *    sequence number, retransmitting with a growing timeout.
 *
 * The socket keeps a single pair of sequence counters, so it tracks one
 * conversation at a time.
 *
 * @param localAddr local address to bind to; `null` leaves the socket unbound
 */
class ReliableDatagramSocket(localAddr: SocketAddress) extends DatagramSocket(localAddr) {
  private var roundTripTimer: RoundTripTimer = new RoundTripTimer
  // Set after sendReceive gave up; the statistics are rebuilt on the next call
  private var reinit: Boolean = false
  var sendSequenceNo: Long = 0L  // sequence number of the next datagram to send
  var recvSequenceNo: Long = 0L  // sequence number of the last datagram received

  init()

  def this(port: Int) = this(new InetSocketAddress(port))
  def this(port: Int, localAddr: InetAddress) = this(new InetSocketAddress(localAddr, port))
  def this() = this(null)

  // (Re)initialises the round-trip statistics
  private def init(): Unit =
    roundTripTimer = new RoundTripTimer

  /** Connects the socket, then (re)initialises the per-connection statistics. */
  override def connect(dest: InetAddress, port: Int): Unit = {
    super.connect(dest, port)
    init()
  }

  /** Connects the socket, then (re)initialises the per-connection statistics. */
  override def connect(dest: SocketAddress): Unit = {
    super.connect(dest)
    init()
  }

  /**
   * Sends `sendPacket` and blocks until a reply carrying the same sequence
   * number has been received into `recvPacket`.
   *
   * Replies with a different sequence number are discarded. When no matching
   * reply arrives within the current retransmission timeout the request is sent
   * again with the same sequence number and a longer timeout, until the round-trip
   * timer reports that the retry budget is exhausted.
   *
   * Side effect: the socket's SO_TIMEOUT is left at the value used for the last wait.
   *
   * @param sendPacket request datagram (payload only, without sequence number)
   * @param recvPacket datagram that receives the reply payload
   * @throws SocketTimeoutException when all retransmissions timed out
   * @throws IOException            on any other I/O failure
   */
  @throws(classOf[IOException])
  def sendReceive(sendPacket: DatagramPacket, recvPacket: DatagramPacket): Unit = synchronized {
    // Rebuild the statistics after a previous call gave up
    if (reinit) {
      init()
      reinit = false
    }

    roundTripTimer.newPacket()

    // The RTT sample spans the whole exchange, including any retransmissions
    val start = System.currentTimeMillis
    val sequenceNumber = sendSequenceNo

    // Waits for the reply to `sequenceNumber`, discarding datagrams with another
    // sequence number, for at most `timeoutMillis` counted from `waitStart`.
    @tailrec
    def awaitReply(timeoutMillis: Int, waitStart: Long): Unit = {
      // Shrink the socket timeout by the time already spent waiting. The
      // subtraction is done on Long values; narrowing the clock to Int first
      // would overflow and yield a meaningless timeout.
      val remaining = timeoutMillis - (System.currentTimeMillis - waitStart)
      // setSoTimeout(0) means "block forever" and a negative value is rejected,
      // so an exhausted budget has to be reported as a timeout here.
      if (remaining <= 0L) throw new SocketTimeoutException("Receive timed out")
      setSoTimeout(remaining.toInt)
      receive(recvPacket)
      if (recvSequenceNo != sequenceNumber) awaitReply(timeoutMillis, waitStart)
    }

    // One transmission plus wait; repeats itself on timeout while retries remain.
    @tailrec
    def attempt(): Unit = {
      val outcome = Try {
        // Every retransmission reuses the same sequence number
        sendSequenceNo = sequenceNumber
        send(sendPacket)

        val timeoutMillis = (roundTripTimer.currentTimeout * 1000.0 + 0.5).toInt
        awaitReply(timeoutMillis, System.currentTimeMillis)
      }

      outcome match {
        case Success(_) => () // matching reply received
        case Failure(e: SocketTimeoutException) =>
          if (roundTripTimer.isTimeout()) {
            // Out of retries: give up and start with fresh statistics next time
            reinit = true
            throw e
          } else attempt()
        case Failure(e) => throw e
      }
    }

    attempt()

    // Got the right reply: stop the timer and feed the new RTT sample
    val ms = System.currentTimeMillis - start
    roundTripTimer.stoppedAt(ms)
  }

  /**
   * Receives a datagram, stores its leading 8-byte sequence number in
   * `recvSequenceNo` and replaces the packet's data with the remaining payload
   * (a freshly allocated array).
   *
   * A datagram shorter than 8 bytes makes `readLong` fail with an `EOFException`,
   * which is an `IOException`.
   */
  @throws(classOf[IOException])
  override def receive(packet: DatagramPacket): Unit = {
    super.receive(packet)

    // Read the sequence number and remove it from the packet
    val dis = new DataInputStream(
      new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
    recvSequenceNo = dis.readLong()
    val payload = new Array[Byte](dis.available)
    dis.readFully(payload)
    packet.setData(payload, 0, payload.length)
  }

  /**
   * Sends the packet's payload prefixed with `sendSequenceNo`, then increments
   * `sendSequenceNo`. The caller's packet is not modified; a new datagram is
   * built for the transmission.
   */
  @throws(classOf[IOException])
  override def send(packet: DatagramPacket): Unit = {
    val baos = new ByteArrayOutputStream
    val dos = new DataOutputStream(baos)

    // Write the sequence number first, then the user data
    dos.writeLong(sendSequenceNo)
    sendSequenceNo += 1
    dos.write(packet.getData, packet.getOffset, packet.getLength)
    dos.flush()

    // Build a new datagram from the combined data and send it
    val data = baos.toByteArray
    val outgoing =
      if (packet.getAddress == null) {
        // Unaddressed packet (used with a connected socket): keep it unaddressed
        // instead of deriving a socket address from the missing fields.
        new DatagramPacket(data, data.length)
      } else {
        new DatagramPacket(data, data.length, packet.getSocketAddress)
      }
    super.send(outgoing)
  }
}

/**
 * Single-threaded echo server built on `ReliableDatagramSocket`.
 * Every request is echoed back carrying the sequence number it arrived with.
 */
object ReliableEchoServer {
  /** Binds to `PORT` and echoes datagrams forever; the socket is closed on exit. */
  def main(args: Array[String]): Unit = {
    val storage = Array.ofDim[Byte](1024)
    val request = new DatagramPacket(storage, storage.size)

    new ReliableDatagramSocket(PORT).foreach { socket =>
      log("Reliable Single Thread Scala UDP Server", socket.getLocalSocketAddress, "Startup.")

      while (true) {
        // receive() swaps in a payload-sized array, so point the packet back
        // at the full-size buffer before every read
        request.setData(storage, 0, storage.size)
        socket.receive(request)

        // The reply must carry the same sequence number as the request;
        // it has to be set before send() is called
        socket.sendSequenceNo = socket.recvSequenceNo

        // Echo the request back as the response
        socket.send(request)

        log(s"Receive & Send SeqNo[${socket.recvSequenceNo}]")
      }
    }
  }
}

/**
 * Console echo client built on `ReliableDatagramSocket`.
 * Each line read from standard input is sent to the server on `PORT` and the
 * echoed reply is logged. Reading stops at end of input or at the line "exit".
 */
object ReliableEchoClient {
  def main(args: Array[String]): Unit = {
    // Send buffer
    val sendBuffer = Array.ofDim[Byte](1024)
    // Receive buffer
    val recvBuffer = Array.ofDim[Byte](1024)

    // The socket's receive() reads the 8-byte sequence number plus the payload
    // into recvBuffer, so a larger payload could not be echoed back intact
    // (and would not fit into sendBuffer beyond its size either).
    val maxPayload = recvBuffer.length - 8

    val sendPacket = new DatagramPacket(sendBuffer, sendBuffer.size)
    val recvPacket = new DatagramPacket(recvBuffer, recvBuffer.size)
    // NOTE(review): InetSocketAddress(port) is the wildcard address; using it as a
    // destination is platform dependent — confirm or switch to an explicit host.
    val address = new InetSocketAddress(PORT)
    sendPacket.setSocketAddress(address)
    recvPacket.setSocketAddress(address)

    for (socket <- new ReliableDatagramSocket) {
      log(s"Reliable Scala UDP Client", address, "Startup.")

      Iterator
        .continually(readLine())
        .takeWhile(word => word != null && word != "exit")
        .foreach { word =>
          val wordBinary = word.getBytes(StandardCharsets.UTF_8)

          if (wordBinary.length > maxPayload) {
            // Previously this overflowed the fixed-size buffer and crashed the client
            log(s"Input too long: ${wordBinary.length} bytes (max $maxPayload); skipped")
          } else {
            // Point both packets back at their full-size buffers
            sendPacket.setData(sendBuffer, 0, sendBuffer.size)
            recvPacket.setData(recvBuffer, 0, recvBuffer.size)

            System.arraycopy(wordBinary,
                             0,
                             sendPacket.getData,
                             0,
                             wordBinary.size)

            sendPacket.setData(sendPacket.getData, 0, wordBinary.size)

            // Send the request and wait for the matching reply
            socket.sendReceive(sendPacket, recvPacket)

            val seqNo = socket.recvSequenceNo

            log(s"Receive Sequence No[$seqNo]")
            log(s"Received =>", new String(recvPacket.getData,
                                           recvPacket.getOffset,
                                           recvPacket.getLength,
                                           StandardCharsets.UTF_8))
          }
        }
    }
  }
}