CLOVER🍀

That was when it all began.

InfinispanのFunctional Map APIを試す

Infinispan 8.0.0.Finalから、Functional Map APIというものが実験的に追加されました。

35. Functional Map API

Functional API Tutorial - Infinispan

Infinispan: New Functional Map API in Infinispan 8 - Introduction

Infinispan: Functional Map API: Working with single entries

Infinispan: Functional Map API: Working with multiple entries

ドキュメントによると、こんな特徴らしいです。

  • 非同期。すべてのメソッドの結果が単一の結果、またはCompletableFutureを返す。返却されたCompletableFutureを合成することもできる
  • 複数の結果を返却する際には、Traversableを使用する。Traversableは、複数の結果を遅延して取得する(pull-style API
  • 3つのインタフェースが提供され、それぞれ読み取り専用(ReadOnlyMap)、書き込み専用(WriteOnlyMap)、読み書き可能(ReadWriteMap)がある

ドキュメントやサンプルを見ると、CompletableFutureやStream(に似た)API、そしてLambda式がバシバシ出てきていて、Java 8の能力をふんだんに使った機能という感じがしますね。

CompletableFutureは実は初めて使うのですが、チャレンジしてみましょう。

追記
Functional Mapについて、ドキュメントの範囲で簡単に試してみたので、まとめておきます。

InfinispanのFunctional Map APIを試す - CLOVER

InfinispanのFunctional Map APIを試す - ReadWriteMap編 - - CLOVER

InfinispanのFunctional Map APIを試す - Metadata & Invocation Parameter編 - - CLOVER

InfinispanのFunctional Map APIを試す - Functional Listeners編 - - CLOVER

準備

Maven依存関係は、以下のように設定。

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>8.0.1.Final</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.2.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

JUnitとAssertJは、テストコード用です。

Infinispanの設定は、以下のようにしています。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:8.0 http://www.infinispan.org/schemas/infinispan-config-8.0.xsd"
        xmlns="urn:infinispan:config:8.0">
    <jgroups>
        <stack-file name="udp" path="jgroups.xml"/>
    </jgroups>

    <cache-container name="cacheManager" shutdown-hook="REGISTER">
        <transport cluster="cluster" stack="udp"/>
        <jmx duplicate-domains="true"/>

        <!--
        <local-cache name="distCache"/>
        -->
        <distributed-cache name="distCache"/>
    </cache-container>
</infinispan>

JGroupsの設定は、端折ります。

Cacheは、Distributed Cacheとします。最初はLocal Cacheでやっていたのですが、あとでDistributed Cacheにしてみたらシリアライズでエラーになったので全部その対応を入れました…。

雛形コードの用意。

では、Functional Map APIを使ってみます。けっこうボリュームが大きくなりそうなので、エントリ自体を分けますが、続きはいつ書けることやら…。

今回は、ReadOnlyMapとWriteOnlyMapを対象に進めたいと思います。

動かすための雛形コードとして、以下のようなクラスを用意。
src/test/java/org/littlewings/infinispan/functionalmap/FunctionalMapTest.java

package org.littlewings.infinispan.functionalmap;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.assertj.core.data.MapEntry;
import org.infinispan.Cache;
import org.infinispan.commons.api.functional.EntryView;
import org.infinispan.commons.api.functional.FunctionalMap;
import org.infinispan.commons.api.functional.Traversable;
import org.infinispan.functional.impl.FunctionalMapImpl;
import org.infinispan.functional.impl.ReadOnlyMapImpl;
import org.infinispan.functional.impl.WriteOnlyMapImpl;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.stream.CacheCollectors;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.infinispan.factories.KnownComponentNames.ASYNC_OPERATIONS_EXECUTOR;

public class FunctionalMapTest {
    // ここに、テストを書く!!

    protected <K, V> void withCache(String cacheName, Consumer<Cache<K, V>> consumer) {
        withCache(cacheName, 1, consumer);
    }

    protected <K, V> void withCache(String cacheName, int numInstances, Consumer<Cache<K, V>> consumer) {
        List<EmbeddedCacheManager> managers =
                IntStream
                        .rangeClosed(1, numInstances)
                        .mapToObj(i -> {
                            try {
                                return new DefaultCacheManager("infinispan.xml");
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        })
                        .collect(Collectors.toList());
        List<Cache<K, V>> caches =
                managers
                        .stream()
                        .map(m -> m.<K, V>getCache(cacheName))
                        .collect(Collectors.toList());

        try {
            consumer.accept(caches.get(0));
        } finally {
            caches.forEach(Cache::stop);
            managers.forEach(EmbeddedCacheManager::stop);
        }
    }
}

Infinispanの起動/停止を行うための、簡易メソッド付き。クラスタも組みます。

Functional Mapを取得する

まずFunctional Mapを使うためには、以下のようなコードを書きます。

ReadOnlyMapの場合。

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);

WriteOnlyMapの場合。

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap);

