CLOVER🍀

That was when it all began.

Apache Geodeを組み込みなMemcached/Redis Serverとして使う

ちょっとした小ネタ的に。

Apache Geodeは、Memcached ServerおよびRedis Serverになれる機能を持っています。

How Gemcached Works | Geode Docs

Geode Redis Adapter | Geode Docs

それぞれ、GemcachedとRedis Adapterという名前のようです。

Gemcachedについては、組み込みとして使うサンプルがドキュメント上にあるのですが、Redis Adapterについては
gfsh上での操作しか記載されていません。

ですが、Redis Adapterも同じように組み込みで使うことができるようなので、合わせて動かしてみることにしました。

準備

GemcachedもRedis Adapterも、Apache Geodeのcoreモジュールに含まれています。よって、基本的にはcoreがあればOK…。

        <dependency>
            <groupId>org.apache.geode</groupId>
            <artifactId>geode-core</artifactId>
            <version>1.2.1</version>
        </dependency>

と言いたいところですが、Redis Adapterについてはoptionalな依存関係が必要になります。そちらについては、
後述することにしましょう。

MemcachedおよびRedisにアクセスするためのライブラリとしては、それぞれspymemcachedとLettuceを使用することにします。

        <dependency>
            <groupId>net.spy</groupId>
            <artifactId>spymemcached</artifactId>
            <version>2.12.3</version>
        </dependency>
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.0.0.RELEASE</version>
        </dependency>

確認はテストコードでやります。JUnitとAssertJを依存関係に加えておきます。

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.0.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.0.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-launcher</artifactId>
            <version>1.0.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.8.0</version>
            <scope>test</scope>
        </dependency>

では、書いていってみましょう。

Memcached(Gemcached)

まずは、テストコードの雛形から。
src/test/java/org/littlewings/geode/embedded/EmbeddedMemcachedServerTest.java

package org.littlewings.geode.embedded;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.BinaryConnectionFactory;
import net.spy.memcached.MemcachedClient;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.internal.memcached.ValueWrapper;
import org.apache.geode.memcached.GemFireMemcachedServer;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class EmbeddedMemcachedServerTest {
    // ここに、テストを書く!
}

最初にも書きましたが、Memcached Serverへのアクセスにはspymemcachedを使用します。

GitHub - couchbase/spymemcached: A simple, asynchronous, single-threaded memcached client written in java.

Gemcachedを組み込みとして使ったコード例は、こんな感じです。

    @Test
    public void gettingStarted() throws IOException, InterruptedException, ExecutionException {
        GemFireMemcachedServer memcachedServer =
                new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.ASCII);
        memcachedServer.start();

        MemcachedClient client = new MemcachedClient(new InetSocketAddress("localhost", 11211));

        try {
            client.set("key1", 3, "value1").get();  // Future
            assertThat(client.get("key1")).isEqualTo("value1");

            TimeUnit.SECONDS.sleep(5L);

            assertThat(client.get("key1")).isNull();

            client.set("key2", 3, "value2").get();  // Future
            assertThat(client.get("key2")).isEqualTo("value2");
            client.delete("key2").get();  // Future
            assertThat(client.get("key2")).isNull();
        } finally {
            client.shutdown();
            memcachedServer.shutdown();
        }
    }

GemFireMemcachedServerクラスのインスタンスを作成して、GemFireMemcachedServer#startするだけ。

        GemFireMemcachedServer memcachedServer =
                new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.ASCII);
        memcachedServer.start();

GemFireMemcachedServerのコンストラクタは2つあり、リッスンポートのみの指定と、バインドするアドレスとポート、
Memcachedで使うプロトコルを指定できるものがあります。
https://github.com/apache/geode/blob/rel/v1.2.1/geode-core/src/main/java/org/apache/geode/memcached/GemFireMemcachedServer.java#L123
https://github.com/apache/geode/blob/rel/v1.2.1/geode-core/src/main/java/org/apache/geode/memcached/GemFireMemcachedServer.java#L142

今回は、全部指定する形を取りました。

リッスンポートは、負の値を指定するとデフォルトとして11212ポートを取ります。
https://github.com/apache/geode/blob/rel/v1.2.1/geode-core/src/main/java/org/apache/geode/memcached/GemFireMemcachedServer.java#L86

