CLOVER🍀

That was when it all began.

InfinispanのFunctional Map APIを試す - Metadata & Invocation Parameter編 -

こちらのエントリの続きです。

[http://d.hatena.ne.jp/Kazuhira/20151012/1444657975:title:

今回は、ドキュメントに書かれているMetadata ParameterとInvocation Parameterを扱ってみます。

36.7. Metadata Parameter Handling

36.8. Invocation Parameter

Metadata Parameterというのは、Cacheに保存するエントリを書き込む際にTTLやアイドル時間によるタイムアウトなどの設定を行うことができたり、Cacheから保存したTTLなどの情報を取得できるものです。Invocation Parameterというのは、ReadOnlyMapやWriteOnlyMapのFutureやPersistenceの挙動を変えることができるものです。

それぞれ、順次見ていってみましょう。

追記
Functional Mapについて、ドキュメントの範囲で簡単に試してみたので、まとめておきます。

InfinispanのFunctional Map APIを試す - CLOVER

InfinispanのFunctional Map APIを試す - ReadWriteMap編 - - CLOVER

InfinispanのFunctional Map APIを試す - Metadata & Invocation Parameter編 - - CLOVER

InfinispanのFunctional Map APIを試す - Functional Listeners編 - - CLOVER

準備

ビルドと設定の準備。

Maven依存関係。

        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>8.0.1.Final</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.2.0</version>
            <scope>test</scope>
        </dependency>

JUnitとAssertJは、テスト用です。

Infinispanの設定は、こんな感じで。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:8.0 http://www.infinispan.org/schemas/infinispan-config-8.0.xsd"
        xmlns="urn:infinispan:config:8.0">
    <jgroups>
        <stack-file name="udp" path="jgroups.xml"/>
    </jgroups>

    <cache-container name="cacheManager" shutdown-hook="REGISTER">
        <transport cluster="cluster" stack="udp"/>
        <jmx duplicate-domains="true"/>

        <!--
        <local-cache name="distCache"/>
        -->
        <distributed-cache name="distCache"/>
    </cache-container>
</infinispan>

JGroupsの設定は、端折ります。

テストコードの雛形

テストを実行するための、雛形コードを作成します。

クラスタを簡易的に起動/停止できるようにするため、このようなメソッド付きのクラスを用意。
src/test/java/org/littlewings/infinispan/functionalmap/MetadataInvocationParameterTest.java

package org.littlewings.infinispan.functionalmap;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.infinispan.Cache;
import org.infinispan.commons.api.functional.EntryView;
import org.infinispan.commons.api.functional.FunctionalMap;
import org.infinispan.commons.api.functional.MetaParam;
import org.infinispan.commons.api.functional.Param;
import org.infinispan.functional.impl.FunctionalMapImpl;
import org.infinispan.functional.impl.ReadOnlyMapImpl;
import org.infinispan.functional.impl.WriteOnlyMapImpl;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.fail;

public class MetadataInvocationParameterTest {
    // ここに、テストを書く!

    protected <K, V> void withCache(String cacheName, Consumer<Cache<K, V>> consumer) {
        withCache(cacheName, 1, consumer);
    }

    protected <K, V> void withCache(String cacheName, int numInstances, Consumer<Cache<K, V>> consumer) {
        List<EmbeddedCacheManager> managers =
                IntStream
                        .rangeClosed(1, numInstances)
                        .mapToObj(i -> {
                            try {
                                return new DefaultCacheManager("infinispan.xml");
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        })
                        .collect(Collectors.toList());

        managers.forEach(manager -> manager.getCache(cacheName));

        try {
            consumer.accept(managers.get(0).getCache(cacheName));
        } finally {
            managers.forEach(EmbeddedCacheManager::stop);
        }
    }
}

このクラスにテストを足して、Metadata ParameterおよびInvocation Parameterを使って挙動を確認していきます。

Metadata Parameter

まずは、Metadata Parameterから。

Metadataとして扱われるクラスは、全部で以下の5つあります。

  • MetaParam.MetaCreated
  • MetaParam.MetaEntryVersion
  • MetaParam.MetaLastUsed
  • MetaParam.MetaLifespan
  • MetaParam.MetaMaxIdle

名前で、なんとなく意味はわかりますね。

では、使ってみましょう。ここでは、MetaLifespanで試してみます。

    @Test
    public void testSetMetadata() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);
            FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap);

            CompletableFuture<Void> writeFuture =
                    writeOnlyMap
                            .eval("key20",
                                    100,
                                    (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable)
                                            (v, view) -> view.set(v, new MetaParam.MetaLifespan(Duration.ofSeconds(3).toMillis())));
            CompletableFuture<Optional<Integer>> readFuture =
                    writeFuture
                            .thenCompose(v -> readOnlyMap.eval("key20", EntryView.ReadEntryView::find));

            assertThat(readFuture.join())
                    .hasValue(100);

            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(readOnlyMap.eval("key20", EntryView.ReadEntryView::find).join())
                    .isEmpty();
        });
    }

