CLOVER🍀

That was when it all began.

Infinispan 9.3でWrite BehindなCacheWriterが、Fault Tolerantになったという話

Infinispan 9.3の新機能ネタです。

InfinspanにはCacheの背後のデータストアに対してデータを永続化したり読み出したりする、Persistence(CacheLoader/CacheWriter)の
仕組みがあるのですが、この中でWrite BehindなCacheWriterがInfinispan 9.3からFault Tolerantになりました、と。

Write-behind stores are now fault-tolerant by default.

https://blog.infinispan.org/2018/06/infinispan-930final-is-out.html

Write Behind?

Write Behindというのは、Cacheに書き込まれたデータを、データストアに非同期に反映する方法です。
対となるのはWrite Throughで、こちらは同期書き込みになります。

Write Behindでは、Cacheへの変更内容をキューに入れ、別スレッドでデータストアに非同期に反映されます。この時に
使われるのが、CacheWriterまたはAdvancedCacheWriterの実装です。

ちなみに、変更キューがいっぱいになった場合は、キューに受け入れ可能になるまでWrite Throughと同じになるそうな。

Persistence / Write-Behind (Asynchronous)

Fault Tolerant?

Infinispan 9.2まではWrite Behindに構成されたCacheWriterは、背後にあるデータストアの書き込みに失敗した場合は
既定回数(3回)リトライし、それでもダメだった場合は諦める=該当の変更はデータストアには反映されない、という
実装でした。

これがInfinispan 9.3では、改善されたという話です。

Add Fault-tolerance to write-behind stores

CacheWriterに対して、isAvailableというメソッドが追加されました。(これは、CacheLoaderにも追加されていますが)

CacheWriter (Infinispan JavaDoc All 9.3.2.Final API)

このisAvailableメソッドがfalseを返している間はデータストアへの反映を中止しキューに反映を積み込むだけとなり、
CacheWriter#isAvailableがtrueになったらデータストアへの反映を再開します。

今回は、この機能について見ていこうと思います。

環境

今回確認した環境は、こちら。

$ java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-0ubuntu0.18.04.1-b11)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)

$ sbt sbtVersion
[info] 1.1.6

使用するInfinispanは、9.3です。

準備

sbtでの依存関係は、こちら。

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "9.3.0.Final" % Compile,
  "net.jcip" % "jcip-annotations" % "1.0" % Provided,
  "org.scalatest" %% "scalatest" % "3.0.5" % Test
)

Infinispanは、Embedded Modeで利用します。また、確認はScalaTestを使ったテストコードで行います。

CacheStore(CacheWriter/AdvancedCacheWriter)を作成する

今回の確認で使うCacheStore(CacheWriter…というか、実際はAdvancedLoadWriteStoreですが…)は、今回はインメモリ(Map)にデータを持つように作成します。
src/main/scala/org/littlewings/infinispan/writebehind/InMemoryCacheStore.scala

package org.littlewings.infinispan.writebehind

import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import java.util.concurrent.{ConcurrentHashMap, Executor}

import org.infinispan.commons.marshall.StreamingMarshaller
import org.infinispan.marshall.core.{MarshalledEntry, MarshalledEntryFactory}
import org.infinispan.metadata.InternalMetadata
import org.infinispan.persistence.spi.{AdvancedCacheWriter, AdvancedLoadWriteStore, InitializationContext}
import org.infinispan.util.TimeService
import org.jboss.logging.Logger

import scala.collection.JavaConverters._

object InMemoryCacheStore {
  val COUNTER: AtomicInteger = new AtomicInteger(0)
  val AVAILABLE: AtomicBoolean = new AtomicBoolean(true)

  private val CURRENT_STORE: AtomicReference[InMemoryCacheStore[_, _]] = new AtomicReference[InMemoryCacheStore[_, _]]

  def currentStoreEntries[K, V]: Map[K, V] =
    CURRENT_STORE.get.underlyingStore.map { case (k, (v, _)) => (k.asInstanceOf[K], v.asInstanceOf[V]) }.toMap
}

class InMemoryCacheStore[K, V] extends AdvancedLoadWriteStore[K, V] {
  val logger: Logger = Logger.getLogger(getClass)

  var configuration: InMemoryCacheStoreConfiguration = _
  var marshaller: StreamingMarshaller = _
  var marshalledEntryFactory: MarshalledEntryFactory[K, V] = _
  var timeService: TimeService = _