プロトコルは、デフォルトでASCIIです。
https://github.com/apache/geode/blob/rel/v1.2.1/geode-core/src/main/java/org/apache/geode/memcached/GemFireMemcachedServer.java#L130

使い終わったら、shutdownしましょう。

            memcachedServer.shutdown();

これが、基本的な使い方になります。

expireとかも効いていて、良さそうですね。

            client.set("key1", 3, "value1").get();  // Future
            assertThat(client.get("key1")).isEqualTo("value1");

            TimeUnit.SECONDS.sleep(5L);

            assertThat(client.get("key1")).isNull();

            client.set("key2", 3, "value2").get();  // Future
            assertThat(client.get("key2")).isEqualTo("value2");
            client.delete("key2").get();  // Future
            assertThat(client.get("key2")).isNull();

spymemcachedのsetの戻り値がFutureになっていることを知らなくて、最初テストが不安定になってハマりました…。

Binaryプロトコルでも、問題なく動作します。

    @Test
    public void gettingStartedAsBinary() throws IOException, InterruptedException, ExecutionException {
        GemFireMemcachedServer memcachedServer =
                new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.BINARY);
        memcachedServer.start();

        MemcachedClient client =
                new MemcachedClient(
                        new BinaryConnectionFactory(),
                        AddrUtil.getAddresses("localhost:11211")
                );

        try {
            client.set("key1", 3, "value1").get();  // Future
            assertThat(client.get("key1")).isEqualTo("value1");

            TimeUnit.SECONDS.sleep(5L);

            assertThat(client.get("key1")).isNull();

            client.set("key2", 3, "value2").get();  // Future
            assertThat(client.get("key2")).isEqualTo("value2");
            client.delete("key2").get();  // Future
            assertThat(client.get("key2")).isNull();
        } finally {
            client.shutdown();
            memcachedServer.shutdown();
        }
    }

また、Gemcachedが使用するRegionは、「gemcached」という名前となっています。
https://github.com/apache/geode/blob/rel/v1.2.1/geode-core/src/main/java/org/apache/geode/memcached/GemFireMemcachedServer.java#L69

「gemcached」Regionが存在しない場合はPartition Regionとして作成されるようになっています。
https://github.com/apache/geode/blob/rel/v1.2.1/geode-core/src/main/java/org/apache/geode/internal/memcached/commands/AbstractCommand.java#L144-L153

よって、「gemcached」Regionをカスタマイズしたい場合は、cache.xmlに「gemcached」Regionの定義をすることに
なるでしょう。

例えば、こんな感じに。

<?xml version="1.0" encoding="UTF-8"?>
<cache
        xmlns="http://geode.apache.org/schema/cache"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
        version="1.0">
    <region name="gemcached" refid="PARTITION_REDUNDANT"/>
</cache>

特に設定ファイルを与える方法がないので、「cache.xml」のファイル名を指定させるにはシステムプロパティを
使うことになりますねぇ…。

「gemcached」Regionに、データが入っていることを確認してみましょう。

    @Test
    public void underlyingRegion() throws IOException, InterruptedException, ExecutionException {
        GemFireMemcachedServer memcachedServer =
                new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.ASCII);
        memcachedServer.start();

        MemcachedClient client = new MemcachedClient(new InetSocketAddress("localhost", 11211));

        try {
            client.set("key1", 3, "value1").get();  // Future
            assertThat(client.get("key1")).isEqualTo("value1");

            Cache cache = CacheFactory.getAnyInstance();
            Region<String, ValueWrapper> region = cache.getRegion("gemcached");
            assertThat(new String(region.get("key1").getValue(), StandardCharsets.UTF_8))
                    .isEqualTo("value1");
        } finally {
            client.shutdown();
            memcachedServer.shutdown();
        }
    }

確かに「gemcached」Regionにデータが入っているようです。

Memcached(Gemcached)については、こんな感じです。

Redis(Redis Adapter)

続いて、Redis Adapterへ。

