CLOVER🍀

That was when it all began.

Apache GeodeのCache Transactionを使う

Apache Geodeでは、トランザクション管理機能を持っているようです。

http://geode.docs.pivotal.io/docs/developing/transactions/about_transactions.html

トランザクションの機能を使うには、以下の2つの方法があります。

CacheTransactionManagerを使ったGeodeのトランザクション管理機能を直接使う方法と、JTAと統合する方法です。

今回は、CacheTransactionManagerを使う方法で試してみたいと思います。

Geode Cache Transactionsについて

…の前に、ちょっとApache Geodeトランザクションまわりのドキュメントをさらさらと眺めておきます。JTAのところはまだ読んでいないのですが、JTAを使っても背後にいるのはCacheTransactionManagerらしいので、ある程度共通の内容になるのでしょう。

まず、基本的な事項としては

といった模様。

http://geode.docs.pivotal.io/docs/developing/transactions/about_transactions.html

パフォーマンスについては、Replicated Regionの場合はそれほど大きくないサイズのデータセットで使うべし、となっています。これはReplicated Regionの場合は、最終的に更新内容を全Memberに適用しなくてはならないからでしょうね。Partitioned Regionの場合は、パーティション上でローカルロックが使用でき、コミット時にのみメッセージ送信を行うためパフォーマンスがよいそうです。

http://geode.docs.pivotal.io/docs/developing/transactions/cache_transaction_performance.html

ただ、Partitioned Regionの場合は、トランザクションで操作するデータが、単一のメンバーで行えるようになること、という注意書きがあります。

http://geode.docs.pivotal.io/docs/developing/transactions/data_location_cache_transactions.html

このために、colocateを設定する必要があるようです。

http://geode.docs.pivotal.io/docs/developing/partitioned_regions/custom_partitioning_and_data_colocation.html

http://geode.docs.pivotal.io/docs/developing/partitioned_regions/using_custom_partition_resolvers.html

http://geode.docs.pivotal.io/docs/developing/partitioned_regions/colocating_partitioned_region_data.html

トランザクションを利用したコードはあとで書きますが、colocate自体は今回は扱わず、クラスタも組みません。

どういうことかというと、要するにPartitionResolverを使用して関連のあるデータはひとつのメンバーにまとめよ、ということのようです。

トランザクションで操作するデータは、単一のメンバー上であることかぁ。覚えておかないと危ないですね。トランザクションの最初の説明にもありましたが、分散ロックを使わないということなのでしょう。

トランザクションは、Cache単位で行い、ネストしたトランザクションはサポートしていません。

http://geode.docs.pivotal.io/docs/developing/transactions/run_a_cache_transaction.html

繰り返しになりますが、Apache Geodeトランザクションはローカルで行われ、トランザクションはスレッドに紐付けられて管理されるため、複数のトランザクションを動作させることができます。ひとつのスレッドが同時に複数のトランザクションを利用することはできず、子スレッドもトランザクションを引き継ぎません。

また、並行で動作するトランザクションとの分離度を保つため、トランザクション中ではTransaction Viewという概念があり、スナップショットを参照するような仕組みになっているようです。こちらで、変更中の内容を参照したり、コミット時のコンフリクトを発見するようです。コンフリクトが発見された場合は、トランザクションロールバックされます。

How Geode Cache Transactions Work

ドキュメント上のサンプルコードは、こちら。

http://geode.docs.pivotal.io/docs/developing/transactions/transactions_overview.html

http://geode.docs.pivotal.io/docs/developing/transactions/transaction_suspend_resume_example.html

その他。

http://geode.docs.pivotal.io/docs/developing/transactions/working_with_transactions.html

とまあ、内容とドキュメントの説明はこれくらいにして、実際に使ってみましょう。

準備

Maven依存関係は、以下のように設定しました。

        <dependency>
            <groupId>org.apache.geode</groupId>
            <artifactId>gemfire-core</artifactId>
            <version>1.0.0-incubating.M1</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.3.0</version>
            <scope>test</scope>
        </dependency>

使用するApache Geodeバージョンは、1.0.0-incubating.M1とします。また、テストコードとしてサンプルは記述するので、JUnitとAssertJも使用します。

テストコードの雛形

以降のコードは、以下のテストコード内に実装されているものとします。
src/test/java/org/littlewings/geode/transaction/TransactionTest.java

package org.littlewings.geode.transaction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.CacheTransactionManager;
import com.gemstone.gemfire.cache.CommitConflictException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionShortcut;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TransactionTest {
    // ここに、テストを書く!
}

では、書いていってみます。

単純なコミット、ロールバック

まずは、基本的なコミット、ロールバックから。

