CLOVER🍀

That was when it all began.

InfinispanのFunctional Map APIを試す - Functional Listeners編 -

Infinispan 8.0.0.Finalから追加されたFunctional Mapについて書いてきたエントリですが、いったんこれで最後です。


最後は、Functional Listenersを扱います。

36.9. Functional Listeners

Functional Mapに対してListenerを付けることができ、イベントに応じた通知を受け取ることができるものです。Write Listenersと、ReadWrite Listenersの2つがあり、WriteOnlyMapとReadWriteMapに対して適用することができます。

それでは、使っていってみます。

追記
Functional Mapについて、ドキュメントの範囲で簡単に試してみたので、まとめておきます。

InfinispanのFunctional Map APIを試す - CLOVER

InfinispanのFunctional Map APIを試す - ReadWriteMap編 - - CLOVER

InfinispanのFunctional Map APIを試す - Metadata & Invocation Parameter編 - - CLOVER

InfinispanのFunctional Map APIを試す - Functional Listeners編 - - CLOVER

準備

まずは、Maven依存関係。

        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>8.0.1.Final</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.2.0</version>
            <scope>test</scope>
        </dependency>

JUnitとAssertJは、テストコード用です。

Infinispanの設定は、こんな感じ。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:8.0 http://www.infinispan.org/schemas/infinispan-config-8.0.xsd"
        xmlns="urn:infinispan:config:8.0">
    <jgroups>
        <stack-file name="udp" path="jgroups.xml"/>
    </jgroups>

    <cache-container name="cacheManager" shutdown-hook="REGISTER">
        <transport cluster="cluster" stack="udp"/>
        <jmx duplicate-domains="true"/>

        <!--
        <local-cache name="distCache"/>
        -->
        <distributed-cache name="distCache"/>
    </cache-container>
</infinispan>

JGroupsの設定は、端折ります。

テストコードの雛形

では、テストコードを書いていきます。
src/test/java/org/littlewings/infinispan/functionalmap/FunctionalMapTest.java

package org.littlewings.infinispan.functionalmap;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.infinispan.Cache;
import org.infinispan.commons.api.functional.EntryView;
import org.infinispan.commons.api.functional.FunctionalMap;
import org.infinispan.commons.api.functional.Listeners;
import org.infinispan.functional.impl.FunctionalMapImpl;
import org.infinispan.functional.impl.ReadWriteMapImpl;
import org.infinispan.functional.impl.WriteOnlyMapImpl;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Verifier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;

public class FunctionalMapTest {
    private int closeableHandlerCalledCount = 0;
    private int expectedCloseableHandlerCalledCount = 0;
    private int listenerCalledCount = 0;
    private int expectedListenerCalledCount = 0;

    @Rule
    public VerifierRule verifierRule = new VerifierRule();

    private class VerifierRule extends Verifier {
        @Override
        protected void verify() {
            assertThat(closeableHandlerCalledCount)
                    .isEqualTo(expectedCloseableHandlerCalledCount);
            assertThat(listenerCalledCount)
                    .isEqualTo(expectedListenerCalledCount);
        }
    }

    // ここに、テストを書く!

    protected <K, V> void withCache(String cacheName, int numInstances, Consumer<Cache<K, V>> consumer) {
        List<EmbeddedCacheManager> managers =
                IntStream
                        .rangeClosed(1, numInstances)
                        .mapToObj(i -> {
                            try {
                                return new DefaultCacheManager("infinispan.xml");
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        })
                        .collect(Collectors.toList());

        managers.forEach(m -> m.getCache(cacheName));

        try {
            consumer.accept(managers.get(0).getCache(cacheName));
        } finally {
            managers.forEach(EmbeddedCacheManager::stop);
        }
    }
}

簡易的にクラスタを構成するヘルパーメソッド付きですが、今回はコールバックを受けるListenerがあるので、Verifierを使ってみました。

