Up to now I have been doing UDP programming mostly in Clojure, but this time I am sticking to Scala only.
The topic is Reliable UDP.
UDP does not guarantee that datagrams arrive in the correct order, and it cannot detect packet loss either. To overcome this, reliability is added by:
- attaching a sequence number to every datagram
- having the sender put a timeout on the acknowledgement (from the receiver) and resend the request if the acknowledgement does not arrive in time
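For timeouts and retransmission, simply using one fixed timeout is not enough; the characteristics, state, and load of the network have to be taken into account. So, following TCP's example, the program does the following:
- It statistically computes and maintains a current estimate of the round-trip time (RTT), the time it takes a packet to travel from sender to receiver and back again. Packets are sent based on that estimate, without sending so many that they monopolize the network
- It uses "exponential backoff" when computing how long to wait for the next reply, so each time the error repeats, the time until the next retransmission grows exponentially. This gradually reduces the number of packets on the network, which raises the chance that the next retransmission succeeds

Exponential backoff means that, for example, the wait before the third retransmission is not 3 times the base value n but 2 to the 3rd power times n. It is apparently one of TCP's congestion-avoidance techniques.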
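To make the doubling concrete, here is a minimal sketch of a backoff schedule. `BackoffSketch`, `timeoutFor`, and `schedule` are names made up for this illustration and are not part of the program in this post; the cap stands in for a maximum retransmit timeout.

```scala
// Hypothetical helper for illustration only; not part of the original program.
object BackoffSketch {
  // Timeout in seconds for the given attempt (0 = first send),
  // doubling on every attempt and capped at maxTimeout.
  def timeoutFor(base: Int, attempt: Int, maxTimeout: Int): Int = {
    require(base > 0 && attempt >= 0 && attempt < 32, "base must be positive, attempt in [0, 32)")
    math.min(base.toLong << attempt, maxTimeout.toLong).toInt
  }

  // Timeouts for the first send plus `retries` retransmissions.
  def schedule(base: Int, retries: Int, maxTimeout: Int): List[Int] =
    (0 to retries).map(attempt => timeoutFor(base, attempt, maxTimeout)).toList
}
```

With a base of 3 seconds, 4 retransmissions, and a cap of 64, `BackoffSketch.schedule(3, 4, 64)` gives 3, 6, 12, 24, 48, so the wait grows as a power of two rather than linearly.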
Well, I have written all sorts of things here, but it is all secondhand from this book:
Javaネットワークプログラミングの真髄
- Author: エズモンド・ピット (Esmond Pitt), 岩谷宏
- Publisher: ソフトバンク クリエイティブ
- Release date: 2007/04/28
- Format: large-format book
The book contains an example of a reliable UDP Echo server, so I rewrote it in Scala. Really I should have done it in Clojure, but with my current Clojure skills I suspect implementing and debugging it would be hard, so I am playing it safe with Scala here.
Incidentally, the sample in the book has an obvious error, which I fixed by looking at the support page.
Javaネットワークプログラミングの真髄--follow-up
http://homepage1.nifty.com/algafield/jnet/
I have also added a reliable client, which the book did not include.
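The most important part of this program is a class called ReliableDatagramSocket, which implements the following:
- It attaches a unique sequence number to each outgoing packet and extracts the sequence number from each incoming packet. This happens out of sight of the caller
- It provides a method to read the sequence number of the incoming packet and one to set it on the outgoing packet, which the server uses when preparing a reply
- It provides a sendReceive method for the client. This takes a datagram to send and a datagram to receive into, sends the outgoing datagram, and waits for the server's reply. If no response arrives within the allotted time, it repeats timeout and retransmission, adjusting the interval as it goes, up to the maximum retry count. Once the maximum count is exceeded, it throws SocketTimeoutException

I chose SocketTimeoutException as the exception for timeouts. Surely nobody needs to worry about anything older than JDK 1.4 anymore...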
Now, on to the code. It is close to a straight transcription of the book, so please forgive the parts that are not very Scala-like. First, the imports.
import scala.annotation.tailrec
import scala.math.{abs, max, min}
import scala.util.{Failure, Success, Try}

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, IOException}
import java.net.{DatagramPacket, DatagramSocket, InetAddress, InetSocketAddress, SocketAddress, SocketTimeoutException}
import java.nio.charset.StandardCharsets
import java.util.Date

import Reliabilities._
Reliabilities is an object that defines the constants and other helpers used below.
Here is that Reliabilities object.
// In this program, all times are in seconds.
object Reliabilities {
  // Minimum and maximum retransmit timeout
  val MIN_RETRANSMIT_TIMEOUT: Int = 1
  val MAX_RETRANSMIT_TIMEOUT: Int = 64

  // Maximum number of retransmissions per datagram; 3 or 4 is typical
  val MAX_RETRANSMISSIONS: Int = 4

  // Destination port
  val PORT: Int = 50000

  def log(msg: Any, msgs: Any*): Unit =
    println(s"[${new Date}] ${(msg :: msgs.toList).mkString(" ")}")

  // Implicit class that lets an AutoCloseable be used in a for expression
  implicit class AutoCloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    def foreach(fun: A => Unit): Unit =
      try {
        fun(underlying)
      } finally {
        underlying.close()
      }
  }
}
As the comment says, every time value used in the programs in this post is in seconds.
Next, the class that calculates the round-trip time.
class RoundTripTimer {
  // Most recent round-trip time (RTT)
  private var roundTripTime: Float = 0.0F
  // Smoothed RTT
  private var smoothedTripTime: Float = 0.0F
  // Smoothed deviation
  private var deviation: Float = 0.75F
  // Retransmission count: 0, 1, 2, ...
  private var retransmissions: Short = 0
  // Current retransmit timeout
  var currentTimeout: Float = minmax(calculateRetransmitTimeout)

  // Returns the retransmit timeout
  private def calculateRetransmitTimeout: Int =
    (smoothedTripTime + 4.0 * deviation).toInt

  // Returns the retransmit timeout clamped to the configured bounds
  private def minmax(rto: Float): Float =
    min(max(rto, MIN_RETRANSMIT_TIMEOUT), MAX_RETRANSMIT_TIMEOUT)

  // Resets the retransmission count each time a new packet is sent
  def newPacket(): Unit =
    retransmissions = 0

  /**
   * Called right after a successful receive to calculate the round-trip time,
   * then the smoothed round trip and its deviation.
   **/
  def stoppedAt(ms: Long): Unit = {
    // Round trip of this packet, in seconds.
    // Divide as Float: ms / 1000 on a Long would truncate sub-second RTTs to 0.
    roundTripTime = ms / 1000.0F

    // Update the RTT estimate and its mean deviation
    val delta = roundTripTime - smoothedTripTime
    smoothedTripTime += (delta / 8.0).toFloat
    deviation += ((abs(delta) - deviation) / 4.0).toFloat

    // Recalculate the current timeout
    currentTimeout = minmax(calculateRetransmitTimeout)
  }

  /**
   * Called after a timeout occurs. Returns true if it is time to give up,
   * false if another retransmission is allowed.
   **/
  def isTimeout(): Boolean = {
    currentTimeout *= 2  // next retransmit timeout
    retransmissions = (retransmissions + 1).toShort
    retransmissions > MAX_RETRANSMISSIONS
  }
}
When a packet is sent with the ReliableDatagramSocket#sendReceive method that appears later, it calls this class's newPacket method to reset the retry count. When the caller detects a timeout, it calls isTimeout, which checks the retry limit and also lengthens the next retransmit timeout. When communication succeeds, it calls stoppedAt, which computes the round-trip time estimate and updates its mean deviation.
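As a worked example of the smoothing done in stoppedAt, here is a minimal immutable sketch that uses the same gains (1/8 for the mean, 1/4 for the deviation). `RttEstimate`, `observe`, and `rto` are hypothetical names for this illustration, and unlike RoundTripTimer the result is neither truncated to an Int nor clamped.

```scala
// Hypothetical, simplified model of the estimator; not the class used in the program.
final case class RttEstimate(srtt: Float, deviation: Float) {
  // Fold one RTT sample (in seconds) into the estimate.
  def observe(sampleSeconds: Float): RttEstimate = {
    val delta = sampleSeconds - srtt
    RttEstimate(
      srtt + delta / 8.0F,
      deviation + (math.abs(delta) - deviation) / 4.0F
    )
  }

  // Unclamped retransmit timeout: smoothed RTT plus four deviations.
  def rto: Float = srtt + 4.0F * deviation
}
```

Starting from `RttEstimate(0.0F, 0.75F)`, the same initial values as RoundTripTimer, the timeout is 3 seconds. After one sample of 0.8 seconds the smoothed RTT becomes about 0.1 and the deviation about 0.7625, so the timeout moves to roughly 3.15 seconds.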
And now the ReliableDatagramSocket class.
class ReliableDatagramSocket(localAddr: SocketAddress) extends DatagramSocket(localAddr) {
  private var roundTripTimer: RoundTripTimer = new RoundTripTimer
  private var reinit: Boolean = false
  var sendSequenceNo: Long = 0L  // sequence number for sending
  var recvSequenceNo: Long = 0L  // sequence number for receiving

  init()

  def this(port: Int) = this(new InetSocketAddress(port))
  def this(port: Int, localAddr: InetAddress) = this(new InetSocketAddress(localAddr, port))
  def this() = this(null)

  // Initialization
  private def init(): Unit =
    roundTripTimer = new RoundTripTimer

  // After connecting, (re)initialize the per-connection statistics
  override def connect(dest: InetAddress, port: Int): Unit = {
    super.connect(dest, port)
    init()
  }

  // After connecting, (re)initialize the per-connection statistics
  override def connect(dest: SocketAddress): Unit = {
    super.connect(dest)
    init()
  }

  @throws(classOf[IOException])
  def sendReceive(sendPacket: DatagramPacket, recvPacket: DatagramPacket): Unit = synchronized {
    // Reinitialize after a timeout
    if (reinit) {
      init()
      reinit = false
    }

    roundTripTimer.newPacket()

    val start = System.currentTimeMillis
    val sequenceNumber = sendSequenceNo

    // Repeat until the final timeout or an unexpected exception.
    // The same sequenceNumber is used for every retry.
    Iterator.continually {
      Try {
        sendSequenceNo = sequenceNumber
        send(sendPacket)  // may throw

        val timeout = (roundTripTimer.currentTimeout * 1000.0 + 0.5).toInt
        val soTimeoutStart = System.currentTimeMillis

        @tailrec
        def receiveRetries(): Long = {
          // Adjust the socket timeout by the time that has already elapsed
          val soTimeout = (timeout - (System.currentTimeMillis - soTimeoutStart)).toInt
          // setSoTimeout(0) would wait forever, so treat a used-up budget as a timeout
          if (soTimeout <= 0) throw new SocketTimeoutException("receive timed out")
          setSoTimeout(soTimeout)
          receive(recvPacket)

          recvSequenceNo match {
            case `sequenceNumber` => recvSequenceNo  // sequence matches: leave the loop
            case _ => receiveRetries()
          }
        }

        receiveRetries()
      }
    }.takeWhile {
      case Success(_) => false  // sequence matches: leave the loop
      case Failure(e: SocketTimeoutException) =>
        // Timed out: decide whether to retry
        if (roundTripTimer.isTimeout()) {
          reinit = true
          throw e
        } else {
          // Retry
          true
        }
      case Failure(e) => throw e
    }.foreach { retry => }  // retry because of a sequence mismatch or a timeout

    // Got the correct reply:
    // stop the timer and calculate the new RTT value
    val ms = System.currentTimeMillis - start
    roundTripTimer.stoppedAt(ms)
  }

  // Handles the sequence number
  @throws(classOf[IOException])
  override def receive(packet: DatagramPacket): Unit = {
    super.receive(packet)

    // Read the sequence number and remove it from the packet
    val bais = new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength)
    val dis = new DataInputStream(bais)
    recvSequenceNo = dis.readLong()

    val buffer = Array.ofDim[Byte](dis.available)
    dis.read(buffer)
    packet.setData(buffer, 0, buffer.size)
  }

  // Handles the sequence number
  @throws(classOf[IOException])
  override def send(packet: DatagramPacket): Unit = {
    val baos = new ByteArrayOutputStream
    val dos = new DataOutputStream(baos)

    // Write the sequence number, then the user data
    dos.writeLong(sendSequenceNo)
    sendSequenceNo += 1
    dos.write(packet.getData, packet.getOffset, packet.getLength)
    dos.flush()

    // Construct a new packet from this new data and send it
    val data = baos.toByteArray
    val newPacket = new DatagramPacket(data, data.size, packet.getSocketAddress)
    super.send(newPacket)
  }
}
This class holds an instance of the RoundTripTimer class created earlier. It overrides several methods of its parent class DatagramSocket: receive first reads the sequence number out of the received response and strips it from the DatagramPacket instance the caller uses, while send inserts the sequence number in front of the outgoing data and builds a new DatagramPacket.
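The framing that send and receive perform can be sketched on its own as an 8-byte big-endian sequence number followed by the payload. The version below uses java.nio.ByteBuffer instead of the DataOutputStream and DataInputStream in the class above; `SeqFrameSketch`, `encode`, and `decode` are hypothetical names for this illustration.

```scala
import java.nio.ByteBuffer

// Hypothetical helper showing the wire layout only; not part of the original program.
object SeqFrameSketch {
  // Layout: [8-byte big-endian sequence number][payload bytes]
  def encode(seqNo: Long, payload: Array[Byte]): Array[Byte] =
    ByteBuffer.allocate(8 + payload.length).putLong(seqNo).put(payload).array()

  // Splits a received frame back into (sequence number, payload).
  def decode(frame: Array[Byte], offset: Int, length: Int): (Long, Array[Byte]) = {
    val buf = ByteBuffer.wrap(frame, offset, length)
    val seqNo = buf.getLong()
    val payload = new Array[Byte](buf.remaining())
    buf.get(payload)
    (seqNo, payload)
  }
}
```

ByteBuffer is big-endian by default, which matches the byte order of DataOutputStream#writeLong, so frames produced this way should be readable by the receive method above.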
The one method of its own is sendReceive, which is intended for use by the client. It first calls RoundTripTimer's newPacket method to reset the retry count.
Then it sends the DatagramPacket.
After that, it adjusts the timeout value as it goes: if the sequence number of the received data is wrong, it waits to receive again; if it times out, it adjusts the timeout value and retries; and if the retry count has been exceeded, it gives up.
When a packet with the correct sequence number is received, it leaves the loop and calls RoundTripTimer's stoppedAt method to compute the round-trip time estimate and update its mean deviation.
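Stripped of the socket handling, the control flow of sendReceive looks roughly like the sketch below. This is a simplification under assumptions: `RetrySketch`, `sendWithRetries`, and the `attempt` callback (which returns Some(reply) on success and None on timeout) are hypothetical, and the real method signals failure by throwing SocketTimeoutException rather than returning None.

```scala
import scala.annotation.tailrec

// Hypothetical simplification of the retry loop; not the method used in the program.
object RetrySketch {
  // attempt(timeoutSeconds) performs one send and waits for the reply.
  def sendWithRetries[A](initialTimeout: Int, maxRetransmissions: Int)(attempt: Int => Option[A]): Option[A] = {
    @tailrec
    def loop(timeout: Int, retransmissions: Int): Option[A] =
      attempt(timeout) match {
        case reply @ Some(_) => reply
        // Same check as isTimeout: give up once the count exceeds the maximum
        case None if retransmissions + 1 > maxRetransmissions => None
        // Otherwise double the timeout and retransmit
        case None => loop(timeout * 2, retransmissions + 1)
      }

    loop(initialTimeout, 0)
  }
}
```

With an initial timeout of 3 and a maximum of 4 retransmissions, a peer that never answers is tried with timeouts of 3, 6, 12, 24, and 48 seconds before the loop gives up.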
Incidentally, the place where the original book was wrong is in this sendReceive method. It was written like this:

// Adjust the socket timeout by the time that has already elapsed
int soTimeout = timeout.(int) (System.currentTimeMillis().soTimeoutStart);

whereas the correct version is:

// Adjust the socket timeout by the time that has already elapsed
int soTimeout = timeout-(int) (System.currentTimeMillis()-soTimeoutStart);
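The corrected expression is just "timeout minus elapsed time". A small sketch of that calculation as a pure function follows; `DeadlineSketch` and `remainingMillis` are hypothetical names for this illustration. It returns None once the budget is used up, because passing 0 to setSoTimeout means an infinite wait rather than no wait.

```scala
// Hypothetical helper for illustration only; not part of the original program.
object DeadlineSketch {
  // Milliseconds still available out of timeoutMs, or None if the time is used up.
  def remainingMillis(timeoutMs: Int, startMs: Long, nowMs: Long): Option[Int] = {
    // Subtract as Long first, so the epoch timestamps are never narrowed to Int
    val left = timeoutMs - (nowMs - startMs)
    if (left > 0) Some(math.min(left, timeoutMs.toLong).toInt) else None
  }
}
```

A caller could pass the Some value to setSoTimeout and treat None the same way as a SocketTimeoutException.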
Next, the Echo server that uses this ReliableDatagramSocket.
object ReliableEchoServer {
  def main(args: Array[String]): Unit = {
    val buffer = Array.ofDim[Byte](1024)
    val recvPacket = new DatagramPacket(buffer, buffer.size)

    for (socket <- new ReliableDatagramSocket(PORT)) {
      log("Reliable Single Thread Scala UDP Server", socket.getLocalSocketAddress, "Startup.")

      Iterator.from(1).foreach { i =>
        // Reset the receive packet's buffer to its full size
        recvPacket.setData(buffer, 0, buffer.size)
        socket.receive(recvPacket)

        // The reply uses the same sequence number as the request
        val seqNo = socket.recvSequenceNo
        socket.sendSequenceNo = seqNo

        // Echo the request back as the response
        socket.send(recvPacket)

        log(s"Receive & Send SeqNo[${socket.recvSequenceNo}]")
      }
    }
  }
}
Compared with the classes so far, this one is quite small. Note that it assumes it runs on a single thread. The key point is that after receiving data with ReliableDatagramSocket#receive and before writing it back, that is, before calling send, it sets the sequence number to use for sending.
Finally, the Echo client side.
object ReliableEchoClient {
  def main(args: Array[String]): Unit = {
    // Send buffer
    val sendBuffer = Array.ofDim[Byte](1024)
    // Receive buffer
    val recvBuffer = Array.ofDim[Byte](1024)

    val sendPacket = new DatagramPacket(sendBuffer, sendBuffer.size)
    val recvPacket = new DatagramPacket(recvBuffer, recvBuffer.size)

    val address = new InetSocketAddress(PORT)
    sendPacket.setSocketAddress(address)
    recvPacket.setSocketAddress(address)

    for (socket <- new ReliableDatagramSocket) {
      log("Reliable Scala UDP Client", address, "Startup.")

      // Note: on Scala 2.11 and later, use scala.io.StdIn.readLine() instead
      Iterator
        .continually(readLine())
        .takeWhile(word => word != null && word != "exit")
        .foreach { word =>
          // Reinitialize the send and receive packets
          sendPacket.setData(sendBuffer, 0, sendBuffer.size)
          recvPacket.setData(recvBuffer, 0, recvBuffer.size)

          val wordBinary = word.getBytes(StandardCharsets.UTF_8)
          System.arraycopy(wordBinary, 0, sendPacket.getData, 0, wordBinary.size)
          sendPacket.setData(sendPacket.getData, 0, wordBinary.size)

          // Send & receive
          socket.sendReceive(sendPacket, recvPacket)

          val seqNo = socket.recvSequenceNo
          log(s"Receive Sequence No[$seqNo]")
          log("Received =>",
            new String(recvPacket.getData, recvPacket.getOffset, recvPacket.getLength, StandardCharsets.UTF_8))
        }
    }
  }
}
As in the earlier posts, this program takes input from the console and sends it to the server, but all sending and receiving of data goes through ReliableDatagramSocket#sendReceive.
Running it looks like this. First, start the server.

$ scala ReliableEchoServer
[Sat Sep 21 23:29:24 JST 2013] Reliable Single Thread Scala UDP Server 0.0.0.0/0.0.0.0:50000 Startup.

Start the client.

$ scala ReliableEchoClient
[Sat Sep 21 23:29:47 JST 2013] Reliable Scala UDP Client 0.0.0.0/0.0.0.0:50000 Startup.

After that, type in whatever strings you like and the server answers.

Hello World
[Sat Sep 21 23:30:22 JST 2013] Receive Sequence No[0]
[Sat Sep 21 23:30:22 JST 2013] Received => Hello World
こんにちは、世界
[Sat Sep 21 23:30:26 JST 2013] Receive Sequence No[1]
[Sat Sep 21 23:30:26 JST 2013] Received => こんにちは、世界
Reliable UDP Echo.
[Sat Sep 21 23:30:35 JST 2013] Receive Sequence No[2]
[Sat Sep 21 23:30:35 JST 2013] Received => Reliable UDP Echo.
exit

On the server side, the log looks like this.

[Sat Sep 21 23:30:22 JST 2013] Receive & Send SeqNo[0]
[Sat Sep 21 23:30:26 JST 2013] Receive & Send SeqNo[1]
[Sat Sep 21 23:30:35 JST 2013] Receive & Send SeqNo[2]
One caveat about this program: because ReliableDatagramSocket itself holds the sequence numbers, it cannot handle multiple peers. It assumes only a single client is talking to it. The book also notes that sequence numbers should really be managed per peer. That is also the reason the server program is single-threaded.
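One possible direction, purely as a sketch, is to keep one counter per remote address instead of a single field on the socket. `PeerSequences` and `nextFor` are hypothetical names; nothing like this exists in the program above or, as far as this post goes, in the book's sample, and it is not thread-safe as written.

```scala
import java.net.SocketAddress
import scala.collection.mutable

// Hypothetical per-peer sequence bookkeeping; illustration only, not thread-safe.
final class PeerSequences {
  private val next = mutable.Map.empty[SocketAddress, Long]

  // Returns the sequence number to use for this peer and advances its counter.
  def nextFor(peer: SocketAddress): Long = {
    val current = next.getOrElse(peer, 0L)
    next(peer) = current + 1
    current
  }
}
```

Each peer then starts from 0 independently, so a second client would not disturb the numbering seen by the first.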
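That said, unlike a TCP socket, a connection cannot be represented as something like an instance of the Socket class, so this is a spot that needs some ingenuity. Packet fragmentation really ought to be taken into account as well.
One of these days I might trace through the JGroups source... I have looked at its UDP part a little, but not yet at how it guarantees this kind of reliability.
The next topic is UDP with NIO.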
Finally, the source written for this post lives in a single, fairly long file, ReliableUdpClientServer.scala, which consists of the listings shown above in the same order: the imports, Reliabilities, RoundTripTimer, ReliableDatagramSocket, ReliableEchoServer, and ReliableEchoClient.