これは、なにをしたくて書いたもの?
Quarkusのドキュメントを見ていて、RxJavaでもないReactiveなAPIがいるのに気づきまして。
Quarkus - Getting started with Reactive
Quarkus 1.3.0から、MunityというReactive Streamsの実装が統合されているようです。
Mutiny arising
A new reactive programming API has been introduced. This API named Mutiny replaces the Axle and Reactive Streams Operators models (Reactive Streams and CompletionStage). The previous models are still functional, but deprecated and will be removed in the future.
REST Clientでも使えるみたいです。
Using the REST Client (including JSON) / Async Support
あと、Reactive SQL Clientsでも顔を出していますね。
Quarkus - Reactive SQL Clients
せっかくなので、ちょっと試してみようかな、と。
SmallRye Mutiny
正確には、SmallRye Munityという名前のようです。
GitHub - smallrye/smallrye-mutiny: An Intuitive Event-Driven Reactive Programming Library for Java
先ほど書いたとおり、Reactive Streamsの実装です。
ドキュメントおよびJavadocは、こちら。
https://smallrye.io/smallrye-mutiny/apidocs/
見たところ、完全に新しく作っているみたいですね。
CompletionStageやRxJava 2、Reactorとの変換も可能なようです。
How do I interact with CompletionStages?
How do I interact with RX Java 2?
How do I interact with Reactor?
今回使うMutinyは0.5.4ですが、ドキュメントは現時点では0.7.0を指しています。
環境
今回の環境は、こちら。
$ java --version openjdk 11.0.8 2020-07-14 OpenJDK Runtime Environment (build 11.0.8+10-post-Ubuntu-0ubuntu118.04.1) OpenJDK 64-Bit Server VM (build 11.0.8+10-post-Ubuntu-0ubuntu118.04.1, mixed mode, sharing) $ mvn --version Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 11.0.8, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "4.15.0-112-generic", arch: "amd64", family: "unix"
準備
では、プロジェクトを作っていきましょう。
Extensionに「resteasy-mutiny」と「rest-client」を加えて、プロジェクトを作成。JSONは、Jacksonで扱うことにします。
$ mvn io.quarkus:quarkus-maven-plugin:1.6.1.Final:create \ -DprojectGroupId=org.littlewings \ -DprojectArtifactId=reactive-resteasy-mutiny \ -Dextensions="resteasy-mutiny, resteasy-jackson, rest-client"
できあがったプロジェクトのディレクトリに入って
$ cd reactive-resteasy-mutiny
依存関係に含まれる内容を見てみます。
<dependencies> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-junit5</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>io.rest-assured</groupId> <artifactId>rest-assured</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy-jackson</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy-mutiny</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-rest-client</artifactId> </dependency> </dependencies>
「mvn dependency:tree」も見てみましょう。
$ mvn dependency:tree
結果は、こちら。
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ reactive-resteasy-mutiny --- [INFO] org.littlewings:reactive-resteasy-mutiny:jar:1.0-SNAPSHOT [INFO] +- io.quarkus:quarkus-resteasy:jar:1.6.1.Final:compile [INFO] | +- io.quarkus:quarkus-vertx-http:jar:1.6.1.Final:compile [INFO] | | +- io.quarkus:quarkus-development-mode-spi:jar:1.6.1.Final:compile [INFO] | | +- io.quarkus.security:quarkus-security:jar:1.1.2.Final:compile [INFO] | | +- jakarta.enterprise:jakarta.enterprise.cdi-api:jar:2.0.2:compile [INFO] | | | \- jakarta.el:jakarta.el-api:jar:3.0.3:compile [INFO] | | +- io.quarkus:quarkus-vertx-core:jar:1.6.1.Final:compile [INFO] | | | +- io.quarkus:quarkus-netty:jar:1.6.1.Final:compile [INFO] | | | | +- io.netty:netty-codec:jar:4.1.49.Final:compile [INFO] | | | | \- io.netty:netty-handler:jar:4.1.49.Final:compile [INFO] | | | \- io.vertx:vertx-core:jar:3.9.1:compile [INFO] | | | +- io.netty:netty-common:jar:4.1.49.Final:compile [INFO] | | | +- io.netty:netty-buffer:jar:4.1.49.Final:compile [INFO] | | | +- io.netty:netty-transport:jar:4.1.49.Final:compile [INFO] | | | +- io.netty:netty-handler-proxy:jar:4.1.49.Final:compile [INFO] | | | | \- io.netty:netty-codec-socks:jar:4.1.49.Final:compile [INFO] | | | +- io.netty:netty-codec-http:jar:4.1.49.Final:compile [INFO] | | | +- io.netty:netty-codec-http2:jar:4.1.49.Final:compile [INFO] | | | +- io.netty:netty-resolver:jar:4.1.49.Final:compile [INFO] | | | \- io.netty:netty-resolver-dns:jar:4.1.49.Final:compile [INFO] | | | \- io.netty:netty-codec-dns:jar:4.1.49.Final:compile [INFO] | | \- io.vertx:vertx-web:jar:3.9.1:compile [INFO] | | +- io.vertx:vertx-web-common:jar:3.9.1:compile [INFO] | | +- io.vertx:vertx-auth-common:jar:3.9.1:compile [INFO] | | \- io.vertx:vertx-bridge-common:jar:3.9.1:compile [INFO] | \- io.quarkus:quarkus-resteasy-server-common:jar:1.6.1.Final:compile [INFO] | \- jakarta.validation:jakarta.validation-api:jar:2.0.2:compile [INFO] +- io.quarkus:quarkus-junit5:jar:1.6.1.Final:test [INFO] | +- io.quarkus:quarkus-bootstrap-core:jar:1.6.1.Final:test [INFO] | | +- io.quarkus:quarkus-bootstrap-app-model:jar:1.6.1.Final:test [INFO] | | \- io.quarkus:quarkus-bootstrap-maven-resolver:jar:1.6.1.Final:test [INFO] | | +- org.ow2.asm:asm:jar:8.0.1:test [INFO] | | +- org.apache.maven:maven-embedder:jar:3.6.3:test [INFO] | | | +- org.apache.maven:maven-settings:jar:3.6.3:test [INFO] | | | +- org.apache.maven:maven-core:jar:3.6.3:test [INFO] | | | | +- org.apache.maven:maven-artifact:jar:3.6.3:test [INFO] | | | | \- org.codehaus.plexus:plexus-component-annotations:jar:2.1.0:test [INFO] | | | +- org.apache.maven:maven-plugin-api:jar:3.6.3:test [INFO] | | | +- org.apache.maven:maven-model:jar:3.6.3:test [INFO] | | | +- org.apache.maven:maven-model-builder:jar:3.6.3:test [INFO] | | | +- org.apache.maven:maven-builder-support:jar:3.6.3:test [INFO] | | | +- org.apache.maven.resolver:maven-resolver-api:jar:1.4.1:test [INFO] | | | +- org.apache.maven.resolver:maven-resolver-util:jar:1.4.1:test [INFO] | | | +- org.apache.maven.shared:maven-shared-utils:jar:3.2.1:test [INFO] | | | +- com.google.inject:guice:jar:no_aop:4.2.1:test [INFO] | | | +- org.codehaus.plexus:plexus-utils:jar:3.2.1:test [INFO] | | | +- org.codehaus.plexus:plexus-classworlds:jar:2.5.2:test [INFO] | | | \- commons-cli:commons-cli:jar:1.4:test [INFO] | | +- org.eclipse.sisu:org.eclipse.sisu.plexus:jar:0.3.4:test [INFO] | | +- org.apache.maven:maven-settings-builder:jar:3.6.3:test [INFO] | | | +- org.codehaus.plexus:plexus-interpolation:jar:1.25:test [INFO] | | | \- org.sonatype.plexus:plexus-sec-dispatcher:jar:1.4:test [INFO] | | | \- org.sonatype.plexus:plexus-cipher:jar:1.4:test [INFO] | | +- org.apache.maven:maven-resolver-provider:jar:3.6.3:test [INFO] | | | +- org.apache.maven:maven-repository-metadata:jar:3.6.3:test [INFO] | | | +- org.apache.maven.resolver:maven-resolver-spi:jar:1.4.1:test [INFO] | | | \- org.apache.maven.resolver:maven-resolver-impl:jar:1.4.1:test [INFO] | | +- org.apache.maven.resolver:maven-resolver-connector-basic:jar:1.4.1:test [INFO] | | +- org.apache.maven.resolver:maven-resolver-transport-wagon:jar:1.4.1:test [INFO] | | +- org.apache.maven.wagon:wagon-http:jar:3.3.4:test [INFO] | | | +- org.apache.maven.wagon:wagon-http-shared:jar:3.3.4:test [INFO] | | | | \- org.jsoup:jsoup:jar:1.12.1:test [INFO] | | | \- org.apache.maven.wagon:wagon-provider-api:jar:3.3.4:test [INFO] | | \- org.apache.maven.wagon:wagon-file:jar:3.3.4:test [INFO] | +- org.eclipse.sisu:org.eclipse.sisu.inject:jar:0.3.4:test [INFO] | +- io.quarkus:quarkus-test-common:jar:1.6.1.Final:test [INFO] | | +- io.quarkus:quarkus-core-deployment:jar:1.6.1.Final:test [INFO] | | | +- io.quarkus.gizmo:gizmo:jar:1.0.3.Final:test [INFO] | | | | \- org.ow2.asm:asm-util:jar:8.0.1:test [INFO] | | | | +- org.ow2.asm:asm-tree:jar:8.0.1:test [INFO] | | | | \- org.ow2.asm:asm-analysis:jar:8.0.1:test [INFO] | | | +- io.quarkus:quarkus-devtools-utilities:jar:1.6.1.Final:test [INFO] | | | \- io.quarkus:quarkus-builder:jar:1.6.1.Final:test [INFO] | | +- io.quarkus:quarkus-jsonp-deployment:jar:1.6.1.Final:test [INFO] | | | \- io.quarkus:quarkus-jsonp:jar:1.6.1.Final:test [INFO] | | | \- org.glassfish:jakarta.json:jar:1.1.6:test [INFO] | | \- org.jboss:jandex:jar:2.1.3.Final:test [INFO] | +- org.junit.jupiter:junit-jupiter:jar:5.6.2:test [INFO] | | +- org.junit.jupiter:junit-jupiter-api:jar:5.6.2:test [INFO] | | | +- org.apiguardian:apiguardian-api:jar:1.1.0:test [INFO] | | | +- org.opentest4j:opentest4j:jar:1.2.0:test [INFO] | | | \- org.junit.platform:junit-platform-commons:jar:1.6.2:test [INFO] | | +- org.junit.jupiter:junit-jupiter-params:jar:5.6.2:test [INFO] | | \- org.junit.jupiter:junit-jupiter-engine:jar:5.6.2:test [INFO] | | \- org.junit.platform:junit-platform-engine:jar:1.6.2:test [INFO] | +- io.quarkus:quarkus-core:jar:1.6.1.Final:compile [INFO] | | +- jakarta.annotation:jakarta.annotation-api:jar:1.3.5:compile [INFO] | | +- jakarta.inject:jakarta.inject-api:jar:1.0:compile [INFO] | | +- io.quarkus:quarkus-ide-launcher:jar:1.6.1.Final:compile [INFO] | | +- io.smallrye.config:smallrye-config:jar:1.8.4:compile [INFO] | | | +- io.smallrye.common:smallrye-common-annotation:jar:1.0.2:compile [INFO] | | | +- io.smallrye.config:smallrye-config-common:jar:1.8.4:compile [INFO] | | | +- io.smallrye.common:smallrye-common-expression:jar:1.0.2:compile [INFO] | | | | \- io.smallrye.common:smallrye-common-function:jar:1.0.2:compile [INFO] | | | \- io.smallrye.common:smallrye-common-constraint:jar:1.0.2:compile [INFO] | | +- org.jboss.logging:jboss-logging:jar:3.3.2.Final:compile [INFO] | | +- org.jboss.logmanager:jboss-logmanager-embedded:jar:1.0.4:compile [INFO] | | +- org.jboss.logging:jboss-logging-annotations:jar:2.1.0.Final:compile [INFO] | | +- org.jboss.threads:jboss-threads:jar:3.1.1.Final:compile [INFO] | | +- org.slf4j:slf4j-api:jar:1.7.30:compile [INFO] | | +- org.jboss.slf4j:slf4j-jboss-logging:jar:1.2.0.Final:compile [INFO] | | +- org.graalvm.sdk:graal-sdk:jar:20.1.0:compile [INFO] | | +- org.wildfly.common:wildfly-common:jar:1.5.4.Final-format-001:compile [INFO] | | \- io.quarkus:quarkus-bootstrap-runner:jar:1.6.1.Final:compile [INFO] | \- com.thoughtworks.xstream:xstream:jar:1.4.11.1:test [INFO] | +- xmlpull:xmlpull:jar:1.1.3.1:test [INFO] | \- xpp3:xpp3_min:jar:1.1.4c:test [INFO] +- io.rest-assured:rest-assured:jar:4.3.0:test [INFO] | +- org.codehaus.groovy:groovy:jar:3.0.2:test [INFO] | +- org.codehaus.groovy:groovy-xml:jar:3.0.2:test [INFO] | +- org.apache.httpcomponents:httpclient:jar:4.5.12:compile [INFO] | | +- org.apache.httpcomponents:httpcore:jar:4.4.13:compile [INFO] | | \- commons-codec:commons-codec:jar:1.14:compile [INFO] | +- org.apache.httpcomponents:httpmime:jar:4.5.3:test [INFO] | +- org.hamcrest:hamcrest:jar:2.1:test [INFO] | +- org.ccil.cowan.tagsoup:tagsoup:jar:1.2.1:test [INFO] | +- io.rest-assured:json-path:jar:4.3.0:test [INFO] | | +- org.codehaus.groovy:groovy-json:jar:3.0.2:test [INFO] | | \- io.rest-assured:rest-assured-common:jar:4.3.0:test [INFO] | \- io.rest-assured:xml-path:jar:4.3.0:test [INFO] | +- org.apache.commons:commons-lang3:jar:3.9:test [INFO] | +- jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.2:test [INFO] | | \- jakarta.activation:jakarta.activation-api:jar:1.2.1:compile [INFO] | \- org.apache.sling:org.apache.sling.javax.activation:jar:0.1.0:test [INFO] +- io.quarkus:quarkus-resteasy-jackson:jar:1.6.1.Final:compile [INFO] | +- io.quarkus:quarkus-jackson:jar:1.6.1.Final:compile [INFO] | | +- com.fasterxml.jackson.core:jackson-databind:jar:2.10.4:compile [INFO] | | +- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.10.4:compile [INFO] | | +- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.10.4:compile [INFO] | | \- com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.10.4:compile [INFO] | +- org.jboss.resteasy:resteasy-jackson2-provider:jar:4.5.5.Final:compile [INFO] | | +- org.jboss.resteasy:resteasy-jaxb-provider:jar:4.5.5.Final:compile [INFO] | | | \- org.glassfish.jaxb:jaxb-runtime:jar:2.3.3-b02:compile [INFO] | | | +- org.glassfish.jaxb:txw2:jar:2.3.3-b02:compile [INFO] | | | \- com.sun.istack:istack-commons-runtime:jar:3.0.10:compile [INFO] | | +- com.fasterxml.jackson.core:jackson-core:jar:2.10.4:compile [INFO] | | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.10.4:compile [INFO] | | +- com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:jar:2.10.4:compile [INFO] | | | +- com.fasterxml.jackson.jaxrs:jackson-jaxrs-base:jar:2.10.4:compile [INFO] | | | \- com.fasterxml.jackson.module:jackson-module-jaxb-annotations:jar:2.10.4:compile [INFO] | | +- com.github.fge:json-patch:jar:1.9:compile [INFO] | | | \- com.github.fge:jackson-coreutils:jar:1.6:compile [INFO] | | | \- com.github.fge:msg-simple:jar:1.1:compile [INFO] | | | \- com.github.fge:btf:jar:1.2:compile [INFO] | | \- com.google.guava:guava:jar:27.0.1-jre:compile [INFO] | | +- com.google.guava:failureaccess:jar:1.0.1:compile [INFO] | | \- com.google.guava:listenablefuture:jar:9999.0-empty-to-avoid-conflict-with-guava:compile [INFO] | \- org.jboss.spec.javax.xml.bind:jboss-jaxb-api_2.3_spec:jar:2.0.0.Final:compile [INFO] +- io.quarkus:quarkus-resteasy-mutiny:jar:1.6.1.Final:compile [INFO] | +- io.quarkus:quarkus-arc:jar:1.6.1.Final:compile [INFO] | | +- io.quarkus.arc:arc:jar:1.6.1.Final:compile [INFO] | | | \- jakarta.transaction:jakarta.transaction-api:jar:1.3.3:compile [INFO] | | \- org.eclipse.microprofile.context-propagation:microprofile-context-propagation-api:jar:1.0.1:compile [INFO] | +- io.quarkus:quarkus-mutiny:jar:1.6.1.Final:compile [INFO] | | +- io.smallrye.reactive:mutiny:jar:0.5.4:compile [INFO] | | | \- org.reactivestreams:reactive-streams:jar:1.0.3:compile [INFO] | | +- io.quarkus:quarkus-smallrye-context-propagation:jar:1.6.1.Final:compile [INFO] | | | \- io.smallrye:smallrye-context-propagation:jar:1.0.13:compile [INFO] | | | \- io.smallrye:smallrye-context-propagation-api:jar:1.0.13:compile [INFO] | | \- io.smallrye.reactive:mutiny-context-propagation:jar:0.5.4:compile [INFO] | \- org.jboss.resteasy:resteasy-client:jar:4.5.5.Final:compile [INFO] | +- org.jboss.resteasy:resteasy-client-api:jar:4.5.5.Final:compile [INFO] | +- org.jboss.resteasy:resteasy-core-spi:jar:4.5.5.Final:compile [INFO] | +- org.jboss.resteasy:resteasy-core:jar:4.5.5.Final:compile [INFO] | | +- com.ibm.async:asyncutil:jar:0.1.0:compile [INFO] | | \- org.eclipse.microprofile.config:microprofile-config-api:jar:1.4:compile [INFO] | +- commons-io:commons-io:jar:2.6:compile [INFO] | \- org.jboss.spec.javax.ws.rs:jboss-jaxrs-api_2.1_spec:jar:2.0.1.Final:compile [INFO] \- io.quarkus:quarkus-rest-client:jar:1.6.1.Final:compile [INFO] +- io.quarkus:quarkus-resteasy-common:jar:1.6.1.Final:compile [INFO] | \- com.sun.activation:jakarta.activation:jar:1.2.1:compile [INFO] +- org.jboss.resteasy:resteasy-client-microprofile:jar:4.5.5.Final:compile [INFO] | \- org.eclipse.microprofile.rest.client:microprofile-rest-client-api:jar:1.4.1:compile [INFO] +- jakarta.interceptor:jakarta.interceptor-api:jar:1.2.5:compile [INFO] +- org.apache.httpcomponents:httpasyncclient:jar:4.1.4:compile [INFO] | \- org.apache.httpcomponents:httpcore-nio:jar:4.4.13:compile [INFO] \- org.jboss.logging:commons-logging-jboss-logging:jar:1.0.0.Final:compile
SmallRye Munityまわりだけ抜き出すと、こんな感じです。
[INFO] | +- io.quarkus:quarkus-mutiny:jar:1.6.1.Final:compile [INFO] | | +- io.smallrye.reactive:mutiny:jar:0.5.4:compile [INFO] | | | \- org.reactivestreams:reactive-streams:jar:1.0.3:compile [INFO] | | +- io.quarkus:quarkus-smallrye-context-propagation:jar:1.6.1.Final:compile [INFO] | | | \- io.smallrye:smallrye-context-propagation:jar:1.0.13:compile [INFO] | | | \- io.smallrye:smallrye-context-propagation-api:jar:1.0.13:compile [INFO] | | \- io.smallrye.reactive:mutiny-context-propagation:jar:0.5.4:compile
RESTEasy Munityだと、こんな感じです。
[INFO] +- io.quarkus:quarkus-resteasy-mutiny:jar:1.6.1.Final:compile [INFO] | +- io.quarkus:quarkus-arc:jar:1.6.1.Final:compile [INFO] | | +- io.quarkus.arc:arc:jar:1.6.1.Final:compile [INFO] | | | \- jakarta.transaction:jakarta.transaction-api:jar:1.3.3:compile [INFO] | | \- org.eclipse.microprofile.context-propagation:microprofile-context-propagation-api:jar:1.0.1:compile [INFO] | +- io.quarkus:quarkus-mutiny:jar:1.6.1.Final:compile [INFO] | | +- io.smallrye.reactive:mutiny:jar:0.5.4:compile [INFO] | | | \- org.reactivestreams:reactive-streams:jar:1.0.3:compile [INFO] | | +- io.quarkus:quarkus-smallrye-context-propagation:jar:1.6.1.Final:compile [INFO] | | | \- io.smallrye:smallrye-context-propagation:jar:1.0.13:compile [INFO] | | | \- io.smallrye:smallrye-context-propagation-api:jar:1.0.13:compile [INFO] | | \- io.smallrye.reactive:mutiny-context-propagation:jar:0.5.4:compile [INFO] | \- org.jboss.resteasy:resteasy-client:jar:4.5.5.Final:compile [INFO] | +- org.jboss.resteasy:resteasy-client-api:jar:4.5.5.Final:compile [INFO] | +- org.jboss.resteasy:resteasy-core-spi:jar:4.5.5.Final:compile [INFO] | +- org.jboss.resteasy:resteasy-core:jar:4.5.5.Final:compile [INFO] | | +- com.ibm.async:asyncutil:jar:0.1.0:compile [INFO] | | \- org.eclipse.microprofile.config:microprofile-config-api:jar:1.4:compile [INFO] | +- commons-io:commons-io:jar:2.6:compile
RxJavaなどは、依存関係に現れませんね。
SmallRye Munityを使った、JAX-RSリソースクラスを作成する
SmallRye Munityを使った、JAX-RSリソースクラスを作っていきます。 src/main/java/org/littlewings/quarkus/resteasy/api/HelloMunityResource.java
package org.littlewings.quarkus.resteasy.api; import java.time.Duration; import java.time.LocalDateTime; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import org.jboss.resteasy.annotations.SseElementType; @Path("hello") public class HelloMunityResource { @GET @Path("uni") @Produces(MediaType.TEXT_PLAIN) public Uni<String> uni(@QueryParam("message") String message) { return Uni .createFrom() .item(message) .onItem() .apply(m -> String.format("Hello %s", message)) .onItem() .invoke(v -> System.out.printf("[%s] uni / %s%n", LocalDateTime.now(), v)); } @GET @Path("multi/stream") @Produces(MediaType.SERVER_SENT_EVENTS) @SseElementType(MediaType.TEXT_PLAIN) public Multi<String> multiStream(@QueryParam("count") int count, @QueryParam("message") String message) { return Multi .createFrom() .ticks() .every(Duration.ofSeconds(1)) .onItem() .apply(i -> String.format("Hello %s - %d", message, i)) .onItem() .invoke(v -> System.out.printf("[%s] multi-stream / %s%n", LocalDateTime.now(), v)) .transform() .byTakingFirstItems(count); } @GET @Path("multi/json") @Produces(MediaType.APPLICATION_JSON) public Multi<String> multiJson(@QueryParam("count") int count, @QueryParam("message") String message) { return Multi .createFrom() .ticks() .every(Duration.ofSeconds(1)) .onItem() .apply(i -> String.format("Hello %s - %d", message, i)) .onItem() .invoke(v -> System.out.printf("[%s] multi-json / %s%n", LocalDateTime.now(), v)) .transform() .byTakingFirstItems(count); } }
ほとんど、Getting Startedのマネですね。
Quarkus - Getting started with Reactive
UniがRxJava 2でいうSingle、ReactorでいうMonoにあたります。MultiはFlowableやFluxになります、と。
UniもMultiも、createFromメソッドから始めます。
Uniを使ったリソースメソッドでは、applyで変換して返すように作成。
@GET @Path("uni") @Produces(MediaType.TEXT_PLAIN) public Uni<String> uni(@QueryParam("message") String message) { return Uni .createFrom() .item(message) .onItem() .apply(m -> String.format("Hello %s", message)) .onItem() .invoke(v -> System.out.printf("[%s] uni / %s%n", LocalDateTime.now(), v)); }
2回目のonItemのあとのinvokeはロギングのためなので、これでもOKです。
@GET @Path("uni") @Produces(MediaType.TEXT_PLAIN) public Uni<String> uni(@QueryParam("message") String message) { return Uni .createFrom() .item(message) .onItem() .apply(m -> String.format("Hello %s", message)); }
続いて、Multiを使った場合。
@GET @Path("multi/stream") @Produces(MediaType.SERVER_SENT_EVENTS) @SseElementType(MediaType.TEXT_PLAIN) public Multi<String> multiStream(@QueryParam("count") int count, @QueryParam("message") String message) { return Multi .createFrom() .ticks() .every(Duration.ofSeconds(1)) .onItem() .apply(i -> String.format("Hello %s - %d", message, i)) .onItem() .invoke(v -> System.out.printf("[%s] multi-stream / %s%n", LocalDateTime.now(), v)) .transform() .byTakingFirstItems(count); }
ticks、everyで1秒おきに値を生成するようにして、この時に受け取ったクエリパラーメーターを使ってメッセージを作成するように
しています。そして、このままだとMultiが無限に続くのでbyTakingFirstItemsで指定の個数で切り取ります。
この個数も、クエリパラメーターから決定します。
これで、1秒おきにレスポンスに書き出されるSSE(Server Sent Event)の実装になっています。@Producesアノテーションに
指定しているMediaTypeおよび、@SseElementTypeアノテーションがポイントですね。
なお、こちらもロギングを省くとこんな感じです。
@GET @Path("multi/stream") @Produces(MediaType.SERVER_SENT_EVENTS) @SseElementType(MediaType.TEXT_PLAIN) public Multi<String> multiStream(@QueryParam("count") int count, @QueryParam("message") String message) { return Multi .createFrom() .ticks() .every(Duration.ofSeconds(1)) .onItem() .apply(i -> String.format("Hello %s - %d", message, i)) .transform() .byTakingFirstItems(count); }
最後のリソースメソッドも、1秒おきに要素を生成しますが、これは最後にまとめて返すことになります。
@GET @Path("multi/json") @Produces(MediaType.APPLICATION_JSON) public Multi<String> multiJson(@QueryParam("count") int count, @QueryParam("message") String message) { return Multi .createFrom() .ticks() .every(Duration.ofSeconds(1)) .onItem() .apply(i -> String.format("Hello %s - %d", message, i)) .onItem() .invoke(v -> System.out.printf("[%s] multi-json / %s%n", LocalDateTime.now(), v)) .transform() .byTakingFirstItems(count); }
ここまで作成したら、確認してみましょう。
$ mvn package $ java -jar target/reactive-resteasy-mutiny-1.0-SNAPSHOT-runner.jar
確認。
## Uni $ curl -i 'localhost:8080/hello/uni?message=World' HTTP/1.1 200 OK Content-Length: 11 Content-Type: text/plain;charset=UTF-8 Hello World #### サーバー側のログ [2020-07-28T23:43:14.694090] uni / Hello World ## Multi(SSE) $ curl -i 'localhost:8080/hello/multi/stream?count=5&message=World' HTTP/1.1 200 OK transfer-encoding: chunked Content-Type: text/event-stream;element-type="text/plain" data: Hello World - 0 data: Hello World - 1 data: Hello World - 2 data: Hello World - 3 data: Hello World - 4 #### サーバー側のログ [2020-07-28T23:44:01.991498] multi-stream / Hello World - 0 [2020-07-28T23:44:02.990446] multi-stream / Hello World - 1 [2020-07-28T23:44:03.990384] multi-stream / Hello World - 2 [2020-07-28T23:44:04.990275] multi-stream / Hello World - 3 [2020-07-28T23:44:05.990208] multi-stream / Hello World - 4 ## Multi $ curl -i 'localhost:8080/hello/multi/json?count=5&message=World' HTTP/1.1 200 OK Content-Length: 91 Content-Type: application/json ["Hello World - 0","Hello World - 1","Hello World - 2","Hello World - 3","Hello World - 4"] #### サーバー側のログ [2020-07-28T23:44:32.413521] multi-json / Hello World - 0 [2020-07-28T23:44:33.413838] multi-json / Hello World - 1 [2020-07-28T23:44:34.413875] multi-json / Hello World - 2 [2020-07-28T23:44:35.413725] multi-json / Hello World - 3 [2020-07-28T23:44:36.413793] multi-json / Hello World - 4
この中で「/hello/multi/stream」のアクセスは1秒おきにコンソールに出力され、SSEになっていることが確認できます。
ここまで、OKそうです。
REST Clientから使う
続いて、SmallRye MunityをREST Clientから使ってみましょう。
Using the REST Client (including JSON) / Async Support
ガイドを見ていたら、使えそうな感じがしましたしね。
で、REST Clientのアクセス先ですが、先ほど作成したリソースクラスにしましょう。
こんな感じでインターフェースを作成。
src/main/java/org/littlewings/quarkus/resteasy/client/HelloService.java
package org.littlewings.quarkus.resteasy.client; import java.util.List; import java.util.concurrent.CompletionStage; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; import org.jboss.resteasy.annotations.SseElementType; @Path("hello") @RegisterRestClient public interface HelloService { @GET @Path("uni") @Produces(MediaType.TEXT_PLAIN) Uni<String> uni(@QueryParam("message") String message); @GET @Path("multi/stream") @Produces(MediaType.SERVER_SENT_EVENTS) @SseElementType(MediaType.TEXT_PLAIN) Uni<String> multiStream(@QueryParam("count") int count, @QueryParam("message") String message); @GET @Path("multi/json") @Produces(MediaType.APPLICATION_JSON) Uni<List<String>> multiJson(@QueryParam("count") int count, @QueryParam("message") String message); }
メソッド名こそ、先ほどのリソースクラスと合わせていますが、Multiを使っていた部分は一律Uniにしています。
Multiだと、どうにも動かなかったので…。
このクライアントを使う、JAX-RSリソースクラスを用意。
src/main/java/org/littlewings/quarkus/resteasy/api/RestClientMutinyResource.java
package org.littlewings.quarkus.resteasy.api; import java.util.List; import javax.inject.Inject; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import io.smallrye.mutiny.Uni; import org.eclipse.microprofile.rest.client.inject.RestClient; import org.littlewings.quarkus.resteasy.client.HelloService; @Path("client") public class RestClientMutinyResource { @Inject @RestClient HelloService helloService; @GET @Path("uni") @Produces(MediaType.TEXT_PLAIN) public Uni<String> uni(@QueryParam("message") String message) { return helloService.uni(message); } @GET @Path("multi/stream") @Produces(MediaType.TEXT_PLAIN) public Uni<String> multiStream(@QueryParam("count") int count, @QueryParam("message") String message) { return helloService.multiStream(count, message); } @GET @Path("multi/json") @Produces(MediaType.APPLICATION_JSON) public Uni<List<String>> multiJson(@QueryParam("count") int count, @QueryParam("message") String message) { return helloService.multiJson(count, message); } }
こちらは、受け取ったレスポンスをそのまま返すだけです。が、やっぱりこちらもMultiは使いません。
最後にこのクライアントのアクセス先ですが、同じアプリケーションを2つ立ち上げることで実現しようと思います。
src/main/resources/application.properties
# Configuration file # key = value org.littlewings.quarkus.resteasy.client.HelloService/mp-rest/url=http://localhost:9080
ポートは9080ということで。
パッケージングして
$ mvn package
ひとつ、アプリケーションを起動。こちらは、8080ポートでリッスンしています。
$ java -jar target/reactive-resteasy-mutiny-1.0-SNAPSHOT-runner.jar
もうひとつは、9080ポートで起動させます。
$ java -Dquarkus.http.port=9080 -jar target/reactive-resteasy-mutiny-1.0-SNAPSHOT-runner.jar
確認。
## Uni $ curl -i 'localhost:8080/client/uni?message=World' HTTP/1.1 200 OK Content-Length: 11 Content-Type: text/plain;charset=UTF-8 Hello World ## Uni(Clientからのアクセス先はMulti(SSE)) $ curl -i 'localhost:8080/client/multi/stream?count=5&message=World' HTTP/1.1 200 OK Content-Length: 117 Content-Type: text/plain;charset=UTF-8 data: Hello World - 0 data: Hello World - 1 data: Hello World - 2 data: Hello World - 3 data: Hello World - 4 ## Uni(Clientからのアクセス先はMulti) $ curl -i 'localhost:8080/client/multi/json?count=5&message=World' HTTP/1.1 200 OK Content-Length: 91 Content-Type: application/json ["Hello World - 0","Hello World - 1","Hello World - 2","Hello World - 3","Hello World - 4"]
見た目だけは先ほどと変わりませんが、今回は結果が一気に出力されます。
9080ポートで起動している方のサーバーのログは、こんな感じでMultiも動作しているのがわかります。
[2020-07-28T23:57:28.973486] uni / Hello World [2020-07-28T23:57:57.050177] multi-stream / Hello World - 0 [2020-07-28T23:57:58.048699] multi-stream / Hello World - 1 [2020-07-28T23:57:59.048899] multi-stream / Hello World - 2 [2020-07-28T23:58:00.048558] multi-stream / Hello World - 3 [2020-07-28T23:58:01.048324] multi-stream / Hello World - 4 [2020-07-28T23:58:18.255281] multi-json / Hello World - 0 [2020-07-28T23:58:19.255660] multi-json / Hello World - 1 [2020-07-28T23:58:20.255789] multi-json / Hello World - 2 [2020-07-28T23:58:21.255831] multi-json / Hello World - 3 [2020-07-28T23:58:22.255617] multi-json / Hello World - 4
とりあえず、動くには動きました、と。
REST ClientとMulti
REST Clientを使ったインターフェースを書く時に、サーバー側はMultiを使っているにも関わらず、クライアント側はUniと
しました。なお、CompletionStageにしても動作はします。どちらも、クライアントはストリームっぽい動きにはなりませんが。
これ、Multiだとどうやっても動かなくて、諦めたんですよね…。
こんな定義にして(リソースクラスは、このMultiをそのまま返すように作成)実行すると、
@GET @Path("multi/stream") @Produces(MediaType.SERVER_SENT_EVENTS) @SseElementType(MediaType.TEXT_PLAIN) Multi<String> multiStream(@QueryParam("count") int count, @QueryParam("message") String message);
こんな感じになります。
2020-07-28 22:32:47,495 ERROR [org.jbo.res.res.i18n] (sse-event-source(http://localhost:9080/hello/multi/stream?count=5&message=World)-thread-1) RESTEASY002020: Unhandled asynchronous exception, sending back 500: javax.ws.rs.ProcessingException: java.lang.NullPointerException at org.jboss.resteasy.client.jaxrs.internal.ClientInvocation.filterRequest(ClientInvocation.java:696) at org.jboss.resteasy.microprofile.client.impl.MpClientInvocation.filterRequest(MpClientInvocation.java:75) at org.jboss.resteasy.client.jaxrs.internal.ClientInvocation.invoke(ClientInvocation.java:485) at org.jboss.resteasy.client.jaxrs.internal.ClientInvocation.invoke(ClientInvocation.java:65) at org.jboss.resteasy.plugins.providers.sse.client.SseEventSourceImpl$EventHandler.run(SseEventSourceImpl.java:329) at org.jboss.resteasy.plugins.providers.sse.client.SseEventSourceScheduler$1.run(SseEventSourceScheduler.java:92) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.lang.NullPointerException at org.jboss.resteasy.microprofile.client.utils.ClientRequestContextUtils.getMethod(ClientRequestContextUtils.java:25) at org.jboss.resteasy.microprofile.client.MethodInjectionFilter.filter(MethodInjectionFilter.java:15) at org.jboss.resteasy.client.jaxrs.internal.ClientInvocation.filterRequest(ClientInvocation.java:683) ... 11 more
クライアント側がコケてしまいます。
追ってみたのですが、よくわかりませんでした…。
これを見た感じ、使えてもおかしくなさそうなんですけどねぇ。
テストコードとかを見ても、REST Clientのメソッドの戻り値をMultiにしている例は見当たらなかったので、今回は1度
諦めて、そのうちまた見返してみましょう。