CLOVER🍀

That was when it all began.

HazelcastのReliable Topicを試す

Hazelcast 3.5から、Reliable Topicというものが増えています。

Reliable Topic

これまでのTopicと同様に、ITopicインターフェースを実装したものですが、以下の特徴があるそうです(ドキュメントのちょい訳)。

  • Reliable Topicの実体は、Ringbufferである
  • Ringbufferは、デフォルトでバックアップをひとつ持つように構成されているので、イベントが失われない(※)
  • 各Reliable TopicはそれぞれRingbufferを持つため、あるTopicに対するProducerが高速なであり、他のTopicが低速でも問題はありません
  • 通常Topicの背後にあるイベントシステムは、他のデータ構造(例えばCollection Listeners)と共有してしまっていますが、これはReliable Topicでは発生しません。

※ そういえば、確かにTopicにはバックアップの指定がないですね…。

よくわかりませんけど、要するに

  • Ringbufferを背後に持つTopicである
  • 背後がRingbufferのため、バックアップを持てる
  • Reliable Topicごとにデータの保存領域が別のため、Topicが独立している

といったところでしょうか。

確かに、これまでのTopicの実装を見ると、分離されてない感はありますね…。

https://github.com/hazelcast/hazelcast/blob/v3.6/hazelcast/src/main/java/com/hazelcast/topic/impl/TopicProxySupport.java#L91
https://github.com/hazelcast/hazelcast/blob/v3.6/hazelcast/src/main/java/com/hazelcast/spi/impl/eventservice/impl/EventServiceImpl.java#L282

それはそうと、使ってみましょう。

準備

Reliable Topicを使った、テストコードでサンプルは書いていきます。

Maven依存関係は、以下のように定義。

        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>3.6</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.3.0</version>
            <scope>test</scope>
        </dependency>

テストコードの雛形は、こちら。Hazelcastクラスタを、簡易に構成するためのメソッドを付きです。
src/test/java/org/littlewings/hazelcast/reliabletopic/HazelcastReliableTopicTest.java

package org.littlewings.hazelcast.reliabletopic;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.hazelcast.config.ClasspathXmlConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.topic.TopicOverloadException;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class HazelcastReliableTopicTest {

    // ここに、テストコードを書く!

    protected void withHazelcast(int numInstances, Consumer<HazelcastInstance> f) {
        List<HazelcastInstance> hazelcastInstances =
                IntStream
                        .rangeClosed(1, numInstances)
                        .mapToObj(i -> Hazelcast.newHazelcastInstance(new ClasspathXmlConfig("hazelcast.xml")))
                        .collect(Collectors.toList());

        try {
            f.accept(hazelcastInstances.get(0));
        } finally {
            hazelcastInstances.forEach(h -> h.getLifecycleService().shutdown());
            Hazelcast.shutdownAll();
        }
    }
}

設定ファイルは、こちら。Reliable Topicsの設定などは、また後で記載します。
src/test/resources/hazelcast.xml

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.6.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <group>
        <name>my-cluster</name>
        <password>my-pass</password>
    </group>
    <network>
        <port auto-increment="true" port-count="100">5701</port>
        <join>
            <multicast enabled="true">
                <multicast-group>224.10.10.3</multicast-group>
                <multicast-port>54327</multicast-port>
            </multicast>
        </join>
    </network>

    <!-- Reliable TopicとRingbufferの定義 -->
</hazelcast>

設定なしのReliable Topicを使う

まずは、設定なしの状態で使ってみましょう。

コードはこちら。

    @Test
    public void testSimpleUsage() {
        withHazelcast(3, hazelcastInstance -> {
            ITopic<String> topic = hazelcastInstance.getReliableTopic("default");

            List<String> receivedMessage = new ArrayList<>();
            topic.addMessageListener(message -> receivedMessage.add(message.getMessageObject()));

            topic.publish("Hello World");
            topic.publish("Hello Hazelcast!");

            assertThat(receivedMessage)
                    .isEqualTo(Arrays.asList("Hello World", "Hello Hazelcast!"));

            assertThat(topic.getLocalTopicStats().getPublishOperationCount())
                    .isEqualTo(2L);
            assertThat(topic.getLocalTopicStats().getReceiveOperationCount())
                    .isEqualTo(2L);
        });
    }

