CLOVER🍀

That was when it all began.

Clojureで学ぶ、UDPネットワークプログラミング - マルチキャスト

いよいよ、マルチキャストを使ったネットワークプログラミングに。

これまで、DatagramSocketクラスを使ったプログラムは、相手がひとりしかいない、ユニキャストでしたが、今度は複数の相手に送信するマルチキャストです。
*DatagramSocketクラスで、マルチキャストが使えないというわけではないですが…

正しくは、1回の送信で複数のメンバーが受信できると言った方がいいでしょうか。

マルチキャストアドレスは複数のアドレスが参加しているグループで、そのグループに個々のメンバーが参加(join)したり、退出(leave)したりできます。つまり、グループの構成メンバーは動的に変わるわけですね。

マルチキャストアドレスは、224.0.0.0〜239.255.255.255までの範囲だそうで、さらに224.0.0.0〜224.0.0.255は予約されているらしいです。

また、スコープという概念があり、TTLも使うことができます。スコープとしては、

スコープ 説明
ノードローカル(node-local) そのローカルノードを越えてはならない(ネットワークインターフェースから出力されない
リンクローカル(link-local) ルータの向こう側へは広まらない
サイトローカル(site-local) サイトの外へ広まらない。サイトはネットワーク管理者の定義による
組織ローカル(organization-local) 組織の外へ広まらない。組織の定義は、その組織のネットワーク管理者の定義による
区域ローカル(region-local) 区域と定義されている範囲の外へ広まらない
大陸ローカル(continent-local) 大陸と定義されている範囲の外へ広まらない
グローバル(global) どこへでも広まる

があります。ここでのスコープには、IPv4とIPv6のスコープが混在していると…。TTLを使ったスコープは動的なスコープで、スコープの定義と照らし合わせると

TTL スコープ
0 ノードローカル
1 リンクローカル
< 32 サイトローカル
< 64 区域ローカル
< 128 大陸ローカル
< 255 グローバル

となるそうです。なお、TTLのデフォルト値は1で、MulticastSocket#setTimeToLiveで設定することができますが、TTL自体がIPv4のみでしか使えません。

はい。では、プログラムにいってみましょう。お題として、自身がマルチキャストグループに参加して、マルチキャストを受信しつつ、送信も行うチャットプログラム的なものを作ってみます。

udp_mcast_member.clj

(import '(java.net DatagramPacket InetAddress InetSocketAddress MulticastSocket NetworkInterface SocketTimeoutException)
        '(java.util Date))
(require '[clojure.string :as str])

(def port 7600)
(def address (InetSocketAddress. (InetAddress/getByName "228.8.8.8") port))
(def timeout 500)

(def receive-continually (ref true))

(defn log [word & more-words]
  (println (str \[ (Date.) \] \  word (str/join \  more-words))))

(with-open [socket (MulticastSocket. port)]
  (.setSoTimeout socket timeout)
  (doseq [intf (enumeration-seq (NetworkInterface/getNetworkInterfaces))]
    (.joinGroup socket address intf))

  (log "Clojure Multicast Chat Application, Startup." \[ address \])

  (letfn [(receive []
            (loop []
              (if @receive-continually
                (do
                  (try
                    (let [buffer (byte-array 8192)
                          packet (DatagramPacket. buffer 0 (alength buffer))]
                      (.receive socket packet)
                      (println (str "message => " (String. (.getData packet)
                                                           (.getOffset packet)
                                                           (.getLength packet)
                                                           "UTF-8"))))
                    (catch SocketTimeoutException _))
                    (recur))
                (do
                  (doseq [intf (enumeration-seq (NetworkInterface/getNetworkInterfaces))]
                    (.leaveGroup socket address intf))
                  (log "Leave Multicast Group Completed.")))))
          (read-and-send [reader]
            (loop []
              (let [word (reader)]
                (case word
                    nil (recur)
                    "" (recur)
                    "_exit_" (do
                               (dosync (ref-set receive-continually false)))
                    (do
                      (let [word-binary (.getBytes word "UTF-8")]
                        (.send socket (DatagramPacket. word-binary 0 (alength word-binary) address))
                        (recur)))))))]
    (let [threads [(Thread. receive) (Thread. #(read-and-send read-line))]]
          (doseq [t threads] (.start t))
          (doseq [t threads] (.join t)))))

マルチキャストを受信するためには、MulticastSocketクラスのインスタンスを作成します。

(with-open [socket (MulticastSocket. port)]

続いて、マルチキャストグループにMulticastSocket#joinGroupメソッドで参加します。ここでは、すべてのネットワークインターフェースでマルチキャストグループに参加しています。

  (doseq [intf (enumeration-seq (NetworkInterface/getNetworkInterfaces))]
    (.joinGroup socket address intf))

あとは、普通にMulticastSocket#receiveメソッドでマルチキャストの受信ができます。

                      (.receive socket packet)

マルチキャストグループから退出する時は、MulticastSocket#leaveGroupメソッドを使用します。

                  (doseq [intf (enumeration-seq (NetworkInterface/getNetworkInterfaces))]
                    (.leaveGroup socket address intf))

ちなみに、MulticastSocketクラスはDatagramSocketクラスのサブクラスです。

データの送信は、MulticastSocket#sendメソッドで行います。まあ、これは別にMulticastSocketクラスを使わないてもいいのですが。

                        (.send socket (DatagramPacket. word-binary 0 (alength word-binary) address))

そして、今回のプログラムはスレッドを2つ使って、片方を受信用に、もう片方を送信様に使っていて何も入力せず、または「_exit_」と入力してEnterを押すまで実行し続けます。

では、動かしてみましょう。

# ひとつめ
$ clj udp_mcast_member.clj 
[Wed Sep 25 23:11:13 JST 2013] Clojure Multicast Chat Application, Startup.[ /228.8.8.8:7600 ]

# ふたつめ
$ clj udp_mcast_member.clj 
[Wed Sep 25 23:11:44 JST 2013] Clojure Multicast Chat Application, Startup.[ /228.8.8.8:7600 ]

起動したら、どちらか片方のコンソールで文字を入力してみます。

# ひとつめの方で
Hello Multicast!!
message => Hello Multicast!!
こんにちは、
message => こんにちは、
世界
message => 世界
_exit_
[Wed Sep 25 23:14:29 JST 2013] Leave Multicast Group Completed.

「_exit_」と入力してEnterを押したので、ここで終了です。

もう片方では

message => Hello Multicast!!
message => こんにちは、
message => 世界

とメッセージだけ出力されています。

一応、成功ですね。

なお、先に少し触れましたが、マルチキャストグループにデータを送信するだけなら、MulticastSocketクラスを使う必要はありません。

あくまで、MulticastSocketクラスが必要なのは、グループに参加してデータを受信したい時ですね。

こちらは、マルチキャストグループにデータを送信するだけのプログラムです。
udp_sender.clj

(import '(java.net DatagramPacket DatagramSocket InetAddress InetSocketAddress)
        '(java.util Date))
(require '[clojure.string :as str])

(def port 7600)
(def address (InetSocketAddress. (InetAddress/getByName "228.8.8.8") port))

(defn log [word & more-words]
  (println (str \[ (Date.) \] \  word (str/join \  more-words))))

(with-open [socket (DatagramSocket.)]
  (log "Multicast Message Sender" "Startup.")

  (doseq [word (take-while #(not (or (nil? %) (empty? %) (= % "_exit_")))
                           (repeatedly read-line))]
    (let [word-binary (.getBytes word "UTF-8")]
      (.send socket (DatagramPacket. word-binary 0 (alength word-binary) address)))))

送信先を、マルチキャストグループのアドレスとポートに合わせておけば、受け取り先のメンバーさえいれば受信してくれます。

今回は、意外と簡単にできました。

そして、毎度お世話になります、この書籍。マルチキャストの説明も、ほぼここから持ってきています。

Javaネットワークプログラミングの真髄

Javaネットワークプログラミングの真髄

動かすだけなら簡単なのですが、やっぱりUDPなので相手にちゃんと届くことが保証されているわけではありません。マルチキャストで高信頼型のプロトコルを作るのは、相手が多数(しかも、誰が参加してるかはMulticastSocketだけではわからない)なので、そりゃあ実現するのは難しいですよね…。

最後、てこずったのがリライト版のScalaで…こちらは、標準ライブラリの使い方を間違っていただけなんですけどね…。
UdpMulticast.scala

import scala.annotation.tailrec
import scala.collection.JavaConverters._

import java.net.{DatagramPacket, DatagramSocket, InetAddress, InetSocketAddress, MulticastSocket}
import java.net.{NetworkInterface, SocketAddress, SocketTimeoutException}
import java.nio.charset.StandardCharsets
import java.util.Date
import java.util.concurrent.atomic.AtomicBoolean

import UdpHelper._

object UdpHelper {
  val PORT: Int = 7600
  val ADDRESS: SocketAddress =
    new InetSocketAddress(InetAddress.getByName("228.8.8.8"),
                          PORT)

  def log(msg: Any, msgs: Any*): Unit =
    println(s"[${new Date}] ${(msg :: msgs.toList).mkString(" ")}")

  implicit class AutoCloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    def foreach(fun: A => Unit): Unit =
      try {
        fun(underlying)
      } finally {
        underlying.close()
      }
  }
}

object UdpMcastMember {
  val timeout: Int = 500

  val receiveContinually: AtomicBoolean = new AtomicBoolean(true)

  def main(args: Array[String]): Unit =
    for (socket <- new MulticastSocket(PORT)) {
      socket.setSoTimeout(timeout)

      NetworkInterface.getNetworkInterfaces.asScala.foreach { intf =>
        socket.joinGroup(ADDRESS, intf)
      }

      log("Scala Multicast Chat Application, Startup.", '[', ADDRESS, ']')

      val threads = Array(new Thread {
        override def run(): Unit = receive(socket)
      },
                          new Thread {
        override def run: Unit = readAndSend(socket, readLine)
      })

      threads.foreach(_.start())
      threads.foreach(_.join())
    }

  @tailrec
  private def readAndSend(socket: MulticastSocket, reader: => String): Unit =
    reader match {
      case word if word == null || word.isEmpty => readAndSend(socket, reader)
      case word if word == "_exit_" =>
        receiveContinually.set(false)
      case word =>
        val wordBinary = word.getBytes(StandardCharsets.UTF_8)
        socket.send(new DatagramPacket(wordBinary, 0, wordBinary.size, ADDRESS))
        readAndSend(socket, reader)
    }

  @tailrec
  private def receive(socket: MulticastSocket): Unit =
    receiveContinually.get match {
      case false =>
        NetworkInterface.getNetworkInterfaces.asScala.foreach { intf =>
          socket.leaveGroup(ADDRESS, intf)
        }

        log("Leave Multicast Group Complete.")
      case true =>
        try {
          val buffer = Array.ofDim[Byte](8192)
          val packet = new DatagramPacket(buffer, 0, buffer.size)

          socket.receive(packet)

          println("message => " +
                  new String(packet.getData,
                             packet.getOffset,
                             packet.getLength,
                             StandardCharsets.UTF_8))
        } catch {
          case _: SocketTimeoutException =>
        }

        receive(socket)
    }
}

object UdpSender {
  def main(args: Array[String]): Unit =
    for (socket <- new DatagramSocket) {
      log("Multicast Message Sender", "Startup.")

      Iterator
        .continually(readLine())
        .takeWhile(word => word != null && !word.isEmpty && word != "_exit_")
        .foreach { word =>
          val wordBinary = word.getBytes(StandardCharsets.UTF_8)
          socket.send(new DatagramPacket(wordBinary, 0, wordBinary.size, ADDRESS))
        }
    }
}