CLOVER🍀

That was when it all began.

Infinispan 9.3で追加された、Hot Rod Transactionを試す

先日、Infinispan 9.3.0.Finalがリリースされました。

Infinispan 9.3.0.Final is out!

これからちょっとずつ見ていこうと思いますが、まず最初に気になったのはこちら。

Transaction support Hot Rod. The java Hot Rod client can participate in Java transactions via Synchronization or XA enlistment. Note that recovery isn't supported yet.

https://blog.infinispan.org/2018/06/infinispan-930final-is-out.html

Hot Rodで、トランザクションがサポートされたようです。リカバリこそできないものの、JTAのSynchronization、もしくは
XAリソースとしてトランザクションに参加可能な模様。

ドキュメントは、こちらです。

Hot Rod Transaction

というわけで、今回はこちらを試してみます。

Hot Rod Transaction?

中身に入っていく前に、ドキュメントからもうちょっとHot Rod Transactionについて掘り下げてみましょう。

Hot Rod Transaction

Hot Rodの(Java)Clientを、JTAトランザクションに参加させることができる機能です。

書き込み準備時にロックを取る、とありますが…?

The transactions are optimistic in a way that the write locks are acquired at prepare time.

http://infinispan.org/docs/9.3.x/user_guide/user_guide.html#hot_rod_transaction

Cacheは、Server側とClient側それぞれでトランザクションの設定を行う必要があります。

まずは、Server側。Hot RodのServer側ですが、これは通常Infinispan Serverとして起動しているでしょう。

Hot Rod Transactionで使うCacheは、以下の条件を満たす形で構成されている必要があります。

  • Isolation LevelがREPEATABLE_READ
  • Locking ModeがPESSIMISTIC(厳密には、OPTIMISTIC、またはTotal Order based commit protocolの利用を許容しない)

上記を満たす、トランザクショナルなCacheである必要があります、と。

トランザクションのモードは、通常どおりNON_XA、NON_DURABLE_XA、FULL_XAから選ぶことができますが、パフォーマンス上の理由から、NON_XAまたはNON_DURABLE_XAを
推奨しているようです。

Also, as transaction mode, it is recommended to use NON_XA or NON_DURABLE_XA. FULL_XA imposes a performance penalty and it won’t be used by your Hot Rod transaction. Hot Rod transaction will have it owns recovery mechanism.

http://infinispan.org/docs/9.3.x/user_guide/user_guide.html#server_configuration

そのうち、Hot Rod側で独自のリカバリカニズムを持つのでしょうか?

Client側は、TransactionManagerとTransactionModeを決める必要があります。

TransactionManagerは、TransactionManagerLookupによりどのTransactionManagerを使用するかを決めます。

TransactionManagerLookupは、Hot Rod Clientでは以下の2つが用意されています。

  • GenericTransactionManagerLookup … Java EEサーバーが提供するTransactionManagerを使用する、TransactionManagerLookupの実装。利用できるTransactionManagerがない場合は、RemoteTransactionManagerを使用する
  • RemoteTransactionManagerLookup … RemoteTransactionManagerを使用する、TransactionManagerLookupの実装。RemoteTransactionManagerは、インメモリな実装で、並行性やリカバリに制限がある

TransactionModeについては、Server側とは別に、Client側でも指定する必要があります。RemoteCacheが、どのように振る舞うかです。

  • NONE … デフォルト。RemoteCacheは、TransactionManagerと対話しない(トランザクションに参加しない)
  • NON_XA … RemoteCacheをJTAのSynchronizationとして使う
  • NON_DURABLE_XA … RemoteCacheをリカバリの無効化されたXAResourceとして扱う
  • FULL_XA … RemoteCacheをリカバリを有効化されたXAResourceとして扱うが、現在未サポート

といった感じです。

では、そろそろ使うコードに入っていきましょう。

環境

確認した環境は、こちら。

