CLOVER🍀

That was when it all began.

Infinispan Server+Hot Rod ClientのNear Cacheを試す

これは、なにをしたくて書いたもの?

Infinispan ServerにHot Rod Clientでアクセスする際に使える、Near Cacheを使ったことがないなと思ったのでちょっと試してみようと
いうことで。

Hot Rod ClientのNear Cache

Near Cacheは、簡単に言うと「キャッシュのキャッシュ」です。

Using Hot Rod Java clients / Hot Rod Java Client Configuration / Near Caches

InfinispanをEmbedded Modeではなく、Client Server Modeで使用する場合、キャッシュに保存されたエントリーの取得時には
クライアントからサーバーへのネットワーク通信が発生します。

Near Cacheはクライアント上にキャッシュを構成し、エントリーの取得時にサーバーへの通信が発生することを抑える仕組みになります。

Near Cacheには、以下の特徴があります。

  • サーバーからエントリーを取得した際にNear Cacheにも保存し、次回の取得時にはNear Cacheに保存されているエントリーを使用してサーバーへはアクセスしない
  • 対象のエントリーが更新、削除、有効期限切れした場合は、Near Cacheの該当エントリーは無効化される(破棄される)
  • クライアントの接続先が別のサーバーにフェイルオーバーした場合はクリアされる

また、Near Cacheは以下の3パターンの構成をとることができます。

  • 保存するエントリーの数を制限しないNear Cache
  • 保存するエントリーの最大数を制限したNear Cache
  • 保存するエントリーの最大数を制限し、さらにBloom Filterを適用したNear Cache

Near Cacheを有効にしただけだと、保存するエントリー数は無制限になります(実装はjava.util.concurrent.ConcurrentHashMapです)。

Bloom Filterは、設定するとそのクライアントに無関係な無効化メッセージを受信しないようにし、書き込み操作のパフォーマンスを
向上できるもので、以下の特徴があります。

Specifies whether bloom filter should be used for near cache to limit the number of write notifications for unrelated keys.

NearCacheConfigurationBuilder (Infinispan JavaDoc 14.0.7.Final API)

Near Cacheの設定上の注意点としては、maxIdleは使用しないこと、となっています。

Do not use maximum idle expiration with near caches because near-cache reads do not propagate the last access time for entries.

これは、Near Cacheを構成した状態でアクセスしても、サーバー側へアクセスが行われない(=最終アクセス時間に反映されない)ため
ですね。使えないというわけではないですが、事実上lifspanと同じ意味になります。

ちなみに保存する最大数を制限したNear Cacheでは、実装としてCaffeineが使用されます。

GitHub - ben-manes/caffeine: A high performance caching library for Java