    private int closeableHandlerCalledCount = 0;
    private int expectedCloseableHandlerCalledCount = 0;
    private int listenerCalledCount = 0;
    private int expectedListenerCalledCount = 0;

    @Rule
    public VerifierRule verifierRule = new VerifierRule();

    private class VerifierRule extends Verifier {
        @Override
        protected void verify() {
            assertThat(closeableHandlerCalledCount)
                    .isEqualTo(expectedCloseableHandlerCalledCount);
            assertThat(listenerCalledCount)
                    .isEqualTo(expectedListenerCalledCount);
        }
    }

こちらを使って、テストメソッドを書いていきます。

Write Listeners

まずは、Write Listenersから。

    @Test
    public void testWriteListener() {
        withCache("distCache", 1, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.WriteOnlyMap<String, Integer> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap);

            expectedCloseableHandlerCalledCount = 1;
            expectedListenerCalledCount = 1;

            AutoCloseable writeFunctionHandler = writeOnlyMap.listeners().onWrite(written -> {
                closeableHandlerCalledCount++;
                assertThat(written.find())
                        .hasValue(100);
            });

            AutoCloseable writeListenerHandler = writeOnlyMap.listeners().add(new Listeners.WriteListeners.WriteListener<String, Integer>() {
                @Override
                public void onWrite(EntryView.ReadEntryView<String, Integer> write) {
                    listenerCalledCount++;
                    assertThat(write.find()).hasValue(100);
                }
            });

            // onWrite
            writeOnlyMap.eval("key10",
                    100,
                    (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable)
                            (v, view) -> view.set(v)).join();

            try {
                writeFunctionHandler.close();
                writeListenerHandler.close();
            } catch (Exception e) {
                fail(e.getMessage());
            }
        });
    }

WriteOnlyMapに対して、Listenerを付けていきます。

付け方は、WriteOnlyMap#listeners#onWriteもしくはWriteOnlyMap#listeners#addで登録します。

WriteOnlyMap#listeners#onWriteメソッドで登録する場合、EntryView.ReadEntryViewを引数に取るConsumerを作成して渡します。

            AutoCloseable writeFunctionHandler = writeOnlyMap.listeners().onWrite(written -> {
                closeableHandlerCalledCount++;
                assertThat(written.find())
                        .hasValue(100);
            });

addで追加する場合は、Listeners.WriteListeners.WriteListenerインターフェースを実装し、onWriteメソッドを実装します。onWriteメソッドには、EntryView.ReadEntryViewが引数として渡されます。

            AutoCloseable writeListenerHandler = writeOnlyMap.listeners().add(new Listeners.WriteListeners.WriteListener<String, Integer>() {
                @Override
                public void onWrite(EntryView.ReadEntryView<String, Integer> write) {
                    listenerCalledCount++;
                    assertThat(write.find()).hasValue(100);
                }
            });

Lambda式で書いてもいいのですが、今回は型を明示しています。

あとは、イベントを起こすと発火すると。

            // onWrite
            writeOnlyMap.eval("key10",
                    100,
                    (BiConsumer<Integer, EntryView.WriteEntryView<Integer>> & Serializable)
                            (v, view) -> view.set(v)).join();

なお、登録したListenerはAutoCloseableで返ってくるので、closeしておきましょう。

                writeFunctionHandler.close();
                writeListenerHandler.close();

ReadWrite Listeners

続いては、ReadWrite Listenersです。

