CLOVER🍀

That was when it all began.

Hazelcast 3.5で追加されたRingbufferを試す

Hazelcast 3.5から、Ringbufferというものが追加されていました。

全然気付いてなかったので、ちょっと試してみることにします。

Ringbuffer

Ringbufferとは?

ドキュメントによれば、いくらかの容量を持った循環配列?と思えばいいらしいです。tailとheadを持ち、要素を追加していくと最初(=head)のものは上書きされるか、expireされます。

Ringbufferの要素を使用する際には、Sequence IDを使用し、これはRingbufferから取得することができます。Ringbuffer内に格納されている要素は、headからtailの間にマップされていることになります。

現在のHazelcastの持つデータ構造としては、Queueが比較対象になるようです。

IQueue vs. Ringbuffer

QueueとRingbufferの違いは、こんな感じみたいです。

  • Ringbufferは要素を削除できない
  • Ringbufferは決まった位置(=Sequence IDで指定した)要素しか取得することができない

代わりに、Queueに比べてのアドバンテージとしては

  • 同じ要素を複数回取得することができ、マルチスレッドでアクセス可能。これは、read-at-least-once または read-at-most-onceを実現するのに役立つだろう
  • 同じ要素をマルチスレッドで読むことができる。これ自体はスレッドごとにQueueを使えば可能だが、リモート処理の効率が悪い。また、Queueからの取得(take)は破壊的であるため、その変更をバックアップ先にも適用する必要があり、Ringbufferのreadよりも適用コストがかかる
  • このため、Ringbufferは変更されないので、読み取りによるレプリケーションを要求されない
  • Ringbufferは読み込み、書き込みともにバッチ処理を持つため、パフォーマンスを改善することができる

ということみたいです。

こんな感じに説明されているRingbufferですが、ドキュメントを読んだだけだとよくわからないので、使っていってみます。

なお、ListやSet、Queueなどと同様、Ringbufferに格納されるデータは、ひとつのNodeのメモリサイズの範囲になりそうな感じがします。
https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/RingbufferProxy.java#L72

準備

Maven依存関係は、このように定義。

        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>3.5.3</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.2.0</version>
            <scope>test</scope>
        </dependency>

Hazelcast 3.5.3を使用、JUnitとAssertJはテスト用です。

Hazelcastの設定ファイルも用意しました。
src/test/resources/hazelcast.xml

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.5.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <group>
        <name>my-cluster</name>
        <password>my-password</password>
    </group>

    <network>
        <port auto-increment="true" port-count="100">5701</port>
        <join>
            <multicast enabled="true">
                <multicast-group>224.2.2.3</multicast-group>
                <multicast-port>54327</multicast-port>
            </multicast>
            <tcp-ip enabled="false"/>
        </join>
    </network>

<!-- 後で -->

</hazelcast>

Ringbufferの設定については、また後で。

あとは、テスト用の雛形。
src/test/java/org/littlewings/hazelcast/ringbuffer/HazelcastRingbufferTest.java

package org.littlewings.hazelcast.ringbuffer;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.hazelcast.config.ClasspathXmlConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.ringbuffer.OverflowPolicy;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.StaleSequenceException;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.fail;

public class HazelcastRingbufferTest {

   // ここに、テストを書く!

    protected void withHazelcast(int numInstances, Consumer<HazelcastInstance> consumer) {
        List<HazelcastInstance> hazelcastInstances =
                IntStream
                        .rangeClosed(1, numInstances)
                        .mapToObj(i -> Hazelcast.newHazelcastInstance(new ClasspathXmlConfig("hazelcast.xml")))
                        .collect(Collectors.toList());

        try {
            consumer.accept(hazelcastInstances.get(0));
        } finally {
            hazelcastInstances.forEach(h -> h.getLifecycleService().shutdown());
            Hazelcast.shutdownAll();
        }
    }
}

複数Nodeの起動停止ができるようにしたのですが、いろいろあって1 Node分しか使いませんでしたけど…。

使ってみる

それでは、Ringbufferを使ってみます。

