CLOVER🍀

That was when it all began.

Infinispan 9.3/9.4で追加された、Segmented Data Containerを試す

これは、なにをしたくて書いたもの?

  • Infinispan 9.4で、Off-HeapのDataContainerにSegment化の対応が入ったらしい
  • Infinispan 9.3では、On-HeapのDataContainerのみがSegment化されていた
  • それがどういうことを表すのか、ちゃんと確認したい

という動機で、InfinispanのSegmented Data Containerについて確認してみたエントリです。

Segmented Data Container?

Infinispan 9.4.0.Finalがリリースされる前に、こんなブログエントリが書かれました。

Infinispan: Segmented Data Containers: Distributed Stream Performance Boost

Infinispan 9.3で、On-HeapのDataContainerのSegment化に対応していて、データがSegment単位で管理されるようになり、
パフォーマンスがアップした操作もあるよ、という話です。

Infinispan 9.4と9.3リリース時の、ブログエントリはこちら。

Infinispan: Infinispan 9.4.0.Final

Infinispan: Infinispan 9.3.0.Final is out!

9.3の時にちょこっと登場しています、Segmented Data Containerの話は。本当にちょこっと。

で、この時はあんまり見ずにスルーしていました…。

Segmented Data Containerに関する、IssueやPull Requestはこちらです(On-Heap/Off-Heap)。

[ISPN-5451] Data Container Segment Striping - JBoss Issue Tracker

[ISPN-9263] Segmented Off Heap Data Container - JBoss Issue Tracker

ISPN-5451 data container striping by wburns · Pull Request #5997 · infinispan/infinispan · GitHub

ISPN-9263 Segmented Off Heap Data Container by wburns · Pull Request #6217 · infinispan/infinispan · GitHub

ISPN-9516 docs for segmentation by oraNod · Pull Request #6305 · infinispan/infinispan · GitHub

ところで、InfinispanはもともとSegment単位でデータを管理しているとドキュメント上は言っていて、
「Segmented Data Containerとはどういうことだろう?」とちょっと思っていました。

There is no hard rule on how segments must be mapped to owners, but the goal is to balance the number of segments allocated to each node and at the same time minimize the number of segments that have to move after a node joins or leaves the cluster. The segment mapping is customizable, and in fact there are five implementations that ship with Infinispan:

http://infinispan.org/docs/9.2.x/user_guide/user_guide.html#key_ownership

実際に動かしつつ調べてみると、Segmentの概念はあるものの、実際のデータ管理はSegment単位では行われていなかったようです。

では、このあたりを確認していってみましょう。

なお、今回はData Container自体に着目して見ていきます。Infinispan 9.4のドキュメントを見ると、Persistenceまわりにも
いろいろあるみたいですが、今回はパスします。

環境

今回確認した環境は、こちら。

$ java -version
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (build 1.8.0_181-8u181-b13-0ubuntu0.18.04.1-b13)
OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode)


$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T03:33:14+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-36-generic", arch: "amd64", family: "unix"

準備

Maven依存関係は、こちら。

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>9.4.0.Final</version>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.3.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.3.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.11.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.0</version>
            </plugin>
        </plugins>
    </build>

Infinispanは、9.4.0.Finalを使用します。その他は、テストに使うライブラリです。

設定ファイルの雛形は、こんな感じで用意します。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:9.4 http://www.infinispan.org/schemas/infinispan-config-9.4.xsd"
        xmlns="urn:infinispan:config:9.4">

    <jgroups>
        <stack-file name="udp" path="default-configs/default-jgroups-udp.xml"/>
    </jgroups>

    <cache-container shutdown-hook="REGISTER">

        <!-- ここにCacheの定義を書く -->
    </cache-container>

</infinispan>

Cacheの定義は、テストコードを書く時に合わせて埋めていきます。

テストコードの雛形

動作は、テストコードで確認します。

雛形は、こんな感じで用意。
src/test/java/org/littlewings/infinispan/segmented/SegmentedDataContainerTest.java