〜Implが連発される、確かに実験的なAPI感が満載な気がしないでもないですが、今時点ではこちらなので、このまま進めます。

ReadOnlyMapを使う

最初は、ReadOnlyMapからいきます。

    @Test
    public void testSimpleReadOnlyMap1() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);

            CompletableFuture<Optional<Integer>> readFuture1 =
                    readOnlyMap.eval("key10", EntryView.ReadEntryView::find);

            assertThat(readFuture1.join())
                    .hasValue(10);

            //////////////////////////////////////////////////////////////////////

            CompletableFuture<Optional<Integer>> readFuture2 =
                    readOnlyMap.eval("key20", EntryView.ReadEntryView::find);

            assertThat(readFuture2.join())
                    .isEmpty();
        });
    }

ReadOnlyMap#evalで、キーとEntryView.ReadEntryViewを引数に取るConsumerを指定します。この中で、EntryView.ReadEntryView#findしていますが、こちらで指定のキーに対するエントリを取得します。

            CompletableFuture<Optional<Integer>> readFuture1 =
                    readOnlyMap.eval("key10", EntryView.ReadEntryView::find);

ReadOnlyMapでは、このEntryView.ReadEntryViewを使ってコードを書いていきます。

結果はCompletableFutureとなります。

            assertThat(readFuture1.join())
                    .hasValue(10);

なお、EntryView.ReadEntryView#findの結果はOptionalで、キーに対応するエントリがなかった場合にはEmptyとなります。

            CompletableFuture<Optional<Integer>> readFuture2 =
                    readOnlyMap.eval("key20", EntryView.ReadEntryView::find);

            assertThat(readFuture2.join())
                    .isEmpty();

EntryView.ReadEntryViewには、getというメソッドもあります。

    @Test
    public void testSimpleReadOnlyMap2() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);

            CompletableFuture<Integer> readFuture1 =
                    readOnlyMap.eval("key10", EntryView.ReadEntryView::get);

            assertThat(readFuture1.join())
                    .isEqualTo(10);

            //////////////////////////////////////////////////////////////////////

            CompletableFuture<Integer> readFuture2 =
                    readOnlyMap.eval("key20", EntryView.ReadEntryView::get);

            assertThatThrownBy(() -> readFuture2.join())
                    .isInstanceOf(CompletionException.class)
                    .hasCauseInstanceOf(NoSuchElementException.class)
                    .hasMessage("java.util.NoSuchElementException: No value");
        });
    }

EntryView.ReadEntryView#getを使った場合、結果の方はOptionalではなくエントリの型そのものになりますが、

            CompletableFuture<Integer> readFuture1 =
                    readOnlyMap.eval("key10", EntryView.ReadEntryView::get);

キーに対応するエントリが存在しない場合にはNoSuchElementExceptionがスローされます。

            assertThatThrownBy(() -> readFuture2.join())
                    .isInstanceOf(CompletionException.class)
                    .hasCauseInstanceOf(NoSuchElementException.class)
                    .hasMessage("java.util.NoSuchElementException: No value");

EntryView.ReadEntryView#keyを使用した場合には、呼び出しに使用したキーそのものが返却されます。

    @Test
    public void testSimpleReadOnlyMap3() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);

            CompletableFuture<String> readFuture1 =
                    readOnlyMap.eval("key10", EntryView.ReadEntryView::key);

            assertThat(readFuture1.join())
                    .isEqualTo("key10");

            //////////////////////////////////////////////////////////////////////

            CompletableFuture<String> readFuture2 =
                    readOnlyMap.eval("key20", EntryView.ReadEntryView::key);

            assertThat(readFuture2.join())
                    .isEqualTo("key20");
        });
    }

