CLOVER🍀

That was when it all began.

InfinispanのLocked Streamsを試す

Infinispan 9.1で、Locked StreamsというAPIが追加されました。

Infinispan: Infinispan 9.1 "Bastille"

残念ながらまだドキュメントには記載のない機能ですが、forEachの呼び出しの際に対象となるエントリに対してロックを取ることで別の更新処理に
邪魔されずに副作用を伴う操作(更新など)ができるようになります。

また、このforEachはBiConsumerを引数に取るもので、一緒にCacheのインスタンスも渡されてくるため他のエントリも参照することが
できます。

9.1の現時点で、非トランザクショナルなCacheとPessimisticなトランザクショナルCacheをサポートしています。OptimisticなトランザクショナルCacheは
サポートしていません。

とまあ、説明はこれくらいにして使ってみましょう。

準備

依存関係は、このように定義しました。

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "9.1.0.Final" % Compile,
  "net.jcip" % "jcip-annotations" % "1.0" % Provided,
  "org.scalatest" %% "scalatest" % "3.0.3" % Test
)

「infinispan-core」モジュールがあればOKです。ScalaTestはテスト用、jcip-annotationsはScalaとInfinispanを一緒に使っているので
こうなります、と。

設定ファイルは、雛形としてはこんな感じに用意します。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:9.1 http://www.infinispan.org/schemas/infinispan-config-9.1.xsd"
        xmlns="urn:infinispan:config:9.1">
    <jgroups>
        <stack-file name="udp" path="default-configs/default-jgroups-udp.xml"/>
    </jgroups>
    <cache-container>
        <jmx duplicate-domains="true"/>
        <transport cluster="test-cluster" stack="udp"/>

        <!-- あとでCacheの定義を書く! -->

    </cache-container>
</infinispan>

Cacheの定義自体は、またあとで書いていきます。

テストコードの雛形も。まあ、今回はテスト自体は書かずにただのランチャーとして使うだけなのですが。

src/test/scala/org/littlewings/infinispan/lockedstreams/LockedStreamsSpec.scala 
package org.littlewings.infinispan.lockedstreams

import org.infinispan.Cache
import org.infinispan.container.entries.CacheEntry
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.util.function.{SerializableBiConsumer, SerializableConsumer, SerializablePredicate}
import org.scalatest.{FunSuite, Matchers}

class LockedStreamsSpec extends FunSuite with Matchers {

  // ここにテストを書く!

  protected def withCache[K, V](cacheName: String, numInstances: Int)(fun: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))
    managers.foreach(_.getCache[K, V](cacheName))

    try {
      val cache = managers(0).getCache[K, V](cacheName)
      fun(cache)
      cache.stop()
    } finally {
      managers.foreach(_.stop())
    }
  }
}

簡易的にクラスタを構成可能なヘルパーメソッド付き。

では、書いていきます。

非トランザクショナルなCache

まずは、トランザクション設定をしないCacheで試してみましょう。

Cacheの定義は、単純なDistributed Cacheとして行います。

        <distributed-cache name="distributedCache"/>

Locked Streamsを使ったコードはこちら。

  test("simple locked-streams") {
    withCache[String, Integer]("distributedCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key${i}", i))

      val lockedStream = cache.getAdvancedCache.lockedStream

      try {
        lockedStream
          .filter(new SerializablePredicate[CacheEntry[String, Integer]] {
            override def test(e: CacheEntry[String, Integer]): Boolean = e.getValue % 2 == 0
          })
          .forEach(new SerializableBiConsumer[Cache[String, Integer], CacheEntry[String, Integer]] {
            override def accept(c: Cache[String, Integer], entry: CacheEntry[String, Integer]): Unit =
              println(s"Cache[${c.getName}]: entry[${entry}]")
          })
      } finally {
        lockedStream.close()
      }
    }
  }

Locked Streamsは、AdvancedCacheから取得することができます。

      val lockedStream = cache.getAdvancedCache.lockedStream

