CLOVER🍀

That was when it all began.

RESTEasy × Reactorを試す

これは、なにをしたくて書いたもの?

RESTEasy 4.1.0.Finalがリリースされました。

RESTEasy: RESTEasy 4.1.0.Final is available! |JBoss Developer

このリリースで、Reactor向けのモジュールが2つ追加されています。

two new modules for integration with the Project Reactor , namely a new implementation of the Async client engine interface based on Reactor Netty

ひとつは、MonoやFluxをサポートするモジュール。

Pluggable reactive types: RxJava 2 in RESTEasy

もうひとつは、RESTEasy JAX-RS ClientのHTTPエンジンとして、Reactor Nettyを使えるようにするモジュールです。

Reactor Netty Client Engine

以前、RESTEasyとRxJava 2を統合するモジュールで遊んだことがあるのですが、このReactor版として遊んでみることにします。

RESTEasy × RxJava 2を試す - CLOVER🍀

なので、今回はReactor Nettyを使ったJAX-RS Clientは利用しません。

Reactor Nettyを使ったJAX-RS Clientについては、こちらを参照。

RESTEasy JAX-RS Client × Reactor Netty - CLOVER🍀

お題

RxJava 2の時は、RESTEasyとRxJava 2を使い、InfinispanのListenerでServer Sent Eventsでデータをクライアントに返すプログラムを
作成しましたが、今回はReactorが使えるのでこんな感じのお題にしてみたいと思います。

  • RESTEasyでReactorのFluxを使ったResourceクラスを作成する
  • データは、RedisのPublish/Subscribeの仕組みを使って、データの追加、変更の通知からレスポンスのデータを作成する
  • Redisへのアクセスは、Reactorを使うこともあってLettuceを使用する
  • 最後にテストコードでクライアント側も書いてみる

作成するプログラムの大まかな挙動は、RxJava 2の時に作ったものと同じです。

Redisへのアクセスは、先にも書きましたがLettuceを使用します。

Lettuce

環境

今回の環境は、こちらです。

$ java -version
openjdk version "11.0.3" 2019-04-16
OpenJDK Runtime Environment (build 11.0.3+7-Ubuntu-1ubuntu218.04.1)
OpenJDK 64-Bit Server VM (build 11.0.3+7-Ubuntu-1ubuntu218.04.1, mixed mode, sharing)


