これは、なにをしたくて書いたもの?
QuarkusにWebSocketのExtensionがあるのを見て、久しぶりにWebSocketをちょっと触ってみたいなと思いまして。
簡単に遊んでみることにしました。
Quarkus WebSockets Extension
Quarkusにはwebsockets
というExtensionがあり、こちらにWebSocketの機能が含まれています。
このExtensionにはWebSocketのサーバー、クライアントの両方が含まれており、クライアントのみを使う場合はwebsockets-client
を
使用します(websockets-client
はwebsockets
に含まれています)。
実際にプログラムを書く際に使うのは、Jakarta WebSocket 1.1のAPIになります。
Jakarta WebSocket 1.1 | The Eclipse Foundation
Javadocはこちら。
Jakarta WebSocket API documentation
WebSocketといえばチャットですね。Quarkusのガイドもチャットの例になっていますが、こちらを横目に見つつ、今回もチャットを
テーマに書いていきたいと思います。
環境
今回の環境は、こちら。
$ java --version openjdk 17.0.1 2021-10-19 OpenJDK Runtime Environment (build 17.0.1+12-Ubuntu-120.04) OpenJDK 64-Bit Server VM (build 17.0.1+12-Ubuntu-120.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.4 (9b656c72d54e5bacbed989b64718c159fe39b537) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 17.0.1, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-100-generic", arch: "amd64", family: "unix"
WebSocketサーバーを作成する
まずはサーバー側から作成していきましょう。websockets
Extensionを指定して、プロジェクトを作成。
$ mvn io.quarkus.platform:quarkus-maven-plugin:2.7.1.Final:create \ -DprojectGroupId=org.littlewings \ -DprojectArtifactId=websocket-server \ -DprojectVersion=0.0.1-SNAPSHOT \ -Dextensions="websockets"
選択されたExtensionと、Codestart。
[INFO] selected extensions: - io.quarkus:quarkus-websockets [INFO] applying codestarts... [INFO] 📚 java 🔨 maven 📦 quarkus 📝 config-properties 🔧 dockerfiles 🔧 maven-wrapper 🚀 websockets-codestart
作成されたプロジェクト内に入ってみます。
$ cd websocket-server
Maven依存関係はこちら。
<dependencies> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-websockets</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-arc</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-junit5</artifactId> <scope>test</scope> </dependency> </dependencies>
quarkus-websockets
には前述のとおりWebSocketのクライアント側も含まれていますが、今回はこのままいきます。
ディレクトリ構成。
$ tree . ├── README.md ├── mvnw ├── mvnw.cmd ├── pom.xml └── src └── main ├── docker │ ├── Dockerfile.jvm │ ├── Dockerfile.legacy-jar │ ├── Dockerfile.native │ └── Dockerfile.native-micro ├── java │ └── org │ └── littlewings │ └── StartWebSocket.java └── resources ├── META-INF │ └── resources │ └── index.html └── application.properties 9 directories, 11 files
テストコードは生成されないんですね。
サンプルとして含まれているソースコードはこちら。
src/main/java/org/littlewings/StartWebSocket.java
package ilove.quark.us; import javax.enterprise.context.ApplicationScoped; import javax.websocket.EncodeException; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import static java.util.Objects.requireNonNull; @ServerEndpoint("/start-websocket/{name}") @ApplicationScoped public class StartWebSocket { @OnOpen public void onOpen(Session session, @PathParam("name") String name) { System.out.println("onOpen> " + name); } @OnClose public void onClose(Session session, @PathParam("name") String name) { System.out.println("onClose> " + name); } @OnError public void onError(Session session, @PathParam("name") String name, Throwable throwable) { System.out.println("onError> " + name + ": " + throwable); } @OnMessage public void onMessage(String message, @PathParam("name") String name) { System.out.println("onMessage> " + name + ": " + message); } }
使わないので削除します。
$ rm src/main/java/org/littlewings/StartWebSocket.java
ガイドのサンプルに近いですが、サーバー側はこんな感じにしました。
src/main/java/org/littlewings/quarkus/websocket/ChatServer.java
package org.littlewings.quarkus.websocket; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import javax.enterprise.context.ApplicationScoped; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import org.jboss.logging.Logger; @ServerEndpoint("/chat/{user}") @ApplicationScoped public class ChatServer { Logger logger = Logger.getLogger(ChatServer.class); ConcurrentMap<String, Session> users = new ConcurrentHashMap<>(); @OnOpen public void onOpen(Session session, @PathParam("user") String user) { String username = URLDecoder.decode(user, StandardCharsets.UTF_8); users.put(username, session); broadcast(String.format("[enter user: %s]", username)); broadcast(String.format("[current users: %s]", String.join(", ", users.keySet()))); } @OnClose public void onClose(Session session, @PathParam("user") String user) { String username = URLDecoder.decode(user, StandardCharsets.UTF_8); users.remove(username, session); broadcast(String.format("[leave user: %s]", username)); broadcast(String.format("[current users: %s]", String.join(", ", users.keySet()))); } @OnMessage public void onMessage(Session session, @PathParam("user") String user, String message) { String username = URLDecoder.decode(user, StandardCharsets.UTF_8); broadcast(String.format("[%s] %s", username, message)); } @OnError public void onError(Session session, @PathParam("user") String user, Throwable throwable) { String username = URLDecoder.decode(user, StandardCharsets.UTF_8); users.remove(username, session); broadcast(String.format("left user[%s] on error", username)); logger.errorf(throwable, "unexpected error, user: %s", user); } void broadcast(String message) { logger.infof("notification for all users [%s]", String.join(", ", users.keySet())); users.values().forEach(session -> session.getAsyncRemote().sendText(message, result -> { if (result.getException() != null) { logger.warnf("unable send message, reason = %s", result.getException().getMessage()); } }) ); } }
少しだけ説明しておくと、@ServerEndpoint
アノテーションでユーザーを@PathParam
として取得できるようにして、このクラスで
ひとつのチャットルームを管理するようにしています。
@ServerEndpoint("/chat/{user}") @ApplicationScoped public class ChatServer {
@ApplicationScoped
アノテーションは付与しなくても、WebSocket自体は動作します。
ですが、今回の構成の場合で@ApplicationScoped
アノテーションを付与しない場合、WebSocketのセッションごとに新しいサーバーインスタンスが
できてしまってユーザーが孤立したルームにいることになってしまうので、@ApplicationScoped
アノテーションを付与してインスタンスを
共有するようにしています。
確認ですが、ガイドで使っているHTML+JavaScriptを少しカスタマイズして使いましょう。
こんな感じで。
src/main/resources/META-INF/resources/chat.html
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>Quarkus Chat!</title> <link rel="stylesheet" type="text/css" href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css"> <link rel="stylesheet" type="text/css" href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css"> <style> #chat { resize: none; overflow: hidden; min-height: 300px; max-height: 300px; } </style> </head> <body> <nav class="navbar navbar-default navbar-pf" role="navigation"> <div class="navbar-header"> <a class="navbar-brand" href="/"> <p><strong>>> Quarkus Chat!</strong></p> </a> </div> </nav> <div class="container"> <br/> <div class="row"> <input id="name" class="col-md-4" type="text" placeholder="your name"> <button id="connect" class="col-md-1 btn btn-primary" type="button">connect</button> <br/> <br/> </div> <div class="row"> <textarea class="col-md-8" id="chat"></textarea> </div> <div class="row"> <input class="col-md-6" id="msg" type="text" placeholder="enter your message"> <button class="col-md-1 btn btn-primary" id="send" type="button" disabled>send</button> </div> </div> <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.2.1/jquery.min.js"></script> <script src="https://cdnjs.cloudflare.com/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script> <script src="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/js/patternfly.min.js"></script> <script type="text/javascript"> let connected = false; let socket; $(document).ready(() => { $("#connect").click(connect); $("#send").click(sendMessage); $("#name").keypress(event => { if(event.keyCode === 13 || event.which === 13) { connect(); } }); $("#msg").keypress(event => { if(event.keyCode === 13 || event.which === 13) { sendMessage(); } }); $("#chat").change(() => scrollToBottom()); $("#name").focus(); }); const connect = () => { if (!connected) { const name = $("#name").val(); socket = new WebSocket("ws://" + location.host + "/chat/" + name); socket.onopen = () => { connected = true; $("#send").attr("disabled", false); $("#connect").attr("disabled", true); $("#name").attr("disabled", true); $("#msg").focus(); }; socket.onmessage = m => { $("#chat").append(m.data + "\n"); scrollToBottom(); }; } }; const sendMessage = () => { if (connected) { const value = $("#msg").val(); socket.send(value); $("#msg").val(""); } }; const scrollToBottom = () => { $('#chat').scrollTop($('#chat')[0].scrollHeight); }; </script> </body> </html>
パッケージングして
$ mvn package
起動。
$ java -jar target/quarkus-app/quarkus-run.jar
有効になったExtensionは、こちら。
2022-02-20 03:24:44,344 INFO [io.quarkus] (main) Installed features: [cdi, smallrye-context-propagation, vertx, websockets, websockets-client]
http://localhost:8080/chat.html
にアクセスすると、こんなページが現れます。
名前を入力してconnect
。
メッセージを入力。
別タブで、他のユーザーでアクセスしてみます。
メッセージを送ると
もう片方にも反映されます。
もちろん、逆方向もOKです。
チャットルームから出ていくには、今回の実装だとタブを閉じればOKです。
この時点では、2人のユーザーを残したままにしておきます。
WebSocketクライアントを作成する
次に、WebSocketクライアントを作成しましょう。Extensionにwebsockets-client
を指定。
$ mvn io.quarkus.platform:quarkus-maven-plugin:2.7.1.Final:create \ -DprojectGroupId=org.littlewings \ -DprojectArtifactId=websocket-client \ -DprojectVersion=0.0.1-SNAPSHOT \ -Dextensions="websockets-client"
選択されたExtensionとCodestart。
[INFO] selected extensions: - io.quarkus:quarkus-websockets-client [INFO] applying codestarts... [INFO] 📚 java 🔨 maven 📦 quarkus 📝 config-properties 🔧 dockerfiles 🔧 maven-wrapper 🚀 resteasy-codestart
プロジェクト内に移動。
$ cd websocket-client
Maven依存関係はこちら。
<dependencies> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-websockets-client</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-arc</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-junit5</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>io.rest-assured</groupId> <artifactId>rest-assured</artifactId> <scope>test</scope> </dependency> </dependencies>
RESTEasyなども入っていますね。RESTEasyに関しては、今回は外しますが。
作成されたディレクトリ構成。WebSocketクライアントの場合は、WebSocketとして利用するコードの雛形はなさそうです。
$ tree . ├── README.md ├── mvnw ├── mvnw.cmd ├── pom.xml └── src ├── main │ ├── docker │ │ ├── Dockerfile.jvm │ │ ├── Dockerfile.legacy-jar │ │ ├── Dockerfile.native │ │ └── Dockerfile.native-micro │ ├── java │ │ └── org │ │ └── littlewings │ │ └── GreetingResource.java │ └── resources │ ├── META-INF │ │ └── resources │ │ └── index.html │ └── application.properties └── test └── java └── org └── littlewings ├── GreetingResourceTest.java └── NativeGreetingResourceIT.java 13 directories, 13 files
既存のコードを削除。
$ rm src/main/java/org/littlewings/*.java src/test/java/org/littlewings/*.java
WebSocketクライアントはmainメソッドから始まるアプリケーションにしようかなと思うのですが、このままだとRESTEasyが含まれているため
Webアプリケーションになってしまうため、RESTEasyをMaven依存関係から外します。
<!-- <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy</artifactId> </dependency> -->
残ったのはこちらです。
</dependencyManagement> <dependencies> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-websockets-client</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-arc</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-junit5</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>io.rest-assured</groupId> <artifactId>rest-assured</artifactId> <scope>test</scope> </dependency> </dependencies>
WebSocketクライアント。
src/main/java/org/littlewings/quarkus/websocket/ChatClient.java
package org.littlewings.quarkus.websocket; import javax.websocket.ClientEndpoint; import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import org.jboss.logging.Logger; @ClientEndpoint public class ChatClient { Logger logger = Logger.getLogger(ChatClient.class); @OnOpen public void onOpen(Session session) { logger.infof("connect to server[%s], session[%s]", session.getRequestURI(), session.getId()); } @OnClose public void onClose(Session session) { logger.infof("disconnect server[%s], session[%s]", session.getRequestURI(), session.getId()); } @OnMessage public void onMessage(Session session, String message) { System.out.println(message); } }
mainクラス。
src/main/java/org/littlewings/quarkus/websocket/App.java
package org.littlewings.quarkus.websocket; import java.io.Console; import java.net.URI; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import javax.websocket.CloseReason; import javax.websocket.ContainerProvider; import javax.websocket.Session; import javax.websocket.WebSocketContainer; import io.quarkus.runtime.QuarkusApplication; import io.quarkus.runtime.annotations.QuarkusMain; import org.jboss.logging.Logger; @QuarkusMain public class App implements QuarkusApplication { Logger logger = Logger.getLogger(App.class); @Override public int run(String... args) throws Exception { String username = URLEncoder.encode(args[0], StandardCharsets.UTF_8); String url = String.format("ws://localhost:8080/chat/%s", username); logger.infof("connect to WebSocket Server[%s]", url); WebSocketContainer container = ContainerProvider.getWebSocketContainer(); Session session = container.connectToServer(ChatClient.class, URI.create(url)); TimeUnit.MILLISECONDS.sleep(300L); Console console = System.console(); String message; while ((message = console.readLine("enter text> ")) != null) { if (message.trim().isBlank()) { continue; } if ("exit".equals(message)) { session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "byebye!!")); break; } else { session.getBasicRemote().sendText(message); TimeUnit.MILLISECONDS.sleep(300L); } } return 0; } }
このプログラムは、WebSocketサーバー側と接続したら、exit
と入力するまでメッセージを繰り返し入力できます。
TimeUnit#sleep
が入っているのは、WebSocketサーバーからの通知とenter text>
の表示が極力重ならないようにするため、ですね。
気持ち程度ですが。
ユーザー名は、起動引数で指定します。
では、パッケージングして
$ mvn package
起動。
$ java -jar target/quarkus-app/quarkus-run.jar フグ田マスオ
すでにチャットルームにいた2人のルームに追加されます。
2022-02-20 03:39:34,357 INFO [io.quarkus] (main) Installed features: [cdi, smallrye-context-propagation, vertx, websockets-client] 2022-02-20 03:39:34,361 INFO [org.lit.qua.web.App] (main) connect to WebSocket Server[ws://localhost:8080/chat/%E3%83%95%E3%82%B0%E7%94%B0%E3%83%9E%E3%82%B9%E3%82%AA] 2022-02-20 03:39:34,490 INFO [org.lit.qua.web.ChatClient] (main) connect to server[ws://localhost:8080/chat/%E3%83%95%E3%82%B0%E7%94%B0%E3%83%9E%E3%82%B9%E3%82%AA], session[4oAxO-3KC58XJFjF_aUxDvLRs1OFKRxm9LwVUXuz] [enter user: フグ田マスオ] [current users: 磯野カツオ, 磯野ワカメ, フグ田マスオ]
メッセージ送信。
enter text> こんにちは [フグ田マスオ] こんにちは
ブラウザ側にも反映されます。
OKですね。
もうひとり、入ってもらいましょう。
$ java -jar target/quarkus-app/quarkus-run.jar フグ田サザエ
これで4人になりました。
2022-02-20 03:41:33,262 INFO [io.quarkus] (main) Installed features: [cdi, smallrye-context-propagation, vertx, websockets-client] 2022-02-20 03:41:33,265 INFO [org.lit.qua.web.App] (main) connect to WebSocket Server[ws://localhost:8080/chat/%E3%83%95%E3%82%B0%E7%94%B0%E3%82%B5%E3%82%B6%E3%82%A8] 2022-02-20 03:41:33,381 INFO [org.lit.qua.web.ChatClient] (main) connect to server[ws://localhost:8080/chat/%E3%83%95%E3%82%B0%E7%94%B0%E3%82%B5%E3%82%B6%E3%82%A8], session[3QIWpY0x4p4cZrHPTUQub2FD1yjupFGgKICr77uE] [enter user: フグ田サザエ] [current users: 磯野カツオ, フグ田サザエ, 磯野ワカメ, フグ田マスオ] enter text> やあ [フグ田サザエ] やあ
メッセージは、他のコンソールにも通知されます。
enter text> [enter user: フグ田サザエ] [current users: 磯野カツオ, フグ田サザエ, 磯野ワカメ, フグ田マスオ] [フグ田サザエ] やあ
exit
して退出。
enter text> exit 2022-02-20 03:42:42,896 INFO [org.lit.qua.web.ChatClient] (main) disconnect server[ws://localhost:8080/chat/%E3%83%95%E3%82%B0%E7%94%B0%E3%82%B5%E3%82%B6%E3%82%A8], session[3QIWpY0x4p4cZrHPTUQub2FD1yjupFGgKICr77uE] 2022-02-20 03:42:43,003 INFO [io.quarkus] (main) websocket-client stopped in 0.105s
他のユーザーからの見え方。
[leave user: フグ田サザエ] [current users: 磯野カツオ, 磯野ワカメ, フグ田マスオ]
ブラウザ側でも、同じですね。
これで、確認したいことはだいたい済みました。
QuarkusのJakarta WebSocketの実装は?
ところで、QuarkusのJakarta WebSocketの実装はなにを使っているのでしょう。
サーバー側でmvn dependency:tree
のWebSocketまわりを見てみます。
$ mvn dependency:tree
クライアント含めて、こんな感じですね。
[INFO] +- io.quarkus:quarkus-websockets:jar:2.7.1.Final:compile [INFO] | +- io.quarkus:quarkus-core:jar:2.7.1.Final:compile [INFO] | | +- jakarta.annotation:jakarta.annotation-api:jar:1.3.5:compile [INFO] | | +- jakarta.enterprise:jakarta.enterprise.cdi-api:jar:2.0.2:compile [INFO] | | | +- jakarta.el:jakarta.el-api:jar:3.0.3:compile [INFO] | | | \- jakarta.interceptor:jakarta.interceptor-api:jar:1.2.5:compile [INFO] | | +- jakarta.inject:jakarta.inject-api:jar:1.0:compile [INFO] | | +- io.quarkus:quarkus-ide-launcher:jar:2.7.1.Final:compile [INFO] | | +- io.quarkus:quarkus-development-mode-spi:jar:2.7.1.Final:compile [INFO] | | +- io.smallrye.config:smallrye-config:jar:2.8.2:compile [INFO] | | | \- io.smallrye.config:smallrye-config-core:jar:2.8.2:compile [INFO] | | | +- org.eclipse.microprofile.config:microprofile-config-api:jar:2.0:compile [INFO] | | | +- io.smallrye.common:smallrye-common-expression:jar:1.8.0:compile [INFO] | | | | \- io.smallrye.common:smallrye-common-function:jar:1.8.0:compile [INFO] | | | +- io.smallrye.common:smallrye-common-constraint:jar:1.8.0:compile [INFO] | | | +- io.smallrye.common:smallrye-common-classloader:jar:1.8.0:compile [INFO] | | | \- io.smallrye.config:smallrye-config-common:jar:2.8.2:compile [INFO] | | +- org.jboss.logging:jboss-logging:jar:3.4.3.Final:compile [INFO] | | +- org.jboss.logmanager:jboss-logmanager-embedded:jar:1.0.9:compile [INFO] | | +- org.jboss.logging:jboss-logging-annotations:jar:2.2.1.Final:compile [INFO] | | +- org.jboss.threads:jboss-threads:jar:3.4.2.Final:compile [INFO] | | +- org.slf4j:slf4j-api:jar:1.7.33:compile [INFO] | | +- org.jboss.slf4j:slf4j-jboss-logmanager:jar:1.1.0.Final:compile [INFO] | | +- org.graalvm.sdk:graal-sdk:jar:21.3.1:compile [INFO] | | +- org.wildfly.common:wildfly-common:jar:1.5.4.Final-format-001:compile [INFO] | | +- io.quarkus:quarkus-bootstrap-runner:jar:2.7.1.Final:compile [INFO] | | \- io.quarkus:quarkus-fs-util:jar:0.0.8:compile [INFO] | +- io.quarkus:quarkus-vertx-http:jar:2.7.1.Final:compile [INFO] | | +- io.quarkus:quarkus-security-runtime-spi:jar:2.7.1.Final:compile [INFO] | | +- io.quarkus:quarkus-mutiny:jar:2.7.1.Final:compile [INFO] | | | +- io.smallrye.reactive:mutiny:jar:1.3.1:compile [INFO] | | | | \- org.reactivestreams:reactive-streams:jar:1.0.3:compile [INFO] | | | +- io.quarkus:quarkus-smallrye-context-propagation:jar:2.7.1.Final:compile [INFO] | | | | \- io.smallrye:smallrye-context-propagation:jar:1.2.2:compile [INFO] | | | | +- io.smallrye:smallrye-context-propagation-api:jar:1.2.2:compile [INFO] | | | | \- io.smallrye:smallrye-context-propagation-storage:jar:1.2.2:compile [INFO] | | | \- io.smallrye.reactive:mutiny-smallrye-context-propagation:jar:1.3.1:compile [INFO] | | +- io.quarkus:quarkus-vertx-http-dev-console-runtime-spi:jar:2.7.1.Final:compile [INFO] | | +- io.quarkus.security:quarkus-security:jar:1.1.4.Final:compile [INFO] | | +- io.quarkus:quarkus-vertx:jar:2.7.1.Final:compile [INFO] | | | +- io.quarkus:quarkus-netty:jar:2.7.1.Final:compile [INFO] | | | | +- io.netty:netty-codec:jar:4.1.73.Final:compile [INFO] | | | | +- io.netty:netty-codec-http2:jar:4.1.73.Final:compile [INFO] | | | | \- io.netty:netty-handler:jar:4.1.73.Final:compile [INFO] | | | | \- io.netty:netty-tcnative-classes:jar:2.0.46.Final:compile [INFO] | | | +- io.netty:netty-codec-haproxy:jar:4.1.73.Final:compile [INFO] | | | | +- io.netty:netty-buffer:jar:4.1.73.Final:compile [INFO] | | | | \- io.netty:netty-transport:jar:4.1.73.Final:compile [INFO] | | | +- io.smallrye.common:smallrye-common-annotation:jar:1.8.0:compile [INFO] | | | +- io.smallrye.reactive:smallrye-mutiny-vertx-core:jar:2.18.1:compile [INFO] | | | | +- io.smallrye.reactive:smallrye-mutiny-vertx-runtime:jar:2.18.1:compile [INFO] | | | | \- io.smallrye.reactive:vertx-mutiny-generator:jar:2.18.1:compile [INFO] | | | | \- io.vertx:vertx-codegen:jar:4.2.4:compile [INFO] | | | \- io.smallrye:smallrye-fault-tolerance-vertx:jar:5.2.1:compile [INFO] | | +- io.smallrye.reactive:smallrye-mutiny-vertx-web:jar:2.18.1:compile [INFO] | | | +- io.smallrye.reactive:smallrye-mutiny-vertx-web-common:jar:2.18.1:compile [INFO] | | | +- io.smallrye.reactive:smallrye-mutiny-vertx-auth-common:jar:2.18.1:compile [INFO] | | | \- io.smallrye.reactive:smallrye-mutiny-vertx-bridge-common:jar:2.18.1:compile [INFO] | | \- io.vertx:vertx-web:jar:4.2.4:compile [INFO] | | +- io.vertx:vertx-web-common:jar:4.2.4:compile [INFO] | | +- io.vertx:vertx-auth-common:jar:4.2.4:compile [INFO] | | +- io.vertx:vertx-bridge-common:jar:4.2.4:compile [INFO] | | \- io.vertx:vertx-core:jar:4.2.4:compile [INFO] | | +- io.netty:netty-common:jar:4.1.73.Final:compile [INFO] | | +- io.netty:netty-handler-proxy:jar:4.1.73.Final:compile [INFO] | | | \- io.netty:netty-codec-socks:jar:4.1.73.Final:compile [INFO] | | +- io.netty:netty-resolver:jar:4.1.73.Final:compile [INFO] | | +- io.netty:netty-resolver-dns:jar:4.1.73.Final:compile [INFO] | | | \- io.netty:netty-codec-dns:jar:4.1.73.Final:compile [INFO] | | \- com.fasterxml.jackson.core:jackson-core:jar:2.13.1:compile [INFO] | +- io.quarkus:quarkus-websockets-client:jar:2.7.1.Final:compile [INFO] | | \- io.quarkus.http:quarkus-http-websocket-core:jar:4.1.5:compile [INFO] | | \- io.netty:netty-codec-http:jar:4.1.73.Final:compile [INFO] | +- io.quarkus.http:quarkus-http-websocket-vertx:jar:4.1.5:compile [INFO] | \- jakarta.websocket:jakarta.websocket-api:jar:1.1.2:compile
このあたりですね。
[INFO] | +- io.quarkus:quarkus-websockets-client:jar:2.7.1.Final:compile [INFO] | | \- io.quarkus.http:quarkus-http-websocket-core:jar:4.1.5:compile [INFO] | | \- io.netty:netty-codec-http:jar:4.1.73.Final:compile [INFO] | +- io.quarkus.http:quarkus-http-websocket-vertx:jar:4.1.5:compile [INFO] | \- jakarta.websocket:jakarta.websocket-api:jar:1.1.2:compile
どうやら、Quarkus自身の実装のようです。
GitHub - quarkusio/quarkus-http
これはなにかというと、Vert.xを使ったServletの実装です。
A Vert.x based Servlet implementation.
WebSocketに関しては、このあたりですね。
https://github.com/quarkusio/quarkus-http/tree/4.1.6/websocket/vertx
https://github.com/quarkusio/quarkus-http/tree/4.1.6/websocket/core
QuarkusのExtensionとしては、このあたりです。quarkus-http
には、これらから依存関係が引き込まれています。
https://github.com/quarkusio/quarkus/tree/2.7.1.Final/extensions/websockets
ちなみに、quarkus-websockets
というアーティファクトは、実はサーバー側のことを指しています。
ここまで、WebSocketまわりのアーティファクトの説明をしてきましたが、quarkus-httpリポジトリの説明では「Servletの実装」と言いつつ、
これらのプロジェクトにはServletへの依存関係がありません。
Servletは、こちらですからね。
https://github.com/quarkusio/quarkus-http/tree/4.1.6/servlet
実際の動作を確認するために、チャットサーバーにすったくトレースを出力するように仕込んでみましょう。
@OnMessage public void onMessage(Session session, @PathParam("user") String user, String message) { Thread.dumpStack(); String username = URLDecoder.decode(user, StandardCharsets.UTF_8); broadcast(String.format("[%s] %s", username, message)); }
再度パッケージングして
$ mvn clean && mvn package
起動。
$ java -jar target/quarkus-app/quarkus-run.jar
メッセージを送って、得られるスタックトレースはこちら。
java.lang.Exception: Stack trace at java.base/java.lang.Thread.dumpStack(Thread.java:1380) at org.littlewings.quarkus.websocket.ChatServer.onMessage(ChatServer.java:45) at org.littlewings.quarkus.websocket.ChatServer_ClientProxy.onMessage(Unknown Source) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at io.undertow.websockets.annotated.BoundMethod.invoke(BoundMethod.java:87) at io.undertow.websockets.annotated.AnnotatedEndpoint$2.onMessage(AnnotatedEndpoint.java:140) at io.undertow.websockets.FrameHandler$5.run(FrameHandler.java:330) at io.undertow.websockets.ServerWebSocketContainer$1.call(ServerWebSocketContainer.java:143) at io.undertow.websockets.ServerWebSocketContainer$1.call(ServerWebSocketContainer.java:140) at io.quarkus.websockets.client.runtime.WebsocketCoreRecorder$4$1.call(WebsocketCoreRecorder.java:181) at io.undertow.websockets.ServerWebSocketContainer.invokeEndpointMethod(ServerWebSocketContainer.java:532) at io.undertow.websockets.ServerWebSocketContainer.invokeEndpointMethod(ServerWebSocketContainer.java:521) at io.undertow.websockets.FrameHandler.invokeTextHandler(FrameHandler.java:310) at io.undertow.websockets.FrameHandler.onText(FrameHandler.java:225) at io.undertow.websockets.FrameHandler.processFrame(FrameHandler.java:143) at io.undertow.websockets.FrameHandler.channelRead0(FrameHandler.java:130) at io.undertow.websockets.FrameHandler.channelRead0(FrameHandler.java:60) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:833)
パッケージ名だけ見ると、とてもUndertowです。
at io.undertow.websockets.annotated.BoundMethod.invoke(BoundMethod.java:87) at io.undertow.websockets.annotated.AnnotatedEndpoint$2.onMessage(AnnotatedEndpoint.java:140) at io.undertow.websockets.FrameHandler$5.run(FrameHandler.java:330) at io.undertow.websockets.ServerWebSocketContainer$1.call(ServerWebSocketContainer.java:143) at io.undertow.websockets.ServerWebSocketContainer$1.call(ServerWebSocketContainer.java:140) at io.quarkus.websockets.client.runtime.WebsocketCoreRecorder$4$1.call(WebsocketCoreRecorder.java:181) at io.undertow.websockets.ServerWebSocketContainer.invokeEndpointMethod(ServerWebSocketContainer.java:532) at io.undertow.websockets.ServerWebSocketContainer.invokeEndpointMethod(ServerWebSocketContainer.java:521)
ですが、Undertowなのは見かけだけですね。
また、他に注意点としては、quarkus-httpが提供するWebSocketではSession#getBasicRemote
による同期的な操作はできません。
たとえば以下のように修正して
/* users.values().forEach(session -> session.getAsyncRemote().sendText(message, result -> { if (result.getException() != null) { logger.warnf("unable send message, reason = %s", result.getException().getMessage()); } }) ); */ users.values().forEach(session -> { try { session.getBasicRemote().sendText(message); } catch (IOException e) { throw new UncheckedIOException(e); } });
パッケージングして
$ mvn clean && mvn package
起動。
$ java -jar target/quarkus-app/quarkus-run.jar
メッセージを送ろうとすると、例外がスローされます。
2022-02-20 04:10:32,688 ERROR [org.lit.qua.web.ChatServer] (vert.x-eventloop-thread-14) unexpected error, user: %E7%A3%AF%E9%87%8E%E3%82%AB%E3%83%84%E3%82%AA: java.lang.IllegalStateException: Cannot use the basic remote from an IO thread at io.undertow.websockets.UndertowSession.getBasicRemote(UndertowSession.java:336) at org.littlewings.quarkus.websocket.ChatServer.lambda$broadcast$0(ChatServer.java:72) at java.base/java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4780) at org.littlewings.quarkus.websocket.ChatServer.broadcast(ChatServer.java:70) at org.littlewings.quarkus.websocket.ChatServer.onOpen(ChatServer.java:32) at org.littlewings.quarkus.websocket.ChatServer_ClientProxy.onOpen(Unknown Source) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at io.undertow.websockets.annotated.BoundMethod.invoke(BoundMethod.java:87) at io.undertow.websockets.annotated.AnnotatedEndpoint$3.run(AnnotatedEndpoint.java:156) at io.undertow.websockets.ServerWebSocketContainer$1.call(ServerWebSocketContainer.java:143) at io.undertow.websockets.ServerWebSocketContainer$1.call(ServerWebSocketContainer.java:140) at io.quarkus.websockets.client.runtime.WebsocketCoreRecorder$4$1.call(WebsocketCoreRecorder.java:181) at io.undertow.websockets.ServerWebSocketContainer.invokeEndpointMethod(ServerWebSocketContainer.java:532) at io.undertow.websockets.ServerWebSocketContainer.invokeEndpointMethod(ServerWebSocketContainer.java:521) at io.undertow.websockets.annotated.AnnotatedEndpoint.invokeMethod(AnnotatedEndpoint.java:151) at io.undertow.websockets.annotated.AnnotatedEndpoint.onOpen(AnnotatedEndpoint.java:101) at io.undertow.websockets.EndpointSessionHandler.connected(EndpointSessionHandler.java:119) at io.undertow.websockets.vertx.VertxWebSocketHandler$1.accept(VertxWebSocketHandler.java:116) at io.undertow.websockets.vertx.VertxWebSocketHandler$1.accept(VertxWebSocketHandler.java:113) at io.undertow.websockets.handshake.Handshake$1.accept(Handshake.java:147) at io.undertow.websockets.vertx.VertxWebSocketHttpExchange$1.handle(VertxWebSocketHttpExchange.java:129) at io.undertow.websockets.vertx.VertxWebSocketHttpExchange$1.handle(VertxWebSocketHttpExchange.java:121) at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141) at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54) at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:81) at io.vertx.core.impl.DuplicatedContext.execute(DuplicatedContext.java:173) at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:51) at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211) at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23) at io.vertx.core.Promise.complete(Promise.java:66) at io.vertx.core.impl.future.PromiseImpl.operationComplete(PromiseImpl.java:65) at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605) at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717) at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272) at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352) at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:414) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895) at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728) at io.vertx.core.net.impl.ConnectionBase.endReadAndFlush(ConnectionBase.java:142) at io.vertx.core.net.impl.VertxHandler.channelReadComplete(VertxHandler.java:148) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:397) at io.netty.channel.AbstractChannelHandlerContext.fireChannelReadComplete(AbstractChannelHandlerContext.java:390) at io.netty.handler.timeout.IdleStateHandler.channelReadComplete(IdleStateHandler.java:295) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:397) at io.netty.channel.AbstractChannelHandlerContext.fireChannelReadComplete(AbstractChannelHandlerContext.java:390) at io.netty.handler.codec.ByteToMessageDecoder.channelReadComplete(ByteToMessageDecoder.java:339) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:397) at io.netty.channel.AbstractChannelHandlerContext.fireChannelReadComplete(AbstractChannelHandlerContext.java:390) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelReadComplete(DefaultChannelPipeline.java:1415) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:397) at io.netty.channel.DefaultChannelPipeline.fireChannelReadComplete(DefaultChannelPipeline.java:925) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:833)
これは、Vert.xのイベントループスレッド(IOスレッド)で動作しているので、ブロッキングな処理を許していないということでしょうね。
回避するにはワーカースレッドを使うことになるはずですが、今回はその調べものと確認はパスします。
ここまでにしましょう。
まとめ
QuarkusのWebSockets Extensionを使ってみました。
試したサンプルはガイドの内容から大して変わりませんが、Quarkusで使っているJakarta WebSocketの情報まで追えたりしたので
良かったかなと。