$ java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-0ubuntu0.18.04.1-b11)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)

$ sbt sbtVersion
[info] Loading project definition from /path/to/remote-transaction/project
[info] Loading settings from build.sbt ...
[info] Set current project to remote-transaction (in build file:/path/to/remote-transaction/)
[info] 1.1.6

Infinispan Serverは、9.3.0.Finalを使用します。

準備

sbtでの依存関係は、こちら。

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-client-hotrod" % "9.3.0.Final" % Compile,
  "net.jcip" % "jcip-annotations" % "1.0" % Provided,
  "org.scalatest" %% "scalatest" % "3.0.5" % Test
)

Hot Rod Transactionを使うには、通常のHot Rod Clientを使う時と同じように「infinispan-client-hotrod」を使えばOKです。

Infinispan Serverを起動。

$ unzip infinispan-server-9.3.0.Final.zip
$ cd infinispan-server-9.3.0.Final
$ bin/standalone.sh -c clustered.xml

Infinispan Serverは1 Nodeとしますが、クラスタを構成可能なモードで起動しておきます。

Cacheの作成

Infinispan ServerにCacheを作成するために、まずはCLIで操作するための管理ユーザーを作成します。

$ bin/add-user.sh -u ispn-admin -p password
$ bin/ispn-cli.sh -c -u=ispn-admin -p=password --command=reload

接続。

$ bin/ispn-cli.sh -c -u=ispn-admin -p=password 
[standalone@localhost:9990 /] 

Cacheを作成。

## NON_XAなCache Configurationを作成
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=nonXaCacheConfiguration:add(start=EAGER,mode=SYNC)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=nonXaCacheConfiguration/transaction=TRANSACTION:add(mode=NON_XA,locking=PESSIMISTIC)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=nonXaCache:add(configuration=nonXaCacheConfiguration)
{"outcome" => "success"}


## NON_DURABLE_XAなCache Configurationを作成
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=nonDurableXaCacheConfiguration:add(start=EAGER,mode=SYNC)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=nonDurableXaCacheConfiguration/transaction=TRANSACTION:add(mode=NON_DURABLE_XA,locking=PESSIMISTIC)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=nonDurableXaCache:add(configuration=nonDurableXaCacheConfiguration)
{"outcome" => "success"}


## NON_XAなCacheを2つ追加
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=nonXaCache1:add(configuration=nonXaCacheConfiguration)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=nonXaCache2:add(configuration=nonXaCacheConfiguration)
{"outcome" => "success"}

## NON_DURABLE_XAなCacheを2つ追加
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=nonDurableXaCache1:add(configuration=nonDurableXaCacheConfiguration)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=nonDurableXaCache2:add(configuration=nonDurableXaCacheConfiguration)
{"outcome" => "success"}

リロード。

[standalone@localhost:9990 /] reload

では、これを元にテストコードで確認していきます。

テストコードの雛形

テストコードの雛形は、こちら。
src/test/scala/org/littlewings/infinispan/transaction/HotRodTransactionSpec.scala

package org.littlewings.infinispan.transaction

import java.util.concurrent.{CompletableFuture, Executors}

import org.infinispan.client.hotrod.configuration.{ConfigurationBuilder, TransactionMode}
import org.infinispan.client.hotrod.exceptions.HotRodClientException
import org.infinispan.client.hotrod.transaction.manager.RemoteTransactionManager
import org.infinispan.client.hotrod.{RemoteCache, RemoteCacheManager}
import org.scalatest.{FunSuite, Matchers}

class HotRodTransactionSpec extends FunSuite with Matchers {
  // ここに、テストを書く!!
}

RemoteCacheManagerおよびRemoteCacheの作成

いつもなら、テストコードの雛形の中にRemoteCacheManagerやRemoteCache取得のコードは含めてしまうのですが、今回はここもポイントになるので、個別に。