package org.littlewings.infinispan.segmented;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.infinispan.Cache;
import org.infinispan.CacheStream;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.HashConfiguration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.impl.BoundedSegmentedDataContainer;
import org.infinispan.container.impl.DefaultDataContainer;
import org.infinispan.container.impl.DefaultSegmentedDataContainer;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.container.impl.L1SegmentedDataContainer;
import org.infinispan.container.offheap.BoundedOffHeapDataContainer;
import org.infinispan.container.offheap.OffHeapDataContainer;
import org.infinispan.container.offheap.SegmentedBoundedOffHeapDataContainer;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.impl.HashFunctionPartitioner;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.stream.CacheCollectors;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class SegmentedDataContainerTest {

    // ここに、テストを書く!!

    <K, V> void withCache(String cacheName, int numInstances, Consumer<Cache<K, V>> func) {
        List<EmbeddedCacheManager> managers =
                IntStream
                        .rangeClosed(1, numInstances)
                        .mapToObj(i -> {
                            try {
                                return new DefaultCacheManager("infinispan.xml");
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        })
                        .collect(Collectors.toList());

        try {
            managers.forEach(m -> m.getCache(cacheName));

            Cache<K, V> cache = managers.get(0).getCache(cacheName);
            func.accept(cache);
        } finally {
            managers.forEach(m -> m.getCache(cacheName).stop());
            managers.forEach(EmbeddedCacheManager::stop);
        }
    }
}

クラスタを構成するための、ヘルパーメソッド付き。

Cacheの設定でDataContainerがどう変わるか確認する

それではいきなりですが、Cacheの設定によってどんなDataContainerが使われるのか変わるようですので、そちらの確認を
してみましょう。

まずは、Cacheを定義してみます。すべてDistributed Cacheです。

        <distributed-cache name="defaultCache"/>

        <distributed-cache name="l1Cache" l1-lifespan="1000"/>

        <distributed-cache name="sizeBoundedCache">
            <memory><binary size="100"/></memory>
        </distributed-cache>

        <distributed-cache name="sizeBoundedL1Cache" l1-lifespan="1000">
            <memory><binary size="100"/></memory>
        </distributed-cache>

        <distributed-cache name="defaultOffHeapCache">
            <memory><off-heap/></memory>
        </distributed-cache>

        <distributed-cache name="l1OffHeapCache" l1-lifespan="1000">
            <memory><off-heap/></memory>
        </distributed-cache>

        <distributed-cache name="sizeBoundedOffHeapCache">
            <memory><off-heap size="100"/></memory>
        </distributed-cache>

        <distributed-cache name="sizeBoundedL1OffHeapCache" l1-lifespan="1000">
            <memory><off-heap size="100"/></memory>
        </distributed-cache>

On-Heapでデフォルト、L1 Cache有効化、サイズ制限あり、L1 Cache有効化かつサイズ制限ありの4つ。
Off-Heapだと、Off-Heapを使う以外はデフォルト、L1 Cache有効化、サイズ制限あり、L1 Cache有効化かつ制限ありの同じく4つを用意。

これらのCacheから、内部で使われるDataContainerがどのように変わっていくかを見ていきます。

結果は、このように変わっていきます。

    @Test
    public void dataContainersDefaultSegmentedEnabled() {
        this.<String, Integer>withCache("defaultCache", 3, cache -> {
            EmbeddedCacheManager manager = cache.getCacheManager();

            // On-Heap
            DataContainer<String, Integer> defaultHeapDataContainer =
                    manager.<String, Integer>getCache("defaultCache").getAdvancedCache().getDataContainer();
            assertThat(defaultHeapDataContainer)
                    .isInstanceOf(DefaultSegmentedDataContainer.class);

            DataContainer<String, Integer> l1HeapDataContainer =
                    manager.<String, Integer>getCache("l1Cache").getAdvancedCache().getDataContainer();
            assertThat(l1HeapDataContainer)
                    .isInstanceOf(L1SegmentedDataContainer.class);

            DataContainer<String, Integer> sizeBoundedHeapDataContainer =
                    manager.<String, Integer>getCache("sizeBoundedCache").getAdvancedCache().getDataContainer();
            assertThat(sizeBoundedHeapDataContainer)
                    .isInstanceOf(BoundedSegmentedDataContainer.class);

            DataContainer<String, Integer> sizeBoundedL1HeapDataContainer =
                    manager.<String, Integer>getCache("sizeBoundedL1Cache").getAdvancedCache().getDataContainer();
            assertThat(sizeBoundedL1HeapDataContainer)
                    .isInstanceOf(BoundedSegmentedDataContainer.class);

            // Off-Heap
            DataContainer<String, Integer> defaultOffHeapDataContainer =
                    manager.<String, Integer>getCache("defaultOffHeapCache").getAdvancedCache().getDataContainer();
            assertThat(defaultOffHeapDataContainer)
                    .isInstanceOf(DefaultSegmentedDataContainer.class);

            DataContainer<String, Integer> l1OffHeapDataContainer =
                    manager.<String, Integer>getCache("l1OffHeapCache").getAdvancedCache().getDataContainer();
            assertThat(l1OffHeapDataContainer)
                    .isInstanceOf(L1SegmentedDataContainer.class);

            DataContainer<String, Integer> sizeBoundedOffHeapDataContainer =
                    manager.<String, Integer>getCache("sizeBoundedOffHeapCache").getAdvancedCache().getDataContainer();
            assertThat(sizeBoundedOffHeapDataContainer)
                    .isInstanceOf(SegmentedBoundedOffHeapDataContainer.class);

            DataContainer<String, Integer> sizeBoundedL1OffHeapDataContainer =
                    manager.<String, Integer>getCache("sizeBoundedL1OffHeapCache").getAdvancedCache().getDataContainer();
            assertThat(sizeBoundedL1OffHeapDataContainer)
                    .isInstanceOf(SegmentedBoundedOffHeapDataContainer.class);
        });
    }

各種DataContainerのクラス名に、「Segmented」が入っていますね。つまり、デフォルトでSegmented Data Containerと
呼ばれているものが使われます。

現在実装されているSegmented〜が付くDataContainerはこれくらいで、L1 Cacheが有効かどうかや、Evictionが有効かどうかで
実装が選択されます。
https://github.com/infinispan/infinispan/blob/9.4.0.Final/core/src/main/java/org/infinispan/factories/DataContainerFactory.java#L41-L115

L1 CacheとEvictionを同時に有効にした場合は、Evictionを有効にした場合のSegmentedDataContainerが使われるようです。

これを明示的にオフにするには、Featuresを使用する必要があります。

Segmented Data Containerを使わないように(無効に)するには、方法が2つあります。

ひとつ目は、「META-INF/infinispan-features.properties」で指定すること。
src/test/resources/META-INF/infinispan-features.properties

org.infinispan.feature.data-segmentation=false

この値を「true」にすると、Segmented Data Containerが有効になります。

ふたつ目は、システムプロパティで指定すること。

System.setProperty("org.infinispan.feature.data-segmentation", "false");

とか

$ java -Dorg.infinispan.feature.data-segmentation=false ...

とかですね。

今回は、テストケース内でシステムプロパティで指定することにしましょう。

    @Test
    public void dataContainersSegmentedDisabled() {
        System.setProperty("org.infinispan.feature.data-segmentation", "false");

        this.<String, Integer>withCache("defaultCache", 3, cache -> {
            EmbeddedCacheManager manager = cache.getCacheManager();

            // On-Heap
            DataContainer<String, Integer> defaultHeapDataContainer =
                    manager.<String, Integer>getCache("defaultCache").getAdvancedCache().getDataContainer();
            assertThat(defaultHeapDataContainer)
                    .isInstanceOf(DefaultDataContainer.class);

            DataContainer<String, Integer> l1HeapDataContainer =
                    manager.<String, Integer>getCache("l1Cache").getAdvancedCache().getDataContainer();
            assertThat(l1HeapDataContainer)
                    .isInstanceOf(DefaultDataContainer.class);

            DataContainer<String, Integer> sizeBoundedHeapDataContainer =
                    manager.<String, Integer>getCache("sizeBoundedCache").getAdvancedCache().getDataContainer();
            assertThat(sizeBoundedHeapDataContainer)
                    .isInstanceOf(DefaultDataContainer.class);

            DataContainer<String, Integer> sizeBoundedL1HeapDataContainer =
                    manager.<String, Integer>getCache("sizeBoundedL1Cache").getAdvancedCache().getDataContainer();
            assertThat(sizeBoundedL1HeapDataContainer)
                    .isInstanceOf(DefaultDataContainer.class);
        });

        System.clearProperty("org.infinispan.feature.data-segmentation");
    }

On-Heapは、全部DefaultDataContainerになりました…。なお、Segmented Data Containerを無効にすると、Off-Heapは
うまく動きませんでした…。

Featuresについては、また後で。

Local CacheとReplicated Cache、Scattered Cacheは?

Distributed Cacheは、Segmented Data Containerが使われていましたが、他のCacheはどうなのでしょう?

Local Cache、Replicated Cache、Scattered Cacheで確認してみました。

        <local-cache name="localCache"/>
        <replicated-cache name="replicatedCache"/>
        <scattered-cache name="scatteredCache"/>

結果。

    @Test
    public void localOrReplicatedOrScatteredCache() {
        this.<String, Integer>withCache("defaultCache", 3, cache -> {
            EmbeddedCacheManager manager = cache.getCacheManager();

            // Local Cache
            Cache<String, Integer> localCache = manager.getCache("localCache");
            assertThat(localCache.getAdvancedCache().getDataContainer())
                    .isInstanceOf(DefaultDataContainer.class);

            assertThat(localCache.getCacheConfiguration().clustering().hash().keyPartitioner())
                    .isInstanceOf(HashFunctionPartitioner.class);

            // Replicated Cache
            Cache<String, Integer> replicatedCache = manager.getCache("replicatedCache");
            assertThat(replicatedCache.getAdvancedCache().getDataContainer())
                    .isInstanceOf(DefaultSegmentedDataContainer.class);

            assertThat(replicatedCache.getCacheConfiguration().clustering().hash().keyPartitioner())
                    .isInstanceOf(HashFunctionPartitioner.class);

            // Scattered Cache
            Cache<String, Integer> scatteredCache = manager.getCache("scatteredCache");
            assertThat(scatteredCache.getAdvancedCache().getDataContainer())
                               .isInstanceOf(DefaultSegmentedDataContainer.class);

            assertThat(scatteredCache.getCacheConfiguration().clustering().hash().keyPartitioner())
                    .isInstanceOf(HashFunctionPartitioner.class);
        });
    }

Local Cacheについては、通常のDataContainerが使われ、あとの2つはSegmented Data Containerですね。

とりあえず確認、というくらいにして、以降はDistributed Cacheで見ていきます。

Segmentの情報を取得してみる

ここでは、Segmentまわりの情報を扱うAPIを試してみたいと思います。

コードはこちら。Segmented Data Containerは有効なままで動作させています。

    @Test
    public void segments() {
        this.<String, Integer>withCache("defaultCache", 3, cache -> {

            HashConfiguration hashConfiguration = cache.getCacheConfiguration().clustering().hash();

            // Segmentの数(デフォルト)
            assertThat(hashConfiguration.numSegments())
                    .isEqualTo(256);

            // 設定されているKeyPartitionerの確認(デフォルト)
            assertThat(hashConfiguration.keyPartitioner())
                    .isInstanceOf(HashFunctionPartitioner.class);

            IntStream.rangeClosed(1, 1000).forEach(i -> cache.put("key" + i, i));

            // データの配置を確認するのに、DistributionManagerを使う
            DistributionManager dm = cache.getAdvancedCache().getDistributionManager();

            // キーの配置状況の確認
            cache.forEach((k, v) -> {
                int segmentId = dm.getCacheTopology().getSegment(k);
                DistributionInfo di = dm.getCacheTopology().getSegmentDistribution(segmentId);

                System.out.printf(
                        "key[%s]: segment[%d], primary[%s], backup[%s]%n",
                        k,
                        segmentId,
                        di.primary(),
                        di.writeBackups().stream().map(Address::toString).collect(Collectors.joining(", "))
                );
            });

            // ローカルのSegmentの数
            IntSet segments = dm.getCacheTopology().getLocalReadSegments();
            assertThat(segments.size()).isGreaterThan(150);

            // ローカルのDataContainerが保持しているSegmentに属するエントリ数
            InternalDataContainer<String, Integer> dataContainer =
                    (InternalDataContainer<String, Integer>) cache.getAdvancedCache().getDataContainer();

            assertThat(dataContainer.size(segments)).isGreaterThan(600);

            // あるSegmentに属するCacheEntryを取得
            int anySegmentId = segments.stream().findAny().get();
            System.out.printf("selected segment-id = %d%n", anySegmentId);
            dataContainer
                    .forEach(
                            IntSets.immutableSet(anySegmentId),
                            cacheEntry -> System.out.println(cacheEntry)
                    );

            int segmentSize =
                    dm
                            .getCacheTopology()
                            // クラスタ内の全Memberを取得
                            .getMembersSet()
                            .stream()
                            // PrimaryOwnerが持つSegmentの集合を取得
                            .map(address -> dm.getReadConsistentHash().getPrimarySegmentsForOwner(address))
                            .peek(s -> System.out.println(s.size()))
                            .peek(s -> assertThat(s.size()).isGreaterThan(75))
                            .reduce(0, (acc, cur) -> acc + cur.size(), (acc, cur) -> acc + cur);

            assertThat(segmentSize).isEqualTo(256);
        });
    }

順を追って。

まずは、HashConfigurationを取得。

            HashConfiguration hashConfiguration = cache.getCacheConfiguration().clustering().hash();

Segmentの数を取得(デフォルトは、256)。

            // Segmentの数(デフォルト)
            assertThat(hashConfiguration.numSegments())
                    .isEqualTo(256);

データの配置をコントロールする、KeyPartitionerを取得(デフォルトは、ハッシュで分散)。

            // 設定されているKeyPartitionerの確認(デフォルト)
            assertThat(hashConfiguration.keyPartitioner())
                    .isInstanceOf(HashFunctionPartitioner.class);

データを突っ込んで、データの分配状況を確認してみましょう。

            IntStream.rangeClosed(1, 1000).forEach(i -> cache.put("key" + i, i));

            // データの配置を確認するのに、DistributionManagerを使う
            DistributionManager dm = cache.getAdvancedCache().getDistributionManager();

キーの配置状況を確認。キーからSegmentのidを取得できるので、そこからDistributionInfoを得ることができます。

            // キーの配置状況の確認
            cache.forEach((k, v) -> {
                int segmentId = dm.getCacheTopology().getSegment(k);
                DistributionInfo di = dm.getCacheTopology().getSegmentDistribution(segmentId);

                System.out.printf(
                        "key[%s]: segment[%d], primary[%s], backup[%s]%n",
                        k,
                        segmentId,
                        di.primary(),
                        di.writeBackups().stream().map(Address::toString).collect(Collectors.joining(", "))
                );
            });

DistributionInfoを取得できると、そのSegmentに関する情報をいろいろと取得することができます。

これを動作させると、こんな感じにどのキーがどのSegmentにあり、どのNodeがPrimaryで、バックアップがどのNodeに
あるかを確認できます。

key[key460]: segment[1], primary[xxxxx-17034], backup[xxxxx-43606]
key[key986]: segment[1], primary[xxxxx-17034], backup[xxxxx-43606]
key[key352]: segment[1], primary[xxxxx-17034], backup[xxxxx-43606]
key[key43]: segment[2], primary[xxxxx-17034], backup[xxxxx-43606]
key[key334]: segment[4], primary[xxxxx-17034], backup[xxxxx-43606]
key[key468]: segment[4], primary[xxxxx-17034], backup[xxxxx-43606]
key[key182]: segment[7], primary[xxxxx-17034], backup[xxxxx-13423]
key[key24]: segment[7], primary[xxxxx-17034], backup[xxxxx-13423]
key[key393]: segment[7], primary[xxxxx-17034], backup[xxxxx-13423]
key[key217]: segment[7], primary[xxxxx-17034], backup[xxxxx-13423]

...

ローカルで保持しているSegmentの数や、DataContainerが保持しているSegmentに属するエントリ数の確認。

            // ローカルのSegmentの数
            IntSet segments = dm.getCacheTopology().getLocalReadSegments();
            assertThat(segments.size()).isGreaterThan(150);

            // ローカルのDataContainerが保持しているSegmentに属するエントリ数
            InternalDataContainer<String, Integer> dataContainer =
                    (InternalDataContainer<String, Integer>) cache.getAdvancedCache().getDataContainer();

            assertThat(dataContainer.size(segments)).isGreaterThan(600);

内部のインターフェースですが、InternalDataContainerにSegmentを意識したメソッドが定義されています。その上位である
インターフェースのDataContainerは、Segmentについてはメソッドシグニチャには現れません。

先ほど出てきた、DefaultSegmentedDataContainerクラスやL1SegmentedDataContainerクラスなどは、このInternalDataContainerを
実装したクラスになります。

InternalDataContainerが定義するメソッドで、Segmentを跨いだ操作ができそうに見えるものは、SegmentをIntSetという
インターフェースで扱うものが多くなっています。

            IntSet segments = dm.getCacheTopology().getLocalReadSegments();

            assertThat(dataContainer.size(segments)).isGreaterThan(600);

なお、単体のCacheエントリを扱うようなAPI場合は、Segmentのidをintで指定することになります。

続けます。

あるSegmentに属するエントリを取得。

            // あるSegmentに属するCacheEntryを取得
            int anySegmentId = segments.stream().findAny().get();
            System.out.printf("selected segment-id = %d%n", anySegmentId);
            dataContainer
                    .forEach(
                            IntSets.immutableSet(anySegmentId),
                            cacheEntry -> System.out.println(cacheEntry)
                    );

ここでも、IntSetを使います。正確には、「指定したSegmentの集合に属するエントリを取得」ですね。

最後は、クラスタ内の全Nodeに対してPrimaryとなるSegmentを取得してみます。

            int segmentSize =
                    dm
                            .getCacheTopology()
                            // クラスタ内の全Memberを取得
                            .getMembersSet()
                            .stream()
                            // PrimaryOwnerが持つSegmentの集合を取得
                            .map(address -> dm.getReadConsistentHash().getPrimarySegmentsForOwner(address))
                            .peek(s -> System.out.println(s.size()))
                            .peek(s -> assertThat(s.size()).isGreaterThan(75))
                            .reduce(0, (acc, cur) -> acc + cur.size(), (acc, cur) -> acc + cur);

            assertThat(segmentSize).isEqualTo(256);

最後は、そのSegmentの数を全部足して、256個のSegmentを戻ることを確認。

なお、このコードはSegmented Data Containerを無効にしても動作します。

    @Test
     public void nonSegments() {
        System.setProperty("org.infinispan.feature.data-segmentation", "false");

         this.<String, Integer>withCache("defaultCache", 3, cache -> {

             HashConfiguration hashConfiguration = cache.getCacheConfiguration().clustering().hash();

             // Segmentの数(デフォルト)
             assertThat(hashConfiguration.numSegments())
                     .isEqualTo(256);

             // 設定されているKeyPartitionerの確認(デフォルト)
             assertThat(hashConfiguration.keyPartitioner())
                     .isInstanceOf(HashFunctionPartitioner.class);

             IntStream.rangeClosed(1, 1000).forEach(i -> cache.put("key" + i, i));

             // データの配置を確認するのに、DistributionManagerを使う
             DistributionManager dm = cache.getAdvancedCache().getDistributionManager();

             // キーの配置状況の確認
             cache.forEach((k, v) -> {
                 int segmentId = dm.getCacheTopology().getSegment(k);
                 DistributionInfo di = dm.getCacheTopology().getSegmentDistribution(segmentId);

                 System.out.printf(
                         "key[%s]: segment[%d], primary[%s], backup[%s]%n",
                         k,
                         segmentId,
                         di.primary(),
                         di.writeBackups().stream().map(Address::toString).collect(Collectors.joining(", "))
                 );
             });

             // ローカルのSegmentの数
             IntSet segments = dm.getCacheTopology().getLocalReadSegments();
             assertThat(segments.size()).isGreaterThan(150);

             // ローカルのDataContainerが保持しているSegmentに属するエントリ数
             InternalDataContainer<String, Integer> dataContainer =
                     (InternalDataContainer<String, Integer>) cache.getAdvancedCache().getDataContainer();

             assertThat(dataContainer.size(segments)).isGreaterThan(600);

             // あるSegmentに属するCacheEntryを取得
             int anySegmentId = segments.stream().findAny().get();
             System.out.printf("selected segment-id = %d%n", anySegmentId);
             dataContainer
                     .forEach(
                             IntSets.immutableSet(anySegmentId),
                             cacheEntry -> System.out.println(cacheEntry)
                     );

             int segmentSize =
                     dm
                             .getCacheTopology()
                             // クラスタ内の全Memberを取得
                             .getMembersSet()
                             .stream()
                             // PrimaryOwnerが持つSegmentの集合を取得
                             .map(address -> dm.getReadConsistentHash().getPrimarySegmentsForOwner(address))
                             .peek(s -> System.out.println(s.size()))
                             .peek(s -> assertThat(s.size()).isGreaterThan(75))
                             .reduce(0, (acc, cur) -> acc + cur.size(), (acc, cur) -> acc + cur);

             assertThat(segmentSize).isEqualTo(256);
         });

        System.clearProperty("org.infinispan.feature.data-segmentation");
    }

そう、Segmented Data Containerがなかった頃と、そう変わらないんですよ。このあたりの動きは。

Distributed Stream

では、なにが変わったのかというと、このブログエントリにあるように、パフォーマンスに影響します。

Infinispan: Segmented Data Containers: Distributed Stream Performance Boost

簡単にですが、100万件のデータを突っ込んで試してみましょう。

Segmented Data Containerが有効な場合。

    @Test
    public void segmentedDistributedStreams() {
        this.<String, Integer>withCache("defaultCache", 3, cache -> {
            IntStream.rangeClosed(1, 1000000).forEach(i -> cache.put("key" + i, i));

            try (CacheStream<Map.Entry<String, Integer>> stream = cache.entrySet().stream()) {
                long startTime = System.nanoTime();

                long summarizeResult =
                        stream
                                .map(e -> e.getValue() * 2)
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summarizingInt(i -> i)))
                                .getSum();

                long elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

                System.out.println(elapsedTime);
                assertThat(elapsedTime)
                        .isLessThan(180L);

                assertThat(summarizeResult).isEqualTo(1000001000000L);
            }

            try (CacheStream<Map.Entry<String, Integer>> stream = cache.entrySet().stream()) {
                int segmentId =
                        cache.getAdvancedCache().getDistributionManager().getCacheTopology().getSegment("key1");
                assertThat(segmentId).isEqualTo(46);

                long startTime = System.nanoTime();

                long summarizeResult =
                        stream
                                .filterKeySegments(IntSets.immutableSet(segmentId))
                                .map(e -> e.getValue() * 2)
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summarizingInt(i -> i)))
                                .getSum();

                long elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

                System.out.println(elapsedTime);
                assertThat(elapsedTime)
                        .isLessThan(10L);

                assertThat(summarizeResult).isEqualTo(3901388570L);
            }
        });
    }