最初にできたコードは、こんな感じです。

    @Test
    public void testSimpleRingbuffer() {
        withHazelcast(1, hazelcast -> {
            Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("default");

            IntStream.rangeClosed(1, 100).forEach(i -> ringbuffer.add("element" + i));

            assertThat(ringbuffer.size())
                    .isEqualTo(100L);

            long headSequence = ringbuffer.headSequence();
            assertThat(headSequence)
                    .isEqualTo(0L);

            try {
                long currentSequence = headSequence;
                assertThat(ringbuffer.readOne(currentSequence))
                        .isEqualTo("element1");

                currentSequence++;

                String lastElement = null;

                while (ringbuffer.size() > 0) {
                    lastElement = ringbuffer.readOne(currentSequence);

                    assertThat(lastElement)
                            .isNotNull();

                    if (currentSequence == 99) {
                        break;
                    }

                    currentSequence++;
                }

                assertThat(lastElement)
                        .isEqualTo("element100");

                assertThat(ringbuffer.headSequence())
                        .isEqualTo(0L);
                assertThat(ringbuffer.tailSequence())
                        .isEqualTo(99L);
            } catch (InterruptedException e) {
                fail(e.getMessage());
            }
        });
    }

ここでは、デフォルト設定のRingbufferを取得して、要素を100個追加しています。

            Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("default");

            IntStream.rangeClosed(1, 100).forEach(i -> ringbuffer.add("element" + i));

この時のサイズは100になり、headのSequence IDは0となります。

            assertThat(ringbuffer.size())
                    .isEqualTo(100L);

            long headSequence = ringbuffer.headSequence();
            assertThat(headSequence)
                    .isEqualTo(0L);

要素を取得する時は、Ringbuffer#readOneで。

                long currentSequence = headSequence;
                assertThat(ringbuffer.readOne(currentSequence))
                        .isEqualTo("element1");

Sequence IDを自分でインクリメントしていくことで、後続の要素を取得することができます。

                currentSequence++;

                String lastElement = null;

                while (ringbuffer.size() > 0) {
                    lastElement = ringbuffer.readOne(currentSequence);

                    assertThat(lastElement)
                            .isNotNull();

                    if (currentSequence == 99) {
                        break;
                    }

                    currentSequence++;
                }

                assertThat(lastElement)
                        .isEqualTo("element100");

この時、headは0、tailは99です。

                assertThat(ringbuffer.headSequence())
                        .isEqualTo(0L);
                assertThat(ringbuffer.tailSequence())
                        .isEqualTo(99L);

複数要素を読む

今度は、複数の要素を一気に取得してみます。

初期化は先ほどのコードと一緒なので端折りまして、複数要素の取得はRingbuffer#readManyAsyncを使用します。

                long currentSequence = headSequence;
                assertThat(ringbuffer.readOne(currentSequence))
                        .isEqualTo("element1");

                currentSequence++;

                ICompletableFuture<ReadResultSet<String>> completableFuture =
                        ringbuffer
                                .readManyAsync(currentSequence, 1, 10, null);

複数追加は今回紹介しませんが、こういう複数要素に対数する操作は、Asyncとなるようです。

結果は、ICompletableFutureにReadResultSetが包まれて返ってくるので、getして確認。

                ReadResultSet<String> readResultSet = completableFuture.get();
                int readCount = readResultSet.readCount();

                List<String> resultElements =
                        IntStream
                                .range(0, readCount)
                                .mapToObj(i -> readResultSet.get(i))
                                .collect(Collectors.toList());

                List<String> verifyResultElements =
                        IntStream
                                .range(2, readCount + 2)
                                .mapToObj(i -> "element" + i)
                                .collect(Collectors.toList());

                assertThat(resultElements)
                        .isEqualTo(verifyResultElements);

キャパシティを超える場合

Ringbufferのデフォルトのキャパシティは10,000だそうなのですが、キャパシティを超えるとどうなるのかという話。

10,000要素はちょっと多いので、キャパシティを小さくしたRingbufferを用意。

    <ringbuffer name="small-ringbuffer">
        <capacity>10</capacity>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <time-to-live-seconds>0</time-to-live-seconds>
        <in-memory-format>BINARY</in-memory-format>
    </ringbuffer>

capacity要素が10です。

このRingbufferに対して、要素を100個登録します。

            Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("small-ringbuffer");

            IntStream.rangeClosed(1, 100).forEach(i -> ringbuffer.add("element" + i));

            assertThat(ringbuffer.size())
                    .isEqualTo(10L);

            long headSequence = ringbuffer.headSequence();
            assertThat(headSequence)
                    .isEqualTo(90L);

