CLOVER🍀

That was when it all began.

RESTEasy × RxJava 2を試す

これは、なにをしたくて書いたもの?

  • RESTEasyが3.6.0.FinalでRxJava 2を組み込めるようにしているらしいので、試してみたい
  • せっかくなので、無限Stream的なレスポンスが継続するような使い方をしてみたい
    • Listenerを使って、イベント発火に応じてレスポンスを作成する感じで
  • できれば、クライアント/サーバーで試してみたい

RESTEasyのReactive Support?

ドキュメントは、こちら。

Reactive programming support

CompletionStageを使ったものやらありますが、今回はRxJava 2について書いたものを対象とします。

Pluggable reactive types: RxJava 2 in RESTEasy

クライアント側についても、記載がありますよ。

環境

今回の環境は、こちら。

$ java -version
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (build 1.8.0_181-8u181-b13-0ubuntu0.18.04.1-b13)
OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode)


$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T03:33:14+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-33-generic", arch: "amd64", family: "unix"

お題

最初に書いたやりたいことに沿って、次のようなことをやってみます。

  • RESTEasyでRxJava 2のFlowableを使ったResourceクラスを作成する
    • データは、InfinispanのCacheに対するListenerでデータの追加、変更の通知からレスポンスのデータを作成する
  • Cacheへのデータの追加、変更は、同じくResourceで行う
  • 最後にテストコードでクライアント側も書いてみる

準備

Maven依存関係など、あらかたの設定はこちら。

    <dependencies>
        <dependency>
            <groupId>org.jboss.resteasy</groupId>
            <artifactId>resteasy-rxjava2</artifactId>
            <version>3.6.1.Final</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.resteasy</groupId>
            <artifactId>resteasy-cdi</artifactId>
            <version>3.6.1.Final</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.resteasy</groupId>
            <artifactId>resteasy-undertow</artifactId>
            <version>3.6.1.Final</version>
        </dependency>

        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>9.3.1.Final</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.weld.se</groupId>
            <artifactId>weld-se-core</artifactId>
            <version>3.0.5.Final</version>
        </dependency>
        <dependency>
            <groupId>javax.inject</groupId>
            <artifactId>javax.inject</artifactId>
            <version>1</version>
        </dependency>
        <dependency>
            <groupId>javax.enterprise</groupId>
            <artifactId>cdi-api</artifactId>
            <version>2.0.SP1</version>
        </dependency>

        <dependency>
            <groupId>org.jboss.resteasy</groupId>
            <artifactId>resteasy-client</artifactId>
            <version>3.6.1.Final</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.2.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.21.0</version>
                <dependencies>
                    <dependency>
                        <groupId>org.junit.platform</groupId>
                        <artifactId>junit-platform-surefire-provider</artifactId>
                        <version>1.2.0</version>
                    </dependency>
                    <dependency>
                        <groupId>org.junit.jupiter</groupId>
                        <artifactId>junit-jupiter-engine</artifactId>
                        <version>5.2.0</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>

まあ長いですが、

  • RESTEasy関連のモジュール
    • RxJava 2(resteasy-rxjava2)
    • CDI(なんとなく)
    • Undertow(Servletコンテナ)
  • Infinispan
  • Weld(CDIの実装として)

という感じで作って、テストを

  • RESTEasy Client
  • JUnit 5

で作っていきます。

サンプルコード

Resourceクラス

最初に、RxJava 2を使ったResourceクラスを載せます。最初に雛形的な。 src/main/java/org/littlewings/resteasy/reactive/ReactiveResource.java

package org.littlewings.resteasy.reactive;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import org.infinispan.Cache;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.jboss.logging.Logger;
import org.jboss.resteasy.annotations.Stream;

@Path("reactive")
@ApplicationScoped
public class ReactiveResource {
    Logger logger = Logger.getLogger(ReactiveResource.class);

    @Inject
    EmbeddedCacheManager cacheManager;

    // あとで
}

どこからからか、InfinispanのEmbeddedCacheManagerは@Injectしますが、それはあとで載せます。

