CLOVER🍀

That was when it all began.

Infinispanの改善されたDistributed Iterator(with Reactive Streams/RxJava)を試す

Infinispan 9.2.0.Finalが先日リリースされました。

Infinispan: Infinispan 9.2.0.Final

いろいろ変わったところはあるのですが、個人的にまず気になったのはこちら。

Reactive streams-based distributed Iteration improvements
Distributed iterator now uses less threads and allows for efficient parallel retrieval providing for improved throughput

http://blog.infinispan.org/2018/02/infinispan-920final.html

(Distributed)IteratorがReactive Streamsベースになったとか?

Infinispan: Distributed iteration improvements

ちょっと気になるところですね、試してみましょう。

Distributed Iterator

まあ、例によってドキュメントはないので…詳細はこちらのブログエントリになります。

Infinispan: Distributed iteration improvements

エントリの取得がこれまでのようなPull型ではなく、Push型になりました。これを実現するために、InfinispanのCoreモジュールに新たな依存関係として、
RxJava2とReactive Streamsが追加されています。

つまり、IteratorがReactive Streamsを使って動作するように書き換えられたということです。

これによって、ブロッキング操作が減ってInfinispan内部で使用していたスレッドが節約できるようになったようです。

パフォーマンスはどうなったか?ということですが、ブログエントリに記載がありますが、標準的な使い方で11%の速度向上、シーケンシャルIteratorで3.5%、
リハッシュを無効にした場合は14%のパフォーマンス向上が結果として得られたようです。

この仕組みは、Distributed Streams APIの中に組み込まれているため、今後はCacheStreamからPublisherを返せるようにAPIを拡張していくことを
考えているようです。また、Segmentベースのイテレーションが可能なようにもしたいのだとか。

ちょっと今後も見ておきたい機能ですね。

使ってみる

というわけで、紹介はこのくらいにして実際に使っていってみましょう。

依存関係から。

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "9.2.0.Final" % Compile,
  "net.jcip" % "jcip-annotations" % "1.0" % Provided,
  "org.scalatest" %% "scalatest" % "3.0.5" % Test
)

InfinispanのCoreがあれば問題ありません。

Infinispan 9.2.0.Finalでは、「infinispan-core」を依存関係に追加すると、推移的な依存関係として全部で以下が入るようになりました。

  • Caffeine
  • Reactive Streams/RxJava2
  • JBoss Logging
  • JBoss Marshalling
  • JGroups
  • JTAAPIのみ)
  • Infinispan Commons

あとは、テスト用にScalaTestを追加しています。

Javaバージョンは、以下です。

$ java -version
openjdk version "1.8.0_151"
OpenJDK Runtime Environment (build 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

テストコードの雛形と設定ファイル

テストコードの雛形は、こんな感じで。
src/test/scala/org/littlewings/infinispan/distiterator/DistributedIteratorSuite.scala

package org.littlewings.infinispan.distiterator

import java.util.Map

import org.infinispan.Cache
import org.infinispan.commons.util.CloseableIterator
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.util.function.{SerializableConsumer, SerializableFunction}
import org.scalatest.{FunSuite, Matchers}

import scala.collection.JavaConverters._

class DistributedIteratorSuite extends FunSuite with Matchers {
  // ここに、テストを書く!

  protected def withCache[K, V](cacheName: String, numInstances: Int = 1)(fun: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))

    try {
      managers.foreach(_.getCache[K, V](cacheName))

      val cache = managers(0).getCache[K, V](cacheName)
      fun(cache)
      cache.stop()
    } finally {
      managers.foreach(_.stop())
    }
  }
}

簡単にクラスタを構成できる、ヘルパーメソッド付き。

あとは設定ファイルを用意。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:9.2 http://www.infinispan.org/schemas/infinispan-config-9.2.xsd"
        xmlns="urn:infinispan:config:9.2">

    <jgroups>
        <stack-file name="udp" path="default-configs/default-jgroups-udp.xml"/>
    </jgroups>

    <cache-container>
        <transport cluster="test-cluster" stack="udp"/>

        <distributed-cache name="distributedCache"/>

        <replicated-cache name="replicatedCache"/>

        <local-cache name="localCache"/>
    </cache-container>
</infinispan>

Distributed Cache、Replicated Cache、Local Cacheの3種類を今回は用意しました。

Distributed Iterator

では、Iteratorを使ったコードを書いてみましょう。

…といっても、ただのIteratorなので、使い方としては特に特筆することはありません。クラスタを構成可能なCacheは、いずれもNode数は3としています。