キーに対応するエントリが存在するかどうかは、あまり関係がないみたいです…。

複数のエントリを一気に取得する場合は、ReadOnlyMap#evalManyを使います。

    @Test
    public void testSimpleReadOnlyMap4() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);

            Set<String> keys1 = new HashSet<>(Arrays.asList("key1", "key2", "key10"));

            Traversable<Integer> entries1 =
                    readOnlyMap.evalMany(keys1, EntryView.ReadEntryView::get);

            List<Integer> values1 = entries1.collect(Collectors.toList());

            assertThat(values1)
                    .containsOnly(1, 2, 10);

            //////////////////////////////////////////////////////////////////////

            Set<String> keys2 = new HashSet<>(Arrays.asList("key1", "key2", "key20"));

            Traversable<Optional<Integer>> entries2 =
                    readOnlyMap.evalMany(keys2, EntryView.ReadEntryView::find);

            List<Integer> values2 =
                    entries2
                            .filter(Optional::isPresent)
                            .map(Optional::get)
                            .collect(Collectors.toList());

            assertThat(values2)
                    .containsOnly(1, 2);

            //////////////////////////////////////////////////////////////////////

            CompletableFuture<Optional<Integer>> readFuture1 =
                    readOnlyMap.eval("key1", EntryView.ReadEntryView::find);
            CompletableFuture<Optional<Integer>> readFuture2 =
                    readOnlyMap.eval("key2", EntryView.ReadEntryView::find);

            CompletableFuture<Integer> resultFuture =
                    readFuture1.thenCombineAsync(readFuture2, (optionV1, optionV2) -> optionV1.orElse(0) + optionV2.orElse(0));

            assertThat(resultFuture.join())
                    .isEqualTo(3);
        });
    }

TraversableはCollectionやStreamではありませんが、mapやfilterなどを似たように使うことができます。

指定のキーに対するエントリを一気に取ってくるばあいは、こちら。

            Set<String> keys1 = new HashSet<>(Arrays.asList("key1", "key2", "key10"));

            Traversable<Integer> entries1 =
                    readOnlyMap.evalMany(keys1, EntryView.ReadEntryView::get);

            List<Integer> values1 = entries1.collect(Collectors.toList());

            assertThat(values1)
                    .containsOnly(1, 2, 10);

ところが、EntryView.ReadEntryView#getだとキーに対応するエントリがなかった場合は例外になるので、そういう場合はfindを使います。

            Set<String> keys2 = new HashSet<>(Arrays.asList("key1", "key2", "key20"));

            Traversable<Optional<Integer>> entries2 =
                    readOnlyMap.evalMany(keys2, EntryView.ReadEntryView::find);

            List<Integer> values2 =
                    entries2
                            .filter(Optional::isPresent)
                            .map(Optional::get)
                            .collect(Collectors.toList());

            assertThat(values2)
                    .containsOnly(1, 2);

evalManyを使用せずとも、複数回evalを呼んで、その結果を合成するという考え方もあるようですが。

            CompletableFuture<Optional<Integer>> readFuture1 =
                    readOnlyMap.eval("key1", EntryView.ReadEntryView::find);
            CompletableFuture<Optional<Integer>> readFuture2 =
                    readOnlyMap.eval("key2", EntryView.ReadEntryView::find);

            CompletableFuture<Integer> resultFuture =
                    readFuture1.thenCombineAsync(readFuture2, (optionV1, optionV2) -> optionV1.orElse(0) + optionV2.orElse(0));

            assertThat(resultFuture.join())
                    .isEqualTo(3);

