CLOVER🍀

That was when it all began.

Infinispan 10のNew Reactive API(Hot Rod)を試す

これは、なにをしたくて書いたもの?

まだ開発中のInfinispan 10ですが、こちらに新しいReactive APIが含まれようとしています。

Infinispan: Infinispan 10.0.0.Beta4

The first implementation of our new Reactive API have been merged. This is still work in progress and the API will receive major changes until the Final release.
The new API includes a new API module and a new KeyValueStore Hot Rod client where search, continuous search and Key Value store methods are included.

最終リリースに向けてまだ変わる可能性はありますが、1度試してみようかなと。

Infinispan New Reactive API

ちょっと前から、InfinispanのGitHubリポジトリで新しいAPIが開発されているのは、なんとなく気づいていました。

[ISPN-9893] New Reactive API - JBoss Issue Tracker

ドキュメントにこそ載っていませんが、Hot Rod向けの新しいクライアントAPIが含まれたようなので、こちらを見ていこうかなと。

https://github.com/infinispan/infinispan/tree/10.0.0.CR2/client/infinispan-key-value-store-hotrod

この新しいAPIが含まれたのは、Infinispan 10.0.0.Beta4の時で、Embedded/Hot RodのAPIの新しいエントリポイントとして
作られているようです。

[ISPN-9922] Create infinispan-api and hotrod-client reactive module - JBoss Issue Tracker

現在の対象はHot Rodのみで、KeyValueStoreを基点にして基本的な操作や検索、Continuous Queryを実行することができるようです。

まだこのAPIに関するドキュメントはないのですが、ソースコードやテストコードを見る限りは、Infinispanというクラスから
KeyValueStoreというインターフェースを取得し、こちらを基点に操作を行うようです。

https://github.com/infinispan/infinispan/blob/10.0.0.CR2/api/src/main/java/org/infinispan/api/Infinispan.java

https://github.com/infinispan/infinispan/blob/10.0.0.CR2/api/src/main/java/org/infinispan/api/reactive/KeyValueStore.java

InfinispanやKeyValueStoreのインターフェース定義を見るとよいのですが、ほとんどのメソッドの戻り値はCompletionStage、
もしくはPublisherになっています。

$ curl -s https://raw.githubusercontent.com/infinispan/infinispan/10.0.0.CR2/api/src/main/java/org/infinispan/api/reactive/KeyValueStore.java | grep -E 'interface|\}|;' | grep -v '*'
package org.infinispan.api.reactive;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.infinispan.api.Experimental;
import org.infinispan.api.reactive.listener.KeyValueStoreListener;
import org.infinispan.api.reactive.query.QueryRequest;
import org.reactivestreams.Publisher;
public interface KeyValueStore<K, V> {
   CompletionStage<V> get(K key);
   CompletionStage<Boolean> insert(K key, V value);
   CompletionStage<Void> save(K key, V value);
   CompletionStage<Void> delete(K key);
   Publisher<K> keys();
   Publisher<? extends Map.Entry<K, V>> entries();
   Publisher<WriteResult<K>> saveMany(Publisher<Map.Entry<K, V>> pairs);
   CompletionStage<Long> estimateSize();
   CompletionStage<Void> clear();
   Publisher<KeyValueEntry<K, V>> find(String ickleQuery);
   Publisher<KeyValueEntry<K, V>> find(QueryRequest queryRequest);
   Publisher<KeyValueEntry<K, V>> findContinuously(String ickleQuery);
   <T> Publisher<KeyValueEntry<K, T>> findContinuously(QueryRequest queryRequest);
   Publisher<KeyValueEntry<K, V>> listen(KeyValueStoreListener listener);
}

つまり、Reactive APIを使う際には、これらのAPIを駆使することになるわけですね、と。

前置きはこれくらいにして、今回はエントリに対する基本的な操作を見ていきましょう。

APIを使うための参考情報としては、APIのインターフェースそのものと、テストコードを参考にしました。

https://github.com/infinispan/infinispan/tree/10.0.0.CR2/api

https://github.com/infinispan/infinispan/tree/10.0.0.CR2/client/infinispan-key-value-store-hotrod/src/test/java/org/infinispan/api/client/impl