Flowableを返す部分は、InfinispanのListenerを作成して、このように定義しました。

    @GET
    @Path("stream")
    @Produces(MediaType.TEXT_PLAIN)
    @Stream
    public Flowable<String> values() {
        Cache<String, String> cache = cacheManager.getCache("streamCache");

        return Flowable
                .<String>create(
                        emitter -> {
                            MyListener listener = new MyListener(emitter);

                            cache.addListener(listener);

                            emitter.setCancellable(() -> {
                                cache.removeListener(listener);
                                logger.infof("[%s] unregistered listener", Thread.currentThread().getName());
                            });

                            logger.infof("[%s] registered listener", Thread.currentThread().getName());
                        }, BackpressureStrategy.BUFFER
                )
                .doOnError(error -> logger.infof("error => %s", error.getMessage()))
                .doOnCancel(() -> logger.infof("cancel!!"))  // ココでListenerを解除してもよい?
                .doOnComplete(() -> logger.infof("complete"))
                .doOnTerminate(() -> logger.infof("terminate"));
    }

    @Listener(clustered = true)
    static class MyListener {
        Logger logger = Logger.getLogger(MyListener.class);

        FlowableEmitter<String> emitter;

        MyListener(FlowableEmitter<String> emitter) {
            this.emitter = emitter;
        }

        @CacheEntryCreated
        @CacheEntryModified
        public void receive(CacheEntryEvent<String, String> event) {
            if (!event.isPre()) {
                logger.infof("[%s] received = %s", Thread.currentThread().getName(), event.getKey() + "=" + event.getValue());
                emitter.onNext(event.getKey() + "=" + event.getValue());
            }
        }
    }