その他、ReadOnlyMapにはkeysとentriesがあり、キーおよびエントリの全体を一括で取得することができます。

    @Test
    public void testSimpleReadOnlyMap5() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);

            Traversable<String> keys = readOnlyMap.keys();
            Traversable<EntryView.ReadEntryView<String, Integer>> entries = readOnlyMap.entries();

            List<String> keysAsList =
                    keys
                            .collect(CacheCollectors.serializableCollector(() -> Collectors.toList()));

            Map<String, Integer> entriesAsMap =
                    entries
                            .collect(Collectors.toMap(EntryView.ReadEntryView::key, EntryView.ReadEntryView::get));

            assertThat(keysAsList)
                    .containsOnly("key1", "key2", "key3", "key4", "key5",
                            "key6", "key7", "key8", "key9", "key10");

            assertThat(entriesAsMap)
                    .containsOnly(MapEntry.entry("key1", 1),
                            MapEntry.entry("key2", 2),
                            MapEntry.entry("key3", 3),
                            MapEntry.entry("key4", 4),
                            MapEntry.entry("key5", 5),
                            MapEntry.entry("key6", 6),
                            MapEntry.entry("key7", 7),
                            MapEntry.entry("key8", 8),
                            MapEntry.entry("key9", 9),
                            MapEntry.entry("key10", 10));
        });
    }

結果は、Traversableです。

なお、keysメソッドの結果で使うCollectorは、なぜかSerializableである必要があったり…。

            List<String> keysAsList =
                    keys
                            .collect(CacheCollectors.serializableCollector(() -> Collectors.toList()));

Distributed Cacheでは、分散実行されるっぽい?

entriesでは、そんなことないのに?

とりあえず、ReadOnlyMapはこんなところです。

WriteOnlyMap

続いて、WriteOnlyMapです。

    @Test
    public void testSimpleWriteOnlyMap1() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap);
            FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);

            CompletableFuture<Void> writeFuture1 =
                    writeOnlyMap.eval("key1",
                            (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) view -> view.set(10));

            CompletableFuture<Optional<Integer>> readFuture1 =
                    writeFuture1.thenCompose(r -> readOnlyMap.eval("key1", EntryView.ReadEntryView::find));

            assertThat(readFuture1.join())
                    .hasValue(10);

            //////////////////////////////////////////////////////////////////////

            CompletableFuture<Void> writeFuture2 =
                    writeOnlyMap.eval("key1", 20,
                            (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable) (v, view) -> view.set(v));

            CompletableFuture<Optional<Integer>> readFuture2 =
                    writeFuture2.thenCompose(r -> readOnlyMap.eval("key1", EntryView.ReadEntryView::find));

            assertThat(readFuture2.join())
                    .hasValue(20);

            //////////////////////////////////////////////////////////////////////

            CompletableFuture<Void> writeFuture3 =
                    writeOnlyMap.eval("key20",
                            (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) view -> view.set(20));

            CompletableFuture<Optional<Integer>> readFuture3 =
                    writeFuture3.thenCompose(r -> readOnlyMap.eval("key20", EntryView.ReadEntryView::find));

            assertThat(readFuture3.join())
                    .hasValue(20);
        });
    }

どうもこちらは、WriteOnlyMapに渡すLambda式はSerializableである必要があるようです。
※少なくとも、Distributed Cacheの場合は。Local CacheではSerializableでなくてもOKでした

            CompletableFuture<Void> writeFuture1 =
                    writeOnlyMap.eval("key1",
                            (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) view -> view.set(10));

            CompletableFuture<Optional<Integer>> readFuture1 =
                    writeFuture1.thenCompose(r -> readOnlyMap.eval("key1", EntryView.ReadEntryView::find));

            assertThat(readFuture1.join())
                    .hasValue(10);

EntryView.WriteEntryView#setで、エントリの値を設定することができます。

            CompletableFuture<Void> writeFuture1 =
                    writeOnlyMap.eval("key1",
                            (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) view -> view.set(10));

また、evalにキーと値を指定することで、変更時の値も指定することができます。

            CompletableFuture<Void> writeFuture2 =
                    writeOnlyMap.eval("key1", 20,
                            (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable) (v, view) -> view.set(v));

存在しないキーを指定した場合は、単純に追加ですね。

            CompletableFuture<Void> writeFuture3 =
                    writeOnlyMap.eval("key20",
                            (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) view -> view.set(20));

