CLOVER🍀

That was when it all began.

Infinispan 9.1で追加されたClustered Countersを試す

Infinispan 9.1から、新機能としてClustered Countersが追加されました。

Infinispan: Cluster Counter

Clustered Counters

9.1からと言いつつ、実は9.0.2でも使えるようになっている感じがあります。

今回は、このClustered Countersを見ていってみようと思います。

Clustered Countersとは?

文字通り、クラスタ内で共有して使えるカウンターになります。InfinispanでCache以外のデータの持ち方としては、初めてになるのでしょうか?

Clustered Countersには、次の2種類があります。

  • StrongCounter
  • WeakCounter

現時点では、Embedded Modeでのみ使える機能になります。

StrongCounter

StrongCounterは、Cacheに格納された単一のキーを使用して、一貫性を持ったカウンターを提供します。

更新時はロックを取り、現在の値を読み取る際にはロックを使用しません。CAS操作のようなアトミックな演算が可能です。

この種類のカウンターのユースケースとしては、以下が挙げられるようです。

  • クラスタにまたがったIDジェネレーター
  • クラスタにまたがったシーケンス
  • 上限・下限のあるカウンター(※)

※ WeakCounterでは上限、下限は設定できません

WeakCounter

WeakCounterは、カウンターの値を複数のキーに格納します。作成されるキーの数は、concurrency-level属性で設定することができます。

各キーは、カウンターの部分的な値を保存しており、同時に更新することができます。

StrongCounterに比べて、Cache内の競合が起きにくくなります。その一方で、読み取り時のコストは高くなり、またカウンターに上限、下限を
儲けることができません。

注意点としては、リセット(カウンター値の初期化)操作はアトミックな操作ではなく、リセットが完了するまでの中途半端な状態が発生します。
この過程は、カウンターに登録したListenerにより確認することができます。

ユースケースとしては、更新処理の結果が不要である場合や、カウンターの値の読み取りがあまり頻繁に利用されない場合が適しているようです。
例えば、統計情報の収集などが良い例になるでしょう。

準備

と、説明はこれくらいにして実際に使ってみましょう。

まずは準備から。sbtの依存関係は、こんな感じで。

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-clustered-counter" % "9.1.0.Final" % Compile,
  "net.jcip" % "jcip-annotations" % "1.0" % Provided,
  "org.scalatest" %% "scalatest" % "3.0.3" % Test
)

「infinispan-clustered-counter」というモジュールが必要です。

ScalaTestは、テストコード用。

設定ファイルの大枠は、こんな感じで用意。

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:9.1 http://www.infinispan.org/schemas/infinispan-config-9.1.xsd"
        xmlns="urn:infinispan:config:9.1">
    <jgroups>
        <stack-file name="udp" path="default-configs/default-jgroups-udp.xml"/>
    </jgroups>
    <cache-container>
        <jmx duplicate-domains="true"/>
        <transport cluster="test-cluster" stack="udp"/>

        <!-- ここを変えていく!! -->

    </cache-container>
</infinispan>

確認する内容にしたがって、この中に追記していきたいと思います。

また、テストコードの雛形は、このような感じで。
src/test/scala/org/littlewings/infinispan/counter/ClusteredCounterSpec.scala

package org.littlewings.infinispan.counter

import java.util.concurrent.CompletionException
import java.util.stream.Collectors

import org.infinispan.counter.EmbeddedCounterManagerFactory
import org.infinispan.counter.api.CounterState
import org.infinispan.counter.exception.CounterOutOfBoundsException
import org.infinispan.counter.impl.CounterModuleLifecycle
import org.infinispan.counter.impl.entries.{CounterKey, CounterValue}
import org.infinispan.counter.impl.strong.{BoundedStrongCounter, UnboundedStrongCounter}
import org.infinispan.counter.impl.weak.WeakCounterImpl
import org.infinispan.manager.{DefaultCacheManager, EmbeddedCacheManager}
import org.infinispan.util.function.{SerializableFunction, SerializablePredicate}
import org.scalatest.{FunSuite, Matchers}

class ClusteredCounterSpec extends FunSuite with Matchers {
  // ここに、テストを書く!!

  protected def withCacheManagers(configurationXml: String, numInstances: Int)(fun: Seq[EmbeddedCacheManager] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager(configurationXml))