説明はこれくらいにして、実際に使って試してみましょう。

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.6 2023-01-17
OpenJDK Runtime Environment (build 17.0.6+10-Ubuntu-0ubuntu122.04)
OpenJDK 64-Bit Server VM (build 17.0.6+10-Ubuntu-0ubuntu122.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.9.1 (2e178502fcdbffc201671fb2537d0cb4b4cc58f8)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.6, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-69-generic", arch: "amd64", family: "unix"

Infinispan Serverは、172.18.0.2〜172.18.0.4の3ノードで動作しているものとします。

$ java --version
openjdk 17.0.6 2023-01-17
OpenJDK Runtime Environment Temurin-17.0.6+10 (build 17.0.6+10)
OpenJDK 64-Bit Server VM Temurin-17.0.6+10 (build 17.0.6+10, mixed mode, sharing)


$ bin/server.sh --version

Infinispan Server 14.0.7.Final (Flying Saucer)
Copyright (C) Red Hat Inc. and/or its affiliates and other contributors
License Apache License, v. 2.0. http://www.apache.org/licenses/LICENSE-2.0

各ノードの起動は以下のコマンドで行い、クラスターを構成します。

$ bin/server.sh \
    -b 0.0.0.0 \
    -Djgroups.tcp.address=$(hostname -i)

準備

Infinispan Server側には、以下のユーザーを作成しておきます。

$ bin/cli.sh user create -g admin -p password ispn-admin
$ bin/cli.sh user create -g application -p password ispn-user

これは、各ノードそれぞれで行います。

アプリケーション側。Maven依存関係など。

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-client-hotrod</artifactId>
            <version>14.0.7.Final</version>
        </dependency>

        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>14.0.7.Final</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.9.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.24.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M7</version>
            </plugin>
        </plugins>
    </build>

確認は、テストコードで行います。

Infinispanのモジュールは主題としてはinfinispan-client-hotrodのみでよいのですが、テストコード内でキャッシュを作成するために
infinispan-coreを加えています。

Near Cacheをパターン別に使ってみる

それでは、Near Cacheを使ってみましょう。構成パターンごとに確認してみます。といっても、今回はBloom Filterについては設定は
しますがあまり見ないことにします。

テストコードの雛形。

src/test/java/org/littlewings/infinispan/remote/nearcache/NearCacheTest.java

package org.littlewings.infinispan.remote.nearcache;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.RemoteCacheManagerAdmin;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.configuration.ExhaustedAction;
import org.infinispan.client.hotrod.configuration.NearCacheMode;
import org.infinispan.client.hotrod.impl.InvalidatedNearRemoteCache;
import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
import org.infinispan.client.hotrod.jmx.RemoteCacheClientStatisticsMXBean;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.function.Supplier;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

public class NearCacheTest {
    @BeforeEach
    void setUp() {
        String uri = String.format(
                "hotrod://%s:%s@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222",
                "ispn-admin",
                "password"
        );

        try (RemoteCacheManager manager = new RemoteCacheManager(uri)) {
            RemoteCacheManagerAdmin admin = manager.administration();

            admin.removeCache("distCache");

            org.infinispan.configuration.cache.Configuration configuration =
                    new org.infinispan.configuration.cache.ConfigurationBuilder()
                            .clustering()
                            .cacheMode(org.infinispan.configuration.cache.CacheMode.DIST_SYNC)
                            .encoding().key().mediaType("application/x-protostream")
                            .encoding().value().mediaType("application/x-protostream")
                            .build();

            admin.getOrCreateCache("distCache", configuration);
        }
    }

    // ここに、テストを書く
}

テストごとに、Distributed Cacheを破棄、作成することにします。この時、Infinispan Server側には以下のキャッシュ定義が作成されます。

server/data/caches.xml

<?xml version="1.0"?>
<infinispan xmlns="urn:infinispan:config:14.0">
    <cache-container>
        <caches>
            <distributed-cache name="distCache" mode="SYNC" remote-timeout="17500" statistics="true">
                <encoding>
                    <key media-type="application/x-protostream"/>
                    <value media-type="application/x-protostream"/>
                </encoding>
                <locking concurrency-level="1000" acquire-timeout="15000" striping="false"/>
                <state-transfer timeout="60000"/>
            </distributed-cache>
        </caches>
    </cache-container>
</infinispan>
Near Cacheを使わない場合のRemoteCacheの実装

Near Cacheを使うと、RemoteCacheの実装が変更されます。これを最初に確認しておきましょう。

    @Test
    void defaultCache() {
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.addServers("172.18.0.2:11222;172.18.0.3:11222;172.18.0.4:11222");
        builder.security().authentication().username("ispn-user").password("password");

        Configuration configuration = builder.build();

        try (RemoteCacheManager cacheManager = new RemoteCacheManager(configuration)) {
            RemoteCache<String, String> cache = cacheManager.getCache("distCache");
            assertThat(cache).isInstanceOf(RemoteCacheImpl.class);
        }
    }

Near Cacheを使わない場合は、このようにRemoteCacheImplクラスがRemoteCacheインターフェースの実装として使われます。

容量無制限のNear Cacheを使う

では、Near Cacheを有効にしてみます。

@Test
    void enableUnboundedNearCache() {
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.addServers("172.18.0.2:11222;172.18.0.3:11222;172.18.0.4:11222");
        builder.security().authentication().username("ispn-user").password("password");

        builder.remoteCache("distCache").nearCacheMode(NearCacheMode.INVALIDATED);

        Configuration configuration = builder.build();

        try (RemoteCacheManager cacheManager = new RemoteCacheManager(configuration)) {
            RemoteCache<String, String> cache = cacheManager.getCache("distCache");
            assertThat(cache).isInstanceOf(InvalidatedNearRemoteCache.class);

            RemoteCacheClientStatisticsMXBean clientStatistics = cache.clientStatistics();

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isZero();
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isZero();
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 50).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 50).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(51, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(51, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(100L);

            IntStream.rangeClosed(1, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(200L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(100L);

            IntStream.rangeClosed(1, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(200L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(200L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(100L);
        }
    }

Near Cacheを有効にしているのはここですね。使用するRemoteCache単位に設定を行うことになります。
NearCacheModeINVALIDATEDにすることで、Near Cacheが有効になります。

        builder.remoteCache("distCache").nearCacheMode(NearCacheMode.INVALIDATED);

Near Cacheを有効にすると、対象のRemoteCacheインターフェースの実装がInvalidatedNearRemoteCacheクラスになります。

            RemoteCache<String, String> cache = cacheManager.getCache("distCache");
            assertThat(cache).isInstanceOf(InvalidatedNearRemoteCache.class);

Near Cacheの状況の確認には、RemoteCacheClientStatisticsMXBeanを使ってメトリクスを取得して行うことにします。

            RemoteCacheClientStatisticsMXBean clientStatistics = cache.clientStatistics();

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isZero();
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

最初は、すべての値が0です。

キャッシュにエントリーを100件登録。この時点では、Near Cacheのメトリクスに変化はありません。

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isZero();
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

50件のエントリーを取得。

            IntStream.rangeClosed(1, 50).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

これは、Near Cacheのキャッシュミスとしてカウントされ、同時にNear Cache内のエントリーも増えていきます。

もう1度、同じ操作をすると、今度はNear Cacheへのアクセスのみで処理が完了します。

            IntStream.rangeClosed(1, 50).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

残りの50エントリーにアクセス。

            IntStream.rangeClosed(51, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(51, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

最初はキャッシュミスして、それからNear Cacheに保存、次回はNear Cacheからの取得になります。

次に、キャッシュエントリーを更新します。

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(100L);

この時、キャッシュエントリーが無効化されます。また、今回は全キャッシュエントリーを更新したので、Near Cache内のエントリーが
0になりました。

再度アクセスして確認。

            IntStream.rangeClosed(1, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(200L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(100L);

            IntStream.rangeClosed(1, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(200L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(200L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(100L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(100L);

まずはこんな感じです。

保存するエントリーの最大数を設定したNear Cacheを使う

次は、Near Cacheに保存するエントリー数を制限してみます。

    @Test
    void enableBoundedNearCache() {
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.addServers("172.18.0.2:11222;172.18.0.3:11222;172.18.0.4:11222");
        builder.security().authentication().username("ispn-user").password("password");

        builder.remoteCache("distCache").nearCacheMode(NearCacheMode.INVALIDATED).nearCacheMaxEntries(30);

        Configuration configuration = builder.build();

        try (RemoteCacheManager cacheManager = new RemoteCacheManager(configuration)) {
            RemoteCache<String, String> cache = cacheManager.getCache("distCache");
            assertThat(cache).isInstanceOf(InvalidatedNearRemoteCache.class);

            RemoteCacheClientStatisticsMXBean clientStatistics = cache.clientStatistics();

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isZero();
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isZero();
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 50).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 50).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(29L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(71L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(51, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(29L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(121L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(51, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(32L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(168L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(32L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(168L);
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(30L);

            IntStream.rangeClosed(1, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(32L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(268L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(30L);

            IntStream.rangeClosed(1, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isGreaterThanOrEqualTo(47L).isLessThanOrEqualTo(49L);
            assertThat(clientStatistics.getNearCacheMisses()).isGreaterThanOrEqualTo(351L).isLessThanOrEqualTo(353L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(30L);
        }
    }

設定上は、ここが変わっています。今回は、Near Cacheに保存するエントリー数を30にしました。

        builder.remoteCache("distCache").nearCacheMode(NearCacheMode.INVALIDATED).nearCacheMaxEntries(30);

RemoteCacheの実装は変わっていません。

            assertThat(cache).isInstanceOf(InvalidatedNearRemoteCache.class);

登録するエントリー数やエントリーの取得については、先ほどと同じように100件をベースに行うのですが、当然ながらNear Cacheに
保存できるエントリー数を上回ることになります。

よって、キャッシュヒットやキャッシュミスにはその揺らぎが現れます。

            IntStream.rangeClosed(1, 50).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 50).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(29L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(71L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(51, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(29L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(121L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(51, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(32L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(168L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

エントリーの更新後と、その後の取得確認。

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(32L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(168L);
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(30L);

            IntStream.rangeClosed(1, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(32L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(268L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(30L);

            IntStream.rangeClosed(1, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isGreaterThanOrEqualTo(47L).isLessThanOrEqualTo(49L);
            assertThat(clientStatistics.getNearCacheMisses()).isGreaterThanOrEqualTo(351L).isLessThanOrEqualTo(353L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(30L);

最後のキャッシュヒット、キャッシュミスでは値がぶれやすかったので、アサーションを少し調整しています…。

Bloom Filterを使ってみる

保存するエントリーの最大数を設定した上で、さらにBloom Filterを設定してみます。

    @Test
    void enableBoundedWithBloomFilterNearCache() {
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.addServers("172.18.0.2:11222;172.18.0.3:11222;172.18.0.4:11222");
        builder.security().authentication().username("ispn-user").password("password");
        builder.connectionPool().maxActive(1).exhaustedAction(ExhaustedAction.WAIT);

        builder.remoteCache("distCache")
                .nearCacheMode(NearCacheMode.INVALIDATED)
                .nearCacheMaxEntries(30)
                .nearCacheUseBloomFilter(true);

        Configuration configuration = builder.build();

        try (RemoteCacheManager cacheManager = new RemoteCacheManager(configuration)) {
            RemoteCache<String, String> cache = cacheManager.getCache("distCache");
            assertThat(cache).isInstanceOf(InvalidatedNearRemoteCache.class);

            RemoteCacheClientStatisticsMXBean clientStatistics = cache.clientStatistics();

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isZero();
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isZero();
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 50).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(50L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 50).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(29L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(71L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(51, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(29L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(121L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(51, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value1-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(32L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(168L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(32L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(168L);
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(30L);

            IntStream.rangeClosed(1, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(32L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(268L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(30L);

            IntStream.rangeClosed(1, 100).forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value2-" + i));

            assertThat(clientStatistics.getNearCacheHits()).isGreaterThanOrEqualTo(47L).isLessThanOrEqualTo(49L);
            assertThat(clientStatistics.getNearCacheMisses()).isGreaterThanOrEqualTo(351L).isLessThanOrEqualTo(353L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(30L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(30L);
        }
    }

Bloom Filterを使うには、nearCacheUseBloomFiltertrueにすればOKです。

        builder.remoteCache("distCache")
                .nearCacheMode(NearCacheMode.INVALIDATED)
                .nearCacheMaxEntries(30)
                .nearCacheUseBloomFilter(true);

コネクションプールも設定する必要があります。

        builder.connectionPool().maxActive(1).exhaustedAction(ExhaustedAction.WAIT);

maxActiveはドキュメントに習って1にしましたが、本来はもう少し大きくしてもよい気がしますね。
なお、このコネクションプールに設定する数は「サーバーごと」に確保されるようです。

動作については、Bloom Filterの適用有無でテストコードは変わらなかったので割愛。

複数のRemoteCacheManagerからアクセスしてみる

構成パターンとは少し異なりますが、複数のRemoteCacheManagerからアクセスして確認してみましょう。

あるクライアントからのエントリーの更新が、他のクライアントに伝播するかどうかの確認です。

    @Test
    void multipleCacheManager() {
        Supplier<RemoteCacheManager> createCacheManager = () -> {
            ConfigurationBuilder builder = new ConfigurationBuilder();
            builder.addServers("172.18.0.2:11222;172.18.0.3:11222;172.18.0.4:11222");
            builder.security().authentication().username("ispn-user").password("password");
            builder.connectionPool().maxActive(1).exhaustedAction(ExhaustedAction.WAIT);

            builder.remoteCache("distCache")
                    .nearCacheMode(NearCacheMode.INVALIDATED)
                    .nearCacheMaxEntries(30)
                    .nearCacheUseBloomFilter(true);

            Configuration configuration = builder.build();

            return new RemoteCacheManager(configuration);
        };

        try (RemoteCacheManager cacheManager1 = createCacheManager.get();
             RemoteCacheManager cacheManager2 = createCacheManager.get()) {
            RemoteCache<String, String> cache1 = cacheManager1.getCache("distCache");
            RemoteCacheClientStatisticsMXBean clientStatistics1 = cache1.clientStatistics();

            RemoteCache<String, String> cache2 = cacheManager2.getCache("distCache");
            RemoteCacheClientStatisticsMXBean clientStatistics2 = cache2.clientStatistics();

            cache1.put("key", "value-from-cache1");

            assertThat(clientStatistics1.getNearCacheSize()).isZero();
            assertThat(clientStatistics2.getNearCacheSize()).isZero();

            assertThat(cache1.get("key")).isEqualTo("value-from-cache1");

            assertThat(clientStatistics1.getNearCacheHits()).isZero();
            assertThat(clientStatistics1.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheSize()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheInvalidations()).isZero();

            assertThat(clientStatistics2.getNearCacheHits()).isZero();
            assertThat(clientStatistics2.getNearCacheMisses()).isZero();
            assertThat(clientStatistics2.getNearCacheSize()).isZero();
            assertThat(clientStatistics2.getNearCacheInvalidations()).isZero();

            assertThat(cache1.get("key")).isEqualTo("value-from-cache1");

            assertThat(clientStatistics1.getNearCacheHits()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheSize()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheInvalidations()).isZero();

            assertThat(clientStatistics2.getNearCacheHits()).isZero();
            assertThat(clientStatistics2.getNearCacheMisses()).isZero();
            assertThat(clientStatistics2.getNearCacheSize()).isZero();
            assertThat(clientStatistics2.getNearCacheInvalidations()).isZero();

            assertThat(cache2.get("key")).isEqualTo("value-from-cache1");

            assertThat(clientStatistics1.getNearCacheHits()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheSize()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheInvalidations()).isZero();

            assertThat(clientStatistics2.getNearCacheHits()).isZero();
            assertThat(clientStatistics2.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics2.getNearCacheSize()).isEqualTo(1L);
            assertThat(clientStatistics2.getNearCacheInvalidations()).isZero();

            cache2.put("key", "value-from-cache2");

            assertThat(clientStatistics1.getNearCacheHits()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheSize()).isZero();
            assertThat(clientStatistics1.getNearCacheInvalidations()).isEqualTo(1L);

            assertThat(clientStatistics2.getNearCacheHits()).isZero();
            assertThat(clientStatistics2.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics2.getNearCacheSize()).isZero();
            assertThat(clientStatistics2.getNearCacheInvalidations()).isEqualTo(1L);
        }
    }

保存するエントリーの最大数とBloom Filterを設定したNear Cacheを構成して、2つのRemoteCacheManagerを作成。
RemoteCacheも取得しておきます。

        Supplier<RemoteCacheManager> createCacheManager = () -> {
            ConfigurationBuilder builder = new ConfigurationBuilder();
            builder.addServers("172.18.0.2:11222;172.18.0.3:11222;172.18.0.4:11222");
            builder.security().authentication().username("ispn-user").password("password");
            builder.connectionPool().maxActive(1).exhaustedAction(ExhaustedAction.WAIT);

            builder.remoteCache("distCache")
                    .nearCacheMode(NearCacheMode.INVALIDATED)
                    .nearCacheMaxEntries(30)
                    .nearCacheUseBloomFilter(true);

            Configuration configuration = builder.build();

            return new RemoteCacheManager(configuration);
        };

        try (RemoteCacheManager cacheManager1 = createCacheManager.get();
             RemoteCacheManager cacheManager2 = createCacheManager.get()) {
            RemoteCache<String, String> cache1 = cacheManager1.getCache("distCache");
            RemoteCacheClientStatisticsMXBean clientStatistics1 = cache1.clientStatistics();

            RemoteCache<String, String> cache2 = cacheManager2.getCache("distCache");
            RemoteCacheClientStatisticsMXBean clientStatistics2 = cache2.clientStatistics();

エントリーの登録。

            cache1.put("key", "value-from-cache1");

            assertThat(clientStatistics1.getNearCacheSize()).isZero();
            assertThat(clientStatistics2.getNearCacheSize()).isZero();

エントリーの取得を、ひとつ目のクライアントから行います。

            assertThat(cache1.get("key")).isEqualTo("value-from-cache1");

            assertThat(clientStatistics1.getNearCacheHits()).isZero();
            assertThat(clientStatistics1.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheSize()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheInvalidations()).isZero();

            assertThat(clientStatistics2.getNearCacheHits()).isZero();
            assertThat(clientStatistics2.getNearCacheMisses()).isZero();
            assertThat(clientStatistics2.getNearCacheSize()).isZero();
            assertThat(clientStatistics2.getNearCacheInvalidations()).isZero();

            assertThat(cache1.get("key")).isEqualTo("value-from-cache1");

            assertThat(clientStatistics1.getNearCacheHits()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheSize()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheInvalidations()).isZero();

            assertThat(clientStatistics2.getNearCacheHits()).isZero();
            assertThat(clientStatistics2.getNearCacheMisses()).isZero();
            assertThat(clientStatistics2.getNearCacheSize()).isZero();
            assertThat(clientStatistics2.getNearCacheInvalidations()).isZero();

2回目は、Near Cacheからの取得確認です。

この時、Near Cacheの変更があるのはひとつ目のクライアントのみです。

2つ目のクライアントからの取得。

            assertThat(cache2.get("key")).isEqualTo("value-from-cache1");

            assertThat(clientStatistics1.getNearCacheHits()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheSize()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheInvalidations()).isZero();

            assertThat(clientStatistics2.getNearCacheHits()).isZero();
            assertThat(clientStatistics2.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics2.getNearCacheSize()).isEqualTo(1L);
            assertThat(clientStatistics2.getNearCacheInvalidations()).isZero();

ここで、2つ目のクライアントからエントリーを更新してみます。

            cache2.put("key", "value-from-cache2");

            assertThat(clientStatistics1.getNearCacheHits()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics1.getNearCacheSize()).isZero();
            assertThat(clientStatistics1.getNearCacheInvalidations()).isEqualTo(1L);

            assertThat(clientStatistics2.getNearCacheHits()).isZero();
            assertThat(clientStatistics2.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics2.getNearCacheSize()).isZero();
            assertThat(clientStatistics2.getNearCacheInvalidations()).isEqualTo(1L);

すると、両方のクライアントのNear Cacheからエントリーが破棄されます。

Near Cacheと有効期限との関係を見る

ちょっと別ネタとして。Near Cacheの設定上の注意点としてはmaxIdleは使用しないこと、というのがありました。

キャッシュの有効期限としてはmaxIdlelifespanがありますが、この2つと組み合わせるとどうなるかを確認したいと思います。

テストコードの雛形はこちら。

src/test/java/org/littlewings/infinispan/remote/nearcache/NearCacheWithExpirationTest.java

package org.littlewings.infinispan.remote.nearcache;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.RemoteCacheManagerAdmin;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.configuration.ExhaustedAction;
import org.infinispan.client.hotrod.configuration.NearCacheMode;
import org.infinispan.client.hotrod.jmx.RemoteCacheClientStatisticsMXBean;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

public class NearCacheWithExpirationTest {
    @BeforeEach
    void setUp() {
        String uri = String.format(
                "hotrod://%s:%s@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222",
                "ispn-admin",
                "password"
        );

        try (RemoteCacheManager manager = new RemoteCacheManager(uri)) {
            RemoteCacheManagerAdmin admin = manager.administration();

            admin.removeCache("distCacheWithIdleExpiration");

            org.infinispan.configuration.cache.Configuration configurationWithIdleExpiration =
                    new org.infinispan.configuration.cache.ConfigurationBuilder()
                            .clustering()
                            .cacheMode(org.infinispan.configuration.cache.CacheMode.DIST_SYNC)
                            .encoding().key().mediaType("application/x-protostream")
                            .encoding().value().mediaType("application/x-protostream")
                            .expiration().maxIdle(3L, TimeUnit.SECONDS).wakeUpInterval(1L, TimeUnit.SECONDS)
                            .build();

            admin.getOrCreateCache("distCacheWithIdleExpiration", configurationWithIdleExpiration);

            admin.removeCache("distCacheWithLifespanExpiration");

            org.infinispan.configuration.cache.Configuration configurationWithLifespanExpiration =
                    new org.infinispan.configuration.cache.ConfigurationBuilder()
                            .clustering()
                            .cacheMode(org.infinispan.configuration.cache.CacheMode.DIST_SYNC)
                            .encoding().key().mediaType("application/x-protostream")
                            .encoding().value().mediaType("application/x-protostream")
                            .expiration().lifespan(3L, TimeUnit.SECONDS).wakeUpInterval(1L, TimeUnit.SECONDS)
                            .build();

            admin.getOrCreateCache("distCacheWithLifespanExpiration", configurationWithLifespanExpiration);
        }
    }

    // ここに、テストコードを書く
}

maxIdlelifespanを有効にしたDistributed Cacheをそれぞれ定義して、テストごとに削除、作成するようにしています。
時間はそれぞれ3秒。

この有効期限の確認は、Infinispan Server上で60秒間隔(デフォルト)で行われるので、今回は1秒ごとに確認するようにしました。

Infinispan Serverに作成されるキャッシュ定義は、以下になります。

server/data/caches.xml

<?xml version="1.0"?>
<infinispan xmlns="urn:infinispan:config:14.0">
    <cache-container>
        <caches>
            <distributed-cache name="distCacheWithIdleExpiration" mode="SYNC" remote-timeout="17500" statistics="true">
                <encoding>
                    <key media-type="application/x-protostream"/>
                    <value media-type="application/x-protostream"/>
                </encoding>
                <locking concurrency-level="1000" acquire-timeout="15000" striping="false"/>
                <expiration max-idle="3000" interval="1000"/>
                <state-transfer timeout="60000"/>
            </distributed-cache>
            <distributed-cache name="distCacheWithLifespanExpiration" mode="SYNC" remote-timeout="17500" statistics="true">
                <encoding>
                    <key media-type="application/x-protostream"/>
                    <value media-type="application/x-protostream"/>
                </encoding>
                <locking concurrency-level="1000" acquire-timeout="15000" striping="false"/>
                <expiration lifespan="3000" interval="1000"/>
                <state-transfer timeout="60000"/>
            </distributed-cache>
        </caches>
    </cache-container>
</infinispan>
maxIdleを有効にしたキャッシュとNear Cacheを組み合わせる

maxIdleを有効にしたキャッシュ(Distributed Cache)とNear Cacheを組み合わせてみます。

    @Test
    void nearCacheWithMaxIdleExpiration() {
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.addServers("172.18.0.2:11222;172.18.0.3:11222;172.18.0.4:11222");
        builder.security().authentication().username("ispn-user").password("password");
        builder.connectionPool().maxActive(1).exhaustedAction(ExhaustedAction.WAIT);

        builder.remoteCache("distCacheWithIdleExpiration")
                .nearCacheMode(NearCacheMode.INVALIDATED)
                .nearCacheMaxEntries(30)
                .nearCacheUseBloomFilter(true);

        Configuration configuration = builder.build();

        try (RemoteCacheManager cacheManager = new RemoteCacheManager(configuration)) {
            RemoteCache<String, String> cache = cacheManager.getCache("distCacheWithIdleExpiration");
            RemoteCacheClientStatisticsMXBean clientStatistics = cache.clientStatistics();

            cache.put("key", "value");
            assertThat(cache.get("key")).isEqualTo("value");

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(1L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 3).forEach(i -> {
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    // ignore
                }

                assertThat(cache.get("key")).isEqualTo("value");

                assertThat(clientStatistics.getNearCacheHits()).isEqualTo(i);
                assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(1L);
                assertThat(clientStatistics.getNearCacheSize()).isEqualTo(1L);
                assertThat(clientStatistics.getNearCacheInvalidations()).isZero();
            });

            try {
                   TimeUnit.SECONDS.sleep(1L);
               } catch (InterruptedException e) {
                   // ignore
            }

            assertThat(cache.get("key")).isNull();

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(3L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(2L);
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(1L);
        }
    }

Near Cacheは、保存するエントリー数の上限設定と、Bloom Filterを適用した構成とします。

        builder.remoteCache("distCacheWithIdleExpiration")
                .nearCacheMode(NearCacheMode.INVALIDATED)
                .nearCacheMaxEntries(30)
                .nearCacheUseBloomFilter(true);

キャッシュにエントリーを保存してから、1秒おきにアクセスし続けます。

            cache.put("key", "value");
            assertThat(cache.get("key")).isEqualTo("value");

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(1L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 3).forEach(i -> {
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    // ignore
                }

                assertThat(cache.get("key")).isEqualTo("value");

                assertThat(clientStatistics.getNearCacheHits()).isEqualTo(i);
                assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(1L);
                assertThat(clientStatistics.getNearCacheSize()).isEqualTo(1L);
                assertThat(clientStatistics.getNearCacheInvalidations()).isZero();
            });

この時、Near Cacheのキャッシュヒット数が上昇していきます(当たり前ですが)。

よって、キャッシュエントリーにアクセスしたかのように見えていますが、実際にはInfinispan Server上へのアクセスは行われていないので、
maxIdleで指定した時間が経過するとキャッシュエントリーは有効期限切れします。

            try {
                   TimeUnit.SECONDS.sleep(1L);
               } catch (InterruptedException e) {
                   // ignore
            }

            assertThat(cache.get("key")).isNull();

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(3L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(2L);
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(1L);

maxIdleを使用したキャッシュを使用してアクセス(RemoteCache#get)を行うと、以下のような警告ログが出力されます。

WARN: ISPN004087: Use of maxIdle expiration with a near cache is unsupported.

Near Cacheを構成しない場合との差を見てみましょう。

    @Test
    void maxIdleExpiration() {
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.addServers("172.18.0.2:11222;172.18.0.3:11222;172.18.0.4:11222");
        builder.security().authentication().username("ispn-user").password("password");

        /*
        builder.remoteCache("distCacheWithIdleExpiration")
                .nearCacheMode(NearCacheMode.INVALIDATED)
                .nearCacheMaxEntries(30)
                .nearCacheUseBloomFilter(true);
        */

        Configuration configuration = builder.build();

        try (RemoteCacheManager cacheManager = new RemoteCacheManager(configuration)) {
            RemoteCache<String, String> cache = cacheManager.getCache("distCacheWithIdleExpiration");

            cache.put("key", "value");
            assertThat(cache.get("key")).isEqualTo("value");

            IntStream.rangeClosed(1, 3).forEach(i -> {
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    // ignore
                }

                assertThat(cache.get("key")).isEqualTo("value");
            });

            try {
                   TimeUnit.SECONDS.sleep(1L);
               } catch (InterruptedException e) {
                   // ignore
            }

            assertThat(cache.get("key")).isEqualTo("value");

            try {
                   TimeUnit.SECONDS.sleep(3L);
               } catch (InterruptedException e) {
                   // ignore
            }

            assertThat(cache.get("key")).isNull();
        }
    }

この場合、Infinispan Serverへアクセスが行われることになり、有効期限が延長されるので3秒経過してもエントリーが残ったままに
なります。

            cache.put("key", "value");
            assertThat(cache.get("key")).isEqualTo("value");

            IntStream.rangeClosed(1, 3).forEach(i -> {
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    // ignore
                }

                assertThat(cache.get("key")).isEqualTo("value");
            });

            try {
                   TimeUnit.SECONDS.sleep(1L);
               } catch (InterruptedException e) {
                   // ignore
            }

            assertThat(cache.get("key")).isEqualTo("value");

さらに3秒待つと、取得できなくなりますが。

            try {
                   TimeUnit.SECONDS.sleep(3L);
               } catch (InterruptedException e) {
                   // ignore
            }

            assertThat(cache.get("key")).isNull();

このように、Near Cacheを有効にするとInfinispan Serverへアクセスが行われなくなるので、maxIdleを指定した意図の動作には
ならなくなります。lifespanに近くなりますね。

キャッシュに保存するエントリーごとにmaxIdleを指定することもできますが、こちらも動作としては同じなります。

    @Test
    void nearCacheWithMaxIdleExpirationPerEntry() {
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.addServers("172.18.0.2:11222;172.18.0.3:11222;172.18.0.4:11222");
        builder.security().authentication().username("ispn-user").password("password");
        builder.connectionPool().maxActive(1).exhaustedAction(ExhaustedAction.WAIT);

        builder.remoteCache("distCacheWithIdleExpiration")
                .nearCacheMode(NearCacheMode.INVALIDATED)
                .nearCacheMaxEntries(30)
                .nearCacheUseBloomFilter(true);

        Configuration configuration = builder.build();

        try (RemoteCacheManager cacheManager = new RemoteCacheManager(configuration)) {
            RemoteCache<String, String> cache = cacheManager.getCache("distCacheWithIdleExpiration");
            RemoteCacheClientStatisticsMXBean clientStatistics = cache.clientStatistics();

            cache.put("key", "value", -1L, TimeUnit.SECONDS, 5L, TimeUnit.SECONDS);

            assertThat(cache.get("key")).isEqualTo("value");

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(1L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 5).forEach(i -> {
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    // ignore
                }

                assertThat(cache.get("key")).isEqualTo("value");

                assertThat(clientStatistics.getNearCacheHits()).isEqualTo(i);
                assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(1L);
                assertThat(clientStatistics.getNearCacheSize()).isEqualTo(1L);
                assertThat(clientStatistics.getNearCacheInvalidations()).isZero();
            });

            try {
                TimeUnit.SECONDS.sleep(5L);
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(cache.get("key")).isNull();

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(5L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(2L);
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(1L);
        }
    }

Infinispan Serverに作成したキャッシュの定義よりも少し長めのmaxIdleを設定しましたが、動作としては同じです。

            cache.put("key", "value", -1L, TimeUnit.SECONDS, 5L, TimeUnit.SECONDS);
lifespanを有効にしたキャッシュとNear Cacheを組み合わせる

最後にlifespanを有効にしたキャッシュとNear Cacheを組み合わせてみましょう。

    @Test
    void nearCacheWithLifespanExpiration() {
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.addServers("172.18.0.2:11222;172.18.0.3:11222;172.18.0.4:11222");
        builder.security().authentication().username("ispn-user").password("password");
        builder.connectionPool().maxActive(1).exhaustedAction(ExhaustedAction.WAIT);

        builder.remoteCache("distCacheWithLifespanExpiration")
                .nearCacheMode(NearCacheMode.INVALIDATED)
                .nearCacheMaxEntries(30)
                .nearCacheUseBloomFilter(true);

        Configuration configuration = builder.build();

        try (RemoteCacheManager cacheManager = new RemoteCacheManager(configuration)) {
            RemoteCache<String, String> cache = cacheManager.getCache("distCacheWithLifespanExpiration");
            RemoteCacheClientStatisticsMXBean clientStatistics = cache.clientStatistics();

            cache.put("key", "value");
            assertThat(cache.get("key")).isEqualTo("value");

            assertThat(clientStatistics.getNearCacheHits()).isZero();
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(1L);
            assertThat(clientStatistics.getNearCacheSize()).isEqualTo(1L);
            assertThat(clientStatistics.getNearCacheInvalidations()).isZero();

            IntStream.rangeClosed(1, 3).forEach(i -> {
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    // ignore
                }

                assertThat(cache.get("key")).isEqualTo("value");

                assertThat(clientStatistics.getNearCacheHits()).isEqualTo(i);
                assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(1L);
                assertThat(clientStatistics.getNearCacheSize()).isEqualTo(1L);
                assertThat(clientStatistics.getNearCacheInvalidations()).isZero();
            });

            try {
                TimeUnit.SECONDS.sleep(1L);
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(cache.get("key")).isNull();

            assertThat(clientStatistics.getNearCacheHits()).isEqualTo(3L);
            assertThat(clientStatistics.getNearCacheMisses()).isEqualTo(2L);
            assertThat(clientStatistics.getNearCacheSize()).isZero();
            assertThat(clientStatistics.getNearCacheInvalidations()).isEqualTo(1L);
        }
    }

といっても、こちらはmaxIdleと異なりエントリーを作成した時間からの有効期限になるので、設定どおりの時間が経過すると
エントリーは有効期限切れします。

こんなところでしょうか。

実装を見る

では、少し実装を見ていきましょう。

Near Cacheを有効にすると、作成されるRemoteCacheの実装がInvalidatedNearRemoteCacheクラスになります(そうでない場合は
RemoteCacheImplクラスです)。

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java#L570-L581

Near Cacheとしての保存先を作成しているのはこちらですが、

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/near/NearCacheService.java#L110-L112

中身を追うと、保存するエントリーの最大数を指定しているかどうかで実装が変わります。

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/near/DefaultNearCacheFactory.java#L14-L19

最大数を設定しない場合(=無制限の場合)はjava.util.concurrent.ConcurrentHashMapです。

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/near/ConcurrentMapNearCache.java

最大数を設定した場合は、CaffeineのCacheになります。

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/near/BoundedConcurrentMapNearCache.java

Bloom Filterの設定有無は影響しません。

インターフェースとしてのNearCacheも存在します(ConcurrentMapNearCacheおよびBoundedConcurrentMapNearCacheは、
NearCacheインターフェースの実装です)。

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/near/NearCache.java

Near Cacheの利用ですが、InvalidatedNearRemoteCache#getAsyncの呼び出し時に先にNear Cacheの存在確認を行うことで
利用されます。
RemoteCache#getメソッドも、内部的にはgetAsyncを呼び出しています

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/InvalidatedNearRemoteCache.java#L61-L65

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/InvalidatedNearRemoteCache.java#L74-L112

実際のNear Cacheの操作を行うのは、NearCacheServiceです。

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/near/NearCacheService.java#L155-L171

つまり、Near CacheについてはNearCacheServiceを見るとほぼわかることになります。

更新系(putreplaceremove)のメソッドを使った場合は、NearCacheService#removeが呼び出され、Near Cache上の
エントリーが削除されます。

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/InvalidatedNearRemoteCache.java#L114-L179

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/near/NearCacheService.java#L137-L153

この時にInfinispan Server側のエントリーも更新(削除)されるわけですが、他のクライアントはClientListenerを使って通知を受け取り、
Near Cacheのエントリーを削除することになります。

      @ClientListener
   private static class InvalidatedNearCacheListener<K, V> {
      private static final Log log = LogFactory.getLog(InvalidatedNearCacheListener.class);
      private final NearCache<K, V> cache;

      private InvalidatedNearCacheListener(NearCache<K, V> cache) {
         this.cache = cache;
      }

      @ClientCacheEntryModified
      @SuppressWarnings("unused")
      public void handleModifiedEvent(ClientCacheEntryModifiedEvent<K> event) {
         invalidate(event.getKey());
      }

      @ClientCacheEntryRemoved
      @SuppressWarnings("unused")
      public void handleRemovedEvent(ClientCacheEntryRemovedEvent<K> event) {
         invalidate(event.getKey());
      }

      @ClientCacheEntryExpired
      @SuppressWarnings("unused")
      public void handleExpiredEvent(ClientCacheEntryExpiredEvent<K> event) {
         invalidate(event.getKey());
      }

      @ClientCacheFailover
      @SuppressWarnings("unused")
      public void handleFailover(ClientCacheFailoverEvent e) {
         if (log.isTraceEnabled()) log.trace("Clear near cache after fail-over of server");
         cache.clear();
      }

      private void invalidate(K key) {
         cache.remove(key);
      }
   }

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/near/NearCacheService.java#L218-L255

なお、ClientListenerを単純にRemoteCache#addClientListenerを呼び出して終わるかどうかは、Bloom Filterの適用有無で
変わるようです。

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/near/NearCacheService.java#L61-L72

Near Cacheの実装については、こんなところでしょうか。

Bloom Filterについては、Infinispanが実装しているMurmurHash3BloomFilterが使われます。

https://github.com/infinispan/infinispan/blob/14.0.7.Final/commons/all/src/main/java/org/infinispan/commons/util/BloomFilter.java

https://github.com/infinispan/infinispan/blob/14.0.7.Final/commons/all/src/main/java/org/infinispan/commons/util/MurmurHash3BloomFilter.java

Bloom Filterを適用すると、クライアント(ClientListener)にイベントを送信するかどうかの判定が変わるようですね。

https://github.com/infinispan/infinispan/blob/14.0.7.Final/server/hotrod/src/main/java/org/infinispan/server/hotrod/ClientListenerRegistry.java#L520-L525

https://github.com/infinispan/infinispan/blob/14.0.7.Final/server/hotrod/src/main/java/org/infinispan/server/hotrod/ClientListenerRegistry.java#L293-L316

Bloom Filterは「Near Cacheに保存する最大エントリー数 / 16 + 3」を閾値として、

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/near/NearCacheService.java#L68

これを超えるとBloom Filterを更新するようです。

https://github.com/infinispan/infinispan/blob/14.0.7.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/near/NearCacheService.java#L89-L93

Hot Rodのプロトコルとしても、更新用の処理(updateBloomFilter)が設けられています。

https://github.com/infinispan/infinispan/blob/14.0.7.Final/server/hotrod/src/main/resources/hotrod.gr#L337

https://github.com/infinispan/infinispan/blob/14.0.7.Final/server/hotrod/src/main/java/org/infinispan/server/hotrod/CacheRequestProcessor.java#L84-L105

Bloom Filterについてはちょっとあっさりですが、こんなところでしょうか。

まとめ

Infinispan Server+Hot Rod ClientでのNear Cacheを試してみました。

Near Cacheについてはあまりちゃんと見てきてなかったので、この機会に仕組みを知れて良かったかなと思います。
ローカルのキャッシュとClientListenerを使った仕組みなんですね。

今回作成したソースコードは、こちらに置いています。

https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-near-cache