100万件のデータを登録。

            IntStream.rangeClosed(1, 1000000).forEach(i -> cache.put("key" + i, i));

最初は、単純に2倍して合算。

            try (CacheStream<Map.Entry<String, Integer>> stream = cache.entrySet().stream()) {
                long startTime = System.nanoTime();

                long summarizeResult =
                        stream
                                .map(e -> e.getValue() * 2)
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summarizingInt(i -> i)))
                                .getSum();

                long elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

                System.out.println(elapsedTime);
                assertThat(elapsedTime)
                        .isLessThan(180L);

                assertThat(summarizeResult).isEqualTo(1000001000000L);
            }

もうひとつは、キー「key1」を含むSegmentのみで合算。

            try (CacheStream<Map.Entry<String, Integer>> stream = cache.entrySet().stream()) {
                int segmentId =
                        cache.getAdvancedCache().getDistributionManager().getCacheTopology().getSegment("key1");
                assertThat(segmentId).isEqualTo(46);

                long startTime = System.nanoTime();

                long summarizeResult =
                        stream
                                .filterKeySegments(IntSets.immutableSet(segmentId))
                                .map(e -> e.getValue() * 2)
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summarizingInt(i -> i)))
                                .getSum();

                long elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

                System.out.println(elapsedTime);
                assertThat(elapsedTime)
                        .isLessThan(10L);

                assertThat(summarizeResult).isEqualTo(3901388570L);
            }