  val underlyingStore: scala.collection.mutable.Map[K, (V, InternalMetadata)] =
    new ConcurrentHashMap[K, (V, InternalMetadata)]().asScala

  val failedKeyCounter: scala.collection.mutable.Map[K, Int] =
    new ConcurrentHashMap[K, Int]().asScala

  override def start(): Unit = {
    logger.infof("InMemoryCacheLoadWriteStore started")

    InMemoryCacheStore.CURRENT_STORE.set(this)
  }

  override def stop(): Unit = {
    logger.infof("InMemoryCacheLoadWriteStore stopped")
  }

  override def init(ctx: InitializationContext): Unit = {
    configuration = ctx.getConfiguration.asInstanceOf[InMemoryCacheStoreConfiguration]
    marshaller = ctx.getMarshaller
    marshalledEntryFactory = ctx.getMarshalledEntryFactory.asInstanceOf[MarshalledEntryFactory[K, V]]
    timeService = ctx.getTimeService

    logger.infof("InMemoryCacheLoadWriteStore initialized")
  }

  override def size(): Int = underlyingStore.size

  override def clear(): Unit = underlyingStore.clear()

  override def purge(threadPool: Executor, listener: AdvancedCacheWriter.PurgeListener[_ >: K]): Unit = {
    logger.infof("purge started")

    val now = timeService.wallClockTime()

    underlyingStore
      .foreach { case (k, (_, meta)) =>
        if (meta.isExpired(now)) {
          logger.infof("purge key = %s", k)
          listener.entryPurged(k)
        }
      }

    logger.infof("purge end")
  }

  override def isAvailable: Boolean = InMemoryCacheStore.AVAILABLE.get

  override def write(entry: MarshalledEntry[_ <: K, _ <: V]): Unit = {
    val key = entry.getKey
    val value = entry.getValue

    failedKeyCounter
      .get(key)
      .orElse(Option(0))
      .filter(_ < InMemoryCacheStore.COUNTER.get)
      .foreach { v =>
        failedKeyCounter.put(key, v + 1)
        logger.infof("Oops!! key = %s, failed-count = %d", key, v)
        throw new RuntimeException(s"Oops!! key = ${key} failed-count = ${v}")
      }

    failedKeyCounter.put(key, 0)

    logger.infof("write key = %s, value = %s", key, value)

    underlyingStore.put(key, (value, entry.getMetadata))
  }

  override def delete(key: scala.Any): Boolean = {
    logger.infof("delete key = %s", key)
    underlyingStore.remove(key.asInstanceOf[K]).isDefined
  }

  override def load(key: scala.Any): MarshalledEntry[K, V] = {
    val loaded = underlyingStore
      .get(key.asInstanceOf[K])
      .map { case (value, metadata) => marshalledEntryFactory.newMarshalledEntry(key, value, metadata) }
      .orNull

    logger.infof("loaded key = %s, value = %s", key, loaded)

    loaded
  }

  override def contains(key: scala.Any): Boolean = underlyingStore.contains(key.asInstanceOf[K])
}

こんな感じで、CacheStoreで読み書きするデータは、Mapで管理。

  val underlyingStore: scala.collection.mutable.Map[K, (V, InternalMetadata)] =
    new ConcurrentHashMap[K, (V, InternalMetadata)]().asScala

テストでの確認目的で、このCacheWriterは外部から操作できるようにしておきます。

CacheWriter#write時に任意の回数失敗させられたり、CacheWriter#isAvailableの値をコントロールしたり、現在のMapの値を取得できるようにしたり。

object InMemoryCacheStore {
  val COUNTER: AtomicInteger = new AtomicInteger(0)
  val AVAILABLE: AtomicBoolean = new AtomicBoolean(true)

  private val CURRENT_STORE: AtomicReference[InMemoryCacheStore[_, _]] = new AtomicReference[InMemoryCacheStore[_, _]]

  def currentStoreEntries[K, V]: Map[K, V] =
    CURRENT_STORE.get.underlyingStore.map { case (k, (v, _)) => (k.asInstanceOf[K], v.asInstanceOf[V]) }.toMap
}

