CLOVER🍀

That was when it all began.

Apache GeodeでJTAを使ってトランザクション管理

以前、Apache Geodeでのトランザクション管理のうち、Cache Transaction(CacheTransactionManager)を使用したエントリを書きました。

Apache GeodeのCache Transactionを使う - CLOVER

この時は、Geode Cache TransactionとJTAグローバルトランザクションを利用する方法があると書きましたが、Cache Transactionを利用するのみにしていました。

今度は、JTAを使ってみたいと思います。

GeodeのJTAトランザクション管理についての、ドキュメントはこちらです。

http://geode.docs.pivotal.io/docs/developing/transactions/JTA_transactions_with_GemFire.html

サンプルは、こちら。

http://geode.docs.pivotal.io/docs/developing/transactions/transaction_jta_gemfire_example.html

その他のドキュメント。これらは、JDBCデータソースとの連携や、Writer/Loader、そもそもJTA連携を無効にする方法が書いています。

Configuring Database Connections Using JNDI

http://geode.docs.pivotal.io/docs/developing/transactions/cache_plugins_with_jta.html

http://geode.docs.pivotal.io/docs/developing/transactions/turning_off_jta.html

ドキュメントを参照すると、Apache GeodeでのJTAトランザクションの利用には、以下の3パターンがあるようです。

今回は、JTAトランザクションにLast Resourceとして参加するパターン"以外の"ものを試してみたいと思います。

Last Resourceとして参加する場合は、JCA Resource Adapterを設定したりと面倒そうなので…。

Using Geode as the "Last Resource" in a Container-Managed JTA Transaction

Apache GeodeJTAトランザクション

Apache GeodeJTAトランザクションに参加する場合の動きについて、見ていってみます。

ドキュメントとしては、こちらを参照するのがよいと思います。

Coordinating with External JTA Transactions Managers

Coordinated by an External Transaction Manager

Using Geode as the JTA Transaction Manager

How to Run a JTA Global Transaction Using Geode as the JTA Transaction Manager

http://geode.docs.pivotal.io/docs/developing/transactions/configuring_db_connections_using_JNDI.html

http://geode.docs.pivotal.io/docs/developing/transactions/monitor_troubleshoot_transactions.html

どうやら、Apache GeodeJTAトランザクションに参加する際には、javax.transaction.Synchronizationとしてjavax.transaction.Transactionに登録されることで、実現されているようです。図を見ると、背後にいるのはCacheTransactionManagerのようですね。