演算結果については、Segmented Data Containerを無効にしても変わりません。そりゃあ、そうですよね。

    @Test
    public void nonSegmentedDistributedStreams() {
        System.setProperty("org.infinispan.feature.data-segmentation", "false");

        this.<String, Integer>withCache("defaultCache", 3, cache -> {
            IntStream.rangeClosed(1, 1000000).forEach(i -> cache.put("key" + i, i));

            try (CacheStream<Map.Entry<String, Integer>> stream = cache.entrySet().stream()) {
                long startTime = System.nanoTime();

                long summarizeResult =
                        stream
                                .map(e -> e.getValue() * 2)
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summarizingInt(i -> i)))
                                .getSum();

                long elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

                System.out.println(elapsedTime);
                assertThat(elapsedTime)
                        .isLessThan(300L);

                assertThat(summarizeResult).isEqualTo(1000001000000L);
            }

            try (CacheStream<Map.Entry<String, Integer>> stream = cache.entrySet().stream()) {
                int segmentId =
                        cache.getAdvancedCache().getDistributionManager().getCacheTopology().getSegment("key1");
                assertThat(segmentId).isEqualTo(46);

                long startTime = System.nanoTime();

                long summarizeResult =
                        stream
                                .filterKeySegments(IntSets.immutableSet(segmentId))
                                .map(e -> e.getValue() * 2)
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summarizingInt(i -> i)))
                                .getSum();

                long elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

                System.out.println(elapsedTime);
                assertThat(elapsedTime)
                        .isLessThan(150L);

                assertThat(summarizeResult).isEqualTo(3901388570L);
            }
        });

        System.clearProperty("org.infinispan.feature.data-segmentation");
    }

