Hazelcast 3.5から、Ringbufferというものが追加されていました。
全然気付いてなかったので、ちょっと試してみることにします。
Ringbufferとは?
ドキュメントによれば、いくらかの容量を持った循環配列?と思えばいいらしいです。tailとheadを持ち、要素を追加していくと最初(=head)のものは上書きされるか、expireされます。
Ringbufferの要素を使用する際には、Sequence IDを使用し、これはRingbufferから取得することができます。Ringbuffer内に格納されている要素は、headからtailの間にマップされていることになります。
現在のHazelcastの持つデータ構造としては、Queueが比較対象になるようです。
QueueとRingbufferの違いは、こんな感じみたいです。
- Ringbufferは要素を削除できない
- Ringbufferは決まった位置(=Sequence IDで指定した)要素しか取得することができない
代わりに、Queueに比べてのアドバンテージとしては
- 同じ要素を複数回取得することができ、マルチスレッドでアクセス可能。これは、read-at-least-once または read-at-most-onceを実現するのに役立つだろう
- 同じ要素をマルチスレッドで読むことができる。これ自体はスレッドごとにQueueを使えば可能だが、リモート処理の効率が悪い。また、Queueからの取得(take)は破壊的であるため、その変更をバックアップ先にも適用する必要があり、Ringbufferのreadよりも適用コストがかかる
- このため、Ringbufferは変更されないので、読み取りによるレプリケーションを要求されない
- Ringbufferは読み込み、書き込みともにバッチ処理を持つため、パフォーマンスを改善することができる
ということみたいです。
こんな感じに説明されているRingbufferですが、ドキュメントを読んだだけだとよくわからないので、使っていってみます。
なお、ListやSet、Queueなどと同様、Ringbufferに格納されるデータは、ひとつのNodeのメモリサイズの範囲になりそうな感じがします。
https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/RingbufferProxy.java#L72
準備
Maven依存関係は、このように定義。
<dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast</artifactId> <version>3.5.3</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.2.0</version> <scope>test</scope> </dependency>
Hazelcast 3.5.3を使用、JUnitとAssertJはテスト用です。
Hazelcastの設定ファイルも用意しました。
src/test/resources/hazelcast.xml
<?xml version="1.0" encoding="UTF-8"?> <hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.5.xsd" xmlns="http://www.hazelcast.com/schema/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <group> <name>my-cluster</name> <password>my-password</password> </group> <network> <port auto-increment="true" port-count="100">5701</port> <join> <multicast enabled="true"> <multicast-group>224.2.2.3</multicast-group> <multicast-port>54327</multicast-port> </multicast> <tcp-ip enabled="false"/> </join> </network> <!-- 後で --> </hazelcast>
Ringbufferの設定については、また後で。
あとは、テスト用の雛形。
src/test/java/org/littlewings/hazelcast/ringbuffer/HazelcastRingbufferTest.java
package org.littlewings.hazelcast.ringbuffer; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import com.hazelcast.config.ClasspathXmlConfig; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.ICompletableFuture; import com.hazelcast.ringbuffer.OverflowPolicy; import com.hazelcast.ringbuffer.ReadResultSet; import com.hazelcast.ringbuffer.Ringbuffer; import com.hazelcast.ringbuffer.StaleSequenceException; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.fail; public class HazelcastRingbufferTest { // ここに、テストを書く! protected void withHazelcast(int numInstances, Consumer<HazelcastInstance> consumer) { List<HazelcastInstance> hazelcastInstances = IntStream .rangeClosed(1, numInstances) .mapToObj(i -> Hazelcast.newHazelcastInstance(new ClasspathXmlConfig("hazelcast.xml"))) .collect(Collectors.toList()); try { consumer.accept(hazelcastInstances.get(0)); } finally { hazelcastInstances.forEach(h -> h.getLifecycleService().shutdown()); Hazelcast.shutdownAll(); } } }
複数Nodeの起動停止ができるようにしたのですが、いろいろあって1 Node分しか使いませんでしたけど…。
使ってみる
それでは、Ringbufferを使ってみます。
最初にできたコードは、こんな感じです。
@Test public void testSimpleRingbuffer() { withHazelcast(1, hazelcast -> { Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("default"); IntStream.rangeClosed(1, 100).forEach(i -> ringbuffer.add("element" + i)); assertThat(ringbuffer.size()) .isEqualTo(100L); long headSequence = ringbuffer.headSequence(); assertThat(headSequence) .isEqualTo(0L); try { long currentSequence = headSequence; assertThat(ringbuffer.readOne(currentSequence)) .isEqualTo("element1"); currentSequence++; String lastElement = null; while (ringbuffer.size() > 0) { lastElement = ringbuffer.readOne(currentSequence); assertThat(lastElement) .isNotNull(); if (currentSequence == 99) { break; } currentSequence++; } assertThat(lastElement) .isEqualTo("element100"); assertThat(ringbuffer.headSequence()) .isEqualTo(0L); assertThat(ringbuffer.tailSequence()) .isEqualTo(99L); } catch (InterruptedException e) { fail(e.getMessage()); } }); }
ここでは、デフォルト設定のRingbufferを取得して、要素を100個追加しています。
Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("default"); IntStream.rangeClosed(1, 100).forEach(i -> ringbuffer.add("element" + i));
この時のサイズは100になり、headのSequence IDは0となります。
assertThat(ringbuffer.size()) .isEqualTo(100L); long headSequence = ringbuffer.headSequence(); assertThat(headSequence) .isEqualTo(0L);
要素を取得する時は、Ringbuffer#readOneで。
long currentSequence = headSequence; assertThat(ringbuffer.readOne(currentSequence)) .isEqualTo("element1");
Sequence IDを自分でインクリメントしていくことで、後続の要素を取得することができます。
currentSequence++; String lastElement = null; while (ringbuffer.size() > 0) { lastElement = ringbuffer.readOne(currentSequence); assertThat(lastElement) .isNotNull(); if (currentSequence == 99) { break; } currentSequence++; } assertThat(lastElement) .isEqualTo("element100");
この時、headは0、tailは99です。
assertThat(ringbuffer.headSequence()) .isEqualTo(0L); assertThat(ringbuffer.tailSequence()) .isEqualTo(99L);
複数要素を読む
今度は、複数の要素を一気に取得してみます。
初期化は先ほどのコードと一緒なので端折りまして、複数要素の取得はRingbuffer#readManyAsyncを使用します。
long currentSequence = headSequence; assertThat(ringbuffer.readOne(currentSequence)) .isEqualTo("element1"); currentSequence++; ICompletableFuture<ReadResultSet<String>> completableFuture = ringbuffer .readManyAsync(currentSequence, 1, 10, null);
複数追加は今回紹介しませんが、こういう複数要素に対数する操作は、Asyncとなるようです。
結果は、ICompletableFutureにReadResultSetが包まれて返ってくるので、getして確認。
ReadResultSet<String> readResultSet = completableFuture.get(); int readCount = readResultSet.readCount(); List<String> resultElements = IntStream .range(0, readCount) .mapToObj(i -> readResultSet.get(i)) .collect(Collectors.toList()); List<String> verifyResultElements = IntStream .range(2, readCount + 2) .mapToObj(i -> "element" + i) .collect(Collectors.toList()); assertThat(resultElements) .isEqualTo(verifyResultElements);
キャパシティを超える場合
Ringbufferのデフォルトのキャパシティは10,000だそうなのですが、キャパシティを超えるとどうなるのかという話。
10,000要素はちょっと多いので、キャパシティを小さくしたRingbufferを用意。
<ringbuffer name="small-ringbuffer"> <capacity>10</capacity> <backup-count>1</backup-count> <async-backup-count>0</async-backup-count> <time-to-live-seconds>0</time-to-live-seconds> <in-memory-format>BINARY</in-memory-format> </ringbuffer>
capacity要素が10です。
このRingbufferに対して、要素を100個登録します。
Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("small-ringbuffer"); IntStream.rangeClosed(1, 100).forEach(i -> ringbuffer.add("element" + i)); assertThat(ringbuffer.size()) .isEqualTo(10L); long headSequence = ringbuffer.headSequence(); assertThat(headSequence) .isEqualTo(90L);
すると、サイズは10に、headは90となります。
headからの要素が、順次上書きされた??感じになる、と。
long currentSequence = headSequence; assertThat(ringbuffer.readOne(currentSequence)) .isEqualTo("element91"); currentSequence++; String lastElement = null; while (ringbuffer.size() > 0) { lastElement = ringbuffer.readOne(currentSequence); assertThat(lastElement) .isNotNull(); if (currentSequence == 99) { break; } currentSequence++; } assertThat(lastElement) .isEqualTo("element100");
headは90、tailは99ということになっています。
assertThat(ringbuffer.headSequence()) .isEqualTo(90L); assertThat(ringbuffer.tailSequence()) .isEqualTo(99L);
サイズ、キャパシティとtime-to-live(TTL)と
Ringbufferには、size、capacity、remainingCapacityなどのメソッドがあるので、これを登録した要素に応じてどう変動するかを見てみます。
http://docs.hazelcast.org/docs/3.5/manual/html-single/hazelcast-documentation.html#capacity:Capacity
利用するRingbufferは、デフォルト設定のもので要素を100個登録。
Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("default"); IntStream.rangeClosed(1, 100).forEach(i -> ringbuffer.add("element" + i));
sizeが100、capacityが10,000(デフォルト)、remainingCapacityが10,000。
assertThat(ringbuffer.size()) .isEqualTo(100L); long headSequence = ringbuffer.headSequence(); assertThat(headSequence) .isEqualTo(0L); long tailSequence = ringbuffer.tailSequence(); assertThat(tailSequence) .isEqualTo(99L); assertThat(ringbuffer.capacity()) .isEqualTo(10000L); assertThat(ringbuffer.remainingCapacity()) .isEqualTo(10000L);
remainingCapacityとcapacityが同じ?と思ってコードを読むと、time-to-live(TTL)を設定しないとcapacityと同じ値を返すようになっているみたいです。
https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/RingbufferProxy.java#L116
というわけで、有効期限付きのRingbufferを用意。
<ringbuffer name="with-expired"> <capacity>10000</capacity> <backup-count>1</backup-count> <async-backup-count>0</async-backup-count> <time-to-live-seconds>3</time-to-live-seconds> <in-memory-format>BINARY</in-memory-format> </ringbuffer>
TTLを3秒とします。
このRingbufferを使って、確認してみます。
Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("with-expired"); IntStream.rangeClosed(1, 100).forEach(i -> ringbuffer.add("element" + i)); assertThat(ringbuffer.size()) .isEqualTo(100L); long headSequence = ringbuffer.headSequence(); assertThat(headSequence) .isEqualTo(0L); long tailSequence = ringbuffer.tailSequence(); assertThat(tailSequence) .isEqualTo(99L); assertThat(ringbuffer.capacity()) .isEqualTo(10000L); assertThat(ringbuffer.remainingCapacity()) .isEqualTo(9900L);
すると、確かにremainingCapacityが登録した要素分だけ減少しました。
せっかくなので、このままTTLの確認もしてみます。
まずは、普通に要素が読めることを確認します。
long currentSequence = headSequence; assertThat(ringbuffer.readOne(currentSequence)) .isEqualTo("element1"); currentSequence++; ICompletableFuture<ReadResultSet<String>> completableFuture = ringbuffer .readManyAsync(currentSequence, 5, 10, null); ReadResultSet<String> readResultSet = completableFuture.get(); int readCount = readResultSet.readCount(); currentSequence += readCount;
headは0でしたね。
この後、5秒待ちます。
TimeUnit.SECONDS.sleep(5);
すると、先ほどのSequence IDからすぐ後ろの要素を取得しようとすると、例外が飛ぶようになります。
long seq1 = currentSequence++; assertThatThrownBy(() -> ringbuffer.readOne(seq1)) .isInstanceOf(StaleSequenceException.class); long seq2 = currentSequence++; assertThatThrownBy(() -> ringbuffer.readOne(seq2)) .isInstanceOf(StaleSequenceException.class);
どういう状態になっているかというと、headは100に移動し、tailは(なぜか)99、remainingCapacityはcapacityは同じ値になります。
assertThat(ringbuffer.headSequence()) .isEqualTo(100L); assertThat(ringbuffer.tailSequence()) .isEqualTo(99L); assertThat(ringbuffer.capacity()) .isEqualTo(10000L); assertThat(ringbuffer.remainingCapacity()) .isEqualTo(10000L);
Overflow Policy
今度は、Overflow Policy。
RingBuffer#addAsync、addAllAsyncを使うと、キャパシティを超えたときの挙動を以下の2つから選ぶことができます。
- OverflowPolicy.OVERWRITE … 古い要素を上書き
- verflowPolicy.FAIL … 呼び出しはアボートし、戻り値は-1となる。要素の追加は失敗する
こちらを確認してみます。
使用するRingbufferの定義は、
<ringbuffer name="small-ringbuffer"> <capacity>10</capacity> <backup-count>1</backup-count> <async-backup-count>0</async-backup-count> <time-to-live-seconds>0</time-to-live-seconds> <in-memory-format>BINARY</in-memory-format> </ringbuffer>
とします。
で、こちらを使って確認。OverflowPolicyは、FAILとします。
Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("small-ringbuffer"); List<ICompletableFuture<Long>> futures = IntStream .rangeClosed(1, 100) .mapToObj(i -> ringbuffer.addAsync("element" + i, OverflowPolicy.FAIL)) .collect(Collectors.toList()); List<Long> results = futures .stream() .map(f -> { try { return f.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }) .collect(Collectors.toList());
サイズは100になりました。
assertThat(results)
.hasSize(100);
ところがですね、ドキュメント上は「-1」になると言っているのに、この結果に含まれる戻り値に「-1」はありません…。
※こちらは後述
assertThat(results)
.doesNotContain(-1L);
Ringbufferへの格納要素としては、headが90、tailが99で、最後に登録した要素が入っている感じですね。
long headSequence = ringbuffer.headSequence(); assertThat(ringbuffer.readOne(headSequence)) .isEqualTo("element91"); assertThat(headSequence) .isEqualTo(90L); long tailSequence = ringbuffer.tailSequence(); assertThat(ringbuffer.readOne(tailSequence)) .isEqualTo("element100"); assertThat(tailSequence) .isEqualTo(99L);
って、あれ?これって、OVERWRITEでは…。
というわけで、OVERWRITEでも確認すると、まったく同じ挙動になります。
withHazelcast(1, hazelcast -> { Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("small-ringbuffer"); List<ICompletableFuture<Long>> futures = IntStream .rangeClosed(1, 100) .mapToObj(i -> ringbuffer.addAsync("element" + i, OverflowPolicy.OVERWRITE)) .collect(Collectors.toList()); List<Long> results = futures .stream() .map(f -> { try { return f.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }) .collect(Collectors.toList()); assertThat(results) .hasSize(100); assertThat(results) .doesNotContain(-1L); try { long headSequence = ringbuffer.headSequence(); assertThat(ringbuffer.readOne(headSequence)) .isEqualTo("element91"); assertThat(headSequence) .isEqualTo(90L); long tailSequence = ringbuffer.tailSequence(); assertThat(ringbuffer.readOne(tailSequence)) .isEqualTo("element100"); assertThat(tailSequence) .isEqualTo(99L); } catch (InterruptedException e) { fail(e.getMessage()); } });
この挙動ですが、さらにTTLを設定すると変化があります。
というわけで、こんなRingbufferを用意。
<ringbuffer name="small-ringbuffer-expired"> <capacity>10</capacity> <backup-count>1</backup-count> <async-backup-count>0</async-backup-count> <time-to-live-seconds>10</time-to-live-seconds> <in-memory-format>BINARY</in-memory-format> </ringbuffer>
10秒のTTLです。
すると、FAILだと最初に登録した要素しか残らなくなります。
@Test public void testRingbufferOverflowFailWithExpiry() { withHazelcast(1, hazelcast -> { Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("small-ringbuffer-expired"); List<ICompletableFuture<Long>> futures = IntStream .rangeClosed(1, 100) .mapToObj(i -> ringbuffer.addAsync("element" + i, OverflowPolicy.FAIL)) .collect(Collectors.toList()); List<Long> results = futures .stream() .map(f -> { try { return f.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }) .collect(Collectors.toList()); assertThat(results) .hasSize(100); assertThat(results) .contains(-1L); try { long headSequence = ringbuffer.headSequence(); assertThat(ringbuffer.readOne(headSequence)) .isEqualTo("element1"); assertThat(headSequence) .isEqualTo(0L); long tailSequence = ringbuffer.tailSequence(); assertThat(ringbuffer.readOne(tailSequence)) .isEqualTo("element10"); assertThat(tailSequence) .isEqualTo(9L); } catch (InterruptedException e) { fail(e.getMessage()); } }); }
Ringbuffer#addAsyncの結果に、「-1」が含まれるようになります。
さらに、headが0でtailは9となります。
long tailSequence = ringbuffer.tailSequence(); assertThat(ringbuffer.readOne(tailSequence)) .isEqualTo("element10"); assertThat(tailSequence) .isEqualTo(9L);
OVERRITEの場合は、そう変わりませんね。
@Test public void testRingbufferOverflowOverwriteWithExpiry() { withHazelcast(1, hazelcast -> { Ringbuffer<String> ringbuffer = hazelcast.getRingbuffer("small-ringbuffer-expired"); List<ICompletableFuture<Long>> futures = IntStream .rangeClosed(1, 100) .mapToObj(i -> ringbuffer.addAsync("element" + i, OverflowPolicy.OVERWRITE)) .collect(Collectors.toList()); List<Long> results = futures .stream() .map(f -> { try { return f.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }) .collect(Collectors.toList()); assertThat(results) .hasSize(100); assertThat(results) .doesNotContain(-1L); try { long headSequence = ringbuffer.headSequence(); assertThat(ringbuffer.readOne(headSequence)) .isEqualTo("element91"); assertThat(headSequence) .isEqualTo(90L); long tailSequence = ringbuffer.tailSequence(); assertThat(ringbuffer.readOne(tailSequence)) .isEqualTo("element100"); assertThat(tailSequence) .isEqualTo(99L); } catch (InterruptedException e) { fail(e.getMessage()); } }); }
ただ、FAILポリシーにした時に複数Nodeでクラスタを構成すると、headで取得できる要素がずれたりしたので、何かあるかも…。
OverflowPolicy.FAILを使ってもTTLを設定しないとドキュメント通りの動きにならないのは、OverflowPolicy.FAILを設定した時の判定条件がRingbuffer#remainingCapacityに依存しているからみたいです。
if (overflowPolicy == FAIL) { if (ringbuffer.remainingCapacity() < 1) { resultSequence = -1; return; } }
まとめ
Hazelcast 3.5から追加された、Ringbufferを試してみました。
TTLを合わせた時にしか効かない設定がいろいろあって、ちょっと挙動がよくわかりませんが…とりあえず、使い方はなんとなくわかった気がします。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-ringbuffer