    try {
      fun(managers)
    } finally {
      managers.foreach(_.stop())
    }
  }
}

指定した数だけEmbeddedCacheManagerを作成して、クラスタを構成する感じにして使います。

では、いってみましょう。

まずは使ってみる

Clustered Countersを使うには、カウンターの設定をする必要があります。APIでも組めますし、設定ファイルでも組めますが自分は設定ファイル派
なので、そのように。

countersという新しいタグを使って、カウンターを定義する領域を作ります。

        <counters xmlns="urn:infinispan:config:counters:9.0" num-owners="2" reliability="AVAILABLE">
            <strong-counter name="simpleStrongCounter"/>
            <weak-counter name="simpleWeakCounter"/>
        </counters>

counters内に、strong-counterやweak-counter要素を書いて、カウンター自体を定義していきます。

num-ownersとreliabilityは記載していますが、これがデフォルト値になります。

        <counters xmlns="urn:infinispan:config:counters:9.0" num-owners="2" reliability="AVAILABLE">

※xmlnsでのバージョンが、なぜか9.0なのかはまた後で…

num-ownersは、クラスタ内でカウンター値のコピーの数です。まあ、名前からお察しできるかもしれませんが、Clustered Countersの背後で
動いているのはDistributed Cacheということになります。カウンターごとの設定はできず、カウンター全体としての設定となります。

reliabilityは、ネットワーク分断時の動作を設定します。設定できる値は2つで、それぞれ以下です。

要するに、Partition handlingの設定になります。

Partition handling

今回は、reliabilityについてはAVAILABLEのみを使うことにします。

で、最初は単純に定義したStrongCounterから。書いたコードはこんな感じ。

  test("simple strong-counter") {
    withCacheManagers("infinispan-counter-volatile.xml", 3) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val strongCounter = counterManager.getStrongCounter("simpleStrongCounter")

      strongCounter.getName should be("simpleStrongCounter")
      strongCounter.getValue.join() should be(0L)
      strongCounter.incrementAndGet().join() should be(1L)
      strongCounter.addAndGet(100L).join() should be(101L)
      strongCounter.decrementAndGet().join() should be(100L)
      strongCounter.compareAndSet(100L, 120L)

      EmbeddedCounterManagerFactory
        .asCounterManager(managers(1))
        .getStrongCounter("simpleStrongCounter")
        .getValue.join() should be(120L)

      EmbeddedCounterManagerFactory
        .asCounterManager(managers(2))
        .getStrongCounter("simpleStrongCounter")
        .addAndGet(50L).join()

      strongCounter.getValue.join() should be(170L)

      strongCounter.reset().join()
      strongCounter.getValue.join() should be(0L)

      strongCounter.addAndGet(200L).join()

      strongCounter should be(a[UnboundedStrongCounter])

      val countersCache = manager.getCache[CounterKey, CounterValue](CounterModuleLifecycle.COUNTER_CACHE_NAME)
      countersCache should have size 1
      val counterKey = countersCache.keySet.stream().filter(new SerializablePredicate[CounterKey] {
        override def test(key: CounterKey): Boolean = key.getCounterName.toString == "simpleStrongCounter"
      }).findFirst().get

      countersCache.get(counterKey).getValue should be(200L)
    }
  }

とりあえず、クラスタ内のNode数は3にしておきました。カウンターを取得するにはCounterManagerを取得する必要があり、これはEmbeddedCacheManagerを
もとにしてEmbeddedCounterManagerFactoryから取得することができます。

    withCacheManagers("infinispan-counter-volatile.xml", 3) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val strongCounter = counterManager.getStrongCounter("simpleStrongCounter")

CounterManagerが取得できれば、あとはカウンター名を指定してカウンターが取得できます。今回は、CounterManager#getStrongCounterで
StrongCounterを取得します。

StrongCounterに対しては、値の取得やインクリメント、任意の値の加算、デクリメント、compare-and-setなどを行うことができます。いずれの
メソッドも、CompletableFutureが返ってきます。

      strongCounter.getName should be("simpleStrongCounter")
      strongCounter.getValue.join() should be(0L)
      strongCounter.incrementAndGet().join() should be(1L)
      strongCounter.addAndGet(100L).join() should be(101L)
      strongCounter.decrementAndGet().join() should be(100L)
      strongCounter.compareAndSet(100L, 120L)

