Infinispan 8.0.0.Finalから追加されたFunctional Mapについて書いてきたエントリですが、いったんこれで最後です。
最後は、Functional Listenersを扱います。
Functional Mapに対してListenerを付けることができ、イベントに応じた通知を受け取ることができるものです。Write Listenersと、ReadWrite Listenersの2つがあり、WriteOnlyMapとReadWriteMapに対して適用することができます。
それでは、使っていってみます。
追記)
Functional Mapについて、ドキュメントの範囲で簡単に試してみたので、まとめておきます。
InfinispanのFunctional Map APIを試す - CLOVER
InfinispanのFunctional Map APIを試す - ReadWriteMap編 - - CLOVER
InfinispanのFunctional Map APIを試す - Metadata & Invocation Parameter編 - - CLOVER
InfinispanのFunctional Map APIを試す - Functional Listeners編 - - CLOVER
準備
まずは、Maven依存関係。
<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-core</artifactId> <version>8.0.1.Final</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.2.0</version> <scope>test</scope> </dependency>
JUnitとAssertJは、テストコード用です。
Infinispanの設定は、こんな感じ。
src/test/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:8.0 http://www.infinispan.org/schemas/infinispan-config-8.0.xsd" xmlns="urn:infinispan:config:8.0"> <jgroups> <stack-file name="udp" path="jgroups.xml"/> </jgroups> <cache-container name="cacheManager" shutdown-hook="REGISTER"> <transport cluster="cluster" stack="udp"/> <jmx duplicate-domains="true"/> <!-- <local-cache name="distCache"/> --> <distributed-cache name="distCache"/> </cache-container> </infinispan>
JGroupsの設定は、端折ります。
テストコードの雛形
では、テストコードを書いていきます。
src/test/java/org/littlewings/infinispan/functionalmap/FunctionalMapTest.java
package org.littlewings.infinispan.functionalmap; import java.io.IOException; import java.io.Serializable; import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.List; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.infinispan.Cache; import org.infinispan.commons.api.functional.EntryView; import org.infinispan.commons.api.functional.FunctionalMap; import org.infinispan.commons.api.functional.Listeners; import org.infinispan.functional.impl.FunctionalMapImpl; import org.infinispan.functional.impl.ReadWriteMapImpl; import org.infinispan.functional.impl.WriteOnlyMapImpl; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Verifier; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.fail; public class FunctionalMapTest { private int closeableHandlerCalledCount = 0; private int expectedCloseableHandlerCalledCount = 0; private int listenerCalledCount = 0; private int expectedListenerCalledCount = 0; @Rule public VerifierRule verifierRule = new VerifierRule(); private class VerifierRule extends Verifier { @Override protected void verify() { assertThat(closeableHandlerCalledCount) .isEqualTo(expectedCloseableHandlerCalledCount); assertThat(listenerCalledCount) .isEqualTo(expectedListenerCalledCount); } } // ここに、テストを書く! protected <K, V> void withCache(String cacheName, int numInstances, Consumer<Cache<K, V>> consumer) { List<EmbeddedCacheManager> managers = IntStream .rangeClosed(1, numInstances) .mapToObj(i -> { try { return new DefaultCacheManager("infinispan.xml"); } catch (IOException e) { throw new UncheckedIOException(e); } }) .collect(Collectors.toList()); managers.forEach(m -> m.getCache(cacheName)); try { consumer.accept(managers.get(0).getCache(cacheName)); } finally { managers.forEach(EmbeddedCacheManager::stop); } } }
簡易的にクラスタを構成するヘルパーメソッド付きですが、今回はコールバックを受けるListenerがあるので、Verifierを使ってみました。
private int closeableHandlerCalledCount = 0; private int expectedCloseableHandlerCalledCount = 0; private int listenerCalledCount = 0; private int expectedListenerCalledCount = 0; @Rule public VerifierRule verifierRule = new VerifierRule(); private class VerifierRule extends Verifier { @Override protected void verify() { assertThat(closeableHandlerCalledCount) .isEqualTo(expectedCloseableHandlerCalledCount); assertThat(listenerCalledCount) .isEqualTo(expectedListenerCalledCount); } }
こちらを使って、テストメソッドを書いていきます。
Write Listeners
まずは、Write Listenersから。
@Test public void testWriteListener() { withCache("distCache", 1, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap); expectedCloseableHandlerCalledCount = 1; expectedListenerCalledCount = 1; AutoCloseable writeFunctionHandler = writeOnlyMap.listeners().onWrite(written -> { closeableHandlerCalledCount++; assertThat(written.find()) .hasValue(100); }); AutoCloseable writeListenerHandler = writeOnlyMap.listeners().add(new Listeners.WriteListeners.WriteListener<String, Integer>() { @Override public void onWrite(EntryView.ReadEntryView<String, Integer> write) { listenerCalledCount++; assertThat(write.find()).hasValue(100); } }); // onWrite writeOnlyMap.eval("key10", 100, (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable) (v, view) -> view.set(v)).join(); try { writeFunctionHandler.close(); writeListenerHandler.close(); } catch (Exception e) { fail(e.getMessage()); } }); }
WriteOnlyMapに対して、Listenerを付けていきます。
付け方は、WriteOnlyMap#listeners#onWriteもしくはWriteOnlyMap#listeners#addで登録します。
WriteOnlyMap#listeners#onWriteメソッドで登録する場合、EntryView.ReadEntryViewを引数に取るConsumerを作成して渡します。
AutoCloseable writeFunctionHandler = writeOnlyMap.listeners().onWrite(written -> { closeableHandlerCalledCount++; assertThat(written.find()) .hasValue(100); });
addで追加する場合は、Listeners.WriteListeners.WriteListenerインターフェースを実装し、onWriteメソッドを実装します。onWriteメソッドには、EntryView.ReadEntryViewが引数として渡されます。
AutoCloseable writeListenerHandler = writeOnlyMap.listeners().add(new Listeners.WriteListeners.WriteListener<String, Integer>() { @Override public void onWrite(EntryView.ReadEntryView<String, Integer> write) { listenerCalledCount++; assertThat(write.find()).hasValue(100); } });
Lambda式で書いてもいいのですが、今回は型を明示しています。
あとは、イベントを起こすと発火すると。
// onWrite writeOnlyMap.eval("key10", 100, (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable) (v, view) -> view.set(v)).join();
なお、登録したListenerはAutoCloseableで返ってくるので、closeしておきましょう。
writeFunctionHandler.close(); writeListenerHandler.close();
ReadWrite Listeners
続いては、ReadWrite Listenersです。
こちらのサンプルは、ちょっと大きくなりました…。ReadWriteMapに対して、使用します。
@Test public void testReadWriteListener() { withCache("distCache", 1, (Cache<String, Integer> cache) -> { IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i)); FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.ReadWriteMap<String, Integer> readWriteMap = ReadWriteMapImpl.create(functionalMap); List<AutoCloseable> closeableHandlers = new ArrayList<>(); expectedCloseableHandlerCalledCount = 6; expectedListenerCalledCount = 6; closeableHandlers.add(readWriteMap.listeners().onCreate(created -> { closeableHandlerCalledCount++; assertThat(created.find()) .hasValue(20); })); closeableHandlers.add(readWriteMap.listeners().onModify((before, after) -> { closeableHandlerCalledCount++; assertThat(before.find()) .hasValue(10); assertThat(after.find()) .hasValue(100); })); closeableHandlers.add(readWriteMap.listeners().onRemove(removed -> { closeableHandlerCalledCount++; assertThat(removed.find()) .hasValue(5); })); closeableHandlers.add(readWriteMap.listeners().onWrite(written -> { closeableHandlerCalledCount++; written.find().ifPresent(v -> assertThat(v == 20 || v == 100).isTrue()); if (!written.find().isPresent()) { assertThat(written.key()) .isEqualTo("key5"); } })); closeableHandlers.add(readWriteMap.listeners().add(new Listeners.ReadWriteListeners.ReadWriteListener<String, Integer>() { @Override public void onCreate(EntryView.ReadEntryView<String, Integer> created) { listenerCalledCount++; assertThat(created.find()) .hasValue(20); } @Override public void onModify(EntryView.ReadEntryView<String, Integer> before, EntryView.ReadEntryView<String, Integer> after) { listenerCalledCount++; assertThat(before.find()) .hasValue(10); assertThat(after.find()) .hasValue(100); } @Override public void onRemove(EntryView.ReadEntryView<String, Integer> removed) { listenerCalledCount++; assertThat(removed.find()) .hasValue(5); } })); // Listeners.WriteListeners.WriteListener closeableHandlers.add(readWriteMap.listeners().add(written -> { listenerCalledCount++; written.find().ifPresent(v -> { assertThat(v == 20 || v == 100) .isTrue(); }); if (!written.find().isPresent()) { assertThat(written.key()) .isEqualTo("key5"); } })); // onCreate/onWrite readWriteMap.eval("key20", 20, (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Void> & Serializable) (v, view) -> view.set(v)).join(); // onModify/onWrite readWriteMap.eval("key10", 100, (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Void> & Serializable) (v, view) -> view.set(v)).join(); // onRemove/onWrite readWriteMap.eval("key5", (Function<EntryView.ReadWriteEntryView<String, Integer>, Void> & Serializable) view -> view.remove()).join(); closeableHandlers.forEach(closeable -> { try { closeable.close(); } catch (Exception e) { fail(e.getMessage()); } }); }); }
登録したListenerは、AutoCloseableなので後からまとめてcloseできるように、今回のサンプルはListで受け取るようにしています…。
List<AutoCloseable> closeableHandlers = new ArrayList<>();
ReadWriteMap#listenersから追加するのですが、onCreate、onModify、onRemove、onWriteで追加するパターンと、addで追加するパターンがあります。
on〜で実装する場合は、Consumer(onCreate、onRemove、onWrite)、BiConsumer(onModify)を引数として取ります。
onCreate、onRemove、onWriteの場合は、ReadEntryViewを引数に取るConsumerを実装します。
closeableHandlers.add(readWriteMap.listeners().onCreate(created -> { closeableHandlerCalledCount++; assertThat(created.find()) .hasValue(20); })); closeableHandlers.add(readWriteMap.listeners().onRemove(removed -> { closeableHandlerCalledCount++; assertThat(removed.find()) .hasValue(5); })); closeableHandlers.add(readWriteMap.listeners().onWrite(written -> { closeableHandlerCalledCount++; written.find().ifPresent(v -> assertThat(v == 20 || v == 100).isTrue()); if (!written.find().isPresent()) { assertThat(written.key()) .isEqualTo("key5"); } }));
onModifyの場合は、変更前後のEntryが引数として渡ってきます。BiConsumerとして実装し、ReadViewEntryが2つ渡ってきます。
closeableHandlers.add(readWriteMap.listeners().onModify((before, after) -> { closeableHandlerCalledCount++; assertThat(before.find()) .hasValue(10); assertThat(after.find()) .hasValue(100); }));
ReadWriteMap#listeners#addで追加する場合は、Listeners.ReadWriteListeners.ReadWriteListener、もしくはListeners.WriteListeners.WriteListenerのインスタンスを登録します。
Listeners.ReadWriteListeners.ReadWriteListenerの場合は、onCreate、onModify、onRemoveメソッドを実装します。
closeableHandlers.add(readWriteMap.listeners().add(new Listeners.ReadWriteListeners.ReadWriteListener<String, Integer>() { @Override public void onCreate(EntryView.ReadEntryView<String, Integer> created) { listenerCalledCount++; assertThat(created.find()) .hasValue(20); } @Override public void onModify(EntryView.ReadEntryView<String, Integer> before, EntryView.ReadEntryView<String, Integer> after) { listenerCalledCount++; assertThat(before.find()) .hasValue(10); assertThat(after.find()) .hasValue(100); } @Override public void onRemove(EntryView.ReadEntryView<String, Integer> removed) { listenerCalledCount++; assertThat(removed.find()) .hasValue(5); } }));
使い方は、先ほどまでのon〜と似た感じです。
addでListeners.WriteListeners.WriteListenerを追加する場合は、先ほどと同様にonWriteメソッドを実装します。
// Listeners.WriteListeners.WriteListener closeableHandlers.add(readWriteMap.listeners().add(written -> { listenerCalledCount++; written.find().ifPresent(v -> { assertThat(v == 20 || v == 100) .isTrue(); }); if (!written.find().isPresent()) { assertThat(written.key()) .isEqualTo("key5"); } }));
イベント発火。
// onCreate/onWrite readWriteMap.eval("key20", 20, (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Void> & Serializable) (v, view) -> view.set(v)).join(); // onModify/onWrite readWriteMap.eval("key10", 100, (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Void> & Serializable) (v, view) -> view.set(v)).join(); // onRemove/onWrite readWriteMap.eval("key5", (Function<EntryView.ReadWriteEntryView<String, Integer>, Void> & Serializable) view -> view.remove()).join();
最後に、close。
closeableHandlers.forEach(closeable -> { try { closeable.close(); } catch (Exception e) { fail(e.getMessage()); } });
各メソッドで、どのようなイベントが発生するかは、またListenerの登録方法が確認できた感じですね。
ちなみに、このあたりのイベント処理が実装されているのは、このクラスになります。
https://github.com/infinispan/infinispan/blob/8.0.1.Final/core/src/main/java/org/infinispan/functional/impl/FunctionalNotifierImpl.java
今回作成したソースコードは、こちらに置いてあります。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-functional-map-listeners