$ mvn -version
Apache Maven 3.6.1 (d66c9c0b3152b2e69ee9bac180bb8fcc8e6af555; 2019-04-05T04:00:29+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.3, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-54-generic", arch: "amd64", family: "unix"

準備

Maven依存関係など、あらかたの設定はこちら。

    <dependencies>
        <dependency>
            <groupId>org.jboss.resteasy</groupId>
            <artifactId>resteasy-reactor</artifactId>
            <version>4.1.0.Final</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.resteasy</groupId>
            <artifactId>resteasy-undertow</artifactId>
            <version>4.1.0.Final</version>
        </dependency>

        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.jboss.resteasy</groupId>
            <artifactId>resteasy-client</artifactId>
            <version>4.1.0.Final</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.4.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.4.2</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.12.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.0</version>
            </plugin>
        </plugins>
    </build>

今回の中心は、resteasy-reactorです。

        <dependency>
            <groupId>org.jboss.resteasy</groupId>
            <artifactId>resteasy-reactor</artifactId>
            <version>4.1.0.Final</version>
        </dependency>

JAX-RSを駆動させるサーバーアダプターとしては、Undertow向けのものを使用します。

resteasy-clientは、テストで使用します。

また、Redisは以下のもので用意します。

  • バージョン … 5.0.5
  • IPアドレス … 172.17.0.2
  • パスワード … redispass

JAX-RSリソース関連のクラスを作成する

では、サーバーサイドから書いていきましょう。

作成した、RESTEasy+ReactorなJAX-RSリソースクラス。
src/main/java/org/littlewings/resteasy/reactor/ReactiveResource.java

package org.littlewings.resteasy.reactor;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;

import io.lettuce.core.RedisClient;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import org.jboss.logging.Logger;
import org.jboss.resteasy.annotations.Stream;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

@Path("reactive")
public class ReactiveResource {
    Logger logger = Logger.getLogger(ReactiveResource.class);

    RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379");

    @GET
    @Path("stream")
    @Produces(MediaType.TEXT_PLAIN)
    @Stream
    public Flux<String> stream() {
        StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection = redisClient.connectPubSub();
        RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands = statefulRedisPubSubConnection.reactive();

        return Flux
                .<String>create(emitter -> {
                    redisPubSubReactiveCommands
                            .subscribe("channel")
                            .block();

                    Disposable disposable =
                            redisPubSubReactiveCommands
                            .observeChannels()
                            .doOnNext(channelMessage -> {
                                        emitter.next(channelMessage.getMessage());

                                        logger.info(String.format(
                                                "[%s] received message = %s",
                                                channelMessage.getChannel(),
                                                channelMessage.getMessage()));
                                    }
                            )
                            .subscribe();

                    emitter.onCancel(() -> {
                        redisPubSubReactiveCommands.unsubscribe("channel");
                        disposable.dispose();
                        logger.infof("[%s] unregistered listener", Thread.currentThread().getName());
                    });

                    logger.infof("[%s] registered listener", Thread.currentThread().getName());
                })
                .doOnError(error -> logger.infof("error => %s", error.getMessage()))
                .doOnCancel(() -> logger.infof("cancel!!"))
                .doOnComplete(() -> logger.infof("complete"))
                .doOnTerminate(() -> {
                    logger.infof("terminate");
                    redisPubSubReactiveCommands.shutdown(false);
                    statefulRedisPubSubConnection.close();
                    // redisClient.shutdown();
                });
    }

    @GET
    @Path("put")
    @Produces(MediaType.TEXT_PLAIN)
    public String put(@QueryParam("message") String message) {
        StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection = redisClient.connectPubSub();
        RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands = statefulRedisPubSubConnection.reactive();

        redisPubSubReactiveCommands
                .publish("channel", message)
                .block();

        redisPubSubReactiveCommands.shutdown(false);
        statefulRedisPubSubConnection.close();
        // redisClient.shutdown();

        return "OK!";
    }
}

RxJava 2の時もそうでしたが、ポイントは@Streamアノテーションを使っていることです。

    @GET
    @Path("stream")
    @Produces(MediaType.TEXT_PLAIN)
    @Stream
    public Flux<String> stream() {

@StreamとFluxやMonoを組み合わせることで、Server Sent Eventsを使ったリアクティブな実装を行うことができます。

Representation on the wire

今回使用しているRESTEasyのReactor向けのモジュールでは、FluxやMonoに対するProviderが用意されています。

https://github.com/resteasy/Resteasy/blob/4.1.0.Final/resteasy-reactor/src/main/java/org/jboss/resteasy/reactor/FluxProvider.java

https://github.com/resteasy/Resteasy/blob/4.1.0.Final/resteasy-reactor/src/main/java/org/jboss/resteasy/reactor/MonoProvider.java

今回は、Fluxのみしか使いませんが。

Server Sent Eventsを使ったクライアントへのデータの送信は、RedisのSubscriberとして作成します。

Publish/Subscribe

LettuceのRedisClientから、Publish/Subscribe用のコマンドを作成します。

        StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection = redisClient.connectPubSub();
        RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands = statefulRedisPubSubConnection.reactive();

あとは、このコマンドでRedisからSubscribeした内容を、Fluxを使ってクライアントに送信します。

        return Flux
                .<String>create(emitter -> {
                    redisPubSubReactiveCommands
                            .subscribe("channel")
                            .block();

                    Disposable disposable =
                            redisPubSubReactiveCommands
                            .observeChannels()
                            .doOnNext(channelMessage -> {
                                        emitter.next(channelMessage.getMessage());

                                        logger.info(String.format(
                                                "[%s] received message = %s",
                                                channelMessage.getChannel(),
                                                channelMessage.getMessage()));
                                    }
                            )
                            .subscribe();

                    emitter.onCancel(() -> {
                        redisPubSubReactiveCommands.unsubscribe("channel");
                        disposable.dispose();
                        logger.infof("[%s] unregistered listener", Thread.currentThread().getName());
                    });

                    logger.infof("[%s] registered listener", Thread.currentThread().getName());
                })
                .doOnError(error -> logger.infof("error => %s", error.getMessage()))
                .doOnCancel(() -> logger.infof("cancel!!"))
                .doOnComplete(() -> logger.infof("complete"))
                .doOnTerminate(() -> {
                    logger.infof("terminate");
                    redisPubSubReactiveCommands.shutdown(false);
                    statefulRedisPubSubConnection.close();
                    // redisClient.shutdown();
                });

クライアントが切断したことを検知したら、RedisからのSubscribeは解除するようにします。

                    emitter.onCancel(() -> {
                        redisPubSubReactiveCommands.unsubscribe("channel");
                        disposable.dispose();
                        logger.infof("[%s] unregistered listener", Thread.currentThread().getName());
                    });

このキャンセルを拾うのに、サーバーをNettyで動かしていてはダメで(RxJava 2で動かしていた時もそうでしたが)、Undertowを
利用することにしました。

データの登録は、同じチャンネルに対してPublishします。

    @GET
    @Path("put")
    @Produces(MediaType.TEXT_PLAIN)
    public String put(@QueryParam("message") String message) {
        StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection = redisClient.connectPubSub();
        RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands = statefulRedisPubSubConnection.reactive();

        redisPubSubReactiveCommands
                .publish("channel", message)
                .block();

        redisPubSubReactiveCommands.shutdown(false);
        statefulRedisPubSubConnection.close();
        // redisClient.shutdown();

        return "OK!";
    }

Redisへの接続をいくつも作っているのですが、これはRedisに対してSubscribeすると、その接続を専有するからですね…。

Redisに関する情報をそのまま書いているのは、今回は簡単に済ませているというのもありますが。

Applicationクラス、起動クラス

あとは、Applicationクラスと起動クラスを書いていきましょう。

Applicationクラス(のサブクラス)は、こちら。
src/main/java/org/littlewings/resteasy/reactor/JaxrsActivator.java

package org.littlewings.resteasy.reactor;

import java.util.HashSet;
import java.util.Set;
import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;

@ApplicationPath("")
public class JaxrsActivator extends Application {
    Set<Object> singletons = new HashSet<>();

    public JaxrsActivator() {
        singletons.add(new ReactiveResource());
    }

    @Override
    public Set<Object> getSingletons() {
        return singletons;
    }
}

起動クラスは、こちら。Undertowに、Applicationクラスをデプロイして終了です。
src/main/java/org/littlewings/resteasy/reactor/Server.java

package org.littlewings.resteasy.reactor;

import io.undertow.Undertow;
import org.jboss.resteasy.plugins.server.undertow.UndertowJaxrsServer;

public class Server {
    UndertowJaxrsServer server;
    int port;

    public static void main(String... args) {
        Server server = new Server();
        server.start();
    }

    public Server() {
        this(8080);
    }

    public Server(int port) {
        this.port = port;
    }

    public void start() {
        server = new UndertowJaxrsServer();
        server.deploy(JaxrsActivator.class);

        server.start(Undertow.builder().addHttpListener(port, "localhost"));
    }

    public void shutdown() {
        server.stop();
    }
}

確認する

では、確認してみましょう。

サーバーを起動。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.resteasy.reactor.Server

ひとつ、curlでServer Sent Eventsを扱うエンドポイントに接続します。

$ curl -i localhost:8080/reactive/stream
HTTP/1.1 200 OK
Connection: keep-alive
Transfer-Encoding: chunked
Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8"
Date: Sat, 29 Jun 2019 09:25:49 GMT

このターミナルは、待ち状態になります。

では、別のターミナルからデータを送ってみます。

$ curl localhost:8080/reactive/put?message=hello1
OK!

$ curl localhost:8080/reactive/put?message=hello2
OK!

$ curl localhost:8080/reactive/put?message=hello3
OK!

すると、最初に起動させておいたターミナルの方に、データが届きます。

data: hello1

data: hello2

data: hello3

ここで、もうひとつ別のターミナルからServer Sent Eventsを扱うエンドポイントに接続します。

$ curl -i localhost:8080/reactive/stream
HTTP/1.1 200 OK
Connection: keep-alive
Transfer-Encoding: chunked
Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8"
Date: Sat, 29 Jun 2019 09:28:11 GMT

データを送ってみます。

$ curl localhost:8080/reactive/put?message=hello4
OK!

$ curl localhost:8080/reactive/put?message=hello5
OK!

$ curl localhost:8080/reactive/put?message=hello6
OK!

すると、待機状態になっている2つのターミナルに、データが表示されます。

## 最初に接続した方(最初の3つのメッセージは、以前に受信したもの)
data: hello1

data: hello2

data: hello3

data: hello4

data: hello5

data: hello6


## 後から接続した方
data: hello1

data: hello2

data: hello3

data: hello4

data: hello5

data: hello6

ここで、最初に起動したServer Sent Eventsに接続しているcurlプロセスを終了させます。で、そのままデータを送っていると

$ curl localhost:8080/reactive/put?message=hello7
OK!

$ curl localhost:8080/reactive/put?message=hello8
OK!

$ curl localhost:8080/reactive/put?message=hello9
OK!

クライアントがいなくなったことを検出してCancelが発生するので、これに合わせてSubscribeを解除。

INFO: cancel!!
6月 29, 2019 6:30:20 午後 org.littlewings.resteasy.reactor.ReactiveResource lambda$stream$1
INFO: [lettuce-nioEventLoop-4-1] unregistered listener

動作確認としては、OKですね。

あと、@Streamに関連するソースも少々載せておきます…。

https://github.com/resteasy/Resteasy/blob/4.1.0.Final/resteasy-core/src/main/java/org/jboss/resteasy/core/AsyncResponseConsumer.java#L71

https://github.com/resteasy/Resteasy/blob/4.1.0.Final/resteasy-core/src/main/java/org/jboss/resteasy/plugins/providers/sse/SseEventOutputImpl.java#L103

クライアント側

では、続いてクライアント側にいきましょう。

Reactor向けのRxInvokerがあるので、こちらを使ってMonoやFluxを使ってレスポンスを受け取ることができます。

https://github.com/resteasy/Resteasy/blob/4.1.0.Final/resteasy-reactor/src/main/java/org/jboss/resteasy/reactor/FluxRxInvokerImpl.java

https://github.com/resteasy/Resteasy/blob/4.1.0.Final/resteasy-reactor/src/main/java/org/jboss/resteasy/reactor/FluxRxInvokerProvider.java

https://github.com/resteasy/Resteasy/blob/4.1.0.Final/resteasy-reactor/src/main/java/org/jboss/resteasy/reactor/MonoRxInvokerImpl.java

https://github.com/resteasy/Resteasy/blob/4.1.0.Final/resteasy-reactor/src/main/java/org/jboss/resteasy/reactor/MonoRxInvokerProvider.java

で、作成したテストコードの雛形。
src/test/java/org/littlewings/resteasy/reactor/ReactiveClientTest.java

package org.littlewings.resteasy.reactor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;

import org.jboss.resteasy.reactor.FluxRxInvoker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

import static org.assertj.core.api.Assertions.assertThat;

public class ReactiveClientTest {
    Server server;

    @BeforeEach
    public void setUp() {
        server = new Server();
        server.start();
    }

    @AfterEach
    public void tearDown() {
        server.shutdown();
    }

    // ここに、テストを書く!
}

今回は、FluxRxInvokerを使って、レスポンスをFluxとして受け取るようにしました。

    @Test
    public void rxclient() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);

        Client client = ClientBuilder.newBuilder().build();

        FluxRxInvoker invoker =
                client.target("http://localhost:8080/reactive/stream").request().rx(FluxRxInvoker.class);
        @SuppressWarnings("unchecked")
        Flux<String> responseStream = (Flux<String>) invoker.get(String.class);

        List<String> receiveMessages = Collections.synchronizedList(new ArrayList<>());

        responseStream.subscribe(m -> {
                    receiveMessages.add(m);
                    latch.countDown();
                },
                th -> latch.countDown(),
                () -> latch.countDown());

        client.target("http://localhost:8080/reactive/put?message=hello1").request().get().close();
        client.target("http://localhost:8080/reactive/put?message=hello2").request().get().close();

        latch.await();

        assertThat(receiveMessages)
                .containsExactly("hello1", "hello2");

        client.close();
    }