違うのは処理時間で、全件処理の場合はこれくらい(単位はミリ秒)

// Segmented Data Container
                assertThat(elapsedTime)
                        .isLessThan(180L);


// Disabled, Segmented Data Container
                assertThat(elapsedTime)
                        .isLessThan(300L);

ひとつのSegmentで実行した場合は、これくらい違います。

// Segmented Data Container
                assertThat(elapsedTime)
                        .isLessThan(10L);


// Disabled, Segmented Data Container
                assertThat(elapsedTime)
                        .isLessThan(150L);

どちらもSegmented Data Containerの方が高速になっているのですが、特にSegmentを絞った場合の方が顕著に速いですね。

なお、これはあくまでローカルで3 Node立てて単純に実行しただけなので、きっちりしたベンチマークが必要であれば、
自前で実施して確認していただけますと…。

とりあえず、動かしてはみましたよ、と。では、ここからは少し気になるところを見ていきましょう。

Features?

Segmented Data Containerに出てきたFeaturesですが、どうもInfinispan 9.4から出てきたみたいです。

[ISPN-9538] Allow for segmentation to be configurable via Features - JBoss Issue Tracker

Features (Infinispan JavaDoc All 9.4.0.Final API)

https://github.com/infinispan/infinispan/blob/9.4.0.Final/commons/src/main/java/org/infinispan/commons/util/Features.java

