CLOVER🍀

That was when it all began.

QuarkusのRESTEasy Reativeを使って、エンドポイントとスレッドの関係を確認する

これは、なにをしたくて書いたもの?

Quarkusのドキュメントを見ていて、RESTEasy Reactiveというものの存在が気になりまして。

RESTEasy Mutinyが名前を変えたものなのかな?と思ったのですが、どうやらそうではなさそうなので見てみることに
しました。

ドキュメントを見ていると、スレッド回りが気になるので最終的にはこのあたりを確認したいと思います。

Getting Started with Reactive

現在(2.2.3.Final)のQuarkusのReactive向けのGetting Startedのドキュメントはこちら。

Quarkus - Getting Started With Reactive

ここでは、従来のモデルがリクエストをスレッド(ワーカースレッド)に割り当てていること、リアクティブな
モデルではワーカースレッドを使わずノンブロッキングIOスレッドで処理を行うためメモリとCPUを節約でき、
スレッドのスイッチのコストも減らせると書かれています。

Getting Started with Reactive / Imperative vs. Reactive: a question of threads

ただ、IOを扱う処理を書くような場合は少し工夫が必要です。Quarkusでは、この方法としてMutinyとKotlinコルーチンを
提案しています。

Getting Started with Reactive / From sequential to continuation style

Quarkusのドキュメントでもそうですが、こちらでもこの選択肢としてはMutinyを取ることにします。

MutinyとはSmallRye Mutinyのことですが、Quarkusのドキュメント内にもMutinyの使い方が書かれています。

Quarkus - Mutiny - Async for bare mortal

もっと詳しく見る場合は、SmallRye Mutinyのドキュメントを参照しましょう。

Mutiny!

また、Quarkusにおけるリアクティブアーキテクチャの話は、こちらのドキュメントに記載されています。

Quarkus - Quarkus Reactive Architecture

QuarkusのリアクティブなAPIを持つExtentionも紹介されています。リアクティブなExtensionにはどのようなものが
あるのか知りたい時は、このページを見るとよいでしょう。

ここまでは、Quarkusとリアクティブの話でした。
ドキュメントが参照しているコードでは、RESTEasy ReactiveとHibernate Reactiveが使われています。

RESTEasy Reactive

続いては、RESTEasy Reactiveに話を移しましょう。

Quarkus - Writing REST Services with RESTEasy Reactive

RESTEasy Reactiveは、Vert.xを基盤にしたJAX-RSの実装です。エンドポイントはブロッキングとノンブロッキングの
2種類を扱うことができます。

アーティファクトのIDとしては、quarkus-resteasy-reactiveになります。Extensionではresteasy-reactiveという名前で
指定することになりますね。

ドキュメントはしばらくはJAX-RSの説明が続きますが、リクエストパラメーターへのアクセス方法あたりから
変化があります。

Writing REST Services With RESTEasy Reactive / Accessing request parameters

通常のJAX-RSと異なり、RESTEasy Reactiveの場合はRESTEasyのアノテーションでリクエストパラメーターに
アクセスするようです。

org.jboss.resteasy.reactive package summary - resteasy-reactive-common 2.0.0.Final javadoc

たとえば、@QueryParamの代わりに@RestQueryを使う、などです。

You can also use any of the JAX-RS annotations @PathParam, @QueryParam, @HeaderParam, @CookieParam, @FormParam or @MatrixParam for this, but they require you to specify the parameter name.

なのですが、一応通常のJAX-RSのアノテーションも使えるみたいなんですよね。差異は、パラメーター名を明示する
必要があるかどうか、です。

RESTEasyの@Rest〜アノテーションを使う場合は、パラメーター名の指定は不要なのですが、メソッドの引数名を
残すようにコンパイラに設定する必要があります。

don’t forget to configure your compiler to generate parameter name information with -parameters (javac) or or <maven.compiler.parameters>

ただ、これはQuarkusの機能でプロジェクトを作成すると、デフォルトで有効化された状態になっています。

https://github.com/quarkusio/quarkus/blob/2.2.3.Final/independent-projects/tools/base-codestarts/src/main/resources/codestarts/quarkus/buildtool/maven/base/pom.tpl.qute.xml#L142-L150

https://github.com/quarkusio/quarkus/blob/2.2.3.Final/independent-projects/tools/base-codestarts/src/main/resources/codestarts/quarkus/buildtool/maven/base/pom.tpl.qute.xml#L13