テストはreactor-testのStepVerifierを使って書きたいところだったのですが、FluxをSubscribeしておかないとデータを送ってもPublish時に
どうにもSubscriberがいない状態になってしまったので、今回はCountDownLatchで逃げました。

あとは、Server Sent Eventsによるデータ受信を2回行い、それぞれ別々のデータが受信できることを確認したり

    @Test
    public void rxclientAndClosed() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);

        Client client = ClientBuilder.newBuilder().build();

        FluxRxInvoker invoker =
                client.target("http://localhost:8080/reactive/stream").request().rx(FluxRxInvoker.class);
        @SuppressWarnings("unchecked")
        Flux<String> responseStream = (Flux<String>) invoker.get(String.class);

        List<String> receiveMessages = Collections.synchronizedList(new ArrayList<>());

        responseStream.subscribe(m -> {
                    receiveMessages.add(m);
                    latch.countDown();
                },
                th -> latch.countDown(),
                () -> latch.countDown());

        client.target("http://localhost:8080/reactive/put?message=hello1").request().get().close();
        client.target("http://localhost:8080/reactive/put?message=hello2").request().get().close();

        assertThat(receiveMessages)
                .containsExactly("hello1", "hello2");

        client.close();

        CountDownLatch latch2 = new CountDownLatch(3);


        Client client2 = ClientBuilder.newBuilder().build();

        FluxRxInvoker invoker2 =
                client2.target("http://localhost:8080/reactive/stream").request().rx(FluxRxInvoker.class);
        @SuppressWarnings("unchecked")
        Flux<String> responseStream2 = (Flux<String>) invoker2.get(String.class);

        List<String> receiveMessages2 = Collections.synchronizedList(new ArrayList<>());

        responseStream2.subscribe(m -> {
                    receiveMessages2.add(m);
                    latch2.countDown();
                },
                th -> latch2.countDown(),
                () -> latch2.countDown());

        client2.target("http://localhost:8080/reactive/put?message=hello3").request().get().close();
        client2.target("http://localhost:8080/reactive/put?message=hello4").request().get().close();
        client2.target("http://localhost:8080/reactive/put?message=hello5").request().get().close();

        latch2.await();

        assertThat(receiveMessages2)
                .containsExactly("hello3", "hello4", "hello5");

        client2.close();
    }