HazelcastInstanceより、Reliable Topicを取得します。型としては、通常のTopicと同様ITopicとして扱えます。

            ITopic<String> topic = hazelcastInstance.getReliableTopic("default");

Reliable Topicに対して、Listenerを登録します。MessageListenerというインターフェースを実装したクラスを作成するのですが、今回はLambda式で済ませました。

            List<String> receivedMessage = new ArrayList<>();
            topic.addMessageListener(message -> receivedMessage.add(message.getMessageObject()));

Listenerは、Serializableである必要はありません。

あとは、普通にTopicと同じように使います。

            topic.publish("Hello World");
            topic.publish("Hello Hazelcast!");

            assertThat(receivedMessage)
                    .isEqualTo(Arrays.asList("Hello World", "Hello Hazelcast!"));

            assertThat(topic.getLocalTopicStats().getPublishOperationCount())
                    .isEqualTo(2L);
            assertThat(topic.getLocalTopicStats().getReceiveOperationCount())
                    .isEqualTo(2L);

それほど難しくはありません。

設定&オーバーフローさせてみる

では、今度はReliable Topicの設定をしつつ、Topicのオーバーフロー時の挙動を見ていきたいと思います。

設定ファイルに、Reliable TopicとRingbufferを以下のように定義。この時、Reliable Topicと対になるRingbufferの名前は、同じである必要があります。

    <reliable-topic name="with-ttl">
        <read-batch-size>10</read-batch-size>
        <topic-overload-policy>BLOCK</topic-overload-policy>
        <statistics-enabled>true</statistics-enabled>
    </reliable-topic>

    <ringbuffer name="with-ttl">
        <capacity>10000</capacity>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <time-to-live-seconds>30</time-to-live-seconds>
        <in-memory-format>BINARY</in-memory-format>
    </ringbuffer>

これは、デフォルトのHazelcastの設定から持ってきたものです。
https://github.com/hazelcast/hazelcast/blob/v3.6/hazelcast/src/main/resources/hazelcast-default.xml#L237-L249

Ringbufferの容量が10,000であり、TTLは30秒です。この状態で、Reliable Topicのtopic-overload-policyはBLOCKと設定されています。

この設定で、10,000を超えるメッセージを登録してみます。また、メッセージを消費するためのListnerは、今回は付けません。

    @Test
    public void testWithRingbufferTtl() {
        withHazelcast(3, hazelcastInstance -> {
            ITopic<String> topic = hazelcastInstance.getReliableTopic("with-ttl");

            long start = System.currentTimeMillis();

            IntStream
                    .rangeClosed(1, 10005)
                    .forEach(i -> topic.publish("message-" + i));

            long elapsed = System.currentTimeMillis() - start;

            assertThat(topic.getLocalTopicStats().getPublishOperationCount())
                    .isEqualTo(10005L);
            assertThat(topic.getLocalTopicStats().getReceiveOperationCount())
                    .isEqualTo(0L);

            assertThat(elapsed)
                    .isGreaterThanOrEqualTo(30 * 1000L);
        });
    }

メッセージを登録しきるまで、30秒かかっています。

というわけで、Ringbufferの設定に依存しています、と。

実際、Reliable Topicの内部ではRingbufferを取得しています。
https://github.com/hazelcast/hazelcast/blob/v3.6/hazelcast/src/main/java/com/hazelcast/topic/impl/reliable/ReliableTopicProxy.java#L79
https://github.com/hazelcast/hazelcast/blob/v3.6/hazelcast/src/main/java/com/hazelcast/topic/impl/reliable/ReliableTopicService.java#L26