まず、Hot Rod Clientでトランザクションを扱うためには、RemoteCacheManager作成時にトランザクションを有効にしておく必要があります。

  protected def withCacheManager(transactionMode: TransactionMode = TransactionMode.NONE)(fun: RemoteCacheManager => Unit): Unit = {
    val manager =
      new RemoteCacheManager(
        new ConfigurationBuilder()
          .addServer()
          .host("172.17.0.2")
          .port(11222)
          .transaction()
          .transactionMode(transactionMode)
          .build()
      )

    try {
      fun(manager)
    } finally {
      manager.stop()
    }
  }

ConfigurationBuilderでの設定時に、transactionalメソッドでトランザクションの設定に切り替え、TransactionModeを指定します。RemoteCacheではなく、
RemoteCacheManagerの単位で全体の設定を行うことになります。

今回は使いませんが、TransactionManagerLookupの設定を行うこともできます。

TransactionConfigurationBuilder (Infinispan JavaDoc All 9.3.6.Final API)

hotrod-client.propertiesで指定する場合は、こちらのプロパティを指定する模様。

https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/ConfigurationProperties.java#L71-L72

RemoteCacheManager構築時に指定したトランザクションの設定はデフォルトの設定で、RemoteCacheを取得する時にその設定を上書きすることもできます。

今回は、このメソッドを使ってRemoteCacheManagerを使います。

さらに簡易的に、RemoteCacheを使う場合は、こちらのメソッドで。

  protected def withCache[K, V](cacheName: String, transactionMode: TransactionMode = TransactionMode.NONE)(fun: RemoteCache[K, V] => Unit): Unit =
    withCacheManager(transactionMode) { manager =>
      val cache = manager.getCache[K, V](cacheName)
      fun(cache)
      cache.stop()
    }

では、テストを書いてみましょう。

非トランザクショナルなCacheを使う

まずは、非トランザクショナルなRemoteCacheを使ってみましょう。

  test("non-transactional-cache / mode none") {
    withCache[String, String]("default", TransactionMode.NONE) { cache =>
      cache.put("key1", "value1")
      cache.get("key1") should be("value1")

      val tm = cache.getTransactionManager
      tm should be(null)
    }
  }

ふつうに使えるのはそうなのですが、TransactionModeがNONEの場合(デフォルト)は、RemoteCache#getTransactionManagerがnullを返します。

ここで、「default」というCacheはInfinispan Serverでデフォルトで用意されている次の定義です。

<distributed-cache name="default"/>

TransactionManagerは、RemoteCacheから取得するんですね。

続いて、RemoteCacheManagerを、TransactionModeをNON_DURABLE_XAで構成してTransactionManagerを使ってみます。

  test("non-transactional-cache / mode non-durable-xa") {
    withCache[String, String]("default", TransactionMode.NON_DURABLE_XA) { cache =>
      cache.put("key1", "value1")
      cache.get("key1") should be("value1")

      val tm = cache.getTransactionManager

      tm should not be (null)

      tm.begin()

      val thrown = the[HotRodClientException] thrownBy cache.put("key2", "value2")
      thrown.getMessage should be("ISPN004084: Cache default doesn't support transactions. Please check the documentation how to configure it properly.")

      tm.commit()
    }
  }

Server側のCacheは非トランザクショナル、RemoteCacheはNON_DURABLE_XAという状態です。

すると、この「default」Cacheは非トランザクショナルなCacheなため操作が実行できず、例外がスローされます。

      val thrown = the[HotRodClientException] thrownBy cache.put("key2", "value2")
      thrown.getMessage should be("ISPN004084: Cache default doesn't support transactions. Please check the documentation how to configure it properly.")

というわけで、Server側で定義されているCacheは、トランザクションに参加するためにはトランザクションを有効にしたCacheである必要があることが確認できました。

以降は、トランザクショナルに構成されたCacheを使っていきます。

トランザクショナルなCacheを使う