複数のエントリを一気に指定する場合は、WriteOnlyMap#evalManyを使用します。

    @Test
    public void testSimpleWriteOnlyMap2() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap);
            FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);

            Set<String> keys = new HashSet<>(Arrays.asList("key1", "key2", "key10"));

            CompletableFuture<Void> writeFuture1 =
                    writeOnlyMap.evalMany(keys,
                            (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) view -> view.set(100));

            CompletableFuture<Traversable<Optional<Integer>>> readFuture1 =
                    writeFuture1.thenCompose(r ->
                            CompletableFuture.supplyAsync(() ->
                                    readOnlyMap.<Optional<Integer>>evalMany(keys, EntryView.ReadEntryView::find)));
            List<Integer> entries1 =
                    readFuture1
                            .join()
                            .filter(Optional::isPresent)
                            .map(Optional::get)
                            .collect(Collectors.toList());

            assertThat(entries1)
                    .containsOnly(100, 100, 100);

            //////////////////////////////////////////////////////////////////////

            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            ExecutorService executorService =
                    cache
                            .getAdvancedCache()
                            .getComponentRegistry()
                            .getComponent(ExecutorService.class, ASYNC_OPERATIONS_EXECUTOR);

            Map<String, Integer> inputEntries = new HashMap<>();
            inputEntries.put("key1", 500);
            inputEntries.put("key20", 1000);

            CompletableFuture<Void> writeFuture2 =
                    writeOnlyMap.evalMany(inputEntries,
                            (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable) (v, view) -> view.set(v));

            Set<String> targetKeys = new HashSet<>(Arrays.asList("key1", "key2", "key10", "key20"));

            CompletableFuture<Traversable<Optional<Integer>>> readFuture2 =
                    writeFuture2.thenCompose(r ->
                            CompletableFuture.supplyAsync(() ->
                                            readOnlyMap.<Optional<Integer>>evalMany(targetKeys, EntryView.ReadEntryView::find),
                                    executorService));

            List<Integer> entries2 =
                    readFuture2
                            .join()
                            .filter(Optional::isPresent)
                            .map(Optional::get)
                            .collect(Collectors.toList());

            assertThat(entries2)
                    .containsOnly(500, 2, 10, 1000);
        });
    }

WriteOnlyMap#evalManyには、キーのSet、もしくはキーと値を含んだMapを渡すことができ、Lambda式の部分はキー単位で呼び出されるので、あとはevalと同じように操作することができます。

削除を行う場合は、EntryView.WriteEntryView#removeを使用します。

    @Test
    public void testSimpleWriteOnlyMap3() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap);
            FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);

            CompletableFuture<Void> removeFuture =
                    writeOnlyMap.eval("key1",
                            (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) EntryView.WriteEntryView::remove);
            CompletableFuture<Optional<Integer>> readFuture1 =
                    removeFuture.thenCompose(r -> readOnlyMap.eval("key1", EntryView.ReadEntryView::find));

            assertThat(readFuture1.join())
                    .isEmpty();

            //////////////////////////////////////////////////////////////////////

            ExecutorService executorService =
                    cache
                            .getAdvancedCache()
                            .getComponentRegistry()
                            .getComponent(ExecutorService.class, ASYNC_OPERATIONS_EXECUTOR);

            Set<String> targetKeys = new HashSet<>(Arrays.asList("key2", "key10", "key20"));

            CompletableFuture<Void> removesFuture =
                    writeOnlyMap.evalMany(targetKeys,
                            (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) EntryView.WriteEntryView::remove);
            CompletableFuture<Traversable<Optional<Integer>>> readFuture2 =
                    removesFuture.thenCompose(r ->
                            CompletableFuture.supplyAsync(() -> readOnlyMap.evalMany(targetKeys, EntryView.ReadEntryView::find),
                                    executorService));

            boolean isEmpty =
                    readFuture2
                            .join()
                            .allMatch(optional -> !optional.isPresent());

            assertThat(isEmpty)
                    .isTrue();
        });
    }

evalでもevalManyでも、どちらでもOKです。

直接Functional Mapには関係ありませんが、ReadOnlyMap#evalManyの結果をCompletableFutureにしたくて

            CompletableFuture<Traversable<Optional<Integer>>> readFuture2 =
                    removesFuture.thenCompose(r ->
                            CompletableFuture.supplyAsync(() -> readOnlyMap.evalMany(targetKeys, EntryView.ReadEntryView::find),
                                    executorService));

