これは、なにをしたくて書いたもの?
- RESTEasyが3.6.0.FinalでRxJava 2を組み込めるようにしているらしいので、試してみたい
- せっかくなので、無限Stream的なレスポンスが継続するような使い方をしてみたい
- Listenerを使って、イベント発火に応じてレスポンスを作成する感じで
- できれば、クライアント/サーバーで試してみたい
RESTEasyのReactive Support?
ドキュメントは、こちら。
Reactive programming support
CompletionStageを使ったものやらありますが、今回はRxJava 2について書いたものを対象とします。
Pluggable reactive types: RxJava 2 in RESTEasy
クライアント側についても、記載がありますよ。
環境
今回の環境は、こちら。
$ java -version
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (build 1.8.0_181-8u181-b13-0ubuntu0.18.04.1-b13)
OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode)
$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T03:33:14+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-33-generic", arch: "amd64", family: "unix"
お題
最初に書いたやりたいことに沿って、次のようなことをやってみます。
- RESTEasyでRxJava 2のFlowableを使ったResourceクラスを作成する
- データは、InfinispanのCacheに対するListenerでデータの追加、変更の通知からレスポンスのデータを作成する
- Cacheへのデータの追加、変更は、同じくResourceで行う
- 最後にテストコードでクライアント側も書いてみる
準備
Maven依存関係など、あらかたの設定はこちら。
<dependencies>
<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-rxjava2</artifactId>
<version>3.6.1.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-cdi</artifactId>
<version>3.6.1.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-undertow</artifactId>
<version>3.6.1.Final</version>
</dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-core</artifactId>
<version>9.3.1.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.weld.se</groupId>
<artifactId>weld-se-core</artifactId>
<version>3.0.5.Final</version>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
<dependency>
<groupId>javax.enterprise</groupId>
<artifactId>cdi-api</artifactId>
<version>2.0.SP1</version>
</dependency>
<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-client</artifactId>
<version>3.6.1.Final</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.2.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.21.0</version>
<dependencies>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-surefire-provider</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.2.0</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
まあ長いですが、
- RESTEasy関連のモジュール
- RxJava 2(resteasy-rxjava2)
- CDI(なんとなく)
- Undertow(Servletコンテナ)
- Infinispan
- Weld(CDIの実装として)
という感じで作って、テストを
で作っていきます。
サンプルコード
Resourceクラス
最初に、RxJava 2を使ったResourceクラスを載せます。最初に雛形的な。
src/main/java/org/littlewings/resteasy/reactive/ReactiveResource.java
package org.littlewings.resteasy.reactive;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import org.infinispan.Cache;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.jboss.logging.Logger;
import org.jboss.resteasy.annotations.Stream;
@Path("reactive")
@ApplicationScoped
public class ReactiveResource {
Logger logger = Logger.getLogger(ReactiveResource.class);
@Inject
EmbeddedCacheManager cacheManager;
}
どこからからか、InfinispanのEmbeddedCacheManagerは@Injectしますが、それはあとで載せます。
Flowableを返す部分は、InfinispanのListenerを作成して、このように定義しました。
@GET
@Path("stream")
@Produces(MediaType.TEXT_PLAIN)
@Stream
public Flowable<String> values() {
Cache<String, String> cache = cacheManager.getCache("streamCache");
return Flowable
.<String>create(
emitter -> {
MyListener listener = new MyListener(emitter);
cache.addListener(listener);
emitter.setCancellable(() -> {
cache.removeListener(listener);
logger.infof("[%s] unregistered listener", Thread.currentThread().getName());
});
logger.infof("[%s] registered listener", Thread.currentThread().getName());
}, BackpressureStrategy.BUFFER
)
.doOnError(error -> logger.infof("error => %s", error.getMessage()))
.doOnCancel(() -> logger.infof("cancel!!"))
.doOnComplete(() -> logger.infof("complete"))
.doOnTerminate(() -> logger.infof("terminate"));
}
@Listener(clustered = true)
static class MyListener {
Logger logger = Logger.getLogger(MyListener.class);
FlowableEmitter<String> emitter;
MyListener(FlowableEmitter<String> emitter) {
this.emitter = emitter;
}
@CacheEntryCreated
@CacheEntryModified
public void receive(CacheEntryEvent<String, String> event) {
if (!event.isPre()) {
logger.infof("[%s] received = %s", Thread.currentThread().getName(), event.getKey() + "=" + event.getValue());
emitter.onNext(event.getKey() + "=" + event.getValue());
}
}
}
まず、ポイントとしてはメソッドの戻り値の型がFlowableになっていることと、メソッドに@Streamアノテーションが付いていることです。
@GET
@Path("stream")
@Produces(MediaType.TEXT_PLAIN)
@Stream
public Flowable<String> values() {
@Streamアノテーションについては、
RESTEasyのRxJava 2向けのモジュールでは、Flowable、Single、Obserbable向けのProviderが作られています。
https://github.com/resteasy/Resteasy/blob/3.6.1.Final/resteasy-rxjava2/src/main/java/org/jboss/resteasy/rxjava2/FlowableProvider.java
https://github.com/resteasy/Resteasy/blob/3.6.1.Final/resteasy-rxjava2/src/main/java/org/jboss/resteasy/rxjava2/SingleProvider.java
https://github.com/resteasy/Resteasy/blob/3.6.1.Final/resteasy-rxjava2/src/main/java/org/jboss/resteasy/rxjava2/ObservableProvider.java
@Streamアノテーションは、RESTEasy独自のアノテーションです。
クライアントに送信するデータは、Cache#putに合わせてListener経由で取得し、FlowableEmitterにonNextで渡します。
クライアントの切断後に、できればListenerを解除したかったので、いろいろ試行錯誤した結果FlowableEmitter#setCancellable、
もしくはFlowableのdoOnCancelで解除するのが良さそうな感じです。
emitter.setCancellable(() -> {
cache.removeListener(listener);
logger.infof("[%s] unregistered listener", Thread.currentThread().getName());
});
その他、doOn〜が付いていますが、今回はあんまり意味がありませんでした…。
このあたりは、確認のためにログを仕込んでいます。
データの登録は、こんな感じで。
@GET
@Path("put")
@Produces(MediaType.TEXT_PLAIN)
public String put(@QueryParam("key") String key, @QueryParam("value") String value) {
Cache<String, String> cache = cacheManager.getCache("streamCache");
cache.put(key, value);
return "OK!";
}
このCache#putに応じて、ListenerがFlowableEmitterに対してonNextするという感じです。
CacheManagerを、CDI管理Beanに登録
InfinispanのEmbeddedCacheManagerを、CDI管理BeanとするためのProducer。
src/main/java/org/littlewings/resteasy/reactive/CacheProducer.java
package org.littlewings.resteasy.reactive;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Disposes;
import javax.enterprise.inject.Produces;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
@Dependent
public class CacheProducer {
@Produces
@ApplicationScoped
public EmbeddedCacheManager cacheManager() {
EmbeddedCacheManager cacheManager = new DefaultCacheManager();
cacheManager.createCache("streamCache", new ConfigurationBuilder().build());
return cacheManager;
}
public void cacheManager(@Disposes EmbeddedCacheManager cacheManager) {
cacheManager.stop();
}
}
まんまです。クラスタ化はお好みで。
Java EEサーバーというよりは、Undertow上でWeldを突っ込んで動かすため、beans.xmlを用意します。
src/main/resources/META-INF/beans.xml
xml version="1.0" encoding="UTF-8"
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlnsxsi="http://www.w3.org/2001/XMLSchema-instance"
xsischemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_2_0.xsd"
bean-discovery-mode="annotated" version="2.0">
</beans>
なにげに、初CDI 2.0です。
Applicationのサブクラス
通常、JAX-RSの有効化として。今回、中身はありません。
src/main/java/org/littlewings/resteasy/reactive/JaxrsActivator.java
package org.littlewings.resteasy.reactive;
import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;
@ApplicationPath("")
public class JaxrsActivator extends Application {
}
起動クラス
最後に、起動クラスを。Undertowへのデプロイも、ここで行います。
src/main/java/org/littlewings/resteasy/reactive/Server.java
package org.littlewings.resteasy.reactive;
import javax.enterprise.inject.se.SeContainer;
import javax.enterprise.inject.se.SeContainerInitializer;
import io.undertow.Undertow;
import org.jboss.resteasy.cdi.CdiInjectorFactory;
import org.jboss.resteasy.cdi.ResteasyCdiExtension;
import org.jboss.resteasy.plugins.server.undertow.UndertowJaxrsServer;
import org.jboss.resteasy.spi.ResteasyDeployment;
public class Server {
UndertowJaxrsServer server;
SeContainer container;
int port;
public Server() {
this(8080);
}
public Server(int port) {
this.port = port;
}
public static void main(String... args) {
Server server = new Server();
server.start();
}
public void start() {
SeContainerInitializer initializer = SeContainerInitializer.newInstance();
container = initializer.initialize();
ResteasyCdiExtension extension = container.select(ResteasyCdiExtension.class).get();
server = new UndertowJaxrsServer();
ResteasyDeployment deployment = new ResteasyDeployment();
deployment.setApplication(new JaxrsActivator());
deployment.setActualResourceClasses(extension.getResources());
deployment.setActualProviderClasses(extension.getProviders());
deployment.setInjectorFactoryClass(CdiInjectorFactory.class.getName());
server.deploy(deployment);
server.start(Undertow.builder().addHttpListener(port, "0.0.0.0"));
}
public void shutdown() {
server.stop();
container.close();
}
}
ちょっとCDIを使うように細工をしてあるくらいですね、変わったところは。
CDI 2.0のSeContainer/SeContainerInitializerは、初めて使いました。
確認
それでは、動作確認してみましょう。
起動。
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.resteasy.reactive.Server
クライアントの登録。
$ curl -iv localhost:8080/reactive/stream
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /reactive/stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.58.0
> Accept: */*
>
< HTTP/1.1 200 OK
HTTP/1.1 200 OK
< Connection: keep-alive
Connection: keep-alive
< Transfer-Encoding: chunked
Transfer-Encoding: chunked
< Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8"
Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8"
< Date: Tue, 04 Sep 2018 13:04:49 GMT
Date: Tue, 04 Sep 2018 13:04:49 GMT
<
ここで、curlが待ち状態に入ります。
他のターミナルで、こんな感じにデータを送ってみると
$ curl -iv 'localhost:8080/reactive/put?key=key1&value=value1'
$ curl -iv 'localhost:8080/reactive/put?key=key2&value=value2'
待機している側には、こんな感じで表示されます。
data: key1=value1
data: key2=value2
このままcurlでデータを投げ込んでいてもよいですし、さらに追加でイベントを受け取るクライアントを追加してもOKです。
$ curl -iv localhost:8080/reactive/stream
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /reactive/stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.58.0
> Accept: */*
>
< HTTP/1.1 200 OK
HTTP/1.1 200 OK
< Connection: keep-alive
Connection: keep-alive
< Transfer-Encoding: chunked
Transfer-Encoding: chunked
< Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8"
Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8"
< Date: Tue, 04 Sep 2018 13:07:52 GMT
Date: Tue, 04 Sep 2018 13:07:52 GMT
<
このあと、データを投げ込むと、いずれのコンソールにも放り込んだデータがレスポンスとして表示されることを確認できます。
で、ここでひとつクライアントを切断してみます。
そしてデータを投げ込むと、途中でUndertowがBroken Pipeを検出して切断されていたクライアントの相手をしていたPublisher(Flowable)に
キャンセルをかけます。
9 04, 2018 10:09:16 午後 org.jboss.resteasy.plugins.providers.sse.SseEventOutputImpl writeEvent
ERROR: RESTEASY002030: Failed to write event org.jboss.resteasy.plugins.providers.sse.OutboundSseEventImpl@58e53250
java.io.IOException: Broken pipe
...
9 04, 2018 10:09:16 午後 org.littlewings.resteasy.reactive.ReactiveResource lambda$values$3
INFO: cancel!!
9 04, 2018 10:09:16 午後 org.littlewings.resteasy.reactive.ReactiveResource lambda$null$0
INFO: [XNIO-1 task-7] unregistered listener
Listenerを解除しているところが確認できます。
ところで、いろいろdoOn〜を設定していたのですが、動いているのは今回はdoOnCancelのみです…。
.doOnError(error -> logger.infof("error => %s", error.getMessage()))
.doOnCancel(() -> logger.infof("cancel!!"))
.doOnComplete(() -> logger.infof("complete"))
.doOnTerminate(() -> logger.infof("terminate"));
これなんですが、最初デプロイ先をNettyにしていたらcancelすら起こらずに途方に暮れ、Servletコンテナに変更した方がいいかな?と
Undertowに変えたら、こういう挙動になったので、今回はこちらを採用…。
TomcatやJetty、WildFlyだとどういう挙動になるんでしょうね…。
また、スタックトレースにちょこっと現れていますが、この仕組みを追うとServer-Sent-Eventまわりのコードが出てきます。
https://github.com/resteasy/Resteasy/blob/3.6.1.Final/resteasy-jaxrs/src/main/java/org/jboss/resteasy/core/AsyncResponseConsumer.java
https://github.com/resteasy/Resteasy/blob/3.6.1.Final/resteasy-jaxrs/src/main/java/org/jboss/resteasy/plugins/providers/sse/SseEventOutputImpl.java#L139
なお、このブログでSSEが出てきたのは初めてです…。
クライアント側
最後に、クライアント側のコードを書いてみましょう。
このあたりを参考にして。
https://github.com/resteasy/Resteasy/blob/3.6.1.Final/resteasy-rxjava2/src/test/java/org/jboss/resteasy/rxjava2/RxTest.java
src/test/java/org/littlewings/resteasy/reactive/ReactiveClientTest.java
package org.littlewings.resteasy.reactive;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import io.reactivex.subscribers.TestSubscriber;
import org.jboss.resteasy.rxjava2.FlowableRxInvoker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class ReactiveClientTest {
Server server;
@BeforeEach
public void setUp() {
server = new Server();
server.start();
}
@AfterEach
public void tearDown() {
server.shutdown();
}
}
テストの開始、終了時にServerを起動、停止するように仕込んでおきます。
クライアント側は、Invocation.Builderのrxを使って、RxJava 2と統合します。
@Test
public void rxclient() {
Client client = ClientBuilder.newBuilder().build();
FlowableRxInvoker invoker = client.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class);
TestSubscriber<String> subscriber = (TestSubscriber<String>) invoker.get(String.class).test();
client.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close();
subscriber.assertValue("key1=value1");
client.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close();
subscriber.assertValues("key1=value1", "key2=value2");
client.close();
subscriber.awaitTerminalEvent();
}
今回は、このFlowableRxInvoker(というかFlowable)から取得できるTestSubscriberでテストを行いました。
TestSubscriber<String> subscriber = (TestSubscriber<String>) invoker.get(String.class).test();
curlでやった時と同じように、データの送信の度にFlowable(SSE)で受け取っている方にデータが返ってくることが確認できます。
client.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close();
subscriber.assertValue("key1=value1");
client.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close();
subscriber.assertValues("key1=value1", "key2=value2");
Flowableで受け取る先を、増やしてもOK。
@Test
public void multipleInvoke() {
Client client = ClientBuilder.newBuilder().build();
FlowableRxInvoker invoker1 = client.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class);
TestSubscriber<String> subscriber1 = (TestSubscriber<String>) invoker1.get(String.class).test();
client.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close();
subscriber1.assertValue("key1=value1");
FlowableRxInvoker invoker2 = client.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class);
TestSubscriber<String> subscriber2 = (TestSubscriber<String>) invoker2.get(String.class).test();
client.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close();
subscriber1.assertValues("key1=value1", "key2=value2");
subscriber2.assertValues("key2=value2");
client.close();
subscriber1.awaitTerminalEvent();
subscriber2.awaitTerminalEvent();
}
途中で切断するケースも。
@Test
public void rxclientAndClosed() {
Client client = ClientBuilder.newBuilder().build();
FlowableRxInvoker invoker = client.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class);
TestSubscriber<String> subscriber = (TestSubscriber<String>) invoker.get(String.class).test();
client.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close();
subscriber.assertValue("key1=value1");
client.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close();
subscriber.assertValues("key1=value1", "key2=value2");
client.close();
Client client2 = ClientBuilder.newBuilder().build();
FlowableRxInvoker invoker2 = client2.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class);
TestSubscriber<String> subscriber2 = (TestSubscriber<String>) invoker2.get(String.class).test();
client2.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close();
client2.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close();
client2.target("http://localhost:8080/reactive/put?key=key3&value=value3").request().get().close();
subscriber2.assertValues("key1=value1", "key2=value2", "key3=value3");
client2.close();
subscriber.awaitTerminalEvent();
subscriber2.awaitTerminalEvent();
}
最初のクライアントを切断した後に
client.close();
Client client2 = ClientBuilder.newBuilder().build();
FlowableRxInvoker invoker2 = client2.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class);
TestSubscriber<String> subscriber2 = (TestSubscriber<String>) invoker2.get(String.class).test();
何回かデータを送っていると
client2.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close();
client2.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close();
client2.target("http://localhost:8080/reactive/put?key=key3&value=value3").request().get().close();
subscriber2.assertValues("key1=value1", "key2=value2", "key3=value3");
サーバー側ではBroken Pipeになってたりします。
9 04, 2018 10:19:12 午後 org.jboss.resteasy.plugins.providers.sse.SseEventOutputImpl writeEvent
ERROR: RESTEASY002030: Failed to write event org.jboss.resteasy.plugins.providers.sse.OutboundSseEventImpl@76307346
java.io.IOException: Broken pipe
で、Listener解除、と。
9 04, 2018 10:19:12 午後 org.littlewings.resteasy.reactive.ReactiveResource lambda$values$3
INFO: cancel!!
9 04, 2018 10:19:12 午後 org.littlewings.resteasy.reactive.ReactiveResource lambda$null$0
INFO: [XNIO-1 task-6] unregistered listener
特にListenerの解除時の挙動とかけっこう不安なのですが、とりあえずやりたかったことは確認できました、と。
けっこう大変でしたけど、面白かったです。