まず、ポイントとしてはメソッドの戻り値の型がFlowableになっていることと、メソッドに@Streamアノテーションが付いていることです。

    @GET
    @Path("stream")
    @Produces(MediaType.TEXT_PLAIN)
    @Stream
    public Flowable<String> values() {

@Streamアノテーションについては、

RESTEasyのRxJava 2向けのモジュールでは、Flowable、Single、Obserbable向けのProviderが作られています。

https://github.com/resteasy/Resteasy/blob/3.6.1.Final/resteasy-rxjava2/src/main/java/org/jboss/resteasy/rxjava2/FlowableProvider.java

https://github.com/resteasy/Resteasy/blob/3.6.1.Final/resteasy-rxjava2/src/main/java/org/jboss/resteasy/rxjava2/SingleProvider.java

https://github.com/resteasy/Resteasy/blob/3.6.1.Final/resteasy-rxjava2/src/main/java/org/jboss/resteasy/rxjava2/ObservableProvider.java

@Streamアノテーションは、RESTEasy独自のアノテーションです。

クライアントに送信するデータは、Cache#putに合わせてListener経由で取得し、FlowableEmitterにonNextで渡します。

クライアントの切断後に、できればListenerを解除したかったので、いろいろ試行錯誤した結果FlowableEmitter#setCancellable、 もしくはFlowableのdoOnCancelで解除するのが良さそうな感じです。

                            emitter.setCancellable(() -> {
                                cache.removeListener(listener);
                                logger.infof("[%s] unregistered listener", Thread.currentThread().getName());
                            });

その他、doOn〜が付いていますが、今回はあんまり意味がありませんでした…。

このあたりは、確認のためにログを仕込んでいます。

データの登録は、こんな感じで。

    @GET
    @Path("put")
    @Produces(MediaType.TEXT_PLAIN)
    public String put(@QueryParam("key") String key, @QueryParam("value") String value) {
        Cache<String, String> cache = cacheManager.getCache("streamCache");
        cache.put(key, value);

        return "OK!";
    }

このCache#putに応じて、ListenerがFlowableEmitterに対してonNextするという感じです。

CacheManagerを、CDI管理Beanに登録

InfinispanのEmbeddedCacheManagerを、CDI管理BeanとするためのProducer。 src/main/java/org/littlewings/resteasy/reactive/CacheProducer.java

package org.littlewings.resteasy.reactive;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Disposes;
import javax.enterprise.inject.Produces;

import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;

@Dependent
public class CacheProducer {
    @Produces
    @ApplicationScoped
    public EmbeddedCacheManager cacheManager() {
        EmbeddedCacheManager cacheManager = new DefaultCacheManager();
        //EmbeddedCacheManager cacheManager = new DefaultCacheManager(new GlobalConfigurationBuilder().clusteredDefault().build());

        cacheManager.createCache("streamCache", new ConfigurationBuilder().build());

        return cacheManager;
    }

    public void cacheManager(@Disposes EmbeddedCacheManager cacheManager) {
        cacheManager.stop();
    }
}

まんまです。クラスタ化はお好みで。

Java EEサーバーというよりは、Undertow上でWeldを突っ込んで動かすため、beans.xmlを用意します。
src/main/resources/META-INF/beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_2_0.xsd"
       bean-discovery-mode="annotated" version="2.0">
</beans>

なにげに、初CDI 2.0です。

Applicationのサブクラス

通常、JAX-RSの有効化として。今回、中身はありません。
src/main/java/org/littlewings/resteasy/reactive/JaxrsActivator.java

package org.littlewings.resteasy.reactive;

import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;

@ApplicationPath("")
public class JaxrsActivator extends Application {
}
起動クラス

最後に、起動クラスを。Undertowへのデプロイも、ここで行います。
src/main/java/org/littlewings/resteasy/reactive/Server.java

package org.littlewings.resteasy.reactive;

import javax.enterprise.inject.se.SeContainer;
import javax.enterprise.inject.se.SeContainerInitializer;

import io.undertow.Undertow;
import org.jboss.resteasy.cdi.CdiInjectorFactory;
import org.jboss.resteasy.cdi.ResteasyCdiExtension;
import org.jboss.resteasy.plugins.server.undertow.UndertowJaxrsServer;
import org.jboss.resteasy.spi.ResteasyDeployment;

public class Server {
    UndertowJaxrsServer server;
    SeContainer container;
    int port;

    public Server() {
        this(8080);
    }

    public Server(int port) {
        this.port = port;
    }

    public static void main(String... args) {
        Server server = new Server();
        server.start();
    }

    public void start() {
        SeContainerInitializer initializer = SeContainerInitializer.newInstance();
        container = initializer.initialize();

        ResteasyCdiExtension extension = container.select(ResteasyCdiExtension.class).get();

        server = new UndertowJaxrsServer();
        ResteasyDeployment deployment = new ResteasyDeployment();

        deployment.setApplication(new JaxrsActivator());
        deployment.setActualResourceClasses(extension.getResources());
        deployment.setActualProviderClasses(extension.getProviders());
        deployment.setInjectorFactoryClass(CdiInjectorFactory.class.getName());

        server.deploy(deployment);
        server.start(Undertow.builder().addHttpListener(port, "0.0.0.0"));
    }

    public void shutdown() {
        server.stop();
        container.close();
    }
}

ちょっとCDIを使うように細工をしてあるくらいですね、変わったところは。

CDI 2.0のSeContainer/SeContainerInitializerは、初めて使いました。

確認

それでは、動作確認してみましょう。

起動。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.resteasy.reactive.Server

クライアントの登録。

$ curl -iv localhost:8080/reactive/stream
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /reactive/stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.58.0
> Accept: */*
> 
< HTTP/1.1 200 OK
HTTP/1.1 200 OK
< Connection: keep-alive
Connection: keep-alive
< Transfer-Encoding: chunked
Transfer-Encoding: chunked
< Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8"
Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8"
< Date: Tue, 04 Sep 2018 13:04:49 GMT
Date: Tue, 04 Sep 2018 13:04:49 GMT

< 

ここで、curlが待ち状態に入ります。

他のターミナルで、こんな感じにデータを送ってみると

$ curl -iv 'localhost:8080/reactive/put?key=key1&value=value1'
$ curl -iv 'localhost:8080/reactive/put?key=key2&value=value2'

待機している側には、こんな感じで表示されます。

data: key1=value1

data: key2=value2

このままcurlでデータを投げ込んでいてもよいですし、さらに追加でイベントを受け取るクライアントを追加してもOKです。

$ curl -iv localhost:8080/reactive/stream
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /reactive/stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.58.0
> Accept: */*
> 
< HTTP/1.1 200 OK
HTTP/1.1 200 OK
< Connection: keep-alive
Connection: keep-alive
< Transfer-Encoding: chunked
Transfer-Encoding: chunked
< Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8"
Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8"
< Date: Tue, 04 Sep 2018 13:07:52 GMT
Date: Tue, 04 Sep 2018 13:07:52 GMT

< 

このあと、データを投げ込むと、いずれのコンソールにも放り込んだデータがレスポンスとして表示されることを確認できます。

で、ここでひとつクライアントを切断してみます。

そしてデータを投げ込むと、途中でUndertowがBroken Pipeを検出して切断されていたクライアントの相手をしていたPublisher(Flowable)に キャンセルをかけます。

9 04, 2018 10:09:16 午後 org.jboss.resteasy.plugins.providers.sse.SseEventOutputImpl writeEvent
ERROR: RESTEASY002030: Failed to write event org.jboss.resteasy.plugins.providers.sse.OutboundSseEventImpl@58e53250
java.io.IOException: Broken pipe

...

9 04, 2018 10:09:16 午後 org.littlewings.resteasy.reactive.ReactiveResource lambda$values$3
INFO: cancel!!
9 04, 2018 10:09:16 午後 org.littlewings.resteasy.reactive.ReactiveResource lambda$null$0
INFO: [XNIO-1 task-7] unregistered listener

Listenerを解除しているところが確認できます。

ところで、いろいろdoOn〜を設定していたのですが、動いているのは今回はdoOnCancelのみです…。

                .doOnError(error -> logger.infof("error => %s", error.getMessage()))
                .doOnCancel(() -> logger.infof("cancel!!"))  // ココでListenerを解除してもよい?
                .doOnComplete(() -> logger.infof("complete"))
                .doOnTerminate(() -> logger.infof("terminate"));

これなんですが、最初デプロイ先をNettyにしていたらcancelすら起こらずに途方に暮れ、Servletコンテナに変更した方がいいかな?と Undertowに変えたら、こういう挙動になったので、今回はこちらを採用…。

TomcatやJetty、WildFlyだとどういう挙動になるんでしょうね…。

また、スタックトレースにちょこっと現れていますが、この仕組みを追うとServer-Sent-Eventまわりのコードが出てきます。

https://github.com/resteasy/Resteasy/blob/3.6.1.Final/resteasy-jaxrs/src/main/java/org/jboss/resteasy/core/AsyncResponseConsumer.java

https://github.com/resteasy/Resteasy/blob/3.6.1.Final/resteasy-jaxrs/src/main/java/org/jboss/resteasy/plugins/providers/sse/SseEventOutputImpl.java#L139

なお、このブログでSSEが出てきたのは初めてです…。

クライアント側

最後に、クライアント側のコードを書いてみましょう。

このあたりを参考にして。

https://github.com/resteasy/Resteasy/blob/3.6.1.Final/resteasy-rxjava2/src/test/java/org/jboss/resteasy/rxjava2/RxTest.java

src/test/java/org/littlewings/resteasy/reactive/ReactiveClientTest.java

package org.littlewings.resteasy.reactive;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;

import io.reactivex.subscribers.TestSubscriber;
import org.jboss.resteasy.rxjava2.FlowableRxInvoker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class ReactiveClientTest {
    Server server;

    @BeforeEach
    public void setUp() {
        server = new Server();
        server.start();
    }

    @AfterEach
    public void tearDown() {
        server.shutdown();
    }

    // ここに、テストを書く!
}

テストの開始、終了時にServerを起動、停止するように仕込んでおきます。

クライアント側は、Invocation.Builderのrxを使って、RxJava 2と統合します。

    @Test
    public void rxclient() {
        Client client = ClientBuilder.newBuilder().build();

        FlowableRxInvoker invoker = client.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class);
        TestSubscriber<String> subscriber = (TestSubscriber<String>) invoker.get(String.class).test();

        client.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close();
        subscriber.assertValue("key1=value1");

        client.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close();
        subscriber.assertValues("key1=value1", "key2=value2");

        client.close();

        subscriber.awaitTerminalEvent();
    }