続いて、Server側でトランザクショナルに構成されたCacheを使っていきます。

Server側のNON_XAなCacheと、RemoteCacheもNON_XAで。

  test("non-xa-transactional-cache / mode non-xa") {
    withCache[String, String]("nonXaCache", TransactionMode.NON_XA) { cache =>
      val tm = cache.getTransactionManager

      tm.begin()

      cache.put("key1", "value1")
      cache.get("key1") should be("value1")

      tm.commit()

      cache.get("key1") should be("value1")


      tm.begin()

      cache.put("key2", "value2")
      cache.get("key2") should be("value2")

      tm.rollback()

      cache.get("key2") should be(null)
    }
  }

すると、TransactionManager#begin、TransactionManager#commit、TransactionManager#rollbackなどが機能し、トランザクションに参加できているように
見えます。

Server側のNON_DURABLE_XAなCacheと、NON_DURABLE_XAにしたRemoteCacheを使った場合も、同じように。

  test("non-xa-transactional-cache / mode non-durable-xa") {
    withCache[String, String]("nonXaCache", TransactionMode.NON_DURABLE_XA) { cache =>
      val tm = cache.getTransactionManager

      tm.begin()

      cache.put("key3", "value3")
      cache.get("key3") should be("value3")

      tm.commit()

      cache.get("key3") should be("value3")


      tm.begin()

      cache.put("key4", "value4")
      cache.get("key4") should be("value4")

      tm.rollback()

      cache.get("key4") should be(null)
    }
  }

Server側のCacheがNON_DURABLE_XAであっても、RemoteCache側はNON_XAといったように、Server側のCacheとRemoteCacheのTransactionModeは一致している
必要はないようです。

  test("non-durable-xa-transactional-cache / mode non-xa") {
    withCache[String, String]("nonDurableXaCache", TransactionMode.NON_XA) { cache =>
      val tm = cache.getTransactionManager

      // 省略
    }
  }

RemoteCacheManagerのデフォルトの構成としてNON_XAとして、RemoteCacheを取得する際にNON_DURABLE_XAで上書きする例。

  test("non-durable-xa-transactional-cache / mode non-xa / override non-durable-xa") {
    withCacheManager(TransactionMode.NON_XA) { manager =>
      val cache = manager.getCache[String, String]("nonDurableXaCache", TransactionMode.NON_DURABLE_XA)

      val tm = cache.getTransactionManager

      tm.begin()

      cache.put("key3", "value3")
      cache.get("key3") should be("value3")

      tm.commit()

      cache.get("key3") should be("value3")


      tm.begin()

      cache.put("key4", "value4")
      cache.get("key4") should be("value4")

      tm.rollback()

      cache.get("key4") should be(null)
    }
  }

RemoteCacheManager#getCache時に、TransactionModeを指定することで上書きすることができます。

      val cache = manager.getCache[String, String]("nonDurableXaCache", TransactionMode.NON_DURABLE_XA)

TransactionManagerを上書きする場合は、同じくRemoteCacheManager#getCache時に上書きしたいTransactionManagerを指定します(TransactionManagerLookupではありません)。

RemoteCacheManager#getCache(cacheName, transactionMode, transactionManager)

なお、RemoteCacheManager(RemoteCache)をFULL_XAで構成しようとすると、ドキュメントの通り未サポートなので例外がスローされます。

  test("non-xa-transactional-cache / mode full-xa") {
    withCacheManager(TransactionMode.FULL_XA) { manager =>
      val thrown = the[IllegalArgumentException] thrownBy manager.getCache[String, String]("nonXaCache")
      thrown.getMessage should be("FULL_XA isn't supported yet!")
    }
  }

複数のCacheを同じトランザクションで扱う

JTAということで、複数のRemoteCacheを同じトランザクションで扱ってみます。

