CLOVER🍀

That was when it all began.

AWS SDK for Java v2に対応したAmazon SQS Java Temporary Queue ClientとLocalStackを使って、一時キュー+RPCを試してみる

これは、なにをしたくて書いたもの?

以前、Amazon SQS Java Temporary Queue ClientとElasticMQを使って、一時キューとRPCを使ったエントリーを書いたことがあります。

Amazon SQS互換のElasticMQを使って、Temoporary Queue+RPCを試してみる - CLOVER🍀

このAmazon SQS Java Temporary Queue Clientですが、AWS SDK for Javaのバージョン1を使っていたのですが、2022年の5月に
AWS SDK for Java v2を使うようになった2.0.0がリリースされていたので、ちょっと試してみようかなと。

Amazon SQSの一時キューとRPC

そもそも、Amazon SQSの一時キューとRPCについて思い出すところから。

一時キューについてはこちらに記載があり、Amazon SQS Java Temporary Queue Clientを使うことで一時キューを作ることができると
書かれています。

Amazon SQS一時キュー - Amazon Simple Queue Service

Amazon SQS Java Temporary Queue ClientのGitHubリポジトリは、こちら。

GitHub - awslabs/amazon-sqs-java-temporary-queues-client: An Amazon SQS client that supports creating lightweight, automatically-deleted temporary queues, for use in common messaging patterns such as Request/Response. See http://aws.amazon.com/sqs.

そして、一時キューの最も一般的な使用例がリクエスト - レスポンス型式のメッセージングパターンだとされています。

例として、ログイン処理をAmazon SQSを介して別のアプリケーション(サーバー側)に処理させ、結果を受け取る(クライアント側)ような
ソースコードが書かれています。

同様のことが、AWSのブログエントリーにも書かれています。

Simple Two-way Messaging using the Amazon SQS Temporary Queue Client | AWS Compute Blog

それで、以前試した時にはAmazon SQS Java Temporary Queue ClientはAWS SDK for Java v1を使っており、v2に対してはissueがあるだけ
だったのですが、Amazon SQS Java Temporary Queue Client 2.0.0がリリースされたことで状況が変わったようです。

Two Way Messaging - Virtual Queues · Issue #1647 · aws/aws-sdk-java-v2 · GitHub

Release Version 2.0.0 of the Amazon SQS Java Temporary Queues Client · awslabs/amazon-sqs-java-temporary-queues-client · GitHub

Amazon SQS Java Temporary Queue Client 2.0.0からは、AWS SDK for Java v2を使います。

せっかくなので、今回はAmazon SQS Java Temporary Queue Client 2.0.0とLocalStackを使って、前のエントリーで書いた内容を
書き直してみようと思います。

環境

今回の環境は、こちら。

LocalStack。

$ python3 -V
Python 3.10.6


$ localstack --version
1.3.1

起動。

$ localstack start

AWS CLI。LocalStackの提供するものを重ねています。

$ awslocal --version
aws-cli/2.9.21 Python/3.9.11 Linux/5.15.0-58-generic exe/x86_64.ubuntu.22 prompt/off

Java