今回は、このFlowableRxInvoker(というかFlowable)から取得できるTestSubscriberでテストを行いました。

        TestSubscriber<String> subscriber = (TestSubscriber<String>) invoker.get(String.class).test();

curlでやった時と同じように、データの送信の度にFlowable(SSE)で受け取っている方にデータが返ってくることが確認できます。

        client.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close();
        subscriber.assertValue("key1=value1");

        client.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close();
        subscriber.assertValues("key1=value1", "key2=value2");

Flowableで受け取る先を、増やしてもOK。

    @Test
    public void multipleInvoke() {
        Client client = ClientBuilder.newBuilder().build();

        FlowableRxInvoker invoker1 = client.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class);
        TestSubscriber<String> subscriber1 = (TestSubscriber<String>) invoker1.get(String.class).test();

        client.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close();
        subscriber1.assertValue("key1=value1");

        FlowableRxInvoker invoker2 = client.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class);
        TestSubscriber<String> subscriber2 = (TestSubscriber<String>) invoker2.get(String.class).test();

        client.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close();
        subscriber1.assertValues("key1=value1", "key2=value2");
        subscriber2.assertValues("key2=value2");

        client.close();

        subscriber1.awaitTerminalEvent();
        subscriber2.awaitTerminalEvent();
    }