Server側はNON_XA、RemoteCacheもNON_XA。

  test("non-xa-transactional-cache / mode non-xa / multiple cache") {
    withCacheManager(TransactionMode.NON_XA) { manager =>
      val cache1 = manager.getCache[String, String]("nonXaCache1")
      val cache2 = manager.getCache[String, String]("nonXaCache2")

      val tm = cache1.getTransactionManager

      tm.begin()

      cache1.put("key1-1", "value1-1")
      cache2.put("key2-1", "value2-1")

      cache1.get("key1-1") should be("value1-1")
      cache2.get("key2-1") should be("value2-1")

      tm.commit()

      cache1.get("key1-1") should be("value1-1")
      cache2.get("key2-1") should be("value2-1")


      tm.begin()

      cache1.put("key1-2", "value1-2")
      cache2.put("key2-2", "value2-2")

      cache1.get("key1-2") should be("value1-2")
      cache2.get("key2-2") should be("value2-2")

      tm.rollback()

      cache1.get("key1-2") should be(null)
      cache2.get("key2-2") should be(null)
    }
  }

Server側はNON_DURABLE_XA、RemoteCacheもNON_DURABLE_XA。

  test("non-durable-xa-transactional-cache / mode non-durable-xa / multiple cache") {
    withCacheManager(TransactionMode.NON_DURABLE_XA) { manager =>
      val cache1 = manager.getCache[String, String]("nonDurableXaCache1")
      val cache2 = manager.getCache[String, String]("nonDurableXaCache2")

      val tm = cache1.getTransactionManager

      tm.begin()

      cache1.put("key1-1", "value1-1")
      cache2.put("key2-1", "value2-1")

      cache1.get("key1-1") should be("value1-1")
      cache2.get("key2-1") should be("value2-1")

      tm.commit()

      cache1.get("key1-1") should be("value1-1")
      cache2.get("key2-1") should be("value2-1")


      tm.begin()

      cache1.put("key1-2", "value1-2")
      cache2.put("key2-2", "value2-2")

      cache1.get("key1-2") should be("value1-2")
      cache2.get("key2-2") should be("value2-2")

      tm.rollback()

      cache1.get("key1-2") should be(null)
      cache2.get("key2-2") should be(null)
    }
  }

どちらも実現方法は異なりますが、まあうまくいきます。

トランザクションの分離度?

ドキュメントには、RemoteCacheに関するIsolation Levelの記載は特にありません。Server側のCacheに、REPEATABLE_READを要求しているだけです。

というわけで、ちょっと動きを見てみましょう。2つのSingleThreadのExecutorを用意して、CompletableFututeでそれぞれのスレッドでトランザクションを開始し、
操作中の値が別のスレッド(トランザクション)から参照できたりするか、見てみます。

Server側、RemoteCacheも、ともにNON_XAの場合。

  test("transaction visibility / non-xa-transactional-cache / mode non-xa") {
    withCache[String, String]("nonXaCache", TransactionMode.NON_XA) { cache =>
      cache.clear()

      cache.put("key10", "value10") // initial
      cache.put("key30", "value30") // initial

      val tm = cache.getTransactionManager

      val updateExecutor = Executors.newSingleThreadExecutor
      val readExecutor = Executors.newSingleThreadExecutor

      val future =
        CompletableFuture
          .runAsync(() => tm.begin(), updateExecutor)
          .thenRunAsync(() => tm.begin(), readExecutor)

          .thenRunAsync(() => cache.put("key20", "value20"), updateExecutor) // insert
          .thenRunAsync(() => cache.get("key20") should be(null), readExecutor) // reader, non-visible

          .thenRunAsync(() => cache.put("key10", "value10-1"), updateExecutor) // update
          .thenRunAsync(() => cache.get("key10") should be("value10"), readExecutor) // reader, still-old-value-visible

          .thenRunAsync(() => cache.remove("key30"), updateExecutor) // delete
          .thenRunAsync(() => cache.get("key30") should be("value30"), readExecutor) // reader, still-visible

          .thenRunAsync(() => tm.commit(), updateExecutor)

          .thenRunAsync(() => cache.get("key20") should be(null), readExecutor) // reader, non-visible
          .thenRunAsync(() => cache.get("key10") should be("value10"), readExecutor) // reader, still-old-value-visible
          .thenRunAsync(() => cache.get("key30") should be("value30"), readExecutor) // reader, still-visible

          .thenRunAsync(() => tm.commit(), readExecutor)

          .thenRunAsync(() => cache.get("key20") should be("value20"), updateExecutor)
          .thenRunAsync(() => cache.get("key10") should be("value10-1"), updateExecutor)
          .thenRunAsync(() => cache.get("key30") should be(null), updateExecutor)

          .thenRunAsync(() => cache.get("key20") should be("value20"), readExecutor) // reader, visible
          .thenRunAsync(() => cache.get("key10") should be("value10-1"), readExecutor) // reader, visible
          .thenRunAsync(() => cache.get("key30") should be(null), readExecutor) // reader, deleted

      future.join()

      cache.get("key20") should be("value20")
      cache.get("key10") should be("value10-1")
      cache.get("key30") should be(null)
    }
  }