こうやって更新した値は、他のNodeからも参照、更新することができます。

      EmbeddedCounterManagerFactory
        .asCounterManager(managers(1))
        .getStrongCounter("simpleStrongCounter")
        .getValue.join() should be(120L)

      EmbeddedCounterManagerFactory
        .asCounterManager(managers(2))
        .getStrongCounter("simpleStrongCounter")
        .addAndGet(50L).join()

      strongCounter.getValue.join() should be(170L)

リセット。

      strongCounter.reset().join()
      strongCounter.getValue.join() should be(0L)

今回のStrongCounterは上限、下限の設定をしていないので、実体はUnboundedStrongCounterとなります。

      strongCounter should be(a[UnboundedStrongCounter])

また、カウンターは「___counters」という名前のCacheに保存されていて、カウンターに対してキーが紐づいていることも確認することが
できます。

      val countersCache = manager.getCache[CounterKey, CounterValue](CounterModuleLifecycle.COUNTER_CACHE_NAME)
      countersCache should have size 1
      val counterKey = countersCache.keySet.stream().filter(new SerializablePredicate[CounterKey] {
        override def test(key: CounterKey): Boolean = key.getCounterName.toString == "simpleStrongCounter"
      }).findFirst().get

      countersCache.get(counterKey).getValue should be(200L)

続いて、WeakCounter。

  test("simple weak-counter") {
    withCacheManagers("infinispan-counter-volatile.xml", 3) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val weakCounter = counterManager.getWeakCounter("simpleWeakCounter")

      weakCounter.getName should be("simpleWeakCounter")
      weakCounter.getValue should be(0L)
      weakCounter.increment().join()
      weakCounter.getValue should be(1L)
      weakCounter.add(150L).join()
      weakCounter.getValue should be(151L)
      weakCounter.decrement().join()
      weakCounter.getValue should be(150L)

      EmbeddedCounterManagerFactory
        .asCounterManager(managers(1))
        .getWeakCounter("simpleWeakCounter")
        .getValue should be(150L)

      EmbeddedCounterManagerFactory
        .asCounterManager(managers(2))
        .getWeakCounter("simpleWeakCounter")
        .add(100L).join()

      weakCounter.getValue should be(250L)

      weakCounter.reset().join()
      weakCounter.getValue should be(0L)

      weakCounter.add(100L).join()

      EmbeddedCounterManagerFactory
        .asCounterManager(managers(1))
        .getWeakCounter("simpleWeakCounter")
        .add(50).join()

      EmbeddedCounterManagerFactory
        .asCounterManager(managers(2))
        .getWeakCounter("simpleWeakCounter")
        .add(200L).join()

      weakCounter.getValue should be(350L)

      weakCounter should be(a[WeakCounterImpl])

      val countersCache = manager.getCache[CounterKey, CounterValue](CounterModuleLifecycle.COUNTER_CACHE_NAME)
      countersCache should have size 6

      val keys = countersCache.keySet().stream().map[String](new SerializableFunction[CounterKey, String] {
        override def apply(key: CounterKey): String = key.toString
      }).sorted().collect(Collectors.toList[String])

      keys should have size 6
      (0 until 6).foreach(i => keys.get(i) should be(s"WeakCounterKey{counterName=simpleWeakCounter, index=${i}}"))
    }
  }

WeakCounterの場合は、CounterManagerからの取得方法がCounterManager#getWeakCounterとなります。

      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val weakCounter = counterManager.getWeakCounter("simpleWeakCounter")

StrongCounterと捜査感覚は似ているようで異なり、値の取得はいきなり値が返りますし、その他の操作はCompletableFutureが返るものの、
値の取得はできません(Voidです)。

      weakCounter.getName should be("simpleWeakCounter")
      weakCounter.getValue should be(0L)
      weakCounter.increment().join()
      weakCounter.getValue should be(1L)
      weakCounter.add(150L).join()
      weakCounter.getValue should be(151L)
      weakCounter.decrement().join()
      weakCounter.getValue should be(150L)

