Infinispan 9.2.0.Finalが先日リリースされました。
Infinispan: Infinispan 9.2.0.Final
いろいろ変わったところはあるのですが、個人的にまず気になったのはこちら。
Reactive streams-based distributed Iteration improvements
http://blog.infinispan.org/2018/02/infinispan-920final.html
Distributed iterator now uses less threads and allows for efficient parallel retrieval providing for improved throughput
(Distributed)IteratorがReactive Streamsベースになったとか?
Infinispan: Distributed iteration improvements
ちょっと気になるところですね、試してみましょう。
Distributed Iterator?
まあ、例によってドキュメントはないので…詳細はこちらのブログエントリになります。
Infinispan: Distributed iteration improvements
エントリの取得がこれまでのようなPull型ではなく、Push型になりました。これを実現するために、InfinispanのCoreモジュールに新たな依存関係として、
RxJava2とReactive Streamsが追加されています。
つまり、IteratorがReactive Streamsを使って動作するように書き換えられたということです。
これによって、ブロッキング操作が減ってInfinispan内部で使用していたスレッドが節約できるようになったようです。
パフォーマンスはどうなったか?ということですが、ブログエントリに記載がありますが、標準的な使い方で11%の速度向上、シーケンシャルIteratorで3.5%、
リハッシュを無効にした場合は14%のパフォーマンス向上が結果として得られたようです。
この仕組みは、Distributed Streams APIの中に組み込まれているため、今後はCacheStreamからPublisherを返せるようにAPIを拡張していくことを
考えているようです。また、Segmentベースのイテレーションが可能なようにもしたいのだとか。
ちょっと今後も見ておきたい機能ですね。
使ってみる
というわけで、紹介はこのくらいにして実際に使っていってみましょう。
依存関係から。
libraryDependencies ++= Seq( "org.infinispan" % "infinispan-core" % "9.2.0.Final" % Compile, "net.jcip" % "jcip-annotations" % "1.0" % Provided, "org.scalatest" %% "scalatest" % "3.0.5" % Test )
InfinispanのCoreがあれば問題ありません。
Infinispan 9.2.0.Finalでは、「infinispan-core」を依存関係に追加すると、推移的な依存関係として全部で以下が入るようになりました。
- Caffeine
- Reactive Streams/RxJava2
- JBoss Logging
- JBoss Marshalling
- JGroups
- JTA(APIのみ)
- Infinispan Commons
あとは、テスト用にScalaTestを追加しています。
$ java -version openjdk version "1.8.0_151" OpenJDK Runtime Environment (build 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12) OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
テストコードの雛形と設定ファイル
テストコードの雛形は、こんな感じで。
src/test/scala/org/littlewings/infinispan/distiterator/DistributedIteratorSuite.scala
package org.littlewings.infinispan.distiterator import java.util.Map import org.infinispan.Cache import org.infinispan.commons.util.CloseableIterator import org.infinispan.manager.DefaultCacheManager import org.infinispan.util.function.{SerializableConsumer, SerializableFunction} import org.scalatest.{FunSuite, Matchers} import scala.collection.JavaConverters._ class DistributedIteratorSuite extends FunSuite with Matchers { // ここに、テストを書く! protected def withCache[K, V](cacheName: String, numInstances: Int = 1)(fun: Cache[K, V] => Unit): Unit = { val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml")) try { managers.foreach(_.getCache[K, V](cacheName)) val cache = managers(0).getCache[K, V](cacheName) fun(cache) cache.stop() } finally { managers.foreach(_.stop()) } } }
簡単にクラスタを構成できる、ヘルパーメソッド付き。
あとは設定ファイルを用意。
src/test/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:9.2 http://www.infinispan.org/schemas/infinispan-config-9.2.xsd" xmlns="urn:infinispan:config:9.2"> <jgroups> <stack-file name="udp" path="default-configs/default-jgroups-udp.xml"/> </jgroups> <cache-container> <transport cluster="test-cluster" stack="udp"/> <distributed-cache name="distributedCache"/> <replicated-cache name="replicatedCache"/> <local-cache name="localCache"/> </cache-container> </infinispan>
Distributed Cache、Replicated Cache、Local Cacheの3種類を今回は用意しました。
Distributed Iterator
では、Iteratorを使ったコードを書いてみましょう。
…といっても、ただのIteratorなので、使い方としては特に特筆することはありません。クラスタを構成可能なCacheは、いずれもNode数は3としています。
Distributed Cache。
test("distributed cache iterator") { withCache[String, Integer]("distributedCache", 3) { cache => (1 to 10).foreach(i => cache.put(s"key${i}", i)) val iterator: CloseableIterator[java.util.Map.Entry[String, Integer]] = cache.entrySet.iterator.asInstanceOf[CloseableIterator[java.util.Map.Entry[String, Integer]]] /* while (iterator.hasNext()) { println(iterator.next()) } */ iterator.asScala.toList.map(e => e.getKey -> e.getValue) should contain theSameElementsAs Array( "key1" -> 1, "key2" -> 2, "key3" -> 3, "key4" -> 4, "key5" -> 5, "key6" -> 6, "key7" -> 7, "key8" -> 8, "key9" -> 9, "key10" -> 10 ) iterator.close() } }
Replicated Cache。
test("replicated cache iterator") { withCache[String, Integer]("replicatedCache", 3) { cache => (1 to 10).foreach(i => cache.put(s"key${i}", i)) val iterator: CloseableIterator[java.util.Map.Entry[String, Integer]] = cache.entrySet.iterator.asInstanceOf[CloseableIterator[java.util.Map.Entry[String, Integer]]] /* while (iterator.hasNext()) { println(iterator.next()) } */ iterator.asScala.toList.map(e => e.getKey -> e.getValue) should contain theSameElementsAs Array( "key1" -> 1, "key2" -> 2, "key3" -> 3, "key4" -> 4, "key5" -> 5, "key6" -> 6, "key7" -> 7, "key8" -> 8, "key9" -> 9, "key10" -> 10 ) iterator.close() } }
Local Cache。
test("local cache iterator") { withCache[String, Integer]("localCache") { cache => (1 to 10).foreach(i => cache.put(s"key${i}", i)) val iterator: CloseableIterator[java.util.Map.Entry[String, Integer]] = cache.entrySet.iterator.asInstanceOf[CloseableIterator[java.util.Map.Entry[String, Integer]]] /* while (iterator.hasNext()) { println(iterator.next()) } */ iterator.asScala.toList.map(e => e.getKey -> e.getValue) should contain theSameElementsAs Array( "key1" -> 1, "key2" -> 2, "key3" -> 3, "key4" -> 4, "key5" -> 5, "key6" -> 6, "key7" -> 7, "key8" -> 8, "key9" -> 9, "key10" -> 10 ) iterator.close() } }
どのコードも、全部同じです。
Cache#entrySetやkeySet、valuesなどからCacheSetやCacheCollectionを取得して、そこからIteratorを取得します。キャストのコードが入っているのは、
Scalaの都合だったりします…。
val iterator: CloseableIterator[java.util.Map.Entry[String, Integer]] = cache.entrySet.iterator.asInstanceOf[CloseableIterator[java.util.Map.Entry[String, Integer]]]
あとは、Iteratorとしてふつうに使うだけです。
while (iterator.hasNext()) {
println(iterator.next())
}
今回は、ScalaのIteratorに変換してアサーションしましたが。
iterator.asScala.toList.map(e => e.getKey -> e.getValue) should contain theSameElementsAs Array( "key1" -> 1, "key2" -> 2, "key3" -> 3, "key4" -> 4, "key5" -> 5, "key6" -> 6, "key7" -> 7, "key8" -> 8, "key9" -> 9, "key10" -> 10 )
最後にcloseしておきましょう。
iterator.close()
中身は?
で、これだけで終わっては面白くありません。もう少し中身がどうなっているのか確認してみましょう。
今回、Distributed Cache、Replicated Cache、Local Cacheで試してみましたが、冒頭の紹介したPublisher/Subscriberが使われているのは
Distributed Cacheから作成したIterator、Distributed Iteratorのみです。
Distributed Cache
まずは、Distributed Cacheの場合から。
IteratorはCacheCollection/CacheSetから取得するわけですが、
val iterator: CloseableIterator[java.util.Map.Entry[String, Integer]] = cache.entrySet.iterator.asInstanceOf[CloseableIterator[java.util.Map.Entry[String, Integer]]]
この時点で取得するIteratorはDistributionBulkInterceptorの内部クラス(BackingEntrySet)から取得するIteratorになり、まだRxJava2は登場しません。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/interceptors/distribution/DistributionBulkInterceptor.java#L96-L100
valuesのようなCacheCollectionを使っても、このDistributionBulkInterceptorにたどり着きます。
Reactive Streamsを使ったIteratorが登場するのは、Iterator#hasNextやIterator#nextを最初に使用した時です。このIteratorは、DistributedCacheStreamで
作成されます。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L534
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L536-L545
対象となるNode、Segmentの情報を元にして、複数のPublisherを合成したIteratorを作成します。この時のPublisherの数は、対象に選出されたNode数と4の
小さい方が選択されます。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L652-L686
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/PriorityMergingProcessor.java#L103-L107
Iteratorの実体は、MultiSubscriberIteratorになります。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/PriorityMergingProcessor.java#L110
IteratorとしてはQueueに登録された値をポーリングしてIterator#nextやIterator#hasNextの動きを見せている、という挙動になります。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/PriorityMergingProcessor.java#L167
ここでいうQueueは、実はSubscriberだったりします。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/PriorityMergingProcessor.java#L228
というわけで、Subscribeした結果をまとめてIteratorとして見せているというのがDistributed Iteratorの正体です。
さて、ここでPublisherはどうやって作られるんでしょう?と。
Remote Nodeに対するPublisherは、まずはClusterStreamManager/ClusterStreamManagerImplの内部クラスとして作成されます。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L661
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L677
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManagerImpl.java#L442
これは、あくまで自前のPublisherです(RemoteIteratorPublisherがPublisherを実装しています)。
さらにこれが、RxJava2のFlowableで包まれることになります。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L663
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L679
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/RehashPublisherDecorator.java#L48-L54
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/RehashPublisherDecorator.java#L45
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/CompletionRehashPublisherDecorator.java#L66-L73
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/CompletionRehashPublisherDecorator.java#L75-L76
Local Nodeの場合は、その場でPublisherを作成するか、Flowable#emptyのどちらかになります。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L637-L643
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L646
Local Node向けのPublisherを作る場合は、内部で保持しているStreamからIteratorを取得し、これをFlowableで包みます。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L618
これらのPublisherを元にして、MultiSubscriberIteratorが作られるというわけですね。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/PriorityMergingProcessor.java#L104
Subscriptionは、ClusterStreamManagerImpl内に定義されています。こちらも、Infinispanが自前で作成しているものです。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManagerImpl.java#L478
データをPullしてくるところについては、このSubscriptionを見るとよいでしょう。
StreamIteratorNextCommandを使っているところや
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManagerImpl.java#L545
レスポンスのハンドリングしているところなどを中心に。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManagerImpl.java#L549-L622
Publisherを駆動するExecutorですが、予想に違わずInfinispanのAsync用のExecutorが使用されます。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/interceptors/distribution/DistributionBulkInterceptor.java#L143
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/RehashPublisherDecorator.java#L45
これ自体は、DistributedCacheStreamで使うExecutorと同じです。
ここまで見てきたクラスのうち、Reactive StreamsおよびRxJava2が出てくるのはすべてStreamを実装で登場するパッケージです。
https://github.com/infinispan/infinispan/tree/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl
その割には、現在のDistributed Streams API自体は、まだReactive Streamsは使わないのですが…。
test("distributed cache stream") { withCache[String, Integer]("distributedCache", 3) { cache => (1 to 10).foreach(i => cache.put(s"key${i}", i)) val stream = cache.entrySet.stream try { stream .map[String](new SerializableFunction[java.util.Map.Entry[String, Integer], String] { override def apply(e: Map.Entry[String, Integer]): String = e.getKey }) .forEach(new SerializableConsumer[String] { override def accept(v: String): Unit = println(v) }) } finally { stream.close() } } }
DistributedCacheStream内を起点にReactive StreamsのAPIを使っている割には…まあ、今後なのでしょう。今後の予定でStreamがPublisherを公開する予定が
あるというので、なおさら。
Replicated Cache
Replicated Cacheの場合は、ローカル用のPublisherが作成されます。定義自体は、DisbributedCacheStreamにあるのですが。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L650
この場合は、RemoteIteratorPublisherを直接RxJava2のFlowableで包む形になります。つまり、Remote Nodeのことは特に知りません。
全データがLocal Nodeにあるのだから、そうなのですが。なお、使われるのはRxJava2のBlockingFlowableIterable…から取得するIteratorです。
微妙にReactive Streams/RxJava2絡んでますね。
Local Cache
最後にLocal Cacheの場合。Local Cacheになるとだいぶシンプルになり(そりゃあそうですね)、Infinispanのデータ管理の実体である
DataContainerから直接Iteratorを作り出します。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/commands/read/EntrySetCommand.java#L92
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/util/DataContainerRemoveIterator.java
まとめ
Infinispan 9.2.0.Finalで導入された、Reactive StreamsベースのDistributed Iteratorを見てみました。
まだInfinispan自体をReactive StreamsのAPIを使って操作をすることはできませんが(あくまでInfinispanの内部利用のみのため)、今後少しずつ
APIが公開されていくとよいですね。
今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-distributed-iterator