CLOVER🍀

That was when it all began.

Javaで書く、NIOを使ったUDPマルチキャスト通信

最後、やるかどうか迷いましたが、せっかくなのでNIOを使ってUDPマルチキャスト通信まで試してみることにしました。

今回は、これまで参考にしていた書籍の範囲外です。

なぜなら、マルチキャスト用のChannel APIは、JDK 7で追加されたものだからです。2007年に、日本語訳が出た書籍でしたからね。

Javaネットワークプログラミングの真髄

Javaネットワークプログラミングの真髄

で、マイナーなクラスなのか、これを取り扱っているエントリはほとんど見かけません。なのせ、せっかくなので今回だけはJavaで書こうと思います。

利用するNIOを使ったマルチキャストで登場する主要なインターフェース、クラスは、以下になります。

MulticastChannel
http://docs.oracle.com/javase/jp/7/api/java/nio/channels/MulticastChannel.html

MembershipKey
http://docs.oracle.com/javase/jp/7/api/java/nio/channels/MembershipKey.html

だいたいの使い方は、それぞれのドキュメントに書いてあります。

MulticastChannelインターフェースを実装したクラスとしては、既存のDatagramChannelクラスがあるので、実はMulticastChannelインターフェースそのものを意識して使うことはないのかもしれません。

今回も、簡単なチャットっぽいアプリケーションを書いてみたいと思います。

まずは、マルチキャスト用のポートとInetAddressのインスタンスを取得します。

int port = 7600;
InetAddress address = InetAddress.getByName("228.8.8.8");

DatagramChannel#openを呼び出します。

// DatagramChannelの作成
// DatagramChannelは、MulticastChannelインターフェースを実装している

// DatagramChannel.openは、以下の様にちゃんとProtocolFamiryを指定した方が、
// ホントは良いらしい
// DatagramChannel.open(StandardProtocolFamily.INET)
try (DatagramChannel channel = DatagramChannel.open()) {

コメントにも書いていますが、open時に本当はProtocolFamiryを指定すべきみたいですね。コメントの例は、IPv4になっています。

ノンブロッキングモードに設定しつつ、bindする前にsetOptionでSO_RESUSEADDRオプションを有効にします。

    // ノンブロッキングモードに設定
    channel.configureBlocking(false);
    // 複数のメンバーが同じアドレスにバインドできるようにするため、
    // bindメソッドの呼び出し前にSO_REUSEADDRオプションを指定する
    channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    channel.socket().bind(new InetSocketAddress(port));

SO_REUSEADDRオプションを有効にしなかった場合、同じマルチキャストアドレスを使用するプログラムが、同じホスト上で起動できなくなります。

ちなみに、NetworkChannelインターフェース(SocketChannelやDatagramChannelが実装しているインターフェースです)のsetOptionメソッド自体、JDK 7で追加されたものみたいです。

JDK 7で初めて、NIOでソケットオプションを指定できるようになったんですね。

サポートされているオプションは、ServerSocketChannel、SocketChannel、DatagramSocketChannelのドキュメントを参照してください。

ServerSocketChannel
http://docs.oracle.com/javase/jp/7/api/java/nio/channels/ServerSocketChannel.html

SocketChannel
http://docs.oracle.com/javase/jp/7/api/java/nio/channels/SocketChannel.html

DatagramChannel
http://docs.oracle.com/javase/jp/7/api/java/nio/channels/DatagramChannel.html

少し、脱線しましたね。

DatagramChannel#bindを行った後は、DatagramChannelをネットワークインターフェースを指定して、マルチキャストグループに参加させます。

    // MembershipKeyを保持するList
    List<MembershipKey> membershipKeys = new ArrayList<>();

    Enumeration<NetworkInterface> enums = NetworkInterface.getNetworkInterfaces();
    while (enums.hasMoreElements()) {
        NetworkInterface ni = enums.nextElement();
        // IPマルチキャストデータグラムのネットワークインターフェースを設定
        channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, ni);

        // InetAddressとNetworkInterfaceを指定して、
        // マルチキャストグループに参加する
        MembershipKey mkey = channel.join(address, ni);

        membershipKeys.add(mkey);
    }