クラスタ内で共有できるのは同じ。

      EmbeddedCounterManagerFactory
        .asCounterManager(managers(1))
        .getWeakCounter("simpleWeakCounter")
        .getValue should be(150L)

      EmbeddedCounterManagerFactory
        .asCounterManager(managers(2))
        .getWeakCounter("simpleWeakCounter")
        .add(100L).join()

      weakCounter.getValue should be(250L)

リセット。

      weakCounter.reset().join()
      weakCounter.getValue should be(0L)

WeakCounterは、単純にWeakCounterImplという実装クラス。

      weakCounter should be(a[WeakCounterImpl])

カウンターの値は、StrongCounterと同じCacheに格納されますが、デフォルトでは6つのキーに分割して格納されます。

      val countersCache = manager.getCache[CounterKey, CounterValue](CounterModuleLifecycle.COUNTER_CACHE_NAME)
      countersCache should have size 6

      val keys = countersCache.keySet().stream().map[String](new SerializableFunction[CounterKey, String] {
        override def apply(key: CounterKey): String = key.toString
      }).sorted().collect(Collectors.toList[String])

      keys should have size 6
      (0 until 6).foreach(i => keys.get(i) should be(s"WeakCounterKey{counterName=simpleWeakCounter, index=${i}}"))

なぜ6なのかは、また後で。

初期値を指定する

カウンターには、初期値を指定することができます。

設定は、こんな感じです。

        <counters xmlns="urn:infinispan:config:counters:9.0" num-owners="2" reliability="AVAILABLE">
            <strong-counter name="initialValuedStrongCounter" initial-value="100"/>
            <weak-counter name="initialValuedWeakCounter" initial-value="100"/>
        </counters>

こうすると、カウンターの初期値が変わります。

  test("initial-valued strong-counter") {
    withCacheManagers("infinispan-counter-volatile.xml", 3) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val strongCounter = counterManager.getStrongCounter("initialValuedStrongCounter")

      strongCounter.getValue.join() should be(100L)
      strongCounter.addAndGet(50L).join() should be(150L)

      strongCounter.reset().join()
      strongCounter.getValue.join() should be(100L)
    }
  }

  test("initial-valued weak-counter") {
    withCacheManagers("infinispan-counter-volatile.xml", 3) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val weakCounter = counterManager.getWeakCounter("initialValuedWeakCounter")

      weakCounter.getValue should be(100L)
      weakCounter.add(50L).join()
      weakCounter.getValue should be(150L)

      weakCounter.reset().join()
      weakCounter.getValue should be(100L)
    }
  }

上限、下限を設定する(StrongCounterのみ)

StrongCounterでは、上限と下限を設定することができます。

        <counters xmlns="urn:infinispan:config:counters:9.0" num-owners="2" reliability="AVAILABLE">
            <strong-counter name="boundedStrongCounter" initial-value="100">
                <lower-bound value="50"/>
                <upper-bound value="150"/>
            </strong-counter>
        </counters>

未指定の場合は、lower-boundはLong.MIN_VALUE、upper-boundはLong.MAX_VALUEとなります。

なお、上限、下限を設定する場合はinitial-valueは上限、下限の範囲の中に収まる必要があります。

で、使ったコードはこちら。

  test("bounded strong-counter") {
    withCacheManagers("infinispan-counter-volatile.xml", 3) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val strongCounter = counterManager.getStrongCounter("boundedStrongCounter")

      strongCounter.getValue.join() should be(100L)
      strongCounter.addAndGet(50L).join() should be(150L)

      val thrown = the[CompletionException] thrownBy strongCounter.incrementAndGet().join()
      val counterOutOfBounds = thrown.getCause.asInstanceOf[CounterOutOfBoundsException]
      counterOutOfBounds should be(a[CounterOutOfBoundsException])
      counterOutOfBounds.getMessage should be("ISPN028001: Upper bound reached.")
      counterOutOfBounds.isUpperBoundReached should be(true)

      strongCounter.reset().join()
      strongCounter.getValue.join() should be(100L)

      strongCounter should be(a[BoundedStrongCounter])
    }
  }

演算の結果、上限や下限をはみ出す場合は例外が発生します。