で、いきなり登場したtopic-overload-policyという設定ですが、メッセージを読み出すConsumerが遅い場合のための制御になります。メッセージがRingbufferの容量を超えた場合に、どのように振る舞うかを以下の4つから定義できます。

  • DISCARD_OLDEST … RingbufferにTTLが設定されている場合、古い要素を上書きします
  • DISCARD_NEWEST … 新しい要素を破棄します
  • BLOCK … Ringbufferの要素が有効期限切れするまで、ブロックします(デフォルト)
  • ERROR … Ringbufferに空きがない場合、TopicOverloadExceptionをスローします

その他の設定としては、Ringbufferから1度にどのくらい読み出すかのバッチサイズ(read-batch-size)、統計情報の有効化(statistics-enabled)があります。

また、Reliable Topicはデフォルトで共有されたスレッドプールを使用するらしいので、より分離度を求めたい場合はReliableTopicConfigに対してExecutorを設定します。
※これは、設定ファイルではできません

では、他のtopic-overload-policyも、試してみましょう。

topic-overload-policy ERRORを試す

topic-overload-policyのひとつである、「ERROR」を試してみます。

    <reliable-topic name="overload-error-policy">
        <read-batch-size>10</read-batch-size>
        <topic-overload-policy>ERROR</topic-overload-policy>
        <statistics-enabled>true</statistics-enabled>
    </reliable-topic>

    <ringbuffer name="overload-error-policy">
        <capacity>10</capacity>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <time-to-live-seconds>30</time-to-live-seconds>
        <in-memory-format>BINARY</in-memory-format>
    </ringbuffer>

Ringbufferの容量を、10に制限。

