以前、Apache Geodeでのトランザクション管理のうち、Cache Transaction(CacheTransactionManager)を使用したエントリを書きました。
Apache GeodeのCache Transactionを使う - CLOVER
この時は、Geode Cache TransactionとJTAグローバルトランザクションを利用する方法があると書きましたが、Cache Transactionを利用するのみにしていました。
今度は、JTAを使ってみたいと思います。
GeodeのJTAトランザクション管理についての、ドキュメントはこちらです。
http://geode.docs.pivotal.io/docs/developing/transactions/JTA_transactions_with_GemFire.html
サンプルは、こちら。
http://geode.docs.pivotal.io/docs/developing/transactions/transaction_jta_gemfire_example.html
その他のドキュメント。これらは、JDBCデータソースとの連携や、Writer/Loader、そもそもJTA連携を無効にする方法が書いています。
Configuring Database Connections Using JNDI
http://geode.docs.pivotal.io/docs/developing/transactions/cache_plugins_with_jta.html
http://geode.docs.pivotal.io/docs/developing/transactions/turning_off_jta.html
ドキュメントを参照すると、Apache GeodeでのJTAトランザクションの利用には、以下の3パターンがあるようです。
- コンテナ(WebLogicなど)の利用するJTAトランザクションマネージャーを利用
- コンテナ(WebLogicなど)のJTAトランザクションマネージャーに、「Last Resource」として参加
- Apache Geodeが持つJTAトランザクションマネージャーを利用
今回は、JTAトランザクションにLast Resourceとして参加するパターン"以外の"ものを試してみたいと思います。
Last Resourceとして参加する場合は、JCA Resource Adapterを設定したりと面倒そうなので…。
Using Geode as the "Last Resource" in a Container-Managed JTA Transaction
Apache GeodeとJTAトランザクション
Apache GeodeがJTAトランザクションに参加する場合の動きについて、見ていってみます。
ドキュメントとしては、こちらを参照するのがよいと思います。
Coordinating with External JTA Transactions Managers
Coordinated by an External Transaction Manager
Using Geode as the JTA Transaction Manager
How to Run a JTA Global Transaction Using Geode as the JTA Transaction Manager
http://geode.docs.pivotal.io/docs/developing/transactions/configuring_db_connections_using_JNDI.html
http://geode.docs.pivotal.io/docs/developing/transactions/monitor_troubleshoot_transactions.html
どうやら、Apache GeodeがJTAトランザクションに参加する際には、javax.transaction.Synchronizationとしてjavax.transaction.Transactionに登録されることで、実現されているようです。図を見ると、背後にいるのはCacheTransactionManagerのようですね。
ドキュメントを読むと、トランザクションのコミット時にコミット前のコールバック(Synchronization#beforeCompletion)が呼び出され、ロックやコンフリクトの検出などを行うようです。その後、JTAトランザクションマネージャーが他のリソースにコミット指示を行い、コミット後のコールバックで(Synchronization#afterCompletion)変更のCacheへの適用、他のメンバーへの反映を行うようです。
関連するコードは、このあたり。
Synchronizationとして登録。
https://github.com/apache/incubator-geode/blob/rel/v1.0.0-incubating.M1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java#L9133
コールバックを受けるのは、このあたり。
https://github.com/apache/incubator-geode/blob/rel/v1.0.0-incubating.M1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java#L900
https://github.com/apache/incubator-geode/blob/rel/v1.0.0-incubating.M1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java#L955
つまり、JTAトランザクションに参加するといっても厳密にXAリソースとして対応しているわけではなさそうですね。
※類似プロダクトでいくと、Infinispanのトランザクション設定でのNON_XAと同じ
あと、外部のJTAトランザクションマネージャーを利用する場合は、データ更新時にJTAトランザクションマネージャーが利用可能かどうか、自動検出するみたいです。
あらかじめ、JNDIルックアップ対象を並べておいて
https://github.com/apache/incubator-geode/blob/rel/v1.0.0-incubating.M1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/jndi/JNDIInvoker.java#L81
見つけたら、それを利用という感じみたいですね。
https://github.com/apache/incubator-geode/blob/rel/v1.0.0-incubating.M1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/jndi/JNDIInvoker.java#L234
と、仕掛けはこれくらいにして、コードを書いていきましょう。
準備
今回の実装方針としては、以下とします。
- Cache Transactionの時と、ほぼ同等のテストコードで確認する
- Apache GeodeのJTAトランザクションマネージャーを利用する
- 外部のJTAトランザクションマネージャーも利用する(Narayanaを利用)
というわけで、今回はJTAを使った場合の差分についてフォーカスします。
コードの説明は、こちらを見るとよいでしょう。
Apache GeodeのCache Transactionを使う - CLOVER
では、まずはMaven依存関係から。基本的には、以下のものを利用します。
<!-- Apache Geode --> <dependency> <groupId>org.apache.geode</groupId> <artifactId>gemfire-core</artifactId> <version>1.0.0-incubating.M1</version> </dependency> <!-- for Testing --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.3.0</version> <scope>test</scope> </dependency>
JTAトランザクションマネージャーとしてNarayanaを利用する場合は、以下を加えます。
<!-- JTA API and Implementation --> <dependency> <groupId>org.jboss.spec.javax.transaction</groupId> <artifactId>jboss-transaction-api_1.2_spec</artifactId> <version>1.0.0.Final</version> </dependency> <dependency> <groupId>org.jboss.narayana.jta</groupId> <artifactId>narayana-jta</artifactId> <version>5.3.1.Final</version> </dependency> <!-- JNDI --> <dependency> <groupId>jboss</groupId> <artifactId>jnpserver</artifactId> <version>4.2.2.GA</version> </dependency> <dependency> <groupId>org.jboss.logging</groupId> <artifactId>jboss-logging</artifactId> <version>3.3.0.Final</version> </dependency>
また、Narayanaを利用する場合は、JNDIの設定としてクラスパス上に以下のファイルを用意します。
src/test/resources/jndi.properties
java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Narayanaを利用する場合は一応これも入れておきましたが、Synchronizationで実現していることを考えると、他にトランザクション管理対象のリソースがなければ、あんまり意味なかったかも…。
src/test/resources/jbossts-properties.xml
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <entry key="ObjectStoreEnvironmentBean.communicationStore.objectStoreType">com.arjuna.ats.internal.arjuna.objectstore.VolatileStore</entry> <entry key="ObjectStoreEnvironmentBean.objectStoreDir">./target/ObjectStore</entry> </properties>
テストコードの雛形
それでは、それぞれのパターンにおいて、テストコードの雛形を書いていきます。
まずは、Apache GeodeのJTAトランザクションマネージャーを利用する場合。
src/test/java/org/littlewings/geode/jta/JtaTest.java
package org.littlewings.geode.jta; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.stream.IntStream; import javax.naming.NamingException; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; import javax.transaction.NotSupportedException; import javax.transaction.RollbackException; import javax.transaction.SystemException; import javax.transaction.UserTransaction; import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.CacheFactory; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionShortcut; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.fail; import static org.littlewings.geode.jta.JtaTest.CanThrowsRunnable.wrap; public class JtaTest { // ここに、テストを書く! @FunctionalInterface interface CanThrowsRunnable { void run() throws Exception; static Runnable wrap(CanThrowsRunnable runner) { return () -> { try { runner.run(); } catch (Exception e) { throw new RuntimeException(e); } }; } } }
CompletableFutureを利用する時に、RunnableとLambda式の組み合わせでJTAのチェック例外がちょっとうるさくなるので、ちょっと手抜き的にインターフェースを用意しました…。
Narayanaを利用する場合は、JNDIのセットアップとTransactionManager、UserTransactionのバインドを行います。
src/test/java/org/littlewings/geode/jta/NarayanaJtaTest.java
package org.littlewings.geode.jta; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.stream.IntStream; import javax.naming.InitialContext; import javax.naming.NamingException; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; import javax.transaction.NotSupportedException; import javax.transaction.RollbackException; import javax.transaction.SystemException; import javax.transaction.UserTransaction; import com.arjuna.ats.jta.common.jtaPropertyManager; import com.arjuna.ats.jta.utils.JNDIManager; import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.CacheFactory; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionShortcut; import org.jnp.interfaces.NamingParser; import org.jnp.server.NamingBeanImpl; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.fail; import static org.littlewings.geode.jta.NarayanaJtaTest.CanThrowsRunnable.wrap; public class NarayanaJtaTest { protected static NamingBeanImpl namingBean = new NamingBeanImpl(); @BeforeClass public static void setUpClass() throws Exception { namingBean.start(); JNDIManager.bindJTAImplementation(); namingBean.getNamingInstance().createSubcontext(new NamingParser().parse("jboss")); jtaPropertyManager.getJTAEnvironmentBean().setTransactionManagerJNDIContext("java:/jboss/TransactionManager"); jtaPropertyManager .getJTAEnvironmentBean() .setTransactionSynchronizationRegistryJNDIContext("java:/jboss/TransactionSynchronizationRegistry"); jtaPropertyManager .getJTAEnvironmentBean() .setUserTransactionJNDIContext("java:comp/UserTransaction"); JNDIManager.bindJTATransactionManagerImplementation(); JNDIManager.bindJTAUserTransactionImplementation(); } @AfterClass public static void tearDownClass() { namingBean.stop(); } // ここに、テストコードを書く! @FunctionalInterface interface CanThrowsRunnable { void run() throws Exception; static Runnable wrap(CanThrowsRunnable runner) { return () -> { try { runner.run(); } catch (Exception e) { throw new RuntimeException(e); } }; } } }
これらを利用して、コードを書いていきます。
単純なコミット、ロールバック
最初は、説明を入れておきましょう。
@Test public void jtaTransactionCommit() throws SystemException, NotSupportedException, NamingException, HeuristicRollbackException, HeuristicMixedException, RollbackException { try (Cache cache = new CacheFactory().create()) { Region<String, String> region = cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion"); UserTransaction userTransaction = (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction"); userTransaction.begin(); IntStream.rangeClosed(1, 10).forEach(i -> region.put("key" + i, "value" + i)); userTransaction.commit(); assertThat(region) .hasSize(10); assertThat(region.get("key1")) .isEqualTo("value1"); region.close(); } } @Test public void jtaTransactionRollback() throws SystemException, NotSupportedException, NamingException { try (Cache cache = new CacheFactory().create()) { Region<String, String> region = cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion"); UserTransaction userTransaction = (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction"); userTransaction.begin(); IntStream.rangeClosed(1, 10).forEach(i -> region.put("key" + i, "value" + i)); userTransaction.rollback(); assertThat(region) .isEmpty(); assertThat(region.get("key1")) .isNull(); region.close(); } }
UserTransactionをCache#getJNDIContext経由でJNDIルックアップし、
UserTransaction userTransaction =
(UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction");
必要な箇所でUserTransaction#begin、commit、rollbackすれば、それぞれ変更が反映もしくは破棄されます。
Narayanaを利用する場合は、UserTransactionを取得している箇所をふつうにJNDIルックアップするように置き換えればOKです。
注)JNDI名が今回の例ではApache Geode組み込みのものを利用する場合と、微妙に異なります
UserTransaction userTransaction = (UserTransaction) new InitialContext().lookup("java:comp/UserTransaction");
複数のRegionに対してトランザクション操作する
Cache Transactionを利用している時と、ほとんど変わりません。
@Test public void multipleJtaTransactionCommit() throws SystemException, NotSupportedException, HeuristicRollbackException, HeuristicMixedException, RollbackException, NamingException { try (Cache cache = new CacheFactory().create()) { Region<String, String> region1 = cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion1"); Region<String, String> region2 = cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion2"); UserTransaction userTransaction = (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction"); userTransaction.begin(); IntStream.rangeClosed(1, 10).forEach(i -> region1.put("key" + i, "value" + i)); IntStream.rangeClosed(1, 10).forEach(i -> region2.put("key" + i, "value" + i)); userTransaction.commit(); assertThat(region1) .hasSize(10); assertThat(region1.get("key1")) .isEqualTo("value1"); assertThat(region1) .hasSize(10); assertThat(region1.get("key10")) .isEqualTo("value10"); region1.close(); region2.close(); } } @Test public void multipleJtaTransactionRollback() throws SystemException, NotSupportedException, NamingException { try (Cache cache = new CacheFactory().create()) { Region<String, String> region1 = cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion1"); Region<String, String> region2 = cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion2"); UserTransaction userTransaction = (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction"); userTransaction.begin(); IntStream.rangeClosed(1, 10).forEach(i -> region1.put("key" + i, "value" + i)); IntStream.rangeClosed(1, 10).forEach(i -> region2.put("key" + i, "value" + i)); userTransaction.rollback(); assertThat(region1) .isEmpty(); assertThat(region1.get("key1")) .isNull(); assertThat(region1) .isEmpty(); assertThat(region1.get("key10")) .isNull(); region1.close(); region2.close(); } }
Narayanaに変更する場合も、先ほどの例と同じです。
マルチスレッド環境下で実行
マルチスレッド環境下で実行する場合も、Cache Transactionを使用した場合と同等の動きをします。
それぞれ、マルチスレッド環境下で同じキーに対して操作をした場合、ミュータブルなオブジェクトを別のスレッドで操作した場合、Regionからの取得時にコピーを行うようにしてマルチスレッドで操作した場合です。
@Test public void multiThreadedJtaTransaction() throws NamingException { try (Cache cache = new CacheFactory().create()) { Region<String, String> region = cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion"); IntStream.rangeClosed(1, 5).forEach(i -> region.put("key" + i, "value" + i)); UserTransaction userTransaction = (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction"); Executor updateExecutor = Executors.newSingleThreadExecutor(); Executor readExecutor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = CompletableFuture .runAsync(wrap(() -> userTransaction.begin()), updateExecutor) .thenRunAsync(wrap(() -> userTransaction.begin()), readExecutor) .thenRunAsync(() -> region.put("key6", "value6"), updateExecutor) // insert .thenRunAsync(() -> assertThat(region.get("key6")).isNull(), readExecutor) // reader, non-visible .thenRunAsync(() -> region.put("key5", "value5-5"), updateExecutor) // update .thenRunAsync(() -> assertThat(region.get("key5")).isEqualTo("value5"), readExecutor) // reader, non-visible .thenRunAsync(() -> region.remove("key3"), updateExecutor) // delete .thenRunAsync(() -> assertThat(region.get("key3")).isEqualTo("value3"), readExecutor) // reader, still-visible .thenRunAsync(wrap(() -> userTransaction.commit()), updateExecutor) .thenRunAsync(() -> assertThat(region.get("key6")).isNull(), readExecutor) // reader, still-visible .thenRunAsync(() -> assertThat(region.get("key5")).isEqualTo("value5"), readExecutor) // reader, still-visible .thenRunAsync(() -> assertThat(region.get("key3")).isEqualTo("value3"), readExecutor) // reader, still-visible .thenRunAsync(wrap(() -> userTransaction.commit()), readExecutor); future.join(); assertThat(region) .hasSize(5); assertThat(region.get("key5")) .isEqualTo("value5-5"); assertThat(region.get("key6")) .isEqualTo("value6"); assertThat(region.get("key3")) .isNull(); } } @Test public void multiThreadedJtaTransactionMutableObject() throws NamingException { try (Cache cache = new CacheFactory().create()) { Region<String, List<Integer>> region = cache.<String, List<Integer>>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion"); region.put("key", new ArrayList<>(Arrays.asList(1, 2))); UserTransaction userTransaction = (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction"); Executor updateExecutor = Executors.newSingleThreadExecutor(); Executor readExecutor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = CompletableFuture .runAsync(wrap(() -> userTransaction.begin()), updateExecutor) .thenRunAsync(wrap(() -> userTransaction.begin()), readExecutor) .thenRunAsync(() -> region.get("key").add(3), updateExecutor) // update, modify .thenRunAsync( () -> assertThat(region.get("key")).hasSize(3).containsOnly(1, 2, 3), readExecutor) // reader, visible!! .thenRunAsync(wrap(() -> userTransaction.commit()), updateExecutor) .thenRunAsync(wrap(() -> userTransaction.commit()), readExecutor); future.join(); assertThat(region.get("key")) .hasSize(3) .containsOnly(1, 2, 3); } } @Test public void multiThreadedTransactionMutableObjectUsingCopy() throws NamingException { try (Cache cache = new CacheFactory().create()) { // use, copy cache.setCopyOnRead(true); Region<String, List<Integer>> region = cache.<String, List<Integer>>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion"); region.put("key", new ArrayList<>(Arrays.asList(1, 2))); UserTransaction userTransaction = (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction"); Executor updateExecutor = Executors.newSingleThreadExecutor(); Executor readExecutor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = CompletableFuture .runAsync(wrap(() -> userTransaction.begin()), updateExecutor) .thenRunAsync(wrap(() -> userTransaction.begin()), readExecutor) .thenRunAsync(() -> region.get("key").add(3), updateExecutor) // update, for copy .thenRunAsync(() -> { assertThat(region.get("key")).hasSize(2); assertThat(region.get("key")).containsOnly(1, 2); }, readExecutor) // reader, non-visible .thenRunAsync(wrap(() -> userTransaction.commit()), updateExecutor) .thenRunAsync(wrap(() -> userTransaction.commit()), readExecutor); future.join(); assertThat(region.get("key")) .hasSize(2) .containsOnly(1, 2); } }
JTAを使ったコードはチェック例外がいろいろ宣言されていてLambda式と相性が悪いので、今回はこういう風にまとめました。
CompletableFuture
.runAsync(wrap(() -> userTransaction.begin()), updateExecutor)
コンフリクト
コンフリクトについてもほぼ同等ですが、コンフリクト検出時に投げられる例外が変わります。
@Test public void multiThreadedJtaTransactionConflict() throws NamingException { try (Cache cache = new CacheFactory().create()) { Region<String, String> region = cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion"); IntStream.rangeClosed(1, 5).forEach(i -> region.put("key" + i, "value" + i)); UserTransaction userTransaction = (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction"); Executor firstUpdateExecutor = Executors.newSingleThreadExecutor(); Executor secondUpdateExecutor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = CompletableFuture .runAsync(wrap(() -> userTransaction.begin()), firstUpdateExecutor) .thenRunAsync(wrap(() -> userTransaction.begin()), secondUpdateExecutor) .thenRunAsync(() -> region.put("key3", "value3-1-3"), firstUpdateExecutor) .thenRunAsync(() -> region.put("key3", "value3-2-3"), secondUpdateExecutor) .thenRunAsync(wrap(() -> userTransaction.commit()), firstUpdateExecutor) .thenRunAsync(wrap(() -> userTransaction.commit()), secondUpdateExecutor); try { future.join(); fail(); } catch (CompletionException e) { assertThat(e.getCause()) .hasMessageContaining("Transaction rolled back because of Exception in notifyBeforeCompletion processing") .hasCauseInstanceOf(RollbackException.class); } assertThat(region.get("key3")) .isEqualTo("value3-1-3"); } }
CommitConflictExceptionだったのが、RollbackExceptionになりました。
また、Narayanaを利用した場合はエラーメッセージが変わります。
assertThat(e.getCause()) .hasMessageContaining("ARJUNA016053: Could not commit transaction.") .hasCauseInstanceOf(RollbackException.class);
まとめ
Apache Geodeで、JTAを使ったトランザクション管理を試してみました。
JCA Resource Adapterは利用しませんでしたが、他は実装も含めてなんとなく見れたので、よしとしましょう。