今回は、すべてのネットワークインターフェースをマルチキャストグループに参加させました。NetworkInterfaceのインスタンスは、DatagramChannel#setOptionで、マルチキャスト用のネットワークインターフェースとしてオプション指定するみたいです。

あとは、DatagramChannel#joinでNetworkInterfaceをマルチキャストグループに参加させます。joinメソッドの戻り値は、MembershipKeyというクラスのインスタンスですが、これは後でマルチキャストグループを抜ける時に使用します。

なお、OIOのMulticastSocketと違ってNetworkInterfaceを指定せずにマルチキャストグループに参加するメソッドはないので、NetworkInterfaceは必ず何らかの形で指定することになります。

ここまで行ったら、普通のNIOを使ったUDPプログラムになります。

    Selector selector = Selector.open();
    channel.register(selector, SelectionKey.OP_READ);

Selector#selectや、Selector#selectedKeyを使って、Channelを監視したりしましょう。

マルチキャストグループを抜ける時は、参加時に取得したMembershipKeyクラスのdropメソッドを呼び出します。

    // マルチキャストグループを抜ける時は、
    // MembershipKey#dropを呼び出す
    for (MembershipKey mkey : membershipKeys) {
        mkey.drop();
    }

最後に、DatagramChannelをクローズしてください。