あとは通常のStreamと同じように使える…と言いたいのですが(今回はfilterしてforEachしてます)

      try {
        lockedStream
          .filter(new SerializablePredicate[CacheEntry[String, Integer]] {
            override def test(e: CacheEntry[String, Integer]): Boolean = e.getValue % 2 == 0
          })
          .forEach(new SerializableBiConsumer[Cache[String, Integer], CacheEntry[String, Integer]] {
            override def accept(c: Cache[String, Integer], entry: CacheEntry[String, Integer]): Unit =
              println(s"Cache[${c.getName}]: entry[${entry}]")
          })
      } finally {
        lockedStream.close()
      }

あまりこのLocked Streamsでできる操作は多くありません。mapやflatMap、collectなどは使えなかったりします。
https://docs.jboss.org/infinispan/9.1/apidocs/org/infinispan/LockedStream.html

filterとforEachが、メインのAPIとなるような感じですね。あとは、parallelDistributionやsequentialDistributionなどの分散・並列系の設定は
できるといったところでしょうか。

Locked Streamsも、使い終わったらクローズしておく必要があります。Javaであれば、try-with-resourcesを使えば済む話ですけど。

        lockedStream.close()

基本的には、こんな感じです。

一応、実行結果例。

Cache[distributedCache]: entry[ImmortalCacheEntry{key=key6, value=6}]
Cache[distributedCache]: entry[ImmortalCacheEntry{key=key8, value=8}]
Cache[distributedCache]: entry[ImmortalCacheEntry{key=key10, value=10}]
Cache[distributedCache]: entry[ImmortalCacheEntry{key=key2, value=2}]
Cache[distributedCache]: entry[ImmortalCacheEntry{key=key4, value=4}]

OptimisticなトランザクショナルCache

最初にも書きましたが、Locked StreamsはOptimisticなCacheをサポートしていません。

こうやって定義して

        <distributed-cache name="withOptimisticLockCache">
            <transaction mode="NON_XA" locking="OPTIMISTIC" auto-commit="false"/>
        </distributed-cache>

使ってみても、Locked Streamsを取得しようとした時点で例外がスローされます。

  test("optimistic-lock cache, not supported") {
    withCache[String, Integer]("withOptimisticLockCache", 3) { cache =>
      val transactionManager = cache.getAdvancedCache.getTransactionManager
      transactionManager.begin()
      (1 to 10).foreach(i => cache.put(s"key${i}", i))
      transactionManager.commit()

      val thrown = the[UnsupportedOperationException] thrownBy cache.getAdvancedCache.lockedStream
      thrown.getMessage should be("Method lockedStream is not supported in OPTIMISTIC transactional caches!")
    }
  }

PessimiticなトランザクショナルCache

次は、PessimisticなトランザクショナルCacheで。

        <distributed-cache name="withPessimisticLockCache">
            <transaction mode="NON_XA" locking="PESSIMISTIC" auto-commit="false"/>
        </distributed-cache>

auto-commitは、オフにしています。なので、Cacheを更新する時にはTransactionManagerが必要です。

  test("with pessimistic-lock, locked-streams") {
    withCache[String, Integer]("withPessimisticLockCache", 3) { cache =>
      val transactionManager = cache.getAdvancedCache.getTransactionManager
      transactionManager.begin()
      (1 to 10).foreach(i => cache.put(s"key${i}", i))
      transactionManager.commit()

      val lockedStream = cache.getAdvancedCache.lockedStream

      try {
        lockedStream
          .filter(new SerializablePredicate[CacheEntry[String, Integer]] {
            override def test(e: CacheEntry[String, Integer]): Boolean = e.getValue % 2 == 0
          })
          .forEach(new SerializableBiConsumer[Cache[String, Integer], CacheEntry[String, Integer]] {
            override def accept(c: Cache[String, Integer], entry: CacheEntry[String, Integer]): Unit =
              println(s"Cache[${c.getName}]: entry[${entry}]")
          })
      } finally {
        lockedStream.close()
      }
    }
  }

使っているところは、TransactionManagerがいるくらいで、非トランザクショナルなCacheの時と同じですが。

