CLOVER🍀

That was when it all began.

Quarkus × RESTEasy Mutiny(Reactive)を試す

これは、なにをしたくて書いたもの?

Quarkusのドキュメントを見ていて、RxJavaでもないReactiveなAPIがいるのに気づきまして。

Quarkus - Getting started with Reactive

Quarkus 1.3.0から、MunityというReactive Streamsの実装が統合されているようです。

Quarkus 1.3.0.Final released - New class loader infrastructure, GraalVM 20 support and much much more

Mutiny arising

A new reactive programming API has been introduced. This API named Mutiny replaces the Axle and Reactive Streams Operators models (Reactive Streams and CompletionStage). The previous models are still functional, but deprecated and will be removed in the future.

REST Clientでも使えるみたいです。

Using the REST Client (including JSON) / Async Support

あと、Reactive SQL Clientsでも顔を出していますね。

Quarkus - Reactive SQL Clients

せっかくなので、ちょっと試してみようかな、と。

SmallRye Mutiny

正確には、SmallRye Munityという名前のようです。

GitHub - smallrye/smallrye-mutiny: An Intuitive Event-Driven Reactive Programming Library for Java

先ほど書いたとおり、Reactive Streamsの実装です。

ドキュメントおよびJavadocは、こちら。

Mutiny!

https://smallrye.io/smallrye-mutiny/apidocs/

見たところ、完全に新しく作っているみたいですね。

CompletionStageやRxJava 2、Reactorとの変換も可能なようです。

How do I interact with CompletionStages?

How do I interact with RX Java 2?

How do I interact with Reactor?

今回使うMutinyは0.5.4ですが、ドキュメントは現時点では0.7.0を指しています。

環境

今回の環境は、こちら。

$ java --version
openjdk 11.0.8 2020-07-14
OpenJDK Runtime Environment (build 11.0.8+10-post-Ubuntu-0ubuntu118.04.1)
OpenJDK 64-Bit Server VM (build 11.0.8+10-post-Ubuntu-0ubuntu118.04.1, mixed mode, sharing)


$ mvn --version
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.8, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-112-generic", arch: "amd64", family: "unix"

準備

では、プロジェクトを作っていきましょう。

Extensionに「resteasy-mutiny」と「rest-client」を加えて、プロジェクトを作成。JSONは、Jacksonで扱うことにします。

$ mvn io.quarkus:quarkus-maven-plugin:1.6.1.Final:create \
    -DprojectGroupId=org.littlewings \
    -DprojectArtifactId=reactive-resteasy-mutiny \
    -Dextensions="resteasy-mutiny, resteasy-jackson, rest-client"

できあがったプロジェクトのディレクトリに入って

$ cd reactive-resteasy-mutiny

依存関係に含まれる内容を見てみます。

  <dependencies>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-resteasy</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-junit5</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.rest-assured</groupId>
      <artifactId>rest-assured</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-resteasy-jackson</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-resteasy-mutiny</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-rest-client</artifactId>
    </dependency>
  </dependencies>

「mvn dependency:tree」も見てみましょう。

$ mvn dependency:tree

結果は、こちら。