すると、サイズは10に、headは90となります。

headからの要素が、順次上書きされた??感じになる、と。

                long currentSequence = headSequence;
                assertThat(ringbuffer.readOne(currentSequence))
                        .isEqualTo("element91");

                currentSequence++;

                String lastElement = null;

                while (ringbuffer.size() > 0) {
                    lastElement = ringbuffer.readOne(currentSequence);

                    assertThat(lastElement)
                            .isNotNull();

                    if (currentSequence == 99) {
                        break;
                    }

                    currentSequence++;
                }

                assertThat(lastElement)
                        .isEqualTo("element100");

headは90、tailは99ということになっています。

                assertThat(ringbuffer.headSequence())
                        .isEqualTo(90L);
                assertThat(ringbuffer.tailSequence())
                        .isEqualTo(99L);

サイズ、キャパシティとtime-to-live(TTL)と

Ringbufferには、size、capacity、remainingCapacityなどのメソッドがあるので、これを登録した要素に応じてどう変動するかを見てみます。

http://docs.hazelcast.org/docs/3.5/manual/html-single/hazelcast-documentation.html#capacity:Capacity

利用するRingbufferは、デフォルト設定のもので要素を100個登録。

            Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("default");

            IntStream.rangeClosed(1, 100).forEach(i -> ringbuffer.add("element" + i));

sizeが100、capacityが10,000(デフォルト)、remainingCapacityが10,000。

            assertThat(ringbuffer.size())
                    .isEqualTo(100L);

            long headSequence = ringbuffer.headSequence();
            assertThat(headSequence)
                    .isEqualTo(0L);

            long tailSequence = ringbuffer.tailSequence();
            assertThat(tailSequence)
                    .isEqualTo(99L);

            assertThat(ringbuffer.capacity())
                    .isEqualTo(10000L);

            assertThat(ringbuffer.remainingCapacity())
                    .isEqualTo(10000L);

remainingCapacityとcapacityが同じ?と思ってコードを読むと、time-to-live(TTL)を設定しないとcapacityと同じ値を返すようになっているみたいです。
https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/RingbufferProxy.java#L116

というわけで、有効期限付きのRingbufferを用意。

    <ringbuffer name="with-expired">
        <capacity>10000</capacity>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <time-to-live-seconds>3</time-to-live-seconds>
        <in-memory-format>BINARY</in-memory-format>
    </ringbuffer>

TTLを3秒とします。

Time to live

このRingbufferを使って、確認してみます。

            Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("with-expired");

            IntStream.rangeClosed(1, 100).forEach(i -> ringbuffer.add("element" + i));

            assertThat(ringbuffer.size())
                    .isEqualTo(100L);

            long headSequence = ringbuffer.headSequence();
            assertThat(headSequence)
                    .isEqualTo(0L);

            long tailSequence = ringbuffer.tailSequence();
            assertThat(tailSequence)
                    .isEqualTo(99L);

            assertThat(ringbuffer.capacity())
                    .isEqualTo(10000L);

            assertThat(ringbuffer.remainingCapacity())
                    .isEqualTo(9900L);

すると、確かにremainingCapacityが登録した要素分だけ減少しました。

せっかくなので、このままTTLの確認もしてみます。

まずは、普通に要素が読めることを確認します。

                long currentSequence = headSequence;
                assertThat(ringbuffer.readOne(currentSequence))
                        .isEqualTo("element1");

                currentSequence++;

                ICompletableFuture<ReadResultSet<String>> completableFuture =
                        ringbuffer
                                .readManyAsync(currentSequence, 5, 10, null);

                ReadResultSet<String> readResultSet = completableFuture.get();
                int readCount = readResultSet.readCount();

                currentSequence += readCount;

headは0でしたね。

この後、5秒待ちます。

                TimeUnit.SECONDS.sleep(5);

すると、先ほどのSequence IDからすぐ後ろの要素を取得しようとすると、例外が飛ぶようになります。

                long seq1 = currentSequence++;
                assertThatThrownBy(() -> ringbuffer.readOne(seq1))
                        .isInstanceOf(StaleSequenceException.class);

                long seq2 = currentSequence++;
                assertThatThrownBy(() -> ringbuffer.readOne(seq2))
                        .isInstanceOf(StaleSequenceException.class);

