これは、なにをしたくて書いたもの?
- RESTEasyが3.6.0.FinalでRxJava 2を組み込めるようにしているらしいので、試してみたい
- せっかくなので、無限Stream的なレスポンスが継続するような使い方をしてみたい
- Listenerを使って、イベント発火に応じてレスポンスを作成する感じで
- できれば、クライアント/サーバーで試してみたい
RESTEasyのReactive Support?
ドキュメントは、こちら。
CompletionStageを使ったものやらありますが、今回はRxJava 2について書いたものを対象とします。
Pluggable reactive types: RxJava 2 in RESTEasy
クライアント側についても、記載がありますよ。
環境
今回の環境は、こちら。
$ java -version openjdk version "1.8.0_181" OpenJDK Runtime Environment (build 1.8.0_181-8u181-b13-0ubuntu0.18.04.1-b13) OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode) $ mvn -version Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T03:33:14+09:00) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "4.15.0-33-generic", arch: "amd64", family: "unix"
お題
最初に書いたやりたいことに沿って、次のようなことをやってみます。
- RESTEasyでRxJava 2のFlowableを使ったResourceクラスを作成する
- データは、InfinispanのCacheに対するListenerでデータの追加、変更の通知からレスポンスのデータを作成する
- Cacheへのデータの追加、変更は、同じくResourceで行う
- 最後にテストコードでクライアント側も書いてみる
準備
Maven依存関係など、あらかたの設定はこちら。
<dependencies> <dependency> <groupId>org.jboss.resteasy</groupId> <artifactId>resteasy-rxjava2</artifactId> <version>3.6.1.Final</version> </dependency> <dependency> <groupId>org.jboss.resteasy</groupId> <artifactId>resteasy-cdi</artifactId> <version>3.6.1.Final</version> </dependency> <dependency> <groupId>org.jboss.resteasy</groupId> <artifactId>resteasy-undertow</artifactId> <version>3.6.1.Final</version> </dependency> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-core</artifactId> <version>9.3.1.Final</version> </dependency> <dependency> <groupId>org.jboss.weld.se</groupId> <artifactId>weld-se-core</artifactId> <version>3.0.5.Final</version> </dependency> <dependency> <groupId>javax.inject</groupId> <artifactId>javax.inject</artifactId> <version>1</version> </dependency> <dependency> <groupId>javax.enterprise</groupId> <artifactId>cdi-api</artifactId> <version>2.0.SP1</version> </dependency> <dependency> <groupId>org.jboss.resteasy</groupId> <artifactId>resteasy-client</artifactId> <version>3.6.1.Final</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.2.0</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.21.0</version> <dependencies> <dependency> <groupId>org.junit.platform</groupId> <artifactId>junit-platform-surefire-provider</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.2.0</version> </dependency> </dependencies> </plugin> </plugins> </build>
まあ長いですが、
- RESTEasy関連のモジュール
- Infinispan
- Weld(CDIの実装として)
という感じで作って、テストを
- RESTEasy Client
- JUnit 5
で作っていきます。
サンプルコード
Resourceクラス
最初に、RxJava 2を使ったResourceクラスを載せます。最初に雛形的な。 src/main/java/org/littlewings/resteasy/reactive/ReactiveResource.java
package org.littlewings.resteasy.reactive; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import io.reactivex.FlowableEmitter; import org.infinispan.Cache; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.notifications.Listener; import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated; import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified; import org.infinispan.notifications.cachelistener.event.CacheEntryEvent; import org.jboss.logging.Logger; import org.jboss.resteasy.annotations.Stream; @Path("reactive") @ApplicationScoped public class ReactiveResource { Logger logger = Logger.getLogger(ReactiveResource.class); @Inject EmbeddedCacheManager cacheManager; // あとで }
どこからからか、InfinispanのEmbeddedCacheManagerは@Injectしますが、それはあとで載せます。
Flowableを返す部分は、InfinispanのListenerを作成して、このように定義しました。
@GET @Path("stream") @Produces(MediaType.TEXT_PLAIN) @Stream public Flowable<String> values() { Cache<String, String> cache = cacheManager.getCache("streamCache"); return Flowable .<String>create( emitter -> { MyListener listener = new MyListener(emitter); cache.addListener(listener); emitter.setCancellable(() -> { cache.removeListener(listener); logger.infof("[%s] unregistered listener", Thread.currentThread().getName()); }); logger.infof("[%s] registered listener", Thread.currentThread().getName()); }, BackpressureStrategy.BUFFER ) .doOnError(error -> logger.infof("error => %s", error.getMessage())) .doOnCancel(() -> logger.infof("cancel!!")) // ココでListenerを解除してもよい? .doOnComplete(() -> logger.infof("complete")) .doOnTerminate(() -> logger.infof("terminate")); } @Listener(clustered = true) static class MyListener { Logger logger = Logger.getLogger(MyListener.class); FlowableEmitter<String> emitter; MyListener(FlowableEmitter<String> emitter) { this.emitter = emitter; } @CacheEntryCreated @CacheEntryModified public void receive(CacheEntryEvent<String, String> event) { if (!event.isPre()) { logger.infof("[%s] received = %s", Thread.currentThread().getName(), event.getKey() + "=" + event.getValue()); emitter.onNext(event.getKey() + "=" + event.getValue()); } } }
まず、ポイントとしてはメソッドの戻り値の型がFlowableになっていることと、メソッドに@Streamアノテーションが付いていることです。
@GET @Path("stream") @Produces(MediaType.TEXT_PLAIN) @Stream public Flowable<String> values() {
@Streamアノテーションについては、
RESTEasyのRxJava 2向けのモジュールでは、Flowable、Single、Obserbable向けのProviderが作られています。
@Streamアノテーションは、RESTEasy独自のアノテーションです。
クライアントに送信するデータは、Cache#putに合わせてListener経由で取得し、FlowableEmitterにonNextで渡します。
クライアントの切断後に、できればListenerを解除したかったので、いろいろ試行錯誤した結果FlowableEmitter#setCancellable、 もしくはFlowableのdoOnCancelで解除するのが良さそうな感じです。
emitter.setCancellable(() -> {
cache.removeListener(listener);
logger.infof("[%s] unregistered listener", Thread.currentThread().getName());
});
その他、doOn〜が付いていますが、今回はあんまり意味がありませんでした…。
このあたりは、確認のためにログを仕込んでいます。
データの登録は、こんな感じで。
@GET @Path("put") @Produces(MediaType.TEXT_PLAIN) public String put(@QueryParam("key") String key, @QueryParam("value") String value) { Cache<String, String> cache = cacheManager.getCache("streamCache"); cache.put(key, value); return "OK!"; }
このCache#putに応じて、ListenerがFlowableEmitterに対してonNextするという感じです。
CacheManagerを、CDI管理Beanに登録
InfinispanのEmbeddedCacheManagerを、CDI管理BeanとするためのProducer。 src/main/java/org/littlewings/resteasy/reactive/CacheProducer.java
package org.littlewings.resteasy.reactive; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.context.Dependent; import javax.enterprise.inject.Disposes; import javax.enterprise.inject.Produces; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.global.GlobalConfigurationBuilder; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; @Dependent public class CacheProducer { @Produces @ApplicationScoped public EmbeddedCacheManager cacheManager() { EmbeddedCacheManager cacheManager = new DefaultCacheManager(); //EmbeddedCacheManager cacheManager = new DefaultCacheManager(new GlobalConfigurationBuilder().clusteredDefault().build()); cacheManager.createCache("streamCache", new ConfigurationBuilder().build()); return cacheManager; } public void cacheManager(@Disposes EmbeddedCacheManager cacheManager) { cacheManager.stop(); } }
まんまです。クラスタ化はお好みで。
Java EEサーバーというよりは、Undertow上でWeldを突っ込んで動かすため、beans.xmlを用意します。
src/main/resources/META-INF/beans.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_2_0.xsd" bean-discovery-mode="annotated" version="2.0"> </beans>
なにげに、初CDI 2.0です。
Applicationのサブクラス
通常、JAX-RSの有効化として。今回、中身はありません。
src/main/java/org/littlewings/resteasy/reactive/JaxrsActivator.java
package org.littlewings.resteasy.reactive; import javax.ws.rs.ApplicationPath; import javax.ws.rs.core.Application; @ApplicationPath("") public class JaxrsActivator extends Application { }
起動クラス
最後に、起動クラスを。Undertowへのデプロイも、ここで行います。
src/main/java/org/littlewings/resteasy/reactive/Server.java
package org.littlewings.resteasy.reactive; import javax.enterprise.inject.se.SeContainer; import javax.enterprise.inject.se.SeContainerInitializer; import io.undertow.Undertow; import org.jboss.resteasy.cdi.CdiInjectorFactory; import org.jboss.resteasy.cdi.ResteasyCdiExtension; import org.jboss.resteasy.plugins.server.undertow.UndertowJaxrsServer; import org.jboss.resteasy.spi.ResteasyDeployment; public class Server { UndertowJaxrsServer server; SeContainer container; int port; public Server() { this(8080); } public Server(int port) { this.port = port; } public static void main(String... args) { Server server = new Server(); server.start(); } public void start() { SeContainerInitializer initializer = SeContainerInitializer.newInstance(); container = initializer.initialize(); ResteasyCdiExtension extension = container.select(ResteasyCdiExtension.class).get(); server = new UndertowJaxrsServer(); ResteasyDeployment deployment = new ResteasyDeployment(); deployment.setApplication(new JaxrsActivator()); deployment.setActualResourceClasses(extension.getResources()); deployment.setActualProviderClasses(extension.getProviders()); deployment.setInjectorFactoryClass(CdiInjectorFactory.class.getName()); server.deploy(deployment); server.start(Undertow.builder().addHttpListener(port, "0.0.0.0")); } public void shutdown() { server.stop(); container.close(); } }
ちょっとCDIを使うように細工をしてあるくらいですね、変わったところは。
CDI 2.0のSeContainer/SeContainerInitializerは、初めて使いました。
確認
それでは、動作確認してみましょう。
起動。
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.resteasy.reactive.Server
クライアントの登録。
$ curl -iv localhost:8080/reactive/stream * Trying 127.0.0.1... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8080 (#0) > GET /reactive/stream HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.58.0 > Accept: */* > < HTTP/1.1 200 OK HTTP/1.1 200 OK < Connection: keep-alive Connection: keep-alive < Transfer-Encoding: chunked Transfer-Encoding: chunked < Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8" Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8" < Date: Tue, 04 Sep 2018 13:04:49 GMT Date: Tue, 04 Sep 2018 13:04:49 GMT <
ここで、curlが待ち状態に入ります。
他のターミナルで、こんな感じにデータを送ってみると
$ curl -iv 'localhost:8080/reactive/put?key=key1&value=value1' $ curl -iv 'localhost:8080/reactive/put?key=key2&value=value2'
待機している側には、こんな感じで表示されます。
data: key1=value1 data: key2=value2
このままcurlでデータを投げ込んでいてもよいですし、さらに追加でイベントを受け取るクライアントを追加してもOKです。
$ curl -iv localhost:8080/reactive/stream * Trying 127.0.0.1... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8080 (#0) > GET /reactive/stream HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.58.0 > Accept: */* > < HTTP/1.1 200 OK HTTP/1.1 200 OK < Connection: keep-alive Connection: keep-alive < Transfer-Encoding: chunked Transfer-Encoding: chunked < Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8" Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8" < Date: Tue, 04 Sep 2018 13:07:52 GMT Date: Tue, 04 Sep 2018 13:07:52 GMT <
このあと、データを投げ込むと、いずれのコンソールにも放り込んだデータがレスポンスとして表示されることを確認できます。
で、ここでひとつクライアントを切断してみます。
そしてデータを投げ込むと、途中でUndertowがBroken Pipeを検出して切断されていたクライアントの相手をしていたPublisher(Flowable)に キャンセルをかけます。
9 04, 2018 10:09:16 午後 org.jboss.resteasy.plugins.providers.sse.SseEventOutputImpl writeEvent ERROR: RESTEASY002030: Failed to write event org.jboss.resteasy.plugins.providers.sse.OutboundSseEventImpl@58e53250 java.io.IOException: Broken pipe ... 9 04, 2018 10:09:16 午後 org.littlewings.resteasy.reactive.ReactiveResource lambda$values$3 INFO: cancel!! 9 04, 2018 10:09:16 午後 org.littlewings.resteasy.reactive.ReactiveResource lambda$null$0 INFO: [XNIO-1 task-7] unregistered listener
Listenerを解除しているところが確認できます。
ところで、いろいろdoOn〜を設定していたのですが、動いているのは今回はdoOnCancelのみです…。
.doOnError(error -> logger.infof("error => %s", error.getMessage())) .doOnCancel(() -> logger.infof("cancel!!")) // ココでListenerを解除してもよい? .doOnComplete(() -> logger.infof("complete")) .doOnTerminate(() -> logger.infof("terminate"));
これなんですが、最初デプロイ先をNettyにしていたらcancelすら起こらずに途方に暮れ、Servletコンテナに変更した方がいいかな?と Undertowに変えたら、こういう挙動になったので、今回はこちらを採用…。
TomcatやJetty、WildFlyだとどういう挙動になるんでしょうね…。
また、スタックトレースにちょこっと現れていますが、この仕組みを追うとServer-Sent-Eventまわりのコードが出てきます。
なお、このブログでSSEが出てきたのは初めてです…。
クライアント側
最後に、クライアント側のコードを書いてみましょう。
このあたりを参考にして。
src/test/java/org/littlewings/resteasy/reactive/ReactiveClientTest.java
package org.littlewings.resteasy.reactive; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import io.reactivex.subscribers.TestSubscriber; import org.jboss.resteasy.rxjava2.FlowableRxInvoker; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class ReactiveClientTest { Server server; @BeforeEach public void setUp() { server = new Server(); server.start(); } @AfterEach public void tearDown() { server.shutdown(); } // ここに、テストを書く! }
テストの開始、終了時にServerを起動、停止するように仕込んでおきます。
クライアント側は、Invocation.Builderのrxを使って、RxJava 2と統合します。
@Test public void rxclient() { Client client = ClientBuilder.newBuilder().build(); FlowableRxInvoker invoker = client.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class); TestSubscriber<String> subscriber = (TestSubscriber<String>) invoker.get(String.class).test(); client.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close(); subscriber.assertValue("key1=value1"); client.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close(); subscriber.assertValues("key1=value1", "key2=value2"); client.close(); subscriber.awaitTerminalEvent(); }
今回は、このFlowableRxInvoker(というかFlowable)から取得できるTestSubscriberでテストを行いました。
TestSubscriber<String> subscriber = (TestSubscriber<String>) invoker.get(String.class).test();
curlでやった時と同じように、データの送信の度にFlowable(SSE)で受け取っている方にデータが返ってくることが確認できます。
client.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close(); subscriber.assertValue("key1=value1"); client.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close(); subscriber.assertValues("key1=value1", "key2=value2");
Flowableで受け取る先を、増やしてもOK。
@Test public void multipleInvoke() { Client client = ClientBuilder.newBuilder().build(); FlowableRxInvoker invoker1 = client.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class); TestSubscriber<String> subscriber1 = (TestSubscriber<String>) invoker1.get(String.class).test(); client.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close(); subscriber1.assertValue("key1=value1"); FlowableRxInvoker invoker2 = client.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class); TestSubscriber<String> subscriber2 = (TestSubscriber<String>) invoker2.get(String.class).test(); client.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close(); subscriber1.assertValues("key1=value1", "key2=value2"); subscriber2.assertValues("key2=value2"); client.close(); subscriber1.awaitTerminalEvent(); subscriber2.awaitTerminalEvent(); }
途中で切断するケースも。
@Test public void rxclientAndClosed() { Client client = ClientBuilder.newBuilder().build(); FlowableRxInvoker invoker = client.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class); TestSubscriber<String> subscriber = (TestSubscriber<String>) invoker.get(String.class).test(); client.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close(); subscriber.assertValue("key1=value1"); client.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close(); subscriber.assertValues("key1=value1", "key2=value2"); client.close(); Client client2 = ClientBuilder.newBuilder().build(); FlowableRxInvoker invoker2 = client2.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class); TestSubscriber<String> subscriber2 = (TestSubscriber<String>) invoker2.get(String.class).test(); client2.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close(); client2.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close(); client2.target("http://localhost:8080/reactive/put?key=key3&value=value3").request().get().close(); subscriber2.assertValues("key1=value1", "key2=value2", "key3=value3"); client2.close(); subscriber.awaitTerminalEvent(); subscriber2.awaitTerminalEvent(); }
最初のクライアントを切断した後に
client.close(); Client client2 = ClientBuilder.newBuilder().build(); FlowableRxInvoker invoker2 = client2.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class); TestSubscriber<String> subscriber2 = (TestSubscriber<String>) invoker2.get(String.class).test();
何回かデータを送っていると
client2.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close(); client2.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close(); client2.target("http://localhost:8080/reactive/put?key=key3&value=value3").request().get().close(); subscriber2.assertValues("key1=value1", "key2=value2", "key3=value3");
サーバー側ではBroken Pipeになってたりします。
9 04, 2018 10:19:12 午後 org.jboss.resteasy.plugins.providers.sse.SseEventOutputImpl writeEvent ERROR: RESTEASY002030: Failed to write event org.jboss.resteasy.plugins.providers.sse.OutboundSseEventImpl@76307346 java.io.IOException: Broken pipe
で、Listener解除、と。
9 04, 2018 10:19:12 午後 org.littlewings.resteasy.reactive.ReactiveResource lambda$values$3 INFO: cancel!! 9 04, 2018 10:19:12 午後 org.littlewings.resteasy.reactive.ReactiveResource lambda$null$0 INFO: [XNIO-1 task-6] unregistered listener
特にListenerの解除時の挙動とかけっこう不安なのですが、とりあえずやりたかったことは確認できました、と。 けっこう大変でしたけど、面白かったです。