例えば上限を越えると、こうなります。

      val thrown = the[CompletionException] thrownBy strongCounter.incrementAndGet().join()
      val counterOutOfBounds = thrown.getCause.asInstanceOf[CounterOutOfBoundsException]
      counterOutOfBounds should be(a[CounterOutOfBoundsException])
      counterOutOfBounds.getMessage should be("ISPN028001: Upper bound reached.")
      counterOutOfBounds.isUpperBoundReached should be(true)

発生する例外はCounterOutOfBoundsExceptionで、CounterOutOfBoundsException#isUpperBoundReached、またはisLowerBoundReachedで
オーバーフローしたのかアンダーフローしたのかを確認することができます。

上限、下限の設定(どちらか一方でも)を行うと、StrongCounterの実装クラスはBoundedStrongCounterとなります。

      strongCounter should be(a[BoundedStrongCounter])

concurrency-levelを設定する(WeakCounterのみ)

WeakCounterでは、concurrency-leveoを設定することができます。

        <counters xmlns="urn:infinispan:config:counters:9.0" num-owners="2" reliability="AVAILABLE">
            <weak-counter name="tunedConcurrencyLevelWeakCounter" concurrency-level="256"/>
        </counters>

このconcurrency-level、デフォルトでは64で、この数字がWeakCounterを構成するCacheのキーの数に影響します。

具体的には、以下の演算で算出されます。

32 - Integer.numberOfLeadingZeros(concurrencyLevel - 1);

これ、どういう数になるかというと、32から"指定されたint値の2の補数バイナリ表現の最上位(「もっとも左側」)の1のビットに先行するゼロのビットの数"
(正確には指定されたint値から1引いてますが)引いたものになります(Integer#numberOfLeadingZerosより)。

64であれば6、65であれば7、128であれば7、129であれば8です。

これがWeakCounterが使用するキーの数になります。今回は256を指定したので「8」になります。

  test("tuned concurrency-level weak-counter") {
    withCacheManagers("infinispan-counter-volatile.xml", 3) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val weakCounter = counterManager.getWeakCounter("tunedConcurrencyLevelWeakCounter")

      weakCounter.getValue should be(0L)
      weakCounter.increment().join()
      weakCounter.getValue should be(1L)
      weakCounter.increment().join()
      weakCounter.getValue should be(2L)

      val countersCache = manager.getCache[CounterKey, CounterValue]("___counters")
      countersCache should have size 8

      val keys = countersCache.keySet().stream().map[String](new SerializableFunction[CounterKey, String] {
        override def apply(key: CounterKey): String = key.toString
      }).sorted().collect(Collectors.toList[String])

      keys should have size 8
      (0 until 8).foreach(i => keys.get(i) should be(s"WeakCounterKey{counterName=tunedConcurrencyLevelWeakCounter, index=${i}}"))
    }
  }

カウンター値の永続化

StrongCounter、WeakCounterとも、カウンターの値を保存して、Nodeが再起動してもその値を引き継ぐことができます。

storageという属性があり、明示的に指定した場合はこのような定義になります。

        <counters xmlns="urn:infinispan:config:counters:9.0" num-owners="2" reliability="AVAILABLE">
            <strong-counter name="simpleStrongCounter" storage="VOLATILE"/>
            <weak-counter name="simpleWeakCounter" storage="VOLATILE"/>
        </counters>

値としては「VOLATILE」です。

この設定だと、Nodeが再起動すると値が失われます。そりゃそうですね。
※ちょっと都合上、Node数はここだけひとつにしています

  test("volatile strong-counter") {
    withCacheManagers("infinispan-counter-volatile.xml", 1) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val strongCounter = counterManager.getStrongCounter("simpleStrongCounter")

      strongCounter.getValue.join() should be(0L)
      strongCounter.addAndGet(100L).join() should be(100L)
    }

    withCacheManagers("infinispan-counter-volatile.xml", 1) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val strongCounter = counterManager.getStrongCounter("simpleStrongCounter")

      strongCounter.getValue.join() should be(0L)
    }
  }

  test("volatile weak-counter") {
    withCacheManagers("infinispan-counter-volatile.xml", 1) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val weakCounter = counterManager.getWeakCounter("simpleWeakCounter")

      weakCounter.getValue should be(0L)
      weakCounter.add(100L).join()
    }

    withCacheManagers("infinispan-counter-volatile.xml", 1) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val weakCounter = counterManager.getWeakCounter("simpleWeakCounter")

      weakCounter.getValue should be(0L)
    }
  }