どういう状態になっているかというと、headは100に移動し、tailは(なぜか)99、remainingCapacityはcapacityは同じ値になります。

                assertThat(ringbuffer.headSequence())
                        .isEqualTo(100L);

                assertThat(ringbuffer.tailSequence())
                        .isEqualTo(99L);

                assertThat(ringbuffer.capacity())
                        .isEqualTo(10000L);

                assertThat(ringbuffer.remainingCapacity())
                        .isEqualTo(10000L);

Overflow Policy

今度は、Overflow Policy。

Overflow Policy

RingBuffer#addAsync、addAllAsyncを使うと、キャパシティを超えたときの挙動を以下の2つから選ぶことができます。

  • OverflowPolicy.OVERWRITE … 古い要素を上書き
  • verflowPolicy.FAIL … 呼び出しはアボートし、戻り値は-1となる。要素の追加は失敗する

こちらを確認してみます。

使用するRingbufferの定義は、

    <ringbuffer name="small-ringbuffer">
        <capacity>10</capacity>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <time-to-live-seconds>0</time-to-live-seconds>
        <in-memory-format>BINARY</in-memory-format>
    </ringbuffer>

とします。

で、こちらを使って確認。OverflowPolicyは、FAILとします。

            Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("small-ringbuffer");

            List<ICompletableFuture<Long>> futures =
                    IntStream
                            .rangeClosed(1, 100)
                            .mapToObj(i -> ringbuffer.addAsync("element" + i, OverflowPolicy.FAIL))
                            .collect(Collectors.toList());

            List<Long> results =
                    futures
                            .stream()
                            .map(f -> {
                                try {
                                    return f.get();
                                } catch (InterruptedException | ExecutionException e) {
                                    throw new RuntimeException(e);
                                }
                            })
                            .collect(Collectors.toList());

サイズは100になりました。

            assertThat(results)
                    .hasSize(100);

ところがですね、ドキュメント上は「-1」になると言っているのに、この結果に含まれる戻り値に「-1」はありません…。
※こちらは後述

            assertThat(results)
                    .doesNotContain(-1L);

Ringbufferへの格納要素としては、headが90、tailが99で、最後に登録した要素が入っている感じですね。

                long headSequence = ringbuffer.headSequence();
                assertThat(ringbuffer.readOne(headSequence))
                        .isEqualTo("element91");
                assertThat(headSequence)
                        .isEqualTo(90L);

                long tailSequence = ringbuffer.tailSequence();
                assertThat(ringbuffer.readOne(tailSequence))
                        .isEqualTo("element100");
                assertThat(tailSequence)
                        .isEqualTo(99L);

って、あれ?これって、OVERWRITEでは…。

というわけで、OVERWRITEでも確認すると、まったく同じ挙動になります。

        withHazelcast(1, hazelcast -> {
            Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("small-ringbuffer");

            List<ICompletableFuture<Long>> futures =
                    IntStream
                            .rangeClosed(1, 100)
                            .mapToObj(i -> ringbuffer.addAsync("element" + i, OverflowPolicy.OVERWRITE))
                            .collect(Collectors.toList());

            List<Long> results =
                    futures
                            .stream()
                            .map(f -> {
                                try {
                                    return f.get();
                                } catch (InterruptedException | ExecutionException e) {
                                    throw new RuntimeException(e);
                                }
                            })
                            .collect(Collectors.toList());

            assertThat(results)
                    .hasSize(100);
            assertThat(results)
                    .doesNotContain(-1L);

            try {
                long headSequence = ringbuffer.headSequence();
                assertThat(ringbuffer.readOne(headSequence))
                        .isEqualTo("element91");
                assertThat(headSequence)
                        .isEqualTo(90L);

                long tailSequence = ringbuffer.tailSequence();
                assertThat(ringbuffer.readOne(tailSequence))
                        .isEqualTo("element100");
                assertThat(tailSequence)
                        .isEqualTo(99L);
            } catch (InterruptedException e) {
                fail(e.getMessage());
            }
        });

この挙動ですが、さらにTTLを設定すると変化があります。