CacheTransactionManagerをCacheから取得し、トランザクションを始めたいポイントでCacheTransactionManager#beginを呼び出します。その後、Regionに対して行いたい操作を行ってから、CacheTransactionManager#commitでコミットです。

    @Test
    public void transactionCommit() {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            CacheTransactionManager transactionManager = cache.getCacheTransactionManager();

            transactionManager.begin();

            IntStream.rangeClosed(1, 10).forEach(i -> region.put("key" + i, "value" + i));

            transactionManager.commit();

            assertThat(region)
                    .hasSize(10);
            assertThat(region.get("key1"))
                    .isEqualTo("value1");

            region.close();
        }
    }

コミット後、変更が確定します。

が、これだけでは機能しているかどうかわからないので、ロールバックも行ってみます。

    @Test
    public void transactionRollback() {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            CacheTransactionManager transactionManager = cache.getCacheTransactionManager();

            transactionManager.begin();

            IntStream.rangeClosed(1, 10).forEach(i -> region.put("key" + i, "value" + i));

            transactionManager.rollback();

            assertThat(region)
                    .isEmpty();
            assertThat(region.get("key1"))
                    .isNull();

            region.close();
        }
    }

ロールバックすると、Regionに対して適用した変更が、なかったことになっています。

複数のRegionに対してトランザクション操作する

CacheTransactionManagerは、Cacheを起点に行うため、複数のRegionに対してトランザクション操作を行うことができます。

コミットした場合。

    @Test
    public void multipleCacheTransactionCommit() {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region1 =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion1");
            Region<String, String> region2 =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion2");


            CacheTransactionManager transactionManager = cache.getCacheTransactionManager();

            transactionManager.begin();

            IntStream.rangeClosed(1, 10).forEach(i -> region1.put("key" + i, "value" + i));
            IntStream.rangeClosed(1, 10).forEach(i -> region2.put("key" + i, "value" + i));

            transactionManager.commit();

            assertThat(region1)
                    .hasSize(10);
            assertThat(region1.get("key1"))
                    .isEqualTo("value1");
            assertThat(region1)
                    .hasSize(10);
            assertThat(region1.get("key10"))
                    .isEqualTo("value10");

            region1.close();
            region2.close();
        }
    }

ロールバックした場合。

    @Test
    public void multipleCacheTransactionRollback() {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region1 =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion1");
            Region<String, String> region2 =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion2");


            CacheTransactionManager transactionManager = cache.getCacheTransactionManager();

            transactionManager.begin();

            IntStream.rangeClosed(1, 10).forEach(i -> region1.put("key" + i, "value" + i));
            IntStream.rangeClosed(1, 10).forEach(i -> region2.put("key" + i, "value" + i));

            transactionManager.rollback();

            assertThat(region1)
                    .isEmpty();
            assertThat(region1.get("key1"))
                    .isNull();
            assertThat(region1)
                    .isEmpty();
            assertThat(region1.get("key10"))
                    .isNull();

            region1.close();
            region2.close();
        }
    }

ただ、分散環境では、関連するデータを単一のメンバーに集めるようにしておく必要があるのは、前述のとおりです(試していませんけど)。

http://geode.docs.pivotal.io/docs/developing/transactions/data_location_cache_transactions.html

http://geode.docs.pivotal.io/docs/developing/partitioned_regions/custom_partitioning_and_data_colocation.html

http://geode.docs.pivotal.io/docs/developing/partitioned_regions/using_custom_partition_resolvers.html

http://geode.docs.pivotal.io/docs/developing/partitioned_regions/colocating_partitioned_region_data.html

マルチスレッド環境下で実行

続いて、マルチスレッド環境で動かして、トランザクションの分離度を見てみます。

ドキュメントにも、トランザクションはスレッドに紐付けられて管理されるとの記載がありましたので、なんとなく予想はつきますが、ThreadLocalで管理されています。
https://github.com/apache/incubator-geode/blob/rel/v1.0.0-incubating.M1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java#L94

