CLOVER🍀

That was when it all began.

Infinispan 14.0の新しいHot Rod Client APIのMutiny版を試す

これは、なにをしたくて書いたもの?

前にこういうエントリーを書きました。

Infinispan 14.0の新しいHot Rod Client APIを試して諦めたという話 - CLOVER🍀

Infinispan 14.0.0.Finalから新しいHot Rod Clientが含まれており、同期、非同期、Mutinyの3つの利用形態があったのですが、14.0.0.Finalの
時点ではMutiny版がまったく動きませんでした。

これが14.0.4.Finalで動くようになったみたいなので、前回のエントリーの続きにしたいと思います。

新しいHot Rod Client API

少し、復習的に。

Infinispan 14.0.0.Finalがリリースされた時に書かれたブログエントリーに、新しいHot Rod Client APIが含まれているということが
書かれていました。

Infinispan 14.0.0.Final

特徴は以下です。

  • 完全に再設計、新しく書き直されたHot Rod Client
    • 既存のHot Rod Clientへの依存は一切なし
  • 同期、非同期、Mutinyの3つのプログラミングモデルの中から選択
  • EmbeddedとRemote(Hot Rod)で共通のAPIとなる予定だが、現時点ではRemote(Hot Rod)のみ

なお、現時点ではドキュメントもなく、既存のHot Rod Clientの機能をすべてカバーしているわけでもありません。

実体のクラスは以下なのですが、UnsupportedOperationExceptionを投げてくるメソッドがそれなりにあります。

https://github.com/infinispan/infinispan/blob/14.0.4.Final/client/hotrod/src/main/java/org/infinispan/hotrod/impl/cache/RemoteCacheImpl.java

Configurationの取得、Near Cacheへのリスナーの追加、Cacheのサイズ計算、リスナーの追加、Entry Processor
トランザクションなどですね。

Hot Rod Clientのドキュメントには、既存のAPIしか記載がありません。

Using Hot Rod Java clients

新しいHot Rod Clientのモジュールはこちら。

https://github.com/infinispan/infinispan/tree/14.0.4.Final/client/hotrod

テストコードは、非同期版のもののみがあります(14.0.0.Finalのリリース時点と変わりません)。

https://github.com/infinispan/infinispan/tree/14.0.4.Final/client/hotrod/src/test/java/org/infinispan/hotrod

そして、Infinispan 14.0.4.FinalでMutiny版が動作するようになったという話ですが、以下のissueが解決されたからです。

[ISPN-14016] Implement HotRodMutinyCaches methods - Red Hat Issue Tracker

ISPN-14016 Implement HotRodMutinyCaches methods by jabolina · Pull Request #10531 · infinispan/infinispan · GitHub

[14.0] ISPN-14016 Implement HotRodMutinyCaches methods by jabolina · Pull Request #10529 · infinispan/infinispan · GitHub

これ、14.0.0.CR1の時の状態のまま、14.0.0.Finalになったということですね…。以前は以下の部分のキャストで失敗していましたが、
これは既知の内容だったようです。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/HotRodMutinyCaches.java#L21-L36

今回は、この修正されたMutiny版を使ってみたいと思います。

お題

基本的には前回のエントリーと同じですが、Mutiny版をさらっと載せて終わります。

  • キーと値をStringにしたキャッシュで試す
  • キーをString、値を独自のクラスにしたキャッシュで試す
  • 確認は、テストコードで行う
  • Infinispan Serverはクラスター構成とする