Redis Adapterを使用するには、依存関係としてSpring Shellが必要になります。というわけで、依存関係を追加します。

        <!-- GeodeRedisServer only -->
        <dependency>
          <groupId>org.springframework.shell</groupId>
          <artifactId>spring-shell</artifactId>
          <version>1.2.0.RELEASE</version>
          <exclusions>
            <exclusion>
              <artifactId>cglib</artifactId>
              <groupId>*</groupId>
            </exclusion>
            <exclusion>
              <artifactId>asm</artifactId>
              <groupId>*</groupId>
            </exclusion>
            <exclusion>
              <artifactId>spring-aop</artifactId>
              <groupId>*</groupId>
            </exclusion>
            <exclusion>
              <artifactId>guava</artifactId>
              <groupId>*</groupId>
            </exclusion>
            <exclusion>
              <artifactId>aopalliance</artifactId>
              <groupId>*</groupId>
            </exclusion>
            <exclusion>
              <artifactId>spring-context-support</artifactId>
              <groupId>*</groupId>
            </exclusion>
          </exclusions>
        </dependency>

いろいろexcludeしてあるのは、Apache Geode側の設定そのままに倣いました。
https://github.com/apache/geode/blob/develop/geode-core/build.gradle#L110-L118

テストコードの雛形としては、こんな感じに。
src/test/java/org/littlewings/geode/embedded/EmbeddedRedisServerTest.java

package org.littlewings.geode.embedded;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.SetArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.redis.GeodeRedisServer;
import org.apache.geode.redis.internal.ByteArrayWrapper;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class EmbeddedRedisServerTest {
    // ここに、テストを書く!
}

Redisへのアクセスには、Lettuceを使用します。

Lettuce

Redis Adapterを組み込みとして使ったコードは、こんな感じになります。

    @Test
    public void gettingStarted() throws InterruptedException {
        GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379);
        redisServer.start();

        RedisClient redisClient = RedisClient.create("redis://@localhost:6379/0");
        StatefulRedisConnection<String, String> connection = redisClient.connect();

        try {
            RedisCommands<String, String> syncCommands = connection.sync();

            syncCommands.set("key1", "value1");
            assertThat(syncCommands.get("key1")).isEqualTo("value1");

            syncCommands.set("key2", "value2,", SetArgs.Builder.ex(3));
            TimeUnit.SECONDS.sleep(5L);
            assertThat(syncCommands.get("key2")).isNull();
        } finally {
            connection.close();
            redisClient.shutdown();
            redisServer.shutdown();
        }
    }

※StatefulRedisConnectionのみCloseableなのですが、他のクラスと合わせた形で書きました…

Gemcachedと似た感じですね。GeodeRedisServerというクラスを使用します。

        GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379);
        redisServer.start();

コンストラクタには、リッスンポートのみ、バインドするアドレスとポート、バインドするアドレスとポートに加えてログレベルを指定できる、
3つのコンストラクタがあります。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L301-L364

今回は、バインドするアドレスとポートを指定。そして、GeodeRedisServer#startを呼び出します。

        GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379);
        redisServer.start();

ポートについては負の値を指定すると、デフォルトのポートとして6379を取ります。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L156

利用が終わったら、GeodeRedisServer#shutdownを呼び出します。

            redisServer.shutdown();

アクセス自体は、Lettuceを使って確認。こちらもexpireなどは良さそうですね。

        RedisClient redisClient = RedisClient.create("redis://@localhost:6379/0");
        StatefulRedisConnection<String, String> connection = redisClient.connect();

        try {
            RedisCommands<String, String> syncCommands = connection.sync();

            syncCommands.set("key1", "value1");
            assertThat(syncCommands.get("key1")).isEqualTo("value1");

            syncCommands.set("key2", "value2,", SetArgs.Builder.ex(3));
            TimeUnit.SECONDS.sleep(5L);
            assertThat(syncCommands.get("key2")).isNull();
        } finally {
            connection.close();
            redisClient.shutdown();
            redisServer.shutdown();
        }

GeodeRedisServerに保存されたデータは、こちらも内部的にはRegionとして保持されますが、いくつか種類があります。