リクエストで使用できる型。

Writing REST Services With RESTEasy Reactive / Accessing the request body

レスポンスで使用できる追加型。表の外にも書いてありますが、SmallRye MutinyのUni、Multiを使えたり、
CompletionStageの利用も可能です。

Writing REST Services With RESTEasy Reactive / Returning a response body

オブジェクトとJSONの変換については、quarkus-resteasy-reactive-jackson(Jackson)か
quarkus-resteasy-reactive-jsonb(JSON-B)のいずれかのExtensionを使用します。

JSON serialisation

どちらも、リアクティブ用ですね。

基本的にメソッドの戻り値はリソースを指しますが、HTTPボディ以外の内容(ステータスコードやHTTPヘッダーなど)も
設定したい場合は、RestResponseを使います。
※JAX-RSのResponseも利用可能ですが、型安全ではありません

Writing REST Services With RESTEasy Reactive / Setting other response properties

リアクティブやストリーミングのサポートについて。

Writing REST Services With RESTEasy Reactive / Async/reactive support

Writing REST Services With RESTEasy Reactive / Streaming support

リソースメソッドへ、コンテキストオブジェクトとして渡せるクラス。UriInfoやHttpHeadersなどです。

Writing REST Services With RESTEasy Reactive / Accessing context objects

少し高度な話題と実行モデル

後半には、少し高度な話題が入っています。

Writing REST Services With RESTEasy Reactive / More advanced usage

個人的にはブロッキング、ノンブロッキングの実行モデル、

Writing REST Services With RESTEasy Reactive / Execution model, blocking, non-blocking

例外処理あたりがまずは気になるところですが。

Writing REST Services With RESTEasy Reactive / Exception mapping

今回は、こちらの実行モデルの方を見ていきたいと思います。

Writing REST Services With RESTEasy Reactive / Execution model, blocking, non-blocking

RESTEasy Reactiveでは、2種類のスレッドを使います。

  • イベントループスレッド(別名IOスレッド) … 主にリクエストを読み取り、レスポンスを書き出す用途で使用
  • ワーカースレッド … 時間のかかる操作をオフロードするためのプール

イベントループスレッド(IOスレッド)は、すべてのIO操作を非同期で実行し、IO操作完了時のリスナーの役割も持ちます。

RESTEasy Reactiveではエンドポイントとなるメソッドの宣言で、デフォルトで使われるスレッドが変わります。

メソッドの戻り値が以下の型の場合、IOスレッドが使われます。

  • io.smallrye.mutiny.Uni
  • io.smallrye.mutiny.Multi
  • java.util.concurrent.CompletionStage
  • org.reactivestreams.Publisher
  • Kotlinのsuspendメソッド

それ以外の型の場合は、ワーカースレッドで動作します。

If a method or class is annotated with javax.transaction.Transactional then it will also be treated as a blocking method. This is because JTA is a blocking technology, and is generally used with other blocking technology such as Hibernate and JDBC.

また、メソッドやクラスに@Transactionalアノテーションを付与しても、ブロッキング扱いになりワーカースレッドが
使われることになります。

この動作は@Blocking、@NonBlockingアノテーションを使用してオーバーライドすることができます。
※@Transactionalとの併用含む

Blocking - smallrye-common-annotation 1.5.0 javadoc

NonBlocking - smallrye-common-annotation 1.5.0 javadoc

全体的なデフォルトの振る舞いは、javax.ws.rs.core.Applicationのサブクラスに@Blocking、@NonBlockingを付与する
ことでオーバーライドすることもできるようです。

If you want to override the default behaviour you can annotate a javax.ws.rs.core.Application subclass in your application with @Blocking or @NonBlocking, and this will set the default for every method that does not have an explicit annotation.

Writing REST Services With RESTEasy Reactive / Overriding the default behaviour

今回は、このあたりを確認してみたいと思います。

RESTEasy ReactiveとRESTEasy Classic、RESTEasy ReactiveとRESTEasy Mutiny

Getting Started(非リアクティブ)などで使われているQuarkusのRESTEasy Extensionは、今はRESTEasy Classicという
呼び方をするみたいです。

アーティファクトIDとしては、quarkus-resteasyですね。

RESTEasy ReactiveとRESTEasy Classicの違いは、以下のブログに書かれています。

Quarkus - Massive performance without headaches

Quarkus - RESTEasy Reactive - To block or not to block