この時のExecutorServiceにInfinispanのExecutorServiceを使用しました。

            ExecutorService executorService =
                    cache
                            .getAdvancedCache()
                            .getComponentRegistry()
                            .getComponent(ExecutorService.class, ASYNC_OPERATIONS_EXECUTOR);

Infinispan内のCompletableFutureで使用しているものと、合わせた方がいいかなぁと思いまして。

https://github.com/infinispan/infinispan/blob/8.0.1.Final/core/src/main/java/org/infinispan/functional/impl/FunctionalMapImpl.java#L54

エントリの全件削除をする場合は、WriteOnlyMap#evalAllからEntryView.WriteEntryView#remove、もしくはWriteOnlyMap#truncateを使用します。

    @Test
    public void testSimpleWriteOnlyMap4() {
        withCache("distCache", 3, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap);

            ExecutorService executorService =
                    cache
                            .getAdvancedCache()
                            .getComponentRegistry()
                            .getComponent(ExecutorService.class, ASYNC_OPERATIONS_EXECUTOR);

            CompletableFuture<Void> allRemoveFuture =
                    writeOnlyMap.evalAll((Consumer<EntryView.WriteEntryView<Integer>> & Serializable) EntryView.WriteEntryView::remove);
            CompletableFuture<Boolean> readFuture1 =
                    allRemoveFuture.thenCompose(r -> CompletableFuture.supplyAsync(() -> cache.isEmpty(),
                            executorService));

            assertThat(readFuture1.join())
                    .isTrue();

            //////////////////////////////////////////////////////////////////////

            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            CompletableFuture<Void> truncateFuture = writeOnlyMap.truncate();
            CompletableFuture<Boolean> readFuture2 =
                    truncateFuture.thenCompose(r -> CompletableFuture.supplyAsync(() -> cache.isEmpty(),
                            executorService));

            assertThat(readFuture2.join())
                    .isTrue();
        });
    }

Listener等が起動しないらしいですが、速度重視ならtruncateらしいです。

まとめ

とりあえず、Functional Mapの初歩を試してみました。CompletableFutureも初めて使ったので、いろいろ迷いましたが、Futureの合成感があって面白かったです。

Functional Mapはまだテーマが半分くらい残っているので、時間を見つけて続きを書こうと思います。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-functional-map-api

オマケ

ReadOnlyMapのTraversable⇒Listへの変換で悩んでいる時に、うらがみさん(@backpaper0)ときつねさん(@bitter_fox)からいろいろアドバイスをいただきました。

これの、filter⇒map⇒collectしている部分ですね。

            Set<String> keys2 = new HashSet<>(Arrays.asList("key1", "key2", "key20"));

            Traversable<Optional<Integer>> entries2 =
                    readOnlyMap.evalMany(keys2, EntryView.ReadEntryView::find);

            List<Integer> values2 =
                    entries2
                            .filter(Optional::isPresent)
                            .map(Optional::get)
                            .collect(Collectors.toList());

            assertThat(values2)
                    .containsOnly(1, 2);

これらのアドバイスの結果、以下のようなバリエーションができあがりました。

TraversableをいったんIteratorに変換後、flatMapでStreamに変換して合成。

            List<Integer> values2 =
                    StreamSupport
                            .stream(Spliterators.spliteratorUnknownSize(Traversables.asIterator(entries2), Spliterator.ORDERED), false)
                            .flatMap(optional -> optional.map(Stream::of).orElseGet(Stream::empty))
                            .collect(Collectors.toList());

Traversableは、Iteratorに変換しない方がいいのかな…?

https://github.com/infinispan/infinispan/blob/8.0.1.Final/core/src/main/java/org/infinispan/functional/impl/Traversables.java#L26

今のところは、変換してもOK…?

3つ引数があるcollectメソッドを使って、畳み込む感じで。

            List<Integer> values2 =
                    entries2
                            .collect(ArrayList::new,
                                    (list, optional) -> optional.ifPresent(list::add),
                                    ArrayList::addAll);

Java 8のキャッチアップができていないのが、バレバレですね!
頑張ります…。

うらがみさん、きつねさん、ありがとうございました!

このコードは、コメントアウトしていますがGitHub上のコードに含めています。