これは、なにをしたくて書いたもの?
Quarkusのドキュメントを見ていて、RESTEasy Reactiveというものの存在が気になりまして。
RESTEasy Mutinyが名前を変えたものなのかな?と思ったのですが、どうやらそうではなさそうなので見てみることに
しました。
ドキュメントを見ていると、スレッド回りが気になるので最終的にはこのあたりを確認したいと思います。
Getting Started with Reactive
現在(2.2.3.Final)のQuarkusのReactive向けのGetting Startedのドキュメントはこちら。
Quarkus - Getting Started With Reactive
ここでは、従来のモデルがリクエストをスレッド(ワーカースレッド)に割り当てていること、リアクティブな
モデルではワーカースレッドを使わずノンブロッキングIOスレッドで処理を行うためメモリとCPUを節約でき、
スレッドのスイッチのコストも減らせると書かれています。
Getting Started with Reactive / Imperative vs. Reactive: a question of threads
ただ、IOを扱う処理を書くような場合は少し工夫が必要です。Quarkusでは、この方法としてMutinyとKotlinコルーチンを
提案しています。
Getting Started with Reactive / From sequential to continuation style
Quarkusのドキュメントでもそうですが、こちらでもこの選択肢としてはMutinyを取ることにします。
MutinyとはSmallRye Mutinyのことですが、Quarkusのドキュメント内にもMutinyの使い方が書かれています。
Quarkus - Mutiny - Async for bare mortal
もっと詳しく見る場合は、SmallRye Mutinyのドキュメントを参照しましょう。
また、Quarkusにおけるリアクティブアーキテクチャの話は、こちらのドキュメントに記載されています。
Quarkus - Quarkus Reactive Architecture
QuarkusのリアクティブなAPIを持つExtentionも紹介されています。リアクティブなExtensionにはどのようなものが
あるのか知りたい時は、このページを見るとよいでしょう。
ここまでは、Quarkusとリアクティブの話でした。
ドキュメントが参照しているコードでは、RESTEasy ReactiveとHibernate Reactiveが使われています。
RESTEasy Reactive
続いては、RESTEasy Reactiveに話を移しましょう。
Quarkus - Writing REST Services with RESTEasy Reactive
RESTEasy Reactiveは、Vert.xを基盤にしたJAX-RSの実装です。エンドポイントはブロッキングとノンブロッキングの
2種類を扱うことができます。
アーティファクトのIDとしては、quarkus-resteasy-reactive
になります。Extensionではresteasy-reactive
という名前で
指定することになりますね。
ドキュメントはしばらくはJAX-RSの説明が続きますが、リクエストパラメーターへのアクセス方法あたりから
変化があります。
Writing REST Services With RESTEasy Reactive / Accessing request parameters
通常のJAX-RSと異なり、RESTEasy Reactiveの場合はRESTEasyのアノテーションでリクエストパラメーターに
アクセスするようです。
org.jboss.resteasy.reactive package summary - resteasy-reactive-common 2.0.0.Final javadoc
たとえば、@QueryParam
の代わりに@RestQuery
を使う、などです。
You can also use any of the JAX-RS annotations @PathParam, @QueryParam, @HeaderParam, @CookieParam, @FormParam or @MatrixParam for this, but they require you to specify the parameter name.
なのですが、一応通常のJAX-RSのアノテーションも使えるみたいなんですよね。差異は、パラメーター名を明示する
必要があるかどうか、です。
RESTEasyの@Rest〜
アノテーションを使う場合は、パラメーター名の指定は不要なのですが、メソッドの引数名を
残すようにコンパイラに設定する必要があります。
don’t forget to configure your compiler to generate parameter name information with -parameters (javac) or
or <maven.compiler.parameters>
ただ、これはQuarkusの機能でプロジェクトを作成すると、デフォルトで有効化された状態になっています。
リクエストで使用できる型。
Writing REST Services With RESTEasy Reactive / Accessing the request body
レスポンスで使用できる追加型。表の外にも書いてありますが、SmallRye MutinyのUni
、Multi
を使えたり、
CompletionStage
の利用も可能です。
Writing REST Services With RESTEasy Reactive / Returning a response body
オブジェクトとJSONの変換については、quarkus-resteasy-reactive-jackson
(Jackson)か
quarkus-resteasy-reactive-jsonb
(JSON-B)のいずれかのExtensionを使用します。
どちらも、リアクティブ用ですね。
基本的にメソッドの戻り値はリソースを指しますが、HTTPボディ以外の内容(ステータスコードやHTTPヘッダーなど)も
設定したい場合は、RestResponse
を使います。
※JAX-RSのResponse
も利用可能ですが、型安全ではありません
Writing REST Services With RESTEasy Reactive / Setting other response properties
リアクティブやストリーミングのサポートについて。
Writing REST Services With RESTEasy Reactive / Async/reactive support
Writing REST Services With RESTEasy Reactive / Streaming support
リソースメソッドへ、コンテキストオブジェクトとして渡せるクラス。UriInfo
やHttpHeaders
などです。
Writing REST Services With RESTEasy Reactive / Accessing context objects
少し高度な話題と実行モデル
後半には、少し高度な話題が入っています。
Writing REST Services With RESTEasy Reactive / More advanced usage
Writing REST Services With RESTEasy Reactive / Execution model, blocking, non-blocking
例外処理あたりがまずは気になるところですが。
Writing REST Services With RESTEasy Reactive / Exception mapping
今回は、こちらの実行モデルの方を見ていきたいと思います。
Writing REST Services With RESTEasy Reactive / Execution model, blocking, non-blocking
RESTEasy Reactiveでは、2種類のスレッドを使います。
- イベントループスレッド(別名IOスレッド) … 主にリクエストを読み取り、レスポンスを書き出す用途で使用
- ワーカースレッド … 時間のかかる操作をオフロードするためのプール
イベントループスレッド(IOスレッド)は、すべてのIO操作を非同期で実行し、IO操作完了時のリスナーの役割も持ちます。
RESTEasy Reactiveではエンドポイントとなるメソッドの宣言で、デフォルトで使われるスレッドが変わります。
メソッドの戻り値が以下の型の場合、IOスレッドが使われます。
io.smallrye.mutiny.Uni
io.smallrye.mutiny.Multi
java.util.concurrent.CompletionStage
org.reactivestreams.Publisher
- Kotlinのsuspendメソッド
それ以外の型の場合は、ワーカースレッドで動作します。
If a method or class is annotated with javax.transaction.Transactional then it will also be treated as a blocking method. This is because JTA is a blocking technology, and is generally used with other blocking technology such as Hibernate and JDBC.
また、メソッドやクラスに@Transactional
アノテーションを付与しても、ブロッキング扱いになりワーカースレッドが
使われることになります。
この動作は@Blocking
、@NonBlocking
アノテーションを使用してオーバーライドすることができます。
※@Transactional
との併用含む
Blocking - smallrye-common-annotation 1.5.0 javadoc
NonBlocking - smallrye-common-annotation 1.5.0 javadoc
全体的なデフォルトの振る舞いは、javax.ws.rs.core.Application
のサブクラスに@Blocking
、@NonBlocking
を付与する
ことでオーバーライドすることもできるようです。
If you want to override the default behaviour you can annotate a javax.ws.rs.core.Application subclass in your application with @Blocking or @NonBlocking, and this will set the default for every method that does not have an explicit annotation.
Writing REST Services With RESTEasy Reactive / Overriding the default behaviour
今回は、このあたりを確認してみたいと思います。
RESTEasy ReactiveとRESTEasy Classic、RESTEasy ReactiveとRESTEasy Mutiny
Getting Started(非リアクティブ)などで使われているQuarkusのRESTEasy Extensionは、今はRESTEasy Classicという
呼び方をするみたいです。
アーティファクトIDとしては、quarkus-resteasy
ですね。
RESTEasy ReactiveとRESTEasy Classicの違いは、以下のブログに書かれています。
Quarkus - Massive performance without headaches
Quarkus - RESTEasy Reactive - To block or not to block
RESTEasy Classicは常にワーカースレッド上で動作し、RESTEasy ReactiveはIOスレッドを使うのかワーカースレッドを
使うのかを開発者が選択できることに違いがあります。
ちなみに、ブロッキングを選択してもRESTEasy Reactiveの方がパフォーマンスが良いとされているようです。
Massive performance without headaches / What is the performance implication of using @Blocking?
RESTEasy Classicも内部ではVert.xを使っているのですが、RESTEasy Reactiveと比べると統合の度合いが低いと
しています。
その他、RESTEasy Reactiveは新しく作った実装であること、ThreadLocalの排除、Arcの利用の最適化などが
ポイントのようです。
あとはRESTEasy Mutinyとの違いが気になるところですね。こちらも、違いはRESTEasy Reactive、RESTEasy Classicの
違いと同じく、ワーカースレッドで動作することです。
Massive performance without headaches / How does it compare to RESTEasy Classic with Mutiny?
つまり、API上はリアクティブなものを使っていても、リソースを効率的に扱えなかったのがRESTEasy Mutiny
だったということになります。
ここまでくると、デフォルトをRESTEasy Reactiveにしても…と思うのですが、それはやりすぎなんでしょうね。
とはいえ、リアクティブなAPIの方を使うとブロックするAPIを使った時に困るので、これで導入されたのが
メソッドの戻り値でリクエストを処理するのに使用するスレッドを選択する方法のようです。
RESTEasy Reactive - To block or not to block / To block or not to block, that is the question.
RESTEasy Reactive - To block or not to block / New world, new rules!
現在では、RESTEasy ReactiveとRESTEasy ClassicはExtensionのディレクトリ内で別のツリーで管理されています。
https://github.com/quarkusio/quarkus/tree/2.2.3.Final/extensions/resteasy-reactive
https://github.com/quarkusio/quarkus/tree/2.2.3.Final/extensions/resteasy-classic
RESTEasy Mutinyは、RESTEasy Classic側に属します。
RESTEasy Reactiveの場合は、アーティファクトのIDにreactive
が入っているので間違えることはないと思いますが、
どのようなExtensionがあるかは確認できるでしょう。
長くなりましたが、ここまでのQuarkusでのRESTEasyのリアクティブAPIに関する変化や情報を見てきました。
そろそろ、動かしてみましょうか。
環境
今回の環境は、こちらです。
$ java --version openjdk 11.0.11 2021-04-20 OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04) OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.2 (ea98e05a04480131370aa0c110b8c54cf726c06f) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-86-generic", arch: "amd64", family: "unix"
プロジェクトの作成とお題。
では、Quarkusプロジェクトを作成しましょう。
$ mvn io.quarkus.platform:quarkus-maven-plugin:2.2.3.Final:create \ -DprojectGroupId=org.littlewings \ -DprojectArtifactId=resteasy-reactive \ -DprojectVersion=0.0.1-SNAPSHOT \ -Dextensions="resteasy-reactive,resteasy-reactive-jackson"
Extensionには、RESTEasy ReactiveとJacksonを指定。
[INFO] --- quarkus-maven-plugin:2.2.3.Final:create (default-cli) @ standalone-pom --- [INFO] ----------- [INFO] selected extensions: - io.quarkus:quarkus-resteasy-reactive - io.quarkus:quarkus-resteasy-reactive-jackson [INFO] applying codestarts... [INFO] 📚 java 🔨 maven 📦 quarkus 📝 config-properties 🔧 dockerfiles 🔧 maven-wrapper 🚀 resteasy-reactive-codestart [INFO] -----------
作成されたプロジェクト内に移動。
$ cd resteasy-reactive
依存関係は、こんな感じになりました。
<dependencies> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy-reactive</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy-reactive-jackson</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-arc</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-junit5</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>io.rest-assured</groupId> <artifactId>rest-assured</artifactId> <scope>test</scope> </dependency> </dependencies>
quarkus-resteasy
は入っていませんね。
参考までに、こんな感じでRESTEasy ReactiveとRESTEasy Classicを同時に依存関係に加えると
<dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy-reactive</artifactId> </dependency>
アプリケーションの起動に失敗するので注意しましょう。
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Multiple matching properties for name "security.jaxrs.deny-unannotated-endpoints" property was matched by both public boolean io.quarkus.resteasy.reactive.common.runtime.JaxRsSecurityConfig.denyJaxRs and public boolean io.quarkus.resteasy.runtime.JaxRsSecurityConfig.denyJaxRs. This is likely because you have an incompatible combination of extensions that both define the same properties (e.g. including both reactive and blocking database extensions) Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Multiple matching properties for name "security.jaxrs.deny-unannotated-endpoints" property was matched by both public boolean io.quarkus.resteasy.reactive.common.runtime.JaxRsSecurityConfig.denyJaxRs and public boolean io.quarkus.resteasy.runtime.JaxRsSecurityConfig.denyJaxRs. This is likely because you have an incompatible combination of extensions that both define the same properties (e.g. including both reactive and blocking database extensions) Caused by: java.lang.IllegalArgumentException: Multiple matching properties for name "security.jaxrs.deny-unannotated-endpoints" property was matched by both public boolean io.quarkus.resteasy.reactive.common.runtime.JaxRsSecurityConfig.denyJaxRs and public boolean io.quarkus.resteasy.runtime.JaxRsSecurityConfig.denyJaxRs. This is likely because you have an incompatible combination of extensions that both define the same properties (e.g. including both reactive and blocking database extensions)
ちなみに、生成されたソースコードはこんな感じです。
src/main/java/org/littlewings/ReactiveGreetingResource.java
package org.littlewings; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; @Path("/hello") public class ReactiveGreetingResource { @GET @Produces(MediaType.TEXT_PLAIN) public String hello() { return "Hello RESTEasy Reactive"; } }
ワーカースレッドを使う定義になっていますね。
テストコードは、こんな感じでした。
src/test/java/org/littlewings/ReactiveGreetingResourceTest.java
package org.littlewings; import io.quarkus.test.junit.QuarkusTest; import org.junit.jupiter.api.Test; import static io.restassured.RestAssured.given; import static org.hamcrest.CoreMatchers.is; @QuarkusTest public class ReactiveGreetingResourceTest { @Test public void testHelloEndpoint() { given() .when().get("/hello") .then() .statusCode(200) .body(is("Hello RESTEasy Reactive")); } }
src/test/java/org/littlewings/NativeReactiveGreetingResourceIT.java
package org.littlewings; import io.quarkus.test.junit.NativeImageTest; @NativeImageTest public class NativeReactiveGreetingResourceIT extends ReactiveGreetingResourceTest { // Execute the same tests but in native mode. }
これらはいったん削除して。
$ rm -rf src/main/java/org/littlewings/ReactiveGreetingResource.java src/test/java/org/littlewings/*
今回は、JAX-RSのエンドポイントの宣言と、実際に使われるスレッドをテストコードで確認してみたいと思います。
JAX-RSリソースクラスの雛形と、レスポンスに使うクラスを作る
最初にJAX-RSリソースクラスを定義します。
src/main/java/org/littlewings/quarkus/resteasyreactive/VariousEndpointResource.java
package org.littlewings.quarkus.resteasyreactive; import java.util.concurrent.CompletionStage; import javax.inject.Inject; import javax.transaction.Transactional; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import io.smallrye.common.annotation.Blocking; import io.smallrye.common.annotation.NonBlocking; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import org.jboss.resteasy.reactive.RestQuery; import org.reactivestreams.Publisher; @Path("endpoints") public class VariousEndpointResource { @Inject ExecutionThreadService executionThreadService; // ここにリソースメソッドを定義する }
中身は、後で順次埋めていきます。
どのスレッドで実行されているかを確認したいので、現在のスレッド名を返すクラスを作成します。
これは、JAX-RSリソースクラスへインジェクションしています。
src/main/java/org/littlewings/quarkus/resteasyreactive/ExecutionThreadService.java
package org.littlewings.quarkus.resteasyreactive; import javax.enterprise.context.ApplicationScoped; import io.smallrye.mutiny.Uni; @ApplicationScoped public class ExecutionThreadService { public Uni<String> getCurrentThreadName() { return Uni.createFrom().item(Thread.currentThread().getName()); } }
レスポンスに使うクラス。メッセージとスレッド名を含めることにしました。メッセージは、JAX-RSリソースクラスで
QueryStringとして受け取ることにします。
src/main/java/org/littlewings/quarkus/resteasyreactive/ExecutionInfo.java
package org.littlewings.quarkus.resteasyreactive; public class ExecutionInfo { String message; String currentThreadName; public static ExecutionInfo create(String message, String currentThreadName) { ExecutionInfo info = new ExecutionInfo(); info.message = message; info.currentThreadName = currentThreadName; return info; } // getter/setterは省略 }
テストコードの雛形
続いて、テストコードの雛形を作ります。
src/test/java/org/littlewings/quarkus/resteasyreactive/VariousEndpointResourceTest.java
package org.littlewings.quarkus.resteasyreactive; import io.quarkus.test.common.http.TestHTTPEndpoint; import io.quarkus.test.junit.QuarkusTest; import org.junit.jupiter.api.Test; import static io.restassured.RestAssured.given; import static org.hamcrest.Matchers.*; @QuarkusTest @TestHTTPEndpoint(VariousEndpointResource.class) public class VariousEndpointResourceTest { // ここに、テストコードを書く! }
以降は、JAX-RSリソースクラスのメソッドと、対応するテストメソッドを順番に書いていくことにします。
JAX-RSリソースクラスのメソッドの型で、利用するスレッドが変わることを確認する
では、順番に書いていきます。
確認したいのは、こちらですね。
Writing REST Services With RESTEasy Reactive / Execution model, blocking, non-blocking
まずは、リアクティブな型ではないものから。
@GET @Path("blocking-simple") @Produces(MediaType.APPLICATION_JSON) public ExecutionInfo blockingSimple(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)) .await() .indefinitely(); }
QueryStringは、@RestQuery
アノテーションを付与して扱っています。あと、リアクティブな型ではないとは言いつつ、
ブロックするコードにはなっています。
対応するテストコード。
@Test public void blockingSimpleTest() { given() .when() .queryParam("message", "Hello World!!") .get("blocking-simple") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("executor-thread-")); // ワーカースレッド }
ひとつ目の例なのでなんとも言えない感じはしますが、これでワーカースレッドで動作していることが確認できます。
次は、Uni
を戻り値にしています。
@GET @Path("non-blocking-uni") @Produces(MediaType.APPLICATION_JSON) public Uni<ExecutionInfo> nonBlockingUni(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)); }
今度は、スレッド名がvert.x-eventloop-thread-
で始まるものになります。こちらはIOスレッドです。
@Test public void nonBlockingUniTest() { given() .when() .queryParam("message", "Hello World!!") .get("non-blocking-uni") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("vert.x-eventloop-thread-")); // IOスレッド }
戻り値の型をMulti
に。
@GET @Path("non-blocking-multi") @Produces(MediaType.APPLICATION_JSON) public Multi<ExecutionInfo> nonBlockingMulti(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)) .toMulti(); }
対応するテストコード。IOスレッドになります。
@Test public void nonBlockingMultiTest() { given() .when() .queryParam("message", "Hello World!!") .get("non-blocking-multi") .then() .statusCode(200) .body("message", hasSize(1)) .body("message", hasItem(is("★★★Hello World!!★★★"))) .body("currentThreadName", hasSize(1)) .body("currentThreadName", hasItem(startsWith("vert.x-eventloop-thread-"))); // IOスレッド }
戻り値の型をCompletionStage
に。
@GET @Path("non-blocking-completion-stage") @Produces(MediaType.APPLICATION_JSON) public CompletionStage<ExecutionInfo> nonBlockingCompletionStage(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)) .subscribeAsCompletionStage(); }
対応するテストコード。IOスレッドになります。
@Test public void nonBlockingCompletionStageTest() { given() .when() .queryParam("message", "Hello World!!") .get("non-blocking-completion-stage") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("vert.x-eventloop-thread-")); // IOスレッド }
戻り値の型をPublisher
に。
@GET @Path("non-blocking-publisher") @Produces(MediaType.APPLICATION_JSON) public Publisher<ExecutionInfo> nonBlockingPublisher(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)) .toMulti(); }
対応するテストコード。IOスレッドになります。
@Test public void nonBlockingPublisherTest() { given() .when() .queryParam("message", "Hello World!!") .get("non-blocking-publisher") .then() .statusCode(200) .body("message", hasSize(1)) .body("message", hasItem(is("★★★Hello World!!★★★"))) .body("currentThreadName", hasSize(1)) .body("currentThreadName", hasItem(startsWith("vert.x-eventloop-thread-"))); // IOスレッド }
ドキュメント通りなことを確認しました。
@Blocking、@NonBlocking、@Transactionalアノテーションを使う
続いては、メソッド定義にアノテーションを加えて、実行モデルをオーバーライドしてみましょう。
Uni
を使ったリソース定義を、@Blocking
にしてみます。
@Blocking @GET @Path("annotated-blocking-uni") @Produces(MediaType.APPLICATION_JSON) public Uni<ExecutionInfo> annotatedBlockingUni(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)); }
テスト。ワーカースレッドが使われるようになりました。
@Test public void annotatedBlockingUniTest() { given() .when() .queryParam("message", "Hello World!!") .get("annotated-blocking-uni") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("executor-thread-")); // ワーカースレッド }
リアクティブな型ではない戻り値のメソッドを、@NonBlocking
にしてみます。
@NonBlocking @GET @Path("annotated-non-blocking-simple") @Produces(MediaType.APPLICATION_JSON) public ExecutionInfo annotatedNonBlockingSimple(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)) .await() .indefinitely(); }
これは、エラーになります。
@Test public void annotatedNonBlockingSimpleTest() { given() .when() .queryParam("message", "Hello World!!") .get("annotated-non-blocking-simple") .then() .statusCode(500); // java.lang.IllegalStateException: The current thread cannot be blocked: }
@NonBlocking
アノテーションを付与しても、リアクティブな型以外を戻り値とすることはできないようです。
ERROR [io.qua.ver.htt.run.QuarkusErrorHandler] (vert.x-eventloop-thread-6) HTTP Request to /endpoints/annotated-non-blocking-simple?message=Hello%20World%21%21 failed, error id: bb7d38f6-b297-4035-9cb5-fc0d6aa459c4-1: java.lang.IllegalStateException: The current thread cannot be blocked: vert.x-eventloop-thread-6
となると、こうなるわけですが。
@NonBlocking @GET @Path("annotated-non-blocking-uni") @Produces(MediaType.APPLICATION_JSON) public Uni<ExecutionInfo> annotatedNonBlockingUni(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)); }
ふつうですね。このパターンだと、@NonBlocking
を使う意味はあまりないような…。
@Test public void annotatedNonBlockingUniTest() { given() .when() .queryParam("message", "Hello World!!") .get("annotated-non-blocking-uni") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("vert.x-eventloop-thread-")); // IOスレッド }
次は、@Transactional
を使ってみましょう。
Uni
を戻り値にしているメソッドに、@Transactional
アノテーションを付与します。
@Transactional @GET @Path("transactional-blocking-uni") @Produces(MediaType.APPLICATION_JSON) public Uni<ExecutionInfo> transactionalBlockingUni(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)); }
テストコード。Uni
を使っているにも関わらず、ワーカースレッドを使うようになりました。
@Test public void transactionalBlockingUniTest() { given() .when() .queryParam("message", "Hello World!!") .get("transactional-blocking-uni") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("executor-thread-")); // ワーカースレッド }
リアクティブではない型にしてみます。
@Transactional @GET @Path("transactional-blocking-simple") @Produces(MediaType.APPLICATION_JSON) public ExecutionInfo transactionalBlockingSimple(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)) .await() .indefinitely(); }
ふつうですね。
テスト。まあ、ワーカースレッドになります。
@Test public void transactionalBlockingSimpleTest() { given() .when() .queryParam("message", "Hello World!!") .get("transactional-blocking-simple") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("executor-thread-")); // ワーカースレッド }
さらに、ここに@NonBlocking
アノテーションを付与してみます。
@Transactional @NonBlocking @GET @Path("transactional-annotated-non-blocking-simple") @Produces(MediaType.APPLICATION_JSON) public ExecutionInfo transactionalAnnotatedNonBlockingSimple(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)) .await() .indefinitely(); }
これは、エラーになります。
ERROR [io.qua.ver.htt.run.QuarkusErrorHandler] (vert.x-eventloop-thread-0) HTTP Request to /endpoints/transactional-annotated-non-blocking-simple?message=Hello%20World%21%21 failed, error id: afecd7b1-f35b-4b4d-9a9f-ced880f81956-2: java.lang.IllegalStateException: The current thread cannot be blocked: vert.x-eventloop-thread-0
@Test public void transactionalAnnotatedNonBlockingSimpleTest() { given() .when() .queryParam("message", "Hello World!!") .get("transactional-annotated-non-blocking-simple") .then() .statusCode(500); // java.lang.IllegalStateException: The current thread cannot be blocked: }
ここで、戻り値をUni
にすると
@Transactional @NonBlocking @GET @Path("transactional-non-blocking-uni") @Produces(MediaType.APPLICATION_JSON) public Uni<ExecutionInfo> transactionalNonBlockingUni(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)); }
動作するようになります。
@Test public void transactionalNonBlockingUniTest() { given() .when() .queryParam("message", "Hello World!!") .get("transactional-non-blocking-uni") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("vert.x-eventloop-thread-")); // IOスレッド }
これらのことから、@NonBlocking
アノテーションを付与したからといって、通常のメソッドがノンブロッキングに
扱われるわけではないことがわかります。ちゃんと、リアクティブな型で宣言し、矛盾がないようにする必要が
ありそうです。
@Blocking
とリアクティブな型の組み合わせは良さそうなんですけどね。
確認したいところは、およそ見てた感じです。
javax.ws.rs.core.Application
のサブクラスを使ったデフォルトの振る舞いを変更するパターンは、今回はパスします。
IOスレッドとワーカースレッドの数は?
今回、IOスレッドとワーカースレッドで動作していることを確認しましたが、このスレッドの数はどこで設定
するのでしょう?
IOスレッドは、quarkus.vertx.event-loops-pool-size
で設定するようです。
Vert.x Reference Guide / quarkus.vertx.event-loops-pool-size
ワーカースレッドは、quarkus.thread-pool.core-threads
で設定するようです。
All Configuration Options / quarkus.thread-pool.core-threads
メソッドの戻り値の型や、ブロッキング/ノンブロッキングは、どのあたりで判定している?
メソッドの戻り値の型を検出している部分。これは、ビルド時みたいですね。
メソッドにアノテーションが付与されていた場合の処理。こちらもビルド時ですね。
BlockingHandler
で、ワーカースレッドを使用するスレッドプールで処理を実行します。
ブロッキングと判定されたメソッドに対して、BlockingHandler
を設定する部分。これは、実行時ですね。
こんな感じのようです。
まとめ
QuarkusのRESTEasy Reactiveを使って、エンドポイントとスレッドの関係を確認してみました。
ドキュメントなどがたくさんあったので、RESTEasy Classicの位置づけや、RESTEasy Mutinyとの違いなどが
確認できて良かったです。
だいぶ長くなりましたけどね。
例外まわりは、また別の機会に確認したいなと思います。
オマケ
断片的に載せていた、JAX-RSリソースクラスとテストクラスのソースコードを全体を載せておきます。
JAX-RSリソースクラス。
src/main/java/org/littlewings/quarkus/resteasyreactive/VariousEndpointResource.java
package org.littlewings.quarkus.resteasyreactive; import java.util.concurrent.CompletionStage; import javax.inject.Inject; import javax.transaction.Transactional; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import io.smallrye.common.annotation.Blocking; import io.smallrye.common.annotation.NonBlocking; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import org.jboss.resteasy.reactive.RestQuery; import org.reactivestreams.Publisher; @Path("endpoints") public class VariousEndpointResource { @Inject ExecutionThreadService executionThreadService; @GET @Path("blocking-simple") @Produces(MediaType.APPLICATION_JSON) public ExecutionInfo blockingSimple(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)) .await() .indefinitely(); } @GET @Path("non-blocking-uni") @Produces(MediaType.APPLICATION_JSON) public Uni<ExecutionInfo> nonBlockingUni(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)); } @GET @Path("non-blocking-multi") @Produces(MediaType.APPLICATION_JSON) public Multi<ExecutionInfo> nonBlockingMulti(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)) .toMulti(); } @GET @Path("non-blocking-completion-stage") @Produces(MediaType.APPLICATION_JSON) public CompletionStage<ExecutionInfo> nonBlockingCompletionStage(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)) .subscribeAsCompletionStage(); } @GET @Path("non-blocking-publisher") @Produces(MediaType.APPLICATION_JSON) public Publisher<ExecutionInfo> nonBlockingPublisher(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)) .toMulti(); } @Blocking @GET @Path("annotated-blocking-uni") @Produces(MediaType.APPLICATION_JSON) public Uni<ExecutionInfo> annotatedBlockingUni(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)); } @NonBlocking @GET @Path("annotated-non-blocking-simple") @Produces(MediaType.APPLICATION_JSON) public ExecutionInfo annotatedNonBlockingSimple(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)) .await() .indefinitely(); } @NonBlocking @GET @Path("annotated-non-blocking-uni") @Produces(MediaType.APPLICATION_JSON) public Uni<ExecutionInfo> annotatedNonBlockingUni(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)); } @Transactional @GET @Path("transactional-blocking-uni") @Produces(MediaType.APPLICATION_JSON) public Uni<ExecutionInfo> transactionalBlockingUni(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)); } @Transactional @GET @Path("transactional-blocking-simple") @Produces(MediaType.APPLICATION_JSON) public ExecutionInfo transactionalBlockingSimple(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)) .await() .indefinitely(); } @Transactional @NonBlocking @GET @Path("transactional-annotated-non-blocking-simple") @Produces(MediaType.APPLICATION_JSON) public ExecutionInfo transactionalAnnotatedNonBlockingSimple(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)) .await() .indefinitely(); } @Transactional @NonBlocking @GET @Path("transactional-non-blocking-uni") @Produces(MediaType.APPLICATION_JSON) public Uni<ExecutionInfo> transactionalNonBlockingUni(@RestQuery String message) { return executionThreadService .getCurrentThreadName() .onItem() .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName)); } }
テストクラス。
src/test/java/org/littlewings/quarkus/resteasyreactive/VariousEndpointResourceTest.java
package org.littlewings.quarkus.resteasyreactive; import io.quarkus.test.common.http.TestHTTPEndpoint; import io.quarkus.test.junit.QuarkusTest; import org.junit.jupiter.api.Test; import static io.restassured.RestAssured.given; import static org.hamcrest.Matchers.*; @QuarkusTest @TestHTTPEndpoint(VariousEndpointResource.class) public class VariousEndpointResourceTest { @Test public void blockingSimpleTest() { given() .when() .queryParam("message", "Hello World!!") .get("blocking-simple") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("executor-thread-")); // ワーカースレッド } @Test public void nonBlockingUniTest() { given() .when() .queryParam("message", "Hello World!!") .get("non-blocking-uni") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("vert.x-eventloop-thread-")); // IOスレッド } @Test public void nonBlockingMultiTest() { given() .when() .queryParam("message", "Hello World!!") .get("non-blocking-multi") .then() .statusCode(200) .body("message", hasSize(1)) .body("message", hasItem(is("★★★Hello World!!★★★"))) .body("currentThreadName", hasSize(1)) .body("currentThreadName", hasItem(startsWith("vert.x-eventloop-thread-"))); // IOスレッド } @Test public void nonBlockingCompletionStageTest() { given() .when() .queryParam("message", "Hello World!!") .get("non-blocking-completion-stage") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("vert.x-eventloop-thread-")); // IOスレッド } @Test public void nonBlockingPublisherTest() { given() .when() .queryParam("message", "Hello World!!") .get("non-blocking-publisher") .then() .statusCode(200) .body("message", hasSize(1)) .body("message", hasItem(is("★★★Hello World!!★★★"))) .body("currentThreadName", hasSize(1)) .body("currentThreadName", hasItem(startsWith("vert.x-eventloop-thread-"))); // IOスレッド } @Test public void annotatedBlockingUniTest() { given() .when() .queryParam("message", "Hello World!!") .get("annotated-blocking-uni") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("executor-thread-")); // ワーカースレッド } @Test public void annotatedNonBlockingSimpleTest() { given() .when() .queryParam("message", "Hello World!!") .get("annotated-non-blocking-simple") .then() .statusCode(500); // java.lang.IllegalStateException: The current thread cannot be blocked: } @Test public void annotatedNonBlockingUniTest() { given() .when() .queryParam("message", "Hello World!!") .get("annotated-non-blocking-uni") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("vert.x-eventloop-thread-")); // IOスレッド } @Test public void transactionalBlockingUniTest() { given() .when() .queryParam("message", "Hello World!!") .get("transactional-blocking-uni") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("executor-thread-")); // ワーカースレッド } @Test public void transactionalBlockingSimpleTest() { given() .when() .queryParam("message", "Hello World!!") .get("transactional-blocking-simple") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("executor-thread-")); // ワーカースレッド } @Test public void transactionalAnnotatedNonBlockingSimpleTest() { given() .when() .queryParam("message", "Hello World!!") .get("transactional-annotated-non-blocking-simple") .then() .statusCode(500); // java.lang.IllegalStateException: The current thread cannot be blocked: } @Test public void transactionalNonBlockingUniTest() { given() .when() .queryParam("message", "Hello World!!") .get("transactional-non-blocking-uni") .then() .statusCode(200) .body("message", is("★★★Hello World!!★★★")) .body("currentThreadName", startsWith("vert.x-eventloop-thread-")); // IOスレッド } }