特定の機能が有効、無効かどうかを確認できるのですが、現状ではそれを使用しているのはSegmented Data Container
だけのようですね。

https://github.com/infinispan/infinispan/blob/9.4.0.Final/core/src/main/java/org/infinispan/factories/DataContainerFactory.java#L55-L56

https://github.com/infinispan/infinispan/blob/9.4.0.Final/core/src/main/java/org/infinispan/persistence/manager/PersistenceManagerImpl.java#L881

Featuresの使い方としては、先に書いたとおり「META-INF/infinispan-features.properties」ファイル、もしくはシステムプロパティで
値をコントロールします。

優先順位は、システムプロパティの方が高いです。

設定はFeatures#isAvailableで、キーを指定して確認します。「META-INF/infinispan-features.properties」およびシステムプロパティの
両方に指定がない場合は、trueが返ります。

Segmented Data Containerの実装について

各種Segmented Data Containerの実装は、以下のパッケージを眺めてみるとよいでしょう。

それぞれ、クラス名に「Segmented」が入っているので、見つけるのは容易です。Infinispan 9.4.0.Finalの時点だと、最初に
サンプルとして列挙したものが対象となります。

On-Heap
https://github.com/infinispan/infinispan/tree/9.4.0.Final/core/src/main/java/org/infinispan/container/impl