前回はシングルノードで試していたので、今回はそこは変えてみました。

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.5 2022-10-18
OpenJDK Runtime Environment (build 17.0.5+8-Ubuntu-2ubuntu122.04)
OpenJDK 64-Bit Server VM (build 17.0.5+8-Ubuntu-2ubuntu122.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.7 (b89d5959fcde851dcb1c8946a785a163f14e1e29)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.5, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-57-generic", arch: "amd64", family: "unix"

Infinispan Serverは、172.18.0.2〜172.18.0.4の3ノードで動作しているものとします。

$ java --version
openjdk 17.0.5 2022-10-18
OpenJDK Runtime Environment Temurin-17.0.5+8 (build 17.0.5+8)
OpenJDK 64-Bit Server VM Temurin-17.0.5+8 (build 17.0.5+8, mixed mode, sharing)


$ bin/server.sh --version

Infinispan Server 14.0.4.Final (Flying Saucer)
Copyright (C) Red Hat Inc. and/or its affiliates and other contributors
License Apache License, v. 2.0. http://www.apache.org/licenses/LICENSE-2.0

起動は、以下のコマンドで。

$ bin/server.sh \
    -b 0.0.0.0 \
    -Djgroups.tcp.address=$(hostname -i)

準備

まずは、Infinispan Serverの準備を行います。ユーザーとキャッシュを作成。

$ bin/cli.sh user create -g admin -p password ispn-admin
$ bin/cli.sh user create -g application -p password ispn-user

ユーザーは、管理用とアプリケーション用の2つを用意。

キャッシュを2つ作成。

$ bin/cli.sh -c http://ispn-admin:password@localhost:11222
[infinispan-server-56621@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC simpleCache
[infinispan-server-56621@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC bookCache

定義の確認。

[infinispan-server-56621@cluster//containers/default]> describe caches/simpleCache
{
  "simpleCache" : {
    "distributed-cache" : {
      "mode" : "SYNC",
      "remote-timeout" : "17500",
      "statistics" : true,
      "locking" : {
        "concurrency-level" : "1000",
        "acquire-timeout" : "15000",
        "striping" : false
      },
      "state-transfer" : {
        "timeout" : "60000"
      }
    }
  }
}


[infinispan-server-56621@cluster//containers/default]> describe caches/bookCache
{
  "bookCache" : {
    "distributed-cache" : {
      "mode" : "SYNC",
      "remote-timeout" : "17500",
      "statistics" : true,
      "locking" : {
        "concurrency-level" : "1000",
        "acquire-timeout" : "15000",
        "striping" : false
      },
      "state-transfer" : {
        "timeout" : "60000"
      }
    }
  }
}

アプリケーション側。Maven依存関係など。

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.infinispan</groupId>
                <artifactId>infinispan-bom</artifactId>
                <version>14.0.4.Final</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>jakarta.platform</groupId>
                <artifactId>jakarta.jakartaee-bom</artifactId>
                <version>8.0.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.junit</groupId>
                <artifactId>junit-bom</artifactId>
                <version>5.9.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-hotrod</artifactId>
            <version>14.0.4.Final</version>
        </dependency>
        <dependency>
            <groupId>jakarta.transaction</groupId>
            <artifactId>jakarta.transaction-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.infinispan.protostream</groupId>
            <artifactId>protostream-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.24.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

新しいHot Rod Clientを使うのに必要な依存関係は、infinispan-hotrodjakarta.transaction-apiです。
protostream-processorは必須ではありませんが、独自のクラスを使うので今回は追加しておきます。

Hot Rod Cllient × Mutiny

では、Hot Rod ClientのMutiny版を使っていきます。

まずはエンティティクラスを作成。

src/main/java/org/littlewings/infinispan/remote/newclient/Book.java

package org.littlewings.infinispan.remote.newclient;

import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.protostream.descriptors.Type;

public class Book {
    @ProtoField(number = 1)
    String isbn;

    @ProtoField(number = 2)
    String title;

    @ProtoField(number = 3, type = Type.INT32, defaultValue = "0")
    int price;

    @ProtoFactory
    public static Book create(String isbn, String title, int price) {
        Book book = new Book();
        book.setIsbn(isbn);
        book.setTitle(title);
        book.setPrice(price);

        return book;
    }

    // getter/setterは省略
}

MarshallerおよびProtocol BuffersのIDLを生成するためのGeneratedSchemaインターフェースのサブインターフェース。

src/main/java/org/littlewings/infinispan/remote/newclient/EntitiesInitializer.java

package org.littlewings.infinispan.remote.newclient;

import org.infinispan.protostream.GeneratedSchema;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;

@AutoProtoSchemaBuilder(
        includeClasses = {
                Book.class
        },
        schemaFileName = "entities.proto",
        schemaFilePath = "proto/",
        schemaPackageName = "remote_newclient")
public interface EntitiesInitializer extends GeneratedSchema {
}

これでアプリケーション側の準備ができたので、テストコードを書いていきます。

以下を雛形とします。

src/test/java/org/littlewings/infinispan/remote/newclient/HotRodNewClientMutinyTest.java

package org.littlewings.infinispan.remote.newclient;

import java.net.URI;
import java.util.List;
import java.util.function.Function;
import java.util.stream.IntStream;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
import org.infinispan.api.Infinispan;
import org.infinispan.api.mutiny.MutinyCache;
import org.infinispan.api.mutiny.MutinyContainer;
import org.infinispan.hotrod.configuration.HotRodConfiguration;
import org.infinispan.hotrod.configuration.HotRodConfigurationBuilder;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class HotRodNewClientMutinyTest {

    // ここに、テストを書く!!
}

Stringをキーと値にしたキャッシュへのアクセス。

    @Test
    public void simpleMutinyCache() {
        URI uri = URI.create("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222");

        try (Infinispan infinispan = Infinispan.create(uri);
             // または
             // try (Infinispan infinispan = Infinispan.create(uriString)) {
             MutinyContainer container = infinispan.mutiny()) {
            Uni<MutinyCache<String, String>> cache =
                    container
                            .caches()
                            .get("simpleCache");

            Multi<Void> setOperation =
                    cache
                            .toMulti()
                            .onItem()
                            .transformToMultiAndMerge(c ->
                                    Multi
                                            .createFrom()
                                            .items(
                                                    IntStream
                                                            .rangeClosed(1, 100)
                                                            .mapToObj(i -> c.set("key" + i, "value" + i))
                                            )
                                            .onItem()
                                            .transformToUniAndMerge(Function.identity())
                            );

            setOperation.collect().asList().await().indefinitely();

            cache
                    .onItem()
                    .transformToUni(c -> c.get("key1"))
                    .subscribe()
                    .withSubscriber(UniAssertSubscriber.create())
                    .awaitItem()
                    .assertItem("value1");
            cache
                    .onItem()
                    .transformToUni(c -> c.get("key50"))
                    .subscribe()
                    .withSubscriber(UniAssertSubscriber.create())
                    .awaitItem()
                    .assertItem("value50");
            cache
                    .onItem()
                    .transformToUni(c -> c.get("key100"))
                    .subscribe()
                    .withSubscriber(UniAssertSubscriber.create())
                    .awaitItem()
                    .assertItem("value100");

            cache.onItem().transformToUni(c -> c.clear()).await().indefinitely();

            cache
                    .onItem()
                    .transformToUni(c -> c.get("key1"))
                    .subscribe()
                    .withSubscriber(UniAssertSubscriber.create())
                    .awaitItem()
                    .assertItem(null);
            cache
                    .onItem()
                    .transformToUni(c -> c.get("key50"))
                    .subscribe()
                    .withSubscriber(UniAssertSubscriber.create())
                    .awaitItem()
                    .assertItem(null);
            cache
                    .onItem()
                    .transformToUni(c -> c.get("key100"))
                    .subscribe()
                    .withSubscriber(UniAssertSubscriber.create())
                    .awaitItem()
                    .assertItem(null);
        }
    }

このキャッシュへのアクセスは、URI指定で行うことにしました。接続先のホスト、ポートのペアを,区切りでつなぎます。

        URI uri = URI.create("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222");

        try (Infinispan infinispan = Infinispan.create(uri);
             // または
             // try (Infinispan infinispan = Infinispan.create(uriString)) {
             MutinyContainer container = infinispan.mutiny()) {
            Uni<MutinyCache<String, String>> cache =
                    container
                            .caches()
                            .get("simpleCache");

キャッシュへのアクセスは、Infinispan#mutinyMutinyContainerを取得した後でMutinyCaches#getから取得できるMutinyCache
使って行います。

SmallRye Mutinyの使い方については、SmallRye Mutinyのガイドを参照。Infinispanが使用しているSmallRye Mutinyがちょっと古いようで、
同じバージョンのガイドはなかったのですが、そんなに困る感じはしませんでした。

SmallRye Mutiny

独自に定義したエンティティを使うキャッシュへのアクセス。

    @Test
    public void bookMutinyCache() {
        // URIではPropertiesの部分は読まない
        //URI uri = URI.create("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222?context-initializers=org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl");

        HotRodConfiguration configuration =
                new HotRodConfigurationBuilder()
                        .addServers("172.18.0.2:11222;172.18.0.3:11222;172.18.0.4:11222")
                        .addContextInitializer("org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl")
                        .security()
                        .authentication()
                        .username("ispn-user")
                        .password("password".toCharArray())
                        .build();

        try (Infinispan infinispan = Infinispan.create(configuration);
             MutinyContainer container = infinispan.mutiny()) {

            Uni<MutinyCache<String, Book>> cache =
                    container
                            .caches()
                            .get("bookCache");

            List<Book> books = List.of(
                    Book.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344),
                    Book.create("978-1785285332", "Getting Started with Hazelcast - Second Edition", 5484),
                    Book.create("978-0359439379", "The Apache Ignite Book", 9964),
                    Book.create("978-1783988181", "Mastering Redis", 8719),
                    Book.create("978-1492080510", "High Performance MySQL", 6428)
            );

            Multi<Void> setOperation =
                    cache
                            .toMulti()
                            .onItem()
                            .transformToMultiAndMerge(c ->
                                    Multi
                                            .createFrom()
                                            .items(
                                                    books
                                                            .stream()
                                                            .map(b -> c.set(b.getIsbn(), b))
                                            )
                                            .onItem()
                                            .transformToUniAndMerge(Function.identity())

                            );

            setOperation.collect().asList().await().indefinitely();

            cache
                    .onItem()
                    .transformToUni(c -> c.get("978-1782169970"))
                    .onItem()
                    .invoke(b -> {
                        assertThat(b.getTitle()).isEqualTo("Infinispan Data Grid Platform Definitive Guide");
                        assertThat(b.getPrice()).isEqualTo(5344);
                    })
                    .subscribe()
                    .withSubscriber(UniAssertSubscriber.create())
                    .awaitItem()
                    .assertCompleted();

            cache
                    .onItem()
                    .transformToUni(c -> c.get("978-0359439379"))
                    .onItem()
                    .invoke(b -> {
                        assertThat(b.getTitle()).isEqualTo("The Apache Ignite Book");
                        assertThat(b.getPrice()).isEqualTo(9964);
                    })
                    .subscribe()
                    .withSubscriber(UniAssertSubscriber.create())
                    .awaitItem()
                    .assertCompleted();

            cache.onItem().transformToUni(c -> c.clear()).await().indefinitely();

            cache
                    .onItem()
                    .transformToUni(c -> c.get("978-1782169970"))
                    .subscribe()
                    .withSubscriber(UniAssertSubscriber.create())
                    .awaitItem()
                    .assertItem(null);

            cache
                    .onItem()
                    .transformToUni(c -> c.get("978-0359439379"))
                    .subscribe()
                    .withSubscriber(UniAssertSubscriber.create())
                    .awaitItem()
                    .assertItem(null);
        }
    }

こちらも、先ほどと同様にURIで接続先などの指定を行いたいのですが、

        // URIではPropertiesの部分は読まない
        //URI uri = URI.create("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222?context-initializers=org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl");

未だにURI指定ではプロパティを扱えないので

https://github.com/infinispan/infinispan/blob/14.0.4.Final/client/hotrod/src/main/java/org/infinispan/hotrod/configuration/HotRodConfigurationBuilder.java#L313-L317

HotRodConfigurationを自分で定義することにします。

        HotRodConfiguration configuration =
                new HotRodConfigurationBuilder()
                        .addServers("172.18.0.2:11222;172.18.0.3:11222;172.18.0.4:11222")
                        .addContextInitializer("org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl")
                        .security()
                        .authentication()
                        .username("ispn-user")
                        .password("password".toCharArray())
                        .build();

HotRodConfigurationBuilder#addServersで接続先を複数指定する場合は、;区切りになります。

ひとまず、前回のエントリーと同じ範囲までの確認はOKです。

Mutiny版の実装

前回も少し書きましたが、Mutiny版はRemoteCacheの各種メソッドが返すCompletionStageやFlow.PublisherをSmallRye Mutinyの UniMulti`に変換することで実現しています。

https://github.com/infinispan/infinispan/blob/14.0.4.Final/client/hotrod/src/main/java/org/infinispan/hotrod/HotRodMutinyCache.java

https://github.com/infinispan/infinispan/blob/14.0.4.Final/client/hotrod/src/main/java/org/infinispan/hotrod/impl/cache/RemoteCache.java

https://github.com/infinispan/infinispan/blob/14.0.4.Final/client/hotrod/src/main/java/org/infinispan/hotrod/impl/cache/RemoteCacheImpl.java

RemoteCacheのメソッドはCompletionStageまたはFlow.Publisherのどちらかを返すようになっているため、非同期版である HotRodAsyncCacheが1番素直な実装ということになります(同期版はCompletableFuture#get`しているので)。

https://github.com/infinispan/infinispan/blob/14.0.4.Final/client/hotrod/src/main/java/org/infinispan/hotrod/HotRodAsyncCache.java

オマケ

せっかくなので、同期版と非同期版のコードもクラスター構成のInfinispan Serverにアクセスする版を作成しておきました。

src/test/java/org/littlewings/infinispan/remote/newclient/HotRodNewClientSyncAndAsyncTest.java

package org.littlewings.infinispan.remote.newclient;

import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.IntStream;

import org.infinispan.api.Infinispan;
import org.infinispan.api.async.AsyncCache;
import org.infinispan.api.async.AsyncContainer;
import org.infinispan.api.common.events.cache.CacheEntryEventType;
import org.infinispan.api.sync.SyncCache;
import org.infinispan.api.sync.SyncContainer;
import org.infinispan.api.sync.events.cache.SyncCacheContinuousQueryListener;
import org.infinispan.hotrod.configuration.HotRodConfiguration;
import org.infinispan.hotrod.configuration.HotRodConfigurationBuilder;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class HotRodNewClientSyncAndAsyncTest {
    @Test
    public void connectInfinispanServerUsingURI() {
        String uriString = "hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222";
        URI uri = URI.create(uriString);

        try (Infinispan infinispan = Infinispan.create(uri);
             SyncContainer container = infinispan.sync()) {
            SyncCache<String, String> cache =
                    container
                            .caches()
                            .get("simpleCache");

            cache.clear();
        }
    }

    @Test
    public void simpleSyncCache() {
        URI uri = URI.create("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222");

        try (Infinispan infinispan = Infinispan.create(uri);
             SyncContainer container = infinispan.sync()) {
            SyncCache<String, String> cache =
                    container
                            .caches()
                            .get("simpleCache");

            IntStream
                    .rangeClosed(1, 100)
                    .forEach(i -> cache.set("key" + i, "value" + i));

            assertThat(cache.get("key1")).isEqualTo("value1");
            assertThat(cache.get("key50")).isEqualTo("value50");
            assertThat(cache.get("key100")).isEqualTo("value100");

            cache.clear();

            assertThat(cache.get("key1")).isNull();
            assertThat(cache.get("key50")).isNull();
            assertThat(cache.get("key100")).isNull();
        }
    }

    @Test
    public void simpleSyncCacheUnsupportedOperation() {
        URI uri = URI.create("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222");

        try (Infinispan infinispan = Infinispan.create(uri);
             SyncContainer container = infinispan.sync()) {
            SyncCache<String, String> cache =
                    container
                            .caches()
                            .get("simpleCache");

            assertThatThrownBy(() -> cache.entries())
                    .isInstanceOf(UnsupportedOperationException.class);
            assertThatThrownBy(() -> cache.keys())
                    .isInstanceOf(UnsupportedOperationException.class);
            assertThatThrownBy(() -> cache.listen(new SyncCacheContinuousQueryListener<>() {
            }))
                    .isInstanceOf(UnsupportedOperationException.class);

            assertThatThrownBy(() -> cache.estimateSize())
                    .isInstanceOf(UnsupportedOperationException.class);

        }
    }

    @Test
    public void bookSyncCache() {
        // URIではPropertiesの部分は読まない
        //URI uri = URI.create("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222?context-initializers=org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl");

        HotRodConfiguration configuration =
                new HotRodConfigurationBuilder()
                        .addServers("172.18.0.2:11222;172.18.0.3:11222;172.18.0.4:11222")
                        .addContextInitializer("org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl")
                        .security()
                        .authentication()
                        .username("ispn-user")
                        .password("password".toCharArray())
                        .build();

        try (Infinispan infinispan = Infinispan.create(configuration);
             SyncContainer container = infinispan.sync()) {
            SyncCache<String, Book> cache =
                    container
                            .caches()
                            .get("bookCache");

            List<Book> books = List.of(
                    Book.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344),
                    Book.create("978-1785285332", "Getting Started with Hazelcast - Second Edition", 5484),
                    Book.create("978-0359439379", "The Apache Ignite Book", 9964),
                    Book.create("978-1783988181", "Mastering Redis", 8719),
                    Book.create("978-1492080510", "High Performance MySQL", 6428)
            );

            books.forEach(b -> cache.set(b.getIsbn(), b));

            assertThat(cache.get("978-1782169970").getTitle())
                    .isEqualTo("Infinispan Data Grid Platform Definitive Guide");
            assertThat(cache.get("978-1782169970").getPrice())
                    .isEqualTo(5344);
            assertThat(cache.get("978-0359439379").getTitle())
                    .isEqualTo("The Apache Ignite Book");
            assertThat(cache.get("978-0359439379").getPrice())
                    .isEqualTo(9964);

            cache.clear();

            assertThat(cache.get("978-1782169970")).isNull();
            assertThat(cache.get("978-1782169970")).isNull();
        }
    }

    @Test
    public void simpleAsyncCacheUnsupportedOperation() {
        URI uri = URI.create("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222");

        try (Infinispan infinispan = Infinispan.create(uri);
             AsyncContainer container = infinispan.async()) {
            AsyncCache<String, String> cache =
                    container
                            .caches()
                            .<String, String>get("simpleCache")
                            .toCompletableFuture()
                            .join();

            // cache.entries();
            // cache.keys()
            assertThatThrownBy(() -> cache.listen(CacheEntryEventType.CREATED))
                    .isInstanceOf(UnsupportedOperationException.class);
            assertThatThrownBy(() -> cache.estimateSize())
                    .isInstanceOf(UnsupportedOperationException.class);
        }
    }

    @Test
    public void simpleAsyncCache() {
        URI uri = URI.create("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222");

        try (Infinispan infinispan = Infinispan.create(uri);
             AsyncContainer container = infinispan.async()) {
            AsyncCache<String, String> cache =
                    container
                            .caches()
                            .<String, String>get("simpleCache")
                            .toCompletableFuture()
                            .join();

            IntStream
                    .rangeClosed(1, 100)
                    .<CompletionStage<?>>mapToObj(i -> cache.set("key" + i, "value" + i))
                    .map(CompletionStage::toCompletableFuture)
                    .forEach(CompletableFuture::join);

            cache
                    .get("key1")
                    .thenAccept(value -> assertThat(value).isEqualTo("value1"))
                    .thenCompose(v -> cache.get("key50"))
                    .thenAccept(value -> assertThat(value).isEqualTo("value50"))
                    .thenCompose(v -> cache.get("key100"))
                    .thenAccept(value -> assertThat(value).isEqualTo("value100"))
                    .toCompletableFuture()
                    .join();

            cache.clear().toCompletableFuture().join();

            cache
                    .get("key1")
                    .thenAccept(value -> assertThat(value).isNull())
                    .thenCompose(v -> cache.get("key50"))
                    .thenAccept(value -> assertThat(value).isNull())
                    .thenCompose(v -> cache.get("key100"))
                    .thenAccept(value -> assertThat(value).isNull())
                    .toCompletableFuture()
                    .join();
        }
    }

    @Test
    public void bookAsyncCache() {
        HotRodConfiguration configuration =
                new HotRodConfigurationBuilder()
                        .addServers("172.18.0.2:11222;172.18.0.3:11222;172.18.0.4:11222")
                        .addContextInitializer("org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl")
                        .security()
                        .authentication()
                        .username("ispn-user")
                        .password("password".toCharArray())
                        .build();

        try (Infinispan infinispan = Infinispan.create(configuration);
             AsyncContainer container = infinispan.async()) {
            AsyncCache<String, Book> cache =
                    container
                            .caches()
                            .<String, Book>get("bookCache")
                            .toCompletableFuture()
                            .join();

            List<Book> books = List.of(
                    Book.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344),
                    Book.create("978-1785285332", "Getting Started with Hazelcast - Second Edition", 5484),
                    Book.create("978-0359439379", "The Apache Ignite Book", 9964),
                    Book.create("978-1783988181", "Mastering Redis", 8719),
                    Book.create("978-1492080510", "High Performance MySQL", 6428)
            );

            books
                    .stream()
                    .map(b -> cache.set(b.getIsbn(), b).toCompletableFuture())
                    .forEach(CompletableFuture::join);

            cache
                    .get("978-1782169970")
                    .thenAccept(b -> {
                        assertThat(b.getTitle()).isEqualTo("Infinispan Data Grid Platform Definitive Guide");
                        assertThat(b.getPrice()).isEqualTo(5344);
                    })
                    .thenCompose(v -> cache.get("978-0359439379"))
                    .thenAccept(b -> {
                        assertThat(b.getTitle()).isEqualTo("The Apache Ignite Book");
                        assertThat(b.getPrice()).isEqualTo(9964);
                    })
                    .toCompletableFuture()
                    .join();

            cache.clear().toCompletableFuture().join();

            cache
                    .get("978-1782169970")
                    .thenAccept(b -> assertThat(b).isNull())
                    .thenCompose(v -> cache.get("978-0359439379"))
                    .thenAccept(b -> assertThat(b).isNull())
                    .toCompletableFuture()
                    .join();
        }
    }
}

まとめ

Infinspan 14.0で実装された新しいHot Rod Client APIのうち、Mutiny版が14.0.4.Finalで動くようになったので試してみました。

まだまだ機能が足りないので、既存のHot Rod Clientの実装の置き換えるようなポジションになるのかはよくわかりませんが…。

今回作成したソースコードは、こちらに置いています。

https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-new-hotrod-client-mutiny