CLOVER🍀

That was when it all began.

JavaのRedisクライアント、Lettuce(Reactive)を試す

RedisのJavaクライアントのひとつである、Lettuceをちょっと試してみます。

Lettuce

LettuceというのはPivotalが開発しているRedisのJavaクライアントで、

  • Synchronous、Asynchronous、ReactiveなAPIが利用可能
  • Master/Slaveをサポート
  • Redis Sentinelをサポート
  • Redis Clusterをサポート

といった感じになっています。

今回は、Reactive APIを試してみる感じで遊んでみようかと。

ドキュメント

Lettuceのドキュメントは、こちら。

Documentation

ReferenceとJavadocは、現在の最新バージョンである5.0.1.RELEASEのものを指しています。

Reference

Lettuce API

Lettuce Wiki

準備

それでは、まずは準備から。

Lettuceを使うには、以下のMaven依存関係を追加します。

        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.0.1.RELEASE</version>
        </dependency>

Lettuce自体にReactor Coreへの依存関係が入っているのですが、Reactor Testも使いたいのでLettuceが依存しているReactorと同じRelease Trainの
ReactorをdependencyManagementに引き込んでおきます。
https://github.com/lettuce-io/lettuce-core/blob/5.0.1.RELEASE/pom.xml#L64

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-SR4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

あとは、テスト系のライブラリを追加。

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.0.3</version>
            <scope>test</scope>
        </dependency>

Redisそのものも用意しておきます。今回は、Redis 4.0.6を使いました。Redisへの接続パスワードは、「redispass」としておきます。

テストコードの雛形

src/test/java/org/littlewings/lettuce/reactive/LettuceReactiveTest.java

package org.littlewings.lettuce.reactive;

import java.util.Arrays;
import java.util.stream.Collectors;

import io.lettuce.core.RedisClient;
import io.lettuce.core.TransactionResult;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class LettuceReactiveTest {
    // ここに、テストを書く!
}

使ってみる

それでは、使っていってみたいと思います。

とりあえず、Basic UsageとReactive APIを見ながら…。

Basic Usage

Reactive API

こんな感じに書いてみました。

    @Test
    public void blockingGettingStarted() {
        RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0");

        try (StatefulRedisConnection<String, String> connection = redisClient.connect()) {
            RedisReactiveCommands<String, String> commands = connection.reactive();

            Mono<String> set = commands.set("key", "value");
            Mono<String> get = commands.get("key");

            set.block();
            assertEquals("value", get.block());
        } finally {
            redisClient.shutdown();
        }
    }

Redis TransactionsのところにReactive APIを使ったBlockingの例があったので、とりあえずそちらをマネて。

Redis Transactions

Redisへの接続は、今回はこんな感じでURI指定で。

        RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0");

Redis URI

他にも、APIで接続先を組み立てる方法もあるようです。

Examples

とりあえずReactive APIを使いたいので、RedisClient#connectでStatefulRedisConnectionを取得後、StatefulRedisConnection#reactiveで
RedisReactiveCommandsを取得します。以後は、このRedisReactiveCommandsのインスタンスを使って操作していきます。

        try (StatefulRedisConnection<String, String> connection = redisClient.connect()) {
            RedisReactiveCommands<String, String> commands = connection.reactive();

            // 省略
        } finally {
            redisClient.shutdown();
        }

で、まずはMono#blockを使って、APIこそReactive APIを使ってみたものの、ブロックして結果を取得する感じに。

            Mono<String> set = commands.set("key", "value");
            Mono<String> get = commands.get("key");

            set.block();
            assertEquals("value", get.block());

と、setもgetもブロックしているのが微妙なので、ちょっと趣向を変えてみましょう。

setの結果をsubscribeして、それからgetをsubscribeする感じに。

    @Test
    public void reactiveGettingStarted1() {
        RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0");

        try (StatefulRedisConnection<String, String> connection = redisClient.connect()) {
            RedisReactiveCommands<String, String> commands = connection.reactive();

            Mono<String> set = commands.set("key", "value");
            Mono<String> get = commands.get("key");

            set.subscribe(result -> get.subscribe(value -> assertEquals("value", value)));

            get.block();
        } finally {
            redisClient.shutdown();
        }
    }

最後にgetでブロックするんですけど…。

もうちょっとそれっぽく書き直してみましょう。

    @Test
    public void reactiveGettingStarted2() {
        RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0");

        try (StatefulRedisConnection<String, String> connection = redisClient.connect()) {
            RedisReactiveCommands<String, String> commands = connection.reactive();

            Mono<String> setAndGet =
                    commands
                            .set("key", "value")
                            .then(commands.get("key"));

            StepVerifier.create(setAndGet)
                    .expectNext("value")
                    .verifyComplete();
        } finally {
            redisClient.shutdown();
        }
    }

Mono#thenでsetとgetをつなげて、最後のアサーションはStepVerifierに任せる感じで。

とりあえず、こんな感じでしょうか?

Multi/Exec

最後に、Multi/Execも試してみましょう。

Transactions/Multi

最初、雰囲気で書いたらうまくいかなくて、ちゃんとこちらを見て書き直しました…。

Transactions using the reactive API

Multiの結果を受けて、個々のコマンドをsubscribeしておく必要があるんですねぇ…。

で、そのあたりを踏まえて、今回はこんな感じのコードを書いてみました。

    @Test
    public void multi() {
        RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0");

        try (StatefulRedisConnection<String, String> connection = redisClient.connect()) {
            RedisReactiveCommands<String, String> commands = connection.reactive();

            Mono<TransactionResult> transactionSetResult =
                    commands
                            .multi()
                            .flatMap(multiResponse -> {
                                commands.set("key1", "value1").subscribe();
                                commands.set("key2", "value2").subscribe();
                                commands.set("key3", "value3").subscribe();
                                return commands.exec();
                            });

            StepVerifier
                    .create(transactionSetResult.map(TransactionResult::wasRolledBack))
                    .expectNext(false)
                    .verifyComplete();

            Mono<TransactionResult> transactionGetResult =
                    commands
                            .multi()
                            .flatMap(multiResponse -> {
                                commands.get("key1").subscribe();
                                commands.get("key2").subscribe();
                                commands.get("key3").subscribe();
                                return commands.exec();
                            });

            StepVerifier
                    .create(transactionGetResult.map(result ->
                            result.stream().map(String.class::cast).collect(Collectors.toList())))
                    .expectNext(Arrays.asList("value1", "value2", "value3"))
                    .verifyComplete();
        } finally {
            redisClient.shutdown();
        }
    }

RedisReactiveCommands#multiの後に、flatMapで各種コマンドを実行後、execのMonoを返すように実装。

RedisReactiveCommands#exec実行時のMonoはTransactionResultが入っているので、こちらを確認するとMulti/Execの結果がわかるようです。

TransactionResult#wasRolledBackでロールバックされたかどうか、またTransactionResultはIterableでもあり、かつStreamとしても扱うことができるので、
これを利用して個々のコマンドの結果を確認できます。

Multiを使ってsetした結果。

            StepVerifier
                    .create(transactionSetResult.map(TransactionResult::wasRolledBack))
                    .expectNext(false)
                    .verifyComplete();

Multiを使ってgetした結果。

            StepVerifier
                    .create(transactionGetResult.map(result ->
                            result.stream().map(String.class::cast).collect(Collectors.toList())))
                    .expectNext(Arrays.asList("value1", "value2", "value3"))
                    .verifyComplete();

Multiのところでちょっとハマりましたが、スタンドアロンなRedisに対しての基本的な使い方はこんなところで。