途中で切断するケースも。

    @Test
    public void rxclientAndClosed() {
        Client client = ClientBuilder.newBuilder().build();

        FlowableRxInvoker invoker = client.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class);
        TestSubscriber<String> subscriber = (TestSubscriber<String>) invoker.get(String.class).test();

        client.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close();
        subscriber.assertValue("key1=value1");

        client.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close();
        subscriber.assertValues("key1=value1", "key2=value2");

        client.close();

        Client client2 = ClientBuilder.newBuilder().build();

        FlowableRxInvoker invoker2 = client2.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class);
        TestSubscriber<String> subscriber2 = (TestSubscriber<String>) invoker2.get(String.class).test();

        client2.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close();
        client2.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close();
        client2.target("http://localhost:8080/reactive/put?key=key3&value=value3").request().get().close();
        subscriber2.assertValues("key1=value1", "key2=value2", "key3=value3");

        client2.close();

        subscriber.awaitTerminalEvent();
        subscriber2.awaitTerminalEvent();
    }

最初のクライアントを切断した後に

        client.close();

        Client client2 = ClientBuilder.newBuilder().build();

        FlowableRxInvoker invoker2 = client2.target("http://localhost:8080/reactive/stream").request().rx(FlowableRxInvoker.class);
        TestSubscriber<String> subscriber2 = (TestSubscriber<String>) invoker2.get(String.class).test();

何回かデータを送っていると

        client2.target("http://localhost:8080/reactive/put?key=key1&value=value1").request().get().close();
        client2.target("http://localhost:8080/reactive/put?key=key2&value=value2").request().get().close();
        client2.target("http://localhost:8080/reactive/put?key=key3&value=value3").request().get().close();
        subscriber2.assertValues("key1=value1", "key2=value2", "key3=value3");

サーバー側ではBroken Pipeになってたりします。

9 04, 2018 10:19:12 午後 org.jboss.resteasy.plugins.providers.sse.SseEventOutputImpl writeEvent
ERROR: RESTEASY002030: Failed to write event org.jboss.resteasy.plugins.providers.sse.OutboundSseEventImpl@76307346
java.io.IOException: Broken pipe

で、Listener解除、と。

9 04, 2018 10:19:12 午後 org.littlewings.resteasy.reactive.ReactiveResource lambda$values$3
INFO: cancel!!
9 04, 2018 10:19:12 午後 org.littlewings.resteasy.reactive.ReactiveResource lambda$null$0
INFO: [XNIO-1 task-6] unregistered listener

特にListenerの解除時の挙動とかけっこう不安なのですが、とりあえずやりたかったことは確認できました、と。 けっこう大変でしたけど、面白かったです。