CLOVER🍀

That was when it all began.

Amazon SQS互換のElasticMQを使って、Temoporary Queue+RPCを試してみる

これは、なにをしたくて書いたもの?

Amazon SQSで、Temporary Queueを使ったRPCができるというドキュメント、ブログを見かけまして。

AWS互換のソフトウェアを使って、ちょっとローカルで試してみようと思います。

Amazon SQSのTemporary QueueでRPC

Amazon SQSのTemporary Queueについては、こちらに記載があります。

Amazon SQS 一時キュー - Amazon Simple Queue Service

Temporary Queueを使うと、リクエスト ー レスポンスのようなメッセージングができるようです。

Simple Two-way Messaging using the Amazon SQS Temporary Queue Client | AWS Compute Blog

Temporary Queueを使う時のクライアントライブラリとしては、こちらのAmazon SQS Java Temporary Queue Clientを
使うようです。

GitHub - awslabs/amazon-sqs-java-temporary-queues-client: An Amazon SQS client that supports creating lightweight, automatically-deleted temporary queues, for use in common messaging patterns such as Request/Response. See http://aws.amazon.com/sqs.

サンプルはこちら。

GitHub - aws-samples/amazon-sqs-java-temporary-queues-client-samples: Example code for the Amazon SQS temporary queues client (https://github.com/awslabs/amazon-sqs-java-temporary-queues-client )

こちらなのですが、AWS SDK for Javaのバージョン1がベースになっているんですよね。

AWS SDK for Java - 1.12.69

Temporary Queueのドキュメントを見ていると通常のAmazon SQS互換だということなのですが、バージョン2や
その他の言語のライブラリからはどう使うんでしょうね?

ちなみに、AWS SDK for Javaのバージョン2のリポジトリにはissueがありましたが、特に進んではいなさそうです。

Two Way Messaging - Virtual Queues · Issue #1647 · aws/aws-sdk-java-v2 · GitHub

ドキュメントを見ていると、Temporary Queueを使わなくてもRPCを実装できそうな感じはするんですよね。

Amazon SQS メッセージの操作 / リクエストと応答システムの実装

それはまた今度試してみましょうか。

LocalStackで試そうとするも

AWSのサービスをある程度ローカルで動かせるソフトウェアといえば、LocalStackです。ただ、LocalStackでは
うまくいきませんでした。

$ localstack --version
0.12.17.5

こちらと同じ問題を踏むようです。

SQS Action TagQueue is not working with moto as provider · Issue #2690 · localstack/localstack · GitHub

bug: SQS with moto engine fails with 400 on TagQueue call using the queue url endpoint · Issue #4391 · localstack/localstack · GitHub

SQS_PROVIDER環境変数でelasticmqを指定すると良い、と書かれてもいるのですが、こちらの指定でも

$ SQS_PROVIDER=elasticmq localstack start

こちらの指定でもうまくいかなったので、今回は諦めることにしました。

$ docker container run --rm -it -p 4566:4566 -p 4571:4571 -e SQS_PROVIDER=elasticmq localstack/localstack

ElasticMQを使う

LocalStackではなく、ElasticMQを直接使うとうまくいったので、今回はこちらを使うことにしました。

GitHub - softwaremill/elasticmq: In-memory message queue with an Amazon SQS-compatible interface. Runs stand-alone or embedded.

ElasticMQは、Amazon SQS互換のインターフェースを持ったインメモリ・メッセージキューです。
Scala・Akkaで実装されているようですね。

今回はElasticMQ自体にはあまり踏み込まず、単に使うだけにします。Dockerイメージもあるようなので、こちらを
使いましょう。

DockerHub / softwaremill/elasticmq

こちらで起動しておきます。

$ docker container run -p 9324:9324 -p 9325:9325 softwaremill/elasticmq:1.2.1

これで、ElasticMQにはローカルのポート9324でアクセスできます。

ちなみに、GraalVMでネイティブ化したイメージもあるようです。

DockerHub / softwaremill/elasticmq-native

ElasticMQにキューを作成する

Temporary Queueを使うにも、本体のキューの作成は必要なので作成しておきます。

AWS CLIを使って

$ aws --version
aws-cli/2.2.39 Python/3.8.8 Linux/5.4.0-84-generic exe/x86_64.ubuntu.20 prompt/off

$ export AWS_ACCESS_KEY_ID=.....
$ export AWS_SECRET_ACCESS_KEY=.....
$ export AWS_DEFAULT_REGION=us-east-1

作成。AWSのクレデンシャルは、適当でOKです。

$ aws sqs create-queue --endpoint-url http://localhost:9324 --queue-name myqueue
{
    "QueueUrl": "http://localhost:9324/000000000000/myqueue"
}

AWS CLIで操作する時のエンドポイントの指定にさえ気をつければOKです。

では、このキューを使ったプログラムを書いていきます。

環境

今回の環境は、こちら。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.2 (ea98e05a04480131370aa0c110b8c54cf726c06f)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-84-generic", arch: "amd64", family: "unix"

Maven依存関係等は、このように設定。

    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-sqs-java-temporary-queues-client</artifactId>
            <version>1.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.8.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.20.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

Temporary Queueを使うにあたってのポイントはここだけですね。

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-sqs-java-temporary-queues-client</artifactId>
            <version>1.2.2</version>
        </dependency>

あとはテストコードのための依存関係です。確認は、テストコードで行います。

テストコードで確認する

今回、Amazon SQSのTemporary Queueを使うにあたり、簡単なEcho的なクライアント・サーバーを書いてみたいと思います。

お題として、クライアントがサーバーにメッセージを送り、サーバーはそのメッセージに装飾を付けて返すというものに
しましょう。

参考にするサンプルは、このあたりです。

Amazon SQS 一時キュー / シナリオの例: ログインリクエストの処理

GitHub - aws-samples/amazon-sqs-java-temporary-queues-client-samples: Example code for the Amazon SQS temporary queues client (https://github.com/awslabs/amazon-sqs-java-temporary-queues-client )

ElasticMQに接続するには、AWS SDK for Javaのエンドポイントを変更する必要があります。また、今回作成する
クライアント、サーバーともにElasticMQに接続する必要があるので、AmazonSQSのインスタンスを作成するところを
共通に切り出しました。

src/test/java/org/littlewings/localstack/sqs/LocalSqsBuilder.java

package org.littlewings.localstack.sqs;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

public class LocalSqsBuilder {
    public static AmazonSQS create() {
        String accessKeyId = "[dummy]";
        String secretAccessKeyId = "[dummy]";
        String elasticmqSqsEndpoint = "http://localhost:9324";
        String region = "us-east-1";

        BasicAWSCredentials credentials = new BasicAWSCredentials(accessKeyId, secretAccessKeyId);

        AwsClientBuilder.EndpointConfiguration endpointConfiguration =
                new AwsClientBuilder.EndpointConfiguration(elasticmqSqsEndpoint, region);

        return AmazonSQSClientBuilder
                .standard()
                .withCredentials(new AWSStaticCredentialsProvider(credentials))
                .withEndpointConfiguration(endpointConfiguration)
                .build();
    }
}

次に、サーバーを作成します。

src/test/java/org/littlewings/localstack/sqs/TemporaryQueueRpcServer.java

package org.littlewings.localstack.sqs;

import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSResponder;
import com.amazonaws.services.sqs.AmazonSQSResponderClientBuilder;
import com.amazonaws.services.sqs.MessageContent;
import com.amazonaws.services.sqs.util.SQSMessageConsumer;
import com.amazonaws.services.sqs.util.SQSMessageConsumerBuilder;

public class TemporaryQueueRpcServer {
    AmazonSQS sqs;
    AmazonSQSResponder responder;
    SQSMessageConsumer consumer;

    TemporaryQueueRpcServer() {
    }

    public static TemporaryQueueRpcServer create(String queueUrl) {
        TemporaryQueueRpcServer server = new TemporaryQueueRpcServer();

        AmazonSQS sqs = LocalSqsBuilder.create();
        AmazonSQSResponder responder =
                AmazonSQSResponderClientBuilder.standard()
                        .withAmazonSQS(sqs)
                        .build();

        SQSMessageConsumer consumer =
                SQSMessageConsumerBuilder
                        .standard()
                        .withAmazonSQS(sqs)
                        .withQueueUrl(queueUrl)
                        .withConsumer(requestMessage -> {
                            MessageContent requestContent = MessageContent.fromMessage(requestMessage);
                            String requestBody = requestContent.getMessageBody();

                            MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★");

                            responder.sendResponseMessage(requestContent, responseContent);
                        })
                        .build();

        server.sqs = sqs;
        server.responder = responder;
        server.consumer = consumer;

        return server;
    }

    public void start() {
        consumer.start();
    }

    public void stop() {
        consumer.shutdown();
        responder.shutdown();
        sqs.shutdown();
    }
}

AmazonSQSのインスタンスを作成して

        AmazonSQS sqs = LocalSqsBuilder.create();

受信したメッセージに応答するためのAmazonSQSResponderのインスタンスを作成。

        AmazonSQS sqs = LocalSqsBuilder.create();
        AmazonSQSResponder responder =
                AmazonSQSResponderClientBuilder.standard()
                        .withAmazonSQS(sqs)
                        .build();

また、実際にメッセージを受け取る部分と、受け取ったメッセージを処理する部分はSQSMessageConsumerとして
作成します。

        SQSMessageConsumer consumer =
                SQSMessageConsumerBuilder
                        .standard()
                        .withAmazonSQS(sqs)
                        .withQueueUrl(queueUrl)
                        .withConsumer(requestMessage -> {
                            MessageContent requestContent = MessageContent.fromMessage(requestMessage);
                            String requestBody = requestContent.getMessageBody();

                            MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★");

                            responder.sendResponseMessage(requestContent, responseContent);
                        })
                        .build();

SQSMessageConsumerは、実際にはキューをポーリングして確認するものになります。今回は設定していませんが、
ポーリングを行うスレッドを増やしたりすることも可能です。ElasticMQの場合は増やすと挙動が安定しなくなったので、
やめましたが…。

このため、どのキューをポーリングするのか指定が必要です。

                        .withQueueUrl(queueUrl)

ここで指定するキューのURLは、先ほど作成したこちらのURLを指定することになります。
※指定自体は、今回は別のクラスから渡すことにします。

{
    "QueueUrl": "http://localhost:9324/000000000000/myqueue"
}

ポーリングの開始は、SQSMessageConsumer#startで行います。

    public void start() {
        consumer.start();
    }

終了は、各種shutdownメソッドで。

    public void stop() {
        consumer.shutdown();
        responder.shutdown();
        sqs.shutdown();
    }

クライアントは、テストコードとして書くことにします。

src/test/java/org/littlewings/localstack/sqs/TemporaryQueueRpcTest.java

package org.littlewings.localstack.sqs;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSRequester;
import com.amazonaws.services.sqs.AmazonSQSRequesterClientBuilder;
import com.amazonaws.services.sqs.MessageContent;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class TemporaryQueueRpcTest {
    interface ThrowableRunnable {
        void run() throws Exception;
    }

    public void withServer(String queueUrl, ThrowableRunnable runnable) throws Exception {
        TemporaryQueueRpcServer server = TemporaryQueueRpcServer.create(queueUrl);
        server.start();

        try {
            runnable.run();
        } finally {
            server.stop();
        }
    }

    // ここに、テストコードを書く!
]

先ほど作成したサーバーを起動停止するコードもつけておきます。

では、書いてみます。

    @Test
    public void simpleRpc() throws Exception {
        String queueUrl = "http://localhost:9324/000000000000/myqueue";

        withServer(queueUrl, () -> {
            AmazonSQS sqs = LocalSqsBuilder.create();

            AmazonSQSRequester requester =
                    AmazonSQSRequesterClientBuilder
                            .standard()
                            .withAmazonSQS(sqs)
                            .build();

            try {
                SendMessageRequest request =
                        new SendMessageRequest()
                                .withMessageBody("Hello World!!")
                                .withQueueUrl(queueUrl);

                Message responseMessage = requester.sendMessageAndGetResponse(request, 10, TimeUnit.SECONDS);
                MessageContent responseContent = MessageContent.fromMessage(responseMessage);

                System.out.println(responseMessage);
                System.out.println(responseContent.getMessageAttributes());

                assertThat(responseContent.getMessageBody()).isEqualTo("★★★Hello World!!★★★");

            } finally {
                requester.shutdown();
                sqs.shutdown();
            }
        });
    }

クライアントは、AmazonSQSRequesterのインスタンスを作成します。

            AmazonSQSRequester requester =
                    AmazonSQSRequesterClientBuilder
                            .standard()
                            .withAmazonSQS(sqs)
                            .build();

送信先のキューのURLとメッセージ本文を指定して、SendMessageRequestのインスタンスを作成。
メッセージはStringとして送る必要があるみたいです。

                SendMessageRequest request =
                        new SendMessageRequest()
                                .withMessageBody("Hello World!!")
                                .withQueueUrl(queueUrl);

リクエストは、AmazonSQSRequester#sendMessageAndGetResponseで送ってレスポンスを待ちます。
この場合は、同期呼び出しになりますね。

                Message responseMessage = requester.sendMessageAndGetResponse(request, 10, TimeUnit.SECONDS);
                MessageContent responseContent = MessageContent.fromMessage(responseMessage);

                System.out.println(responseMessage);
                System.out.println(responseContent.getMessageAttributes());

                assertThat(responseContent.getMessageBody()).isEqualTo("★★★Hello World!!★★★");

レスポンスはMessageとなり、MessageContentに変換することでメッセージ本文や属性を扱えるようになります。

サーバーから返ってきたメッセージの確認。

                assertThat(responseContent.getMessageBody()).isEqualTo("★★★Hello World!!★★★");

ちなみに、ここの出力結果は

                System.out.println(responseMessage);
                System.out.println(responseContent.getMessageAttributes());

こんな感じになります。

{MessageId: 87da2d24-c0ce-444f-a954-356cac88d9ad,ReceiptHandle: 87da2d24-c0ce-444f-a954-356cac88d9ad#3e0b5598-c725-422a-b675-9120eb9bc14a,MD5OfBody: b14f6a41b31fd409988bec57f9a1a5cb,Body: ★★★Hello World!!★★★,Attributes: {ApproximateReceiveCount=1, SentTimestamp=1631948316882, SenderId=127.0.0.1, ApproximateFirstReceiveTimestamp=1631948316882},MD5OfMessageAttributes: 0438dbaa5711f2dfa98c7cdfcd2d321f,MessageAttributes: {__AmazonSQSVirtualQueuesClient.QueueName={StringValue: __RequesterClientQueues__3e338723-88b2-45da-802e-b48492f63d2c,StringListValues: [],BinaryListValues: [],DataType: String}}}


{__AmazonSQSVirtualQueuesClient.QueueName={StringValue: __RequesterClientQueues__3e338723-88b2-45da-802e-b48492f63d2c,StringListValues: [],BinaryListValues: [],DataType: String}}

RPCが終わったら、それぞれインスタンスをshutdown。

                requester.shutdown();
                sqs.shutdown();

もう少し送信するメッセージを増やしてみる

せっかくなので、ちゃんとRPCできているかどうか、もっとメッセージを増やして確認してみたいと思います。

まずはサーバー側に、こんな感じでレスポンスを返す際にランダムでスリープする処理を追加。

src/test/java/org/littlewings/localstack/sqs/TemporaryQueueRpcServer.java

package org.littlewings.localstack.sqs;

import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSResponder;
import com.amazonaws.services.sqs.AmazonSQSResponderClientBuilder;
import com.amazonaws.services.sqs.MessageContent;
import com.amazonaws.services.sqs.util.SQSMessageConsumer;
import com.amazonaws.services.sqs.util.SQSMessageConsumerBuilder;

public class TemporaryQueueRpcServer {
    AmazonSQS sqs;
    AmazonSQSResponder responder;
    SQSMessageConsumer consumer;

    TemporaryQueueRpcServer() {
    }

    // 省略

    public static TemporaryQueueRpcServer createRandomSleep(String queueUrl) {
        Random random = new Random();
        random.nextInt(10);

        TemporaryQueueRpcServer server = new TemporaryQueueRpcServer();

        AmazonSQS sqs = LocalSqsBuilder.create();
        AmazonSQSResponder responder =
                AmazonSQSResponderClientBuilder.standard()
                        .withAmazonSQS(sqs)
                        .build();

        SQSMessageConsumer consumer =
                SQSMessageConsumerBuilder
                        .standard()
                        .withAmazonSQS(sqs)
                        .withQueueUrl(queueUrl)
                        .withConsumer(requestMessage -> {
                            MessageContent requestContent = MessageContent.fromMessage(requestMessage);
                            String requestBody = requestContent.getMessageBody();

                            MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★");

                            try {
                                TimeUnit.SECONDS.sleep(random.nextInt(10));
                            } catch (InterruptedException e) {
                                // ignore
                            }

                            responder.sendResponseMessage(requestContent, responseContent);
                        })
                        .build();

        server.sqs = sqs;
        server.responder = responder;
        server.consumer = consumer;

        return server;
    }

    // 省略

}

クライアント側からは、メッセージを50個送ってみます。

    @Test
    public void concurrent() throws Exception {
        String queueUrl = "http://localhost:9324/000000000000/myqueue";

        withRandomSleepServer(queueUrl, () -> {
            AmazonSQS sqs = LocalSqsBuilder.create();

            AmazonSQSRequester requester =
                    AmazonSQSRequesterClientBuilder
                            .standard()
                            .withAmazonSQS(sqs)
                            .build();

            try {
                Map<String, CompletableFuture<Message>> futures = new LinkedHashMap<>();

                for (int i = 0; i < 50; i++) {
                    String uuid = UUID.randomUUID().toString();
                    SendMessageRequest request =
                            new SendMessageRequest()
                                    .withMessageBody(uuid)
                                    .withQueueUrl(queueUrl);

                    CompletableFuture<Message> responseMessage = requester.sendMessageAndGetResponseAsync(request, 60, TimeUnit.SECONDS);

                    futures.put(uuid, responseMessage);
                }

                assertThat(futures).hasSize(50);

                futures.forEach((uuid, message) -> {
                    long start = System.currentTimeMillis();
                    MessageContent responseContent = MessageContent.fromMessage(message.join());
                    System.out.println((System.currentTimeMillis() - start) / 1000.0);
                    assertThat(responseContent.getMessageBody()).isEqualTo("★★★" + uuid + "★★★");
                });
            } finally {
                requester.shutdown();
                sqs.shutdown();
            }
        });
    }

レスポンスの受信は非同期に。今回は、タイムアウトを長めに取らないとタイムアウトが多発しました…。

                    CompletableFuture<Message> responseMessage = requester.sendMessageAndGetResponseAsync(request, 60, TimeUnit.SECONDS);

送信するメッセージはUUIDにして

                for (int i = 0; i < 50; i++) {
                    String uuid = UUID.randomUUID().toString();
                    SendMessageRequest request =
                            new SendMessageRequest()
                                    .withMessageBody(uuid)
                                    .withQueueUrl(queueUrl);

リクエストしたUUIDに対応する値が返ってくることを確認。

                futures.forEach((uuid, message) -> {
                    MessageContent responseContent = MessageContent.fromMessage(message.join());
                    assertThat(responseContent.getMessageBody()).isEqualTo("★★★" + uuid + "★★★");
                });

とりあえず、RPCっぽく動いていることは確認できました。

なんですけど、今回の実装だとサーバー側が全部のメッセージを処理し終えないと、クライアントに最初のレスポンスが
返らないみたいなんですよね。かといって、サーバーのSQSMessageConsumerのスレッド数を増やすと動作が
安定しなくなるという…。

これがAmazon SQS Java Temporary Queue Clientの使い方が悪いのか、ElasticMQの使い方が悪いのかは
微妙なところですが…。

まあ、とりあえず使い方と動作の簡単な確認はできたので良しとしますか。

まとめ

Amazon SQS互換のElasticMQを使って、Temporary QueueとRPCを試してみました。

AWS SDK for Java+Amazon SQS Java Temporary Queue Clientというよりは、LocalStackやElasticMQという互換のものに
ハマった意味合いも強いのですが…。

やっぱり、こういうのは互換サービスではなく実際に使う時の本物で確認しないと、って感じでしょうね。

とりあえず、感覚を掴む分にはよいかなという気はします。

Redisのdatabase idってなんだ?

これは、なにをしたくて書いたもの?

Redisのクライアントライブラリを使っていると、たまにデータベースのインデックスらしいきものを指定させられることが
あります。

redis-cliのドキュメントでURI指定して接続している例だと、0の部分ですね。

$ redis-cli -u redis://p%40ssw0rd@redis-16379.hosted.com:16379/0 ping
PONG

redis-cli, the Redis command line interface – Redis

これがなんなのかはなんとなく予想できないことはないですが、気にはなっていたのでちゃんと見てみようと思います。

databases

Redisは、複数のデータベースを持ちます。設定ファイルのドキュメントに記載のある、databasesが該当します。

# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 16

https://raw.githubusercontent.com/redis/redis/6.0/redis.conf

0から始まり、databases - 1までの数のデータベースを持ちます。dbidというみたいですね。

設定は、config getコマンドで確認できます。

CONFIG GET – Redis

databasesの説明は、selectコマンドにもうちょっと書いてあります。selectコマンドを使うと、使用するデータベースを
切り替えることができます。

データベースは複数ありますが、(永続化した場合)同じRDB、AOFファイルに保存されます、と。

Selectable Redis databases are a form of namespacing: all databases are still persisted in the same RDB / AOF file.

関連の無い複数のアプリケーションで、ひとつのRedisを使わない方が良いとしています。

not to use a single Redis instance for multiple unrelated applications.

また、Redis Clusterでは0のデータベースしかサポートしていないようです。

When using Redis Cluster, the SELECT command cannot be used, since Redis Cluster only supports database zero.

SELECT – Redis

この時点で、もういいかなぁとか思ったりします…。

ちなみに、現在の接続がどのデータベースを使っているかはわからないそうですが、Redisに接続しているクライアントが
どのデータベースを使っているかはclient listコマンドで確認できるようです。

CLIENT LIST – Redis

ドキュメントはこれくらいにして、実際に試してみましょう。

環境

今回の環境は、こちら。Redis 6.2.5を使います。

$ redis-server --version
Redis server v=6.2.5 sha=00000000:0 malloc=jemalloc-5.1.0 bits=64 build=e19c0ce090e3ecc7

起動コマンドは、こちらで。

$ redis-server --bind '0.0.0.0' --requirepass [パスワード]

Redisで複数データベースを使ってみる

とりあえず、redis-cliでRedisに接続してみます。

$ redis-cli -a [パスワード]

config getでdatabasesを見てみます。

127.0.0.1:6379> config get databases
1) "databases"
2) "16"

16ですね。1度Redisを停止して、databasesを32にして起動してみましょう。

$ redis-server --bind '0.0.0.0' --requirepass [パスワード] --databases 32

今度は、32になりました。

127.0.0.1:6379> config get databases
1) "databases"
2) "32"