RESTEasy Classicは常にワーカースレッド上で動作し、RESTEasy ReactiveはIOスレッドを使うのかワーカースレッドを
使うのかを開発者が選択できることに違いがあります。

ちなみに、ブロッキングを選択してもRESTEasy Reactiveの方がパフォーマンスが良いとされているようです。

Massive performance without headaches / What is the performance implication of using @Blocking?

Massive performance without headaches / Why does RESTEasy Reactive using @Blocking perform better than RESTEasy Classic?

RESTEasy Classicも内部ではVert.xを使っているのですが、RESTEasy Reactiveと比べると統合の度合いが低いと
しています。

その他、RESTEasy Reactiveは新しく作った実装であること、ThreadLocalの排除、Arcの利用の最適化などが
ポイントのようです。

あとはRESTEasy Mutinyとの違いが気になるところですね。こちらも、違いはRESTEasy Reactive、RESTEasy Classicの
違いと同じく、ワーカースレッドで動作することです。

Massive performance without headaches / How does it compare to RESTEasy Classic with Mutiny?

つまり、API上はリアクティブなものを使っていても、リソースを効率的に扱えなかったのがRESTEasy Mutiny
だったということになります。

ここまでくると、デフォルトをRESTEasy Reactiveにしても…と思うのですが、それはやりすぎなんでしょうね。

とはいえ、リアクティブなAPIの方を使うとブロックするAPIを使った時に困るので、これで導入されたのが
メソッドの戻り値でリクエストを処理するのに使用するスレッドを選択する方法のようです。

RESTEasy Reactive - To block or not to block / To block or not to block, that is the question.

RESTEasy Reactive - To block or not to block / New world, new rules!

現在では、RESTEasy ReactiveとRESTEasy ClassicはExtensionのディレクトリ内で別のツリーで管理されています。

https://github.com/quarkusio/quarkus/tree/2.2.3.Final/extensions/resteasy-reactive

https://github.com/quarkusio/quarkus/tree/2.2.3.Final/extensions/resteasy-classic

RESTEasy Mutinyは、RESTEasy Classic側に属します。

RESTEasy Reactiveの場合は、アーティファクトのIDにreactiveが入っているので間違えることはないと思いますが、
どのようなExtensionがあるかは確認できるでしょう。

長くなりましたが、ここまでのQuarkusでのRESTEasyのリアクティブAPIに関する変化や情報を見てきました。

そろそろ、動かしてみましょうか。

環境

今回の環境は、こちらです。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.2 (ea98e05a04480131370aa0c110b8c54cf726c06f)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-86-generic", arch: "amd64", family: "unix"

プロジェクトの作成とお題。

では、Quarkusプロジェクトを作成しましょう。

$ mvn io.quarkus.platform:quarkus-maven-plugin:2.2.3.Final:create \
    -DprojectGroupId=org.littlewings \
    -DprojectArtifactId=resteasy-reactive \
    -DprojectVersion=0.0.1-SNAPSHOT \
    -Dextensions="resteasy-reactive,resteasy-reactive-jackson"

Extensionには、RESTEasy ReactiveとJacksonを指定。

[INFO] --- quarkus-maven-plugin:2.2.3.Final:create (default-cli) @ standalone-pom ---
[INFO] -----------
[INFO] selected extensions: 
- io.quarkus:quarkus-resteasy-reactive
- io.quarkus:quarkus-resteasy-reactive-jackson

[INFO] 
applying codestarts...
[INFO] 📚  java
🔨  maven
📦  quarkus
📝  config-properties
🔧  dockerfiles
🔧  maven-wrapper
🚀  resteasy-reactive-codestart
[INFO] 
-----------

作成されたプロジェクト内に移動。

$ cd resteasy-reactive

依存関係は、こんな感じになりました。

  <dependencies>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-resteasy-reactive</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-resteasy-reactive-jackson</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-arc</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-junit5</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.rest-assured</groupId>
      <artifactId>rest-assured</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

quarkus-resteasyは入っていませんね。

参考までに、こんな感じでRESTEasy ReactiveとRESTEasy Classicを同時に依存関係に加えると

    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-resteasy</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-resteasy-reactive</artifactId>
    </dependency>