こちらのサンプルは、ちょっと大きくなりました…。ReadWriteMapに対して、使用します。

    @Test
    public void testReadWriteListener() {
        withCache("distCache", 1, (Cache<String, Integer> cache) -> {
            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, i));

            FunctionalMapImpl<String, Integer> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
            FunctionalMap.ReadWriteMap<String, Integer> readWriteMap = ReadWriteMapImpl.create(functionalMap);

            List<AutoCloseable> closeableHandlers = new ArrayList<>();

            expectedCloseableHandlerCalledCount = 6;
            expectedListenerCalledCount = 6;

            closeableHandlers.add(readWriteMap.listeners().onCreate(created -> {
                closeableHandlerCalledCount++;
                assertThat(created.find())
                        .hasValue(20);
            }));

            closeableHandlers.add(readWriteMap.listeners().onModify((before, after) -> {
                closeableHandlerCalledCount++;
                assertThat(before.find())
                        .hasValue(10);
                assertThat(after.find())
                        .hasValue(100);
            }));

            closeableHandlers.add(readWriteMap.listeners().onRemove(removed -> {
                closeableHandlerCalledCount++;
                assertThat(removed.find())
                        .hasValue(5);
            }));

            closeableHandlers.add(readWriteMap.listeners().onWrite(written -> {
                closeableHandlerCalledCount++;
                written.find().ifPresent(v -> assertThat(v == 20 || v == 100).isTrue());

                if (!written.find().isPresent()) {
                    assertThat(written.key())
                            .isEqualTo("key5");
                }
            }));

            closeableHandlers.add(readWriteMap.listeners().add(new Listeners.ReadWriteListeners.ReadWriteListener<String, Integer>() {
                @Override
                public void onCreate(EntryView.ReadEntryView<String, Integer> created) {
                    listenerCalledCount++;
                    assertThat(created.find())
                            .hasValue(20);
                }

                @Override
                public void onModify(EntryView.ReadEntryView<String, Integer> before, EntryView.ReadEntryView<String, Integer> after) {
                    listenerCalledCount++;
                    assertThat(before.find())
                            .hasValue(10);
                    assertThat(after.find())
                            .hasValue(100);
                }

                @Override
                public void onRemove(EntryView.ReadEntryView<String, Integer> removed) {
                    listenerCalledCount++;
                    assertThat(removed.find())
                            .hasValue(5);
                }
            }));

            // Listeners.WriteListeners.WriteListener
            closeableHandlers.add(readWriteMap.listeners().add(written -> {
                listenerCalledCount++;
                written.find().ifPresent(v -> {
                    assertThat(v == 20 || v == 100)
                            .isTrue();
                });

                if (!written.find().isPresent()) {
                    assertThat(written.key())
                            .isEqualTo("key5");
                }
            }));

            // onCreate/onWrite
            readWriteMap.eval("key20",
                    20,
                    (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Void> & Serializable)
                            (v, view) -> view.set(v)).join();
            // onModify/onWrite
            readWriteMap.eval("key10",
                    100,
                    (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Void> & Serializable)
                            (v, view) -> view.set(v)).join();
            // onRemove/onWrite
            readWriteMap.eval("key5",
                    (Function<EntryView.ReadWriteEntryView<String, Integer>, Void> & Serializable)
                            view -> view.remove()).join();

            closeableHandlers.forEach(closeable -> {
                try {
                    closeable.close();
                } catch (Exception e) {
                    fail(e.getMessage());
                }
            });
        });
    }

登録したListenerは、AutoCloseableなので後からまとめてcloseできるように、今回のサンプルはListで受け取るようにしています…。

            List<AutoCloseable> closeableHandlers = new ArrayList<>();

ReadWriteMap#listenersから追加するのですが、onCreate、onModify、onRemove、onWriteで追加するパターンと、addで追加するパターンがあります。

on〜で実装する場合は、Consumer(onCreate、onRemove、onWrite)、BiConsumer(onModify)を引数として取ります。

onCreate、onRemove、onWriteの場合は、ReadEntryViewを引数に取るConsumerを実装します。

            closeableHandlers.add(readWriteMap.listeners().onCreate(created -> {
                closeableHandlerCalledCount++;
                assertThat(created.find())
                        .hasValue(20);
            }));

            closeableHandlers.add(readWriteMap.listeners().onRemove(removed -> {
                closeableHandlerCalledCount++;
                assertThat(removed.find())
                        .hasValue(5);
            }));

            closeableHandlers.add(readWriteMap.listeners().onWrite(written -> {
                closeableHandlerCalledCount++;
                written.find().ifPresent(v -> assertThat(v == 20 || v == 100).isTrue());

                if (!written.find().isPresent()) {
                    assertThat(written.key())
                            .isEqualTo("key5");
                }
            }));