Server Sent Eventsを扱うエンドポイントに複数接続し、接続後は各接続に対してブロードキャスト的にデータを受け取れることも確認。

    @Test
    public void multipleInvoke() throws InterruptedException {
        Client client = ClientBuilder.newBuilder().build();


        CountDownLatch latch1 = new CountDownLatch(1);

        FluxRxInvoker invoker1 =
                client.target("http://localhost:8080/reactive/stream").request().rx(FluxRxInvoker.class);
        @SuppressWarnings("unchecked")
        Flux<String> responseStream1 = (Flux<String>) invoker1.get(String.class);

        List<String> receiveMessages1 = Collections.synchronizedList(new ArrayList<>());

        responseStream1.subscribe(m -> {
                    receiveMessages1.add(m);
                    latch1.countDown();
                },
                th -> latch1.countDown(),
                () -> latch1.countDown());

        client.target("http://localhost:8080/reactive/put?message=hello1").request().get().close();

        latch1.await();

        assertThat(receiveMessages1)
                .containsExactly("hello1");


        CountDownLatch latch2 = new CountDownLatch(2);

        FluxRxInvoker invoker2 =
                client.target("http://localhost:8080/reactive/stream").request().rx(FluxRxInvoker.class);
        @SuppressWarnings("unchecked")
        Flux<String> responseStream2 = (Flux<String>) invoker2.get(String.class);

        List<String> receiveMessages2 = Collections.synchronizedList(new ArrayList<>());

        responseStream2.subscribe(m -> {
                    receiveMessages2.add(m);
                    latch2.countDown();
                },
                th -> latch2.countDown(),
                () -> latch2.countDown());

        client.target("http://localhost:8080/reactive/put?message=hello2").request().get().close();
        client.target("http://localhost:8080/reactive/put?message=hello3").request().get().close();

        latch2.await();

        assertThat(receiveMessages1)
                .containsExactly("hello1", "hello2", "hello3");
        assertThat(receiveMessages2)
                .containsExactly("hello2", "hello3");


        client.close();
    }

