CLOVER🍀

That was when it all began.

InfinispanのFunctional Map APIを試す - ReadWriteMap編 -

こちらのエントリの続きです。

InfinispanのFunctional Map APIを試す - CLOVER

前回はReadOnlyMapおよびWriteOnlyMapを扱いましたが、今回はReadWriteMapを扱います。

参考にするのは、以下のドキュメントやマニュアルなどです。

35.6. Read-Write Map API

Functional API Tutorial - Infinispan

Infinispan: New Functional Map API in Infinispan 8 - Introduction

Infinispan: Functional Map API: Working with single entries

Infinispan: Functional Map API: Working with multiple entries

それでは、進めていってみます。

追記
Functional Mapについて、ドキュメントの範囲で簡単に試してみたので、まとめておきます。

InfinispanのFunctional Map APIを試す - CLOVER

InfinispanのFunctional Map APIを試す - ReadWriteMap編 - - CLOVER

InfinispanのFunctional Map APIを試す - Metadata & Invocation Parameter編 - - CLOVER

InfinispanのFunctional Map APIを試す - Functional Listeners編 - - CLOVER

準備

ビルドや設定など。

Maven依存関係は、こちら。

        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>8.0.1.Final</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.2.0</version>
            <scope>test</scope>
        </dependency>

JUnitとAssertJは、テストコード用です。

Infinispanの設定は、このようにしています。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:8.0 http://www.infinispan.org/schemas/infinispan-config-8.0.xsd"
        xmlns="urn:infinispan:config:8.0">
    <jgroups>
        <stack-file name="udp" path="jgroups.xml"/>
    </jgroups>

    <cache-container name="cacheManager" shutdown-hook="REGISTER">
        <transport cluster="cluster" stack="udp"/>
        <jmx duplicate-domains="true"/>

        <!--
        <local-cache name="distCache"/>
        -->
        <distributed-cache name="distCache"/>
    </cache-container>
</infinispan>

JGroupsの設定は、端折ります。

テストコードの雛形

それでは、まずテストコードの雛形を用意します。

src/test/java/org/littlewings/infinispan/functionalmap/FunctionalMapTest.java

package org.littlewings.infinispan.functionalmap;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.infinispan.Cache;
import org.infinispan.commons.api.functional.EntryView;
import org.infinispan.commons.api.functional.FunctionalMap;
import org.infinispan.commons.api.functional.Traversable;
import org.infinispan.functional.impl.FunctionalMapImpl;
import org.infinispan.functional.impl.ReadWriteMapImpl;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class FunctionalMapTest {
    // ここに、テストを書く!

    protected <K, V> void withCache(String cacheName, Consumer<Cache<K, V>> consumer) {
        withCache(cacheName, 1, consumer);
    }

    protected <K, V> void withCache(String cacheName, int numInstances, Consumer<Cache<K, V>> consumer) {
        List<EmbeddedCacheManager> managers =
                IntStream
                        .rangeClosed(1, numInstances)
                        .mapToObj(i -> {
                            try {
                                return new DefaultCacheManager("infinispan.xml");
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        })
                        .collect(Collectors.toList());
        List<Cache<K, V>> caches =
                managers
                        .stream()
                        .map(m -> m.<K, V>getCache(cacheName))
                        .collect(Collectors.toList());

        try {
            consumer.accept(caches.get(0));
        } finally {
            caches.forEach(Cache::stop);
            managers.forEach(EmbeddedCacheManager::stop);
        }
    }
}

クラスタを簡易的に構成できる、ヘルパーメソッド付きです。こちらを使用して、ReadWriteMapを使うコードを書いていきます。

ReadWriteMapを使う

ReadWriteMapは、前回出てきたReadOnlyMapとWriteOnlyMapの性質を合わせたようなMapです。

ただし、同じメソッドを持っているわけではありません。
※ReadWriteMapは、ReadOnlyMapとWriteOnlyMapのインターフェースを実装している、というわけではないので

ReadWriteMapがあれば、ReadOnlyMapやWriteOnlyMapの両方の用途がすべて満たせるかというとそうではないので、定義されているメソッドなどをAPIリファレンスを見て確認するようにしましょう。

ReadWriteMapは、このようにReadOnlyMapおよびWriteOnlyMapと差がありますが、EntryView.ReadWriteEntryViewについてはEntryView.ReadEntryViewとEntryView.WriteEntryViewを実装したインターフェースとして定義されているため、こちらについては各EntryViewと同じ操作が可能です。

このあたりを踏まえて、使っていきましょう。

まずは、単純なRead/Writeの例を。

    @Test
    public void testReadWriteMap1() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadWriteMap<String, Integer> readWriteMap = ReadWriteMapImpl.create(functionalMap);

            CompletableFuture<Optional<Integer>> previousValueFuture =
                    readWriteMap
                            .eval("key1",
                                    10,
                                    (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Optional<Integer>> & Serializable)
                                            (v, view) -> {
                                                Optional<Integer> previous = view.find();
                                                view.set(v);
                                                return previous;
                                            });

            assertThat(previousValueFuture.join())
                    .hasValue(1);
        });
    }

ReadWriteMapも、ReadOnlyMapやWriteOnlyMapと同様にFunctionalMapImplから作成します。

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadWriteMap<String, Integer> readWriteMap = ReadWriteMapImpl.create(functionalMap);