任意の回数CacheWriter#writeを失敗させたり、CacheWriter#isAvailableの値をコントロールしているのは、このあたり。

  override def isAvailable: Boolean = InMemoryCacheStore.AVAILABLE.get

  override def write(entry: MarshalledEntry[_ <: K, _ <: V]): Unit = {
    val key = entry.getKey
    val value = entry.getValue

    failedKeyCounter
      .get(key)
      .orElse(Option(0))
      .filter(_ < InMemoryCacheStore.COUNTER.get)
      .foreach { v =>
        failedKeyCounter.put(key, v + 1)
        logger.infof("Oops!! key = %s, failed-count = %d", key, v)
        throw new RuntimeException(s"Oops!! key = ${key} failed-count = ${v}")
      }

    failedKeyCounter.put(key, 0)

    logger.infof("write key = %s, value = %s", key, value)

    underlyingStore.put(key, (value, entry.getMetadata))
  }

お行儀良くないですが、staticにしてテストコード側からコントロールします。

あとは、このCacheStoreを作成するためのConfigurationBuilderおよびConfigurationを作成。

// src/main/scala/org/littlewings/infinispan/writebehind/InMemoryCacheStoreConfigurationBuilder.scala
package org.littlewings.infinispan.writebehind

import org.infinispan.configuration.cache.{AbstractStoreConfiguration, AbstractStoreConfigurationBuilder, PersistenceConfigurationBuilder}

class InMemoryCacheStoreConfigurationBuilder(builder: PersistenceConfigurationBuilder)
  extends AbstractStoreConfigurationBuilder[InMemoryCacheStoreConfiguration, InMemoryCacheStoreConfigurationBuilder](builder, AbstractStoreConfiguration.attributeDefinitionSet) {
  override def self(): InMemoryCacheStoreConfigurationBuilder = this

  override def create(): InMemoryCacheStoreConfiguration = new InMemoryCacheStoreConfiguration(attributes.protect, async.create, singletonStore.create)
}

// src/main/scala/org/littlewings/infinispan/writebehind/InMemoryCacheStoreConfiguration.scala
package org.littlewings.infinispan.writebehind

import org.infinispan.commons.configuration.attributes.AttributeSet
import org.infinispan.commons.configuration.{BuiltBy, ConfigurationFor}
import org.infinispan.configuration.cache.{AbstractStoreConfiguration, AsyncStoreConfiguration, SingletonStoreConfiguration}

@BuiltBy(classOf[InMemoryCacheStoreConfigurationBuilder])
@ConfigurationFor(classOf[InMemoryCacheStore[_, _]])
class InMemoryCacheStoreConfiguration(
                                       attributes: AttributeSet,
                                       async: AsyncStoreConfiguration,
                                       singletonStore: SingletonStoreConfiguration
                                     ) extends AbstractStoreConfiguration(attributes, async, singletonStore) {
}

これで、CacheStore(CacheWriter)側の準備はおしまいです。

テストコードの雛形

では、作成したCacheStoreを使って確認する側の、テストコードへ移ります。

テストコードの雛形は、こちら。
src/test/scala/org/littlewings/infinispan/writebehind/FaultTolerantCacheStoreSpec.scala

package org.littlewings.infinispan.writebehind

import java.util.concurrent.TimeUnit

import org.infinispan.Cache
import org.infinispan.configuration.cache.{AsyncStoreConfigurationBuilder, ConfigurationBuilder}
import org.infinispan.manager.DefaultCacheManager
import org.jboss.logging.Logger
import org.scalatest.{FunSuite, Matchers}

class FaultTolerantCacheStoreSpec extends FunSuite with Matchers {
  val logger: Logger = Logger.getLogger(getClass)

  // ここに、テストを書く!!

  protected def withCache[K, V](cacheName: String, persistenceBuilder: ConfigurationBuilder => AsyncStoreConfigurationBuilder[InMemoryCacheStoreConfigurationBuilder])(fun: Cache[K, V] => Unit): Unit = {
    val configuration = persistenceBuilder(new ConfigurationBuilder).build()

    val manager = new DefaultCacheManager
    manager.defineConfiguration(cacheName, configuration)

    try {
      val cache = manager.getCache[K, V](cacheName)
      fun(cache)
      cache.stop()
    } finally {
      manager.stop()
    }
  }
}

Cacheを簡易的に扱えるメソッドを付けていますが、テスト側でCacheStoreの設定ができるようにはしています。

とりあえず、Write BehindなCacheStoreを使ってみる

