ちょっとした小ネタ的に。
Apache Geodeは、Memcached ServerおよびRedis Serverになれる機能を持っています。
How Gemcached Works | Geode Docs
Geode Redis Adapter | Geode Docs
それぞれ、GemcachedとRedis Adapterという名前のようです。
Gemcachedについては、組み込みとして使うサンプルがドキュメント上にあるのですが、Redis Adapterについては
gfsh上での操作しか記載されていません。
ですが、Redis Adapterも同じように組み込みで使うことができるようなので、合わせて動かしてみることにしました。
準備
GemcachedもRedis Adapterも、Apache Geodeのcoreモジュールに含まれています。よって、基本的にはcoreがあればOK…。
<dependency> <groupId>org.apache.geode</groupId> <artifactId>geode-core</artifactId> <version>1.2.1</version> </dependency>
と言いたいところですが、Redis Adapterについてはoptionalな依存関係が必要になります。そちらについては、
後述することにしましょう。
MemcachedおよびRedisにアクセスするためのライブラリとしては、それぞれspymemcachedとLettuceを使用することにします。
<dependency> <groupId>net.spy</groupId> <artifactId>spymemcached</artifactId> <version>2.12.3</version> </dependency> <dependency> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> <version>5.0.0.RELEASE</version> </dependency>
確認はテストコードでやります。JUnitとAssertJを依存関係に加えておきます。
<dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.0.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.0.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.platform</groupId> <artifactId>junit-platform-launcher</artifactId> <version>1.0.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.8.0</version> <scope>test</scope> </dependency>
では、書いていってみましょう。
Memcached(Gemcached)
まずは、テストコードの雛形から。
src/test/java/org/littlewings/geode/embedded/EmbeddedMemcachedServerTest.java
package org.littlewings.geode.embedded; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import net.spy.memcached.AddrUtil; import net.spy.memcached.BinaryConnectionFactory; import net.spy.memcached.MemcachedClient; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.Region; import org.apache.geode.internal.memcached.ValueWrapper; import org.apache.geode.memcached.GemFireMemcachedServer; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; public class EmbeddedMemcachedServerTest { // ここに、テストを書く! }
最初にも書きましたが、Memcached Serverへのアクセスにはspymemcachedを使用します。
Gemcachedを組み込みとして使ったコード例は、こんな感じです。
@Test public void gettingStarted() throws IOException, InterruptedException, ExecutionException { GemFireMemcachedServer memcachedServer = new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.ASCII); memcachedServer.start(); MemcachedClient client = new MemcachedClient(new InetSocketAddress("localhost", 11211)); try { client.set("key1", 3, "value1").get(); // Future assertThat(client.get("key1")).isEqualTo("value1"); TimeUnit.SECONDS.sleep(5L); assertThat(client.get("key1")).isNull(); client.set("key2", 3, "value2").get(); // Future assertThat(client.get("key2")).isEqualTo("value2"); client.delete("key2").get(); // Future assertThat(client.get("key2")).isNull(); } finally { client.shutdown(); memcachedServer.shutdown(); } }
GemFireMemcachedServerクラスのインスタンスを作成して、GemFireMemcachedServer#startするだけ。
GemFireMemcachedServer memcachedServer = new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.ASCII); memcachedServer.start();
GemFireMemcachedServerのコンストラクタは2つあり、リッスンポートのみの指定と、バインドするアドレスとポート、
Memcachedで使うプロトコルを指定できるものがあります。
https://github.com/apache/geode/blob/rel/v1.2.1/geode-core/src/main/java/org/apache/geode/memcached/GemFireMemcachedServer.java#L123
https://github.com/apache/geode/blob/rel/v1.2.1/geode-core/src/main/java/org/apache/geode/memcached/GemFireMemcachedServer.java#L142
今回は、全部指定する形を取りました。
リッスンポートは、負の値を指定するとデフォルトとして11212ポートを取ります。
https://github.com/apache/geode/blob/rel/v1.2.1/geode-core/src/main/java/org/apache/geode/memcached/GemFireMemcachedServer.java#L86
プロトコルは、デフォルトでASCIIです。
https://github.com/apache/geode/blob/rel/v1.2.1/geode-core/src/main/java/org/apache/geode/memcached/GemFireMemcachedServer.java#L130
使い終わったら、shutdownしましょう。
memcachedServer.shutdown();
これが、基本的な使い方になります。
expireとかも効いていて、良さそうですね。
client.set("key1", 3, "value1").get(); // Future assertThat(client.get("key1")).isEqualTo("value1"); TimeUnit.SECONDS.sleep(5L); assertThat(client.get("key1")).isNull(); client.set("key2", 3, "value2").get(); // Future assertThat(client.get("key2")).isEqualTo("value2"); client.delete("key2").get(); // Future assertThat(client.get("key2")).isNull();
spymemcachedのsetの戻り値がFutureになっていることを知らなくて、最初テストが不安定になってハマりました…。
Binaryプロトコルでも、問題なく動作します。
@Test public void gettingStartedAsBinary() throws IOException, InterruptedException, ExecutionException { GemFireMemcachedServer memcachedServer = new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.BINARY); memcachedServer.start(); MemcachedClient client = new MemcachedClient( new BinaryConnectionFactory(), AddrUtil.getAddresses("localhost:11211") ); try { client.set("key1", 3, "value1").get(); // Future assertThat(client.get("key1")).isEqualTo("value1"); TimeUnit.SECONDS.sleep(5L); assertThat(client.get("key1")).isNull(); client.set("key2", 3, "value2").get(); // Future assertThat(client.get("key2")).isEqualTo("value2"); client.delete("key2").get(); // Future assertThat(client.get("key2")).isNull(); } finally { client.shutdown(); memcachedServer.shutdown(); } }
また、Gemcachedが使用するRegionは、「gemcached」という名前となっています。
https://github.com/apache/geode/blob/rel/v1.2.1/geode-core/src/main/java/org/apache/geode/memcached/GemFireMemcachedServer.java#L69
「gemcached」Regionが存在しない場合はPartition Regionとして作成されるようになっています。
https://github.com/apache/geode/blob/rel/v1.2.1/geode-core/src/main/java/org/apache/geode/internal/memcached/commands/AbstractCommand.java#L144-L153
よって、「gemcached」Regionをカスタマイズしたい場合は、cache.xmlに「gemcached」Regionの定義をすることに
なるでしょう。
例えば、こんな感じに。
<?xml version="1.0" encoding="UTF-8"?> <cache xmlns="http://geode.apache.org/schema/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd" version="1.0"> <region name="gemcached" refid="PARTITION_REDUNDANT"/> </cache>
特に設定ファイルを与える方法がないので、「cache.xml」のファイル名を指定させるにはシステムプロパティを
使うことになりますねぇ…。
「gemcached」Regionに、データが入っていることを確認してみましょう。
@Test public void underlyingRegion() throws IOException, InterruptedException, ExecutionException { GemFireMemcachedServer memcachedServer = new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.ASCII); memcachedServer.start(); MemcachedClient client = new MemcachedClient(new InetSocketAddress("localhost", 11211)); try { client.set("key1", 3, "value1").get(); // Future assertThat(client.get("key1")).isEqualTo("value1"); Cache cache = CacheFactory.getAnyInstance(); Region<String, ValueWrapper> region = cache.getRegion("gemcached"); assertThat(new String(region.get("key1").getValue(), StandardCharsets.UTF_8)) .isEqualTo("value1"); } finally { client.shutdown(); memcachedServer.shutdown(); } }
確かに「gemcached」Regionにデータが入っているようです。
Memcached(Gemcached)については、こんな感じです。
Redis(Redis Adapter)
続いて、Redis Adapterへ。
Redis Adapterを使用するには、依存関係としてSpring Shellが必要になります。というわけで、依存関係を追加します。
<!-- GeodeRedisServer only -->
<dependency>
<groupId>org.springframework.shell</groupId>
<artifactId>spring-shell</artifactId>
<version>1.2.0.RELEASE</version>
<exclusions>
<exclusion>
<artifactId>cglib</artifactId>
<groupId>*</groupId>
</exclusion>
<exclusion>
<artifactId>asm</artifactId>
<groupId>*</groupId>
</exclusion>
<exclusion>
<artifactId>spring-aop</artifactId>
<groupId>*</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>*</groupId>
</exclusion>
<exclusion>
<artifactId>aopalliance</artifactId>
<groupId>*</groupId>
</exclusion>
<exclusion>
<artifactId>spring-context-support</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
</dependency>
いろいろexcludeしてあるのは、Apache Geode側の設定そのままに倣いました。
https://github.com/apache/geode/blob/develop/geode-core/build.gradle#L110-L118
テストコードの雛形としては、こんな感じに。
src/test/java/org/littlewings/geode/embedded/EmbeddedRedisServerTest.java
package org.littlewings.geode.embedded; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisCommandExecutionException; import io.lettuce.core.RedisConnectionException; import io.lettuce.core.SetArgs; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.sync.RedisCommands; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.Region; import org.apache.geode.redis.GeodeRedisServer; import org.apache.geode.redis.internal.ByteArrayWrapper; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; public class EmbeddedRedisServerTest { // ここに、テストを書く! }
Redisへのアクセスには、Lettuceを使用します。
Redis Adapterを組み込みとして使ったコードは、こんな感じになります。
@Test public void gettingStarted() throws InterruptedException { GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379); redisServer.start(); RedisClient redisClient = RedisClient.create("redis://@localhost:6379/0"); StatefulRedisConnection<String, String> connection = redisClient.connect(); try { RedisCommands<String, String> syncCommands = connection.sync(); syncCommands.set("key1", "value1"); assertThat(syncCommands.get("key1")).isEqualTo("value1"); syncCommands.set("key2", "value2,", SetArgs.Builder.ex(3)); TimeUnit.SECONDS.sleep(5L); assertThat(syncCommands.get("key2")).isNull(); } finally { connection.close(); redisClient.shutdown(); redisServer.shutdown(); } }
※StatefulRedisConnectionのみCloseableなのですが、他のクラスと合わせた形で書きました…
Gemcachedと似た感じですね。GeodeRedisServerというクラスを使用します。
GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379); redisServer.start();
コンストラクタには、リッスンポートのみ、バインドするアドレスとポート、バインドするアドレスとポートに加えてログレベルを指定できる、
3つのコンストラクタがあります。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L301-L364
今回は、バインドするアドレスとポートを指定。そして、GeodeRedisServer#startを呼び出します。
GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379); redisServer.start();
ポートについては負の値を指定すると、デフォルトのポートとして6379を取ります。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L156
利用が終わったら、GeodeRedisServer#shutdownを呼び出します。
redisServer.shutdown();
アクセス自体は、Lettuceを使って確認。こちらもexpireなどは良さそうですね。
RedisClient redisClient = RedisClient.create("redis://@localhost:6379/0"); StatefulRedisConnection<String, String> connection = redisClient.connect(); try { RedisCommands<String, String> syncCommands = connection.sync(); syncCommands.set("key1", "value1"); assertThat(syncCommands.get("key1")).isEqualTo("value1"); syncCommands.set("key2", "value2,", SetArgs.Builder.ex(3)); TimeUnit.SECONDS.sleep(5L); assertThat(syncCommands.get("key2")).isNull(); } finally { connection.close(); redisClient.shutdown(); redisServer.shutdown(); }
GeodeRedisServerに保存されたデータは、こちらも内部的にはRegionとして保持されますが、いくつか種類があります。
とりあえず、Stringとして保存されるRegionを確認してみましょう。
@Test public void underlyging() throws InterruptedException { GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379); redisServer.start(); RedisClient redisClient = RedisClient.create("redis://@localhost:6379/0"); StatefulRedisConnection<String, String> connection = redisClient.connect(); try { RedisCommands<String, String> syncCommands = connection.sync(); syncCommands.set("key1", "value1"); assertThat(syncCommands.get("key1")).isEqualTo("value1"); syncCommands.set("key2", "value2", SetArgs.Builder.ex(3)); assertThat(syncCommands.get("key2")).isEqualTo("value2"); Cache cache = CacheFactory.getAnyInstance(); Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion = cache.getRegion("ReDiS_StRiNgS"); assertThat(stringsRegion.get(new ByteArrayWrapper("key1".getBytes(StandardCharsets.UTF_8)))) .isEqualTo(new ByteArrayWrapper("value1".getBytes(StandardCharsets.UTF_8))); assertThat(stringsRegion.get(new ByteArrayWrapper("key2".getBytes(StandardCharsets.UTF_8)))) .isEqualTo(new ByteArrayWrapper("value2".getBytes(StandardCharsets.UTF_8))); } finally { connection.close(); redisClient.shutdown(); redisServer.shutdown(); } }
「ReDiS_StRiNgS」というRegionに、データが保存されています。
RegionはString、HyperLogLogs、メタデータの3種類を使用します。
StringとHyperLogLogs用のRegionは、デフォルトでPartition Regionとなります。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L270
変更したい場合は、システムプロパティ「gemfireredis.regiontype」で指定するか、あらかじめRegionを定義しておきましょう。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L425-L434
メタデータ用のRegionは、デフォルトでReplicate Regionとなります。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L435-L442
こちらも、変更したい場合はあらかじめRegionを定義しておきましょう。
それぞれのRegionの名前は、けっこうすごいことになっていますが…ソースコードを見ましょう。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L223-L239
Gemcachedと同様、GeodeRedisServerにCacheXML自体を指定する手段はなさそうなので、CacheXML名を変えたい場合はシステムプロパティを
使うことになりますね。
注意点
とまあ、こんな感じに使えそうなRedis Adapterですが、注意点もあるようです。
サポートしているRedisのコマンドは、GeodeRedisServerのJavadocを確認しましょう。
GeodeRedisServer (Apache Geode 1.6.0)
とはいえ、ドキュメントに
The Geode Redis Adapter supports all Redis commands for each of the Redis data structures. (See the Javadocs for the GemFireRedisServer class for a detailed list.)
http://geode.apache.org/docs/guide/12/tools_modules/redis_adapter.html#how-the-redis-adapter-wo
とあるので、ふつうに使う分には困らないのではないかと。
ただ、いくつかRedisと異なる振る舞いをするケースがあるようです。
- キーを削除した際に返ってくる削除されたエントリ数は、ローカルNodeでの数(クラスタ全体ではない)。ただし、エントリは削除されている
- Setに新しいメンバーを追加した際の戻り値が、不定になる。コマンド自体はRedisプロトコルと同様に動くが、戻ってくる数は設定した結果が反映されているとは限らない
- トランザクションは、ローカルNodeに収まる範囲で実行する必要がある(Nodeをまたがる場合はNG、またトランザクションが有効になっていない永続域内にあるデータに対しても、トランザクションは実行できない)
- デフォルトでApache Geodeのトランザクション内のキーをすべて監視するため、キーをWATCHまたはUNWATCHすることができない
SetとWATCH/UNWATCH以外については、Nodeをまたがった場合にRedisと動きが異なる感じになるみたいですね。
このあたりが許容できるのであれば、Redis AdapterをRedisの代わりとして使えるのでしょう。
一緒に使う
で、ここまで書いていると、GemcachedとRedis Adapterは別々に使うことになるというか、同時に使用できない?というような気もしますが
(両方とも、Server#startしますし)、そんなことはなさそうです。
両方合わせて使ってみました。
src/test/java/org/littlewings/geode/embedded/EmbeddedMemcachedRedisServerTest.java
package org.littlewings.geode.embedded; import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.ExecutionException; import io.lettuce.core.RedisClient; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.sync.RedisCommands; import net.spy.memcached.MemcachedClient; import org.apache.geode.memcached.GemFireMemcachedServer; import org.apache.geode.redis.GeodeRedisServer; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; public class EmbeddedMemcachedRedisServerTest { @Test public void mixedServer() throws IOException, ExecutionException, InterruptedException { GemFireMemcachedServer memcachedServer = new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.ASCII); memcachedServer.start(); GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379); redisServer.start(); MemcachedClient memcachedClient = new MemcachedClient(new InetSocketAddress("localhost", 11211)); RedisClient redisClient = RedisClient.create("redis://@localhost:6379/0"); StatefulRedisConnection<String, String> connection = redisClient.connect(); try { memcachedClient.set("key1", 3, "value1").get(); // Future assertThat(memcachedClient.get("key1")).isEqualTo("value1"); RedisCommands<String, String> syncCommands = connection.sync(); syncCommands.set("key1", "value1"); assertThat(syncCommands.get("key1")).isEqualTo("value1"); } finally { redisClient.shutdown(); memcachedClient.shutdown(); redisClient.shutdown(); memcachedServer.shutdown(); } } }
どちらのServerも、Apache GeodeのCacheがなければ作成し、あればそちらを使用するように実装されているからです。
https://github.com/apache/geode/blob/rel/v1.2.1/geode-core/src/main/java/org/apache/geode/memcached/GemFireMemcachedServer.java#L167-L171
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L401-L412
Apache Geode自体は、ひとつのJavaVM上で複数のCacheを作成することは許容していません。
なので、こういう実装になっていればうまくいきますよね、と。
まとめ
Apache Geodeが提供する、MemcachedおよびRedisのServerとしての機能を、Javaアプリケーションに組み込んで使ってみました。
内部のコードも少し追いつつ、各クライアントライブラリで確認できたので、まあ良しとしましょう。
こうやって組み込んで使えるのは便利ですが、ちょっと依存ライブラリが多いのが難点でしょうかねぇ…。
## 「mvn depencency:tree」のApache Geode+Spring Shellの部分の結果 [INFO] +- org.apache.geode:geode-core:jar:1.2.1:compile [INFO] | +- com.github.stephenc.findbugs:findbugs-annotations:jar:1.3.9-1:compile [INFO] | +- org.jgroups:jgroups:jar:3.6.10.Final:compile [INFO] | +- antlr:antlr:jar:2.7.7:compile [INFO] | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.8.6:compile [INFO] | +- com.fasterxml.jackson.core:jackson-databind:jar:2.8.6:compile [INFO] | | \- com.fasterxml.jackson.core:jackson-core:jar:2.8.6:compile [INFO] | +- commons-io:commons-io:jar:2.5:compile [INFO] | +- commons-lang:commons-lang:jar:2.6:compile [INFO] | +- it.unimi.dsi:fastutil:jar:7.1.0:compile [INFO] | +- javax.resource:javax.resource-api:jar:1.7:compile [INFO] | | \- javax.transaction:javax.transaction-api:jar:1.2:compile [INFO] | +- net.java.dev.jna:jna:jar:4.0.0:compile [INFO] | +- net.sf.jopt-simple:jopt-simple:jar:5.0.3:compile [INFO] | +- org.apache.logging.log4j:log4j-api:jar:2.7:compile [INFO] | +- org.apache.logging.log4j:log4j-core:jar:2.7:compile [INFO] | +- org.apache.shiro:shiro-core:jar:1.3.2:compile [INFO] | | \- org.slf4j:slf4j-api:jar:1.6.4:compile [INFO] | +- commons-beanutils:commons-beanutils:jar:1.9.3:compile [INFO] | | +- commons-logging:commons-logging:jar:1.2:compile [INFO] | | \- commons-collections:commons-collections:jar:3.2.2:compile [INFO] | +- io.github.lukehutch:fast-classpath-scanner:jar:2.0.11:compile [INFO] | +- org.apache.geode:geode-common:jar:1.2.1:compile [INFO] | \- org.apache.geode:geode-json:jar:1.2.1:compile [INFO] +- org.springframework.shell:spring-shell:jar:1.2.0.RELEASE:compile [INFO] | +- jline:jline:jar:2.12:compile [INFO] | \- org.springframework:spring-core:jar:4.2.4.RELEASE:compile