RedisのJavaクライアントのひとつである、Lettuceをちょっと試してみます。
LettuceというのはPivotalが開発しているRedisのJavaクライアントで、
- Synchronous、Asynchronous、ReactiveなAPIが利用可能
- Master/Slaveをサポート
- Redis Sentinelをサポート
- Redis Clusterをサポート
といった感じになっています。
今回は、Reactive APIを試してみる感じで遊んでみようかと。
準備
それでは、まずは準備から。
Lettuceを使うには、以下のMaven依存関係を追加します。
<dependency> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> <version>5.0.1.RELEASE</version> </dependency>
Lettuce自体にReactor Coreへの依存関係が入っているのですが、Reactor Testも使いたいのでLettuceが依存しているReactorと同じRelease Trainの
ReactorをdependencyManagementに引き込んでおきます。
https://github.com/lettuce-io/lettuce-core/blob/5.0.1.RELEASE/pom.xml#L64
<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>Bismuth-SR4</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
あとは、テスト系のライブラリを追加。
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.0.3</version> <scope>test</scope> </dependency>
Redisそのものも用意しておきます。今回は、Redis 4.0.6を使いました。Redisへの接続パスワードは、「redispass」としておきます。
テストコードの雛形
src/test/java/org/littlewings/lettuce/reactive/LettuceReactiveTest.java
package org.littlewings.lettuce.reactive; import java.util.Arrays; import java.util.stream.Collectors; import io.lettuce.core.RedisClient; import io.lettuce.core.TransactionResult; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.reactive.RedisReactiveCommands; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import static org.junit.jupiter.api.Assertions.assertEquals; public class LettuceReactiveTest { // ここに、テストを書く! }
使ってみる
それでは、使っていってみたいと思います。
とりあえず、Basic UsageとReactive APIを見ながら…。
こんな感じに書いてみました。
@Test public void blockingGettingStarted() { RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0"); try (StatefulRedisConnection<String, String> connection = redisClient.connect()) { RedisReactiveCommands<String, String> commands = connection.reactive(); Mono<String> set = commands.set("key", "value"); Mono<String> get = commands.get("key"); set.block(); assertEquals("value", get.block()); } finally { redisClient.shutdown(); } }
Redis TransactionsのところにReactive APIを使ったBlockingの例があったので、とりあえずそちらをマネて。
Redisへの接続は、今回はこんな感じでURI指定で。
RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0");
他にも、APIで接続先を組み立てる方法もあるようです。
とりあえずReactive APIを使いたいので、RedisClient#connectでStatefulRedisConnectionを取得後、StatefulRedisConnection#reactiveで
RedisReactiveCommandsを取得します。以後は、このRedisReactiveCommandsのインスタンスを使って操作していきます。
try (StatefulRedisConnection<String, String> connection = redisClient.connect()) { RedisReactiveCommands<String, String> commands = connection.reactive(); // 省略 } finally { redisClient.shutdown(); }
で、まずはMono#blockを使って、APIこそReactive APIを使ってみたものの、ブロックして結果を取得する感じに。
Mono<String> set = commands.set("key", "value"); Mono<String> get = commands.get("key"); set.block(); assertEquals("value", get.block());
と、setもgetもブロックしているのが微妙なので、ちょっと趣向を変えてみましょう。
setの結果をsubscribeして、それからgetをsubscribeする感じに。
@Test public void reactiveGettingStarted1() { RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0"); try (StatefulRedisConnection<String, String> connection = redisClient.connect()) { RedisReactiveCommands<String, String> commands = connection.reactive(); Mono<String> set = commands.set("key", "value"); Mono<String> get = commands.get("key"); set.subscribe(result -> get.subscribe(value -> assertEquals("value", value))); get.block(); } finally { redisClient.shutdown(); } }
最後にgetでブロックするんですけど…。
もうちょっとそれっぽく書き直してみましょう。
@Test public void reactiveGettingStarted2() { RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0"); try (StatefulRedisConnection<String, String> connection = redisClient.connect()) { RedisReactiveCommands<String, String> commands = connection.reactive(); Mono<String> setAndGet = commands .set("key", "value") .then(commands.get("key")); StepVerifier.create(setAndGet) .expectNext("value") .verifyComplete(); } finally { redisClient.shutdown(); } }
Mono#thenでsetとgetをつなげて、最後のアサーションはStepVerifierに任せる感じで。
とりあえず、こんな感じでしょうか?
Multi/Exec
最後に、Multi/Execも試してみましょう。
最初、雰囲気で書いたらうまくいかなくて、ちゃんとこちらを見て書き直しました…。
Transactions using the reactive API
Multiの結果を受けて、個々のコマンドをsubscribeしておく必要があるんですねぇ…。
で、そのあたりを踏まえて、今回はこんな感じのコードを書いてみました。
@Test public void multi() { RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0"); try (StatefulRedisConnection<String, String> connection = redisClient.connect()) { RedisReactiveCommands<String, String> commands = connection.reactive(); Mono<TransactionResult> transactionSetResult = commands .multi() .flatMap(multiResponse -> { commands.set("key1", "value1").subscribe(); commands.set("key2", "value2").subscribe(); commands.set("key3", "value3").subscribe(); return commands.exec(); }); StepVerifier .create(transactionSetResult.map(TransactionResult::wasRolledBack)) .expectNext(false) .verifyComplete(); Mono<TransactionResult> transactionGetResult = commands .multi() .flatMap(multiResponse -> { commands.get("key1").subscribe(); commands.get("key2").subscribe(); commands.get("key3").subscribe(); return commands.exec(); }); StepVerifier .create(transactionGetResult.map(result -> result.stream().map(String.class::cast).collect(Collectors.toList()))) .expectNext(Arrays.asList("value1", "value2", "value3")) .verifyComplete(); } finally { redisClient.shutdown(); } }
RedisReactiveCommands#multiの後に、flatMapで各種コマンドを実行後、execのMonoを返すように実装。
RedisReactiveCommands#exec実行時のMonoはTransactionResultが入っているので、こちらを確認するとMulti/Execの結果がわかるようです。
TransactionResult#wasRolledBackでロールバックされたかどうか、またTransactionResultはIterableでもあり、かつStreamとしても扱うことができるので、
これを利用して個々のコマンドの結果を確認できます。
Multiを使ってsetした結果。
StepVerifier
.create(transactionSetResult.map(TransactionResult::wasRolledBack))
.expectNext(false)
.verifyComplete();
Multiを使ってgetした結果。
StepVerifier .create(transactionGetResult.map(result -> result.stream().map(String.class::cast).collect(Collectors.toList()))) .expectNext(Arrays.asList("value1", "value2", "value3")) .verifyComplete();
Multiのところでちょっとハマりましたが、スタンドアロンなRedisに対しての基本的な使い方はこんなところで。