Off-Heap
https://github.com/infinispan/infinispan/tree/9.4.0.Final/core/src/main/java/org/infinispan/container/offheap

Distributed Streamが速くなったのは?

ここ、気になりますよね。ちょっと追ってみました。

Distributed Streamを使う時、Distributed Cacheであっても、最終的にはローカルで実行されるStreamに落ちていきます。
ここで、Streamを組み上げる時に、InternalDataContainerのSpliteratorを使用します。

https://github.com/infinispan/infinispan/blob/9.4.0.Final/core/src/main/java/org/infinispan/stream/impl/local/AbstractLocalCacheStream.java#L63

infinispan/SegmentedEntryStreamSupplier.java at 9.4.0.Final · infinispan/infinispan · GitHub

Segmented Data Containerの場合は、指定されたSegmentに対するMapをflattenする形でSpliteratorが構築されます。

https://github.com/infinispan/infinispan/blob/9.4.0.Final/core/src/main/java/org/infinispan/container/impl/DefaultSegmentedDataContainer.java#L101-L104

https://github.com/infinispan/infinispan/blob/9.4.0.Final/core/src/main/java/org/infinispan/container/impl/DefaultSegmentedDataContainer.java#L131-L143

これに対して通常のDataContainerだと、イテレーションの際に各エントリが該当のSegmentに該当するかを逐次判定するように
なっています。

