CLOVER🍀

That was when it all began.

Amazon SQS互換のElasticMQを使って、Temoporary Queue+RPCを試してみる

これは、なにをしたくて書いたもの?

Amazon SQSで、Temporary Queueを使ったRPCができるというドキュメント、ブログを見かけまして。

AWS互換のソフトウェアを使って、ちょっとローカルで試してみようと思います。

Amazon SQSのTemporary QueueでRPC

Amazon SQSのTemporary Queueについては、こちらに記載があります。

Amazon SQS一時キュー - Amazon Simple Queue Service

Temporary Queueを使うと、リクエスト ー レスポンスのようなメッセージングができるようです。

Simple Two-way Messaging using the Amazon SQS Temporary Queue Client | AWS Compute Blog

Temporary Queueを使う時のクライアントライブラリとしては、こちらのAmazon SQS Java Temporary Queue Clientを
使うようです。

GitHub - awslabs/amazon-sqs-java-temporary-queues-client: An Amazon SQS client that supports creating lightweight, automatically-deleted temporary queues, for use in common messaging patterns such as Request/Response. See http://aws.amazon.com/sqs.

サンプルはこちら。

GitHub - aws-samples/amazon-sqs-java-temporary-queues-client-samples: Example code for the Amazon SQS temporary queues client (https://github.com/awslabs/amazon-sqs-java-temporary-queues-client )

こちらなのですが、AWS SDK for Javaのバージョン1がベースになっているんですよね。

AWS SDK for Java - 1.12.400

Temporary Queueのドキュメントを見ていると通常のAmazon SQS互換だということなのですが、バージョン2や
その他の言語のライブラリからはどう使うんでしょうね?

ちなみに、AWS SDK for Javaのバージョン2のリポジトリにはissueがありましたが、特に進んではいなさそうです。

Two Way Messaging - Virtual Queues · Issue #1647 · aws/aws-sdk-java-v2 · GitHub

ドキュメントを見ていると、Temporary Queueを使わなくてもRPCを実装できそうな感じはするんですよね。

Amazon SQS メッセージの操作 / リクエストと応答システムの実装

それはまた今度試してみましょうか。

LocalStackで試そうとするも

AWSのサービスをある程度ローカルで動かせるソフトウェアといえば、LocalStackです。ただ、LocalStackでは
うまくいきませんでした。

$ localstack --version
0.12.17.5

こちらと同じ問題を踏むようです。

SQS Action TagQueue is not working with moto as provider · Issue #2690 · localstack/localstack · GitHub

bug: SQS with moto engine fails with 400 on TagQueue call using the queue url endpoint · Issue #4391 · localstack/localstack · GitHub

SQS_PROVIDER環境変数elasticmqを指定すると良い、と書かれてもいるのですが、こちらの指定でも

$ SQS_PROVIDER=elasticmq localstack start

こちらの指定でもうまくいかなったので、今回は諦めることにしました。

$ docker container run --rm -it -p 4566:4566 -p 4571:4571 -e SQS_PROVIDER=elasticmq localstack/localstack

ElasticMQを使う

LocalStackではなく、ElasticMQを直接使うとうまくいったので、今回はこちらを使うことにしました。

GitHub - softwaremill/elasticmq: In-memory message queue with an Amazon SQS-compatible interface. Runs stand-alone or embedded.

ElasticMQは、Amazon SQS互換のインターフェースを持ったインメモリ・メッセージキューです。
Scala・Akkaで実装されているようですね。

今回はElasticMQ自体にはあまり踏み込まず、単に使うだけにします。Dockerイメージもあるようなので、こちらを
使いましょう。

DockerHub / softwaremill/elasticmq

こちらで起動しておきます。

$ docker container run -p 9324:9324 -p 9325:9325 softwaremill/elasticmq:1.2.1

これで、ElasticMQにはローカルのポート9324でアクセスできます。

ちなみに、GraalVMでネイティブ化したイメージもあるようです。

DockerHub / softwaremill/elasticmq-native

ElasticMQにキューを作成する

Temporary Queueを使うにも、本体のキューの作成は必要なので作成しておきます。

AWS CLIを使って

$ aws --version
aws-cli/2.2.39 Python/3.8.8 Linux/5.4.0-84-generic exe/x86_64.ubuntu.20 prompt/off

$ export AWS_ACCESS_KEY_ID=.....
$ export AWS_SECRET_ACCESS_KEY=.....
$ export AWS_DEFAULT_REGION=us-east-1

作成。AWSのクレデンシャルは、適当でOKです。

$ aws sqs create-queue --endpoint-url http://localhost:9324 --queue-name myqueue
{
    "QueueUrl": "http://localhost:9324/000000000000/myqueue"
}

AWS CLIで操作する時のエンドポイントの指定にさえ気をつければOKです。

では、このキューを使ったプログラムを書いていきます。

環境

今回の環境は、こちら。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.2 (ea98e05a04480131370aa0c110b8c54cf726c06f)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-84-generic", arch: "amd64", family: "unix"

Maven依存関係等は、このように設定。

    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-sqs-java-temporary-queues-client</artifactId>
            <version>1.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.8.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.20.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

Temporary Queueを使うにあたってのポイントはここだけですね。

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-sqs-java-temporary-queues-client</artifactId>
            <version>1.2.2</version>
        </dependency>

あとはテストコードのための依存関係です。確認は、テストコードで行います。

テストコードで確認する

今回、Amazon SQSのTemporary Queueを使うにあたり、簡単なEcho的なクライアント・サーバーを書いてみたいと思います。

お題として、クライアントがサーバーにメッセージを送り、サーバーはそのメッセージに装飾を付けて返すというものに
しましょう。

参考にするサンプルは、このあたりです。

Amazon SQS 一時キュー / シナリオの例: ログインリクエストの処理

GitHub - aws-samples/amazon-sqs-java-temporary-queues-client-samples: Example code for the Amazon SQS temporary queues client (https://github.com/awslabs/amazon-sqs-java-temporary-queues-client )

ElasticMQに接続するには、AWS SDK for Javaのエンドポイントを変更する必要があります。また、今回作成する
クライアント、サーバーともにElasticMQに接続する必要があるので、AmazonSQSインスタンスを作成するところを
共通に切り出しました。

src/test/java/org/littlewings/localstack/sqs/LocalSqsBuilder.java

package org.littlewings.localstack.sqs;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

public class LocalSqsBuilder {
    public static AmazonSQS create() {
        String accessKeyId = "[dummy]";
        String secretAccessKeyId = "[dummy]";
        String elasticmqSqsEndpoint = "http://localhost:9324";
        String region = "us-east-1";

        BasicAWSCredentials credentials = new BasicAWSCredentials(accessKeyId, secretAccessKeyId);

        AwsClientBuilder.EndpointConfiguration endpointConfiguration =
                new AwsClientBuilder.EndpointConfiguration(elasticmqSqsEndpoint, region);

        return AmazonSQSClientBuilder
                .standard()
                .withCredentials(new AWSStaticCredentialsProvider(credentials))
                .withEndpointConfiguration(endpointConfiguration)
                .build();
    }
}

次に、サーバーを作成します。

src/test/java/org/littlewings/localstack/sqs/TemporaryQueueRpcServer.java

package org.littlewings.localstack.sqs;

import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSResponder;
import com.amazonaws.services.sqs.AmazonSQSResponderClientBuilder;
import com.amazonaws.services.sqs.MessageContent;
import com.amazonaws.services.sqs.util.SQSMessageConsumer;
import com.amazonaws.services.sqs.util.SQSMessageConsumerBuilder;

public class TemporaryQueueRpcServer {
    AmazonSQS sqs;
    AmazonSQSResponder responder;
    SQSMessageConsumer consumer;

    TemporaryQueueRpcServer() {
    }

    public static TemporaryQueueRpcServer create(String queueUrl) {
        TemporaryQueueRpcServer server = new TemporaryQueueRpcServer();

        AmazonSQS sqs = LocalSqsBuilder.create();
        AmazonSQSResponder responder =
                AmazonSQSResponderClientBuilder.standard()
                        .withAmazonSQS(sqs)
                        .build();

        SQSMessageConsumer consumer =
                SQSMessageConsumerBuilder
                        .standard()
                        .withAmazonSQS(sqs)
                        .withQueueUrl(queueUrl)
                        .withConsumer(requestMessage -> {
                            MessageContent requestContent = MessageContent.fromMessage(requestMessage);
                            String requestBody = requestContent.getMessageBody();

                            MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★");

                            responder.sendResponseMessage(requestContent, responseContent);
                        })
                        .build();

        server.sqs = sqs;
        server.responder = responder;
        server.consumer = consumer;

        return server;
    }

    public void start() {
        consumer.start();
    }

    public void stop() {
        consumer.shutdown();
        responder.shutdown();
        sqs.shutdown();
    }
}

AmazonSQSインスタンスを作成して

        AmazonSQS sqs = LocalSqsBuilder.create();

受信したメッセージに応答するためのAmazonSQSResponderインスタンスを作成。

        AmazonSQS sqs = LocalSqsBuilder.create();
        AmazonSQSResponder responder =
                AmazonSQSResponderClientBuilder.standard()
                        .withAmazonSQS(sqs)
                        .build();

また、実際にメッセージを受け取る部分と、受け取ったメッセージを処理する部分はSQSMessageConsumerとして
作成します。

        SQSMessageConsumer consumer =
                SQSMessageConsumerBuilder
                        .standard()
                        .withAmazonSQS(sqs)
                        .withQueueUrl(queueUrl)
                        .withConsumer(requestMessage -> {
                            MessageContent requestContent = MessageContent.fromMessage(requestMessage);
                            String requestBody = requestContent.getMessageBody();

                            MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★");

                            responder.sendResponseMessage(requestContent, responseContent);
                        })
                        .build();

SQSMessageConsumerは、実際にはキューをポーリングして確認するものになります。今回は設定していませんが、
ポーリングを行うスレッドを増やしたりすることも可能です。ElasticMQの場合は増やすと挙動が安定しなくなったので、
やめましたが…。

このため、どのキューをポーリングするのか指定が必要です。

                        .withQueueUrl(queueUrl)

ここで指定するキューのURLは、先ほど作成したこちらのURLを指定することになります。
※指定自体は、今回は別のクラスから渡すことにします。

{
    "QueueUrl": "http://localhost:9324/000000000000/myqueue"
}

ポーリングの開始は、SQSMessageConsumer#startで行います。

    public void start() {
        consumer.start();
    }

終了は、各種shutdownメソッドで。

    public void stop() {
        consumer.shutdown();
        responder.shutdown();
        sqs.shutdown();
    }

クライアントは、テストコードとして書くことにします。

src/test/java/org/littlewings/localstack/sqs/TemporaryQueueRpcTest.java

package org.littlewings.localstack.sqs;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSRequester;
import com.amazonaws.services.sqs.AmazonSQSRequesterClientBuilder;
import com.amazonaws.services.sqs.MessageContent;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class TemporaryQueueRpcTest {
    interface ThrowableRunnable {
        void run() throws Exception;
    }

    public void withServer(String queueUrl, ThrowableRunnable runnable) throws Exception {
        TemporaryQueueRpcServer server = TemporaryQueueRpcServer.create(queueUrl);
        server.start();

        try {
            runnable.run();
        } finally {
            server.stop();
        }
    }

    // ここに、テストコードを書く!
]

先ほど作成したサーバーを起動停止するコードもつけておきます。

では、書いてみます。

    @Test
    public void simpleRpc() throws Exception {
        String queueUrl = "http://localhost:9324/000000000000/myqueue";

        withServer(queueUrl, () -> {
            AmazonSQS sqs = LocalSqsBuilder.create();

            AmazonSQSRequester requester =
                    AmazonSQSRequesterClientBuilder
                            .standard()
                            .withAmazonSQS(sqs)
                            .build();

            try {
                SendMessageRequest request =
                        new SendMessageRequest()
                                .withMessageBody("Hello World!!")
                                .withQueueUrl(queueUrl);

                Message responseMessage = requester.sendMessageAndGetResponse(request, 10, TimeUnit.SECONDS);
                MessageContent responseContent = MessageContent.fromMessage(responseMessage);

                System.out.println(responseMessage);
                System.out.println(responseContent.getMessageAttributes());

                assertThat(responseContent.getMessageBody()).isEqualTo("★★★Hello World!!★★★");

            } finally {
                requester.shutdown();
                sqs.shutdown();
            }
        });
    }

クライアントは、AmazonSQSRequesterインスタンスを作成します。

            AmazonSQSRequester requester =
                    AmazonSQSRequesterClientBuilder
                            .standard()
                            .withAmazonSQS(sqs)
                            .build();

送信先のキューのURLとメッセージ本文を指定して、SendMessageRequestインスタンスを作成。
メッセージはStringとして送る必要があるみたいです。

                SendMessageRequest request =
                        new SendMessageRequest()
                                .withMessageBody("Hello World!!")
                                .withQueueUrl(queueUrl);

リクエストは、AmazonSQSRequester#sendMessageAndGetResponseで送ってレスポンスを待ちます。
この場合は、同期呼び出しになりますね。

                Message responseMessage = requester.sendMessageAndGetResponse(request, 10, TimeUnit.SECONDS);
                MessageContent responseContent = MessageContent.fromMessage(responseMessage);

                System.out.println(responseMessage);
                System.out.println(responseContent.getMessageAttributes());

                assertThat(responseContent.getMessageBody()).isEqualTo("★★★Hello World!!★★★");

レスポンスはMessageとなり、MessageContentに変換することでメッセージ本文や属性を扱えるようになります。

サーバーから返ってきたメッセージの確認。

                assertThat(responseContent.getMessageBody()).isEqualTo("★★★Hello World!!★★★");

ちなみに、ここの出力結果は

                System.out.println(responseMessage);
                System.out.println(responseContent.getMessageAttributes());

こんな感じになります。

{MessageId: 87da2d24-c0ce-444f-a954-356cac88d9ad,ReceiptHandle: 87da2d24-c0ce-444f-a954-356cac88d9ad#3e0b5598-c725-422a-b675-9120eb9bc14a,MD5OfBody: b14f6a41b31fd409988bec57f9a1a5cb,Body: ★★★Hello World!!★★★,Attributes: {ApproximateReceiveCount=1, SentTimestamp=1631948316882, SenderId=127.0.0.1, ApproximateFirstReceiveTimestamp=1631948316882},MD5OfMessageAttributes: 0438dbaa5711f2dfa98c7cdfcd2d321f,MessageAttributes: {__AmazonSQSVirtualQueuesClient.QueueName={StringValue: __RequesterClientQueues__3e338723-88b2-45da-802e-b48492f63d2c,StringListValues: [],BinaryListValues: [],DataType: String}}}


{__AmazonSQSVirtualQueuesClient.QueueName={StringValue: __RequesterClientQueues__3e338723-88b2-45da-802e-b48492f63d2c,StringListValues: [],BinaryListValues: [],DataType: String}}

RPCが終わったら、それぞれインスタンスshutdown

                requester.shutdown();
                sqs.shutdown();

もう少し送信するメッセージを増やしてみる

せっかくなので、ちゃんとRPCできているかどうか、もっとメッセージを増やして確認してみたいと思います。

まずはサーバー側に、こんな感じでレスポンスを返す際にランダムでスリープする処理を追加。

src/test/java/org/littlewings/localstack/sqs/TemporaryQueueRpcServer.java

package org.littlewings.localstack.sqs;

import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSResponder;
import com.amazonaws.services.sqs.AmazonSQSResponderClientBuilder;
import com.amazonaws.services.sqs.MessageContent;
import com.amazonaws.services.sqs.util.SQSMessageConsumer;
import com.amazonaws.services.sqs.util.SQSMessageConsumerBuilder;

public class TemporaryQueueRpcServer {
    AmazonSQS sqs;
    AmazonSQSResponder responder;
    SQSMessageConsumer consumer;

    TemporaryQueueRpcServer() {
    }

    // 省略

    public static TemporaryQueueRpcServer createRandomSleep(String queueUrl) {
        Random random = new Random();
        random.nextInt(10);

        TemporaryQueueRpcServer server = new TemporaryQueueRpcServer();

        AmazonSQS sqs = LocalSqsBuilder.create();
        AmazonSQSResponder responder =
                AmazonSQSResponderClientBuilder.standard()
                        .withAmazonSQS(sqs)
                        .build();

        SQSMessageConsumer consumer =
                SQSMessageConsumerBuilder
                        .standard()
                        .withAmazonSQS(sqs)
                        .withQueueUrl(queueUrl)
                        .withConsumer(requestMessage -> {
                            MessageContent requestContent = MessageContent.fromMessage(requestMessage);
                            String requestBody = requestContent.getMessageBody();

                            MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★");

                            try {
                                TimeUnit.SECONDS.sleep(random.nextInt(10));
                            } catch (InterruptedException e) {
                                // ignore
                            }

                            responder.sendResponseMessage(requestContent, responseContent);
                        })
                        .build();

        server.sqs = sqs;
        server.responder = responder;
        server.consumer = consumer;

        return server;
    }

    // 省略

}

クライアント側からは、メッセージを50個送ってみます。

    public void withRandomSleepServer(String queueUrl, ThrowableRunnable runnable) throws Exception {
        TemporaryQueueRpcServer server = TemporaryQueueRpcServer.createRandomSleep(queueUrl);
        server.start();

        try {
            runnable.run();
        } finally {
            server.stop();
        }
    }

    @Test
    public void concurrent() throws Exception {
        String queueUrl = "http://localhost:9324/000000000000/myqueue";

        withRandomSleepServer(queueUrl, () -> {
            AmazonSQS sqs = LocalSqsBuilder.create();

            AmazonSQSRequester requester =
                    AmazonSQSRequesterClientBuilder
                            .standard()
                            .withAmazonSQS(sqs)
                            .build();

            try {
                Map<String, CompletableFuture<Message>> futures = new LinkedHashMap<>();

                for (int i = 0; i < 50; i++) {
                    String uuid = UUID.randomUUID().toString();
                    SendMessageRequest request =
                            new SendMessageRequest()
                                    .withMessageBody(uuid)
                                    .withQueueUrl(queueUrl);

                    CompletableFuture<Message> responseMessage = requester.sendMessageAndGetResponseAsync(request, 60, TimeUnit.SECONDS);

                    futures.put(uuid, responseMessage);
                }

                assertThat(futures).hasSize(50);

                futures.forEach((uuid, message) -> {
                    long start = System.currentTimeMillis();
                    MessageContent responseContent = MessageContent.fromMessage(message.join());
                    System.out.println((System.currentTimeMillis() - start) / 1000.0);
                    assertThat(responseContent.getMessageBody()).isEqualTo("★★★" + uuid + "★★★");
                });
            } finally {
                requester.shutdown();
                sqs.shutdown();
            }
        });
    }

レスポンスの受信は非同期に。今回は、タイムアウトを長めに取らないとタイムアウトが多発しました…。

                    CompletableFuture<Message> responseMessage = requester.sendMessageAndGetResponseAsync(request, 60, TimeUnit.SECONDS);

送信するメッセージはUUIDにして

                for (int i = 0; i < 50; i++) {
                    String uuid = UUID.randomUUID().toString();
                    SendMessageRequest request =
                            new SendMessageRequest()
                                    .withMessageBody(uuid)
                                    .withQueueUrl(queueUrl);

リクエストしたUUIDに対応する値が返ってくることを確認。

                futures.forEach((uuid, message) -> {
                    MessageContent responseContent = MessageContent.fromMessage(message.join());
                    assertThat(responseContent.getMessageBody()).isEqualTo("★★★" + uuid + "★★★");
                });

とりあえず、RPCっぽく動いていることは確認できました。

なんですけど、今回の実装だとサーバー側が全部のメッセージを処理し終えないと、クライアントに最初のレスポンスが
返らないみたいなんですよね。かといって、サーバーのSQSMessageConsumerのスレッド数を増やすと動作が
安定しなくなるという…。

これがAmazon SQS Java Temporary Queue Clientの使い方が悪いのか、ElasticMQの使い方が悪いのかは
微妙なところですが…。

まあ、とりあえず使い方と動作の簡単な確認はできたので良しとしますか。

まとめ

Amazon SQS互換のElasticMQを使って、Temporary QueueとRPCを試してみました。

AWS SDK for JavaAmazon SQS Java Temporary Queue Clientというよりは、LocalStackやElasticMQという互換のものに
ハマった意味合いも強いのですが…。

やっぱり、こういうのは互換サービスではなく実際に使う時の本物で確認しないと、って感じでしょうね。

とりあえず、感覚を掴む分にはよいかなという気はします。