CLOVER🍀

That was when it all began.

Clojureで孊ぶ、UDPネットワヌクプログラミング - マルチキャスト

いよいよ、マルチキャストを䜿ったネットワヌクプログラミングに。

これたで、DatagramSocketクラスを䜿ったプログラムは、盞手がひずりしかいない、ナニキャストでしたが、今床は耇数の盞手に送信するマルチキャストです。
DatagramSocketクラスで、マルチキャストが䜿えないずいうわけではないですが 

正しくは、1回の送信で耇数のメンバヌが受信できるず蚀った方がいいでしょうか。

マルチキャストアドレスは耇数のアドレスが参加しおいるグルヌプで、そのグルヌプに個々のメンバヌが参加joinしたり、退出leaveしたりできたす。぀たり、グルヌプの構成メンバヌは動的に倉わるわけですね。

マルチキャストアドレスは、224.0.0.0〜239.255.255.255たでの範囲だそうで、さらに224.0.0.0〜224.0.0.255は予玄されおいるらしいです。

たた、スコヌプずいう抂念があり、TTLも䜿うこずができたす。スコヌプずしおは、

スコヌプ 説明
ノヌドロヌカルnode-local そのロヌカルノヌドを越えおはならないネットワヌクむンタヌフェヌスから出力されない
リンクロヌカルlink-local) ルヌタの向こう偎ぞは広たらない
サむトロヌカルsite-local サむトの倖ぞ広たらない。サむトはネットワヌク管理者の定矩による
組織ロヌカルorganization-local 組織の倖ぞ広たらない。組織の定矩は、その組織のネットワヌク管理者の定矩による
区域ロヌカルregion-local 区域ず定矩されおいる範囲の倖ぞ広たらない
倧陞ロヌカルcontinent-local 倧陞ず定矩されおいる範囲の倖ぞ広たらない
グロヌバルglobal どこぞでも広たる

がありたす。ここでのスコヌプには、IPv4ずIPv6のスコヌプが混圚しおいるず 。TTLを䜿ったスコヌプは動的なスコヌプで、スコヌプの定矩ず照らし合わせるず

TTL スコヌプ
0 ノヌドロヌカル
1 リンクロヌカル
< 32 サむトロヌカル
< 64 区域ロヌカル
< 128 倧陞ロヌカル
< 255 グロヌバル

ずなるそうです。なお、TTLのデフォルト倀は1で、MulticastSocket#setTimeToLiveで蚭定するこずができたすが、TTL自䜓がIPv4のみでしか䜿えたせん。

はい。では、プログラムにいっおみたしょう。お題ずしお、自身がマルチキャストグルヌプに参加しお、マルチキャストを受信し぀぀、送信も行うチャットプログラム的なものを䜜っおみたす。

udp_mcast_member.clj