Distributed Cache。

  test("distributed cache iterator") {
    withCache[String, Integer]("distributedCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key${i}", i))

      val iterator: CloseableIterator[java.util.Map.Entry[String, Integer]] =
        cache.entrySet.iterator.asInstanceOf[CloseableIterator[java.util.Map.Entry[String, Integer]]]

      /*
      while (iterator.hasNext()) {
        println(iterator.next())
      }
      */

      iterator.asScala.toList.map(e => e.getKey -> e.getValue) should contain theSameElementsAs Array(
        "key1" -> 1, "key2" -> 2, "key3" -> 3, "key4" -> 4, "key5" -> 5,
        "key6" -> 6, "key7" -> 7, "key8" -> 8, "key9" -> 9, "key10" -> 10
      )

      iterator.close()
    }
  }

Replicated Cache。

  test("replicated cache iterator") {
    withCache[String, Integer]("replicatedCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key${i}", i))

      val iterator: CloseableIterator[java.util.Map.Entry[String, Integer]] =
        cache.entrySet.iterator.asInstanceOf[CloseableIterator[java.util.Map.Entry[String, Integer]]]

      /*
      while (iterator.hasNext()) {
        println(iterator.next())
      }
      */

      iterator.asScala.toList.map(e => e.getKey -> e.getValue) should contain theSameElementsAs Array(
        "key1" -> 1, "key2" -> 2, "key3" -> 3, "key4" -> 4, "key5" -> 5,
        "key6" -> 6, "key7" -> 7, "key8" -> 8, "key9" -> 9, "key10" -> 10
      )

      iterator.close()
    }
  }

Local Cache。

  test("local cache iterator") {
    withCache[String, Integer]("localCache") { cache =>
      (1 to 10).foreach(i => cache.put(s"key${i}", i))

      val iterator: CloseableIterator[java.util.Map.Entry[String, Integer]] =
        cache.entrySet.iterator.asInstanceOf[CloseableIterator[java.util.Map.Entry[String, Integer]]]

      /*
      while (iterator.hasNext()) {
        println(iterator.next())
      }
      */

      iterator.asScala.toList.map(e => e.getKey -> e.getValue) should contain theSameElementsAs Array(
        "key1" -> 1, "key2" -> 2, "key3" -> 3, "key4" -> 4, "key5" -> 5,
        "key6" -> 6, "key7" -> 7, "key8" -> 8, "key9" -> 9, "key10" -> 10
      )

      iterator.close()
    }
  }

どのコードも、全部同じです。

Cache#entrySetやkeySet、valuesなどからCacheSetやCacheCollectionを取得して、そこからIteratorを取得します。キャストのコードが入っているのは、
Scalaの都合だったりします…。

      val iterator: CloseableIterator[java.util.Map.Entry[String, Integer]] =
        cache.entrySet.iterator.asInstanceOf[CloseableIterator[java.util.Map.Entry[String, Integer]]]

あとは、Iteratorとしてふつうに使うだけです。

      while (iterator.hasNext()) {
        println(iterator.next())
      }

今回は、ScalaIteratorに変換してアサーションしましたが。

      iterator.asScala.toList.map(e => e.getKey -> e.getValue) should contain theSameElementsAs Array(
        "key1" -> 1, "key2" -> 2, "key3" -> 3, "key4" -> 4, "key5" -> 5,
        "key6" -> 6, "key7" -> 7, "key8" -> 8, "key9" -> 9, "key10" -> 10
      )

最後にcloseしておきましょう。

      iterator.close()

中身は?

で、これだけで終わっては面白くありません。もう少し中身がどうなっているのか確認してみましょう。

今回、Distributed Cache、Replicated Cache、Local Cacheで試してみましたが、冒頭の紹介したPublisher/Subscriberが使われているのは
Distributed Cacheから作成したIterator、Distributed Iteratorのみです。

Distributed Cache

まずは、Distributed Cacheの場合から。

IteratorはCacheCollection/CacheSetから取得するわけですが、

      val iterator: CloseableIterator[java.util.Map.Entry[String, Integer]] =
        cache.entrySet.iterator.asInstanceOf[CloseableIterator[java.util.Map.Entry[String, Integer]]]

この時点で取得するIteratorはDistributionBulkInterceptorの内部クラス(BackingEntrySet)から取得するIteratorになり、まだRxJava2は登場しません。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/interceptors/distribution/DistributionBulkInterceptor.java#L96-L100

valuesのようなCacheCollectionを使っても、このDistributionBulkInterceptorにたどり着きます。

Reactive Streamsを使ったIteratorが登場するのは、Iterator#hasNextやIterator#nextを最初に使用した時です。このIteratorは、DistributedCacheStreamで
作成されます。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L534
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L536-L545

対象となるNode、Segmentの情報を元にして、複数のPublisherを合成したIteratorを作成します。この時のPublisherの数は、対象に選出されたNode数と4の
小さい方が選択されます。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L652-L686
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/PriorityMergingProcessor.java#L103-L107