[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ reactive-resteasy-mutiny ---
[INFO] org.littlewings:reactive-resteasy-mutiny:jar:1.0-SNAPSHOT
[INFO] +- io.quarkus:quarkus-resteasy:jar:1.6.1.Final:compile
[INFO] |  +- io.quarkus:quarkus-vertx-http:jar:1.6.1.Final:compile
[INFO] |  |  +- io.quarkus:quarkus-development-mode-spi:jar:1.6.1.Final:compile
[INFO] |  |  +- io.quarkus.security:quarkus-security:jar:1.1.2.Final:compile
[INFO] |  |  +- jakarta.enterprise:jakarta.enterprise.cdi-api:jar:2.0.2:compile
[INFO] |  |  |  \- jakarta.el:jakarta.el-api:jar:3.0.3:compile
[INFO] |  |  +- io.quarkus:quarkus-vertx-core:jar:1.6.1.Final:compile
[INFO] |  |  |  +- io.quarkus:quarkus-netty:jar:1.6.1.Final:compile
[INFO] |  |  |  |  +- io.netty:netty-codec:jar:4.1.49.Final:compile
[INFO] |  |  |  |  \- io.netty:netty-handler:jar:4.1.49.Final:compile
[INFO] |  |  |  \- io.vertx:vertx-core:jar:3.9.1:compile
[INFO] |  |  |     +- io.netty:netty-common:jar:4.1.49.Final:compile
[INFO] |  |  |     +- io.netty:netty-buffer:jar:4.1.49.Final:compile
[INFO] |  |  |     +- io.netty:netty-transport:jar:4.1.49.Final:compile
[INFO] |  |  |     +- io.netty:netty-handler-proxy:jar:4.1.49.Final:compile
[INFO] |  |  |     |  \- io.netty:netty-codec-socks:jar:4.1.49.Final:compile
[INFO] |  |  |     +- io.netty:netty-codec-http:jar:4.1.49.Final:compile
[INFO] |  |  |     +- io.netty:netty-codec-http2:jar:4.1.49.Final:compile
[INFO] |  |  |     +- io.netty:netty-resolver:jar:4.1.49.Final:compile
[INFO] |  |  |     \- io.netty:netty-resolver-dns:jar:4.1.49.Final:compile
[INFO] |  |  |        \- io.netty:netty-codec-dns:jar:4.1.49.Final:compile
[INFO] |  |  \- io.vertx:vertx-web:jar:3.9.1:compile
[INFO] |  |     +- io.vertx:vertx-web-common:jar:3.9.1:compile
[INFO] |  |     +- io.vertx:vertx-auth-common:jar:3.9.1:compile
[INFO] |  |     \- io.vertx:vertx-bridge-common:jar:3.9.1:compile
[INFO] |  \- io.quarkus:quarkus-resteasy-server-common:jar:1.6.1.Final:compile
[INFO] |     \- jakarta.validation:jakarta.validation-api:jar:2.0.2:compile
[INFO] +- io.quarkus:quarkus-junit5:jar:1.6.1.Final:test
[INFO] |  +- io.quarkus:quarkus-bootstrap-core:jar:1.6.1.Final:test
[INFO] |  |  +- io.quarkus:quarkus-bootstrap-app-model:jar:1.6.1.Final:test
[INFO] |  |  \- io.quarkus:quarkus-bootstrap-maven-resolver:jar:1.6.1.Final:test
[INFO] |  |     +- org.ow2.asm:asm:jar:8.0.1:test
[INFO] |  |     +- org.apache.maven:maven-embedder:jar:3.6.3:test
[INFO] |  |     |  +- org.apache.maven:maven-settings:jar:3.6.3:test
[INFO] |  |     |  +- org.apache.maven:maven-core:jar:3.6.3:test
[INFO] |  |     |  |  +- org.apache.maven:maven-artifact:jar:3.6.3:test
[INFO] |  |     |  |  \- org.codehaus.plexus:plexus-component-annotations:jar:2.1.0:test
[INFO] |  |     |  +- org.apache.maven:maven-plugin-api:jar:3.6.3:test
[INFO] |  |     |  +- org.apache.maven:maven-model:jar:3.6.3:test
[INFO] |  |     |  +- org.apache.maven:maven-model-builder:jar:3.6.3:test
[INFO] |  |     |  +- org.apache.maven:maven-builder-support:jar:3.6.3:test
[INFO] |  |     |  +- org.apache.maven.resolver:maven-resolver-api:jar:1.4.1:test
[INFO] |  |     |  +- org.apache.maven.resolver:maven-resolver-util:jar:1.4.1:test
[INFO] |  |     |  +- org.apache.maven.shared:maven-shared-utils:jar:3.2.1:test
[INFO] |  |     |  +- com.google.inject:guice:jar:no_aop:4.2.1:test
[INFO] |  |     |  +- org.codehaus.plexus:plexus-utils:jar:3.2.1:test
[INFO] |  |     |  +- org.codehaus.plexus:plexus-classworlds:jar:2.5.2:test
[INFO] |  |     |  \- commons-cli:commons-cli:jar:1.4:test
[INFO] |  |     +- org.eclipse.sisu:org.eclipse.sisu.plexus:jar:0.3.4:test
[INFO] |  |     +- org.apache.maven:maven-settings-builder:jar:3.6.3:test
[INFO] |  |     |  +- org.codehaus.plexus:plexus-interpolation:jar:1.25:test
[INFO] |  |     |  \- org.sonatype.plexus:plexus-sec-dispatcher:jar:1.4:test
[INFO] |  |     |     \- org.sonatype.plexus:plexus-cipher:jar:1.4:test
[INFO] |  |     +- org.apache.maven:maven-resolver-provider:jar:3.6.3:test
[INFO] |  |     |  +- org.apache.maven:maven-repository-metadata:jar:3.6.3:test
[INFO] |  |     |  +- org.apache.maven.resolver:maven-resolver-spi:jar:1.4.1:test
[INFO] |  |     |  \- org.apache.maven.resolver:maven-resolver-impl:jar:1.4.1:test
[INFO] |  |     +- org.apache.maven.resolver:maven-resolver-connector-basic:jar:1.4.1:test
[INFO] |  |     +- org.apache.maven.resolver:maven-resolver-transport-wagon:jar:1.4.1:test
[INFO] |  |     +- org.apache.maven.wagon:wagon-http:jar:3.3.4:test
[INFO] |  |     |  +- org.apache.maven.wagon:wagon-http-shared:jar:3.3.4:test
[INFO] |  |     |  |  \- org.jsoup:jsoup:jar:1.12.1:test
[INFO] |  |     |  \- org.apache.maven.wagon:wagon-provider-api:jar:3.3.4:test
[INFO] |  |     \- org.apache.maven.wagon:wagon-file:jar:3.3.4:test
[INFO] |  +- org.eclipse.sisu:org.eclipse.sisu.inject:jar:0.3.4:test
[INFO] |  +- io.quarkus:quarkus-test-common:jar:1.6.1.Final:test
[INFO] |  |  +- io.quarkus:quarkus-core-deployment:jar:1.6.1.Final:test
[INFO] |  |  |  +- io.quarkus.gizmo:gizmo:jar:1.0.3.Final:test
[INFO] |  |  |  |  \- org.ow2.asm:asm-util:jar:8.0.1:test
[INFO] |  |  |  |     +- org.ow2.asm:asm-tree:jar:8.0.1:test
[INFO] |  |  |  |     \- org.ow2.asm:asm-analysis:jar:8.0.1:test
[INFO] |  |  |  +- io.quarkus:quarkus-devtools-utilities:jar:1.6.1.Final:test
[INFO] |  |  |  \- io.quarkus:quarkus-builder:jar:1.6.1.Final:test
[INFO] |  |  +- io.quarkus:quarkus-jsonp-deployment:jar:1.6.1.Final:test
[INFO] |  |  |  \- io.quarkus:quarkus-jsonp:jar:1.6.1.Final:test
[INFO] |  |  |     \- org.glassfish:jakarta.json:jar:1.1.6:test
[INFO] |  |  \- org.jboss:jandex:jar:2.1.3.Final:test
[INFO] |  +- org.junit.jupiter:junit-jupiter:jar:5.6.2:test
[INFO] |  |  +- org.junit.jupiter:junit-jupiter-api:jar:5.6.2:test
[INFO] |  |  |  +- org.apiguardian:apiguardian-api:jar:1.1.0:test
[INFO] |  |  |  +- org.opentest4j:opentest4j:jar:1.2.0:test
[INFO] |  |  |  \- org.junit.platform:junit-platform-commons:jar:1.6.2:test
[INFO] |  |  +- org.junit.jupiter:junit-jupiter-params:jar:5.6.2:test
[INFO] |  |  \- org.junit.jupiter:junit-jupiter-engine:jar:5.6.2:test
[INFO] |  |     \- org.junit.platform:junit-platform-engine:jar:1.6.2:test
[INFO] |  +- io.quarkus:quarkus-core:jar:1.6.1.Final:compile
[INFO] |  |  +- jakarta.annotation:jakarta.annotation-api:jar:1.3.5:compile
[INFO] |  |  +- jakarta.inject:jakarta.inject-api:jar:1.0:compile
[INFO] |  |  +- io.quarkus:quarkus-ide-launcher:jar:1.6.1.Final:compile
[INFO] |  |  +- io.smallrye.config:smallrye-config:jar:1.8.4:compile
[INFO] |  |  |  +- io.smallrye.common:smallrye-common-annotation:jar:1.0.2:compile
[INFO] |  |  |  +- io.smallrye.config:smallrye-config-common:jar:1.8.4:compile
[INFO] |  |  |  +- io.smallrye.common:smallrye-common-expression:jar:1.0.2:compile
[INFO] |  |  |  |  \- io.smallrye.common:smallrye-common-function:jar:1.0.2:compile
[INFO] |  |  |  \- io.smallrye.common:smallrye-common-constraint:jar:1.0.2:compile
[INFO] |  |  +- org.jboss.logging:jboss-logging:jar:3.3.2.Final:compile
[INFO] |  |  +- org.jboss.logmanager:jboss-logmanager-embedded:jar:1.0.4:compile
[INFO] |  |  +- org.jboss.logging:jboss-logging-annotations:jar:2.1.0.Final:compile
[INFO] |  |  +- org.jboss.threads:jboss-threads:jar:3.1.1.Final:compile
[INFO] |  |  +- org.slf4j:slf4j-api:jar:1.7.30:compile
[INFO] |  |  +- org.jboss.slf4j:slf4j-jboss-logging:jar:1.2.0.Final:compile
[INFO] |  |  +- org.graalvm.sdk:graal-sdk:jar:20.1.0:compile
[INFO] |  |  +- org.wildfly.common:wildfly-common:jar:1.5.4.Final-format-001:compile
[INFO] |  |  \- io.quarkus:quarkus-bootstrap-runner:jar:1.6.1.Final:compile
[INFO] |  \- com.thoughtworks.xstream:xstream:jar:1.4.11.1:test
[INFO] |     +- xmlpull:xmlpull:jar:1.1.3.1:test
[INFO] |     \- xpp3:xpp3_min:jar:1.1.4c:test
[INFO] +- io.rest-assured:rest-assured:jar:4.3.0:test
[INFO] |  +- org.codehaus.groovy:groovy:jar:3.0.2:test
[INFO] |  +- org.codehaus.groovy:groovy-xml:jar:3.0.2:test
[INFO] |  +- org.apache.httpcomponents:httpclient:jar:4.5.12:compile
[INFO] |  |  +- org.apache.httpcomponents:httpcore:jar:4.4.13:compile
[INFO] |  |  \- commons-codec:commons-codec:jar:1.14:compile
[INFO] |  +- org.apache.httpcomponents:httpmime:jar:4.5.3:test
[INFO] |  +- org.hamcrest:hamcrest:jar:2.1:test
[INFO] |  +- org.ccil.cowan.tagsoup:tagsoup:jar:1.2.1:test
[INFO] |  +- io.rest-assured:json-path:jar:4.3.0:test
[INFO] |  |  +- org.codehaus.groovy:groovy-json:jar:3.0.2:test
[INFO] |  |  \- io.rest-assured:rest-assured-common:jar:4.3.0:test
[INFO] |  \- io.rest-assured:xml-path:jar:4.3.0:test
[INFO] |     +- org.apache.commons:commons-lang3:jar:3.9:test
[INFO] |     +- jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.2:test
[INFO] |     |  \- jakarta.activation:jakarta.activation-api:jar:1.2.1:compile
[INFO] |     \- org.apache.sling:org.apache.sling.javax.activation:jar:0.1.0:test
[INFO] +- io.quarkus:quarkus-resteasy-jackson:jar:1.6.1.Final:compile
[INFO] |  +- io.quarkus:quarkus-jackson:jar:1.6.1.Final:compile
[INFO] |  |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.10.4:compile
[INFO] |  |  +- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.10.4:compile
[INFO] |  |  +- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.10.4:compile
[INFO] |  |  \- com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.10.4:compile
[INFO] |  +- org.jboss.resteasy:resteasy-jackson2-provider:jar:4.5.5.Final:compile
[INFO] |  |  +- org.jboss.resteasy:resteasy-jaxb-provider:jar:4.5.5.Final:compile
[INFO] |  |  |  \- org.glassfish.jaxb:jaxb-runtime:jar:2.3.3-b02:compile
[INFO] |  |  |     +- org.glassfish.jaxb:txw2:jar:2.3.3-b02:compile
[INFO] |  |  |     \- com.sun.istack:istack-commons-runtime:jar:3.0.10:compile
[INFO] |  |  +- com.fasterxml.jackson.core:jackson-core:jar:2.10.4:compile
[INFO] |  |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.10.4:compile
[INFO] |  |  +- com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:jar:2.10.4:compile
[INFO] |  |  |  +- com.fasterxml.jackson.jaxrs:jackson-jaxrs-base:jar:2.10.4:compile
[INFO] |  |  |  \- com.fasterxml.jackson.module:jackson-module-jaxb-annotations:jar:2.10.4:compile
[INFO] |  |  +- com.github.fge:json-patch:jar:1.9:compile
[INFO] |  |  |  \- com.github.fge:jackson-coreutils:jar:1.6:compile
[INFO] |  |  |     \- com.github.fge:msg-simple:jar:1.1:compile
[INFO] |  |  |        \- com.github.fge:btf:jar:1.2:compile
[INFO] |  |  \- com.google.guava:guava:jar:27.0.1-jre:compile
[INFO] |  |     +- com.google.guava:failureaccess:jar:1.0.1:compile
[INFO] |  |     \- com.google.guava:listenablefuture:jar:9999.0-empty-to-avoid-conflict-with-guava:compile
[INFO] |  \- org.jboss.spec.javax.xml.bind:jboss-jaxb-api_2.3_spec:jar:2.0.0.Final:compile
[INFO] +- io.quarkus:quarkus-resteasy-mutiny:jar:1.6.1.Final:compile
[INFO] |  +- io.quarkus:quarkus-arc:jar:1.6.1.Final:compile
[INFO] |  |  +- io.quarkus.arc:arc:jar:1.6.1.Final:compile
[INFO] |  |  |  \- jakarta.transaction:jakarta.transaction-api:jar:1.3.3:compile
[INFO] |  |  \- org.eclipse.microprofile.context-propagation:microprofile-context-propagation-api:jar:1.0.1:compile
[INFO] |  +- io.quarkus:quarkus-mutiny:jar:1.6.1.Final:compile
[INFO] |  |  +- io.smallrye.reactive:mutiny:jar:0.5.4:compile
[INFO] |  |  |  \- org.reactivestreams:reactive-streams:jar:1.0.3:compile
[INFO] |  |  +- io.quarkus:quarkus-smallrye-context-propagation:jar:1.6.1.Final:compile
[INFO] |  |  |  \- io.smallrye:smallrye-context-propagation:jar:1.0.13:compile
[INFO] |  |  |     \- io.smallrye:smallrye-context-propagation-api:jar:1.0.13:compile
[INFO] |  |  \- io.smallrye.reactive:mutiny-context-propagation:jar:0.5.4:compile
[INFO] |  \- org.jboss.resteasy:resteasy-client:jar:4.5.5.Final:compile
[INFO] |     +- org.jboss.resteasy:resteasy-client-api:jar:4.5.5.Final:compile
[INFO] |     +- org.jboss.resteasy:resteasy-core-spi:jar:4.5.5.Final:compile
[INFO] |     +- org.jboss.resteasy:resteasy-core:jar:4.5.5.Final:compile
[INFO] |     |  +- com.ibm.async:asyncutil:jar:0.1.0:compile
[INFO] |     |  \- org.eclipse.microprofile.config:microprofile-config-api:jar:1.4:compile
[INFO] |     +- commons-io:commons-io:jar:2.6:compile
[INFO] |     \- org.jboss.spec.javax.ws.rs:jboss-jaxrs-api_2.1_spec:jar:2.0.1.Final:compile
[INFO] \- io.quarkus:quarkus-rest-client:jar:1.6.1.Final:compile
[INFO]    +- io.quarkus:quarkus-resteasy-common:jar:1.6.1.Final:compile
[INFO]    |  \- com.sun.activation:jakarta.activation:jar:1.2.1:compile
[INFO]    +- org.jboss.resteasy:resteasy-client-microprofile:jar:4.5.5.Final:compile
[INFO]    |  \- org.eclipse.microprofile.rest.client:microprofile-rest-client-api:jar:1.4.1:compile
[INFO]    +- jakarta.interceptor:jakarta.interceptor-api:jar:1.2.5:compile
[INFO]    +- org.apache.httpcomponents:httpasyncclient:jar:4.1.4:compile
[INFO]    |  \- org.apache.httpcomponents:httpcore-nio:jar:4.4.13:compile
[INFO]    \- org.jboss.logging:commons-logging-jboss-logging:jar:1.0.0.Final:compile

SmallRye Munityまわりだけ抜き出すと、こんな感じです。

[INFO] |  +- io.quarkus:quarkus-mutiny:jar:1.6.1.Final:compile
[INFO] |  |  +- io.smallrye.reactive:mutiny:jar:0.5.4:compile
[INFO] |  |  |  \- org.reactivestreams:reactive-streams:jar:1.0.3:compile
[INFO] |  |  +- io.quarkus:quarkus-smallrye-context-propagation:jar:1.6.1.Final:compile
[INFO] |  |  |  \- io.smallrye:smallrye-context-propagation:jar:1.0.13:compile
[INFO] |  |  |     \- io.smallrye:smallrye-context-propagation-api:jar:1.0.13:compile
[INFO] |  |  \- io.smallrye.reactive:mutiny-context-propagation:jar:0.5.4:compile

RESTEasy Munityだと、こんな感じです。

[INFO] +- io.quarkus:quarkus-resteasy-mutiny:jar:1.6.1.Final:compile
[INFO] |  +- io.quarkus:quarkus-arc:jar:1.6.1.Final:compile
[INFO] |  |  +- io.quarkus.arc:arc:jar:1.6.1.Final:compile
[INFO] |  |  |  \- jakarta.transaction:jakarta.transaction-api:jar:1.3.3:compile
[INFO] |  |  \- org.eclipse.microprofile.context-propagation:microprofile-context-propagation-api:jar:1.0.1:compile
[INFO] |  +- io.quarkus:quarkus-mutiny:jar:1.6.1.Final:compile
[INFO] |  |  +- io.smallrye.reactive:mutiny:jar:0.5.4:compile
[INFO] |  |  |  \- org.reactivestreams:reactive-streams:jar:1.0.3:compile
[INFO] |  |  +- io.quarkus:quarkus-smallrye-context-propagation:jar:1.6.1.Final:compile
[INFO] |  |  |  \- io.smallrye:smallrye-context-propagation:jar:1.0.13:compile
[INFO] |  |  |     \- io.smallrye:smallrye-context-propagation-api:jar:1.0.13:compile
[INFO] |  |  \- io.smallrye.reactive:mutiny-context-propagation:jar:0.5.4:compile
[INFO] |  \- org.jboss.resteasy:resteasy-client:jar:4.5.5.Final:compile
[INFO] |     +- org.jboss.resteasy:resteasy-client-api:jar:4.5.5.Final:compile
[INFO] |     +- org.jboss.resteasy:resteasy-core-spi:jar:4.5.5.Final:compile
[INFO] |     +- org.jboss.resteasy:resteasy-core:jar:4.5.5.Final:compile
[INFO] |     |  +- com.ibm.async:asyncutil:jar:0.1.0:compile
[INFO] |     |  \- org.eclipse.microprofile.config:microprofile-config-api:jar:1.4:compile
[INFO] |     +- commons-io:commons-io:jar:2.6:compile

RxJavaなどは、依存関係に現れませんね。

SmallRye Munityを使った、JAX-RSリソースクラスを作成する

SmallRye Munityを使った、JAX-RSリソースクラスを作っていきます。 src/main/java/org/littlewings/quarkus/resteasy/api/HelloMunityResource.java

package org.littlewings.quarkus.resteasy.api;

import java.time.Duration;
import java.time.LocalDateTime;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.jboss.resteasy.annotations.SseElementType;

@Path("hello")
public class HelloMunityResource {
    @GET
    @Path("uni")
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<String> uni(@QueryParam("message") String message) {
        return Uni
                .createFrom()
                .item(message)
                .onItem()
                .apply(m -> String.format("Hello %s", message))
                .onItem()
                .invoke(v -> System.out.printf("[%s] uni / %s%n", LocalDateTime.now(), v));
    }

    @GET
    @Path("multi/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.TEXT_PLAIN)
    public Multi<String> multiStream(@QueryParam("count") int count, @QueryParam("message") String message) {
        return Multi
                .createFrom()
                .ticks()
                .every(Duration.ofSeconds(1))
                .onItem()
                .apply(i -> String.format("Hello %s - %d", message, i))
                .onItem()
                .invoke(v -> System.out.printf("[%s] multi-stream / %s%n", LocalDateTime.now(), v))
                .transform()
                .byTakingFirstItems(count);
    }

    @GET
    @Path("multi/json")
    @Produces(MediaType.APPLICATION_JSON)
    public Multi<String> multiJson(@QueryParam("count") int count, @QueryParam("message") String message) {
        return Multi
                .createFrom()
                .ticks()
                .every(Duration.ofSeconds(1))
                .onItem()
                .apply(i -> String.format("Hello %s - %d", message, i))
                .onItem()
                .invoke(v -> System.out.printf("[%s] multi-json / %s%n", LocalDateTime.now(), v))
                .transform()
                .byTakingFirstItems(count);
    }
}

ほとんど、Getting Startedのマネですね。

Quarkus - Getting started with Reactive

UniがRxJava 2でいうSingle、ReactorでいうMonoにあたります。MultiはFlowableやFluxになります、と。

UniもMultiも、createFromメソッドから始めます。

Uniを使ったリソースメソッドでは、applyで変換して返すように作成。

    @GET
    @Path("uni")
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<String> uni(@QueryParam("message") String message) {
        return Uni
                .createFrom()
                .item(message)
                .onItem()
                .apply(m -> String.format("Hello %s", message))
                .onItem()
                .invoke(v -> System.out.printf("[%s] uni / %s%n", LocalDateTime.now(), v));
    }

2回目のonItemのあとのinvokeはロギングのためなので、これでもOKです。

    @GET
    @Path("uni")
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<String> uni(@QueryParam("message") String message) {
        return Uni
                .createFrom()
                .item(message)
                .onItem()
                .apply(m -> String.format("Hello %s", message));
    }

続いて、Multiを使った場合。

    @GET
    @Path("multi/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.TEXT_PLAIN)
    public Multi<String> multiStream(@QueryParam("count") int count, @QueryParam("message") String message) {
        return Multi
                .createFrom()
                .ticks()
                .every(Duration.ofSeconds(1))
                .onItem()
                .apply(i -> String.format("Hello %s - %d", message, i))
                .onItem()
                .invoke(v -> System.out.printf("[%s] multi-stream / %s%n", LocalDateTime.now(), v))
                .transform()
                .byTakingFirstItems(count);
    }

ticks、everyで1秒おきに値を生成するようにして、この時に受け取ったクエリパラーメーターを使ってメッセージを作成するように
しています。そして、このままだとMultiが無限に続くのでbyTakingFirstItemsで指定の個数で切り取ります。

この個数も、クエリパラメーターから決定します。

これで、1秒おきにレスポンスに書き出されるSSE(Server Sent Event)の実装になっています。@Producesアノテーション
指定しているMediaTypeおよび、@SseElementTypeアノテーションがポイントですね。

なお、こちらもロギングを省くとこんな感じです。

    @GET
    @Path("multi/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.TEXT_PLAIN)
    public Multi<String> multiStream(@QueryParam("count") int count, @QueryParam("message") String message) {
        return Multi
                .createFrom()
                .ticks()
                .every(Duration.ofSeconds(1))
                .onItem()
                .apply(i -> String.format("Hello %s - %d", message, i))
                .transform()
                .byTakingFirstItems(count);
    }

最後のリソースメソッドも、1秒おきに要素を生成しますが、これは最後にまとめて返すことになります。

    @GET
    @Path("multi/json")
    @Produces(MediaType.APPLICATION_JSON)
    public Multi<String> multiJson(@QueryParam("count") int count, @QueryParam("message") String message) {
        return Multi
                .createFrom()
                .ticks()
                .every(Duration.ofSeconds(1))
                .onItem()
                .apply(i -> String.format("Hello %s - %d", message, i))
                .onItem()
                .invoke(v -> System.out.printf("[%s] multi-json / %s%n", LocalDateTime.now(), v))
                .transform()
                .byTakingFirstItems(count);
    }

ここまで作成したら、確認してみましょう。

$ mvn package
$ java -jar target/reactive-resteasy-mutiny-1.0-SNAPSHOT-runner.jar

確認。

## Uni
$ curl -i 'localhost:8080/hello/uni?message=World'
HTTP/1.1 200 OK
Content-Length: 11
Content-Type: text/plain;charset=UTF-8

Hello World


#### サーバー側のログ
[2020-07-28T23:43:14.694090] uni / Hello World



## Multi(SSE)
$ curl -i 'localhost:8080/hello/multi/stream?count=5&message=World'
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: text/event-stream;element-type="text/plain"



data: Hello World - 0

data: Hello World - 1

data: Hello World - 2

data: Hello World - 3

data: Hello World - 4


#### サーバー側のログ
[2020-07-28T23:44:01.991498] multi-stream / Hello World - 0
[2020-07-28T23:44:02.990446] multi-stream / Hello World - 1
[2020-07-28T23:44:03.990384] multi-stream / Hello World - 2
[2020-07-28T23:44:04.990275] multi-stream / Hello World - 3
[2020-07-28T23:44:05.990208] multi-stream / Hello World - 4



## Multi
$ curl -i 'localhost:8080/hello/multi/json?count=5&message=World'
HTTP/1.1 200 OK
Content-Length: 91
Content-Type: application/json

["Hello World - 0","Hello World - 1","Hello World - 2","Hello World - 3","Hello World - 4"]


#### サーバー側のログ
[2020-07-28T23:44:32.413521] multi-json / Hello World - 0
[2020-07-28T23:44:33.413838] multi-json / Hello World - 1
[2020-07-28T23:44:34.413875] multi-json / Hello World - 2
[2020-07-28T23:44:35.413725] multi-json / Hello World - 3
[2020-07-28T23:44:36.413793] multi-json / Hello World - 4

この中で「/hello/multi/stream」のアクセスは1秒おきにコンソールに出力され、SSEになっていることが確認できます。

ここまで、OKそうです。

REST Clientから使う

続いて、SmallRye MunityをREST Clientから使ってみましょう。

Using the REST Client (including JSON) / Async Support

ガイドを見ていたら、使えそうな感じがしましたしね。

で、REST Clientのアクセス先ですが、先ほど作成したリソースクラスにしましょう。

こんな感じでインターフェースを作成。
src/main/java/org/littlewings/quarkus/resteasy/client/HelloService.java

package org.littlewings.quarkus.resteasy.client;

import java.util.List;
import java.util.concurrent.CompletionStage;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
import org.jboss.resteasy.annotations.SseElementType;

@Path("hello")
@RegisterRestClient
public interface HelloService {
    @GET
    @Path("uni")
    @Produces(MediaType.TEXT_PLAIN)
    Uni<String> uni(@QueryParam("message") String message);

    @GET
    @Path("multi/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.TEXT_PLAIN)
    Uni<String> multiStream(@QueryParam("count") int count, @QueryParam("message") String message);

    @GET
    @Path("multi/json")
    @Produces(MediaType.APPLICATION_JSON)
    Uni<List<String>> multiJson(@QueryParam("count") int count, @QueryParam("message") String message);
}

メソッド名こそ、先ほどのリソースクラスと合わせていますが、Multiを使っていた部分は一律Uniにしています。
Multiだと、どうにも動かなかったので…。

このクライアントを使う、JAX-RSリソースクラスを用意。
src/main/java/org/littlewings/quarkus/resteasy/api/RestClientMutinyResource.java

package org.littlewings.quarkus.resteasy.api;

import java.util.List;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;

import io.smallrye.mutiny.Uni;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.littlewings.quarkus.resteasy.client.HelloService;

@Path("client")
public class RestClientMutinyResource {
    @Inject
    @RestClient
    HelloService helloService;

    @GET
    @Path("uni")
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<String> uni(@QueryParam("message") String message) {
        return helloService.uni(message);
    }

    @GET
    @Path("multi/stream")
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<String> multiStream(@QueryParam("count") int count, @QueryParam("message") String message) {
        return helloService.multiStream(count, message);
    }

    @GET
    @Path("multi/json")
    @Produces(MediaType.APPLICATION_JSON)
    public Uni<List<String>> multiJson(@QueryParam("count") int count, @QueryParam("message") String message) {
        return helloService.multiJson(count, message);
    }
}

こちらは、受け取ったレスポンスをそのまま返すだけです。が、やっぱりこちらもMultiは使いません。

最後にこのクライアントのアクセス先ですが、同じアプリケーションを2つ立ち上げることで実現しようと思います。
src/main/resources/application.properties

# Configuration file
# key = value

org.littlewings.quarkus.resteasy.client.HelloService/mp-rest/url=http://localhost:9080

ポートは9080ということで。

パッケージングして

$ mvn package

ひとつ、アプリケーションを起動。こちらは、8080ポートでリッスンしています。

$ java -jar target/reactive-resteasy-mutiny-1.0-SNAPSHOT-runner.jar

もうひとつは、9080ポートで起動させます。

$ java -Dquarkus.http.port=9080 -jar target/reactive-resteasy-mutiny-1.0-SNAPSHOT-runner.jar

確認。

## Uni
$ curl -i 'localhost:8080/client/uni?message=World'
HTTP/1.1 200 OK
Content-Length: 11
Content-Type: text/plain;charset=UTF-8

Hello World


## Uni(Clientからのアクセス先はMulti(SSE))
$ curl -i 'localhost:8080/client/multi/stream?count=5&message=World'
HTTP/1.1 200 OK
Content-Length: 117
Content-Type: text/plain;charset=UTF-8



data: Hello World - 0

data: Hello World - 1

data: Hello World - 2

data: Hello World - 3

data: Hello World - 4




## Uni(Clientからのアクセス先はMulti)
$ curl -i 'localhost:8080/client/multi/json?count=5&message=World'
HTTP/1.1 200 OK
Content-Length: 91
Content-Type: application/json

["Hello World - 0","Hello World - 1","Hello World - 2","Hello World - 3","Hello World - 4"]

見た目だけは先ほどと変わりませんが、今回は結果が一気に出力されます。

9080ポートで起動している方のサーバーのログは、こんな感じでMultiも動作しているのがわかります。

[2020-07-28T23:57:28.973486] uni / Hello World


[2020-07-28T23:57:57.050177] multi-stream / Hello World - 0
[2020-07-28T23:57:58.048699] multi-stream / Hello World - 1
[2020-07-28T23:57:59.048899] multi-stream / Hello World - 2
[2020-07-28T23:58:00.048558] multi-stream / Hello World - 3
[2020-07-28T23:58:01.048324] multi-stream / Hello World - 4


[2020-07-28T23:58:18.255281] multi-json / Hello World - 0
[2020-07-28T23:58:19.255660] multi-json / Hello World - 1
[2020-07-28T23:58:20.255789] multi-json / Hello World - 2
[2020-07-28T23:58:21.255831] multi-json / Hello World - 3
[2020-07-28T23:58:22.255617] multi-json / Hello World - 4

とりあえず、動くには動きました、と。

REST ClientとMulti

REST Clientを使ったインターフェースを書く時に、サーバー側はMultiを使っているにも関わらず、クライアント側はUniと
しました。なお、CompletionStageにしても動作はします。どちらも、クライアントはストリームっぽい動きにはなりませんが。

これ、Multiだとどうやっても動かなくて、諦めたんですよね…。

こんな定義にして(リソースクラスは、このMultiをそのまま返すように作成)実行すると、

    @GET
    @Path("multi/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.TEXT_PLAIN)
    Multi<String> multiStream(@QueryParam("count") int count, @QueryParam("message") String message);

こんな感じになります。

2020-07-28 22:32:47,495 ERROR [org.jbo.res.res.i18n] (sse-event-source(http://localhost:9080/hello/multi/stream?count=5&message=World)-thread-1) RESTEASY002020: Unhandled asynchronous exception, sending back 500: javax.ws.rs.ProcessingException: java.lang.NullPointerException
    at org.jboss.resteasy.client.jaxrs.internal.ClientInvocation.filterRequest(ClientInvocation.java:696)
    at org.jboss.resteasy.microprofile.client.impl.MpClientInvocation.filterRequest(MpClientInvocation.java:75)
    at org.jboss.resteasy.client.jaxrs.internal.ClientInvocation.invoke(ClientInvocation.java:485)
    at org.jboss.resteasy.client.jaxrs.internal.ClientInvocation.invoke(ClientInvocation.java:65)
    at org.jboss.resteasy.plugins.providers.sse.client.SseEventSourceImpl$EventHandler.run(SseEventSourceImpl.java:329)
    at org.jboss.resteasy.plugins.providers.sse.client.SseEventSourceScheduler$1.run(SseEventSourceScheduler.java:92)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
    at org.jboss.resteasy.microprofile.client.utils.ClientRequestContextUtils.getMethod(ClientRequestContextUtils.java:25)
    at org.jboss.resteasy.microprofile.client.MethodInjectionFilter.filter(MethodInjectionFilter.java:15)
    at org.jboss.resteasy.client.jaxrs.internal.ClientInvocation.filterRequest(ClientInvocation.java:683)
    ... 11 more

クライアント側がコケてしまいます。

追ってみたのですが、よくわかりませんでした…。

これを見た感じ、使えてもおかしくなさそうなんですけどねぇ。

https://github.com/quarkusio/quarkus/blob/1.6.1.Final/extensions/resteasy-mutiny/runtime/src/main/java/io/quarkus/resteasy/mutiny/runtime/MultiRxInvokerImpl.java

テストコードとかを見ても、REST Clientのメソッドの戻り値をMultiにしている例は見当たらなかったので、今回は1度
諦めて、そのうちまた見返してみましょう。

Reactive with Mutiny :: Quarkus Tutorial

https://github.com/quarkusio/quarkus/blob/1.6.1.Final/integration-tests/main/src/main/java/io/quarkus/it/rest/ClientResource.java#L185-L209