これで、32個のデータベースを持っていることになります。

16に戻しましょう。

$ redis-server --bind '0.0.0.0' --requirepass [パスワード]

デフォルトでは0のデータベースに接続するようですが、redis-cliの-nオプションで使用するデータベースを指定することが
できます。たとえば、3を指定。

$ redis-cli -a redispass -n 3

すると、redis-cliでの表示でポートの隣に数字が表れます。

127.0.0.1:6379[3]>

selectで、データベースを変更してみましょう。15に変更します。

127.0.0.1:6379[3]> select 15
OK
127.0.0.1:6379[15]> 

現在のdatabasesは16なので、16 - 1で15までデータベースが使用可能です。16を指定すると、エラーになります。

127.0.0.1:6379[15]> select 16
(error) ERR DB index is out of range

0に戻すと、ポートの表示がなくなります。

127.0.0.1:6379[15]> select 0
OK
127.0.0.1:6379> 

見慣れた表示になりましたね。

-nを指定せずに接続した時と同じですね。

$ redis-cli -a [パスワード]
Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
127.0.0.1:6379> 

次に、データベースを変更すると、データが分離されていることも確認してみましょう。

データベースの0で、setします。

127.0.0.1:6379> set key1 value1
OK
127.0.0.1:6379> get key1
"value1"