結果も同じようになります。

Cache[withPessimisticLockCache]: entry[ImmortalCacheEntry{key=key8, value=8}]
Cache[withPessimisticLockCache]: entry[ImmortalCacheEntry{key=key2, value=2}]
Cache[withPessimisticLockCache]: entry[ImmortalCacheEntry{key=key10, value=10}]
Cache[withPessimisticLockCache]: entry[ImmortalCacheEntry{key=key6, value=6}]
Cache[withPessimisticLockCache]: entry[ImmortalCacheEntry{key=key4, value=4}]

LockedStream#forEach内でエントリを更新してみる / 非トランザクショナルなCache

続いて、BiConsumerでCacheとエントリを受け取るforEachで、エントリの更新をしてみます。

  test("simple locked-streams, lookup and update another cache entry") {
    withCache[String, Integer]("distributedCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key${i}", i))

      val lockedStream = cache.getAdvancedCache.lockedStream

      try {
        lockedStream
          .filter(new SerializablePredicate[CacheEntry[String, Integer]] {
            override def test(e: CacheEntry[String, Integer]): Boolean = e.getValue % 2 == 0
          })
          .forEach(new SerializableBiConsumer[Cache[String, Integer], CacheEntry[String, Integer]] {
            override def accept(c: Cache[String, Integer], entry: CacheEntry[String, Integer]): Unit = {
              val key = entry.getKey
              val value = entry.getValue
              val keyAsInt = key.replace("key", "").toInt
              val nextKey = if (keyAsInt >= 10) "key1" else s"key${keyAsInt + 1}"

              val nextValue = c.get(nextKey)

              c.put(key, nextValue + value)

            }
          })
      } finally {
        lockedStream.close()
      }

      val stream = cache.entrySet().stream

      try {
        stream.forEach(new SerializableConsumer[java.util.Map.Entry[String, Integer]] {
          override def accept(e: java.util.Map.Entry[String, Integer]): Unit =
            println(s"updated: key[${e.getKey}], value[${e.getValue}]")
        })
      } finally {
        stream.close()
      }
    }
  }

最後の結果出力は、ふつうのStreamを使いました。

Locked Streamsの方では、forEach内で同じCache内で、forEachで渡されてきたエントリとは別のエントリを参照しています。

          .filter(new SerializablePredicate[CacheEntry[String, Integer]] {
            override def test(e: CacheEntry[String, Integer]): Boolean = e.getValue % 2 == 0
          })
          .forEach(new SerializableBiConsumer[Cache[String, Integer], CacheEntry[String, Integer]] {
            override def accept(c: Cache[String, Integer], entry: CacheEntry[String, Integer]): Unit = {
              val key = entry.getKey
              val value = entry.getValue
              val keyAsInt = key.replace("key", "").toInt
              val nextKey = if (keyAsInt >= 10) "key1" else s"key${keyAsInt + 1}"

              val nextValue = c.get(nextKey)

              c.put(key, nextValue + value)

            }
          })

更新対象は、偶数のキーのエントリですね。

結果としては、このように偶数キーのエントリは、ひとつインクリメントした先のキーに紐づく値と元の値の合算で更新されます。

updated: key[key1], value[1]
updated: key[key5], value[5]
updated: key[key3], value[3]
updated: key[key4], value[9]
updated: key[key8], value[17]
updated: key[key2], value[5]
updated: key[key7], value[7]
updated: key[key6], value[13]
updated: key[key10], value[11]
updated: key[key9], value[9]

LockedStream#forEach内でエントリを更新してみる / PessimisticでトランザクショナルなCache

最後に、LockedStream#forEach内での更新を、PessimisticでトランザクショナルなCacheで行います。