エントリの保存時に、MetaLifespanで3秒になるように設定。

            CompletableFuture<Void> writeFuture =
                    writeOnlyMap
                            .eval("key20",
                                    100,
                                    (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable)
                                            (v, view) -> view.set(v, new MetaParam.MetaLifespan(Duration.ofSeconds(3).toMillis())));

EntryView.WriteEntryView#setに可変長引数として、Metadataを保存できます。

この場合、3秒後にTTL切れとなるため、エントリがなくなります。

            assertThat(readFuture.join())
                    .hasValue(100);

            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(readOnlyMap.eval("key20", EntryView.ReadEntryView::find).join())
                    .isEmpty();

ただ、新規エントリを保存する時はMetadataを保存できるのですが、既存のエントリに設定した場合は、効果がありませんでした…。そのうち、挙動が変わらないようでしたら調べてみようかな…。

MetaMaxIdleの場合。コード自体は、MetaLifespanとクラスが変わっただけで同等ですね。

    @Test
    public void testMetadataMaxIdle() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);
            FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap);

            CompletableFuture<Void> writeFuture =
                    writeOnlyMap
                            .eval("key20",
                                    100,
                                    (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable)
                                            (v, view) -> view.set(v, new MetaParam.MetaMaxIdle(Duration.ofSeconds(3).toMillis())));
            CompletableFuture<Optional<Integer>> readFuture =
                    writeFuture
                            .thenCompose(v -> readOnlyMap.eval("key20", EntryView.ReadEntryView::find));

            assertThat(readFuture.join())
                    .hasValue(100);

            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                // ignore
            }

            assertThat(readOnlyMap.eval("key20", EntryView.ReadEntryView::find).join())
                    .isEmpty();
        });
    }

今度は、Metadataを取得する場合。

    @Test
    public void testFindMetadata() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);
            FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap);

            writeOnlyMap.eval("key20",
                    20,
                    (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable)
                            (v, view) -> view.set(v,
                                    new MetaParam.MetaMaxIdle(Duration.ofSeconds(3).toMillis()),
                                    new MetaParam.MetaLifespan(Duration.ofSeconds(5).toMillis()))).join();

            CompletableFuture<Optional<MetaParam.MetaEntryVersion>> readFuture1 =
                    readOnlyMap.eval("key20",
                            view -> view.findMetaParam(MetaParam.MetaEntryVersion.class));

            assertThat(readFuture1.join())
                    .isEmpty();

            CompletableFuture<Optional<MetaParam.MetaEntryVersion<Long>>> readFuture2 =
                    readOnlyMap.eval("key20", view -> view.findMetaParam(MetaParam.MetaEntryVersion.type()));

            assertThat(readFuture2.join())
                    .isEmpty();

            CompletableFuture<Optional<MetaParam.MetaMaxIdle>> readFuture3 =
                    readOnlyMap.eval("key20", view -> view.findMetaParam(MetaParam.MetaMaxIdle.class));

            assertThat(readFuture3.join())
                    .hasValue(new MetaParam.MetaMaxIdle(Duration.ofSeconds(3).toMillis()));

            CompletableFuture<Optional<MetaParam.MetaLifespan>> readFuture4 =
                    readOnlyMap.eval("key20", view -> view.findMetaParam(MetaParam.MetaLifespan.class));

            assertThat(readFuture4.join())
                    .hasValue(new MetaParam.MetaLifespan(Duration.ofSeconds(5).toMillis()));
        });
    }

ReadEntryView#findMetaParamにClassクラスを渡すことで、指定のMetadataについての情報を取得することができます。

ドキュメントには、MetaEntryVersionの例が書いていたのですが、どうやったらVersionが付与されるのか、ちょっとわかりませんでした…。

MetaMaxIdleとMetaLifespanについては、write時に保存したものを

            writeOnlyMap.eval("key20",
                    20,
                    (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable)
                            (v, view) -> view.set(v,
                                    new MetaParam.MetaMaxIdle(Duration.ofSeconds(3).toMillis()),
                                    new MetaParam.MetaLifespan(Duration.ofSeconds(5).toMillis()))).join();

無事、取得できましたと。

            CompletableFuture<Optional<MetaParam.MetaMaxIdle>> readFuture3 =
                    readOnlyMap.eval("key20", view -> view.findMetaParam(MetaParam.MetaMaxIdle.class));

            assertThat(readFuture3.join())
                    .hasValue(new MetaParam.MetaMaxIdle(Duration.ofSeconds(3).toMillis()));

            CompletableFuture<Optional<MetaParam.MetaLifespan>> readFuture4 =
                    readOnlyMap.eval("key20", view -> view.findMetaParam(MetaParam.MetaLifespan.class));

            assertThat(readFuture4.join())
                    .hasValue(new MetaParam.MetaLifespan(Duration.ofSeconds(5).toMillis()));

Invocation Parameter

Invocation Parameterの場合、以下の2つの挙動を変更できます。

  • Param.FutureMode
  • Param.PersistenceMode