で、Listenerがメッセージを消費し切れないうちに10以上メッセージを放り込むと、エラーになります。
※Listerが遅くなるように、スリープを入れています

    @Test
    public void testOverloadError() {
        withHazelcast(3, hazelcastInstance -> {
            ITopic<String> topic = hazelcastInstance.getReliableTopic("overload-error-policy");

            List<String> receivedMessage = new ArrayList<>();
            topic.addMessageListener(message -> {
                receivedMessage.add(message.getMessageObject());

                try {
                    TimeUnit.MICROSECONDS.sleep(500L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            assertThatThrownBy(() -> {
                        IntStream
                                .rangeClosed(1, 15)
                                .forEach(i -> topic.publish("message-" + i));

                        try {
                            TimeUnit.SECONDS.sleep(10L);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
            )
                    .isInstanceOf(TopicOverloadException.class)
                    .hasMessageContaining("Failed to publish message:");
        });
    }

すると、TopicOverloadExceptionがスローされITopic#publishの呼び出しに失敗することがわかります。

topic-overload-policy DISCARD_NEWESTを試す

続いて、DISCARD_NEWESTを試します。設定は以下の通りで、Ringbufferの容量は10です。

    <reliable-topic name="overload-discard-newest">
        <read-batch-size>10</read-batch-size>
        <topic-overload-policy>DISCARD_NEWEST</topic-overload-policy>
        <statistics-enabled>true</statistics-enabled>
    </reliable-topic>

    <ringbuffer name="overload-discard-newest">
        <capacity>10</capacity>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <time-to-live-seconds>30</time-to-live-seconds>
        <in-memory-format>BINARY</in-memory-format>
    </ringbuffer>

テストコードはこちら。メッセージを投げる方は特に待ちを入れていませんが、Listenerは1秒のスリープを入れています。

    @Test
    public void testOverloadDiscardNewest() {
        withHazelcast(3, hazelcastInstance -> {
            ITopic<String> topic = hazelcastInstance.getReliableTopic("overload-discard-newest");

            List<String> receivedMessage = new ArrayList<>();
            topic.addMessageListener(message -> {
                receivedMessage.add(message.getMessageObject());

                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            IntStream
                    .rangeClosed(1, 15)
                    .forEach(i -> topic.publish("message-" + i));

            try {
                TimeUnit.SECONDS.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            assertThat(receivedMessage)
                    .containsExactly("message-1",
                            "message-2",
                            "message-3",
                            "message-4",
                            "message-5",
                            "message-6",
                            "message-7",
                            "message-8",
                            "message-9",
                            "message-10");
        });
    }

よってRingbufferが溢れ、10を越えた分のメッセージがなくなりました、と。

topic-overload-policy DISCARD_OLDESTを試す

最後は、DISCARD_OLDEST。設定は、以下の通り。

    <reliable-topic name="overload-discard-oldest">
        <read-batch-size>10</read-batch-size>
        <topic-overload-policy>DISCARD_OLDEST</topic-overload-policy>
        <statistics-enabled>true</statistics-enabled>
    </reliable-topic>

    <ringbuffer name="overload-discard-oldest">
        <capacity>10</capacity>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <time-to-live-seconds>30</time-to-live-seconds>
        <in-memory-format>BINARY</in-memory-format>
    </ringbuffer>

これなんですけど、1番てこずったというか、結局うまく動きませんでした…。

テストコードは、こちらです。Listenerはメッセージの取得ごとに、1秒のスリープを入れています。

    @Test
    public void testOverloadDiscardOldest() {
        withHazelcast(3, hazelcastInstance -> {
            ITopic<String> topic = hazelcastInstance.getReliableTopic("overload-discard-oldest");

            List<String> receivedMessage = new ArrayList<>();
            topic.addMessageListener(message -> {
                receivedMessage.add(message.getMessageObject());

                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            IntStream
                    .rangeClosed(1, 15)
                    .forEach(i -> topic.publish("message-" + i));

            try {
                TimeUnit.SECONDS.sleep(20L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            assertThat(receivedMessage)
                    .containsExactly("message-1");
        });
    }

で、最初の部分が歯抜けになったメッセージが取れていてもよさそうなのですが、結果を見るとひとつしか受信できていません。

            assertThat(receivedMessage)
                    .containsExactly("message-1");

この時、裏ではこんな例外が飛んでいます。

重大: [172.17.0.1]:5703 [my-cluster] [3.6] sequence:1 is too small. The current headSequence is:5 tailSequence is:14
com.hazelcast.ringbuffer.StaleSequenceException: sequence:1 is too small. The current headSequence is:5 tailSequence is:14
	at com.hazelcast.ringbuffer.impl.RingbufferContainer.checkBlockableReadSequence(RingbufferContainer.java:177)
	at com.hazelcast.ringbuffer.impl.operations.ReadManyOperation.beforeRun(ReadManyOperation.java:66)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:166)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:393)
	at com.hazelcast.spi.impl.operationexecutor.classic.OperationThread.processPacket(OperationThread.java:184)
	at com.hazelcast.spi.impl.operationexecutor.classic.OperationThread.process(OperationThread.java:137)
	at com.hazelcast.spi.impl.operationexecutor.classic.OperationThread.doRun(OperationThread.java:124)
	at com.hazelcast.spi.impl.operationexecutor.classic.OperationThread.run(OperationThread.java:99)

内部で使っているRingbufferのheadのシーケンスと、今のTopicが知っている開始のシーケンスがずれてしまったので、NGだと…。

でも、それだとこのポリシー、成立しなくない??

どうなのかなぁ…。

まとめ

Hazelcast 3.6で追加された、Reliable Topicを試してみました。Ringbufferと組み合わせて使うもので、topic-overload-policyはいろいろ設定できるのですが…DISCARD_OLDESTはちょっと怪しい感じですね。

使ってみて、この手のメッセージングとか、あと分散キューとかはIn Memory Data Grid以外のものも見た方がいいんだろうなぁと思いました。

今回作成したコードは、以下に置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-reliable-topic