確認。

$ mvn test

Cancelができていることも、(ログで)確認。

INFO: cancel!!
6月 29, 2019 6:44:08 午後 org.littlewings.resteasy.reactor.ReactiveResource lambda$stream$0
INFO: [channel] received message = hello1
6月 29, 2019 6:44:08 午後 org.littlewings.resteasy.reactor.ReactiveResource lambda$stream$1
INFO: [lettuce-nioEventLoop-7-1] unregistered listener

こちらもOKそうですね。

まとめ

RESTEasyのReactor向けのモジュールを試してみました。

このモジュールは、割とRxJava 2向けのもののコピーでできていたりするのですが、

[RESTEASY-2204] spring-reactor RxInvoker (almost exact copy of rxjava2 one) by crankydillo · Pull Request #2065 · resteasy/Resteasy · GitHub

細かい挙動がけっこう違ったりして、クライアント側でやや苦戦しました。

RxJava 2の時は、TestSubscriberで十分確認できていたのに、今回はCountDownLatchを持ち出すことに…。

サーバー側は、1度RxJava 2でやっていたのでそれほど苦労せず。

あとは、Reactor Nettyを使ったJAX-RS Clientを確認しようとしたのですが、こちらはもっと苦戦したので…またの機会に、少しテーマを
変えてトライしたいと思います。

追記) 書きました。

RESTEasy JAX-RS Client × Reactor Netty - CLOVER🍀