というわけで、スレッドを2本用意して、各スレッドに交互にオペレーションしてみます。

    @Test
    public void multiThreadedTransaction() {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            IntStream.rangeClosed(1, 5).forEach(i -> region.put("key" + i, "value" + i));

            CacheTransactionManager transactionManager = cache.getCacheTransactionManager();
            Executor updateExecutor = Executors.newSingleThreadExecutor();
            Executor readExecutor = Executors.newSingleThreadExecutor();

            CompletableFuture<Void> future =
                    CompletableFuture
                            .runAsync(() -> transactionManager.begin(), updateExecutor)
                            .thenRunAsync(() -> transactionManager.begin(), readExecutor)

                            .thenRunAsync(() -> region.put("key6", "value6"), updateExecutor)  // insert
                            .thenRunAsync(() -> assertThat(region.get("key6")).isNull(), readExecutor)  // reader, non-visible

                            .thenRunAsync(() -> region.put("key5", "value5-5"), updateExecutor)  // update
                            .thenRunAsync(() -> assertThat(region.get("key5")).isEqualTo("value5"), readExecutor)  // reader, non-visible

                            .thenRunAsync(() -> region.remove("key3"), updateExecutor)  // delete
                            .thenRunAsync(() -> assertThat(region.get("key3")).isEqualTo("value3"), readExecutor)  // reader, still-visible

                            .thenRunAsync(() -> transactionManager.commit(), updateExecutor)

                            .thenRunAsync(() -> assertThat(region.get("key6")).isNull(), readExecutor)  // reader, non-visible
                            .thenRunAsync(() -> assertThat(region.get("key5")).isEqualTo("value5"), readExecutor)  // reader, non-visible
                            .thenRunAsync(() -> assertThat(region.get("key3")).isEqualTo("value3"), readExecutor)  // reader, still-visible

                            .thenRunAsync(() -> transactionManager.commit(), readExecutor);

            future.join();

            assertThat(region)
                    .hasSize(5);
            assertThat(region.get("key5"))
                    .isEqualTo("value5-5");
            assertThat(region.get("key6"))
                    .isEqualTo("value6");
            assertThat(region.get("key3"))
                    .isNull();
        }
    }

ある程度初期データを登録した後、書き込み用と読み取り様で、別々にシングルスレッドのExecutorを用意します。

            IntStream.rangeClosed(1, 5).forEach(i -> region.put("key" + i, "value" + i));

            CacheTransactionManager transactionManager = cache.getCacheTransactionManager();
            Executor updateExecutor = Executors.newSingleThreadExecutor();
            Executor readExecutor = Executors.newSingleThreadExecutor();

こちらを各CompletableFutureの操作ごとにExecutorを明示的に与えて、順次実行していきます。

トランザクションの開始。

                    CompletableFuture
                            .runAsync(() -> transactionManager.begin(), updateExecutor)
                            .thenRunAsync(() -> transactionManager.begin(), readExecutor)

データの追加。読み取り側からは、見えないようです。

                            .thenRunAsync(() -> region.put("key6", "value6"), updateExecutor)  // insert
                            .thenRunAsync(() -> assertThat(region.get("key6")).isNull(), readExecutor)  // reader, non-visible

更新、削除も同様に見えていません。

                            .thenRunAsync(() -> region.put("key5", "value5-5"), updateExecutor)  // update
                            .thenRunAsync(() -> assertThat(region.get("key5")).isEqualTo("value5"), readExecutor)  // reader, non-visible

                            .thenRunAsync(() -> region.remove("key3"), updateExecutor)  // delete
                            .thenRunAsync(() -> assertThat(region.get("key3")).isEqualTo("value3"), readExecutor)  // reader, still-visible

なんと、書き込み側がコミットした後も見えません。

                            .thenRunAsync(() -> transactionManager.commit(), updateExecutor)

                            .thenRunAsync(() -> assertThat(region.get("key6")).isNull(), readExecutor)  // reader, still-visible
                            .thenRunAsync(() -> assertThat(region.get("key5")).isEqualTo("value5"), readExecutor)  // reader, still-visible
                            .thenRunAsync(() -> assertThat(region.get("key3")).isEqualTo("value3"), readExecutor)  // reader, still-visible

                            .thenRunAsync(() -> transactionManager.commit(), readExecutor);

分離度、高いですねぇ…。なお、Isolation Levelは設定できなさそうな雰囲気です。

トランザクションを抜けた後は、もちろん結果が確認できます。

ミュータブルオブジェクトを操作する

先ほどは、Stringというイミュータブルなオブジェクトを操作しました。ここでは、ArrayListというある種ミュータブルなオブジェクトを2つのスレッドのトランザクションから操作してみたいと思います。

    @Test
    public void multiThreadedTransactionMutableObject() {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, List<Integer>> region =
                    cache.<String, List<Integer>>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            region.put("key", new ArrayList<>(Arrays.asList(1, 2)));

            CacheTransactionManager transactionManager = cache.getCacheTransactionManager();
            Executor updateExecutor = Executors.newSingleThreadExecutor();
            Executor readExecutor = Executors.newSingleThreadExecutor();

            CompletableFuture<Void> future =
                    CompletableFuture
                            .runAsync(() -> transactionManager.begin(), updateExecutor)
                            .thenRunAsync(() -> transactionManager.begin(), readExecutor)

                            .thenRunAsync(() -> region.get("key").add(3), updateExecutor)  // update, modify
                            .thenRunAsync(
                                    () -> assertThat(region.get("key")).hasSize(3).containsOnly(1, 2, 3),
                                    readExecutor)  // reader, visible!!

                            .thenRunAsync(() -> transactionManager.commit(), updateExecutor)
                            .thenRunAsync(() -> transactionManager.commit(), readExecutor);

            future.join();

            assertThat(region.get("key"))
                    .hasSize(3)
                    .containsOnly(1, 2, 3);
        }
    }