データベースの1に切り替え。

127.0.0.1:6379> select 1
OK

getしてみます。

127.0.0.1:6379[1]> get key1
(nil)

取得できませんね。逆もやってみましょう。

127.0.0.1:6379[1]> set key2 value2
OK
127.0.0.1:6379[1]> get key2
"value2"
127.0.0.1:6379[1]> select 0
OK
127.0.0.1:6379> get key2
(nil)

確認できましたね。

まあ、実際のアプリケーションでこんな感じで接続途中に切り替えるとよくわからないことになる気がするので、
基本的には接続とデータベースを紐づけて管理するんでしょうね。

クライアントライブラリを使って確認してみる

最後に、クライアントライブラリを使って確認してみましょう。今回はPythonを使います。

$ python3 -V
Python 3.8.10

redis-pyと、pytestを使って確認してみます。

$ pip3 install redis==3.5.3 pytest==6.2.5

Redisは、172.17.0.2で動作しているものとします。

確認したテストコードは、こちらです。

test_redis.py

import redis
from redis import Redis


def test_redis_database_indexes():
    redis_default: Redis = redis.Redis(host='172.17.0.2', port=6379, password='[パスワード]', decode_responses=True)
    redis0: Redis = redis.Redis(host='172.17.0.2', port=6379, password='[パスワード]', db=0, decode_responses=True)
    redis1: Redis = redis.Redis(host='172.17.0.2', port=6379, password='[パスワード]', db=1, decode_responses=True)

    redis_default.set('key1', 'value1')
    assert redis_default.get('key1') == 'value1'
    assert redis0.get('key1') == 'value1'

    assert redis1.get('key1') is None

    redis1.set('key2', 'value2')
    assert redis1.get('key2') == 'value2'

    assert redis_default.get('key2') is None
    assert redis0.get('key2') is None

    redis_default.close()
    redis0.close()
    redis1.close()

こちらでも確認できました、と。