先ほどの非トランザクショナルなCacheとよく似た感じになりますが、

  test("with pessimistic-lock, locked-streams, lookup and update another cache entry") {
    withCache[String, Integer]("withPessimisticLockCache", 3) { cache =>
      val transactionManager = cache.getAdvancedCache.getTransactionManager
      transactionManager.begin()
      (1 to 10).foreach(i => cache.put(s"key${i}", i))
      transactionManager.commit()

      val lockedStream = cache.getAdvancedCache.lockedStream

      try {
        lockedStream
          .filter(new SerializablePredicate[CacheEntry[String, Integer]] {
            override def test(e: CacheEntry[String, Integer]): Boolean = e.getValue % 2 == 0
          })
          .forEach(new SerializableBiConsumer[Cache[String, Integer], CacheEntry[String, Integer]] {
            override def accept(c: Cache[String, Integer], entry: CacheEntry[String, Integer]): Unit = {
              val tm = c.getAdvancedCache.getTransactionManager
              tm.begin()

              val key = entry.getKey
              val value = entry.getValue
              val keyAsInt = key.replace("key", "").toInt
              val nextKey = if (keyAsInt >= 10) "key1" else s"key${keyAsInt + 1}"

              val nextValue = c.get(nextKey)

              c.put(key, nextValue + value)

              tm.commit()
            }
          })
      } finally {
        lockedStream.close()
      }

      val stream = cache.entrySet().stream

      try {
        stream.forEach(new SerializableConsumer[java.util.Map.Entry[String, Integer]] {
          override def accept(e: java.util.Map.Entry[String, Integer]): Unit =
            println(s"updated: key[${e.getKey}], value[${e.getValue}]")
        })
      } finally {
        stream.close()
      }
    }
  }

よく見ると、forEach内でTransactionManagerを使っています。

          .forEach(new SerializableBiConsumer[Cache[String, Integer], CacheEntry[String, Integer]] {
            override def accept(c: Cache[String, Integer], entry: CacheEntry[String, Integer]): Unit = {
              val tm = c.getAdvancedCache.getTransactionManager
              tm.begin()

              val key = entry.getKey
              val value = entry.getValue
              val keyAsInt = key.replace("key", "").toInt
              val nextKey = if (keyAsInt >= 10) "key1" else s"key${keyAsInt + 1}"

              val nextValue = c.get(nextKey)

              c.put(key, nextValue + value)

              tm.commit()
            }
          })

ここが、非トランザクショナルなCacheと比べた時のポイントです。
※auto-commitを有効にした場合は、この限りではないでしょうけれど。

結果は、こちら。

updated: key[key1], value[1]
updated: key[key2], value[5]
updated: key[key8], value[17]
updated: key[key6], value[13]
updated: key[key3], value[3]
updated: key[key4], value[9]
updated: key[key9], value[9]
updated: key[key7], value[7]
updated: key[key5], value[5]
updated: key[key10], value[11]

もうちょっと内部を

簡単に使ってみたところで、もうちょっとLocked Streamsの内部を見てみましょう。

Locked Streamsは、Cache#entrySet#streamで取得できる、CacheStreamのラッパーとして存在しています。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/cache/impl/CacheImpl.java#L748-L757

この時、非トランザクショナルなCacheであればLockedStreamImpl、トランザクショナルなCacheであればTxLockedStreamImplでラップされます。
TxLockedStreamImplとLockedStreamImplは、継承関係にあります。

なので、まずはLockedStreamImplを見ていくことになります。

基本的に、LockedStreamImplは内部で保持しているCacheStreamのラッパーで、ほとんどのメソッドは実際のCacheStreamに処理を転送している
だけになります。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/stream/impl/LockedStreamImpl.java#L104

例外は、filterとforEachです。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/stream/impl/LockedStreamImpl.java#L85-L95
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/stream/impl/LockedStreamImpl.java#L97-L100

filterの方は、渡されたPredicateを内部でフィールドとして保持します。

forEachの方は、CacheEntryConsumerで来るんで実際のCacheStreamに委譲します。

このCacheEntryConsumerは、LockedStreamImpl内で定義されているクラスになります。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/stream/impl/LockedStreamImpl.java#L184