この場合、読み取り側のスレッドからも、変更内容が見えてしまいます。同じ参照を見てますよ、という話ですねぇ…。

これを避ける場合、読み取り時にコピーするように設定します。Cache全体に設定する場合は、こちら。

Setting Global Copy on Read

Cache全体ではなく、個別のオペレーションでどうにかしたい場合は、CopyHelperというものを使用するそうです。

Making a Safe Change Within a Transaction Using CopyHelper.copy

今回は、Cache全体に対して設定します。Cache-XMLで設定する場合は、以下のようになるようです。

<cache copy-on-read="true">

ですが、今回はJava APIで設定するようにします。

        try (Cache cache = new CacheFactory().create()) {
            // use, copy
            cache.setCopyOnRead(true);

全体としては、こんな感じ。

    @Test
    public void multiThreadedTransactionMutableObjectUsingCopy() {
        try (Cache cache = new CacheFactory().create()) {
            // use, copy
            cache.setCopyOnRead(true);

            Region<String, List<Integer>> region =
                    cache.<String, List<Integer>>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            region.put("key", new ArrayList<>(Arrays.asList(1, 2)));

            CacheTransactionManager transactionManager = cache.getCacheTransactionManager();
            Executor updateExecutor = Executors.newSingleThreadExecutor();
            Executor readExecutor = Executors.newSingleThreadExecutor();

            CompletableFuture<Void> future =
                    CompletableFuture
                            .runAsync(() -> transactionManager.begin(), updateExecutor)
                            .thenRunAsync(() -> transactionManager.begin(), readExecutor)

                            .thenRunAsync(() -> region.get("key").add(3), updateExecutor)  // update, for copy
                            .thenRunAsync(() -> {
                                assertThat(region.get("key")).hasSize(2);
                                assertThat(region.get("key")).containsOnly(1, 2);
                            }, readExecutor)  // reader, non-visible

                            .thenRunAsync(() -> transactionManager.commit(), updateExecutor)
                            .thenRunAsync(() -> transactionManager.commit(), readExecutor);

            future.join();

            assertThat(region.get("key"))
                    .hasSize(2)
                    .containsOnly(1, 2);
        }
    }

読み取り側のスレッドからも、もう片方の変更が見えなくなりました。

その代わり、書き込み側からRegion#putしていないので、変更が最終的に反映されなくなっています。

            assertThat(region.get("key"))
                    .hasSize(2)
                    .containsOnly(1, 2);

というか、いつも明示的にRegion#putしましょうね、ということだと思いますが。

コンフリクト

最後は、コンフリクトをシミュレーションしてみます。

同じキーに対して、2つのスレッドから交互に更新してみます。

    @Test
    public void multiThreadedTransactionConflict() {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            IntStream.rangeClosed(1, 5).forEach(i -> region.put("key" + i, "value" + i));

            CacheTransactionManager transactionManager = cache.getCacheTransactionManager();
            Executor firstUpdateExecutor = Executors.newSingleThreadExecutor();
            Executor secondUpdateExecutor = Executors.newSingleThreadExecutor();

            CompletableFuture<Void> future =
                    CompletableFuture
                            .runAsync(() -> transactionManager.begin(), firstUpdateExecutor)
                            .thenRunAsync(() -> transactionManager.begin(), secondUpdateExecutor)

                            .thenRunAsync(() -> region.put("key3", "value3-1-3"), firstUpdateExecutor)
                            .thenRunAsync(() -> region.put("key3", "value3-2-3"), secondUpdateExecutor)

                            .thenRunAsync(() -> transactionManager.commit(), firstUpdateExecutor)
                            .thenRunAsync(() -> transactionManager.commit(), secondUpdateExecutor);

            assertThatThrownBy(() -> future.join())
                    .hasMessageContaining("Entry for key  key3  on region  sampleRegion  had a state change")
                    .hasCauseInstanceOf(CommitConflictException.class);

            assertThat(region.get("key3"))
                    .isEqualTo("value3-1-3");
        }
    }

コンフリクトが検出され、後からコミットしたスレッドがエラーになります(CommitConflictException)。

まとめ

Apache GeodeのCacheTransactionManagerによる、トランザクション管理を試してみました。

トランザクションの分離度が高くてビックリですが、トランザクションで操作するデータの集約など、注意するところもそれなりにありそうな感じですね。

そのうち、JTA側も試してみます。…どうやってやろうかな。