onModifyの場合は、変更前後のEntryが引数として渡ってきます。BiConsumerとして実装し、ReadViewEntryが2つ渡ってきます。

            closeableHandlers.add(readWriteMap.listeners().onModify((before, after) -> {
                closeableHandlerCalledCount++;
                assertThat(before.find())
                        .hasValue(10);
                assertThat(after.find())
                        .hasValue(100);
            }));

ReadWriteMap#listeners#addで追加する場合は、Listeners.ReadWriteListeners.ReadWriteListener、もしくはListeners.WriteListeners.WriteListenerのインスタンスを登録します。

Listeners.ReadWriteListeners.ReadWriteListenerの場合は、onCreate、onModify、onRemoveメソッドを実装します。

            closeableHandlers.add(readWriteMap.listeners().add(new Listeners.ReadWriteListeners.ReadWriteListener<String, Integer>() {
                @Override
                public void onCreate(EntryView.ReadEntryView<String, Integer> created) {
                    listenerCalledCount++;
                    assertThat(created.find())
                            .hasValue(20);
                }

                @Override
                public void onModify(EntryView.ReadEntryView<String, Integer> before, EntryView.ReadEntryView<String, Integer> after) {
                    listenerCalledCount++;
                    assertThat(before.find())
                            .hasValue(10);
                    assertThat(after.find())
                            .hasValue(100);
                }

                @Override
                public void onRemove(EntryView.ReadEntryView<String, Integer> removed) {
                    listenerCalledCount++;
                    assertThat(removed.find())
                            .hasValue(5);
                }
            }));

使い方は、先ほどまでのon〜と似た感じです。

addでListeners.WriteListeners.WriteListenerを追加する場合は、先ほどと同様にonWriteメソッドを実装します。

            // Listeners.WriteListeners.WriteListener
            closeableHandlers.add(readWriteMap.listeners().add(written -> {
                listenerCalledCount++;
                written.find().ifPresent(v -> {
                    assertThat(v == 20 || v == 100)
                            .isTrue();
                });

                if (!written.find().isPresent()) {
                    assertThat(written.key())
                            .isEqualTo("key5");
                }
            }));

イベント発火。

            // onCreate/onWrite
            readWriteMap.eval("key20",
                    20,
                    (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Void> & Serializable)
                            (v, view) -> view.set(v)).join();
            // onModify/onWrite
            readWriteMap.eval("key10",
                    100,
                    (BiFunction<Integer, EntryView.ReadWriteEntryView<String, Integer>, Void> & Serializable)
                            (v, view) -> view.set(v)).join();
            // onRemove/onWrite
            readWriteMap.eval("key5",
                    (Function<EntryView.ReadWriteEntryView<String, Integer>, Void> & Serializable)
                            view -> view.remove()).join();

最後に、close。

            closeableHandlers.forEach(closeable -> {
                try {
                    closeable.close();
                } catch (Exception e) {
                    fail(e.getMessage());
                }
            });

各メソッドで、どのようなイベントが発生するかは、またListenerの登録方法が確認できた感じですね。

ちなみに、このあたりのイベント処理が実装されているのは、このクラスになります。
https://github.com/infinispan/infinispan/blob/8.0.1.Final/core/src/main/java/org/infinispan/functional/impl/FunctionalNotifierImpl.java

今回作成したソースコードは、こちらに置いてあります。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-functional-map-listeners

終わりに

全4回、Infinispan 8.0.0.Finalで追加されたFunctional Map APIを試してみました。

まだ実験的APIの状態みたいなので、しばらく様子見なのかなという感じもしますが、CompletableFutureや交差型キャストなどJava 8の機能の勉強にもなって面白かったです。