$ java --version
openjdk 17.0.5 2022-10-18
OpenJDK Runtime Environment (build 17.0.5+8-Ubuntu-2ubuntu122.04)
OpenJDK 64-Bit Server VM (build 17.0.5+8-Ubuntu-2ubuntu122.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.7 (b89d5959fcde851dcb1c8946a785a163f14e1e29)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.5, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-58-generic", arch: "amd64", family: "unix"

準備

まずは、キューを作成する必要がありますね。今回はmy-queueという名前で作成しました。

$ awslocal sqs create-queue --queue-name my-queue
{
    "QueueUrl": "http://localhost:4566/000000000000/my-queue"
}

次に、Amazon SQS Java Temporary Queue Clientを使うためにMaven依存関係などの設定を行います。確認はテストコードで行うことに
するので、JUnit等も合わせて追加。

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <aws-java-sdk.version>2.19.31</aws-java-sdk.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-sqs-java-temporary-queues-client</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sqs</artifactId>
            <version>2.19.31</version>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.9.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.24.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>test-utils</artifactId>
            <version>2.19.31</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M7</version>
            </plugin>
        </plugins>
    </build>

Amazon SQS Java Temporary Queue Clientは以下で、推移的依存関係の中にAWS SDK for Java v2も含まれてはいるのですが、

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-sqs-java-temporary-queues-client</artifactId>
            <version>2.0.0</version>
        </dependency>

ちょっと古かったので、明示的に新しいバージョンを指定しておきました。

        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sqs</artifactId>
            <version>2.19.31</version>
        </dependency>


        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>test-utils</artifactId>
            <version>2.19.31</version>
            <scope>test</scope>
        </dependency>

簡単なEchoクライアント/サーバーを書く

では、Amazon SQSの一時キューを使ったプログラムを書くわけですが、前回のお題と同様に簡単なEchoクライアント/サーバーを
書いていこうと思います。

クライアントがサーバーにメッセージを送り、サーバーは受け取ったメッセージに簡単に装飾して返すというものにします。

まずは、LocalStack上のAmazon SQSにアクセスするためのSqsClientを作成する部分。

src/test/java/org/littlewings/sqs/LocalSqsBuilder.java

package org.littlewings.sqs;

import java.net.URI;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

public class LocalSqsBuilder {
    public static SqsClient create() {
        return SqsClient
                .builder()
                .credentialsProvider(
                        StaticCredentialsProvider.create(
                                AwsBasicCredentials.create("mock", "mock")
                        )
                )
                .region(Region.US_EAST_1)
                .endpointOverride(URI.create("http://localhost:4566"))
                .build();
    }
}

クレデンシャルは固定、エンドポイントはLocalStackのものを指すようにしています。

次に、サーバーを作成。

src/test/java/org/littlewings/sqs/TemporaryQueueRpcServer.java

package org.littlewings.sqs;

import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.sqs.AmazonSQSResponder;
import com.amazonaws.services.sqs.AmazonSQSResponderClientBuilder;
import com.amazonaws.services.sqs.MessageContent;
import com.amazonaws.services.sqs.util.SQSMessageConsumer;
import com.amazonaws.services.sqs.util.SQSMessageConsumerBuilder;
import software.amazon.awssdk.services.sqs.SqsClient;

public class TemporaryQueueRpcServer {
    SqsClient sqsClient;
    AmazonSQSResponder responder;
    SQSMessageConsumer consumer;

    TemporaryQueueRpcServer() {
    }

    public static TemporaryQueueRpcServer create(String queueUrl) {
        TemporaryQueueRpcServer server = new TemporaryQueueRpcServer();

        SqsClient sqsClient = LocalSqsBuilder.create();

        AmazonSQSResponder responder =
                AmazonSQSResponderClientBuilder
                        .standard()
                        .withAmazonSQS(sqsClient)
                        .build();

        SQSMessageConsumer consumer =
                SQSMessageConsumerBuilder
                        .standard()
                        .withAmazonSQS(sqsClient)
                        .withQueueUrl(queueUrl)
                        .withConsumer(requestMessage -> {
                            MessageContent requestContent = MessageContent.fromMessage(requestMessage);
                            String requestBody = requestContent.getMessageBody();

                            MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★");

                            responder.sendResponseMessage(requestContent, responseContent);
                        })
                        .build();

        server.sqsClient = sqsClient;
        server.responder = responder;
        server.consumer = consumer;

        return server;
    }

    public void start() {
        consumer.start();
    }

    public void stop() {
        consumer.close();
        responder.shutdown();
        sqsClient.close();
    }
}

こちらが受け取ったメッセージを送り返すためのAmazonSQSResponder

        AmazonSQSResponder responder =
                AmazonSQSResponderClientBuilder
                        .standard()
                        .withAmazonSQS(sqsClient)
                        .build();

こちらがメッセージを受信して処理を行うためのSQSMessageConsumerです。

        SQSMessageConsumer consumer =
                SQSMessageConsumerBuilder
                        .standard()
                        .withAmazonSQS(sqsClient)
                        .withQueueUrl(queueUrl)
                        .withConsumer(requestMessage -> {
                            MessageContent requestContent = MessageContent.fromMessage(requestMessage);
                            String requestBody = requestContent.getMessageBody();

                            MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★");

                            responder.sendResponseMessage(requestContent, responseContent);
                        })
                        .build();

Amazon SQSではキューをポーリングしてメッセージの監視を行うので、SQSMessageConsumerにはキューのURLを設定する必要が
あります。

SQSMessageConsumer#startでキューの監視が始まります。

    public void start() {
        consumer.start();
    }

停止はこちら。

    public void stop() {
        consumer.close();
        responder.shutdown();
        sqsClient.close();
    }

クライアント側は、テストコードとして作成します。まずは雛形から。

src/test/java/org/littlewings/sqs/TemporaryQueueRpcTest.java

package org.littlewings.sqs;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.sqs.AmazonSQSRequester;
import com.amazonaws.services.sqs.AmazonSQSRequesterClientBuilder;
import com.amazonaws.services.sqs.MessageContent;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

import static org.assertj.core.api.Assertions.assertThat;

public class TemporaryQueueRpcTest {
    interface ThrowableRunnable {
        void run() throws Exception;
    }

    void withServer(String queueUrl, ThrowableRunnable runnable) throws Exception {
        TemporaryQueueRpcServer server = TemporaryQueueRpcServer.create(queueUrl);
        server.start();

        try {
            runnable.run();
        } finally {
            server.stop();
        }
    }

    // ここに、テストを書く!!
}

先ほど書いたサーバーの、起動と停止を行うメソッドも付けています。

では、クライアント側を書いてみます。

    @Test
    void simpleRpc() throws Exception {
        String queueUrl = "http://localhost:4566/000000000000/my-queue";

        withServer(queueUrl, () -> {
            SqsClient sqsClient = LocalSqsBuilder.create();

            AmazonSQSRequester requester =
                    AmazonSQSRequesterClientBuilder
                            .standard()
                            .withAmazonSQS(sqsClient)
                            .build();

            try {
                SendMessageRequest request =
                        SendMessageRequest
                                .builder()
                                .queueUrl(queueUrl)
                                .messageBody("Hello World!!")
                                .build();

                Message responseMessage = requester.sendMessageAndGetResponse(request, 10, TimeUnit.SECONDS);
                MessageContent responseContent = MessageContent.fromMessage(responseMessage);

                System.out.println(responseMessage);
                System.out.println(responseContent.getMessageAttributes());

                assertThat(responseContent.getMessageBody()).isEqualTo("★★★Hello World!!★★★");
            } finally {
                requester.shutdown();
                sqsClient.close();
            }
        });
    }

メッセージ送信を行うのは、AmazonSQSRequesterとなります。

            AmazonSQSRequester requester =
                    AmazonSQSRequesterClientBuilder
                            .standard()
                            .withAmazonSQS(sqsClient)
                            .build();

AmazonSQSRequester#sendMessageAndGetResponseで、メッセージ送信。ここでは、同期呼び出しを行っています。

                Message responseMessage = requester.sendMessageAndGetResponse(request, 10, TimeUnit.SECONDS);

結果はここで1度出力していますが、

                System.out.println(responseMessage);
                System.out.println(responseContent.getMessageAttributes());

それぞれ、こちらと

Message(MessageId=2f0e38ea-8440-4247-ab9b-260306b29273, ReceiptHandle=NDA5OWM0ZWUtOTEzMi00YTUzLThlMDEtOTkyZGFiNGNiYzkxIGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6X19SZXF1ZXN0ZXJDbGllbnRRdWV1ZXNfX2ViOTNlNDkyLWFhOGYtNGNlNC04NGI1LWZiZDQwMzllMDRkNC0wIDJmMGUzOGVhLTg0NDAtNDI0Ny1hYjliLTI2MDMwNmIyOTI3MyAxNjc1NjA5Njk2LjE5MDY1OQ==, MD5OfBody=b14f6a41b31fd409988bec57f9a1a5cb, Body=★★★Hello World!!★★★, Attributes={ApproximateReceiveCount=1, SentTimestamp=1675609696169, SenderId=000000000000, ApproximateFirstReceiveTimestamp=1675609696190}, MD5OfMessageAttributes=d3d6adc048d9998de883abfd0e923eca, MessageAttributes={__AmazonSQSVirtualQueuesClient.QueueName=MessageAttributeValue(StringValue=__RequesterClientQueues__2a96b18c-ab99-4e99-a48f-d6c4b92043f1, DataType=String)})

こちらですね。

{__AmazonSQSVirtualQueuesClient.QueueName=MessageAttributeValue(StringValue=__RequesterClientQueues__2a96b18c-ab99-4e99-a48f-d6c4b92043f1, DataType=String)}

呼び出しが終わったら、それぞれ停止。

                requester.shutdown();
                sqsClient.close();

送信するメッセージを増やしてみる

次に、送信するメッセージを増やして、ちゃんとRPCとして動作できているか確認してみたいと思います。

まずはサーバー側。メッセージごとに、ランダムにスリープするようにしてみます。

src/test/java/org/littlewings/sqs/TemporaryQueueRpcSCerver.java

// 省略

public class TemporaryQueueRpcServer {

    // 省略

    public static TemporaryQueueRpcServer createRandomSleep(String queueUrl) {
        Random random = new Random();
        random.nextInt(10);

        TemporaryQueueRpcServer server = new TemporaryQueueRpcServer();

        SqsClient sqsClient = LocalSqsBuilder.create();

        AmazonSQSResponder responder =
                AmazonSQSResponderClientBuilder.standard()
                        .withAmazonSQS(sqsClient)
                        .build();

        SQSMessageConsumer consumer =
                SQSMessageConsumerBuilder
                        .standard()
                        .withAmazonSQS(sqsClient)
                        .withQueueUrl(queueUrl)
                        .withPollingThreadCount(5)
                        .withConsumer(requestMessage -> {
                            MessageContent requestContent = MessageContent.fromMessage(requestMessage);
                            String requestBody = requestContent.getMessageBody();

                            MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★");

                            try {
                                int sleepTime = random.nextInt(10);
                                System.out.printf("Server[%s]: %d sec sleep%n", Thread.currentThread().getName(), sleepTime);
                                TimeUnit.SECONDS.sleep(sleepTime);
                            } catch (InterruptedException e) {
                                // ignore
                            }

                            responder.sendResponseMessage(requestContent, responseContent);
                        })
                        .build();

        server.sqsClient = sqsClient;
        server.responder = responder;
        server.consumer = consumer;

        return server;
    }

    // 省略
}

前回ElasticMQで試した時はポーリングするスレッド数を増やすと挙動が不安定になりましたが、今回はそうはなりませんでした。

        SQSMessageConsumer consumer =
                SQSMessageConsumerBuilder
                        .standard()
                        .withAmazonSQS(sqsClient)
                        .withQueueUrl(queueUrl)
                        .withPollingThreadCount(5)

メッセージを受信した後は、ランダムにスリープさせています。

                        .withConsumer(requestMessage -> {
                            MessageContent requestContent = MessageContent.fromMessage(requestMessage);
                            String requestBody = requestContent.getMessageBody();

                            MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★");

                            try {
                                int sleepTime = random.nextInt(10);
                                System.out.printf("Server[%s]: %d sec sleep%n", Thread.currentThread().getName(), sleepTime);
                                TimeUnit.SECONDS.sleep(sleepTime);
                            } catch (InterruptedException e) {
                                // ignore
                            }

続いて、クライアント側。50個のメッセージを送るようにしてみます。

src/test/java/org/littlewings/sqs/TemporaryQueueRpcTest.java

// 省略

public class TemporaryQueueRpcTest {

    // 省略

    void withRandomSleepServer(String queueUrl, ThrowableRunnable runnable) throws Exception {
        TemporaryQueueRpcServer server = TemporaryQueueRpcServer.createRandomSleep(queueUrl);
        server.start();

        try {
            runnable.run();
        } finally {
            server.stop();
        }
    }

    @Test
    public void concurrent() throws Exception {
        String queueUrl = "http://localhost:4566/000000000000/my-queue";

        withRandomSleepServer(queueUrl, () -> {
            SqsClient sqsClient = LocalSqsBuilder.create();

            AmazonSQSRequester requester =
                    AmazonSQSRequesterClientBuilder
                            .standard()
                            .withAmazonSQS(sqsClient)
                            .build();

            try {
                Map<String, CompletableFuture<Message>> futures = new LinkedHashMap<>();

                for (int i = 0; i < 50; i++) {
                    String uuid = UUID.randomUUID().toString();
                    SendMessageRequest request =
                            SendMessageRequest
                                    .builder()
                                    .queueUrl(queueUrl)
                                    .messageBody(uuid)
                                    .build();

                    CompletableFuture<Message> responseMessage = requester.sendMessageAndGetResponseAsync(request, 60, TimeUnit.SECONDS);

                    futures.put(uuid, responseMessage);
                }

                assertThat(futures).hasSize(50);

                futures.forEach((uuid, message) -> {
                    long start = System.currentTimeMillis();
                    MessageContent responseContent = MessageContent.fromMessage(message.join());
                    System.out.println("elapsed: " + (System.currentTimeMillis() - start) / 1000.0 + " sec");
                    assertThat(responseContent.getMessageBody()).isEqualTo("★★★" + uuid + "★★★");
                });
            } finally {
                requester.shutdown();
                sqsClient.close();
            }
        });
    }
}

送信するメッセージの内容は、リクエストごとに別々になるようにUUIDにしてみました。

                    String uuid = UUID.randomUUID().toString();
                    SendMessageRequest request =
                            SendMessageRequest
                                    .builder()
                                    .queueUrl(queueUrl)
                                    .messageBody(uuid)
                                    .build();

メッセージの送信は、AmazonSQSRequester#sendMessageAndGetResponseAsyncを使って非同期に変更。

                    CompletableFuture<Message> responseMessage = requester.sendMessageAndGetResponseAsync(request, 60, TimeUnit.SECONDS);

これで、期待のメッセージが処理できているか確認。

                assertThat(futures).hasSize(50);

                futures.forEach((uuid, message) -> {
                    long start = System.currentTimeMillis();
                    MessageContent responseContent = MessageContent.fromMessage(message.join());
                    System.out.println("elapsed: " + (System.currentTimeMillis() - start) / 1000.0 + " sec");
                    assertThat(responseContent.getMessageBody()).isEqualTo("★★★" + uuid + "★★★");
                });

リクエストとレスポンスの内容が対になっていたので、今回もRPCとして動作できていることは確認できました。

ただ、今回の環境ではメッセージをたくさん送るとサーバー側である程度メッセージを処理しきらないとクライアントにメッセージが
返らないようで、今回の書き方だと最初に送ったメッセージを受け取るまでの時間がかなり長くなったりします。
しかも安定していない感じがするので、このあたりは本当に使うならちゃんとしたAmazon SQSで挙動を確認した方が良さそうですね。

とりあえず、Amazon SQS Java Temporary Queue Clientの2.0.0と一時キューを使ってRPCを行うという目的は達成できたので、
良しとしますか。

まとめ

2.0.0になったAmazon SQS Java Temporary Queue ClientとLocalStackを使って、一時キューとRPCを試してみました。
前回はけっこうハマったのですが、今回はそうでもなかったですね。ElasticMQでなくても、LocalStackでも動きましたし。

ただ、たくさんメッセージを送った時の動きが不安定なのは今回も変わらなかったので、実際の挙動は本物を使って確認、というのが
正解だろうという感覚も変わりませんが、やりやすくなったのは良かったかなと思います。

AWS SDK for Java v1の制約もなくなりましたし。