とりあえず、Stringとして保存されるRegionを確認してみましょう。

    @Test
    public void underlyging() throws InterruptedException {
        GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379);
        redisServer.start();

        RedisClient redisClient = RedisClient.create("redis://@localhost:6379/0");
        StatefulRedisConnection<String, String> connection = redisClient.connect();

        try {
            RedisCommands<String, String> syncCommands = connection.sync();

            syncCommands.set("key1", "value1");
            assertThat(syncCommands.get("key1")).isEqualTo("value1");

            syncCommands.set("key2", "value2", SetArgs.Builder.ex(3));
            assertThat(syncCommands.get("key2")).isEqualTo("value2");

            Cache cache = CacheFactory.getAnyInstance();
            Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion = cache.getRegion("ReDiS_StRiNgS");
            assertThat(stringsRegion.get(new ByteArrayWrapper("key1".getBytes(StandardCharsets.UTF_8))))
                    .isEqualTo(new ByteArrayWrapper("value1".getBytes(StandardCharsets.UTF_8)));
            assertThat(stringsRegion.get(new ByteArrayWrapper("key2".getBytes(StandardCharsets.UTF_8))))
                    .isEqualTo(new ByteArrayWrapper("value2".getBytes(StandardCharsets.UTF_8)));
        } finally {
            connection.close();
            redisClient.shutdown();
            redisServer.shutdown();
        }
    }

「ReDiS_StRiNgS」というRegionに、データが保存されています。

RegionはString、HyperLogLogs、メタデータの3種類を使用します。

StringとHyperLogLogs用のRegionは、デフォルトでPartition Regionとなります。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L270

変更したい場合は、システムプロパティ「gemfireredis.regiontype」で指定するか、あらかじめRegionを定義しておきましょう。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L425-L434

メタデータ用のRegionは、デフォルトでReplicate Regionとなります。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L435-L442

こちらも、変更したい場合はあらかじめRegionを定義しておきましょう。

それぞれのRegionの名前は、けっこうすごいことになっていますが…ソースコードを見ましょう。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L223-L239

Gemcachedと同様、GeodeRedisServerにCacheXML自体を指定する手段はなさそうなので、CacheXML名を変えたい場合はシステムプロパティを
使うことになりますね。

注意点

とまあ、こんな感じに使えそうなRedis Adapterですが、注意点もあるようです。

How the Redis Adapter Works

サポートしているRedisのコマンドは、GeodeRedisServerのJavadocを確認しましょう。

GeodeRedisServer (Apache Geode 1.6.0)

とはいえ、ドキュメントに

The Geode Redis Adapter supports all Redis commands for each of the Redis data structures. (See the Javadocs for the GemFireRedisServer class for a detailed list.)

http://geode.apache.org/docs/guide/12/tools_modules/redis_adapter.html#how-the-redis-adapter-wo

とあるので、ふつうに使う分には困らないのではないかと。

ただ、いくつかRedisと異なる振る舞いをするケースがあるようです。

  • キーを削除した際に返ってくる削除されたエントリ数は、ローカルNodeでの数(クラスタ全体ではない)。ただし、エントリは削除されている
  • Setに新しいメンバーを追加した際の戻り値が、不定になる。コマンド自体はRedisプロトコルと同様に動くが、戻ってくる数は設定した結果が反映されているとは限らない
  • トランザクションは、ローカルNodeに収まる範囲で実行する必要がある(Nodeをまたがる場合はNG、またトランザクションが有効になっていない永続域内にあるデータに対しても、トランザクションは実行できない)
  • デフォルトでApache Geodeトランザクション内のキーをすべて監視するため、キーをWATCHまたはUNWATCHすることができない

SetとWATCH/UNWATCH以外については、Nodeをまたがった場合にRedisと動きが異なる感じになるみたいですね。
このあたりが許容できるのであれば、Redis AdapterをRedisの代わりとして使えるのでしょう。

一緒に使う