アプリケーションの起動に失敗するので注意しましょう。

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Multiple matching properties for name "security.jaxrs.deny-unannotated-endpoints" property was matched by both public boolean io.quarkus.resteasy.reactive.common.runtime.JaxRsSecurityConfig.denyJaxRs and public boolean io.quarkus.resteasy.runtime.JaxRsSecurityConfig.denyJaxRs. This is likely because you have an incompatible combination of extensions that both define the same properties (e.g. including both reactive and blocking database extensions)
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Multiple matching properties for name "security.jaxrs.deny-unannotated-endpoints" property was matched by both public boolean io.quarkus.resteasy.reactive.common.runtime.JaxRsSecurityConfig.denyJaxRs and public boolean io.quarkus.resteasy.runtime.JaxRsSecurityConfig.denyJaxRs. This is likely because you have an incompatible combination of extensions that both define the same properties (e.g. including both reactive and blocking database extensions)
Caused by: java.lang.IllegalArgumentException: Multiple matching properties for name "security.jaxrs.deny-unannotated-endpoints" property was matched by both public boolean io.quarkus.resteasy.reactive.common.runtime.JaxRsSecurityConfig.denyJaxRs and public boolean io.quarkus.resteasy.runtime.JaxRsSecurityConfig.denyJaxRs. This is likely because you have an incompatible combination of extensions that both define the same properties (e.g. including both reactive and blocking database extensions)

ちなみに、生成されたソースコードはこんな感じです。

src/main/java/org/littlewings/ReactiveGreetingResource.java

package org.littlewings;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/hello")
public class ReactiveGreetingResource {

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "Hello RESTEasy Reactive";
    }
}

ワーカースレッドを使う定義になっていますね。

テストコードは、こんな感じでした。

src/test/java/org/littlewings/ReactiveGreetingResourceTest.java

package org.littlewings;

import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Test;

import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.is;

@QuarkusTest
public class ReactiveGreetingResourceTest {

    @Test
    public void testHelloEndpoint() {
        given()
          .when().get("/hello")
          .then()
             .statusCode(200)
             .body(is("Hello RESTEasy Reactive"));
    }

}

src/test/java/org/littlewings/NativeReactiveGreetingResourceIT.java

package org.littlewings;

import io.quarkus.test.junit.NativeImageTest;

@NativeImageTest
public class NativeReactiveGreetingResourceIT extends ReactiveGreetingResourceTest {

    // Execute the same tests but in native mode.
}

これらはいったん削除して。