CompletableFutureを返すevalを持ち、EntryView.ReadWriteEntryViewではfindやsetができるので、こちらを使用してエントリの取得と更新をしてみます。

            CompletableFuture<Optional<Integer>> previousValueFuture =
                    readWriteMap
                            .eval("key1",
                                    10,
                                    (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Optional<Integer>> & Serializable)
                                            (v, view) -> {
                                                Optional<Integer> previous = view.find();
                                                view.set(v);
                                                return previous;
                                            });

各所で求められるLambda式(というか関数型インターフェース)は、シリアライズ可能であることが求められるようです。Local Cacheだとシリアライズできなくてもいいみたいですが、Distributed Cacheだと軒並みシリアライズ可能である必要があったので、Serializableにしておくとよいでしょう。

ReadWriteMap#evalManyを使って、複数のエントリを取得する例。

    @Test
    public void testReadWriteMap2() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadWriteMap<String, Integer> readWriteMap = ReadWriteMapImpl.create(functionalMap);

            Set<String> keys = new HashSet<>(Arrays.asList("key1", "key5", "key10"));

            Traversable<Optional<Integer>> entries =
                    readWriteMap
                            .evalMany(keys,
                                    (Function<EntryView.ReadWriteEntryView<String, Integer>, Optional<Integer>> & Serializable)
                                            EntryView.ReadWriteEntryView::find);

            List<Integer> results =
                    entries
                            .filter(Optional::isPresent)
                            .map(Optional::get)
                            .collect(Collectors.toList());

            assertThat(results)
                    .containsOnly(1, 5, 10);
        });

WriteOnlyMap#evalManyではCompletableFutureを返していましたが、ReadWriteMapではReadOnlyMapと同様にTraversableを返します。

ReadWriteMap#evalAllを使った、エントリの全削除の例。こちらも、CompletableFutureではなくTraversableが返ります。

    @Test
    public void testReadWriteMap3() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadWriteMap<String, Integer> readWriteMap = ReadWriteMapImpl.create(functionalMap);

            Traversable<Void> results =
                    readWriteMap
                            .evalAll((Function<EntryView.ReadWriteEntryView<String, Integer>, Void> & Serializable)
                                    EntryView.ReadWriteEntryView::remove);
            results.forEach(r -> {
            });

            assertThat(cache.isEmpty())
                    .isTrue();
        });
    }

WriteOnlyMapには、これと同様の操作を簡易的にできるtruncateメソッドがありましたが、ReadWriteMapには存在しません。

ReadWriteMapは、ReadOnlyMapとWriteOnlyMapと同じことができるわけではないというのを、このあたりで確認した感じですね。

あとは、ドキュメントのサンプルに習って、Map#replaceライクな例を書いてみます。

    @Test
    public void testReadWriteMapLikeReplace1() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadWriteMap<String, Integer> readWriteMap = ReadWriteMapImpl.create(functionalMap);

            int oldValue = 10;

            CompletableFuture<Optional<Integer>> replaceFuture =
                    readWriteMap
                            .eval(
                                    "key10",
                                    100,
                                    (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Optional<Integer>> & Serializable)
                                            (v, view) ->
                                                    view.find().map(
                                                            (Function<Integer, Integer> & Serializable)
                                                                    previous -> {
                                                                        if (previous == oldValue) {
                                                                            view.set(v);
                                                                            return previous;
                                                                        } else {
                                                                            return previous;
                                                                        }
                                                                    }));

            CompletableFuture<Optional<Integer>> result =
                    replaceFuture.thenCompose((Function<Optional<Integer>, CompletableFuture<Optional<Integer>>> & Serializable)
                            p -> readWriteMap
                                    .eval("key10",
                                            (Function<EntryView.ReadWriteEntryView<String, Integer>, Optional<Integer>> & Serializable)
                                                    EntryView.ReadWriteEntryView::find));

            assertThat(result.join())
                    .hasValue(100);
        });
    }

Serializableを使った交差型キャストが連発されるのが、ちょっと戸惑うところですね…。

こちらは、値が異なるので書き換え不可な例。ちょっとムリヤリ感がありますが。

    @Test
    public void testReadWriteMapLikeReplace2() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadWriteMap<String, Integer> readWriteMap = ReadWriteMapImpl.create(functionalMap);

            int oldValue = 50;

            CompletableFuture<Optional<Integer>> replaceFuture =
                    readWriteMap.eval(
                            "key10",
                            100,
                            (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Optional<Integer>> & Serializable)
                                    (v, view) ->
                                            view.find().map((Function<Integer, Integer> & Serializable)
                                                    previous -> {
                                                        if (previous == oldValue) {
                                                            view.set(v);
                                                            return previous;
                                                        } else {
                                                            return previous;
                                                        }
                                                    }));

            CompletableFuture<Optional<Integer>> result =
                    replaceFuture.thenCompose((Function<Optional<Integer>, CompletableFuture<Optional<Integer>>> & Serializable)
                            p -> readWriteMap
                                    .eval("key10",
                                            (Function<EntryView.ReadWriteEntryView<String, Integer>, Optional<Integer>> & Serializable)
                                                    EntryView.ReadWriteEntryView::find));

            assertThat(result.join())
                    .hasValue(10);
        });
    }

まとめ

前回のReadOnlyMapとWriteOnlyMapから、その両方の性格を併せ持ったようなインターフェース、ReadWriteMapを使ってみました。

使う前までは、ReadOnlyMapとWriteOnlyMapのインターフェースを両方とも実装しているのかな?と思ったのですが、そんなことはなくAPIドキュメントをよくよく読むことになりました。

また、CompletableFutureや交差型キャストを覚えることにもなるので、ホントに勉強になります…。

そのうち、また次のテーマも書くとしましょう。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-functional-readwritemap