で、ここまで書いていると、GemcachedとRedis Adapterは別々に使うことになるというか、同時に使用できない?というような気もしますが
(両方とも、Server#startしますし)、そんなことはなさそうです。

両方合わせて使ってみました。
src/test/java/org/littlewings/geode/embedded/EmbeddedMemcachedRedisServerTest.java

package org.littlewings.geode.embedded;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import net.spy.memcached.MemcachedClient;
import org.apache.geode.memcached.GemFireMemcachedServer;
import org.apache.geode.redis.GeodeRedisServer;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class EmbeddedMemcachedRedisServerTest {
    @Test
    public void mixedServer() throws IOException, ExecutionException, InterruptedException {
        GemFireMemcachedServer memcachedServer =
                new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.ASCII);
        memcachedServer.start();

        GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379);
        redisServer.start();

        MemcachedClient memcachedClient = new MemcachedClient(new InetSocketAddress("localhost", 11211));
        RedisClient redisClient = RedisClient.create("redis://@localhost:6379/0");
        StatefulRedisConnection<String, String> connection = redisClient.connect();

        try {
            memcachedClient.set("key1", 3, "value1").get();  // Future
            assertThat(memcachedClient.get("key1")).isEqualTo("value1");

            RedisCommands<String, String> syncCommands = connection.sync();

            syncCommands.set("key1", "value1");
            assertThat(syncCommands.get("key1")).isEqualTo("value1");
        } finally {
            redisClient.shutdown();
            memcachedClient.shutdown();
            redisClient.shutdown();
            memcachedServer.shutdown();
        }
    }
}

どちらのServerも、Apache GeodeのCacheがなければ作成し、あればそちらを使用するように実装されているからです。
https://github.com/apache/geode/blob/rel/v1.2.1/geode-core/src/main/java/org/apache/geode/memcached/GemFireMemcachedServer.java#L167-L171
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L401-L412

Apache Geode自体は、ひとつのJavaVM上で複数のCacheを作成することは許容していません。

なので、こういう実装になっていればうまくいきますよね、と。

まとめ

Apache Geodeが提供する、MemcachedおよびRedisのServerとしての機能を、Javaアプリケーションに組み込んで使ってみました。

内部のコードも少し追いつつ、各クライアントライブラリで確認できたので、まあ良しとしましょう。

こうやって組み込んで使えるのは便利ですが、ちょっと依存ライブラリが多いのが難点でしょうかねぇ…。

## 「mvn depencency:tree」のApache Geode+Spring Shellの部分の結果
[INFO] +- org.apache.geode:geode-core:jar:1.2.1:compile
[INFO] |  +- com.github.stephenc.findbugs:findbugs-annotations:jar:1.3.9-1:compile
[INFO] |  +- org.jgroups:jgroups:jar:3.6.10.Final:compile
[INFO] |  +- antlr:antlr:jar:2.7.7:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.8.6:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.8.6:compile
[INFO] |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.8.6:compile
[INFO] |  +- commons-io:commons-io:jar:2.5:compile
[INFO] |  +- commons-lang:commons-lang:jar:2.6:compile
[INFO] |  +- it.unimi.dsi:fastutil:jar:7.1.0:compile
[INFO] |  +- javax.resource:javax.resource-api:jar:1.7:compile
[INFO] |  |  \- javax.transaction:javax.transaction-api:jar:1.2:compile
[INFO] |  +- net.java.dev.jna:jna:jar:4.0.0:compile
[INFO] |  +- net.sf.jopt-simple:jopt-simple:jar:5.0.3:compile
[INFO] |  +- org.apache.logging.log4j:log4j-api:jar:2.7:compile
[INFO] |  +- org.apache.logging.log4j:log4j-core:jar:2.7:compile
[INFO] |  +- org.apache.shiro:shiro-core:jar:1.3.2:compile
[INFO] |  |  \- org.slf4j:slf4j-api:jar:1.6.4:compile
[INFO] |  +- commons-beanutils:commons-beanutils:jar:1.9.3:compile
[INFO] |  |  +- commons-logging:commons-logging:jar:1.2:compile
[INFO] |  |  \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO] |  +- io.github.lukehutch:fast-classpath-scanner:jar:2.0.11:compile
[INFO] |  +- org.apache.geode:geode-common:jar:1.2.1:compile
[INFO] |  \- org.apache.geode:geode-json:jar:1.2.1:compile
[INFO] +- org.springframework.shell:spring-shell:jar:1.2.0.RELEASE:compile
[INFO] |  +- jline:jline:jar:2.12:compile
[INFO] |  \- org.springframework:spring-core:jar:4.2.4.RELEASE:compile