ここで、cache-container要素の配下にglobal-stateを定義することで、storage属性に「PERSISTENT」を指定することができるように
なります。

    <cache-container>
        <jmx duplicate-domains="true"/>
        <transport cluster="test-cluster" stack="udp"/>

        <global-state>
            <persistent-location path="target/global-state"/>
        </global-state>

        <counters xmlns="urn:infinispan:config:counters:9.0" num-owners="2" reliability="AVAILABLE">
            <strong-counter name="persistentStrongCounter" storage="PERSISTENT"/>
            <weak-counter name="persistentWeakCounter" storage="PERSISTENT"/>
        </counters>
    </cache-container>

global-stateを設定しない状態では、「PERSISTENT」を指定するとエラーになります。

こうすると、Nodeを再起動しても値を覚えていてくれるようになります。

  test("persistent strong-counter") {
    withCacheManagers("infinispan-counter-persistent.xml", 1) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val strongCounter = counterManager.getStrongCounter("persistentStrongCounter")

      strongCounter.getValue.join() should be(0L)
      strongCounter.addAndGet(100L).join() should be(100L)
    }

    withCacheManagers("infinispan-counter-persistent.xml", 1) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val strongCounter = counterManager.getStrongCounter("persistentStrongCounter")

      strongCounter.getValue.join() should be(100L)
    }
  }

  test("persistent weak-counter") {
    withCacheManagers("infinispan-counter-persistent.xml", 1) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val weakCounter = counterManager.getWeakCounter("persistentWeakCounter")

      weakCounter.getValue should be(0L)
      weakCounter.add(100L).join()
    }

    withCacheManagers("infinispan-counter-persistent.xml", 1) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val weakCounter = counterManager.getWeakCounter("persistentWeakCounter")

      weakCounter.getValue should be(100L)
    }
  }

なお、カウンターの状態としては次のようなものが保存されています。