環境

今回の環境は、こちらです。

$ java -version
openjdk version "11.0.4" 2019-07-16
OpenJDK Runtime Environment (build 11.0.4+11-post-Ubuntu-1ubuntu218.04.3)
OpenJDK 64-Bit Server VM (build 11.0.4+11-post-Ubuntu-1ubuntu218.04.3, mixed mode, sharing)


$ mvn -version
Apache Maven 3.6.2 (40f52333136460af0dc0d7232c0dc0bcf0d9e117; 2019-08-28T00:06:16+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.4, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-64-generic", arch: "amd64", family: "unix"

Infinispanのバージョンは、10.0.0.CR2とします。

Infinispan Serverは172.17.0.2のIPアドレスで稼働しており、以下のコマンドで起動しているものとします。

$ bin/server.sh -Dinfinispan.bind.address=172.17.0.2

準備

今回のMaven依存関係は、こちらです。

        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-key-value-store-client</artifactId>
            <version>10.0.0.CR2</version>
        </dependency>

infinispan-key-value-store-clientを使用します。

また、テストコードで動作確認を行うので、JUnit 5とAssertJを依存関係に追加し、Maven Surefire Pluginの設定も行います。

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.5.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.5.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.13.2</version>
            <scope>test</scope>
        </dependency>

<!-- 省略 -->

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

テストコードの雛形

今回使うテストコードの雛形は、こんな感じで。
src/test/java/org/littlewings/infinispan/reactive/hotrod/InfinispanClientTest.java

package org.littlewings.infinispan.reactive.hotrod;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;

import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;
import org.assertj.core.data.MapEntry;
import org.infinispan.api.Infinispan;
import org.infinispan.api.configuration.ClientConfig;
import org.infinispan.api.reactive.KeyValueStore;
import org.infinispan.api.reactive.KeyValueStoreConfig;
import org.infinispan.api.reactive.WriteResult;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class InfinispanClientTest {

    // ここに、テストを書く!
}

各テストの開始時には、以下のようにCache定義を行うようにしました。

    @BeforeEach
    public void setUp() {
        Configuration configuration =
                new ConfigurationBuilder()
                        .addServers("172.17.0.2:11222")
                        .build();
        RemoteCacheManager manager = new RemoteCacheManager(configuration);

        manager
                .administration()
                .getOrCreateCache(
                        "gettingStartedCache",
                        new org.infinispan.configuration.cache.ConfigurationBuilder().build()
                )
                .clear();

        manager
                .administration()
                .getOrCreateCache(
                        "bulkCache",
                        new org.infinispan.configuration.cache.ConfigurationBuilder().build()
                )
                .clear();

        manager.stop();
    }

テストケースは2つ作成し、それぞれ「gettingStartedCache」、「bulkCache」というCacheを使うのですが、それぞれ実行時に
まだInfinispan Serverに存在しない場合は作成します。

基本的な操作をしてみる

では、いわゆるput/get/removeな基本的な操作をしてみましょう。

作成したコードは、こんな感じです。

    @Test
    public void gettingStarted() {
        Configuration configuration =
                new ConfigurationBuilder()
                        .addServers("172.17.0.2:11222")
                        .build();

        ClientConfig config = ClientConfig.from(configuration.properties());
        Infinispan infinispan = Infinispan.newClient(config);

        KeyValueStore<String, String> store =
                infinispan
                        .<String, String>getKeyValueStore("gettingStartedCache", KeyValueStoreConfig.defaultConfig())
                        .toCompletableFuture()
                        .join();

        CompletionStage<String> value = store
                .insert("key1", "value1")
                .thenCompose(b -> store.get("key1"));

        assertThat(value.toCompletableFuture().join()).isEqualTo("value1");

        CompletionStage<Long> estimateSize =
                store
                        .insert("key2", "value2")
                        .thenCompose(b -> store.insert("key3", "value3"))
                        .thenCompose(b -> store.estimateSize());

        assertThat(estimateSize.toCompletableFuture().join()).isEqualTo(3);

        CompletionStage<String> deletedEntry =
                store
                        .delete("key1")
                        .thenCompose(v -> store.get("key1"));

        assertThat(deletedEntry.toCompletableFuture().join()).isNull();

        CompletionStage<String> value3 = store
                .insert("key3", "value3-1")
                .thenCompose(b -> store.get("key3"));

        assertThat(value3.toCompletableFuture().join()).isEqualTo("value3");

        infinispan.stop().toCompletableFuture().join();
    }

まず、Hot Rod ClientのConfigurationを使用して、ClientConfigを介してInfinispanのインスタンスを取得します。

        Configuration configuration =
                new ConfigurationBuilder()
                        .addServers("172.17.0.2:11222")
                        .build();

        ClientConfig config = ClientConfig.from(configuration.properties());
        Infinispan infinispan = Infinispan.newClient(config);

次に、InfinispanからCache名を指定して、KeyValueStoreのインスタンスを取得します。この時点で、CompletionStageが戻り値になっています。

        KeyValueStore<String, String> store =
                infinispan
                        .<String, String>getKeyValueStore("gettingStartedCache", KeyValueStoreConfig.defaultConfig())
                        .toCompletableFuture()
                        .join();

今回利用しているCacheは、「gettingStartedCache」です。

KeyValueStoreConfigについては、デフォルトのまま使用します。

KeyValueStoreConfigの中身を見ていると、Protocol Buffersで使うスキーマや、Marshallerの設定ができるようですね。

https://github.com/infinispan/infinispan/blob/10.0.0.CR2/api/src/main/java/org/infinispan/api/reactive/KeyValueStoreConfig.java

今回は、このあたりはパスします。

put(insert)/get。

        CompletionStage<String> value = store
                .insert("key1", "value1")
                .thenCompose(b -> store.get("key1"));

        assertThat(value.toCompletableFuture().join()).isEqualTo("value1");

さらに要素を追加して、エントリ数の取得(estimateSize)。

        CompletionStage<Long> estimateSize =
                store
                        .insert("key2", "value2")
                        .thenCompose(b -> store.insert("key3", "value3"))
                        .thenCompose(b -> store.estimateSize());

        assertThat(estimateSize.toCompletableFuture().join()).isEqualTo(3);

delete。

        CompletionStage<String> deletedEntry =
                store
                        .delete("key1")
                        .thenCompose(v -> store.get("key1"));

        assertThat(deletedEntry.toCompletableFuture().join()).isNull();

要素の更新も、insertで。

        CompletionStage<String> value3 = store
                .insert("key3", "value3-1")
                .thenCompose(b -> store.get("key3"));

        assertThat(value3.toCompletableFuture().join()).isEqualTo("value3");

いずれも、CompletionStageがメソッドの戻り値になります。

最後に、Infinispanインスタンスの破棄。あくまで、クライアント側の話です。

        infinispan.stop().toCompletableFuture().join();

バルクオペレーション

次に、複数の要素を保存したり、取得したりしてみましょう。

ソースコードは、こうなりました。

    @Test
    public void bulkOperation() {
        Configuration configuration =
                new ConfigurationBuilder()
                        .addServers("172.17.0.2:11222")
                        .build();

        ClientConfig config = ClientConfig.from(configuration.properties());
        Infinispan infinispan = Infinispan.newClient(config);

        KeyValueStore<String, String> store =
                infinispan
                        .<String, String>getKeyValueStore("bulkCache", KeyValueStoreConfig.defaultConfig())
                        .toCompletableFuture()
                        .join();

        ///// put many
        Flowable<WriteResult<String>> writeResult =
                Flowable
                        .fromPublisher(
                                store.saveMany(Flowable.fromIterable(Map.of("key1", "value1", "key2", "value2").entrySet()))
                        );

        TestSubscriber<Map.Entry<String, Boolean>> writeResultSubscriber = TestSubscriber.create();
        writeResult.map(ws -> MapEntry.entry(ws.getKey(), ws.isError())).subscribe(writeResultSubscriber);

        writeResultSubscriber
                .assertValueSetOnly(
                        Set.of(
                                MapEntry.<String, Boolean>entry("key1", false),
                                MapEntry.<String, Boolean>entry("key2", false)
                        )
                );

        ///// keys
        Flowable<String> keys =
                Flowable.fromPublisher(store.keys());

        TestSubscriber<String> keysSubscriber = TestSubscriber.create();
        keys.subscribe(keysSubscriber);
        keysSubscriber.assertValues("key1", "key2");

        ///// entries
        Flowable<Map.Entry<String, String>> entries =
                Flowable.fromPublisher(store.entries());

        TestSubscriber<Map.Entry<String, String>> entriesSubscriber = TestSubscriber.create();
        entries.subscribe(entriesSubscriber);
        entriesSubscriber.assertValues(MapEntry.entry("key1", "value1"), MapEntry.entry("key2", "value2"));

        infinispan.stop().toCompletableFuture().join();
    }

複数の要素を一括で保存。

        ///// put many
        Flowable<WriteResult<String>> writeResult =
                Flowable
                        .fromPublisher(
                                store.saveMany(Flowable.fromIterable(Map.of("key1", "value1", "key2", "value2").entrySet()))
                        );

戻り値が、Reactive StreamsのPublisherになります。InfinispanではRxJava2が使われているので、そのままFlowableに変換することに
しました。

Publisherをsubscribeして得られるのは、それぞれのエントリに対する書き込み結果、WriteResultになります。WriteResultからは
対応するエントリのキーと書き込み結果(isError)が得られるので、こちらをTestSubscriberを利用しつつMapEntryに変換して
assertion…。

        TestSubscriber<Map.Entry<String, Boolean>> writeResultSubscriber = TestSubscriber.create();
        writeResult.map(ws -> MapEntry.entry(ws.getKey(), ws.isError())).subscribe(writeResultSubscriber);

        writeResultSubscriber
                .assertValueSetOnly(
                        Set.of(
                                MapEntry.<String, Boolean>entry("key1", false),
                                MapEntry.<String, Boolean>entry("key2", false)
                        )
                );

keys。

        ///// keys
        Flowable<String> keys =
                Flowable.fromPublisher(store.keys());

        TestSubscriber<String> keysSubscriber = TestSubscriber.create();
        keys.subscribe(keysSubscriber);
        keysSubscriber.assertValues("key1", "key2");

entries。こちらは、Map.EntryのPublisherが返ってきます。

        ///// entries
        Flowable<Map.Entry<String, String>> entries =
                Flowable.fromPublisher(store.entries());

        TestSubscriber<Map.Entry<String, String>> entriesSubscriber = TestSubscriber.create();
        entries.subscribe(entriesSubscriber);
        entriesSubscriber.assertValues(MapEntry.entry("key1", "value1"), MapEntry.entry("key2", "value2"));

とりあえず、基本的な操作としてはこんな感じでしょうか。

もう少し中身を

では、少しだけ中身を追ってみましょう。

Infinispanはインターフェースなので、クライアントAPIに対して実装が用意されています。

https://github.com/infinispan/infinispan/blob/10.0.0.CR2/client/infinispan-key-value-store-hotrod/src/main/java/org/infinispan/api/client/impl/InfinispanClientImpl.java

背後にいるのは、RemoteCacheManagerですね。

KeyValueStoreインターフェースの実装の方を見ると、CompletionStageを返すようなメソッドは、RemoteCache#xxxAsyncを
使って実現していることがわかります。

https://github.com/infinispan/infinispan/blob/10.0.0.CR2/client/infinispan-key-value-store-hotrod/src/main/java/org/infinispan/api/reactive/client/impl/KeyValueStoreImpl.java

Publisherを返すようなメソッドは、UnicastProcessorを使用しています。

Embeddedの方は?

ところで、Embedded APIの方はどうしたのかというと、まだ実装されていません。

https://github.com/infinispan/infinispan/blob/10.0.0.CR2/api/src/main/java/org/infinispan/api/Infinispan.java#L30-L32

そのうち、Infinispan#newEmbeddedで、InfinispanのEmbedded用の実装が取得できるようになるのだと…思います。

まとめ

Infinispanの新しいクライアントAPIである、Reactive APIを軽く試してみました。

CompletionStageやPublisherの利用を強制されますが、勉強のいい機会として…。

そのうち、Embeddedが出てくるとよいですね。

また、そのうちQueryについても試してみましょう。