ドキュメントを読むと、トランザクションのコミット時にコミット前のコールバック(Synchronization#beforeCompletion)が呼び出され、ロックやコンフリクトの検出などを行うようです。その後、JTAトランザクションマネージャーが他のリソースにコミット指示を行い、コミット後のコールバックで(Synchronization#afterCompletion)変更のCacheへの適用、他のメンバーへの反映を行うようです。

関連するコードは、このあたり。

Synchronizationとして登録。
https://github.com/apache/incubator-geode/blob/rel/v1.0.0-incubating.M1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java#L9133

コールバックを受けるのは、このあたり。
https://github.com/apache/incubator-geode/blob/rel/v1.0.0-incubating.M1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java#L900
https://github.com/apache/incubator-geode/blob/rel/v1.0.0-incubating.M1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java#L955

つまり、JTAトランザクションに参加するといっても厳密にXAリソースとして対応しているわけではなさそうですね。
※類似プロダクトでいくと、Infinispanのトランザクション設定でのNON_XAと同じ

あと、外部のJTAトランザクションマネージャーを利用する場合は、データ更新時にJTAトランザクションマネージャーが利用可能かどうか、自動検出するみたいです。

あらかじめ、JNDIルックアップ対象を並べておいて
https://github.com/apache/incubator-geode/blob/rel/v1.0.0-incubating.M1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/jndi/JNDIInvoker.java#L81
見つけたら、それを利用という感じみたいですね。
https://github.com/apache/incubator-geode/blob/rel/v1.0.0-incubating.M1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/jndi/JNDIInvoker.java#L234

と、仕掛けはこれくらいにして、コードを書いていきましょう。

準備

今回の実装方針としては、以下とします。

というわけで、今回はJTAを使った場合の差分についてフォーカスします。

コードの説明は、こちらを見るとよいでしょう。

Apache GeodeのCache Transactionを使う - CLOVER

では、まずはMaven依存関係から。基本的には、以下のものを利用します。

        <!-- Apache Geode -->
        <dependency>
            <groupId>org.apache.geode</groupId>
            <artifactId>gemfire-core</artifactId>
            <version>1.0.0-incubating.M1</version>
        </dependency>

        <!-- for Testing -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.3.0</version>
            <scope>test</scope>
        </dependency>

JTAトランザクションマネージャーとしてNarayanaを利用する場合は、以下を加えます。

        <!-- JTA API and Implementation -->
        <dependency>
            <groupId>org.jboss.spec.javax.transaction</groupId>
            <artifactId>jboss-transaction-api_1.2_spec</artifactId>
            <version>1.0.0.Final</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.narayana.jta</groupId>
            <artifactId>narayana-jta</artifactId>
            <version>5.3.1.Final</version>
        </dependency>

        <!-- JNDI -->
        <dependency>
            <groupId>jboss</groupId>
            <artifactId>jnpserver</artifactId>
            <version>4.2.2.GA</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.logging</groupId>
            <artifactId>jboss-logging</artifactId>
            <version>3.3.0.Final</version>
        </dependency>

また、Narayanaを利用する場合は、JNDIの設定としてクラスパス上に以下のファイルを用意します。
src/test/resources/jndi.properties

java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces

Narayanaを利用する場合は一応これも入れておきましたが、Synchronizationで実現していることを考えると、他にトランザクション管理対象のリソースがなければ、あんまり意味なかったかも…。
src/test/resources/jbossts-properties.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
    <entry key="ObjectStoreEnvironmentBean.communicationStore.objectStoreType">com.arjuna.ats.internal.arjuna.objectstore.VolatileStore</entry>
    <entry key="ObjectStoreEnvironmentBean.objectStoreDir">./target/ObjectStore</entry>
</properties>

テストコードの雛形

それでは、それぞれのパターンにおいて、テストコードの雛形を書いていきます。

まずは、Apache GeodeJTAトランザクションマネージャーを利用する場合。
src/test/java/org/littlewings/geode/jta/JtaTest.java

package org.littlewings.geode.jta;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import javax.naming.NamingException;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionShortcut;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
import static org.littlewings.geode.jta.JtaTest.CanThrowsRunnable.wrap;

public class JtaTest {
    // ここに、テストを書く!

    @FunctionalInterface
    interface CanThrowsRunnable {
        void run() throws Exception;

        static Runnable wrap(CanThrowsRunnable runner) {
            return () -> {
                try {
                    runner.run();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            };
        }
    }
}

CompletableFutureを利用する時に、RunnableとLambda式の組み合わせでJTAのチェック例外がちょっとうるさくなるので、ちょっと手抜き的にインターフェースを用意しました…。

Narayanaを利用する場合は、JNDIのセットアップとTransactionManager、UserTransactionのバインドを行います。
src/test/java/org/littlewings/geode/jta/NarayanaJtaTest.java

package org.littlewings.geode.jta;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import com.arjuna.ats.jta.common.jtaPropertyManager;
import com.arjuna.ats.jta.utils.JNDIManager;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionShortcut;
import org.jnp.interfaces.NamingParser;
import org.jnp.server.NamingBeanImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
import static org.littlewings.geode.jta.NarayanaJtaTest.CanThrowsRunnable.wrap;

public class NarayanaJtaTest {
    protected static NamingBeanImpl namingBean = new NamingBeanImpl();

    @BeforeClass
    public static void setUpClass() throws Exception {
        namingBean.start();

        JNDIManager.bindJTAImplementation();

        namingBean.getNamingInstance().createSubcontext(new NamingParser().parse("jboss"));
        jtaPropertyManager.getJTAEnvironmentBean().setTransactionManagerJNDIContext("java:/jboss/TransactionManager");
        jtaPropertyManager
                .getJTAEnvironmentBean()
                .setTransactionSynchronizationRegistryJNDIContext("java:/jboss/TransactionSynchronizationRegistry");
        jtaPropertyManager
                .getJTAEnvironmentBean()
                .setUserTransactionJNDIContext("java:comp/UserTransaction");

        JNDIManager.bindJTATransactionManagerImplementation();
        JNDIManager.bindJTAUserTransactionImplementation();
    }

    @AfterClass
    public static void tearDownClass() {
        namingBean.stop();
    }

    // ここに、テストコードを書く!

    @FunctionalInterface
    interface CanThrowsRunnable {
        void run() throws Exception;

        static Runnable wrap(CanThrowsRunnable runner) {
            return () -> {
                try {
                    runner.run();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            };
        }
    }
}

これらを利用して、コードを書いていきます。

単純なコミット、ロールバック

最初は、説明を入れておきましょう。

    @Test
    public void jtaTransactionCommit() throws SystemException, NotSupportedException, NamingException, HeuristicRollbackException, HeuristicMixedException, RollbackException {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            UserTransaction userTransaction =
                    (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction");

            userTransaction.begin();

            IntStream.rangeClosed(1, 10).forEach(i -> region.put("key" + i, "value" + i));

            userTransaction.commit();

            assertThat(region)
                    .hasSize(10);
            assertThat(region.get("key1"))
                    .isEqualTo("value1");

            region.close();
        }
    }

    @Test
    public void jtaTransactionRollback() throws SystemException, NotSupportedException, NamingException {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            UserTransaction userTransaction =
                    (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction");

            userTransaction.begin();

            IntStream.rangeClosed(1, 10).forEach(i -> region.put("key" + i, "value" + i));

            userTransaction.rollback();

            assertThat(region)
                    .isEmpty();
            assertThat(region.get("key1"))
                    .isNull();

            region.close();
        }
    }

UserTransactionをCache#getJNDIContext経由でJNDIルックアップし、

            UserTransaction userTransaction =
                    (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction");

必要な箇所でUserTransaction#begin、commit、rollbackすれば、それぞれ変更が反映もしくは破棄されます。

Narayanaを利用する場合は、UserTransactionを取得している箇所をふつうにJNDIルックアップするように置き換えればOKです。
注)JNDI名が今回の例ではApache Geode組み込みのものを利用する場合と、微妙に異なります

            UserTransaction userTransaction = (UserTransaction) new InitialContext().lookup("java:comp/UserTransaction");

複数のRegionに対してトランザクション操作する

Cache Transactionを利用している時と、ほとんど変わりません。

    @Test
    public void multipleJtaTransactionCommit() throws SystemException, NotSupportedException, HeuristicRollbackException, HeuristicMixedException, RollbackException, NamingException {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region1 =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion1");
            Region<String, String> region2 =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion2");

            UserTransaction userTransaction =
                    (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction");

            userTransaction.begin();

            IntStream.rangeClosed(1, 10).forEach(i -> region1.put("key" + i, "value" + i));
            IntStream.rangeClosed(1, 10).forEach(i -> region2.put("key" + i, "value" + i));

            userTransaction.commit();

            assertThat(region1)
                    .hasSize(10);
            assertThat(region1.get("key1"))
                    .isEqualTo("value1");
            assertThat(region1)
                    .hasSize(10);
            assertThat(region1.get("key10"))
                    .isEqualTo("value10");

            region1.close();
            region2.close();
        }
    }

    @Test
    public void multipleJtaTransactionRollback() throws SystemException, NotSupportedException, NamingException {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region1 =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion1");
            Region<String, String> region2 =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion2");

            UserTransaction userTransaction =
                    (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction");

            userTransaction.begin();

            IntStream.rangeClosed(1, 10).forEach(i -> region1.put("key" + i, "value" + i));
            IntStream.rangeClosed(1, 10).forEach(i -> region2.put("key" + i, "value" + i));

            userTransaction.rollback();

            assertThat(region1)
                    .isEmpty();
            assertThat(region1.get("key1"))
                    .isNull();
            assertThat(region1)
                    .isEmpty();
            assertThat(region1.get("key10"))
                    .isNull();

            region1.close();
            region2.close();
        }
    }

Narayanaに変更する場合も、先ほどの例と同じです。

マルチスレッド環境下で実行

マルチスレッド環境下で実行する場合も、Cache Transactionを使用した場合と同等の動きをします。

それぞれ、マルチスレッド環境下で同じキーに対して操作をした場合、ミュータブルなオブジェクトを別のスレッドで操作した場合、Regionからの取得時にコピーを行うようにしてマルチスレッドで操作した場合です。

    @Test
    public void multiThreadedJtaTransaction() throws NamingException {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            IntStream.rangeClosed(1, 5).forEach(i -> region.put("key" + i, "value" + i));

            UserTransaction userTransaction =
                    (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction");
            Executor updateExecutor = Executors.newSingleThreadExecutor();
            Executor readExecutor = Executors.newSingleThreadExecutor();

            CompletableFuture<Void> future =
                    CompletableFuture
                            .runAsync(wrap(() -> userTransaction.begin()), updateExecutor)
                            .thenRunAsync(wrap(() -> userTransaction.begin()), readExecutor)

                            .thenRunAsync(() -> region.put("key6", "value6"), updateExecutor)  // insert
                            .thenRunAsync(() -> assertThat(region.get("key6")).isNull(), readExecutor)  // reader, non-visible

                            .thenRunAsync(() -> region.put("key5", "value5-5"), updateExecutor)  // update
                            .thenRunAsync(() -> assertThat(region.get("key5")).isEqualTo("value5"), readExecutor)  // reader, non-visible

                            .thenRunAsync(() -> region.remove("key3"), updateExecutor)  // delete
                            .thenRunAsync(() -> assertThat(region.get("key3")).isEqualTo("value3"), readExecutor)  // reader, still-visible

                            .thenRunAsync(wrap(() -> userTransaction.commit()), updateExecutor)

                            .thenRunAsync(() -> assertThat(region.get("key6")).isNull(), readExecutor)  // reader, still-visible
                            .thenRunAsync(() -> assertThat(region.get("key5")).isEqualTo("value5"), readExecutor)  // reader, still-visible
                            .thenRunAsync(() -> assertThat(region.get("key3")).isEqualTo("value3"), readExecutor)  // reader, still-visible

                            .thenRunAsync(wrap(() -> userTransaction.commit()), readExecutor);

            future.join();

            assertThat(region)
                    .hasSize(5);
            assertThat(region.get("key5"))
                    .isEqualTo("value5-5");
            assertThat(region.get("key6"))
                    .isEqualTo("value6");
            assertThat(region.get("key3"))
                    .isNull();
        }
    }

    @Test
    public void multiThreadedJtaTransactionMutableObject() throws NamingException {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, List<Integer>> region =
                    cache.<String, List<Integer>>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            region.put("key", new ArrayList<>(Arrays.asList(1, 2)));

            UserTransaction userTransaction =
                    (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction");
            Executor updateExecutor = Executors.newSingleThreadExecutor();
            Executor readExecutor = Executors.newSingleThreadExecutor();

            CompletableFuture<Void> future =
                    CompletableFuture
                            .runAsync(wrap(() -> userTransaction.begin()), updateExecutor)
                            .thenRunAsync(wrap(() -> userTransaction.begin()), readExecutor)

                            .thenRunAsync(() -> region.get("key").add(3), updateExecutor)  // update, modify
                            .thenRunAsync(
                                    () -> assertThat(region.get("key")).hasSize(3).containsOnly(1, 2, 3),
                                    readExecutor)  // reader, visible!!

                            .thenRunAsync(wrap(() -> userTransaction.commit()), updateExecutor)
                            .thenRunAsync(wrap(() -> userTransaction.commit()), readExecutor);

            future.join();

            assertThat(region.get("key"))
                    .hasSize(3)
                    .containsOnly(1, 2, 3);
        }
    }

    @Test
    public void multiThreadedTransactionMutableObjectUsingCopy() throws NamingException {
        try (Cache cache = new CacheFactory().create()) {
            // use, copy
            cache.setCopyOnRead(true);

            Region<String, List<Integer>> region =
                    cache.<String, List<Integer>>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            region.put("key", new ArrayList<>(Arrays.asList(1, 2)));

            UserTransaction userTransaction =
                                (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction");
            Executor updateExecutor = Executors.newSingleThreadExecutor();
            Executor readExecutor = Executors.newSingleThreadExecutor();

            CompletableFuture<Void> future =
                    CompletableFuture
                            .runAsync(wrap(() -> userTransaction.begin()), updateExecutor)
                            .thenRunAsync(wrap(() -> userTransaction.begin()), readExecutor)

                            .thenRunAsync(() -> region.get("key").add(3), updateExecutor)  // update, for copy
                            .thenRunAsync(() -> {
                                assertThat(region.get("key")).hasSize(2);
                                assertThat(region.get("key")).containsOnly(1, 2);
                            }, readExecutor)  // reader, non-visible

                            .thenRunAsync(wrap(() -> userTransaction.commit()), updateExecutor)
                            .thenRunAsync(wrap(() -> userTransaction.commit()), readExecutor);

            future.join();

            assertThat(region.get("key"))
                    .hasSize(2)
                    .containsOnly(1, 2);
        }
    }

JTAを使ったコードはチェック例外がいろいろ宣言されていてLambda式と相性が悪いので、今回はこういう風にまとめました。

                    CompletableFuture
                            .runAsync(wrap(() -> userTransaction.begin()), updateExecutor)

コンフリクト

コンフリクトについてもほぼ同等ですが、コンフリクト検出時に投げられる例外が変わります。

    @Test
    public void multiThreadedJtaTransactionConflict() throws NamingException {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            IntStream.rangeClosed(1, 5).forEach(i -> region.put("key" + i, "value" + i));

            UserTransaction userTransaction =
                    (UserTransaction) cache.getJNDIContext().lookup("java:/UserTransaction");
            Executor firstUpdateExecutor = Executors.newSingleThreadExecutor();
            Executor secondUpdateExecutor = Executors.newSingleThreadExecutor();

            CompletableFuture<Void> future =
                    CompletableFuture
                            .runAsync(wrap(() -> userTransaction.begin()), firstUpdateExecutor)
                            .thenRunAsync(wrap(() -> userTransaction.begin()), secondUpdateExecutor)

                            .thenRunAsync(() -> region.put("key3", "value3-1-3"), firstUpdateExecutor)
                            .thenRunAsync(() -> region.put("key3", "value3-2-3"), secondUpdateExecutor)

                            .thenRunAsync(wrap(() -> userTransaction.commit()), firstUpdateExecutor)
                            .thenRunAsync(wrap(() -> userTransaction.commit()), secondUpdateExecutor);

            try {
                future.join();
                fail();
            } catch (CompletionException e) {
                assertThat(e.getCause())
                        .hasMessageContaining("Transaction rolled back because of Exception in notifyBeforeCompletion processing")
                        .hasCauseInstanceOf(RollbackException.class);
            }

            assertThat(region.get("key3"))
                    .isEqualTo("value3-1-3");
        }
    }

CommitConflictExceptionだったのが、RollbackExceptionになりました。

また、Narayanaを利用した場合はエラーメッセージが変わります。

                assertThat(e.getCause())
                        .hasMessageContaining("ARJUNA016053: Could not commit transaction.")
                        .hasCauseInstanceOf(RollbackException.class);

まとめ

Apache Geodeで、JTAを使ったトランザクション管理を試してみました。

JCA Resource Adapterは利用しませんでしたが、他は実装も含めてなんとなく見れたので、よしとしましょう。