(import '(java.net DatagramPacket InetAddress InetSocketAddress MulticastSocket NetworkInterface SocketTimeoutException)
        '(java.util Date))
(require '[clojure.string :as str])

(def port 7600)
(def address (InetSocketAddress. (InetAddress/getByName "228.8.8.8") port))
(def timeout 500)

(def receive-continually (ref true))

(defn log [word & more-words]
  (println (str \[ (Date.) \] \  word (str/join \  more-words))))

(with-open [socket (MulticastSocket. port)]
  (.setSoTimeout socket timeout)
  (doseq [intf (enumeration-seq (NetworkInterface/getNetworkInterfaces))]
    (.joinGroup socket address intf))

  (log "Clojure Multicast Chat Application, Startup." \[ address \])

  (letfn [(receive []
            (loop []
              (if @receive-continually
                (do
                  (try
                    (let [buffer (byte-array 8192)
                          packet (DatagramPacket. buffer 0 (alength buffer))]
                      (.receive socket packet)
                      (println (str "message => " (String. (.getData packet)
                                                           (.getOffset packet)
                                                           (.getLength packet)
                                                           "UTF-8"))))
                    (catch SocketTimeoutException _))
                    (recur))
                (do
                  (doseq [intf (enumeration-seq (NetworkInterface/getNetworkInterfaces))]
                    (.leaveGroup socket address intf))
                  (log "Leave Multicast Group Completed.")))))
          (read-and-send [reader]
            (loop []
              (let [word (reader)]
                (case word
                    nil (recur)
                    "" (recur)
                    "_exit_" (do
                               (dosync (ref-set receive-continually false)))
                    (do
                      (let [word-binary (.getBytes word "UTF-8")]
                        (.send socket (DatagramPacket. word-binary 0 (alength word-binary) address))
                        (recur)))))))]
    (let [threads [(Thread. receive) (Thread. #(read-and-send read-line))]]
          (doseq [t threads] (.start t))
          (doseq [t threads] (.join t)))))

マルチキャストを受信するためには、MulticastSocketクラスのむンスタンスを䜜成したす。

(with-open [socket (MulticastSocket. port)]

続いお、マルチキャストグルヌプにMulticastSocket#joinGroupメ゜ッドで参加したす。ここでは、すべおのネットワヌクむンタヌフェヌスでマルチキャストグルヌプに参加しおいたす。

  (doseq [intf (enumeration-seq (NetworkInterface/getNetworkInterfaces))]
    (.joinGroup socket address intf))

あずは、普通にMulticastSocket#receiveメ゜ッドでマルチキャストの受信ができたす。

                      (.receive socket packet)

マルチキャストグルヌプから退出する時は、MulticastSocket#leaveGroupメ゜ッドを䜿甚したす。

                  (doseq [intf (enumeration-seq (NetworkInterface/getNetworkInterfaces))]
                    (.leaveGroup socket address intf))

ちなみに、MulticastSocketクラスはDatagramSocketクラスのサブクラスです。

デヌタの送信は、MulticastSocket#sendメ゜ッドで行いたす。たあ、これは別にMulticastSocketクラスを䜿わないおもいいのですが。

                        (.send socket (DatagramPacket. word-binary 0 (alength word-binary) address))

そしお、今回のプログラムはスレッドを2぀䜿っお、片方を受信甚に、もう片方を送信様に䜿っおいお䜕も入力せず、たたは「_exit_」ず入力しおEnterを抌すたで実行し続けたす。

では、動かしおみたしょう。

# ひず぀め
$ clj udp_mcast_member.clj 
[Wed Sep 25 23:11:13 JST 2013] Clojure Multicast Chat Application, Startup.[ /228.8.8.8:7600 ]

# ふた぀め
$ clj udp_mcast_member.clj 
[Wed Sep 25 23:11:44 JST 2013] Clojure Multicast Chat Application, Startup.[ /228.8.8.8:7600 ]

起動したら、どちらか片方のコン゜ヌルで文字を入力しおみたす。

# ひず぀めの方で
Hello Multicast!!
message => Hello Multicast!!
こんにちは、
message => こんにちは、
侖界
message => 侖界
_exit_
[Wed Sep 25 23:14:29 JST 2013] Leave Multicast Group Completed.

「_exit_」ず入力しおEnterを抌したので、ここで終了です。

もう片方では

message => Hello Multicast!!
message => こんにちは、
message => 侖界

ずメッセヌゞだけ出力されおいたす。

䞀応、成功ですね。

なお、先に少し觊れたしたが、マルチキャストグルヌプにデヌタを送信するだけなら、MulticastSocketクラスを䜿う必芁はありたせん。

あくたで、MulticastSocketクラスが必芁なのは、グルヌプに参加しおデヌタを受信したい時ですね。

こちらは、マルチキャストグルヌプにデヌタを送信するだけのプログラムです。
udp_sender.clj

(import '(java.net DatagramPacket DatagramSocket InetAddress InetSocketAddress)
        '(java.util Date))
(require '[clojure.string :as str])

(def port 7600)
(def address (InetSocketAddress. (InetAddress/getByName "228.8.8.8") port))

(defn log [word & more-words]
  (println (str \[ (Date.) \] \  word (str/join \  more-words))))

(with-open [socket (DatagramSocket.)]
  (log "Multicast Message Sender" "Startup.")

  (doseq [word (take-while #(not (or (nil? %) (empty? %) (= % "_exit_")))
                           (repeatedly read-line))]
    (let [word-binary (.getBytes word "UTF-8")]
      (.send socket (DatagramPacket. word-binary 0 (alength word-binary) address)))))

送信先を、マルチキャストグルヌプのアドレスずポヌトに合わせおおけば、受け取り先のメンバヌさえいれば受信しおくれたす。

今回は、意倖ず簡単にできたした。

そしお、毎床お䞖話になりたす、この曞籍。マルチキャストの説明も、ほがここから持っおきおいたす。

Javaネットワヌクプログラミングの真髄

Javaネットワヌクプログラミングの真髄

動かすだけなら簡単なのですが、やっぱりUDPなので盞手にちゃんず届くこずが保蚌されおいるわけではありたせん。マルチキャストで高信頌型のプロトコルを䜜るのは、盞手が倚数しかも、誰が参加しおるかはMulticastSocketだけではわからないなので、そりゃあ実珟するのは難しいですよね 。

最埌、おこずったのがリラむト版のScalaで こちらは、暙準ラむブラリの䜿い方を間違っおいただけなんですけどね 。
UdpMulticast.scala

import scala.annotation.tailrec
import scala.collection.JavaConverters._

import java.net.{DatagramPacket, DatagramSocket, InetAddress, InetSocketAddress, MulticastSocket}
import java.net.{NetworkInterface, SocketAddress, SocketTimeoutException}
import java.nio.charset.StandardCharsets
import java.util.Date
import java.util.concurrent.atomic.AtomicBoolean

import UdpHelper._

object UdpHelper {
  val PORT: Int = 7600
  val ADDRESS: SocketAddress =
    new InetSocketAddress(InetAddress.getByName("228.8.8.8"),
                          PORT)

  def log(msg: Any, msgs: Any*): Unit =
    println(s"[${new Date}] ${(msg :: msgs.toList).mkString(" ")}")

  implicit class AutoCloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    def foreach(fun: A => Unit): Unit =
      try {
        fun(underlying)
      } finally {
        underlying.close()
      }
  }
}

object UdpMcastMember {
  val timeout: Int = 500

  val receiveContinually: AtomicBoolean = new AtomicBoolean(true)

  def main(args: Array[String]): Unit =
    for (socket <- new MulticastSocket(PORT)) {
      socket.setSoTimeout(timeout)

      NetworkInterface.getNetworkInterfaces.asScala.foreach { intf =>
        socket.joinGroup(ADDRESS, intf)
      }

      log("Scala Multicast Chat Application, Startup.", '[', ADDRESS, ']')

      val threads = Array(new Thread {
        override def run(): Unit = receive(socket)
      },
                          new Thread {
        override def run: Unit = readAndSend(socket, readLine)
      })

      threads.foreach(_.start())
      threads.foreach(_.join())
    }

  @tailrec
  private def readAndSend(socket: MulticastSocket, reader: => String): Unit =
    reader match {
      case word if word == null || word.isEmpty => readAndSend(socket, reader)
      case word if word == "_exit_" =>
        receiveContinually.set(false)
      case word =>
        val wordBinary = word.getBytes(StandardCharsets.UTF_8)
        socket.send(new DatagramPacket(wordBinary, 0, wordBinary.size, ADDRESS))
        readAndSend(socket, reader)
    }

  @tailrec
  private def receive(socket: MulticastSocket): Unit =
    receiveContinually.get match {
      case false =>
        NetworkInterface.getNetworkInterfaces.asScala.foreach { intf =>
          socket.leaveGroup(ADDRESS, intf)
        }

        log("Leave Multicast Group Complete.")
      case true =>
        try {
          val buffer = Array.ofDim[Byte](8192)
          val packet = new DatagramPacket(buffer, 0, buffer.size)

          socket.receive(packet)

          println("message => " +
                  new String(packet.getData,
                             packet.getOffset,
                             packet.getLength,
                             StandardCharsets.UTF_8))
        } catch {
          case _: SocketTimeoutException =>
        }

        receive(socket)
    }
}

object UdpSender {
  def main(args: Array[String]): Unit =
    for (socket <- new DatagramSocket) {
      log("Multicast Message Sender", "Startup.")

      Iterator
        .continually(readLine())
        .takeWhile(word => word != null && !word.isEmpty && word != "_exit_")
        .foreach { word =>
          val wordBinary = word.getBytes(StandardCharsets.UTF_8)
          socket.send(new DatagramPacket(wordBinary, 0, wordBinary.size, ADDRESS))
        }
    }
}