片方のスレッドでput、put(update)、removeをしていますが、もう片方のスレッドからはその結果が全然見えていませんね。

更新しているトランザクションのコミット後、さらに自身のトランザクションが完了すると参照できるようになっています。まあ、分離度高いですこと。

Server側とRemoteCacheが、NON_DURABLE_XAであっても結果は同じ。

  test("transaction visibility / non-durable-xa-transactional-cache / mode non-durable-xa") {
    withCache[String, String]("nonDurableXaCache", TransactionMode.NON_DURABLE_XA) { cache =>
      cache.clear()

      cache.put("key10", "value10") // initial
      cache.put("key30", "value30") // initial

      val tm = cache.getTransactionManager

      val updateExecutor = Executors.newSingleThreadExecutor
      val readExecutor = Executors.newSingleThreadExecutor

      val future =
        CompletableFuture
          .runAsync(() => tm.begin(), updateExecutor)
          .thenRunAsync(() => tm.begin(), readExecutor)

          .thenRunAsync(() => cache.put("key20", "value20"), updateExecutor) // insert
          .thenRunAsync(() => cache.get("key20") should be(null), readExecutor) // reader, non-visible

          .thenRunAsync(() => cache.put("key10", "value10-1"), updateExecutor) // update
          .thenRunAsync(() => cache.get("key10") should be("value10"), readExecutor) // reader, still-old-value-visible

          .thenRunAsync(() => cache.remove("key30"), updateExecutor) // delete
          .thenRunAsync(() => cache.get("key30") should be("value30"), readExecutor) // reader, still-visible

          .thenRunAsync(() => tm.commit(), updateExecutor)

          .thenRunAsync(() => cache.get("key20") should be(null), readExecutor) // reader, non-visible
          .thenRunAsync(() => cache.get("key10") should be("value10"), readExecutor) // reader, still-old-value-visible
          .thenRunAsync(() => cache.get("key30") should be("value30"), readExecutor) // reader, still-visible

          .thenRunAsync(() => tm.commit(), readExecutor)

          .thenRunAsync(() => cache.get("key20") should be("value20"), updateExecutor)
          .thenRunAsync(() => cache.get("key10") should be("value10-1"), updateExecutor)
          .thenRunAsync(() => cache.get("key30") should be(null), updateExecutor)

          .thenRunAsync(() => cache.get("key20") should be("value20"), readExecutor) // reader, visible
          .thenRunAsync(() => cache.get("key10") should be("value10-1"), readExecutor) // reader, visible
          .thenRunAsync(() => cache.get("key30") should be(null), readExecutor) // reader, deleted

      future.join()

      cache.get("key20") should be("value20")
      cache.get("key10") should be("value10-1")
      cache.get("key30") should be(null)
    }
  }