それでは最初に、特にFault Tolerantとか気にせずに、ふつうにWrite BehindなCacheStoreを使ってみます。

  test("write-behind store, simply") {
    withCache[String, String](
      "write-behind",
      configuration =>
        configuration
          .persistence
          .addStore(classOf[InMemoryCacheStoreConfigurationBuilder])
          .async.enable // write-behind enabled
    ) { cache =>
      InMemoryCacheStore.COUNTER.set(0)

      cache.put("key1", "value1")
      cache.put("key2", "value2")
      cache.put("key3", "value3")

      TimeUnit.SECONDS.sleep(1L)

      InMemoryCacheStore.currentStoreEntries[String, String] should contain only(
        "key1" -> "value1", "key2" -> "value2", "key3" -> "value3"
      )
    }
  }

API上では、async#enableとすると、Write BehindなCacheStoreになります。

        configuration
          .persistence
          .addStore(classOf[InMemoryCacheStoreConfigurationBuilder])
          .async.enable // write-behind enabled

XMLで書く場合は、write-behindタグになるようですけれどね。

http://infinispan.org/docs/9.3.x/user_guide/user_guide.html#write_behind_asynchronous

CacheStoreへの書き込み失敗数は0で

      InMemoryCacheStore.COUNTER.set(0)

非同期に書き込むことになるので、一応少しsleepしてから、CacheStoreに書き込まれた内容を見るようにしています。

      TimeUnit.SECONDS.sleep(1L)

      InMemoryCacheStore.currentStoreEntries[String, String] should contain only(
        "key1" -> "value1", "key2" -> "value2", "key3" -> "value3"
      )

Write Behindに関係する設定

Write Behindに関する設定はいくつかあって、まずはWrite Throughの時にも使われる設定から。

XMLで設定する場合はpersistenceタグの属性として設定するもので、以下に記載があります。

Persistence / Configuration

項目名 意味 Write Behind時の効果 デフォルト値
connection-attempts CacheWriter/CacheLoader開始時に失敗した場合の試行回数 非同期書き込み時に失敗した場合のリトライ回数 10
availability-interval PersistenceManager(の背後にあるCacheWriter/CahceLoader)が使えるかどうか、定期的に確認する間隔 リトライ時のインターバル 1000

その他は、割愛します。

また、Write Behindのみの設定で、ポイントとなるのはこちら。

項目名 意味 デフォルト値
modification-queue-size 非同期書き込みキューの最大エントリ数。これがいっぱいになると、Write Throughと同じ状態になり、次のエントリが受け入れられるようになるまで待機する 1024
thread-pool-size CacheStoreに変更を適用する際のスレッドプールサイズ 1
fail-silently false

fail-silentlyだけ長いので、ちょっと別に。

fail-silentlyはデフォルトで無効(false)で、Write Behindによる非同期書き込み時に失敗した場合、以下の条件を満たしていると変更を保留、待機します。

  • リトライの試行回数がconnection-attempts回に到達していない
  • CacheWriter#isAvailableがfalseを返す