https://github.com/infinispan/infinispan/blob/9.4.0.Final/core/src/main/java/org/infinispan/container/impl/DefaultDataContainer.java#L191-L194

https://github.com/infinispan/infinispan/blob/9.4.0.Final/core/src/main/java/org/infinispan/container/impl/DefaultDataContainer.java#L208-L212

つまり、各DataContainerが持つエントリを全件見ていたということになるわけですね。

これを、Segmentごとにデータを管理するようになったことで、対象外のSegmentに関するデータは見ずに、ストレートに指定の
Segmentに関するデータを扱えるようになったのがSegmented Data Containerに関する変更と言えそうです。

実際、DefaultSegmentedDataContainerではSegment単位にMapを分けるようになっています。 https://github.com/infinispan/infinispan/blob/9.4.0.Final/core/src/main/java/org/infinispan/container/impl/DefaultSegmentedDataContainer.java#L45

   protected final AtomicReferenceArray<ConcurrentMap<K, InternalCacheEntry<K, V>>> maps;

このあたりが、Cache#sizeといった演算の速度にも影響するのでしょう。
※ブログでも、Cache#sizeの速度が大きく向上していましたね

というか、これまでは実際のデータ管理としてはSegmentごとの管理ではなかった、ということですね。

put/getには影響しないの?

ブログによると、パフォーマンス的には影響があるようです。まあ、そりゃあそうですよね…。

What about gets and puts?

Having the container segmented should also affect get and put performance as well, right? In testing the difference for get and puts are less than one percent, in favor of segmentation due to some optimizations we were able to add.

Infinispan: Segmented Data Containers: Distributed Stream Performance Boost

putもgetも、Segmentを意識するように変更されています。

https://github.com/infinispan/infinispan/blob/9.4.0.Final/core/src/main/java/org/infinispan/container/impl/AbstractInternalDataContainer.java#L114-L149

https://github.com/infinispan/infinispan/blob/9.4.0.Final/core/src/main/java/org/infinispan/container/impl/AbstractInternalDataContainer.java#L76-L90

ここで、Segmented Data Containerの場合はSegment単位のMapが使われ、そうでない場合はローカルのDataContainerが持つ
全エントリを格納したMapが使われます。

細々と、Segmentを使うようにしたことがいろいろ波及している感じですねぇ。

まとめ

Infinispan 9.3/9.4で導入された、Segmented Data Containerを試してみました。
※Persistenceは除く

Infinispanのデータの管理の概念にSegmentがあることは知っていましたが、ここまでちゃんと見たことはなかったので、
いろいろと勉強になりました。面白かったです。

今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-segmented-data-containers