こちらのエントリの続きです。
InfinispanのFunctional Map APIを試す - CLOVER
前回はReadOnlyMapおよびWriteOnlyMapを扱いましたが、今回はReadWriteMapを扱います。
参考にするのは、以下のドキュメントやマニュアルなどです。
Functional API Tutorial - Infinispan
Infinispan: New Functional Map API in Infinispan 8 - Introduction
Infinispan: Functional Map API: Working with single entries
Infinispan: Functional Map API: Working with multiple entries
それでは、進めていってみます。
追記)
Functional Mapについて、ドキュメントの範囲で簡単に試してみたので、まとめておきます。
InfinispanのFunctional Map APIを試す - CLOVER
InfinispanのFunctional Map APIを試す - ReadWriteMap編 - - CLOVER
InfinispanのFunctional Map APIを試す - Metadata & Invocation Parameter編 - - CLOVER
InfinispanのFunctional Map APIを試す - Functional Listeners編 - - CLOVER
準備
ビルドや設定など。
Maven依存関係は、こちら。
<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-core</artifactId> <version>8.0.1.Final</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.2.0</version> <scope>test</scope> </dependency>
JUnitとAssertJは、テストコード用です。
Infinispanの設定は、このようにしています。
src/test/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:8.0 http://www.infinispan.org/schemas/infinispan-config-8.0.xsd" xmlns="urn:infinispan:config:8.0"> <jgroups> <stack-file name="udp" path="jgroups.xml"/> </jgroups> <cache-container name="cacheManager" shutdown-hook="REGISTER"> <transport cluster="cluster" stack="udp"/> <jmx duplicate-domains="true"/> <!-- <local-cache name="distCache"/> --> <distributed-cache name="distCache"/> </cache-container> </infinispan>
JGroupsの設定は、端折ります。
テストコードの雛形
それでは、まずテストコードの雛形を用意します。
src/test/java/org/littlewings/infinispan/functionalmap/FunctionalMapTest.java
package org.littlewings.infinispan.functionalmap; import java.io.IOException; import java.io.Serializable; import java.io.UncheckedIOException; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.infinispan.Cache; import org.infinispan.commons.api.functional.EntryView; import org.infinispan.commons.api.functional.FunctionalMap; import org.infinispan.commons.api.functional.Traversable; import org.infinispan.functional.impl.FunctionalMapImpl; import org.infinispan.functional.impl.ReadWriteMapImpl; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; public class FunctionalMapTest { // ここに、テストを書く! protected <K, V> void withCache(String cacheName, Consumer<Cache<K, V>> consumer) { withCache(cacheName, 1, consumer); } protected <K, V> void withCache(String cacheName, int numInstances, Consumer<Cache<K, V>> consumer) { List<EmbeddedCacheManager> managers = IntStream .rangeClosed(1, numInstances) .mapToObj(i -> { try { return new DefaultCacheManager("infinispan.xml"); } catch (IOException e) { throw new UncheckedIOException(e); } }) .collect(Collectors.toList()); List<Cache<K, V>> caches = managers .stream() .map(m -> m.<K, V>getCache(cacheName)) .collect(Collectors.toList()); try { consumer.accept(caches.get(0)); } finally { caches.forEach(Cache::stop); managers.forEach(EmbeddedCacheManager::stop); } } }
クラスタを簡易的に構成できる、ヘルパーメソッド付きです。こちらを使用して、ReadWriteMapを使うコードを書いていきます。
ReadWriteMapを使う
ReadWriteMapは、前回出てきたReadOnlyMapとWriteOnlyMapの性質を合わせたようなMapです。
ただし、同じメソッドを持っているわけではありません。
※ReadWriteMapは、ReadOnlyMapとWriteOnlyMapのインターフェースを実装している、というわけではないので
ReadWriteMapがあれば、ReadOnlyMapやWriteOnlyMapの両方の用途がすべて満たせるかというとそうではないので、定義されているメソッドなどをAPIリファレンスを見て確認するようにしましょう。
ReadWriteMapは、このようにReadOnlyMapおよびWriteOnlyMapと差がありますが、EntryView.ReadWriteEntryViewについてはEntryView.ReadEntryViewとEntryView.WriteEntryViewを実装したインターフェースとして定義されているため、こちらについては各EntryViewと同じ操作が可能です。
このあたりを踏まえて、使っていきましょう。
まずは、単純なRead/Writeの例を。
@Test public void testReadWriteMap1() { withCache("distCache", 3, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.ReadWriteMap<String, Integer> readWriteMap = ReadWriteMapImpl.create(functionalMap); CompletableFuture<Optional<Integer>> previousValueFuture = readWriteMap .eval("key1", 10, (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Optional<Integer>> & Serializable) (v, view) -> { Optional<Integer> previous = view.find(); view.set(v); return previous; }); assertThat(previousValueFuture.join()) .hasValue(1); }); }
ReadWriteMapも、ReadOnlyMapやWriteOnlyMapと同様にFunctionalMapImplから作成します。
FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.ReadWriteMap<String, Integer> readWriteMap = ReadWriteMapImpl.create(functionalMap);
CompletableFutureを返すevalを持ち、EntryView.ReadWriteEntryViewではfindやsetができるので、こちらを使用してエントリの取得と更新をしてみます。
CompletableFuture<Optional<Integer>> previousValueFuture = readWriteMap .eval("key1", 10, (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Optional<Integer>> & Serializable) (v, view) -> { Optional<Integer> previous = view.find(); view.set(v); return previous; });
各所で求められるLambda式(というか関数型インターフェース)は、シリアライズ可能であることが求められるようです。Local Cacheだとシリアライズできなくてもいいみたいですが、Distributed Cacheだと軒並みシリアライズ可能である必要があったので、Serializableにしておくとよいでしょう。
ReadWriteMap#evalManyを使って、複数のエントリを取得する例。
@Test public void testReadWriteMap2() { withCache("distCache", 3, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.ReadWriteMap<String, Integer> readWriteMap = ReadWriteMapImpl.create(functionalMap); Set<String> keys = new HashSet<>(Arrays.asList("key1", "key5", "key10")); Traversable<Optional<Integer>> entries = readWriteMap .evalMany(keys, (Function<EntryView.ReadWriteEntryView<String, Integer>, Optional<Integer>> & Serializable) EntryView.ReadWriteEntryView::find); List<Integer> results = entries .filter(Optional::isPresent) .map(Optional::get) .collect(Collectors.toList()); assertThat(results) .containsOnly(1, 5, 10); });
WriteOnlyMap#evalManyではCompletableFutureを返していましたが、ReadWriteMapではReadOnlyMapと同様にTraversableを返します。
ReadWriteMap#evalAllを使った、エントリの全削除の例。こちらも、CompletableFutureではなくTraversableが返ります。
@Test public void testReadWriteMap3() { withCache("distCache", 3, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.ReadWriteMap<String, Integer> readWriteMap = ReadWriteMapImpl.create(functionalMap); Traversable<Void> results = readWriteMap .evalAll((Function<EntryView.ReadWriteEntryView<String, Integer>, Void> & Serializable) EntryView.ReadWriteEntryView::remove); results.forEach(r -> { }); assertThat(cache.isEmpty()) .isTrue(); }); }
WriteOnlyMapには、これと同様の操作を簡易的にできるtruncateメソッドがありましたが、ReadWriteMapには存在しません。
ReadWriteMapは、ReadOnlyMapとWriteOnlyMapと同じことができるわけではないというのを、このあたりで確認した感じですね。
あとは、ドキュメントのサンプルに習って、Map#replaceライクな例を書いてみます。
@Test public void testReadWriteMapLikeReplace1() { withCache("distCache", 3, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.ReadWriteMap<String, Integer> readWriteMap = ReadWriteMapImpl.create(functionalMap); int oldValue = 10; CompletableFuture<Optional<Integer>> replaceFuture = readWriteMap .eval( "key10", 100, (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Optional<Integer>> & Serializable) (v, view) -> view.find().map( (Function<Integer, Integer> & Serializable) previous -> { if (previous == oldValue) { view.set(v); return previous; } else { return previous; } })); CompletableFuture<Optional<Integer>> result = replaceFuture.thenCompose((Function<Optional<Integer>, CompletableFuture<Optional<Integer>>> & Serializable) p -> readWriteMap .eval("key10", (Function<EntryView.ReadWriteEntryView<String, Integer>, Optional<Integer>> & Serializable) EntryView.ReadWriteEntryView::find)); assertThat(result.join()) .hasValue(100); }); }
Serializableを使った交差型キャストが連発されるのが、ちょっと戸惑うところですね…。
こちらは、値が異なるので書き換え不可な例。ちょっとムリヤリ感がありますが。
@Test public void testReadWriteMapLikeReplace2() { withCache("distCache", 3, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.ReadWriteMap<String, Integer> readWriteMap = ReadWriteMapImpl.create(functionalMap); int oldValue = 50; CompletableFuture<Optional<Integer>> replaceFuture = readWriteMap.eval( "key10", 100, (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Optional<Integer>> & Serializable) (v, view) -> view.find().map((Function<Integer, Integer> & Serializable) previous -> { if (previous == oldValue) { view.set(v); return previous; } else { return previous; } })); CompletableFuture<Optional<Integer>> result = replaceFuture.thenCompose((Function<Optional<Integer>, CompletableFuture<Optional<Integer>>> & Serializable) p -> readWriteMap .eval("key10", (Function<EntryView.ReadWriteEntryView<String, Integer>, Optional<Integer>> & Serializable) EntryView.ReadWriteEntryView::find)); assertThat(result.join()) .hasValue(10); }); }
まとめ
前回のReadOnlyMapとWriteOnlyMapから、その両方の性格を併せ持ったようなインターフェース、ReadWriteMapを使ってみました。
使う前までは、ReadOnlyMapとWriteOnlyMapのインターフェースを両方とも実装しているのかな?と思ったのですが、そんなことはなくAPIドキュメントをよくよく読むことになりました。
また、CompletableFutureや交差型キャストを覚えることにもなるので、ホントに勉強になります…。
そのうち、また次のテーマも書くとしましょう。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-functional-readwritemap