この状態だと、CacheWriterを停止させ、CacheStoreが回復する(CacheWriter#isAvailableがtrueを返す)まで、非同期にCacheStoreに変更を反映するスレッドを
停止させます。

これを有効(true)にすると、CacheStoreへの書き込みに失敗する状態で、connection-attemptsに設定した回数のリトライを超えると、その変更は失われてしまいます。

このあたりの説明は、こちらのConfiguration Schemaを見るとよいでしょう。

Configuration Schema / urn:infinispan:config:9.3

書き込みを全部失敗させてみる

続いては、書き込みを全部失敗させてみます。

  test("write-behind store, write failed") {
    withCache[String, String](
      "write-behind",
      configuration =>
        configuration
          .persistence
          .connectionAttempts(2) // retry-count
          .availabilityInterval(1000) // store-state polling-interval (default)
          .addStore(classOf[InMemoryCacheStoreConfigurationBuilder])
          .async.enable // write-behind enabled
    ) { cache =>
      InMemoryCacheStore.COUNTER.set(3)

      cache.put("key1", "value1")
      cache.put("key2", "value2")
      cache.put("key3", "value3")

      TimeUnit.SECONDS.sleep(6L) // waiting, all retry failed...

      InMemoryCacheStore.currentStoreEntries[String, String] should be(empty)
    }
  }

リトライ数は2回で、ポーリング間隔は1秒(デフォルト)に対して、

         .connectionAttempts(2) // retry-count
          .availabilityInterval(1000) // store-state polling-interval (default)

書き込み失敗数は3。

      InMemoryCacheStore.COUNTER.set(3)

当然のことながら、すべてのCacheStoreへの書き込みは失敗し、

7 07, 2018 5:11:00 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 0
7 07, 2018 5:11:01 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 1
7 07, 2018 5:11:02 午後 org.infinispan.persistence.async.AsyncCacheWriter$AsyncStoreProcessor retryWork
WARN: ISPN000053: Unable to process some async modifications after 2 retries!
7 07, 2018 5:11:02 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 0
7 07, 2018 5:11:03 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 1
7 07, 2018 5:11:04 午後 org.infinispan.persistence.async.AsyncCacheWriter$AsyncStoreProcessor retryWork
WARN: ISPN000053: Unable to process some async modifications after 2 retries!
7 07, 2018 5:11:04 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key3, failed-count = 0
7 07, 2018 5:11:05 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key3, failed-count = 1
7 07, 2018 5:11:06 午後 org.infinispan.persistence.async.AsyncCacheWriter$AsyncStoreProcessor retryWork
WARN: ISPN000053: Unable to process some async modifications after 2 retries!

CacheStoreへはなにも書き込まれません。

      InMemoryCacheStore.currentStoreEntries[String, String] should be(empty)

リトライの間隔が1秒なのは、availability-intervalが1,000ミリ秒(1秒)だからですね。

7 07, 2018 5:11:00 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 0
7 07, 2018 5:11:01 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 1

このパターンで失敗した書き込みは、そのままロストしてしまいます。CacheStoreへは、反映されることはありません。

これ、けっこうポイントだったりします。それはまた後で。

失敗するCacheStoreへの書き込みを、後で再開させる

では、失敗するCacheStoreへの書き込みを、少し後で再開させるようにしてみましょう。

サンプルコードは、こちら。

  test("write-behind store, fault-tolerant, simply") {
    withCache[String, String](
      "write-behind",
      configuration =>
        configuration
          .persistence
          .connectionAttempts(3) // retry-count
          .availabilityInterval(1000) // store-state polling-interval (default)
          .addStore(classOf[InMemoryCacheStoreConfigurationBuilder])
          .async.enable // write-behind enabled
    ) { cache =>
      InMemoryCacheStore.COUNTER.set(2)

      cache.put("key1", "value1")
      cache.put("key2", "value2")
      cache.put("key3", "value3")

      TimeUnit.SECONDS.sleep(1L)

      logger.infof("set store availability = false")
      InMemoryCacheStore.AVAILABLE.set(false)

      TimeUnit.SECONDS.sleep(3L)

      logger.infof("set store availability = true")
      InMemoryCacheStore.AVAILABLE.set(true)

      TimeUnit.SECONDS.sleep(5L)

      InMemoryCacheStore.currentStoreEntries[String, String] should contain only(
        "key1" -> "value1", "key2" -> "value2", "key3" -> "value3"
      )
    }
  }

リトライ回数が2、インターバル1秒に対して

          .connectionAttempts(3) // retry-count
          .availabilityInterval(1000) // store-state polling-interval (default)

失敗数が2回です。

      InMemoryCacheStore.COUNTER.set(2)

また、途中でCacheWriter#isAvailableがfalseを返すように変更し

      logger.infof("set store availability = false")
      InMemoryCacheStore.AVAILABLE.set(false)

途中でtrueに返すようにしています。

      logger.infof("set store availability = true")
      InMemoryCacheStore.AVAILABLE.set(true)

最終的には、全部の書き込みがCacheStoreへ反映されています。

      InMemoryCacheStore.currentStoreEntries[String, String] should contain only(
        "key1" -> "value1", "key2" -> "value2", "key3" -> "value3"
      )

これ、どういう挙動になるかというと

7 07, 2018 5:18:50 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 0
7 07, 2018 5:18:51 午後 org.littlewings.infinispan.writebehind.FaultTolerantCacheStoreSpec $anonfun$new$9
INFO: set store availability = false
7 07, 2018 5:18:51 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 1
7 07, 2018 5:18:54 午後 org.littlewings.infinispan.writebehind.FaultTolerantCacheStoreSpec $anonfun$new$9
INFO: set store availability = true
7 07, 2018 5:18:55 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key1, value = value1
7 07, 2018 5:18:55 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 0
7 07, 2018 5:18:56 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 1
7 07, 2018 5:18:57 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key2, value = value2
7 07, 2018 5:18:57 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key3, failed-count = 0
7 07, 2018 5:18:58 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key3, failed-count = 1
7 07, 2018 5:18:59 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key3, value = value3

CacheWriter#isAvailableがfalseを返すようにすると、しばらく書き込みの失敗が止まります。

7 07, 2018 5:18:51 午後 org.littlewings.infinispan.writebehind.FaultTolerantCacheStoreSpec $anonfun$new$9
INFO: set store availability = false
7 07, 2018 5:18:51 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 1
7 07, 2018 5:18:54 午後 org.littlewings.infinispan.writebehind.FaultTolerantCacheStoreSpec $anonfun$new$9
INFO: set store availability = true

失敗する回数自体は途中で変えていないので、CacheWriter#isAvailableがtrueを返すようにすると、また失敗し続けますが…。

INFO: set store availability = true
7 07, 2018 5:18:55 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key1, value = value1
7 07, 2018 5:18:55 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 0
7 07, 2018 5:18:56 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 1

これ、どういうことかというと、InfinispanのPersistenceの仕組みの中で、CacheWriter#isAvailableがfalseを返すようになると(CacheLoader#isAvailableも
そうですが)、そのCacheStoreへのアクセスを停止するようになっているからです。

