Infinispan 8.0.0.Finalから、Continuous Queryというものが搭載されたようで。
Infinispan: Infinispan 8.0.0.Final
ちょっと試してみたいと思います。
…なのですが、ドキュメントがないようなのでテストコードなどを見ながら試行錯誤してみました。
Continuous Queryって?
コードの雰囲気を見ている限り、Listener+Queryという感じみたいですね。
Listenerとして登録しておいて、合わせてQueryを適用しておき指定の条件にマッチするようなエントリが登録、削除されればListenerにイベントが通知されて起動する、そういうもののようです。
指定の条件で、継続的にCacheをチェックしたい時とか、別の処理を起動したい時に使う感じでしょうか。
準備
それでは、使っていってみます。
Maven依存関係。
<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-query</artifactId> <version>8.0.1.Final</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.2.0</version> <scope>test</scope> </dependency>
Continuous Queryを使うためには、infinispan-queryモジュールが必要です。Query DSLも必要なのですが、infinispan-queryに含まれるので割愛。
JUnitとAssertJはテストコード用です。
Infinispanの設定は、このようにしました。
src/test/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:8.0 http://www.infinispan.org/schemas/infinispan-config-8.0.xsd" xmlns="urn:infinispan:config:8.0"> <jgroups> <stack-file name="udp" path="jgroups.xml"/> </jgroups> <cache-container name="cacheManager" shutdown-hook="REGISTER"> <transport cluster="cluster" stack="udp"/> <jmx duplicate-domains="true"/> <distributed-cache name="distCache"/> </cache-container> </infinispan>
Distributed Cacheを設定。使ってみた感じ、インデックスの設定は必須ではないようです。
JGroupsの設定は、端折ります。
では、コードを書いていってみます。
ContinuousQueryのListener定義
まずはContinuous Queryを使用するには、ContinuousQueryResultListenerインターフェースを実装したクラスを作成します。
ContinuousQueryResultListenerでは、resultJoiningとresultLeavingというメソッドを実装する必要があり、それぞれエントリの追加、削除事に呼び出されます。
今回は、テストコードに習って呼び出し回数をカウントしたりするContinuousQueryResultListenerを実装してみました。
src/main/java/org/littlewings/infinispan/continuousquery/MyContinuousQueryListener.java
package org.littlewings.infinispan.continuousquery; import java.util.LinkedHashMap; import java.util.Map; import org.infinispan.query.continuous.ContinuousQueryResultListener; public class MyContinuousQueryListener<K, V> implements ContinuousQueryResultListener<K, V> { private Map<K, V> joined = new LinkedHashMap<>(); private Map<K, Integer> joinCalled = new LinkedHashMap<>(); private Map<K, Integer> leaveCalled = new LinkedHashMap<>(); @Override public void resultJoining(K key, V value) { synchronized (joined) { joined.put(key, value); incrementNumberOfCalls(joinCalled, key); } } @Override public void resultLeaving(K key) { incrementNumberOfCalls(leaveCalled, key); } void incrementNumberOfCalls(Map<K, Integer> countingMap, K key) { synchronized (countingMap) { countingMap.compute(key, (k, v) -> v == null ? 1 : v + 1); } } public Map<K, V> getJoined() { return joined; } public Map<K, Integer> getJoinCalled() { return joinCalled; } public Map<K, Integer> getLeaveCalled() { return leaveCalled; } }
なお、Infinispan 8.1.0では、ContinuousQueryResultListenerは早くもContinuousQueryListenerに変更されるようです…。
https://github.com/infinispan/infinispan/blob/8.1.0.Beta1/query/src/main/java/org/infinispan/query/continuous/ContinuousQueryResultListener.java#L6
あとは、検索に必要なEntityを定義します。
今回は、書籍をお題に。
src/test/java/org/littlewings/infinispan/continuousquery/Book.java
src/test/java/org/littlewings/infinispan/continuousquery/Book.java package org.littlewings.infinispan.continuousquery; import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; public class Book implements Serializable { private static final long serialVersionUID = 1L; private String isbn; private String title; private int price; private List<Tag> tags; public Book() { } public Book(String isbn, String title, int price, List<Tag> tags) { this.isbn = isbn; this.title = title; this.price = price; this.tags = tags; } public static Book create(String isbn, String title, int price, String... tags) { return new Book(isbn, title, price, Arrays.stream(tags).map(Tag::new).collect(Collectors.toList())); } public String getIsbn() { return isbn; } public void setIsbn(String isbn) { this.isbn = isbn; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; } public List<Tag> getTags() { return tags; } public void setTags(List<Tag> tags) { this.tags = tags; } @Override public boolean equals(Object other) { if (other instanceof Book) { Book otherBook = (Book) other; return Objects.equals(isbn, otherBook.getIsbn()) && Objects.equals(title, otherBook.getTitle()) && Objects.equals(price, otherBook.getPrice()) && Objects.equals(tags, otherBook.getTags()); } return false; } @Override public int hashCode() { return Objects.hash(isbn, title, price, tags); } @Override public String toString() { return String.format("Book[%s, %s, %d, %s]", isbn, title, price, tags); } }
なんとなく、ネストしたEntityまで作成。
src/test/java/org/littlewings/infinispan/continuousquery/Tag.java
package org.littlewings.infinispan.continuousquery; import java.io.Serializable; import java.util.Objects; public class Tag implements Serializable { private static final long serialVersionUID = 1L; private String name; public Tag() { } public Tag(String name) { this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public boolean equals(Object other) { if (other instanceof Tag) { Tag otherTag = (Tag) other; return Objects.equals(name, otherTag.getName()); } return false; } @Override public int hashCode() { return Objects.hash(name); } @Override public String toString() { return name; } }
ここまでで、Continuous Queryを使うための最低限の準備は完了です。
テストコードの雛形
ここから先は、テストコードを使いつつ動作確認します。
テストコードの雛形は、こちら。
src/test/java/org/littlewings/infinispan/continuousquery/EmbeddedContinuousQueryTest.java
package org.littlewings.infinispan.continuousquery; import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.assertj.core.data.MapEntry; import org.infinispan.Cache; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.query.Search; import org.infinispan.query.continuous.ContinuousQuery; import org.infinispan.query.dsl.Query; import org.infinispan.query.dsl.QueryFactory; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; public class EmbeddedContinuousQueryTest { // ここに、テストコードを書く!! protected <K, V> void withCache(String cacheName, int numberOfInstances, Consumer<Cache<K, V>> consumer) { List<EmbeddedCacheManager> cacheManagers = IntStream .rangeClosed(1, numberOfInstances) .mapToObj(i -> { try { return new DefaultCacheManager("infinispan.xml"); } catch (IOException e) { throw new UncheckedIOException(e); } }) .collect(Collectors.toList()); List<Cache<K, V>> caches = cacheManagers .stream() .map(m -> m.<K, V>getCache(cacheName)) .collect(Collectors.toList()); try { consumer.accept(caches.get(0)); } finally { caches.forEach(Cache::stop); cacheManagers.forEach(EmbeddedCacheManager::stop); } } }
クラスタを構成するための、簡易メソッド付き。
Continuous Queryを試す
それでは、Continuous Queryを試してみます。先ほど用意したメソッドにLambda式を渡す形で、コードを書きます。
@Test public void testContinuousQuerySimply() { withCache("distCache", 1, (Cache<String, Book> cache) -> { // Continuous Queryを使ったコード }); }
今回は1 Nodeとします。
まずはQueryの作成ですが、Query DSLを使ったものになります。
QueryFactory queryFactory = Search.getQueryFactory(cache); Query query = queryFactory .from(Book.class) .having("price") .gte(3000) .toBuilder() .build();
ここでは、3,000以上の価格の書籍を対象にします。
これを、ContinuousQueryResultListenerの実装クラスのインスタンスを作成したうえで、ContinuousQueryに設定します。
MyContinuousQueryListener<String, Book> myCqListener = new MyContinuousQueryListener<>(); ContinuousQuery<String, Book> cq = new ContinuousQuery<>(cache); cq.addContinuousQueryListener(query, myCqListener);
あとは、Cacheに対してput/removeしていくだけです。
3,000円より安い書籍の登録。
Book sprintBoot = Book .create("978-4777518654", "はじめてのSpring Boot 「Spring Framework」で簡単Javaアプリ開発", 2700, "Java", "Spring"); cache.put(sprintBoot.getIsbn(), sprintBoot); assertThat(myCqListener.getJoined()) .isEmpty(); assertThat(myCqListener.getJoinCalled()) .isEmpty(); assertThat(myCqListener.getLeaveCalled()) .isEmpty();
このエントリでは、今回の条件にはマッチしないのでListenerは起動しません。
3,000円を超える本を登録してみます。
Book elasticsearch = Book .create("978-4048662024", "高速スケーラブル検索エンジン ElasticSearch Server", 3024, "Elasticsearch", "全文検索", "Java", "Lucene"); cache.put(elasticsearch.getIsbn(), elasticsearch); assertThat(myCqListener.getJoined()) .hasSize(1) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), elasticsearch)); assertThat(myCqListener.getJoinCalled()) .hasSize(1) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1)); assertThat(myCqListener.getLeaveCalled()) .isEmpty();
ContinuousQueryResultListenerが呼び出されたようです。
ところで、Continuous Queryの正体ってListenerなので、クラスタ内に複数Nodeがあってもイベントを受け取るNodeに対してContinuous Queryが登録されていないと、イベントを受け取れないだろうなぁと思いました。
Listenerも、今回のContinuousQueryResultListenerも、Serializableではある必要がありませんし。
※追記)
と思ったのですが、Continuous Queryで使われているListenerはClustener Listenerなので、複数Nodeがあってもイベントを受け取ることができることがわかりました。
さらに3,000円を超える書籍を追加してみます。
Book solr = Book .create("978-4774161631", "[改訂新版] Apache Solr入門 〜オープンソース全文検索エンジン", 3888, "Solr", "全文検索", "Java", "Lucene"); cache.put(solr.getIsbn(), solr); assertThat(myCqListener.getJoined()) .hasSize(2) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), elasticsearch), MapEntry.entry(solr.getIsbn(), solr)); assertThat(myCqListener.getJoinCalled()) .hasSize(2) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1), MapEntry.entry(solr.getIsbn(), 1)); assertThat(myCqListener.getLeaveCalled()) .isEmpty(); ///// Book jbossEap = Book .create("978-4774157948", "JBoss Enterprise Application Platform6 構築・運用パーフェクトガイド", 4104, "Java"); cache.put(jbossEap.getIsbn(), jbossEap); assertThat(myCqListener.getJoined()) .hasSize(3) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), elasticsearch), MapEntry.entry(solr.getIsbn(), solr), MapEntry.entry(jbossEap.getIsbn(), jbossEap)); assertThat(myCqListener.getJoinCalled()) .hasSize(3) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1), MapEntry.entry(solr.getIsbn(), 1), MapEntry.entry(jbossEap.getIsbn(), 1)); assertThat(myCqListener.getLeaveCalled()) .isEmpty();
登録するにしたがって、resultJoiningが呼び出されていることがわかります。
3,000円を下回る書籍を、再度登録。
Book javaBook = Book .create("978-4774169316", "Javaエンジニア養成読本 [現場で役立つ最新知識、満載!]", 2138, "Java"); cache.put(javaBook.getIsbn(), javaBook); assertThat(myCqListener.getJoined()) .hasSize(3) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), elasticsearch), MapEntry.entry(solr.getIsbn(), solr), MapEntry.entry(jbossEap.getIsbn(), jbossEap)); assertThat(myCqListener.getJoinCalled()) .hasSize(3) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1), MapEntry.entry(solr.getIsbn(), 1), MapEntry.entry(jbossEap.getIsbn(), 1)); assertThat(myCqListener.getLeaveCalled()) .isEmpty();
こちらは、やっぱりresultJoiningは呼び出されていません。
で、ここまでresultLeavingは呼び出されていないので、今度はエントリを削除してみます。削除するのは、3,000円を超える書籍です。
cache.remove(elasticsearch.getIsbn()); assertThat(cache.get(elasticsearch.getIsbn())) .isNull(); assertThat(myCqListener.getJoined()) .hasSize(3) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), elasticsearch), MapEntry.entry(solr.getIsbn(), solr), MapEntry.entry(jbossEap.getIsbn(), jbossEap)); assertThat(myCqListener.getJoinCalled()) .hasSize(3) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1), MapEntry.entry(solr.getIsbn(), 1), MapEntry.entry(jbossEap.getIsbn(), 1)); assertThat(myCqListener.getLeaveCalled()) .hasSize(1) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1));
すると、resultLeavingが呼び出されたことがわかります。
3,000円を下回る書籍を削除しても、resultLeavingは呼び出されません。
cache.remove(sprintBoot.getIsbn()); assertThat(cache.get(sprintBoot.getIsbn())) .isNull(); assertThat(myCqListener.getJoined()) .hasSize(3) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), elasticsearch), MapEntry.entry(solr.getIsbn(), solr), MapEntry.entry(jbossEap.getIsbn(), jbossEap)); assertThat(myCqListener.getJoinCalled()) .hasSize(3) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1), MapEntry.entry(solr.getIsbn(), 1), MapEntry.entry(jbossEap.getIsbn(), 1)); assertThat(myCqListener.getLeaveCalled()) .hasSize(1) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1));
ところで、更新事にも呼び出されるのかな?と思ったのですが、そうはなりませんでした。3,000円以上の書籍を再登録しても、何も起こらず…。
Book newSolr = Book .create("978-4774161631", "[改訂新版] Apache Solr入門 〜オープンソース全文検索エンジン", 4000, "Solr", "全文検索", "Java", "Lucene"); cache.put(newSolr.getIsbn(), newSolr); assertThat(myCqListener.getJoined()) .hasSize(3) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), elasticsearch), MapEntry.entry(solr.getIsbn(), solr), MapEntry.entry(jbossEap.getIsbn(), jbossEap)); assertThat(myCqListener.getJoinCalled()) .hasSize(3) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1), MapEntry.entry(solr.getIsbn(), 1), MapEntry.entry(jbossEap.getIsbn(), 1)); assertThat(myCqListener.getLeaveCalled()) .hasSize(1) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1));
この挙動は、JPAContinuousQueryCacheEventFilterConverterクラスのfilterAndConvertメソッドが古いエントリと新しいエントリが共にnullではない場合にContinuousQueryResultListenerを起動しないようになっているからだと思うのですが…。
https://github.com/infinispan/infinispan/blob/8.0.1.Final/query/src/main/java/org/infinispan/query/continuous/JPAContinuousQueryCacheEventFilterConverter.java#L111
そういうものなんでしょうか?これ。
と思ったら、ここを見たらハッキリしました。
GitHub - infinispan/infinispan: Infinispan is an open source data grid platform and highly scalable NoSQL cloud data store.
こちらに、イベントが発生する条件が書かれています。
https://github.com/infinispan/infinispan/wiki/Continuous-query-design-and-indexless-queries#continuous-queries
条件にマッチするものが更新されて、マッチされる結果に遷移してもイベントは発生しないよ、と。古い値がマッチしていなくて、新しい値がマッチするように更新された場合は追加としてのイベントが発生します、と。
であれば、試してみましょう。
先ほどの3,000円を下回る書籍を、3,000円以上に更新。
Book newJavaBook = Book .create("978-4774169316", "Javaエンジニア養成読本 [現場で役立つ最新知識、満載!]", 3500, "Java"); cache.put(newJavaBook.getIsbn(), newJavaBook); assertThat(myCqListener.getJoined()) .hasSize(4) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), elasticsearch), MapEntry.entry(solr.getIsbn(), solr), MapEntry.entry(jbossEap.getIsbn(), jbossEap), MapEntry.entry(newJavaBook.getIsbn(), newJavaBook)); assertThat(myCqListener.getJoinCalled()) .hasSize(4) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1), MapEntry.entry(solr.getIsbn(), 1), MapEntry.entry(jbossEap.getIsbn(), 1), MapEntry.entry(newJavaBook.getIsbn(), 1)); assertThat(myCqListener.getLeaveCalled()) .hasSize(1) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1));
これであれば、resultJoiningが呼び出されます。
削除する時も、resultLeavingが呼び出されるようになります(そりゃあそうだという気もしますが)。
cache.remove(newJavaBook.getIsbn()); assertThat(cache.get(newJavaBook.getIsbn())) .isNull(); assertThat(myCqListener.getJoined()) .hasSize(4) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), elasticsearch), MapEntry.entry(solr.getIsbn(), solr), MapEntry.entry(jbossEap.getIsbn(), jbossEap), MapEntry.entry(newJavaBook.getIsbn(), newJavaBook)); assertThat(myCqListener.getJoinCalled()) .hasSize(4) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1), MapEntry.entry(solr.getIsbn(), 1), MapEntry.entry(jbossEap.getIsbn(), 1), MapEntry.entry(newJavaBook.getIsbn(), 1)); assertThat(myCqListener.getLeaveCalled()) .hasSize(2) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1), MapEntry.entry(newJavaBook.getIsbn(), 1));
しかし、そういうものなんでしょうか?条件にマッチするものが更新されて、さらにマッチする結果になったとして、イベントは受けられなくていいものなのかなぁ…。
ネストした要素に対してContinuous Query
オマケとして、今回はネストしたEntityとして検索対象を定義しているので、ネストした要素に対してContinuous Queryを仕込んでみます。
@Test public void testContinuousQueryNested() { withCache("distCache", 1, (Cache<String, Book> cache) -> { QueryFactory queryFactory = Search.getQueryFactory(cache); Query query = queryFactory .from(Book.class) .having("tags.name") .eq("全文検索") .toBuilder() .build(); MyContinuousQueryListener<String, Book> myCqListener = new MyContinuousQueryListener<>(); ContinuousQuery<String, Book> cq = new ContinuousQuery<>(cache); cq.addContinuousQueryListener(query, myCqListener); // ここで、要素の追加 }); }
タグに「全文検索」を持つものを対象にします。
書籍を追加していってみます。
タグに「全文検索」を持たない書籍。
Book sprintBoot = Book .create("978-4777518654", "はじめてのSpring Boot 「Spring Framework」で簡単Javaアプリ開発", 2700, "Java", "Spring"); cache.put(sprintBoot.getIsbn(), sprintBoot); assertThat(myCqListener.getJoined()) .isEmpty(); assertThat(myCqListener.getJoinCalled()) .isEmpty(); assertThat(myCqListener.getLeaveCalled()) .isEmpty();
resultJoiningが起動されません、と。
タグに「全文検索」を持つ書籍。
Book elasticsearch = Book .create("978-4048662024", "高速スケーラブル検索エンジン ElasticSearch Server", 3024, "Elasticsearch", "全文検索", "Java", "Lucene"); cache.put(elasticsearch.getIsbn(), elasticsearch); assertThat(myCqListener.getJoined()) .hasSize(1) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), elasticsearch)); assertThat(myCqListener.getJoinCalled()) .hasSize(1) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1)); assertThat(myCqListener.getLeaveCalled()) .isEmpty(); ///// Book solr = Book .create("978-4774161631", "[改訂新版] Apache Solr入門 〜オープンソース全文検索エンジン", 3888, "Solr", "全文検索", "Java", "Lucene"); cache.put(solr.getIsbn(), solr); assertThat(myCqListener.getJoined()) .hasSize(2) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), elasticsearch), MapEntry.entry(solr.getIsbn(), solr)); assertThat(myCqListener.getJoinCalled()) .hasSize(2) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1), MapEntry.entry(solr.getIsbn(), 1)); assertThat(myCqListener.getLeaveCalled()) .isEmpty();
追加されました、と。
今度は、削除してみます。
タグに「全文検索」を持つ書籍。
cache.remove(elasticsearch.getIsbn()); assertThat(cache.get(elasticsearch.getIsbn())) .isNull(); assertThat(myCqListener.getJoined()) .hasSize(2) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), elasticsearch), MapEntry.entry(solr.getIsbn(), solr)); assertThat(myCqListener.getJoinCalled()) .hasSize(2) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1), MapEntry.entry(solr.getIsbn(), 1)); assertThat(myCqListener.getLeaveCalled()) .hasSize(1) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1));
resultLeavingが呼び出されています。
タグに「全文検索」を持たない書籍。
cache.remove(sprintBoot.getIsbn()); assertThat(cache.get(sprintBoot.getIsbn())) .isNull(); assertThat(myCqListener.getJoined()) .hasSize(2) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), elasticsearch), MapEntry.entry(solr.getIsbn(), solr)); assertThat(myCqListener.getJoinCalled()) .hasSize(2) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1), MapEntry.entry(solr.getIsbn(), 1)); assertThat(myCqListener.getLeaveCalled()) .hasSize(1) .containsOnly(MapEntry.entry(elasticsearch.getIsbn(), 1));
こちらは、resultLeavingが呼び出されていません。
まとめ
Infinispan 8.0.0.Finalから追加された、Continuous Queryを試してみました。だいたい使い方はわかった感じですね。
あと、試したりコードを見た感じ、Queryは使用しますが、Continuous Query自体はHibernate Searchは必須ではなさそうです。イベント発火時に、リフレクションベースの検索が動作している感じみたいですね。
普通にQueryを投げたい時は、Hibernate SearchのEnityとした方がよいケースもあるでしょうけれど。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-continuous-query