target/global-state/___counters.dat
FCS1j^\(&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^C&#65533;^UpersistentWeakCounter^C&#65533;^B=&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^BY^A^C&#65533;^B^A@j^]'&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^C&#65533;^WpersistentStrongCounter^C&#65533;d^B=&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^BY^A^C&#65533;^Aj(&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^C&#65533;^UpersistentWeakCounter^A^C&#65533;^B=&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^BY^A^C&#65533;^B^A@j^\(&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^C&#65533;^UpersistentWeakCounter^B^C&#65533;^B=&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^BY^A^C&#65533;^B^A@j^\(&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^C&#65533;^UpersistentWeakCounter^C^C&#65533;^B=&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^BY^A^C&#65533;^B^A@j^\(&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^C&#65533;^UpersistentWeakCounter^D^C&#65533;^B=&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^BY^A^C&#65533;^B^A@j^\(&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^C&#65533;^UpersistentWeakCounter^C&#65533;^B=&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^BY^A^C&#65533;^B^A@j^\(&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^C&#65533;^UpersistentWeakCounter^A^C&#65533;d^B=&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^BY^A^C&#65533;^B^A

その他、カウンターの設定や

target/global-state/___counter_configuration.dat
FCS1h^[&&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^A^B^A^WpersistentStrongCounter^C&#65533;^A^B=&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^B9^A&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;g^Y^P&&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^A^B^A^UpersistentWeakCounter^C&#65533;^B^A@^B=&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;^B9^A&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;&#65533;

Nodeの情報が保存されていたりもします。

target/global-state/___global.state 
@version=9.1.0.Final
version-major=9
@timestamp=2017-08-18T14\:41\:27.231Z
uuid=ad45f9e3-b5ba-4f33-9b42-912957d8d8b1

というか、global-state知りませんでした。

Listener

最後はListenerです。

カウンターにはListenerを設定することができ、値が変化する時にイベントを受け取ることができます。

今回は、受け取った情報をそのまま持つListenerにしてみましょう。
src/test/scala/org/littlewings/infinispan/counter/MyCounterListener.scala

package org.littlewings.infinispan.counter

import org.infinispan.counter.api.{CounterEvent, CounterListener, CounterState}

import scala.collection.mutable

class MyCounterListener extends CounterListener {
  val receiveEvents: mutable.Map[Int, (CounterState, Long, CounterState, Long)] =
    mutable.Map.empty[Int, (CounterState, Long, CounterState, Long)]

  override def onUpdate(entry: CounterEvent): Unit = {
    val index = receiveEvents.size + 1
    receiveEvents += (index -> ((entry.getOldState, entry.getOldValue, entry.getNewState, entry.getNewValue)))
    System.out.println(receiveEvents(index))
  }
}

Listenerは、StrongCounter、WeakCounterのどちらでも使えます。

StrongCounterは、上限、下限設定ありで使ってみましょう。

  test("bounded strong-counter with listener") {
    withCacheManagers("infinispan-counter-volatile.xml", 3) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val strongCounter = counterManager.getStrongCounter("boundedStrongCounter")

      val listener = new MyCounterListener
      val handle = strongCounter.addListener(listener)

      strongCounter.getValue.join() should be(100L)
      strongCounter.addAndGet(50L).join() should be(150L)

      listener.receiveEvents(1) should be((CounterState.VALID, 100L, CounterState.VALID, 150L))

      val thrown = the[CompletionException] thrownBy strongCounter.incrementAndGet().join()
      val counterOutOfBounds = thrown.getCause.asInstanceOf[CounterOutOfBoundsException]
      counterOutOfBounds.getMessage should be("ISPN028001: Upper bound reached.")

      listener.receiveEvents(2) should be((CounterState.VALID, 150L, CounterState.UPPER_BOUND_REACHED, 150L))

      strongCounter.reset().join()

      listener.receiveEvents(3) should be((CounterState.UPPER_BOUND_REACHED, 150L, CounterState.VALID, 100L))

      strongCounter.getValue.join() should be(100L)

      listener.receiveEvents should  have size 3

      handle.getCounterListener should be(listener)
      handle.remove()
    }
  }

値の変更時にイベントを受け取れていますが、

      strongCounter.getValue.join() should be(100L)
      strongCounter.addAndGet(50L).join() should be(150L)

      listener.receiveEvents(1) should be((CounterState.VALID, 100L, CounterState.VALID, 150L))

オーバーフローするとステータスがUPPER_BOUND_REACHEDとなります。

      val thrown = the[CompletionException] thrownBy strongCounter.incrementAndGet().join()
      val counterOutOfBounds = thrown.getCause.asInstanceOf[CounterOutOfBoundsException]
      counterOutOfBounds.getMessage should be("ISPN028001: Upper bound reached.")

      listener.receiveEvents(2) should be((CounterState.VALID, 150L, CounterState.UPPER_BOUND_REACHED, 150L))

WeakCounterの場合は特筆すべきはresetで、内部的に持っているCacheのキーの数だけイベントが発生します。

  test("weak-counter with listener") {
    withCacheManagers("infinispan-counter-volatile.xml", 3) { managers =>
      val manager = managers(0)
      val counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager)
      val weakCounter = counterManager.getWeakCounter("initialValuedWeakCounter")

      val listener = new MyCounterListener
      val handle = weakCounter.addListener(listener)

      weakCounter.getValue should be(100L)

      weakCounter.increment().join()
      listener.receiveEvents(1) should be((CounterState.VALID, 100L, CounterState.VALID, 101L))

      weakCounter.increment().join()
      listener.receiveEvents(2) should be((CounterState.VALID, 101L, CounterState.VALID, 102L))

      weakCounter.reset().join()
      listener.receiveEvents(3) should be((CounterState.VALID, 102L, CounterState.VALID, 102L))
      listener.receiveEvents(4) should be((CounterState.VALID, 102L, CounterState.VALID, 100L))
      listener.receiveEvents(5) should be((CounterState.VALID, 100L, CounterState.VALID, 100L))
      listener.receiveEvents(6) should be((CounterState.VALID, 100L, CounterState.VALID, 100L))
      listener.receiveEvents(7) should be((CounterState.VALID, 100L, CounterState.VALID, 100L))
      listener.receiveEvents(8) should be((CounterState.VALID, 100L, CounterState.VALID, 100L))

      weakCounter.getValue should be(100L)

      listener.receiveEvents should have size 8

      handle.getCounterListener should be(listener)
      handle.remove()
    }
  }