CacheWriter#isAvailableがtrueになると、書き込みを再開します。

で、今回の例では、設定した数だけ失敗したあとにCacheStoreへの書き込みが完了します、と。

このあたりは、また後で見てみましょう。

CacheStoreが停止中に、同じキーに対して書き込みを行ったら?

CacheWriter#isAvailableがfalseとなるとCacheStoreへの書き込みを停止し、trueになると再開することがわかりました。

では、CacheWriter#isAvailableがfalseの間に、同じキーに対して書き込みをしたらどうなるでしょう?

  test("write-behind store, fault-tolerant, duplicate") {
    withCache[String, String](
      "write-behind",
      configuration =>
        configuration
          .persistence
          .connectionAttempts(5) // retry-count
          .availabilityInterval(1000) // store-state polling-interval (default)
          .addStore(classOf[InMemoryCacheStoreConfigurationBuilder])
          .async.enable // write-behind enabled
    ) { cache =>
      InMemoryCacheStore.COUNTER.set(4)

      cache.put("key1", "value1")
      cache.put("key2", "value2")
      cache.put("key3", "value3")

      TimeUnit.SECONDS.sleep(2L)

      cache.put("key1", "value1-1")
      cache.put("key2", "value2-2")
      cache.put("key3", "value3-3")

      TimeUnit.SECONDS.sleep(1L)


      TimeUnit.SECONDS.sleep(1L)

      logger.infof("set store availability = false")
      InMemoryCacheStore.AVAILABLE.set(false)

      TimeUnit.SECONDS.sleep(1L)

      InMemoryCacheStore.COUNTER.set(0)

      logger.infof("set store availability = true")
      InMemoryCacheStore.AVAILABLE.set(true)

      TimeUnit.SECONDS.sleep(6L)

      InMemoryCacheStore.currentStoreEntries[String, String] should contain only(
        "key1" -> "value1-1", "key2" -> "value2-2", "key3" -> "value3-3"
      )
    }
  }

意外とすんなり反映されました?

      InMemoryCacheStore.currentStoreEntries[String, String] should contain only(
        "key1" -> "value1-1", "key2" -> "value2-2", "key3" -> "value3-3"
      )

ログを見ても、それっぽく並んでいます。

7 07, 2018 5:31:51 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 0
7 07, 2018 5:31:52 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 1
7 07, 2018 5:31:53 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 2
7 07, 2018 5:31:54 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 3
7 07, 2018 5:31:55 午後 org.littlewings.infinispan.writebehind.FaultTolerantCacheStoreSpec $anonfun$new$15
INFO: set store availability = false
7 07, 2018 5:31:55 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key1, value = value1
7 07, 2018 5:31:55 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 0
7 07, 2018 5:31:56 午後 org.littlewings.infinispan.writebehind.FaultTolerantCacheStoreSpec $anonfun$new$15
INFO: set store availability = true
7 07, 2018 5:31:56 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key2, value = value2
7 07, 2018 5:31:56 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key1, value = value1-1
7 07, 2018 5:31:57 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key3, value = value3-3
7 07, 2018 5:31:57 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key2, value = value2-2

順番は崩れないのかな?