更新が競合したら?

最後に、更新が競合した場合の動きを見てみます。

これは、"後勝ち"になるようです?

Server側、RemoteCacheがともにNON_XA。

  test("conflict / non-xa-transactional-cache / mode non-xa") {
    withCache[String, String]("nonXaCache", TransactionMode.NON_XA) { cache =>
      cache.clear()

      cache.put("key10", "value10") // initial

      val tm = cache.getTransactionManager

      val firstUpdateExecutor = Executors.newSingleThreadExecutor
      val secondUpdateExecutor = Executors.newSingleThreadExecutor

      val future =
        CompletableFuture
          .runAsync(() => tm.begin(), firstUpdateExecutor)
          .thenRunAsync(() => tm.begin(), secondUpdateExecutor)

          .thenRunAsync(() => cache.put("key10", "value10-1-1"), firstUpdateExecutor)
          .thenRunAsync(() => cache.put("key10", "value10-2-1"), secondUpdateExecutor)

          .thenRunAsync(() => cache.get("key10") should be("value10-1-1"), firstUpdateExecutor)
          .thenRunAsync(() => cache.get("key10") should be("value10-2-1"), secondUpdateExecutor)

          .thenRunAsync(() => tm.commit(), firstUpdateExecutor)
          .thenRunAsync(() => tm.commit(), secondUpdateExecutor)

          .thenRunAsync(() => cache.get("key10") should be("value10-2-1"), firstUpdateExecutor)  // last updated
          .thenRunAsync(() => cache.get("key10") should be("value10-2-1"), secondUpdateExecutor)


      future.join()

      cache.get("key10") should be("value10-2-1")  // last updated
    }
  }

Server側、RemoteCacheがともにNON_DURABLE_XA。

  test("conflict / non-durable-xa-transactional-cache / mode non-durable-xa") {
    withCache[String, String]("nonDurableXaCache", TransactionMode.NON_DURABLE_XA) { cache =>
      cache.clear()

      cache.put("key10", "value10") // initial

      val tm = cache.getTransactionManager

      val firstUpdateExecutor = Executors.newSingleThreadExecutor
      val secondUpdateExecutor = Executors.newSingleThreadExecutor

      val future =
        CompletableFuture
          .runAsync(() => tm.begin(), firstUpdateExecutor)
          .thenRunAsync(() => tm.begin(), secondUpdateExecutor)

          .thenRunAsync(() => cache.put("key10", "value10-1-1"), firstUpdateExecutor)
          .thenRunAsync(() => cache.put("key10", "value10-2-1"), secondUpdateExecutor)

          .thenRunAsync(() => cache.get("key10") should be("value10-1-1"), firstUpdateExecutor)
          .thenRunAsync(() => cache.get("key10") should be("value10-2-1"), secondUpdateExecutor)

          .thenRunAsync(() => tm.commit(), firstUpdateExecutor)
          .thenRunAsync(() => tm.commit(), secondUpdateExecutor)

          .thenRunAsync(() => cache.get("key10") should be("value10-2-1"), firstUpdateExecutor)  // last updated
          .thenRunAsync(() => cache.get("key10") should be("value10-2-1"), secondUpdateExecutor)


      future.join()

      cache.get("key10") should be("value10-2-1")  // last updated
    }
  }

気になるところは、だいたい確認できた感じでしょうか?

もう少し、中身を

それでは、もう少し中身のほどを、ソースコードから追ってみます。

Server側のトランザクションの構成は、Hot Rod Client側からトランザクションを有効にしていた時に、確認していましたよね。非トランザクショナルな
Cacheを使おうとすると例外になったので。

このあたりのServer側のCache設定を確認しているソースコードは、こちら。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/server/hotrod/src/main/java/org/infinispan/server/hotrod/TransactionRequestProcessor.java#L157-L178

