Infinispan 8.0.0.Finalから、Functional Map APIというものが実験的に追加されました。
Functional API Tutorial - Infinispan
Infinispan: New Functional Map API in Infinispan 8 - Introduction
Infinispan: Functional Map API: Working with single entries
Infinispan: Functional Map API: Working with multiple entries
ドキュメントによると、こんな特徴らしいです。
- 非同期。すべてのメソッドの結果が単一の結果、またはCompletableFutureを返す。返却されたCompletableFutureを合成することもできる
- 複数の結果を返却する際には、Traversableを使用する。Traversableは、複数の結果を遅延して取得する(pull-style API)
- 3つのインタフェースが提供され、それぞれ読み取り専用(ReadOnlyMap)、書き込み専用(WriteOnlyMap)、読み書き可能(ReadWriteMap)がある
ドキュメントやサンプルを見ると、CompletableFutureやStream(に似た)API、そしてLambda式がバシバシ出てきていて、Java 8の能力をふんだんに使った機能という感じがしますね。
CompletableFutureは実は初めて使うのですが、チャレンジしてみましょう。
追記)
Functional Mapについて、ドキュメントの範囲で簡単に試してみたので、まとめておきます。
InfinispanのFunctional Map APIを試す - CLOVER
InfinispanのFunctional Map APIを試す - ReadWriteMap編 - - CLOVER
InfinispanのFunctional Map APIを試す - Metadata & Invocation Parameter編 - - CLOVER
InfinispanのFunctional Map APIを試す - Functional Listeners編 - - CLOVER
準備
Maven依存関係は、以下のように設定。
<dependencies> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-core</artifactId> <version>8.0.1.Final</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.2.0</version> <scope>test</scope> </dependency> </dependencies>
JUnitとAssertJは、テストコード用です。
Infinispanの設定は、以下のようにしています。
src/test/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:8.0 http://www.infinispan.org/schemas/infinispan-config-8.0.xsd" xmlns="urn:infinispan:config:8.0"> <jgroups> <stack-file name="udp" path="jgroups.xml"/> </jgroups> <cache-container name="cacheManager" shutdown-hook="REGISTER"> <transport cluster="cluster" stack="udp"/> <jmx duplicate-domains="true"/> <!-- <local-cache name="distCache"/> --> <distributed-cache name="distCache"/> </cache-container> </infinispan>
JGroupsの設定は、端折ります。
Cacheは、Distributed Cacheとします。最初はLocal Cacheでやっていたのですが、あとでDistributed Cacheにしてみたらシリアライズでエラーになったので全部その対応を入れました…。
雛形コードの用意。
では、Functional Map APIを使ってみます。けっこうボリュームが大きくなりそうなので、エントリ自体を分けますが、続きはいつ書けることやら…。
今回は、ReadOnlyMapとWriteOnlyMapを対象に進めたいと思います。
動かすための雛形コードとして、以下のようなクラスを用意。
src/test/java/org/littlewings/infinispan/functionalmap/FunctionalMapTest.java
package org.littlewings.infinispan.functionalmap; import java.io.IOException; import java.io.Serializable; import java.io.UncheckedIOException; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.assertj.core.data.MapEntry; import org.infinispan.Cache; import org.infinispan.commons.api.functional.EntryView; import org.infinispan.commons.api.functional.FunctionalMap; import org.infinispan.commons.api.functional.Traversable; import org.infinispan.functional.impl.FunctionalMapImpl; import org.infinispan.functional.impl.ReadOnlyMapImpl; import org.infinispan.functional.impl.WriteOnlyMapImpl; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.stream.CacheCollectors; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.infinispan.factories.KnownComponentNames.ASYNC_OPERATIONS_EXECUTOR; public class FunctionalMapTest { // ここに、テストを書く!! protected <K, V> void withCache(String cacheName, Consumer<Cache<K, V>> consumer) { withCache(cacheName, 1, consumer); } protected <K, V> void withCache(String cacheName, int numInstances, Consumer<Cache<K, V>> consumer) { List<EmbeddedCacheManager> managers = IntStream .rangeClosed(1, numInstances) .mapToObj(i -> { try { return new DefaultCacheManager("infinispan.xml"); } catch (IOException e) { throw new UncheckedIOException(e); } }) .collect(Collectors.toList()); List<Cache<K, V>> caches = managers .stream() .map(m -> m.<K, V>getCache(cacheName)) .collect(Collectors.toList()); try { consumer.accept(caches.get(0)); } finally { caches.forEach(Cache::stop); managers.forEach(EmbeddedCacheManager::stop); } } }
Infinispanの起動/停止を行うための、簡易メソッド付き。クラスタも組みます。
Functional Mapを取得する
まずFunctional Mapを使うためには、以下のようなコードを書きます。
ReadOnlyMapの場合。
FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);
WriteOnlyMapの場合。
FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap);
〜Implが連発される、確かに実験的なAPI感が満載な気がしないでもないですが、今時点ではこちらなので、このまま進めます。
ReadOnlyMapを使う
最初は、ReadOnlyMapからいきます。
@Test public void testSimpleReadOnlyMap1() { withCache("distCache", 3, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap); CompletableFuture<Optional<Integer>> readFuture1 = readOnlyMap.eval("key10", EntryView.ReadEntryView::find); assertThat(readFuture1.join()) .hasValue(10); ////////////////////////////////////////////////////////////////////// CompletableFuture<Optional<Integer>> readFuture2 = readOnlyMap.eval("key20", EntryView.ReadEntryView::find); assertThat(readFuture2.join()) .isEmpty(); }); }
ReadOnlyMap#evalで、キーとEntryView.ReadEntryViewを引数に取るConsumerを指定します。この中で、EntryView.ReadEntryView#findしていますが、こちらで指定のキーに対するエントリを取得します。
CompletableFuture<Optional<Integer>> readFuture1 =
readOnlyMap.eval("key10", EntryView.ReadEntryView::find);
ReadOnlyMapでは、このEntryView.ReadEntryViewを使ってコードを書いていきます。
結果はCompletableFutureとなります。
assertThat(readFuture1.join())
.hasValue(10);
なお、EntryView.ReadEntryView#findの結果はOptionalで、キーに対応するエントリがなかった場合にはEmptyとなります。
CompletableFuture<Optional<Integer>> readFuture2 =
readOnlyMap.eval("key20", EntryView.ReadEntryView::find);
assertThat(readFuture2.join())
.isEmpty();
EntryView.ReadEntryViewには、getというメソッドもあります。
@Test public void testSimpleReadOnlyMap2() { withCache("distCache", 3, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap); CompletableFuture<Integer> readFuture1 = readOnlyMap.eval("key10", EntryView.ReadEntryView::get); assertThat(readFuture1.join()) .isEqualTo(10); ////////////////////////////////////////////////////////////////////// CompletableFuture<Integer> readFuture2 = readOnlyMap.eval("key20", EntryView.ReadEntryView::get); assertThatThrownBy(() -> readFuture2.join()) .isInstanceOf(CompletionException.class) .hasCauseInstanceOf(NoSuchElementException.class) .hasMessage("java.util.NoSuchElementException: No value"); }); }
EntryView.ReadEntryView#getを使った場合、結果の方はOptionalではなくエントリの型そのものになりますが、
CompletableFuture<Integer> readFuture1 =
readOnlyMap.eval("key10", EntryView.ReadEntryView::get);
キーに対応するエントリが存在しない場合にはNoSuchElementExceptionがスローされます。
assertThatThrownBy(() -> readFuture2.join()) .isInstanceOf(CompletionException.class) .hasCauseInstanceOf(NoSuchElementException.class) .hasMessage("java.util.NoSuchElementException: No value");
EntryView.ReadEntryView#keyを使用した場合には、呼び出しに使用したキーそのものが返却されます。
@Test public void testSimpleReadOnlyMap3() { withCache("distCache", 3, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap); CompletableFuture<String> readFuture1 = readOnlyMap.eval("key10", EntryView.ReadEntryView::key); assertThat(readFuture1.join()) .isEqualTo("key10"); ////////////////////////////////////////////////////////////////////// CompletableFuture<String> readFuture2 = readOnlyMap.eval("key20", EntryView.ReadEntryView::key); assertThat(readFuture2.join()) .isEqualTo("key20"); }); }
キーに対応するエントリが存在するかどうかは、あまり関係がないみたいです…。
複数のエントリを一気に取得する場合は、ReadOnlyMap#evalManyを使います。
@Test public void testSimpleReadOnlyMap4() { withCache("distCache", 3, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap); Set<String> keys1 = new HashSet<>(Arrays.asList("key1", "key2", "key10")); Traversable<Integer> entries1 = readOnlyMap.evalMany(keys1, EntryView.ReadEntryView::get); List<Integer> values1 = entries1.collect(Collectors.toList()); assertThat(values1) .containsOnly(1, 2, 10); ////////////////////////////////////////////////////////////////////// Set<String> keys2 = new HashSet<>(Arrays.asList("key1", "key2", "key20")); Traversable<Optional<Integer>> entries2 = readOnlyMap.evalMany(keys2, EntryView.ReadEntryView::find); List<Integer> values2 = entries2 .filter(Optional::isPresent) .map(Optional::get) .collect(Collectors.toList()); assertThat(values2) .containsOnly(1, 2); ////////////////////////////////////////////////////////////////////// CompletableFuture<Optional<Integer>> readFuture1 = readOnlyMap.eval("key1", EntryView.ReadEntryView::find); CompletableFuture<Optional<Integer>> readFuture2 = readOnlyMap.eval("key2", EntryView.ReadEntryView::find); CompletableFuture<Integer> resultFuture = readFuture1.thenCombineAsync(readFuture2, (optionV1, optionV2) -> optionV1.orElse(0) + optionV2.orElse(0)); assertThat(resultFuture.join()) .isEqualTo(3); }); }
TraversableはCollectionやStreamではありませんが、mapやfilterなどを似たように使うことができます。
指定のキーに対するエントリを一気に取ってくるばあいは、こちら。
Set<String> keys1 = new HashSet<>(Arrays.asList("key1", "key2", "key10")); Traversable<Integer> entries1 = readOnlyMap.evalMany(keys1, EntryView.ReadEntryView::get); List<Integer> values1 = entries1.collect(Collectors.toList()); assertThat(values1) .containsOnly(1, 2, 10);
ところが、EntryView.ReadEntryView#getだとキーに対応するエントリがなかった場合は例外になるので、そういう場合はfindを使います。
Set<String> keys2 = new HashSet<>(Arrays.asList("key1", "key2", "key20")); Traversable<Optional<Integer>> entries2 = readOnlyMap.evalMany(keys2, EntryView.ReadEntryView::find); List<Integer> values2 = entries2 .filter(Optional::isPresent) .map(Optional::get) .collect(Collectors.toList()); assertThat(values2) .containsOnly(1, 2);
evalManyを使用せずとも、複数回evalを呼んで、その結果を合成するという考え方もあるようですが。
CompletableFuture<Optional<Integer>> readFuture1 = readOnlyMap.eval("key1", EntryView.ReadEntryView::find); CompletableFuture<Optional<Integer>> readFuture2 = readOnlyMap.eval("key2", EntryView.ReadEntryView::find); CompletableFuture<Integer> resultFuture = readFuture1.thenCombineAsync(readFuture2, (optionV1, optionV2) -> optionV1.orElse(0) + optionV2.orElse(0)); assertThat(resultFuture.join()) .isEqualTo(3);
その他、ReadOnlyMapにはkeysとentriesがあり、キーおよびエントリの全体を一括で取得することができます。
@Test public void testSimpleReadOnlyMap5() { withCache("distCache", 3, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap); Traversable<String> keys = readOnlyMap.keys(); Traversable<EntryView.ReadEntryView<String, Integer>> entries = readOnlyMap.entries(); List<String> keysAsList = keys .collect(CacheCollectors.serializableCollector(() -> Collectors.toList())); Map<String, Integer> entriesAsMap = entries .collect(Collectors.toMap(EntryView.ReadEntryView::key, EntryView.ReadEntryView::get)); assertThat(keysAsList) .containsOnly("key1", "key2", "key3", "key4", "key5", "key6", "key7", "key8", "key9", "key10"); assertThat(entriesAsMap) .containsOnly(MapEntry.entry("key1", 1), MapEntry.entry("key2", 2), MapEntry.entry("key3", 3), MapEntry.entry("key4", 4), MapEntry.entry("key5", 5), MapEntry.entry("key6", 6), MapEntry.entry("key7", 7), MapEntry.entry("key8", 8), MapEntry.entry("key9", 9), MapEntry.entry("key10", 10)); }); }
結果は、Traversableです。
なお、keysメソッドの結果で使うCollectorは、なぜかSerializableである必要があったり…。
List<String> keysAsList =
keys
.collect(CacheCollectors.serializableCollector(() -> Collectors.toList()));
Distributed Cacheでは、分散実行されるっぽい?
entriesでは、そんなことないのに?
とりあえず、ReadOnlyMapはこんなところです。
WriteOnlyMap
続いて、WriteOnlyMapです。
@Test public void testSimpleWriteOnlyMap1() { withCache("distCache", 3, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap); FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap); CompletableFuture<Void> writeFuture1 = writeOnlyMap.eval("key1", (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) view -> view.set(10)); CompletableFuture<Optional<Integer>> readFuture1 = writeFuture1.thenCompose(r -> readOnlyMap.eval("key1", EntryView.ReadEntryView::find)); assertThat(readFuture1.join()) .hasValue(10); ////////////////////////////////////////////////////////////////////// CompletableFuture<Void> writeFuture2 = writeOnlyMap.eval("key1", 20, (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable) (v, view) -> view.set(v)); CompletableFuture<Optional<Integer>> readFuture2 = writeFuture2.thenCompose(r -> readOnlyMap.eval("key1", EntryView.ReadEntryView::find)); assertThat(readFuture2.join()) .hasValue(20); ////////////////////////////////////////////////////////////////////// CompletableFuture<Void> writeFuture3 = writeOnlyMap.eval("key20", (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) view -> view.set(20)); CompletableFuture<Optional<Integer>> readFuture3 = writeFuture3.thenCompose(r -> readOnlyMap.eval("key20", EntryView.ReadEntryView::find)); assertThat(readFuture3.join()) .hasValue(20); }); }
どうもこちらは、WriteOnlyMapに渡すLambda式はSerializableである必要があるようです。
※少なくとも、Distributed Cacheの場合は。Local CacheではSerializableでなくてもOKでした
CompletableFuture<Void> writeFuture1 = writeOnlyMap.eval("key1", (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) view -> view.set(10)); CompletableFuture<Optional<Integer>> readFuture1 = writeFuture1.thenCompose(r -> readOnlyMap.eval("key1", EntryView.ReadEntryView::find)); assertThat(readFuture1.join()) .hasValue(10);
EntryView.WriteEntryView#setで、エントリの値を設定することができます。
CompletableFuture<Void> writeFuture1 = writeOnlyMap.eval("key1", (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) view -> view.set(10));
また、evalにキーと値を指定することで、変更時の値も指定することができます。
CompletableFuture<Void> writeFuture2 = writeOnlyMap.eval("key1", 20, (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable) (v, view) -> view.set(v));
存在しないキーを指定した場合は、単純に追加ですね。
CompletableFuture<Void> writeFuture3 = writeOnlyMap.eval("key20", (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) view -> view.set(20));
複数のエントリを一気に指定する場合は、WriteOnlyMap#evalManyを使用します。
@Test public void testSimpleWriteOnlyMap2() { withCache("distCache", 3, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap); FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap); Set<String> keys = new HashSet<>(Arrays.asList("key1", "key2", "key10")); CompletableFuture<Void> writeFuture1 = writeOnlyMap.evalMany(keys, (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) view -> view.set(100)); CompletableFuture<Traversable<Optional<Integer>>> readFuture1 = writeFuture1.thenCompose(r -> CompletableFuture.supplyAsync(() -> readOnlyMap.<Optional<Integer>>evalMany(keys, EntryView.ReadEntryView::find))); List<Integer> entries1 = readFuture1 .join() .filter(Optional::isPresent) .map(Optional::get) .collect(Collectors.toList()); assertThat(entries1) .containsOnly(100, 100, 100); ////////////////////////////////////////////////////////////////////// IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); ExecutorService executorService = cache .getAdvancedCache() .getComponentRegistry() .getComponent(ExecutorService.class, ASYNC_OPERATIONS_EXECUTOR); Map<String, Integer> inputEntries = new HashMap<>(); inputEntries.put("key1", 500); inputEntries.put("key20", 1000); CompletableFuture<Void> writeFuture2 = writeOnlyMap.evalMany(inputEntries, (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable) (v, view) -> view.set(v)); Set<String> targetKeys = new HashSet<>(Arrays.asList("key1", "key2", "key10", "key20")); CompletableFuture<Traversable<Optional<Integer>>> readFuture2 = writeFuture2.thenCompose(r -> CompletableFuture.supplyAsync(() -> readOnlyMap.<Optional<Integer>>evalMany(targetKeys, EntryView.ReadEntryView::find), executorService)); List<Integer> entries2 = readFuture2 .join() .filter(Optional::isPresent) .map(Optional::get) .collect(Collectors.toList()); assertThat(entries2) .containsOnly(500, 2, 10, 1000); }); }
WriteOnlyMap#evalManyには、キーのSet、もしくはキーと値を含んだMapを渡すことができ、Lambda式の部分はキー単位で呼び出されるので、あとはevalと同じように操作することができます。
削除を行う場合は、EntryView.WriteEntryView#removeを使用します。
@Test public void testSimpleWriteOnlyMap3() { withCache("distCache", 3, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap); FunctionalMap.ReadOnlyMap<String, Integer> readOnlyMap = ReadOnlyMapImpl.create(functionalMap); CompletableFuture<Void> removeFuture = writeOnlyMap.eval("key1", (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) EntryView.WriteEntryView::remove); CompletableFuture<Optional<Integer>> readFuture1 = removeFuture.thenCompose(r -> readOnlyMap.eval("key1", EntryView.ReadEntryView::find)); assertThat(readFuture1.join()) .isEmpty(); ////////////////////////////////////////////////////////////////////// ExecutorService executorService = cache .getAdvancedCache() .getComponentRegistry() .getComponent(ExecutorService.class, ASYNC_OPERATIONS_EXECUTOR); Set<String> targetKeys = new HashSet<>(Arrays.asList("key2", "key10", "key20")); CompletableFuture<Void> removesFuture = writeOnlyMap.evalMany(targetKeys, (Consumer<EntryView.WriteEntryView<Integer>> & Serializable) EntryView.WriteEntryView::remove); CompletableFuture<Traversable<Optional<Integer>>> readFuture2 = removesFuture.thenCompose(r -> CompletableFuture.supplyAsync(() -> readOnlyMap.evalMany(targetKeys, EntryView.ReadEntryView::find), executorService)); boolean isEmpty = readFuture2 .join() .allMatch(optional -> !optional.isPresent()); assertThat(isEmpty) .isTrue(); }); }
evalでもevalManyでも、どちらでもOKです。
直接Functional Mapには関係ありませんが、ReadOnlyMap#evalManyの結果をCompletableFutureにしたくて
CompletableFuture<Traversable<Optional<Integer>>> readFuture2 = removesFuture.thenCompose(r -> CompletableFuture.supplyAsync(() -> readOnlyMap.evalMany(targetKeys, EntryView.ReadEntryView::find), executorService));
この時のExecutorServiceにInfinispanのExecutorServiceを使用しました。
ExecutorService executorService =
cache
.getAdvancedCache()
.getComponentRegistry()
.getComponent(ExecutorService.class, ASYNC_OPERATIONS_EXECUTOR);
Infinispan内のCompletableFutureで使用しているものと、合わせた方がいいかなぁと思いまして。
エントリの全件削除をする場合は、WriteOnlyMap#evalAllからEntryView.WriteEntryView#remove、もしくはWriteOnlyMap#truncateを使用します。
@Test public void testSimpleWriteOnlyMap4() { withCache("distCache", 3, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap); ExecutorService executorService = cache .getAdvancedCache() .getComponentRegistry() .getComponent(ExecutorService.class, ASYNC_OPERATIONS_EXECUTOR); CompletableFuture<Void> allRemoveFuture = writeOnlyMap.evalAll((Consumer<EntryView.WriteEntryView<Integer>> & Serializable) EntryView.WriteEntryView::remove); CompletableFuture<Boolean> readFuture1 = allRemoveFuture.thenCompose(r -> CompletableFuture.supplyAsync(() -> cache.isEmpty(), executorService)); assertThat(readFuture1.join()) .isTrue(); ////////////////////////////////////////////////////////////////////// IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); CompletableFuture<Void> truncateFuture = writeOnlyMap.truncate(); CompletableFuture<Boolean> readFuture2 = truncateFuture.thenCompose(r -> CompletableFuture.supplyAsync(() -> cache.isEmpty(), executorService)); assertThat(readFuture2.join()) .isTrue(); }); }
Listener等が起動しないらしいですが、速度重視ならtruncateらしいです。
まとめ
とりあえず、Functional Mapの初歩を試してみました。CompletableFutureも初めて使ったので、いろいろ迷いましたが、Futureの合成感があって面白かったです。
Functional Mapはまだテーマが半分くらい残っているので、時間を見つけて続きを書こうと思います。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-functional-map-api
オマケ
ReadOnlyMapのTraversable⇒Listへの変換で悩んでいる時に、うらがみさん(@backpaper0)ときつねさん(@bitter_fox)からいろいろアドバイスをいただきました。
これの、filter⇒map⇒collectしている部分ですね。
Set<String> keys2 = new HashSet<>(Arrays.asList("key1", "key2", "key20")); Traversable<Optional<Integer>> entries2 = readOnlyMap.evalMany(keys2, EntryView.ReadEntryView::find); List<Integer> values2 = entries2 .filter(Optional::isPresent) .map(Optional::get) .collect(Collectors.toList()); assertThat(values2) .containsOnly(1, 2);
java 8 list optional flatten
src.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
src.stream().flatMap(opt -> opt.map(Stream::of).orElseGet(Stream::empty)).collect(Collectors.toList());
@kazuhira_r entries2の型ってなんなんです?なんかすごいことになってますね
@bitter_fox Traversable<Optional<Integer>>って型になってて、Optionalの外側にCollectionっぽいメソッド持ってるんですけど、Collectionじゃない型がいる感じになってます
@kazuhira_r あー、なるほどー大変なことになってますね。個人的にはその2つだとisPresentとgetが好きですね。なんでかというと要素ずつわざわざStream作るのが無駄っぽいので・・・
@kazuhira_r どうしてもisPresentとget使いたくないのであれば、collectで3引数のを使って2引数目で(l, opt) -> opt.ifPresent()てきな感じにしますね
これらのアドバイスの結果、以下のようなバリエーションができあがりました。
TraversableをいったんIteratorに変換後、flatMapでStreamに変換して合成。
List<Integer> values2 = StreamSupport .stream(Spliterators.spliteratorUnknownSize(Traversables.asIterator(entries2), Spliterator.ORDERED), false) .flatMap(optional -> optional.map(Stream::of).orElseGet(Stream::empty)) .collect(Collectors.toList());
Traversableは、Iteratorに変換しない方がいいのかな…?
今のところは、変換してもOK…?
3つ引数があるcollectメソッドを使って、畳み込む感じで。
List<Integer> values2 = entries2 .collect(ArrayList::new, (list, optional) -> optional.ifPresent(list::add), ArrayList::addAll);
Java 8のキャッチアップができていないのが、バレバレですね!
頑張ります…。
うらがみさん、きつねさん、ありがとうございました!