CacheEntryConsumerの特徴は、該当のエントリのロックを取って、実際のBiConsumerに渡すことです。

      @Override
      public void accept(Cache<K, V> cache, CacheEntry<K, V> entry) {
         K key = entry.getKey();
         lock(key);
         try {
            CacheEntry<K, V> rereadEntry = cache.getAdvancedCache().getCacheEntry(key);
            if (rereadEntry != null && (predicate == null || predicate.test(rereadEntry))) {
               Cache<K, V> cacheToUse = cache.getAdvancedCache().lockAs(key);
               // Pass the Cache with the owner set to our key so they can write and also it won't unlock that key
               realConsumer.accept(cacheToUse, new EntryWrapper<>(cacheToUse, entry));
            }
         } finally {
            lockManager.unlock(key, key);
         }
      }

確かに、ロックを取っています。また、filterで設定したPredicateもここで渡すことになります。

ここで、forEachに渡ってくるCacheは、AdvancedCache#lockAsでDecoratedCacheにラップされたものです。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/cache/impl/CacheImpl.java#L1104

               Cache<K, V> cacheToUse = cache.getAdvancedCache().lockAs(key);

こうやってロック用のキーを指定してDecoratedCacheを使うと、getやput時にこのキーをロックのオーナーとして設定してくれるようになります。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/cache/impl/DecoratedCache.java#L465
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/cache/impl/DecoratedCache.java#L586

そして、トランザクショナルなCacheで使われるTxLockedStreamImplの場合ですが、こちらはforEach時にトランザクションサスペンド/レジュームが行われます。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/stream/impl/TxLockedStreamImpl.java#L36-L45

   @Override
   public void forEach(BiConsumer<Cache<K, V>, ? super CacheEntry<K, V>> biConsumer) {
      Transaction ongoingTransaction = null;
      try {
         ongoingTransaction = suspendOngoingTransactionIfExists();
         super.forEach(biConsumer);
      } finally {
         resumePreviousOngoingTransaction(ongoingTransaction);
      }
   }

このため、Locked Streamsを実行する前にトランザクションが実行されていた場合は、forEach呼び出し時に一時中断することになります。

先ほどのPessimisticなトランザクショナルCacheで、forEach時にTransactionManagerを使っていたのは、これが理由です。

          .forEach(new SerializableBiConsumer[Cache[String, Integer], CacheEntry[String, Integer]] {
            override def accept(c: Cache[String, Integer], entry: CacheEntry[String, Integer]): Unit = {
              val tm = c.getAdvancedCache.getTransactionManager
              tm.begin()

              // 省略

              c.put(key, nextValue + value)

              tm.commit()
            }
          })

更新時にトランザクションが必要な場合は、forEach内で行う必要がありますよ、と。

あと、ちょっとハマったのがロックの取り方で、次のように先にロックを取るのですが、見ての通りfilterで設定したPredicateの呼び出し前にエントリの
ロックが取得されます。

      @Override
      public void accept(Cache<K, V> cache, CacheEntry<K, V> entry) {
         K key = entry.getKey();
         lock(key);
         try {
            CacheEntry<K, V> rereadEntry = cache.getAdvancedCache().getCacheEntry(key);
            if (rereadEntry != null && (predicate == null || predicate.test(rereadEntry))) {
               Cache<K, V> cacheToUse = cache.getAdvancedCache().lockAs(key);
               // Pass the Cache with the owner set to our key so they can write and also it won't unlock that key
               realConsumer.accept(cacheToUse, new EntryWrapper<>(cacheToUse, entry));
            }
         } finally {
            lockManager.unlock(key, key);
         }
      }

要するに、filterで除外したつもりのエントリに対してもロックがかかります。最初、これに気づかずにforEach内で別のエントリに対して更新をかけようと
したら、見事にデッドロックになりました…。なるほど…。

本当に対称を減らしたかったら、filterKeysなどを使うのが良いのかもしれませんね。

まとめ

Infinispan 9.1で追加された、Locked Streamsを試してみました。

通常のStreamのインターフェースと違って意外とできることが少なかったり、実際にどういう仕組みで動いているのを知れてよかったかなと思います。
特に、ロックまわりとか…。

今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-locked-streams