Param.FutureModeの場合、ReadOnlyMapの挙動を非同期にするのか(ASYNC)、同期にするのか(COMPLETED)設定することができます。基本的には非同期を使うのでしょうが、同期にした場合はCompletableFutureを返すものの、ブロックするようになります。

Param.PersistenceModeの場合は、CacheのバックエンドにあるPersistenceStore(ファイルやJDBCStoreなど)への書き込みオペレーションを反映するか(PERSIST)、PersistenceStoreへの保存をスキップする(SKIP)か設定できます。スキップした場合は、PersistenceStoreをバイパスすることにより性能は上がるかもですが、メモリとPersistenceStoreの状態がずれてしまうのでご注意を、って感じでしょうか。

まずはParam.FutureModeから使ってみます。

    @Test
    public void testInvocationParameterFuture() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);

            try {
                assertThat(readOnlyMap
                        .withParams(Param.FutureMode.COMPLETED)
                        .eval("key10", view -> {
                            try {
                                TimeUnit.SECONDS.sleep(5);
                            } catch (InterruptedException e) {
                                // ignore
                            }
                            return view.find();
                        })
                        .get(1, TimeUnit.SECONDS))
                        .hasValue(10);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                fail();
            }

            assertThatThrownBy(() ->
                    readOnlyMap
                            .withParams(Param.FutureMode.ASYNC)
                            .eval("key10", view -> {
                                try {
                                    TimeUnit.SECONDS.sleep(5);
                                    System.out.println("sleeped!");
                                } catch (InterruptedException e) {
                                    // ignore
                                }
                                return view.find();
                            })
                            .get(1, TimeUnit.SECONDS))
                    .isInstanceOf(TimeoutException.class);

            assertThatThrownBy(() ->
                    readOnlyMap
                            .eval("key10", view -> {
                                try {
                                    TimeUnit.SECONDS.sleep(5);
                                    System.out.println("sleeped!");
                                } catch (InterruptedException e) {
                                    // ignore
                                }
                                return view.find();
                            })
                            .get(1, TimeUnit.SECONDS))
                    .isInstanceOf(TimeoutException.class);
        });
    }

ReadOnlyMap#withParamsで指定すれば、OKです。

readOnlyMap.withParams(Param.FutureMode.COMPLETED)

FutureModeをCOMPLETEDにした場合は、ブロックする挙動になるので、このように途中でスリープを入れつつ、CompletableFuture#getで短い時間に絞ってもタイムアウトしません。

            try {
                assertThat(readOnlyMap
                        .withParams(Param.FutureMode.COMPLETED)
                        .eval("key10", view -> {
                            try {
                                TimeUnit.SECONDS.sleep(5);
                            } catch (InterruptedException e) {
                                // ignore
                            }
                            return view.find();
                        })
                        .get(1, TimeUnit.SECONDS))
                        .hasValue(10);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                fail();
            }

ASYNCの場合は、タイムアウトしてしまいます。

            assertThatThrownBy(() ->
                    readOnlyMap
                            .withParams(Param.FutureMode.ASYNC)
                            .eval("key10", view -> {
                                try {
                                    TimeUnit.SECONDS.sleep(5);
                                    System.out.println("sleeped!");
                                } catch (InterruptedException e) {
                                    // ignore
                                }
                                return view.find();
                            })
                            .get(1, TimeUnit.SECONDS))
                    .isInstanceOf(TimeoutException.class);

デフォルトは、ASYNCみたいですね。

            assertThatThrownBy(() ->
                    readOnlyMap
                            .eval("key10", view -> {
                                try {
                                    TimeUnit.SECONDS.sleep(5);
                                    System.out.println("sleeped!");
                                } catch (InterruptedException e) {
                                    // ignore
                                }
                                return view.find();
                            })
                            .get(1, TimeUnit.SECONDS))
                    .isInstanceOf(TimeoutException.class);

PersistenceModeを使用する場合。

    @Test
    public void testInvocationParameterPersistence() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap);

            writeOnlyMap
                    .withParams(Param.PersistenceMode.SKIP)
                    .eval("key20",
                            20,
                            (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable)
                                    (v, view) -> view.set(v)).join();

            writeOnlyMap
                    .withParams(Param.PersistenceMode.PERSIST)
                    .eval("key30",
                            30,
                            (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable)
                                    (v, view) -> view.set(v)).join();

            assertThat(cache.get("key20"))
                    .isEqualTo(20);

            assertThat(cache.get("key30"))
                    .isEqualTo(30);

        });
    }

この場合、WriteOnlyMap#withParamsで指定します。

            writeOnlyMap
                    .withParams(Param.PersistenceMode.SKIP)

PersistenceStoreまでの構成は、今回はやりませんでした…。

また、今回はReadOnlyMapおよびWriteOnlyMapで試しましたが、ReadWriteMap#withParamsでも両方(FutureMode/PersistenceMode)使えそうな感じです。

このテーマは、ここまでで。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-functional-map-metadata-parameter