でも、よくよく見ると「key3 / value3」の組み合わせの書き込みがなくなったりしていますね。

7 07, 2018 5:31:56 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key2, value = value2
7 07, 2018 5:31:56 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key1, value = value1-1
7 07, 2018 5:31:57 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key3, value = value3-3
7 07, 2018 5:31:57 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key2, value = value2-2

これはどこに行ったのでしょう?

fail-silentlyをtrueにしてみる

最後に、fail-silentlyをtrueにしてみましょう。

  test("write-behind store, fault-tolerant, disabled") {
    withCache[String, String](
      "write-behind",
      configuration =>
        configuration
          .persistence
          .connectionAttempts(3) // retry-count
          .availabilityInterval(1000) // store-state polling-interval (default)
          .addStore(classOf[InMemoryCacheStoreConfigurationBuilder])
          .async.enable // write-behind enabled
          .failSilently(true)  // default false
    ) { cache =>
      InMemoryCacheStore.COUNTER.set(3)

      logger.infof("set store availability = false")
      InMemoryCacheStore.AVAILABLE.set(false)

      cache.put("key1", "value1")
      cache.put("key2", "value2")
      cache.put("key3", "value3")

      TimeUnit.SECONDS.sleep(2L)

      logger.infof("set store availability = true")
      InMemoryCacheStore.AVAILABLE.set(true)

      InMemoryCacheStore.currentStoreEntries[String, String] should be(empty)
    }
  }

fail-silentlyをfalseにして

          .failSilently(true)  // default false

あとはシンプルに。書き込み失敗数は3。最初に、CacheWriter#isAvailableがfalseになるようにしています。

      InMemoryCacheStore.COUNTER.set(3)

      logger.infof("set store availability = false")
      InMemoryCacheStore.AVAILABLE.set(false)

      cache.put("key1", "value1")
      cache.put("key2", "value2")
      cache.put("key3", "value3")

      TimeUnit.SECONDS.sleep(2L)

      logger.infof("set store availability = true")
      InMemoryCacheStore.AVAILABLE.set(true)

      InMemoryCacheStore.currentStoreEntries[String, String] should be(empty)

リトライ数は3です。つまり、書き込みはすべて失敗するシナリオなのですが。

          .connectionAttempts(3) // retry-count
          .availabilityInterval(1000) // store-state polling-interval (default)

これ、どういう挙動になるかというと、CacheWriter#isAvailableがfalseを返すように設定しているにも関わらず、CacheStoreへ書き込もうとします。

7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 0
7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 1
7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 2
7 07, 2018 5:33:26 午後 org.infinispan.persistence.async.AsyncCacheWriter$AsyncStoreProcessor retryWork
WARN: ISPN000053: Unable to process some async modifications after 3 retries!
7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 0
7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 1
7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 2
7 07, 2018 5:33:26 午後 org.infinispan.persistence.async.AsyncCacheWriter$AsyncStoreProcessor retryWork
WARN: ISPN000053: Unable to process some async modifications after 3 retries!
7 07, 2018 5:33:28 午後 org.littlewings.infinispan.writebehind.FaultTolerantCacheStoreSpec $anonfun$new$18
INFO: set store availability = true

あと、書き込みが失敗しても特にインターバルなくリトライしに行っています。

7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 0
7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 1

ある意味で、ストレートな挙動ですね。

これで、だいたいWrite BehindなCache StoreのFault Tolerantに関する設定は見れたのかなと思います。

もうちょっと中身を

それでは、せっかくなのでもうちょっと中身の方を。

CacheStoreをWrite Behindに構成すると、CacheWriterがAsyncCacheWriter(AdvancedCacheWriterの場合はAdvancedAsyncCacheWriter)にラップされます。
これは、PersistenceManagerImplにて行われます。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/manager/PersistenceManagerImpl.java#L824-L827

AdvancedAsyncCacheWriterは、AsyncCacheWriterのサブクラスなので、以降は大筋のAsyncCacheWriterにフォーカスして書いていきます。

CacheStoreへの反映を制御しているのは、AsyncStoreProcessorというクラスになります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L407

書き込みのタスク実行時に、リトライ回数としてconection-attemptsが使われます。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L423

で、CacheStoreへの書き込みが失敗する(=例外がスローされる)場合は、connection-attempts回数分だけリトライを繰り返してそれでもダメな場合は
諦める、という流れになります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L437-L481

書き込みを指示している箇所は、こちら。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L463