$ rm -rf src/main/java/org/littlewings/ReactiveGreetingResource.java src/test/java/org/littlewings/*

今回は、JAX-RSのエンドポイントの宣言と、実際に使われるスレッドをテストコードで確認してみたいと思います。

JAX-RSリソースクラスの雛形と、レスポンスに使うクラスを作る

最初にJAX-RSリソースクラスを定義します。

src/main/java/org/littlewings/quarkus/resteasyreactive/VariousEndpointResource.java

package org.littlewings.quarkus.resteasyreactive;

import java.util.concurrent.CompletionStage;
import javax.inject.Inject;
import javax.transaction.Transactional;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.jboss.resteasy.reactive.RestQuery;
import org.reactivestreams.Publisher;

@Path("endpoints")
public class VariousEndpointResource {
    @Inject
    ExecutionThreadService executionThreadService;

    // ここにリソースメソッドを定義する

}

中身は、後で順次埋めていきます。

どのスレッドで実行されているかを確認したいので、現在のスレッド名を返すクラスを作成します。
これは、JAX-RSリソースクラスへインジェクションしています。

src/main/java/org/littlewings/quarkus/resteasyreactive/ExecutionThreadService.java

package org.littlewings.quarkus.resteasyreactive;

import javax.enterprise.context.ApplicationScoped;

import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class ExecutionThreadService {
    public Uni<String> getCurrentThreadName() {
        return Uni.createFrom().item(Thread.currentThread().getName());
    }
}

レスポンスに使うクラス。メッセージとスレッド名を含めることにしました。メッセージは、JAX-RSリソースクラスで
QueryStringとして受け取ることにします。

src/main/java/org/littlewings/quarkus/resteasyreactive/ExecutionInfo.java

package org.littlewings.quarkus.resteasyreactive;

public class ExecutionInfo {
    String message;

    String currentThreadName;

    public static ExecutionInfo create(String message, String currentThreadName) {
        ExecutionInfo info = new ExecutionInfo();

        info.message = message;
        info.currentThreadName = currentThreadName;

        return info;
    }


    // getter/setterは省略
}

テストコードの雛形

続いて、テストコードの雛形を作ります。

src/test/java/org/littlewings/quarkus/resteasyreactive/VariousEndpointResourceTest.java

package org.littlewings.quarkus.resteasyreactive;

import io.quarkus.test.common.http.TestHTTPEndpoint;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Test;

import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.*;

@QuarkusTest
@TestHTTPEndpoint(VariousEndpointResource.class)
public class VariousEndpointResourceTest {

    // ここに、テストコードを書く!
}

以降は、JAX-RSリソースクラスのメソッドと、対応するテストメソッドを順番に書いていくことにします。

JAX-RSリソースクラスのメソッドの型で、利用するスレッドが変わることを確認する

では、順番に書いていきます。

確認したいのは、こちらですね。

Writing REST Services With RESTEasy Reactive / Execution model, blocking, non-blocking

まずは、リアクティブな型ではないものから。

    @GET
    @Path("blocking-simple")
    @Produces(MediaType.APPLICATION_JSON)
    public ExecutionInfo blockingSimple(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName))
                .await()
                .indefinitely();
    }

QueryStringは、@RestQueryアノテーションを付与して扱っています。あと、リアクティブな型ではないとは言いつつ、
ブロックするコードにはなっています。

対応するテストコード。

    @Test
    public void blockingSimpleTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("blocking-simple")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("executor-thread-"));  // ワーカースレッド
    }

ひとつ目の例なのでなんとも言えない感じはしますが、これでワーカースレッドで動作していることが確認できます。

次は、Uniを戻り値にしています。

    @GET
    @Path("non-blocking-uni")
    @Produces(MediaType.APPLICATION_JSON)
    public Uni<ExecutionInfo> nonBlockingUni(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName));
    }

今度は、スレッド名がvert.x-eventloop-thread-で始まるものになります。こちらはIOスレッドです。

    @Test
    public void nonBlockingUniTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("non-blocking-uni")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("vert.x-eventloop-thread-"));  // IOスレッド
    }

戻り値の型をMultiに。

    @GET
    @Path("non-blocking-multi")
    @Produces(MediaType.APPLICATION_JSON)
    public Multi<ExecutionInfo> nonBlockingMulti(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName))
                .toMulti();
    }

対応するテストコード。IOスレッドになります。

    @Test
    public void nonBlockingMultiTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("non-blocking-multi")
                .then()
                .statusCode(200)
                .body("message", hasSize(1))
                .body("message", hasItem(is("★★★Hello World!!★★★")))
                .body("currentThreadName", hasSize(1))
                .body("currentThreadName", hasItem(startsWith("vert.x-eventloop-thread-")));  // IOスレッド
    }

戻り値の型をCompletionStageに。

    @GET
    @Path("non-blocking-completion-stage")
    @Produces(MediaType.APPLICATION_JSON)
    public CompletionStage<ExecutionInfo> nonBlockingCompletionStage(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName))
                .subscribeAsCompletionStage();
    }

対応するテストコード。IOスレッドになります。

    @Test
    public void nonBlockingCompletionStageTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("non-blocking-completion-stage")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("vert.x-eventloop-thread-"));  // IOスレッド
    }

戻り値の型をPublisherに。

    @GET
    @Path("non-blocking-publisher")
    @Produces(MediaType.APPLICATION_JSON)
    public Publisher<ExecutionInfo> nonBlockingPublisher(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName))
                .toMulti();
    }

対応するテストコード。IOスレッドになります。

    @Test
    public void nonBlockingPublisherTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("non-blocking-publisher")
                .then()
                .statusCode(200)
                .body("message", hasSize(1))
                .body("message", hasItem(is("★★★Hello World!!★★★")))
                .body("currentThreadName", hasSize(1))
                .body("currentThreadName", hasItem(startsWith("vert.x-eventloop-thread-")));  // IOスレッド
    }

ドキュメント通りなことを確認しました。

@Blocking、@NonBlocking、@Transactionalアノテーションを使う

続いては、メソッド定義にアノテーションを加えて、実行モデルをオーバーライドしてみましょう。

Uniを使ったリソース定義を、@Blockingにしてみます。

    @Blocking
    @GET
    @Path("annotated-blocking-uni")
    @Produces(MediaType.APPLICATION_JSON)
    public Uni<ExecutionInfo> annotatedBlockingUni(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName));
    }

テスト。ワーカースレッドが使われるようになりました。

    @Test
    public void annotatedBlockingUniTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("annotated-blocking-uni")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("executor-thread-"));  // ワーカースレッド
    }

リアクティブな型ではない戻り値のメソッドを、@NonBlockingにしてみます。

    @NonBlocking
    @GET
    @Path("annotated-non-blocking-simple")
    @Produces(MediaType.APPLICATION_JSON)
    public ExecutionInfo annotatedNonBlockingSimple(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName))
                .await()
                .indefinitely();
    }

これは、エラーになります。

    @Test
    public void annotatedNonBlockingSimpleTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("annotated-non-blocking-simple")
                .then()
                .statusCode(500);  // java.lang.IllegalStateException: The current thread cannot be blocked:
    }

@NonBlockingアノテーションを付与しても、リアクティブな型以外を戻り値とすることはできないようです。

ERROR [io.qua.ver.htt.run.QuarkusErrorHandler] (vert.x-eventloop-thread-6) HTTP Request to /endpoints/annotated-non-blocking-simple?message=Hello%20World%21%21 failed, error id: bb7d38f6-b297-4035-9cb5-fc0d6aa459c4-1: java.lang.IllegalStateException: The current thread cannot be blocked: vert.x-eventloop-thread-6

となると、こうなるわけですが。

    @NonBlocking
    @GET
    @Path("annotated-non-blocking-uni")
    @Produces(MediaType.APPLICATION_JSON)
    public Uni<ExecutionInfo> annotatedNonBlockingUni(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName));
    }

ふつうですね。このパターンだと、@NonBlockingを使う意味はあまりないような…。

    @Test
    public void annotatedNonBlockingUniTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("annotated-non-blocking-uni")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("vert.x-eventloop-thread-"));  // IOスレッド
    }

次は、@Transactionalを使ってみましょう。

Uniを戻り値にしているメソッドに、@Transactionalアノテーションを付与します。

    @Transactional
    @GET
    @Path("transactional-blocking-uni")
    @Produces(MediaType.APPLICATION_JSON)
    public Uni<ExecutionInfo> transactionalBlockingUni(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName));
    }

テストコード。Uniを使っているにも関わらず、ワーカースレッドを使うようになりました。

    @Test
    public void transactionalBlockingUniTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("transactional-blocking-uni")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("executor-thread-"));  // ワーカースレッド
    }

リアクティブではない型にしてみます。

    @Transactional
    @GET
    @Path("transactional-blocking-simple")
    @Produces(MediaType.APPLICATION_JSON)
    public ExecutionInfo transactionalBlockingSimple(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName))
                .await()
                .indefinitely();
    }

ふつうですね。

テスト。まあ、ワーカースレッドになります。

    @Test
    public void transactionalBlockingSimpleTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("transactional-blocking-simple")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("executor-thread-"));  // ワーカースレッド
    }

さらに、ここに@NonBlockingアノテーションを付与してみます。

    @Transactional
    @NonBlocking
    @GET
    @Path("transactional-annotated-non-blocking-simple")
    @Produces(MediaType.APPLICATION_JSON)
    public ExecutionInfo transactionalAnnotatedNonBlockingSimple(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName))
                .await()
                .indefinitely();
    }

これは、エラーになります。

ERROR [io.qua.ver.htt.run.QuarkusErrorHandler] (vert.x-eventloop-thread-0) HTTP Request to /endpoints/transactional-annotated-non-blocking-simple?message=Hello%20World%21%21 failed, error id: afecd7b1-f35b-4b4d-9a9f-ced880f81956-2: java.lang.IllegalStateException: The current thread cannot be blocked: vert.x-eventloop-thread-0
    @Test
    public void transactionalAnnotatedNonBlockingSimpleTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("transactional-annotated-non-blocking-simple")
                .then()
                .statusCode(500);  // java.lang.IllegalStateException: The current thread cannot be blocked:
    }

ここで、戻り値をUniにすると

    @Transactional
    @NonBlocking
    @GET
    @Path("transactional-non-blocking-uni")
    @Produces(MediaType.APPLICATION_JSON)
    public Uni<ExecutionInfo> transactionalNonBlockingUni(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName));
    }

動作するようになります。

    @Test
    public void transactionalNonBlockingUniTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("transactional-non-blocking-uni")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("vert.x-eventloop-thread-"));  // IOスレッド
    }

これらのことから、@NonBlockingアノテーションを付与したからといって、通常のメソッドがノンブロッキングに
扱われるわけではないことがわかります。ちゃんと、リアクティブな型で宣言し、矛盾がないようにする必要が
ありそうです。

@Blockingとリアクティブな型の組み合わせは良さそうなんですけどね。

確認したいところは、およそ見てた感じです。

javax.ws.rs.core.Applicationのサブクラスを使ったデフォルトの振る舞いを変更するパターンは、今回はパスします。

IOスレッドとワーカースレッドの数は?

今回、IOスレッドとワーカースレッドで動作していることを確認しましたが、このスレッドの数はどこで設定
するのでしょう?

IOスレッドは、quarkus.vertx.event-loops-pool-sizeで設定するようです。

Vert.x Reference Guide / quarkus.vertx.event-loops-pool-size

ワーカースレッドは、quarkus.thread-pool.core-threadsで設定するようです。

All Configuration Options / quarkus.thread-pool.core-threads

メソッドの戻り値の型や、ブロッキング/ノンブロッキングは、どのあたりで判定している?

メソッドの戻り値の型を検出している部分。これは、ビルド時みたいですね。

https://github.com/quarkusio/quarkus/blob/2.2.3.Final/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java

メソッドにアノテーションが付与されていた場合の処理。こちらもビルド時ですね。

https://github.com/quarkusio/quarkus/blob/2.2.3.Final/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java#L561-L597

BlockingHandlerで、ワーカースレッドを使用するスレッドプールで処理を実行します。

https://github.com/quarkusio/quarkus/blob/2.2.3.Final/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/BlockingHandler.java

ブロッキングと判定されたメソッドに対して、BlockingHandlerを設定する部分。これは、実行時ですね。

https://github.com/quarkusio/quarkus/blob/2.2.3.Final/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java#L181-L189

こんな感じのようです。

まとめ

QuarkusのRESTEasy Reactiveを使って、エンドポイントとスレッドの関係を確認してみました。

ドキュメントなどがたくさんあったので、RESTEasy Classicの位置づけや、RESTEasy Mutinyとの違いなどが
確認できて良かったです。

だいぶ長くなりましたけどね。

例外まわりは、また別の機会に確認したいなと思います。

オマケ

断片的に載せていた、JAX-RSリソースクラスとテストクラスのソースコードを全体を載せておきます。

JAX-RSリソースクラス。

src/main/java/org/littlewings/quarkus/resteasyreactive/VariousEndpointResource.java

package org.littlewings.quarkus.resteasyreactive;

import java.util.concurrent.CompletionStage;
import javax.inject.Inject;
import javax.transaction.Transactional;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.jboss.resteasy.reactive.RestQuery;
import org.reactivestreams.Publisher;

@Path("endpoints")
public class VariousEndpointResource {
    @Inject
    ExecutionThreadService executionThreadService;

    @GET
    @Path("blocking-simple")
    @Produces(MediaType.APPLICATION_JSON)
    public ExecutionInfo blockingSimple(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName))
                .await()
                .indefinitely();
    }

    @GET
    @Path("non-blocking-uni")
    @Produces(MediaType.APPLICATION_JSON)
    public Uni<ExecutionInfo> nonBlockingUni(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName));
    }

    @GET
    @Path("non-blocking-multi")
    @Produces(MediaType.APPLICATION_JSON)
    public Multi<ExecutionInfo> nonBlockingMulti(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName))
                .toMulti();
    }

    @GET
    @Path("non-blocking-completion-stage")
    @Produces(MediaType.APPLICATION_JSON)
    public CompletionStage<ExecutionInfo> nonBlockingCompletionStage(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName))
                .subscribeAsCompletionStage();
    }

    @GET
    @Path("non-blocking-publisher")
    @Produces(MediaType.APPLICATION_JSON)
    public Publisher<ExecutionInfo> nonBlockingPublisher(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName))
                .toMulti();
    }

    @Blocking
    @GET
    @Path("annotated-blocking-uni")
    @Produces(MediaType.APPLICATION_JSON)
    public Uni<ExecutionInfo> annotatedBlockingUni(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName));
    }

    @NonBlocking
    @GET
    @Path("annotated-non-blocking-simple")
    @Produces(MediaType.APPLICATION_JSON)
    public ExecutionInfo annotatedNonBlockingSimple(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName))
                .await()
                .indefinitely();
    }

    @NonBlocking
    @GET
    @Path("annotated-non-blocking-uni")
    @Produces(MediaType.APPLICATION_JSON)
    public Uni<ExecutionInfo> annotatedNonBlockingUni(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName));
    }

    @Transactional
    @GET
    @Path("transactional-blocking-uni")
    @Produces(MediaType.APPLICATION_JSON)
    public Uni<ExecutionInfo> transactionalBlockingUni(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName));
    }

    @Transactional
    @GET
    @Path("transactional-blocking-simple")
    @Produces(MediaType.APPLICATION_JSON)
    public ExecutionInfo transactionalBlockingSimple(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName))
                .await()
                .indefinitely();
    }

    @Transactional
    @NonBlocking
    @GET
    @Path("transactional-annotated-non-blocking-simple")
    @Produces(MediaType.APPLICATION_JSON)
    public ExecutionInfo transactionalAnnotatedNonBlockingSimple(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName))
                .await()
                .indefinitely();
    }

    @Transactional
    @NonBlocking
    @GET
    @Path("transactional-non-blocking-uni")
    @Produces(MediaType.APPLICATION_JSON)
    public Uni<ExecutionInfo> transactionalNonBlockingUni(@RestQuery String message) {
        return executionThreadService
                .getCurrentThreadName()
                .onItem()
                .transform(currentThreadName -> ExecutionInfo.create("★★★" + message + "★★★", currentThreadName));
    }
}

テストクラス。

src/test/java/org/littlewings/quarkus/resteasyreactive/VariousEndpointResourceTest.java

package org.littlewings.quarkus.resteasyreactive;

import io.quarkus.test.common.http.TestHTTPEndpoint;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Test;

import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.*;

@QuarkusTest
@TestHTTPEndpoint(VariousEndpointResource.class)
public class VariousEndpointResourceTest {
    @Test
    public void blockingSimpleTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("blocking-simple")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("executor-thread-"));  // ワーカースレッド
    }

    @Test
    public void nonBlockingUniTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("non-blocking-uni")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("vert.x-eventloop-thread-"));  // IOスレッド
    }

    @Test
    public void nonBlockingMultiTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("non-blocking-multi")
                .then()
                .statusCode(200)
                .body("message", hasSize(1))
                .body("message", hasItem(is("★★★Hello World!!★★★")))
                .body("currentThreadName", hasSize(1))
                .body("currentThreadName", hasItem(startsWith("vert.x-eventloop-thread-")));  // IOスレッド
    }

    @Test
    public void nonBlockingCompletionStageTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("non-blocking-completion-stage")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("vert.x-eventloop-thread-"));  // IOスレッド
    }

    @Test
    public void nonBlockingPublisherTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("non-blocking-publisher")
                .then()
                .statusCode(200)
                .body("message", hasSize(1))
                .body("message", hasItem(is("★★★Hello World!!★★★")))
                .body("currentThreadName", hasSize(1))
                .body("currentThreadName", hasItem(startsWith("vert.x-eventloop-thread-")));  // IOスレッド
    }

    @Test
    public void annotatedBlockingUniTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("annotated-blocking-uni")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("executor-thread-"));  // ワーカースレッド
    }

    @Test
    public void annotatedNonBlockingSimpleTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("annotated-non-blocking-simple")
                .then()
                .statusCode(500);  // java.lang.IllegalStateException: The current thread cannot be blocked:
    }

    @Test
    public void annotatedNonBlockingUniTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("annotated-non-blocking-uni")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("vert.x-eventloop-thread-"));  // IOスレッド
    }

    @Test
    public void transactionalBlockingUniTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("transactional-blocking-uni")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("executor-thread-"));  // ワーカースレッド
    }

    @Test
    public void transactionalBlockingSimpleTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("transactional-blocking-simple")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("executor-thread-"));  // ワーカースレッド
    }

    @Test
    public void transactionalAnnotatedNonBlockingSimpleTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("transactional-annotated-non-blocking-simple")
                .then()
                .statusCode(500);  // java.lang.IllegalStateException: The current thread cannot be blocked:
    }

    @Test
    public void transactionalNonBlockingUniTest() {
        given()
                .when()
                .queryParam("message", "Hello World!!")
                .get("transactional-non-blocking-uni")
                .then()
                .statusCode(200)
                .body("message", is("★★★Hello World!!★★★"))
                .body("currentThreadName", startsWith("vert.x-eventloop-thread-"));  // IOスレッド
    }
}