これは、なにをしたくて書いたもの?
Java 16でUNIXドメインソケットが扱えるようになったらしく、Java 17も出たことですし触っておこうかなと。
JEPs in JDK 17 integrated since JDK 11
JEP 380: Unix-Domain Socket Channels
UUNIXnixドメインソケットとは
UNIXドメインソケットとは、ひとつのサーバー内でのプロセス間通信に使われるインターフェースです。
表現方法としてはファイルになり、作成したソケットファイルを使ってクライアント・サーバーの通信を可能にします。
といっても、ネットワークに関するヘッダー等もなく、データをコピーしてやり取りするパイプのような挙動になります。
プログラム上ではソケットとして扱え、単一マシン上の通信であればTCP/IPよりも高速動作させることができます。
JEP 380
Java 16で導入されたJEP 380で、UNIXドメインソケットをJavaで利用できるようになりました。
JEP 380: Unix-Domain Socket Channels
JEP-380: Unix domain socket channels – Inside.java
特徴としては、以下になります。
- 主要なUnixプラットフォームとWindowsのサポート
java.net.UnixDomainSocketAddress
クラスの追加java.net.StandardProtocolFamily
列挙型の追加ServerSocketChannel
およびSocketChannel
の修正
UnixDomainSocketAddress
クラスは、こちら。
UnixDomainSocketAddress (Java SE 17 & JDK 17)
制限事項としては、こちらになります。
- Selectorは使えない
- UDPは未対応
ServerSocketChannel
およびSocketChannel
をサポートするのみであり、ServerSocket
やSocket
といった既存のソケットクラスには対応していない
では、使っていってみましょう。
環境
今回の環境は、こちら。
$ java --version openjdk 17.0.1 2021-10-19 OpenJDK Runtime Environment (build 17.0.1+12-Ubuntu-120.04) OpenJDK 64-Bit Server VM (build 17.0.1+12-Ubuntu-120.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.4 (9b656c72d54e5bacbed989b64718c159fe39b537) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 17.0.1, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-91-generic", arch: "amd64", family: "unix"
準備
簡単なテストコード、それからEchoサーバー/クライアントを作成して確認することにします。
Maven依存関係は、テストまわりのみです。
<dependencies> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.8.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.21.0</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.2</version> </plugin> </plugins> </build>
簡単に使い方を確認
まずは、簡単に使い方を確認します。
テストコードの雛形は、こちら。
src/test/java/org/littlewings/io/UnixDomainSocketTest.java package org.littlewings.io; import java.io.IOException; import java.net.BindException; import java.net.ServerSocket; import java.net.StandardProtocolFamily; import java.net.UnixDomainSocketAddress; import java.nio.channels.DatagramChannel; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThatThrownBy; public class UnixDomainSocketTest { // ここに、テストを書く! }
UnixDomainSocketAddress
のインスタンスを作成するには、UnixDomainSocketAddress#of
にPath
またはString
で
ソケットファイルのパスを指定します。一時ファイルでも構いません。
@Test public void createUnixDomainSocketAddress() throws IOException { Path path = Paths.get("/tmp/socket.sock"); UnixDomainSocketAddress address = UnixDomainSocketAddress.of(path); Path tmpPath = Files.createTempFile("socket", "sock"); UnixDomainSocketAddress address2 = UnixDomainSocketAddress.of(tmpPath); Files.delete(tmpPath); }
ServerSocketChannel
で使う時は、ServerSocketChannel#open
でStandardProtocolFamily.UNIX
を指定し、
作成したUnixDomainSocketAddress
をServerSocketChannel#bind
に渡します。
@Test public void useServerSocketChannel() throws IOException { Path path = Paths.get("/tmp/socket.sock"); UnixDomainSocketAddress address = UnixDomainSocketAddress.of(path); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(StandardProtocolFamily.UNIX); serverSocketChannel.bind(address); Files.delete(path); }
UnixDomainSocketAddress#of
で指定するソケットファイルは、存在する必要はありません。
ServerSocketChannel#bind
時に作成されます。
ちなみに、このソケットファイルはサーバー側のプロセス停止時に削除しておかないと、さらにServerSocketChannel#bind
を
指定すると「アドレスは既に使用中です」と怒られることになります。
@Test public void deleteSocketFile() throws IOException { Path path = Paths.get("/tmp/socket.sock"); UnixDomainSocketAddress address = UnixDomainSocketAddress.of(path); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(StandardProtocolFamily.UNIX); serverSocketChannel.bind(address); UnixDomainSocketAddress address2 = UnixDomainSocketAddress.of(path); ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open(StandardProtocolFamily.UNIX); assertThatThrownBy(() -> serverSocketChannel2.bind(address2)) .isInstanceOf(BindException.class) .hasMessage("アドレスは既に使用中です"); Files.delete(path); }
クライアント側で使う時は、SocketChannel#open
でStandardProtocolFamily.UNIX
を指定して作成したSocketChannel
に
対して、SocketChannel#connect
でUnixDomainSocketAddress
を指定して使います。
@Disabled @Test public void useClientSocketChannel() throws IOException { Path path = Paths.get("/tmp/socket.sock"); UnixDomainSocketAddress address = UnixDomainSocketAddress.of(path); SocketChannel socketChannel = SocketChannel.open(StandardProtocolFamily.UNIX); socketChannel.connect(address); }
単体では動作しないので、このテストケースは@Disabled
にしていますが…。
続いては、注意事項。
Selector
は使えません。
@Test public void cannotUseSelector() throws IOException { Path path = Paths.get("/tmp/socket.sock"); UnixDomainSocketAddress address = UnixDomainSocketAddress.of(path); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(StandardProtocolFamily.UNIX); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(address); Selector selector = Selector.open(); serverSocketChannel.register(selector, serverSocketChannel.validOps()); // Selectorは使えない Files.delete(path); }
一応ServerSocketChannel
で試してみましたが、接続はできてもReadが機能しませんでした…。
java.net
なソケットはサポートしていません。
@Test public void oioUnsupported() throws IOException { Path path = Paths.get("/tmp/socket.sock"); UnixDomainSocketAddress address = UnixDomainSocketAddress.of(path); ServerSocket serverSocket = new ServerSocket(); assertThatThrownBy(() -> serverSocket.bind(address)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Unsupported address type"); }
UDPもサポートしていません。
@Test public void udpUnsupported() throws IOException { Path path = Paths.get("/tmp/socket.sock"); UnixDomainSocketAddress address = UnixDomainSocketAddress.of(path); assertThatThrownBy(() -> DatagramChannel.open(StandardProtocolFamily.UNIX)) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Protocol family not supported"); }
Echoサーバー/クライアントを作成する
続いては、UNIXドメインソケットを使ってサンプル的にEchoサーバーとクライアントを作成したいと思います。
まずは、サーバーから。
src/main/java/org/littlewings/io/UnixDomainSocketEchoServer.java
package org.littlewings.io; import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.net.StandardProtocolFamily; import java.net.UnixDomainSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; public class UnixDomainSocketEchoServer implements Closeable { Logger logger = Logger.getLogger(UnixDomainSocketEchoServer.class.getName()); Path socketPath; volatile ServerSocketChannel serverSocketChannel; Thread serverThread; ExecutorService executor; CountDownLatch startLatch = new CountDownLatch(1); CountDownLatch shutdownLatch = new CountDownLatch(1); Duration sleepTime; public static void main(String... args) throws IOException { try (UnixDomainSocketEchoServer server = new UnixDomainSocketEchoServer(Paths.get("/tmp/echo-server.sock"))) { server.start(); System.console().readLine("> stop, enter..."); } } public UnixDomainSocketEchoServer(Path socketPath) { this(socketPath, Duration.ofSeconds(0L)); } public UnixDomainSocketEchoServer(Path socketPath, Duration sleepTime) { this.socketPath = socketPath; this.sleepTime = sleepTime; } public void start() { serverThread = new Thread(() -> { try { executor = Executors.newFixedThreadPool(10); UnixDomainSocketAddress address = UnixDomainSocketAddress.of(socketPath); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { Files.deleteIfExists(socketPath); } catch (IOException e) { // ignore } })); serverSocketChannel = ServerSocketChannel.open(StandardProtocolFamily.UNIX); serverSocketChannel.bind(address); logger.info("[" + getThreadName() + "] Echo Server using Unix Domain Socket[" + socketPath + "], started"); startLatch.countDown(); SocketChannel channel; while ((channel = serverSocketChannel.accept()) != null) { handleClient(executor, channel); } } catch (AsynchronousCloseException e) { // close } catch (IOException e) { throw new UncheckedIOException(e); } shutdownLatch.countDown(); }); serverThread.start(); try { startLatch.await(); } catch (InterruptedException e) { // ignore } } void handleClient(ExecutorService executor, SocketChannel channel) { executor.submit(() -> { try (channel) { logger.info("[" + getThreadName() + "] accept client[" + channel + "]"); try { TimeUnit.SECONDS.sleep(sleepTime.getSeconds()); } catch (InterruptedException e) { // ignore } ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer); buffer.flip(); String message = StandardCharsets.UTF_8.decode(buffer).toString(); logger.info("[" + getThreadName() + "] request message = [" + message + "], client[" + channel + "]"); String replyMessage = "Reply: " + message; channel.write(ByteBuffer.wrap(replyMessage.getBytes(StandardCharsets.UTF_8))); } catch (IOException e) { throw new UncheckedIOException(e); } }); } String getThreadName() { return Thread.currentThread().getName(); } @Override public void close() throws IOException { serverSocketChannel.close(); try { shutdownLatch.await(); } catch (InterruptedException e) { // ignore } executor.shutdown(); Files.delete(socketPath); logger.info("[" + getThreadName() + "] Echo Server using Unix Domain Socket[" + socketPath + "], shutdown"); } }
コンストラクタでは、ソケットファイルのPath
だけ指定します。
public UnixDomainSocketEchoServer(Path socketPath) { this(socketPath, Duration.ofSeconds(0L)); } public UnixDomainSocketEchoServer(Path socketPath, Duration sleepTime) { this.socketPath = socketPath; this.sleepTime = sleepTime; }
あとでスリープさせるテストもしたいので、Duration
も引数に指定できるようにしていますが。
サーバーの起動処理。別スレッドで動作させることにします。
public void start() { serverThread = new Thread(() -> { try { executor = Executors.newFixedThreadPool(10); UnixDomainSocketAddress address = UnixDomainSocketAddress.of(socketPath); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { Files.deleteIfExists(socketPath); } catch (IOException e) { // ignore } })); serverSocketChannel = ServerSocketChannel.open(StandardProtocolFamily.UNIX); serverSocketChannel.bind(address); logger.info("[" + getThreadName() + "] Echo Server using Unix Domain Socket[" + socketPath + "], started"); startLatch.countDown(); SocketChannel channel; while ((channel = serverSocketChannel.accept()) != null) { handleClient(executor, channel); } } catch (AsynchronousCloseException e) { // close } catch (IOException e) { throw new UncheckedIOException(e); } shutdownLatch.countDown(); }); serverThread.start(); try { startLatch.await(); } catch (InterruptedException e) { // ignore } }
念の為、ShutdownHookでソケットファイルが残っていたら削除する処理も仕込んでおきます。
同時接続数は、いったん10まで受け付けられるようにして、ServerSocketChannel#accept
で受け取った接続は
別のメソッドに渡します。
try { executor = Executors.newFixedThreadPool(10); UnixDomainSocketAddress address = UnixDomainSocketAddress.of(socketPath); serverSocketChannel = ServerSocketChannel.open(StandardProtocolFamily.UNIX); serverSocketChannel.bind(address); logger.info("[" + getThreadName() + "] Echo Server using Unix Domain Socket[" + socketPath + "], started"); startLatch.countDown(); SocketChannel channel; while ((channel = serverSocketChannel.accept()) != null) { handleClient(executor, channel); } } catch (AsynchronousCloseException e) { // close } catch (IOException e) { throw new UncheckedIOException(e); } shutdownLatch.countDown();
受け取った接続は、ExecutorService
で処理。
void handleClient(ExecutorService executor, SocketChannel channel) { executor.submit(() -> { try (channel) { logger.info("[" + getThreadName() + "] accept client[" + channel + "]"); try { TimeUnit.SECONDS.sleep(sleepTime.getSeconds()); } catch (InterruptedException e) { // ignore } ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer); buffer.flip(); String message = StandardCharsets.UTF_8.decode(buffer).toString(); logger.info("[" + getThreadName() + "] request message = [" + message + "], client[" + channel + "]"); String replyMessage = "Reply: " + message; channel.write(ByteBuffer.wrap(replyMessage.getBytes(StandardCharsets.UTF_8))); } catch (IOException e) { throw new UncheckedIOException(e); } }); }
ひとつの接続にひとつのスレッドを割り当てるスタイルですね。
停止。一応、起動処理で使った別スレッドを待ち合わせています。
@Override public void close() throws IOException { serverSocketChannel.close(); try { shutdownLatch.await(); } catch (InterruptedException e) { // ignore } executor.shutdown(); Files.delete(socketPath); logger.info("[" + getThreadName() + "] Echo Server using Unix Domain Socket[" + socketPath + "], shutdown"); }
main
メソッドも用意したので
public static void main(String... args) throws IOException { try (UnixDomainSocketEchoServer server = new UnixDomainSocketEchoServer(Paths.get("/tmp/echo-server.sock"))) { server.start(); System.console().readLine("> stop, enter..."); } }
起動してみましょう。
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.io.UnixDomainSocketEchoServer
Enterを入力すると停止します。
12月 26, 2021 2:05:59 午前 org.littlewings.io.UnixDomainSocketEchoServer lambda$start$0 情報: [Thread-1] Echo Server using Unix Domain Socket[/tmp/echo-server.sock], started > stop, enter...
nc
コマンドで確認。
$ echo -n 'Hello World' | nc -U /tmp/echo-server.sock Reply: Hello World
OKですね。
この時のサーバー側のログ。
12月 26, 2021 2:06:45 午前 org.littlewings.io.UnixDomainSocketEchoServer lambda$handleClient$1 情報: [pool-1-thread-1] request message = [Hello World], client[java.nio.channels.SocketChannel[connected local= remote=]]
接続情報は、記載がありませんね…。
続いて、クライアント側を作成。
src/main/java/org/littlewings/io/UnixDomainSocketEchoClient.java
package org.littlewings.io; import java.io.IOException; import java.net.StandardProtocolFamily; import java.net.UnixDomainSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; import java.util.logging.Logger; public class UnixDomainSocketEchoClient { Logger logger = Logger.getLogger(UnixDomainSocketEchoClient.class.getName()); Path socketPath; public static void main(String... args) throws IOException { UnixDomainSocketEchoClient client = new UnixDomainSocketEchoClient(Paths.get("/tmp/echo-server.sock")); client.sendMessage("Hello World"); } public UnixDomainSocketEchoClient(Path socketPath) { this.socketPath = socketPath; } public String sendMessage(String message) throws IOException { logger.info("Echo Client using Unix Domain Socket, started"); UnixDomainSocketAddress address = UnixDomainSocketAddress.of(socketPath); try (SocketChannel socketChannel = SocketChannel.open(StandardProtocolFamily.UNIX)) { socketChannel.connect(address); logger.info("connect server[" + socketChannel.getRemoteAddress() + "]"); socketChannel.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8))); logger.info("send message[" + message + "]"); ByteBuffer buffer = ByteBuffer.allocate(1024); socketChannel.read(buffer); buffer.flip(); String replyMessage = StandardCharsets.UTF_8.decode(buffer).toString(); logger.info("response message[" + replyMessage + "]"); logger.info("Echo Client using Unix Domain Socket, end"); return replyMessage; } } }
こちらは、まあシンプルですね。
確認。
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.io.UnixDomainSocketEchoClient
OKです。
12月 26, 2021 2:08:47 午前 org.littlewings.io.UnixDomainSocketEchoClient sendMessage 情報: Echo Client using Unix Domain Socket, started 12月 26, 2021 2:08:47 午前 org.littlewings.io.UnixDomainSocketEchoClient sendMessage 情報: connect server[/tmp/echo-server.sock] 12月 26, 2021 2:08:47 午前 org.littlewings.io.UnixDomainSocketEchoClient sendMessage 情報: send message[Hello World] 12月 26, 2021 2:08:47 午前 org.littlewings.io.UnixDomainSocketEchoClient sendMessage 情報: response message[Reply: Hello World] 12月 26, 2021 2:08:47 午前 org.littlewings.io.UnixDomainSocketEchoClient sendMessage 情報: Echo Client using Unix Domain Socket, end
最後に、テストコードで確認してみます。
src/test/java/org/littlewings/io/UnixDomainSocketEchoTest.java
package org.littlewings.io; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Path; import java.time.Duration; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; public class UnixDomainSocketEchoTest { @Test public void simple() throws IOException { Path socketPath = Path.of("/tmp/echo-server.sock"); try (UnixDomainSocketEchoServer server = new UnixDomainSocketEchoServer(socketPath)) { server.start(); UnixDomainSocketEchoClient client = new UnixDomainSocketEchoClient(socketPath); String replyMessage = client.sendMessage("Hello World"); assertThat(replyMessage).isEqualTo("Reply: Hello World"); } } @Test public void multi() throws IOException { Path socketPath = Path.of("/tmp/echo-server.sock"); try (UnixDomainSocketEchoServer server = new UnixDomainSocketEchoServer(socketPath, Duration.ofSeconds(5L))) { server.start(); int size = 5; List<String> messages = IntStream.rangeClosed(1, size).mapToObj(i -> "Hello-" + UUID.randomUUID()).toList(); ExecutorService executor = Executors.newFixedThreadPool(size); List<CompletableFuture<String>> futures = messages .stream() .map(message -> CompletableFuture.supplyAsync(() -> { try { UnixDomainSocketEchoClient client = new UnixDomainSocketEchoClient(socketPath); return client.sendMessage(message); } catch (IOException e) { throw new UncheckedIOException(e); } }, executor) ) .toList(); List<String> responseMessages = futures.stream().map(CompletableFuture::join).toList(); List<String> expected = messages.stream().map(m -> "Reply: " + m).toList(); assertThat(responseMessages).containsExactlyElementsOf(expected); } } }
シンプルに使うパターンと
@Test public void simple() throws IOException { Path socketPath = Path.of("/tmp/echo-server.sock"); try (UnixDomainSocketEchoServer server = new UnixDomainSocketEchoServer(socketPath)) { server.start(); UnixDomainSocketEchoClient client = new UnixDomainSocketEchoClient(socketPath); String replyMessage = client.sendMessage("Hello World"); assertThat(replyMessage).isEqualTo("Reply: Hello World"); } }
サーバー側にスリープ+クライアント側をマルチスレッドにして、各接続を正しく扱えるかどうかを確認。
@Test public void multi() throws IOException { Path socketPath = Path.of("/tmp/echo-server.sock"); try (UnixDomainSocketEchoServer server = new UnixDomainSocketEchoServer(socketPath, Duration.ofSeconds(5L))) { server.start(); int size = 5; List<String> messages = IntStream.rangeClosed(1, size).mapToObj(i -> "Hello-" + UUID.randomUUID()).toList(); ExecutorService executor = Executors.newFixedThreadPool(size); List<CompletableFuture<String>> futures = messages .stream() .map(message -> CompletableFuture.supplyAsync(() -> { try { UnixDomainSocketEchoClient client = new UnixDomainSocketEchoClient(socketPath); return client.sendMessage(message); } catch (IOException e) { throw new UncheckedIOException(e); } }, executor) ) .toList(); List<String> responseMessages = futures.stream().map(CompletableFuture::join).toList(); List<String> expected = messages.stream().map(m -> "Reply: " + m).toList(); assertThat(responseMessages).containsExactlyElementsOf(expected); } }
確認。
$ mvn test
こんなところでしょう。
まとめ
Java 16で追加された、UNIXドメインソケットを試してみました。
既存のIO、NIOの流れでそのまま使えるのかな?と思いきや、Selector
で使えなかったりjava.net
では使えなかったりと
いろいろありますが。
覚えておいてもよいかなと思います。