失敗した時は、connection-attemptsで指定されたリトライ回数に達していない場合はavailability-intervalで指定したミリ秒分だけ待機します。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L473

ここで、定期的に確認しているCacheWriter#isAvailableがfalseになっていることを確認できた場合は、スレッドが待機状態になります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L453

これには、Lock#newConditionから得られるConditionを使用しています。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L85

また、AsyncStoreProcessorの実行指示をするのはAsyncStoreCoordinatorというクラスなのですが、こちらも定期的に確認しているCacheWriter#isAvailableがfalseに
なっている場合は、スレッドが待機状態になります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L305

ちなみに、AsyncStoreCoordinatorは1スレッドで
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L128-L129

AsyncStoreProcessorは、write-behind / thread-pool-sizeで指定された値(デフォルト1)のスレッド数で動作します。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L116-L119

ここで、CacheWriter#isAvailableを確認する処理はAsyncCacheWriterに書かれているのですが
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L165-L194

これを定期的に確認しているのはPersistenceManagerImplとなります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/manager/PersistenceManagerImpl.java#L156-L157

この確認間隔も、availability-intervalによって決まります。

で、先程CacheWriter#isAvailableがfalseを返した場合、AsyncStoreCoordinatorやAsyncStoreProcessorを動かしているスレッドが待機状態に入るという
ことを書きましたが、これを起こすのがこちらの処理。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L185

CacheWriter#isAvailableがfalseからtrueに切り替わったことが確認できたら、Condition#signalAllでスレッドを起こすという仕組みです。

なので、このFault Tolerantの仕組みは

  • CacheWriterの書き込み処理(CacheStoreへの書き込み処理)は、失敗しても規定回数(connection-attempts)リトライされる
    • ここで、繰り返しまではavailability-intervalミリ秒の待機がある
  • このリトライが、規定回数終わっても失敗したままだと、更新内容はCacheStoreへは反映されなくなる
  • ここで、CacheWriter#isAvailableでCacheStoreへの書き込み可能な状態をコントロールすることで、CacheStoreが回復するまで更新処理を待機させる

ということで、CacheStoreへの書き込みに失敗する場合は、リトライ回数に到達するまでにCacheWriter#isAvailableが正しく制御されて更新を停止し、
回復したらCacheWriter#isAvailableにもそれが反映されるということが行われた前提で成立しています。

例えば、JdbcCacheStoreはConnection#isValidの結果を、CacheWriter#isAvailableとして利用しています。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/persistence/jdbc/src/main/java/org/infinispan/persistence/jdbc/stringbased/JdbcStringBasedStore.java#L175

で、こうなると変更を溜めていくキューの仕組みが気になるところですが、ここで使われているキューは、実際にはロックの許容数です。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L254-L263

write-behind / modification-queue-sizeというのは、ロック取得可能な数を指定することのようですね。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L112

というわけで、modification-queue-sizeはNode単位の設定ということが言えるでしょう。

この変更内容はStateというものに入れられるのですが、
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L260

Stateの内部では変更がModificationの積み上げとして表現されていますが、その管理単位はキー単位となります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/State.java#L90-L101

このため、短時間で同じキーに対して変更を入れると、最後だけが残ったりするのでしょうね。

また、Stateはチェーン構成になっていて、それをたどる処理がところどころにあったりします。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L320-L325
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L428-L430

最後にfail-silentlyですが、AsyncCacheWriterでそこそこ登場しており、これがtrueだとCacheWriter#isAvailableの変更結果を気にしないとか
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L170-L171

ここまでに説明したConditionでのスレッドの待機や、リトライ時のavailability-intervalでの待機時間を無視するといった感じになります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L299
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L442-L455
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L469-L478

代わりに、CacheWriter#isAvailableが呼ばれなくはなりますね。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L171

あと、Write Behindとは関係なくオマケ的になのですが、CacheStoreの起動時に失敗してしまうような場合は、connection-attempts回数分だけリトライし、
その時の待機時間はconnection-intervalで決まるみたいですよ。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/manager/PersistenceManagerImpl.java#L870-L893

ここは、availability-intervalじゃないんですねぇ、と。

まとめ

今回は、Infinispan 9.3でFault TolerantになったというWrite BehindなCache Writerを見てみました。

あんまりWrite Behindの裏側って見てこなかったので、いろいろと参考になりましたねぇ。

今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-write-behind-store-fault-tolerant