Server側のCacheに課せられた制限は、今後このあたりが緩くなっていくのでしょうかね。将来的に、OPTIMISTICとか設定できるようにしたそうなドキュメントに
なっているので。

if (configuration.locking().isolationLevel() != IsolationLevel.REPEATABLE_READ) {
throw log.unexpectedIsolationLevel(cache.getName());
}

//TODO because of ISPN-7672, optimistic and total order transactions needs versions. however, versioning is currently broken
if (configuration.transaction().lockingMode() == LockingMode.OPTIMISTIC ||
configuration.transaction().transactionProtocol() == TransactionProtocol.TOTAL_ORDER) {
//no Log. see TODO.
throw new IllegalStateException(
String.format("Cache '%s' cannot use Optimistic neither Total Order transactions.", cache.getName()));
}

https://github.com/infinispan/infinispan/blob/9.3.0.Final/server/hotrod/src/main/java/org/infinispan/server/hotrod/TransactionRequestProcessor.java#L157-L178

そうそう、Server側のCacheを作る時にTransactionModeとLocking以外は指定していないのですが、

[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=nonXaCacheConfiguration/transaction=TRANSACTION:add(mode=NON_XA,locking=PESSIMISTIC)
{"outcome" => "success"}

以前のInfinispanのデフォルトのトランザクション分離レベルはREAD_COMMITTEDだったのが、Infinispan 9.0からREPEATABLE_READになっていたようですね。

[ISPN-7613] Enable write-skew for optimistic + repeatable-read transactions - JBoss Issue Tracker

これには気付いていませんでした、覚えておきましょう。

RemoteCacheManagerでトランザクションを有効にした場合、もしくはRemoteCache取得時にトランザクションの設定を上書きしてトランザクショナルなRemoteCacheに
した場合、RemoteCacheManager#getCacheで返却されるRemoteCacheの実装が変わります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java#L330-L335

トランザクションが無効な場合はRemoteCacheImplまたはInvalidatedNearRemoteCacheですが、トランザクションが有効な場合はTransactionalRemoteCacheImpl
となります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transaction/TransactionalRemoteCacheImpl.java

RemoteCacheに指定するTransactionModeはNON_XA、NON_DURABLEとありますが、これがどこに影響するかというと、RemoteCacheManagerが持つTransactionTableに
なります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java#L95-L96

インターフェースとしてTransactionTableがあり、
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transaction/TransactionTable.java

その実装としてSyncModeTransactionTable、XaModeTransactionTableの2つがあります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transaction/SyncModeTransactionTable.java
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transaction/XaModeTransactionTable.java

どちらが使われるかは、以下のコードで決まります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java#L447-L457

NON_XAの場合はSyncModeTransactionTable、NON_DURABLE_XAの場合はXaModeTransactionTableとなります。

両者の差は、最初に書いた通りNON_XA(SyncModeTransactionTable)の場合はJTAのSynchronizationを使った仕組みでトランザクションに参加し、
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transaction/SyncModeTransactionTable.java#L71

NON_DURABLE_XA(XaModeTransactionTable)の場合はリカバリに制限があるXAリソースとしてトランザクションに参加します。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transaction/XaModeTransactionTable.java#L56

リカバリ用の処理は、入ってませんよ、と。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transaction/XaModeTransactionTable.java#L181

今回は、こんなところで。

まとめ

Infinispan 9.3で追加された、Hot Rod Transactionを試してみました。

Server側とHot Rod Client側でトランザクションの設定がそれぞれ必須だったりと、ちょっと最初は慣れないところはありましたが、だいたい雰囲気は
分かったのではないかなと。

あと、今回はドキュメントがあったので助かりました。

テストコードも参考にしつつ。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/tx/TxFunctionalTest.java
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/tx/MultipleCacheTxFunctionalTest.java
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/tx/LCROTest.java
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/tx/util/TransactionSetup.java

今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-transaction