これは、なにをしたくて書いたもの?
RESTEasy 4.1.0.Finalがリリースされました。
RESTEasy: RESTEasy 4.1.0.Final is available! |JBoss Developer
このリリースで、Reactor向けのモジュールが2つ追加されています。
two new modules for integration with the Project Reactor , namely a new implementation of the Async client engine interface based on Reactor Netty
ひとつは、MonoやFluxをサポートするモジュール。
Pluggable reactive types: RxJava 2 in RESTEasy
もうひとつは、RESTEasy JAX-RS ClientのHTTPエンジンとして、Reactor Nettyを使えるようにするモジュールです。
以前、RESTEasyとRxJava 2を統合するモジュールで遊んだことがあるのですが、このReactor版として遊んでみることにします。
RESTEasy × RxJava 2を試す - CLOVER🍀
なので、今回はReactor Nettyを使ったJAX-RS Clientは利用しません。
Reactor Nettyを使ったJAX-RS Clientについては、こちらを参照。
RESTEasy JAX-RS Client × Reactor Netty - CLOVER🍀
お題
RxJava 2の時は、RESTEasyとRxJava 2を使い、InfinispanのListenerでServer Sent Eventsでデータをクライアントに返すプログラムを
作成しましたが、今回はReactorが使えるのでこんな感じのお題にしてみたいと思います。
- RESTEasyでReactorのFluxを使ったResourceクラスを作成する
- データは、RedisのPublish/Subscribeの仕組みを使って、データの追加、変更の通知からレスポンスのデータを作成する
- Redisへのアクセスは、Reactorを使うこともあってLettuceを使用する
- 最後にテストコードでクライアント側も書いてみる
作成するプログラムの大まかな挙動は、RxJava 2の時に作ったものと同じです。
Redisへのアクセスは、先にも書きましたがLettuceを使用します。
環境
今回の環境は、こちらです。
$ java -version openjdk version "11.0.3" 2019-04-16 OpenJDK Runtime Environment (build 11.0.3+7-Ubuntu-1ubuntu218.04.1) OpenJDK 64-Bit Server VM (build 11.0.3+7-Ubuntu-1ubuntu218.04.1, mixed mode, sharing) $ mvn -version Apache Maven 3.6.1 (d66c9c0b3152b2e69ee9bac180bb8fcc8e6af555; 2019-04-05T04:00:29+09:00) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 11.0.3, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-11-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "4.15.0-54-generic", arch: "amd64", family: "unix"
準備
Maven依存関係など、あらかたの設定はこちら。
<dependencies> <dependency> <groupId>org.jboss.resteasy</groupId> <artifactId>resteasy-reactor</artifactId> <version>4.1.0.Final</version> </dependency> <dependency> <groupId>org.jboss.resteasy</groupId> <artifactId>resteasy-undertow</artifactId> <version>4.1.0.Final</version> </dependency> <dependency> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> <version>5.1.7.RELEASE</version> </dependency> <dependency> <groupId>org.jboss.resteasy</groupId> <artifactId>resteasy-client</artifactId> <version>4.1.0.Final</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.4.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.4.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.12.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.0</version> </plugin> </plugins> </build>
今回の中心は、resteasy-reactorです。
<dependency> <groupId>org.jboss.resteasy</groupId> <artifactId>resteasy-reactor</artifactId> <version>4.1.0.Final</version> </dependency>
JAX-RSを駆動させるサーバーアダプターとしては、Undertow向けのものを使用します。
resteasy-clientは、テストで使用します。
また、Redisは以下のもので用意します。
- バージョン … 5.0.5
- IPアドレス … 172.17.0.2
- パスワード … redispass
JAX-RSリソース関連のクラスを作成する
では、サーバーサイドから書いていきましょう。
作成した、RESTEasy+ReactorなJAX-RSリソースクラス。
src/main/java/org/littlewings/resteasy/reactor/ReactiveResource.java
package org.littlewings.resteasy.reactor; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import io.lettuce.core.RedisClient; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; import org.jboss.logging.Logger; import org.jboss.resteasy.annotations.Stream; import reactor.core.Disposable; import reactor.core.publisher.Flux; @Path("reactive") public class ReactiveResource { Logger logger = Logger.getLogger(ReactiveResource.class); RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379"); @GET @Path("stream") @Produces(MediaType.TEXT_PLAIN) @Stream public Flux<String> stream() { StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection = redisClient.connectPubSub(); RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands = statefulRedisPubSubConnection.reactive(); return Flux .<String>create(emitter -> { redisPubSubReactiveCommands .subscribe("channel") .block(); Disposable disposable = redisPubSubReactiveCommands .observeChannels() .doOnNext(channelMessage -> { emitter.next(channelMessage.getMessage()); logger.info(String.format( "[%s] received message = %s", channelMessage.getChannel(), channelMessage.getMessage())); } ) .subscribe(); emitter.onCancel(() -> { redisPubSubReactiveCommands.unsubscribe("channel"); disposable.dispose(); logger.infof("[%s] unregistered listener", Thread.currentThread().getName()); }); logger.infof("[%s] registered listener", Thread.currentThread().getName()); }) .doOnError(error -> logger.infof("error => %s", error.getMessage())) .doOnCancel(() -> logger.infof("cancel!!")) .doOnComplete(() -> logger.infof("complete")) .doOnTerminate(() -> { logger.infof("terminate"); redisPubSubReactiveCommands.shutdown(false); statefulRedisPubSubConnection.close(); // redisClient.shutdown(); }); } @GET @Path("put") @Produces(MediaType.TEXT_PLAIN) public String put(@QueryParam("message") String message) { StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection = redisClient.connectPubSub(); RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands = statefulRedisPubSubConnection.reactive(); redisPubSubReactiveCommands .publish("channel", message) .block(); redisPubSubReactiveCommands.shutdown(false); statefulRedisPubSubConnection.close(); // redisClient.shutdown(); return "OK!"; } }
RxJava 2の時もそうでしたが、ポイントは@Streamアノテーションを使っていることです。
@GET @Path("stream") @Produces(MediaType.TEXT_PLAIN) @Stream public Flux<String> stream() {
@StreamとFluxやMonoを組み合わせることで、Server Sent Eventsを使ったリアクティブな実装を行うことができます。
今回使用しているRESTEasyのReactor向けのモジュールでは、FluxやMonoに対するProviderが用意されています。
今回は、Fluxのみしか使いませんが。
Server Sent Eventsを使ったクライアントへのデータの送信は、RedisのSubscriberとして作成します。
LettuceのRedisClientから、Publish/Subscribe用のコマンドを作成します。
StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection = redisClient.connectPubSub(); RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands = statefulRedisPubSubConnection.reactive();
あとは、このコマンドでRedisからSubscribeした内容を、Fluxを使ってクライアントに送信します。
return Flux .<String>create(emitter -> { redisPubSubReactiveCommands .subscribe("channel") .block(); Disposable disposable = redisPubSubReactiveCommands .observeChannels() .doOnNext(channelMessage -> { emitter.next(channelMessage.getMessage()); logger.info(String.format( "[%s] received message = %s", channelMessage.getChannel(), channelMessage.getMessage())); } ) .subscribe(); emitter.onCancel(() -> { redisPubSubReactiveCommands.unsubscribe("channel"); disposable.dispose(); logger.infof("[%s] unregistered listener", Thread.currentThread().getName()); }); logger.infof("[%s] registered listener", Thread.currentThread().getName()); }) .doOnError(error -> logger.infof("error => %s", error.getMessage())) .doOnCancel(() -> logger.infof("cancel!!")) .doOnComplete(() -> logger.infof("complete")) .doOnTerminate(() -> { logger.infof("terminate"); redisPubSubReactiveCommands.shutdown(false); statefulRedisPubSubConnection.close(); // redisClient.shutdown(); });
クライアントが切断したことを検知したら、RedisからのSubscribeは解除するようにします。
emitter.onCancel(() -> { redisPubSubReactiveCommands.unsubscribe("channel"); disposable.dispose(); logger.infof("[%s] unregistered listener", Thread.currentThread().getName()); });
このキャンセルを拾うのに、サーバーをNettyで動かしていてはダメで(RxJava 2で動かしていた時もそうでしたが)、Undertowを
利用することにしました。
データの登録は、同じチャンネルに対してPublishします。
@GET @Path("put") @Produces(MediaType.TEXT_PLAIN) public String put(@QueryParam("message") String message) { StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection = redisClient.connectPubSub(); RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands = statefulRedisPubSubConnection.reactive(); redisPubSubReactiveCommands .publish("channel", message) .block(); redisPubSubReactiveCommands.shutdown(false); statefulRedisPubSubConnection.close(); // redisClient.shutdown(); return "OK!"; }
Redisへの接続をいくつも作っているのですが、これはRedisに対してSubscribeすると、その接続を専有するからですね…。
Redisに関する情報をそのまま書いているのは、今回は簡単に済ませているというのもありますが。
Applicationクラス、起動クラス
あとは、Applicationクラスと起動クラスを書いていきましょう。
Applicationクラス(のサブクラス)は、こちら。
src/main/java/org/littlewings/resteasy/reactor/JaxrsActivator.java
package org.littlewings.resteasy.reactor; import java.util.HashSet; import java.util.Set; import javax.ws.rs.ApplicationPath; import javax.ws.rs.core.Application; @ApplicationPath("") public class JaxrsActivator extends Application { Set<Object> singletons = new HashSet<>(); public JaxrsActivator() { singletons.add(new ReactiveResource()); } @Override public Set<Object> getSingletons() { return singletons; } }
起動クラスは、こちら。Undertowに、Applicationクラスをデプロイして終了です。
src/main/java/org/littlewings/resteasy/reactor/Server.java
package org.littlewings.resteasy.reactor; import io.undertow.Undertow; import org.jboss.resteasy.plugins.server.undertow.UndertowJaxrsServer; public class Server { UndertowJaxrsServer server; int port; public static void main(String... args) { Server server = new Server(); server.start(); } public Server() { this(8080); } public Server(int port) { this.port = port; } public void start() { server = new UndertowJaxrsServer(); server.deploy(JaxrsActivator.class); server.start(Undertow.builder().addHttpListener(port, "localhost")); } public void shutdown() { server.stop(); } }
確認する
では、確認してみましょう。
サーバーを起動。
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.resteasy.reactor.Server
ひとつ、curlでServer Sent Eventsを扱うエンドポイントに接続します。
$ curl -i localhost:8080/reactive/stream HTTP/1.1 200 OK Connection: keep-alive Transfer-Encoding: chunked Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8" Date: Sat, 29 Jun 2019 09:25:49 GMT
このターミナルは、待ち状態になります。
では、別のターミナルからデータを送ってみます。
$ curl localhost:8080/reactive/put?message=hello1 OK! $ curl localhost:8080/reactive/put?message=hello2 OK! $ curl localhost:8080/reactive/put?message=hello3 OK!
すると、最初に起動させておいたターミナルの方に、データが届きます。
data: hello1 data: hello2 data: hello3
ここで、もうひとつ別のターミナルからServer Sent Eventsを扱うエンドポイントに接続します。
$ curl -i localhost:8080/reactive/stream HTTP/1.1 200 OK Connection: keep-alive Transfer-Encoding: chunked Content-Type: application/x-stream-general;element-type="text/plain;charset=UTF-8" Date: Sat, 29 Jun 2019 09:28:11 GMT
データを送ってみます。
$ curl localhost:8080/reactive/put?message=hello4 OK! $ curl localhost:8080/reactive/put?message=hello5 OK! $ curl localhost:8080/reactive/put?message=hello6 OK!
すると、待機状態になっている2つのターミナルに、データが表示されます。
## 最初に接続した方(最初の3つのメッセージは、以前に受信したもの) data: hello1 data: hello2 data: hello3 data: hello4 data: hello5 data: hello6 ## 後から接続した方 data: hello1 data: hello2 data: hello3 data: hello4 data: hello5 data: hello6
ここで、最初に起動したServer Sent Eventsに接続しているcurlプロセスを終了させます。で、そのままデータを送っていると
$ curl localhost:8080/reactive/put?message=hello7 OK! $ curl localhost:8080/reactive/put?message=hello8 OK! $ curl localhost:8080/reactive/put?message=hello9 OK!
クライアントがいなくなったことを検出してCancelが発生するので、これに合わせてSubscribeを解除。
INFO: cancel!! 6月 29, 2019 6:30:20 午後 org.littlewings.resteasy.reactor.ReactiveResource lambda$stream$1 INFO: [lettuce-nioEventLoop-4-1] unregistered listener
動作確認としては、OKですね。
あと、@Streamに関連するソースも少々載せておきます…。
クライアント側
では、続いてクライアント側にいきましょう。
Reactor向けのRxInvokerがあるので、こちらを使ってMonoやFluxを使ってレスポンスを受け取ることができます。
で、作成したテストコードの雛形。
src/test/java/org/littlewings/resteasy/reactor/ReactiveClientTest.java
package org.littlewings.resteasy.reactor; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import org.jboss.resteasy.reactor.FluxRxInvoker; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import static org.assertj.core.api.Assertions.assertThat; public class ReactiveClientTest { Server server; @BeforeEach public void setUp() { server = new Server(); server.start(); } @AfterEach public void tearDown() { server.shutdown(); } // ここに、テストを書く! }
今回は、FluxRxInvokerを使って、レスポンスをFluxとして受け取るようにしました。
@Test public void rxclient() throws InterruptedException { CountDownLatch latch = new CountDownLatch(2); Client client = ClientBuilder.newBuilder().build(); FluxRxInvoker invoker = client.target("http://localhost:8080/reactive/stream").request().rx(FluxRxInvoker.class); @SuppressWarnings("unchecked") Flux<String> responseStream = (Flux<String>) invoker.get(String.class); List<String> receiveMessages = Collections.synchronizedList(new ArrayList<>()); responseStream.subscribe(m -> { receiveMessages.add(m); latch.countDown(); }, th -> latch.countDown(), () -> latch.countDown()); client.target("http://localhost:8080/reactive/put?message=hello1").request().get().close(); client.target("http://localhost:8080/reactive/put?message=hello2").request().get().close(); latch.await(); assertThat(receiveMessages) .containsExactly("hello1", "hello2"); client.close(); }
テストはreactor-testのStepVerifierを使って書きたいところだったのですが、FluxをSubscribeしておかないとデータを送ってもPublish時に
どうにもSubscriberがいない状態になってしまったので、今回はCountDownLatchで逃げました。
あとは、Server Sent Eventsによるデータ受信を2回行い、それぞれ別々のデータが受信できることを確認したり
@Test public void rxclientAndClosed() throws InterruptedException { CountDownLatch latch = new CountDownLatch(2); Client client = ClientBuilder.newBuilder().build(); FluxRxInvoker invoker = client.target("http://localhost:8080/reactive/stream").request().rx(FluxRxInvoker.class); @SuppressWarnings("unchecked") Flux<String> responseStream = (Flux<String>) invoker.get(String.class); List<String> receiveMessages = Collections.synchronizedList(new ArrayList<>()); responseStream.subscribe(m -> { receiveMessages.add(m); latch.countDown(); }, th -> latch.countDown(), () -> latch.countDown()); client.target("http://localhost:8080/reactive/put?message=hello1").request().get().close(); client.target("http://localhost:8080/reactive/put?message=hello2").request().get().close(); assertThat(receiveMessages) .containsExactly("hello1", "hello2"); client.close(); CountDownLatch latch2 = new CountDownLatch(3); Client client2 = ClientBuilder.newBuilder().build(); FluxRxInvoker invoker2 = client2.target("http://localhost:8080/reactive/stream").request().rx(FluxRxInvoker.class); @SuppressWarnings("unchecked") Flux<String> responseStream2 = (Flux<String>) invoker2.get(String.class); List<String> receiveMessages2 = Collections.synchronizedList(new ArrayList<>()); responseStream2.subscribe(m -> { receiveMessages2.add(m); latch2.countDown(); }, th -> latch2.countDown(), () -> latch2.countDown()); client2.target("http://localhost:8080/reactive/put?message=hello3").request().get().close(); client2.target("http://localhost:8080/reactive/put?message=hello4").request().get().close(); client2.target("http://localhost:8080/reactive/put?message=hello5").request().get().close(); latch2.await(); assertThat(receiveMessages2) .containsExactly("hello3", "hello4", "hello5"); client2.close(); }
Server Sent Eventsを扱うエンドポイントに複数接続し、接続後は各接続に対してブロードキャスト的にデータを受け取れることも確認。
@Test public void multipleInvoke() throws InterruptedException { Client client = ClientBuilder.newBuilder().build(); CountDownLatch latch1 = new CountDownLatch(1); FluxRxInvoker invoker1 = client.target("http://localhost:8080/reactive/stream").request().rx(FluxRxInvoker.class); @SuppressWarnings("unchecked") Flux<String> responseStream1 = (Flux<String>) invoker1.get(String.class); List<String> receiveMessages1 = Collections.synchronizedList(new ArrayList<>()); responseStream1.subscribe(m -> { receiveMessages1.add(m); latch1.countDown(); }, th -> latch1.countDown(), () -> latch1.countDown()); client.target("http://localhost:8080/reactive/put?message=hello1").request().get().close(); latch1.await(); assertThat(receiveMessages1) .containsExactly("hello1"); CountDownLatch latch2 = new CountDownLatch(2); FluxRxInvoker invoker2 = client.target("http://localhost:8080/reactive/stream").request().rx(FluxRxInvoker.class); @SuppressWarnings("unchecked") Flux<String> responseStream2 = (Flux<String>) invoker2.get(String.class); List<String> receiveMessages2 = Collections.synchronizedList(new ArrayList<>()); responseStream2.subscribe(m -> { receiveMessages2.add(m); latch2.countDown(); }, th -> latch2.countDown(), () -> latch2.countDown()); client.target("http://localhost:8080/reactive/put?message=hello2").request().get().close(); client.target("http://localhost:8080/reactive/put?message=hello3").request().get().close(); latch2.await(); assertThat(receiveMessages1) .containsExactly("hello1", "hello2", "hello3"); assertThat(receiveMessages2) .containsExactly("hello2", "hello3"); client.close(); }
確認。
$ mvn test
Cancelができていることも、(ログで)確認。
INFO: cancel!! 6月 29, 2019 6:44:08 午後 org.littlewings.resteasy.reactor.ReactiveResource lambda$stream$0 INFO: [channel] received message = hello1 6月 29, 2019 6:44:08 午後 org.littlewings.resteasy.reactor.ReactiveResource lambda$stream$1 INFO: [lettuce-nioEventLoop-7-1] unregistered listener
こちらもOKそうですね。
まとめ
RESTEasyのReactor向けのモジュールを試してみました。
このモジュールは、割とRxJava 2向けのもののコピーでできていたりするのですが、
細かい挙動がけっこう違ったりして、クライアント側でやや苦戦しました。
RxJava 2の時は、TestSubscriberで十分確認できていたのに、今回はCountDownLatchを持ち出すことに…。
サーバー側は、1度RxJava 2でやっていたのでそれほど苦労せず。
あとは、Reactor Nettyを使ったJAX-RS Clientを確認しようとしたのですが、こちらはもっと苦戦したので…またの機会に、少しテーマを
変えてトライしたいと思います。
追記) 書きました。