最後、やるかどうか迷いましたが、せっかくなのでNIOを使ってUDPマルチキャスト通信まで試してみることにしました。
今回は、これまで参考にしていた書籍の範囲外です。
なぜなら、マルチキャスト用のChannel APIは、JDK 7で追加されたものだからです。2007年に、日本語訳が出た書籍でしたからね。
- 作者: エズモンド・ピット,岩谷宏
- 出版社/メーカー: ソフトバンク クリエイティブ
- 発売日: 2007/04/28
- メディア: 大型本
- 購入: 9人 クリック: 100回
- この商品を含むブログ (25件) を見る
で、マイナーなクラスなのか、これを取り扱っているエントリはほとんど見かけません。なのせ、せっかくなので今回だけはJavaで書こうと思います。
利用するNIOを使ったマルチキャストで登場する主要なインターフェース、クラスは、以下になります。
MulticastChannel
http://docs.oracle.com/javase/jp/7/api/java/nio/channels/MulticastChannel.html
MembershipKey
http://docs.oracle.com/javase/jp/7/api/java/nio/channels/MembershipKey.html
だいたいの使い方は、それぞれのドキュメントに書いてあります。
MulticastChannelインターフェースを実装したクラスとしては、既存のDatagramChannelクラスがあるので、実はMulticastChannelインターフェースそのものを意識して使うことはないのかもしれません。
今回も、簡単なチャットっぽいアプリケーションを書いてみたいと思います。
まずは、マルチキャスト用のポートとInetAddressのインスタンスを取得します。
int port = 7600; InetAddress address = InetAddress.getByName("228.8.8.8");
DatagramChannel#openを呼び出します。
// DatagramChannelの作成 // DatagramChannelは、MulticastChannelインターフェースを実装している // DatagramChannel.openは、以下の様にちゃんとProtocolFamiryを指定した方が、 // ホントは良いらしい // DatagramChannel.open(StandardProtocolFamily.INET) try (DatagramChannel channel = DatagramChannel.open()) {
コメントにも書いていますが、open時に本当はProtocolFamiryを指定すべきみたいですね。コメントの例は、IPv4になっています。
ノンブロッキングモードに設定しつつ、bindする前にsetOptionでSO_RESUSEADDRオプションを有効にします。
// ノンブロッキングモードに設定 channel.configureBlocking(false); // 複数のメンバーが同じアドレスにバインドできるようにするため、 // bindメソッドの呼び出し前にSO_REUSEADDRオプションを指定する channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); channel.socket().bind(new InetSocketAddress(port));
SO_REUSEADDRオプションを有効にしなかった場合、同じマルチキャストアドレスを使用するプログラムが、同じホスト上で起動できなくなります。
ちなみに、NetworkChannelインターフェース(SocketChannelやDatagramChannelが実装しているインターフェースです)のsetOptionメソッド自体、JDK 7で追加されたものみたいです。
JDK 7で初めて、NIOでソケットオプションを指定できるようになったんですね。
サポートされているオプションは、ServerSocketChannel、SocketChannel、DatagramSocketChannelのドキュメントを参照してください。
ServerSocketChannel
http://docs.oracle.com/javase/jp/7/api/java/nio/channels/ServerSocketChannel.html
SocketChannel
http://docs.oracle.com/javase/jp/7/api/java/nio/channels/SocketChannel.html
DatagramChannel
http://docs.oracle.com/javase/jp/7/api/java/nio/channels/DatagramChannel.html
少し、脱線しましたね。
DatagramChannel#bindを行った後は、DatagramChannelをネットワークインターフェースを指定して、マルチキャストグループに参加させます。
// MembershipKeyを保持するList List<MembershipKey> membershipKeys = new ArrayList<>(); Enumeration<NetworkInterface> enums = NetworkInterface.getNetworkInterfaces(); while (enums.hasMoreElements()) { NetworkInterface ni = enums.nextElement(); // IPマルチキャストデータグラムのネットワークインターフェースを設定 channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, ni); // InetAddressとNetworkInterfaceを指定して、 // マルチキャストグループに参加する MembershipKey mkey = channel.join(address, ni); membershipKeys.add(mkey); }
今回は、すべてのネットワークインターフェースをマルチキャストグループに参加させました。NetworkInterfaceのインスタンスは、DatagramChannel#setOptionで、マルチキャスト用のネットワークインターフェースとしてオプション指定するみたいです。
あとは、DatagramChannel#joinでNetworkInterfaceをマルチキャストグループに参加させます。joinメソッドの戻り値は、MembershipKeyというクラスのインスタンスですが、これは後でマルチキャストグループを抜ける時に使用します。
なお、OIOのMulticastSocketと違ってNetworkInterfaceを指定せずにマルチキャストグループに参加するメソッドはないので、NetworkInterfaceは必ず何らかの形で指定することになります。
ここまで行ったら、普通のNIOを使ったUDPプログラムになります。
Selector selector = Selector.open(); channel.register(selector, SelectionKey.OP_READ);
Selector#selectや、Selector#selectedKeyを使って、Channelを監視したりしましょう。
マルチキャストグループを抜ける時は、参加時に取得したMembershipKeyクラスのdropメソッドを呼び出します。
// マルチキャストグループを抜ける時は、 // MembershipKey#dropを呼び出す for (MembershipKey mkey : membershipKeys) { mkey.drop(); }
最後に、DatagramChannelをクローズしてください。
で、チャットプログラムなのですが、ここまでの例で作成したSelectorを使ってマルチキャストパケットを受け取るスレッドと
private class Receiver implements Runnable { int timeout = 500; Selector selector; Receiver(Selector selector) { this.selector = selector; } public void run() { try { // 登録されているチャネルがある限り、 // もしくはメッセージ入力を終了するまで、ループする while (!selector.keys().isEmpty() && receiveContinually.get()) { // 以降は、標準的なNIOを使った処理 selector.select(timeout); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isValid()) { if (key.isReadable()) { handleReadable(key); } if (key.isWritable()) { // writeは考慮しない } } } } } catch (IOException e) { e.printStackTrace(); } } private void handleReadable(SelectionKey key) throws IOException { DatagramChannel channel = (DatagramChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(8192); SocketAddress address = channel.receive(buffer); if (address != null) { buffer.flip(); String word = new String(buffer.array(), buffer.position(), buffer.limit(), StandardCharsets.UTF_8); System.out.println("message => " + word); } } }
それとは別にDatagramChannelを開いて、マルチキャストアドレスにデータを送信するクラスを作成して
private class Sender implements Runnable { public void run() { try (DatagramChannel channel = DatagramChannel.open(); InputStreamReader isr = new InputStreamReader(System.in); BufferedReader reader = new BufferedReader(isr)) { int port = 7600; SocketAddress address = new InetSocketAddress(InetAddress.getByName("228.8.8.8"), port); channel.configureBlocking(false); channel.connect(address); while (channel.isConnected() == false); log("Client Startup."); ByteBuffer buffer = ByteBuffer.allocate(8192); while (true) { String line = reader.readLine(); if (line == null || line.isEmpty()) { continue; } else if ("_exit_".equals(line)) { receiveContinually.set(false); break; } buffer.clear(); byte[] wordBinary = line.getBytes(StandardCharsets.UTF_8); buffer.put(wordBinary); buffer.flip(); while (channel.send(buffer, address) == 0); } } catch (IOException e) { e.printStackTrace(); } } }
データ送信側に「_exit_」と入力すると、受信側がループを抜けるようにしました。フラグは、こちらです。
private AtomicBoolean receiveContinually = new AtomicBoolean(true);
受信スレッドは、あくまでデータを受ける側ですからね。リプライはしませんからね。自身のコンソールには書き出しますが。
そんなスレッドを、同時に走らせて「_exit_」と入力されたらメイン側が抜けるようになります。
List<Thread> threads = new ArrayList<>(); threads.add(new Thread(new Receiver(selector))); threads.add(new Thread(new Sender())); for (Thread th : threads) { th.start(); } try { for (Thread th : threads) { th.join(); } } catch (InterruptedException e) { }
抜けたら、マルチキャストグループを抜けてお終い。
プログラムの全容は後で載せますが、動かすとこんな感じになります。一応、2つ起動しましょうか。
*同じホストです
# ひとつ $ java UdpMulticastChannelApp [Sat Sep 28 00:29:27 JST 2013] Non Blocking Java Multicast Server /228.8.8.8 7600 Startup. [Sat Sep 28 00:29:27 JST 2013] Client Startup. # ふたつ $ java UdpMulticastChannelApp [Sat Sep 28 00:29:30 JST 2013] Non Blocking Java Multicast Server /228.8.8.8 7600 Startup. [Sat Sep 28 00:29:30 JST 2013] Client Startup.
片方で、いくらかメッセージを入力してみます。
$ java UdpMulticastChannelApp [Sat Sep 28 00:29:27 JST 2013] Non Blocking Java Multicast Server /228.8.8.8 7600 Startup. [Sat Sep 28 00:29:27 JST 2013] Client Startup. Hello Workd message => Hello Workd NIOでマルチキャスト通信 message => NIOでマルチキャスト通信 こんにちは message => こんにちは _exit_ [Sat Sep 28 00:30:44 JST 2013] Leave Multicast Group Completed.
「message =>」と出ているのは、受信側のスレッドが受けたデータですね。「_exit_」で終了です。
もう片方のコンソールには
message => Hello Workd message => NIOでマルチキャスト通信 message => こんにちは
と出力され、マルチキャストパケットを受信できていることがわかります。
基本的には、NIOを使ったユニキャスト通信と同じなので、困るところはないはず…だったのですが、マルチキャスト通信はMulticastSocketで書いたプログラムを、NIOはUDPユニキャストで行ったプログラムを参照して書いてたら、ごっちゃになってけっこう苦労しました。
まあ、とりあえずひととおりUDPまわりのクラスは、使えたのでいいかな?
いったん、UDPのお勉強はここまでです。
最後に、今回作成したプログラム全体を載せます。
UdpMulticastChannelApp.java
import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.IOException; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.net.SocketAddress; import java.net.StandardProtocolFamily; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.channels.DatagramChannel; import java.nio.channels.MembershipKey; import java.nio.channels.MulticastChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.ArrayList; import java.util.Date; import java.util.Enumeration; import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; public class UdpMulticastChannelApp { public static void main(String[] args) { new UdpMulticastChannelApp().execute(); } public void execute() { try { int port = 7600; InetAddress address = InetAddress.getByName("228.8.8.8"); // DatagramChannelの作成 // DatagramChannelは、MulticastChannelインターフェースを実装している // DatagramChannel.openは、以下の様にちゃんとProtocolFamiryを指定した方が、 // ホントは良いらしい // DatagramChannel.open(StandardProtocolFamily.INET) try (DatagramChannel channel = DatagramChannel.open()) { // ノンブロッキングモードに設定 channel.configureBlocking(false); // 複数のメンバーが同じアドレスにバインドできるようにするため、 // bindメソッドの呼び出し前にSO_REUSEADDRオプションを指定する channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); channel.socket().bind(new InetSocketAddress(port)); // MembershipKeyを保持するList List<MembershipKey> membershipKeys = new ArrayList<>(); Enumeration<NetworkInterface> enums = NetworkInterface.getNetworkInterfaces(); while (enums.hasMoreElements()) { NetworkInterface ni = enums.nextElement(); // IPマルチキャストデータグラムのネットワークインターフェースを設定 channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, ni); // InetAddressとNetworkInterfaceを指定して、 // マルチキャストグループに参加する MembershipKey mkey = channel.join(address, ni); membershipKeys.add(mkey); } Selector selector = Selector.open(); channel.register(selector, SelectionKey.OP_READ); log("Non Blocking Java Multicast Server", address, port, "Startup."); List<Thread> threads = new ArrayList<>(); threads.add(new Thread(new Receiver(selector))); threads.add(new Thread(new Sender())); for (Thread th : threads) { th.start(); } try { for (Thread th : threads) { th.join(); } } catch (InterruptedException e) { } // マルチキャストグループを抜ける時は、 // MembershipKey#dropを呼び出す for (MembershipKey mkey : membershipKeys) { mkey.drop(); } log("Leave Multicast Group Completed."); } } catch (IOException e) { e.printStackTrace(); } } private AtomicBoolean receiveContinually = new AtomicBoolean(true); private class Receiver implements Runnable { int timeout = 500; Selector selector; Receiver(Selector selector) { this.selector = selector; } public void run() { try { // 登録されているチャネルがある限り、 // もしくはメッセージ入力を終了するまで、ループする while (!selector.keys().isEmpty() && receiveContinually.get()) { // 以降は、標準的なNIOを使った処理 selector.select(timeout); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isValid()) { if (key.isReadable()) { handleReadable(key); } if (key.isWritable()) { // writeは考慮しない } } } } } catch (IOException e) { e.printStackTrace(); } } private void handleReadable(SelectionKey key) throws IOException { DatagramChannel channel = (DatagramChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(8192); SocketAddress address = channel.receive(buffer); if (address != null) { buffer.flip(); String word = new String(buffer.array(), buffer.position(), buffer.limit(), StandardCharsets.UTF_8); System.out.println("message => " + word); } } } private class Sender implements Runnable { public void run() { try (DatagramChannel channel = DatagramChannel.open(); InputStreamReader isr = new InputStreamReader(System.in); BufferedReader reader = new BufferedReader(isr)) { int port = 7600; SocketAddress address = new InetSocketAddress(InetAddress.getByName("228.8.8.8"), port); channel.configureBlocking(false); channel.connect(address); while (channel.isConnected() == false); log("Client Startup."); ByteBuffer buffer = ByteBuffer.allocate(8192); while (true) { String line = reader.readLine(); if (line == null || line.isEmpty()) { continue; } else if ("_exit_".equals(line)) { receiveContinually.set(false); break; } buffer.clear(); byte[] wordBinary = line.getBytes(StandardCharsets.UTF_8); buffer.put(wordBinary); buffer.flip(); while (channel.send(buffer, address) == 0); } } catch (IOException e) { e.printStackTrace(); } } } private void log(Object msg, Object... msgs) { StringBuilder builder = new StringBuilder(); builder.append('['); builder.append(new Date().toString()); builder.append(']'); builder.append(' '); builder.append(msg); for (int i = 0; i < msgs.length; i++) { builder.append(' '); if (i < msgs.length - 1) { builder.append(' '); } builder.append(msgs[i]); } System.out.println(builder.toString()); } }