Iteratorの実体は、MultiSubscriberIteratorになります。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/PriorityMergingProcessor.java#L110

IteratorとしてはQueueに登録された値をポーリングしてIterator#nextやIterator#hasNextの動きを見せている、という挙動になります。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/PriorityMergingProcessor.java#L167

ここでいうQueueは、実はSubscriberだったりします。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/PriorityMergingProcessor.java#L228

というわけで、Subscribeした結果をまとめてIteratorとして見せているというのがDistributed Iteratorの正体です。

さて、ここでPublisherはどうやって作られるんでしょう?と。

Remote Nodeに対するPublisherは、まずはClusterStreamManager/ClusterStreamManagerImplの内部クラスとして作成されます。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L661
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L677
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManagerImpl.java#L442

これは、あくまで自前のPublisherです(RemoteIteratorPublisherがPublisherを実装しています)。

さらにこれが、RxJava2のFlowableで包まれることになります。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L663
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L679
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/RehashPublisherDecorator.java#L48-L54
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/RehashPublisherDecorator.java#L45
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/CompletionRehashPublisherDecorator.java#L66-L73
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/CompletionRehashPublisherDecorator.java#L75-L76

Local Nodeの場合は、その場でPublisherを作成するか、Flowable#emptyのどちらかになります。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L637-L643
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L646

https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/AbstractRehashPublisherDecorator.java#L45-L62

Local Node向けのPublisherを作る場合は、内部で保持しているStreamからIteratorを取得し、これをFlowableで包みます。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L618

これらのPublisherを元にして、MultiSubscriberIteratorが作られるというわけですね。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/PriorityMergingProcessor.java#L104

Subscriptionは、ClusterStreamManagerImpl内に定義されています。こちらも、Infinispanが自前で作成しているものです。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManagerImpl.java#L478

データをPullしてくるところについては、このSubscriptionを見るとよいでしょう。

StreamIteratorNextCommandを使っているところや
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManagerImpl.java#L545

レスポンスのハンドリングしているところなどを中心に。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManagerImpl.java#L549-L622

Publisherを駆動するExecutorですが、予想に違わずInfinispanのAsync用のExecutorが使用されます。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/interceptors/distribution/DistributionBulkInterceptor.java#L143
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/RehashPublisherDecorator.java#L45

これ自体は、DistributedCacheStreamで使うExecutorと同じです。

ここまで見てきたクラスのうち、Reactive StreamsおよびRxJava2が出てくるのはすべてStreamを実装で登場するパッケージです。
https://github.com/infinispan/infinispan/tree/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl

その割には、現在のDistributed Streams API自体は、まだReactive Streamsは使わないのですが…。

  test("distributed cache stream") {
    withCache[String, Integer]("distributedCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key${i}", i))

      val stream = cache.entrySet.stream
      try {
        stream
          .map[String](new SerializableFunction[java.util.Map.Entry[String, Integer], String] {
          override def apply(e: Map.Entry[String, Integer]): String = e.getKey
        })
          .forEach(new SerializableConsumer[String] {
            override def accept(v: String): Unit = println(v)
          })
      } finally {
        stream.close()
      }
    }
  }

DistributedCacheStream内を起点にReactive StreamsのAPIを使っている割には…まあ、今後なのでしょう。今後の予定でStreamがPublisherを公開する予定が
あるというので、なおさら。

Replicated Cache

Replicated Cacheの場合は、ローカル用のPublisherが作成されます。定義自体は、DisbributedCacheStreamにあるのですが。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L650

この場合は、RemoteIteratorPublisherを直接RxJava2のFlowableで包む形になります。つまり、Remote Nodeのことは特に知りません。
全データがLocal Nodeにあるのだから、そうなのですが。なお、使われるのはRxJava2のBlockingFlowableIterable…から取得するIteratorです。

微妙にReactive Streams/RxJava2絡んでますね。

Local Cache

最後にLocal Cacheの場合。Local Cacheになるとだいぶシンプルになり(そりゃあそうですね)、Infinispanのデータ管理の実体である
DataContainerから直接Iteratorを作り出します。
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/commands/read/EntrySetCommand.java#L92
https://github.com/infinispan/infinispan/blob/9.2.0.Final/core/src/main/java/org/infinispan/util/DataContainerRemoveIterator.java

まとめ

Infinispan 9.2.0.Finalで導入された、Reactive StreamsベースのDistributed Iteratorを見てみました。

まだInfinispan自体をReactive StreamsのAPIを使って操作をすることはできませんが(あくまでInfinispanの内部利用のみのため)、今後少しずつ
APIが公開されていくとよいですね。

今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-distributed-iterator