というわけで、こんなRingbufferを用意。

    <ringbuffer name="small-ringbuffer-expired">
        <capacity>10</capacity>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <time-to-live-seconds>10</time-to-live-seconds>
        <in-memory-format>BINARY</in-memory-format>
    </ringbuffer>

10秒のTTLです。

すると、FAILだと最初に登録した要素しか残らなくなります。

    @Test
    public void testRingbufferOverflowFailWithExpiry() {
        withHazelcast(1, hazelcast -> {
            Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("small-ringbuffer-expired");

            List<ICompletableFuture<Long>> futures =
                    IntStream
                            .rangeClosed(1, 100)
                            .mapToObj(i -> ringbuffer.addAsync("element" + i, OverflowPolicy.FAIL))
                            .collect(Collectors.toList());

            List<Long> results =
                    futures
                            .stream()
                            .map(f -> {
                                try {
                                    return f.get();
                                } catch (InterruptedException | ExecutionException e) {
                                    throw new RuntimeException(e);
                                }
                            })
                            .collect(Collectors.toList());

            assertThat(results)
                    .hasSize(100);
            assertThat(results)
                    .contains(-1L);

            try {
                long headSequence = ringbuffer.headSequence();
                assertThat(ringbuffer.readOne(headSequence))
                        .isEqualTo("element1");
                assertThat(headSequence)
                        .isEqualTo(0L);

                long tailSequence = ringbuffer.tailSequence();
                assertThat(ringbuffer.readOne(tailSequence))
                        .isEqualTo("element10");
                assertThat(tailSequence)
                        .isEqualTo(9L);
            } catch (InterruptedException e) {
                fail(e.getMessage());
            }
        });
    }

Ringbuffer#addAsyncの結果に、「-1」が含まれるようになります。

さらに、headが0でtailは9となります。

                long tailSequence = ringbuffer.tailSequence();
                assertThat(ringbuffer.readOne(tailSequence))
                        .isEqualTo("element10");
                assertThat(tailSequence)
                        .isEqualTo(9L);

OVERRITEの場合は、そう変わりませんね。

    @Test
    public void testRingbufferOverflowOverwriteWithExpiry() {
        withHazelcast(1, hazelcast -> {
            Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("small-ringbuffer-expired");

            List<ICompletableFuture<Long>> futures =
                    IntStream
                            .rangeClosed(1, 100)
                            .mapToObj(i -> ringbuffer.addAsync("element" + i, OverflowPolicy.OVERWRITE))
                            .collect(Collectors.toList());

            List<Long> results =
                    futures
                            .stream()
                            .map(f -> {
                                try {
                                    return f.get();
                                } catch (InterruptedException | ExecutionException e) {
                                    throw new RuntimeException(e);
                                }
                            })
                            .collect(Collectors.toList());

            assertThat(results)
                    .hasSize(100);
            assertThat(results)
                    .doesNotContain(-1L);

            try {
                long headSequence = ringbuffer.headSequence();
                assertThat(ringbuffer.readOne(headSequence))
                        .isEqualTo("element91");
                assertThat(headSequence)
                        .isEqualTo(90L);

                long tailSequence = ringbuffer.tailSequence();
                assertThat(ringbuffer.readOne(tailSequence))
                        .isEqualTo("element100");
                assertThat(tailSequence)
                        .isEqualTo(99L);
            } catch (InterruptedException e) {
                fail(e.getMessage());
            }
        });
    }

ただ、FAILポリシーにした時に複数Nodeでクラスタを構成すると、headで取得できる要素がずれたりしたので、何かあるかも…。

OverflowPolicy.FAILを使ってもTTLを設定しないとドキュメント通りの動きにならないのは、OverflowPolicy.FAILを設定した時の判定条件がRingbuffer#remainingCapacityに依存しているからみたいです。

        if (overflowPolicy == FAIL) {
            if (ringbuffer.remainingCapacity() < 1) {
                resultSequence = -1;
                return;
            }
        }

https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/operations/AddOperation.java#L55

まとめ

Hazelcast 3.5から追加された、Ringbufferを試してみました。

TTLを合わせた時にしか効かない設定がいろいろあって、ちょっと挙動がよくわかりませんが…とりあえず、使い方はなんとなくわかった気がします。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-ringbuffer