で、チャットプログラムなのですが、ここまでの例で作成したSelectorを使ってマルチキャストパケットを受け取るスレッドと

    private class Receiver implements Runnable {
        int timeout = 500;

        Selector selector;

        Receiver(Selector selector) {
            this.selector = selector;
        }

        public void run() {
            try {
                // 登録されているチャネルがある限り、
                // もしくはメッセージ入力を終了するまで、ループする
                while (!selector.keys().isEmpty() && receiveContinually.get()) {
                    // 以降は、標準的なNIOを使った処理
                    selector.select(timeout);

                    Set<SelectionKey> selectedKeys = selector.selectedKeys();

                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();

                        if (key.isValid()) {
                            if (key.isReadable()) {
                                handleReadable(key);
                            }
                            if (key.isWritable()) {
                                // writeは考慮しない
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }        
        }

        private void handleReadable(SelectionKey key) throws IOException {
            DatagramChannel channel = (DatagramChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(8192);

            SocketAddress address = channel.receive(buffer);

            if (address != null) {
                buffer.flip();

                String word = new String(buffer.array(),
                                         buffer.position(),
                                         buffer.limit(),
                                         StandardCharsets.UTF_8);
                System.out.println("message => " + word);
            }
        }
    }

それとは別にDatagramChannelを開いて、マルチキャストアドレスにデータを送信するクラスを作成して

    private class Sender implements Runnable {
        public void run() {
            try (DatagramChannel channel = DatagramChannel.open();
                 InputStreamReader isr = new InputStreamReader(System.in);
                 BufferedReader reader = new BufferedReader(isr)) {

                int port = 7600;
                SocketAddress address = new InetSocketAddress(InetAddress.getByName("228.8.8.8"),
                                                              port);
                

                channel.configureBlocking(false);
                channel.connect(address);

                while (channel.isConnected() == false);

                log("Client Startup.");

                ByteBuffer buffer = ByteBuffer.allocate(8192);

                while (true) {
                    String line = reader.readLine();
                    if (line == null || line.isEmpty()) {
                        continue;
                    } else if ("_exit_".equals(line)) {
                        receiveContinually.set(false);
                        break;
                    }

                    buffer.clear();

                    byte[] wordBinary = line.getBytes(StandardCharsets.UTF_8);
                    buffer.put(wordBinary);
                    buffer.flip();

                    while (channel.send(buffer, address) == 0);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

データ送信側に「_exit_」と入力すると、受信側がループを抜けるようにしました。フラグは、こちらです。

    private AtomicBoolean receiveContinually = new AtomicBoolean(true);

受信スレッドは、あくまでデータを受ける側ですからね。リプライはしませんからね。自身のコンソールには書き出しますが。

そんなスレッドを、同時に走らせて「_exit_」と入力されたらメイン側が抜けるようになります。

                List<Thread> threads = new ArrayList<>();
                threads.add(new Thread(new Receiver(selector)));
                threads.add(new Thread(new Sender()));

                for (Thread th : threads) {
                    th.start();
                }

                try {
                    for (Thread th : threads) {
                        th.join();
                    }
                } catch (InterruptedException e) { }

抜けたら、マルチキャストグループを抜けてお終い。

プログラムの全容は後で載せますが、動かすとこんな感じになります。一応、2つ起動しましょうか。
*同じホストです

# ひとつ
$ java UdpMulticastChannelApp 
[Sat Sep 28 00:29:27 JST 2013] Non Blocking Java Multicast Server  /228.8.8.8  7600 Startup.
[Sat Sep 28 00:29:27 JST 2013] Client Startup.

# ふたつ
$ java UdpMulticastChannelApp 
[Sat Sep 28 00:29:30 JST 2013] Non Blocking Java Multicast Server  /228.8.8.8  7600 Startup.
[Sat Sep 28 00:29:30 JST 2013] Client Startup.

片方で、いくらかメッセージを入力してみます。

$ java UdpMulticastChannelApp 
[Sat Sep 28 00:29:27 JST 2013] Non Blocking Java Multicast Server  /228.8.8.8  7600 Startup.
[Sat Sep 28 00:29:27 JST 2013] Client Startup.
Hello Workd
message => Hello Workd
NIOでマルチキャスト通信
message => NIOでマルチキャスト通信
こんにちは
message => こんにちは
_exit_
[Sat Sep 28 00:30:44 JST 2013] Leave Multicast Group Completed.

「message =>」と出ているのは、受信側のスレッドが受けたデータですね。「_exit_」で終了です。

もう片方のコンソールには

message => Hello Workd
message => NIOでマルチキャスト通信
message => こんにちは

と出力され、マルチキャストパケットを受信できていることがわかります。

基本的には、NIOを使ったユニキャスト通信と同じなので、困るところはないはず…だったのですが、マルチキャスト通信はMulticastSocketで書いたプログラムを、NIOはUDPユニキャストで行ったプログラムを参照して書いてたら、ごっちゃになってけっこう苦労しました。

まあ、とりあえずひととおりUDPまわりのクラスは、使えたのでいいかな?

いったん、UDPのお勉強はここまでです。

最後に、今回作成したプログラム全体を載せます。
UdpMulticastChannelApp.java

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.channels.DatagramChannel;
import java.nio.channels.MembershipKey;
import java.nio.channels.MulticastChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

public class UdpMulticastChannelApp {
    public static void main(String[] args) {
        new UdpMulticastChannelApp().execute();
    }


    public void execute() {
        try {
            int port = 7600;
            InetAddress address = InetAddress.getByName("228.8.8.8");

            // DatagramChannelの作成
            // DatagramChannelは、MulticastChannelインターフェースを実装している

            // DatagramChannel.openは、以下の様にちゃんとProtocolFamiryを指定した方が、
            // ホントは良いらしい
            // DatagramChannel.open(StandardProtocolFamily.INET)
            try (DatagramChannel channel = DatagramChannel.open()) {
                // ノンブロッキングモードに設定
                channel.configureBlocking(false);
                // 複数のメンバーが同じアドレスにバインドできるようにするため、
                // bindメソッドの呼び出し前にSO_REUSEADDRオプションを指定する
                channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
                channel.socket().bind(new InetSocketAddress(port));

                // MembershipKeyを保持するList
                List<MembershipKey> membershipKeys = new ArrayList<>();

                Enumeration<NetworkInterface> enums = NetworkInterface.getNetworkInterfaces();
                while (enums.hasMoreElements()) {
                    NetworkInterface ni = enums.nextElement();
                    // IPマルチキャストデータグラムのネットワークインターフェースを設定
                    channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, ni);

                    // InetAddressとNetworkInterfaceを指定して、
                    // マルチキャストグループに参加する
                    MembershipKey mkey = channel.join(address, ni);

                    membershipKeys.add(mkey);
                }

                Selector selector = Selector.open();
                channel.register(selector, SelectionKey.OP_READ);

                log("Non Blocking Java Multicast Server", address, port, "Startup.");

                List<Thread> threads = new ArrayList<>();
                threads.add(new Thread(new Receiver(selector)));
                threads.add(new Thread(new Sender()));

                for (Thread th : threads) {
                    th.start();
                }

                try {
                    for (Thread th : threads) {
                        th.join();
                    }
                } catch (InterruptedException e) { }

                // マルチキャストグループを抜ける時は、
                // MembershipKey#dropを呼び出す
                for (MembershipKey mkey : membershipKeys) {
                    mkey.drop();
                }

                log("Leave Multicast Group Completed.");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private AtomicBoolean receiveContinually = new AtomicBoolean(true);

    private class Receiver implements Runnable {
        int timeout = 500;

        Selector selector;

        Receiver(Selector selector) {
            this.selector = selector;
        }

        public void run() {
            try {
                // 登録されているチャネルがある限り、
                // もしくはメッセージ入力を終了するまで、ループする
                while (!selector.keys().isEmpty() && receiveContinually.get()) {
                    // 以降は、標準的なNIOを使った処理
                    selector.select(timeout);

                    Set<SelectionKey> selectedKeys = selector.selectedKeys();

                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();

                        if (key.isValid()) {
                            if (key.isReadable()) {
                                handleReadable(key);
                            }
                            if (key.isWritable()) {
                                // writeは考慮しない
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }        
        }

        private void handleReadable(SelectionKey key) throws IOException {
            DatagramChannel channel = (DatagramChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(8192);

            SocketAddress address = channel.receive(buffer);

            if (address != null) {
                buffer.flip();

                String word = new String(buffer.array(),
                                         buffer.position(),
                                         buffer.limit(),
                                         StandardCharsets.UTF_8);
                System.out.println("message => " + word);
            }
        }
    }

    private class Sender implements Runnable {
        public void run() {
            try (DatagramChannel channel = DatagramChannel.open();
                 InputStreamReader isr = new InputStreamReader(System.in);
                 BufferedReader reader = new BufferedReader(isr)) {

                int port = 7600;
                SocketAddress address = new InetSocketAddress(InetAddress.getByName("228.8.8.8"),
                                                              port);
                

                channel.configureBlocking(false);
                channel.connect(address);

                while (channel.isConnected() == false);

                log("Client Startup.");

                ByteBuffer buffer = ByteBuffer.allocate(8192);

                while (true) {
                    String line = reader.readLine();
                    if (line == null || line.isEmpty()) {
                        continue;
                    } else if ("_exit_".equals(line)) {
                        receiveContinually.set(false);
                        break;
                    }

                    buffer.clear();

                    byte[] wordBinary = line.getBytes(StandardCharsets.UTF_8);
                    buffer.put(wordBinary);
                    buffer.flip();

                    while (channel.send(buffer, address) == 0);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void log(Object msg, Object... msgs) {
        StringBuilder builder = new StringBuilder();
        builder.append('[');
        builder.append(new Date().toString());
        builder.append(']');
        builder.append(' ');
        builder.append(msg);

        for (int i = 0; i < msgs.length; i++) {
            builder.append(' ');
            if (i < msgs.length - 1) {
                builder.append(' ');
            }
            builder.append(msgs[i]);
        }

        System.out.println(builder.toString());
    }
}