この時、一気に値がリセットされないのはWeakCounterで最初に説明したとおりです。

ここまでで、ひととおりAPIは触ってみた感じですね。

もうちょっと内部の話

せっかくなので、もう少し中身を。

Clustered Countersの構成

Clustered Countersは、2つのCacheで成り立っています。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/counter/src/main/java/org/infinispan/counter/impl/CounterModuleLifecycle.java#L149-L161

ひとつはカウンター自体を表すCacheで、これはDistributed Cacheです。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/counter/src/main/java/org/infinispan/counter/impl/CounterModuleLifecycle.java#L57-L68

もうひとつは、カウンターの設定を格納するCacheで、これはReplicated Cacheです。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/counter/src/main/java/org/infinispan/counter/impl/CounterModuleLifecycle.java#L70-L80

割と固定の内容ですね?設定ファイルなどで構成した内容は、このレベルだと「reliability」だけがPartition handlingとしてCacheに反映されることになります。

先ほど紹介した、global-stateで保存されていたカウンターの設定は、このReplicated Cacheで表されていた方ですね。

StrongCounter

StrongCacheは、単一のキーとカウンターがマッピングされて管理されることになります。

また、上限、下限の境界を設定するかどうかで、StrongCounterの実装クラスが変わります。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/counter/src/main/java/org/infinispan/counter/impl/manager/EmbeddedCounterManager.java#L130-L133

主要な実装は親クラスであるAbstractStrongCounterにまとめられているのですが、Functional Mapを使った実装になっています。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/counter/src/main/java/org/infinispan/counter/impl/strong/AbstractStrongCounter.java

よって、CompletableFutureが随所に出てくる、と。

なお、上限、下限設定時にはそれぞれ境界を越えると例外がスローされますが
https://github.com/infinispan/infinispan/blob/9.1.0.Final/counter/src/main/java/org/infinispan/counter/impl/strong/BoundedStrongCounter.java#L52-L60

上限、下限の設定がない場合はそれぞれLong.MAX_VALUEないし、Long.MIN_VALUEにとどまるようです。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/counter/src/main/java/org/infinispan/counter/impl/function/AddFunction.java#L76

イベントの通知ですが、Clustered Listenerとして実装されているため、こちらで別Nodeでの更新イベントを受け取るようになっているみたいです。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/counter/src/main/java/org/infinispan/counter/impl/strong/AbstractStrongCounter.java#L52

WeakCounter

WeakCounterは、カウンターの値をいくつかのキーで分割して管理しています。

キーで分割されたカウンターへの割り当てをどうやって割り当てているかですが、スレッドのIDがベースになっているみたいです。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/counter/src/main/java/org/infinispan/counter/impl/weak/WeakCounterImpl.java#L233-L235

主な操作は、こちらもFunctional Mapです。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/counter/src/main/java/org/infinispan/counter/impl/weak/WeakCounterImpl.java#L141

カウンターの値を算出する時はCompletableFutureではありませんでしたが、こちらは分割されたエントリの合算で算出します。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/counter/src/main/java/org/infinispan/counter/impl/weak/WeakCounterImpl.java#L193-L204

また、WeakCounterもClustered Listenerとして実装されています。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/counter/src/main/java/org/infinispan/counter/impl/weak/WeakCounterImpl.java#L59

各種操作でキーに対する値を更新するのですが、
https://github.com/infinispan/infinispan/blob/9.1.0.Final/counter/src/main/java/org/infinispan/counter/impl/weak/WeakCounterImpl.java#L139-L142

その更新イベントを受け取り、Listenerに通知するという実装を取っています。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/counter/src/main/java/org/infinispan/counter/impl/weak/WeakCounterImpl.java#L165-L172

まとめ

Infinispan 9.1で追加された、Clustered Countersを試してみました。

基本的な使い方と、今回はだいぶ中身も見てみたので面白かったです。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-clustered-counter

オマケ

Clustered CountersをXML内で指定する際のスキーマバージョンが、なぜか9.0だったのはPullRequest出しておきました。たぶん、9.0で入れて9.1に
するのが落ちていたんだろうと…。