これは、なにをしたくて書いたもの?
Amazon SQSで、Temporary Queueを使ったRPCができるというドキュメント、ブログを見かけまして。
AWS互換のソフトウェアを使って、ちょっとローカルで試してみようと思います。
Amazon SQSのTemporary QueueでRPC
Amazon SQSのTemporary Queueについては、こちらに記載があります。
Amazon SQS一時キュー - Amazon Simple Queue Service
Temporary Queueを使うと、リクエスト ー レスポンスのようなメッセージングができるようです。
Simple Two-way Messaging using the Amazon SQS Temporary Queue Client | AWS Compute Blog
Temporary Queueを使う時のクライアントライブラリとしては、こちらのAmazon SQS Java Temporary Queue Clientを
使うようです。
サンプルはこちら。
こちらなのですが、AWS SDK for Javaのバージョン1がベースになっているんですよね。
Temporary Queueのドキュメントを見ていると通常のAmazon SQS互換だということなのですが、バージョン2や
その他の言語のライブラリからはどう使うんでしょうね?
ちなみに、AWS SDK for Javaのバージョン2のリポジトリにはissueがありましたが、特に進んではいなさそうです。
Two Way Messaging - Virtual Queues · Issue #1647 · aws/aws-sdk-java-v2 · GitHub
ドキュメントを見ていると、Temporary Queueを使わなくてもRPCを実装できそうな感じはするんですよね。
Amazon SQS メッセージの操作 / リクエストと応答システムの実装
それはまた今度試してみましょうか。
LocalStackで試そうとするも
AWSのサービスをある程度ローカルで動かせるソフトウェアといえば、LocalStackです。ただ、LocalStackでは
うまくいきませんでした。
$ localstack --version 0.12.17.5
こちらと同じ問題を踏むようです。
SQS_PROVIDER
環境変数でelasticmq
を指定すると良い、と書かれてもいるのですが、こちらの指定でも
$ SQS_PROVIDER=elasticmq localstack start
こちらの指定でもうまくいかなったので、今回は諦めることにしました。
$ docker container run --rm -it -p 4566:4566 -p 4571:4571 -e SQS_PROVIDER=elasticmq localstack/localstack
ElasticMQを使う
LocalStackではなく、ElasticMQを直接使うとうまくいったので、今回はこちらを使うことにしました。
ElasticMQは、Amazon SQS互換のインターフェースを持ったインメモリ・メッセージキューです。
Scala・Akkaで実装されているようですね。
今回はElasticMQ自体にはあまり踏み込まず、単に使うだけにします。Dockerイメージもあるようなので、こちらを
使いましょう。
DockerHub / softwaremill/elasticmq
こちらで起動しておきます。
$ docker container run -p 9324:9324 -p 9325:9325 softwaremill/elasticmq:1.2.1
これで、ElasticMQにはローカルのポート9324でアクセスできます。
ちなみに、GraalVMでネイティブ化したイメージもあるようです。
DockerHub / softwaremill/elasticmq-native
ElasticMQにキューを作成する
Temporary Queueを使うにも、本体のキューの作成は必要なので作成しておきます。
$ aws --version aws-cli/2.2.39 Python/3.8.8 Linux/5.4.0-84-generic exe/x86_64.ubuntu.20 prompt/off $ export AWS_ACCESS_KEY_ID=..... $ export AWS_SECRET_ACCESS_KEY=..... $ export AWS_DEFAULT_REGION=us-east-1
作成。AWSのクレデンシャルは、適当でOKです。
$ aws sqs create-queue --endpoint-url http://localhost:9324 --queue-name myqueue { "QueueUrl": "http://localhost:9324/000000000000/myqueue" }
AWS CLIで操作する時のエンドポイントの指定にさえ気をつければOKです。
では、このキューを使ったプログラムを書いていきます。
環境
今回の環境は、こちら。
$ java --version openjdk 11.0.11 2021-04-20 OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04) OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.2 (ea98e05a04480131370aa0c110b8c54cf726c06f) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-84-generic", arch: "amd64", family: "unix"
Maven依存関係等は、このように設定。
<dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>amazon-sqs-java-temporary-queues-client</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.8.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.20.2</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.2</version> </plugin> </plugins> </build>
Temporary Queueを使うにあたってのポイントはここだけですね。
<dependency> <groupId>com.amazonaws</groupId> <artifactId>amazon-sqs-java-temporary-queues-client</artifactId> <version>1.2.2</version> </dependency>
あとはテストコードのための依存関係です。確認は、テストコードで行います。
テストコードで確認する
今回、Amazon SQSのTemporary Queueを使うにあたり、簡単なEcho的なクライアント・サーバーを書いてみたいと思います。
お題として、クライアントがサーバーにメッセージを送り、サーバーはそのメッセージに装飾を付けて返すというものに
しましょう。
参考にするサンプルは、このあたりです。
Amazon SQS 一時キュー / シナリオの例: ログインリクエストの処理
ElasticMQに接続するには、AWS SDK for Javaのエンドポイントを変更する必要があります。また、今回作成する
クライアント、サーバーともにElasticMQに接続する必要があるので、AmazonSQS
のインスタンスを作成するところを
共通に切り出しました。
src/test/java/org/littlewings/localstack/sqs/LocalSqsBuilder.java
package org.littlewings.localstack.sqs; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; public class LocalSqsBuilder { public static AmazonSQS create() { String accessKeyId = "[dummy]"; String secretAccessKeyId = "[dummy]"; String elasticmqSqsEndpoint = "http://localhost:9324"; String region = "us-east-1"; BasicAWSCredentials credentials = new BasicAWSCredentials(accessKeyId, secretAccessKeyId); AwsClientBuilder.EndpointConfiguration endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(elasticmqSqsEndpoint, region); return AmazonSQSClientBuilder .standard() .withCredentials(new AWSStaticCredentialsProvider(credentials)) .withEndpointConfiguration(endpointConfiguration) .build(); } }
次に、サーバーを作成します。
src/test/java/org/littlewings/localstack/sqs/TemporaryQueueRpcServer.java
package org.littlewings.localstack.sqs; import java.util.Random; import java.util.concurrent.TimeUnit; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSResponder; import com.amazonaws.services.sqs.AmazonSQSResponderClientBuilder; import com.amazonaws.services.sqs.MessageContent; import com.amazonaws.services.sqs.util.SQSMessageConsumer; import com.amazonaws.services.sqs.util.SQSMessageConsumerBuilder; public class TemporaryQueueRpcServer { AmazonSQS sqs; AmazonSQSResponder responder; SQSMessageConsumer consumer; TemporaryQueueRpcServer() { } public static TemporaryQueueRpcServer create(String queueUrl) { TemporaryQueueRpcServer server = new TemporaryQueueRpcServer(); AmazonSQS sqs = LocalSqsBuilder.create(); AmazonSQSResponder responder = AmazonSQSResponderClientBuilder.standard() .withAmazonSQS(sqs) .build(); SQSMessageConsumer consumer = SQSMessageConsumerBuilder .standard() .withAmazonSQS(sqs) .withQueueUrl(queueUrl) .withConsumer(requestMessage -> { MessageContent requestContent = MessageContent.fromMessage(requestMessage); String requestBody = requestContent.getMessageBody(); MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★"); responder.sendResponseMessage(requestContent, responseContent); }) .build(); server.sqs = sqs; server.responder = responder; server.consumer = consumer; return server; } public void start() { consumer.start(); } public void stop() { consumer.shutdown(); responder.shutdown(); sqs.shutdown(); } }
AmazonSQS
のインスタンスを作成して
AmazonSQS sqs = LocalSqsBuilder.create();
受信したメッセージに応答するためのAmazonSQSResponder
のインスタンスを作成。
AmazonSQS sqs = LocalSqsBuilder.create(); AmazonSQSResponder responder = AmazonSQSResponderClientBuilder.standard() .withAmazonSQS(sqs) .build();
また、実際にメッセージを受け取る部分と、受け取ったメッセージを処理する部分はSQSMessageConsumer
として
作成します。
SQSMessageConsumer consumer = SQSMessageConsumerBuilder .standard() .withAmazonSQS(sqs) .withQueueUrl(queueUrl) .withConsumer(requestMessage -> { MessageContent requestContent = MessageContent.fromMessage(requestMessage); String requestBody = requestContent.getMessageBody(); MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★"); responder.sendResponseMessage(requestContent, responseContent); }) .build();
SQSMessageConsumer
は、実際にはキューをポーリングして確認するものになります。今回は設定していませんが、
ポーリングを行うスレッドを増やしたりすることも可能です。ElasticMQの場合は増やすと挙動が安定しなくなったので、
やめましたが…。
このため、どのキューをポーリングするのか指定が必要です。
.withQueueUrl(queueUrl)
ここで指定するキューのURLは、先ほど作成したこちらのURLを指定することになります。
※指定自体は、今回は別のクラスから渡すことにします。
{ "QueueUrl": "http://localhost:9324/000000000000/myqueue" }
ポーリングの開始は、SQSMessageConsumer#start
で行います。
public void start() { consumer.start(); }
終了は、各種shutdown
メソッドで。
public void stop() { consumer.shutdown(); responder.shutdown(); sqs.shutdown(); }
クライアントは、テストコードとして書くことにします。
src/test/java/org/littlewings/localstack/sqs/TemporaryQueueRpcTest.java
package org.littlewings.localstack.sqs; import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSRequester; import com.amazonaws.services.sqs.AmazonSQSRequesterClientBuilder; import com.amazonaws.services.sqs.MessageContent; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.SendMessageRequest; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; public class TemporaryQueueRpcTest { interface ThrowableRunnable { void run() throws Exception; } public void withServer(String queueUrl, ThrowableRunnable runnable) throws Exception { TemporaryQueueRpcServer server = TemporaryQueueRpcServer.create(queueUrl); server.start(); try { runnable.run(); } finally { server.stop(); } } // ここに、テストコードを書く! ]
先ほど作成したサーバーを起動停止するコードもつけておきます。
では、書いてみます。
@Test public void simpleRpc() throws Exception { String queueUrl = "http://localhost:9324/000000000000/myqueue"; withServer(queueUrl, () -> { AmazonSQS sqs = LocalSqsBuilder.create(); AmazonSQSRequester requester = AmazonSQSRequesterClientBuilder .standard() .withAmazonSQS(sqs) .build(); try { SendMessageRequest request = new SendMessageRequest() .withMessageBody("Hello World!!") .withQueueUrl(queueUrl); Message responseMessage = requester.sendMessageAndGetResponse(request, 10, TimeUnit.SECONDS); MessageContent responseContent = MessageContent.fromMessage(responseMessage); System.out.println(responseMessage); System.out.println(responseContent.getMessageAttributes()); assertThat(responseContent.getMessageBody()).isEqualTo("★★★Hello World!!★★★"); } finally { requester.shutdown(); sqs.shutdown(); } }); }
クライアントは、AmazonSQSRequester
のインスタンスを作成します。
AmazonSQSRequester requester = AmazonSQSRequesterClientBuilder .standard() .withAmazonSQS(sqs) .build();
送信先のキューのURLとメッセージ本文を指定して、SendMessageRequest
のインスタンスを作成。
メッセージはString
として送る必要があるみたいです。
SendMessageRequest request = new SendMessageRequest() .withMessageBody("Hello World!!") .withQueueUrl(queueUrl);
リクエストは、AmazonSQSRequester#sendMessageAndGetResponse
で送ってレスポンスを待ちます。
この場合は、同期呼び出しになりますね。
Message responseMessage = requester.sendMessageAndGetResponse(request, 10, TimeUnit.SECONDS); MessageContent responseContent = MessageContent.fromMessage(responseMessage); System.out.println(responseMessage); System.out.println(responseContent.getMessageAttributes()); assertThat(responseContent.getMessageBody()).isEqualTo("★★★Hello World!!★★★");
レスポンスはMessage
となり、MessageContent
に変換することでメッセージ本文や属性を扱えるようになります。
サーバーから返ってきたメッセージの確認。
assertThat(responseContent.getMessageBody()).isEqualTo("★★★Hello World!!★★★");
ちなみに、ここの出力結果は
System.out.println(responseMessage); System.out.println(responseContent.getMessageAttributes());
こんな感じになります。
{MessageId: 87da2d24-c0ce-444f-a954-356cac88d9ad,ReceiptHandle: 87da2d24-c0ce-444f-a954-356cac88d9ad#3e0b5598-c725-422a-b675-9120eb9bc14a,MD5OfBody: b14f6a41b31fd409988bec57f9a1a5cb,Body: ★★★Hello World!!★★★,Attributes: {ApproximateReceiveCount=1, SentTimestamp=1631948316882, SenderId=127.0.0.1, ApproximateFirstReceiveTimestamp=1631948316882},MD5OfMessageAttributes: 0438dbaa5711f2dfa98c7cdfcd2d321f,MessageAttributes: {__AmazonSQSVirtualQueuesClient.QueueName={StringValue: __RequesterClientQueues__3e338723-88b2-45da-802e-b48492f63d2c,StringListValues: [],BinaryListValues: [],DataType: String}}} {__AmazonSQSVirtualQueuesClient.QueueName={StringValue: __RequesterClientQueues__3e338723-88b2-45da-802e-b48492f63d2c,StringListValues: [],BinaryListValues: [],DataType: String}}
RPCが終わったら、それぞれインスタンスをshutdown
。
requester.shutdown(); sqs.shutdown();
もう少し送信するメッセージを増やしてみる
せっかくなので、ちゃんとRPCできているかどうか、もっとメッセージを増やして確認してみたいと思います。
まずはサーバー側に、こんな感じでレスポンスを返す際にランダムでスリープする処理を追加。
src/test/java/org/littlewings/localstack/sqs/TemporaryQueueRpcServer.java
package org.littlewings.localstack.sqs; import java.util.Random; import java.util.concurrent.TimeUnit; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSResponder; import com.amazonaws.services.sqs.AmazonSQSResponderClientBuilder; import com.amazonaws.services.sqs.MessageContent; import com.amazonaws.services.sqs.util.SQSMessageConsumer; import com.amazonaws.services.sqs.util.SQSMessageConsumerBuilder; public class TemporaryQueueRpcServer { AmazonSQS sqs; AmazonSQSResponder responder; SQSMessageConsumer consumer; TemporaryQueueRpcServer() { } // 省略 public static TemporaryQueueRpcServer createRandomSleep(String queueUrl) { Random random = new Random(); random.nextInt(10); TemporaryQueueRpcServer server = new TemporaryQueueRpcServer(); AmazonSQS sqs = LocalSqsBuilder.create(); AmazonSQSResponder responder = AmazonSQSResponderClientBuilder.standard() .withAmazonSQS(sqs) .build(); SQSMessageConsumer consumer = SQSMessageConsumerBuilder .standard() .withAmazonSQS(sqs) .withQueueUrl(queueUrl) .withConsumer(requestMessage -> { MessageContent requestContent = MessageContent.fromMessage(requestMessage); String requestBody = requestContent.getMessageBody(); MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★"); try { TimeUnit.SECONDS.sleep(random.nextInt(10)); } catch (InterruptedException e) { // ignore } responder.sendResponseMessage(requestContent, responseContent); }) .build(); server.sqs = sqs; server.responder = responder; server.consumer = consumer; return server; } // 省略 }
クライアント側からは、メッセージを50個送ってみます。
public void withRandomSleepServer(String queueUrl, ThrowableRunnable runnable) throws Exception { TemporaryQueueRpcServer server = TemporaryQueueRpcServer.createRandomSleep(queueUrl); server.start(); try { runnable.run(); } finally { server.stop(); } } @Test public void concurrent() throws Exception { String queueUrl = "http://localhost:9324/000000000000/myqueue"; withRandomSleepServer(queueUrl, () -> { AmazonSQS sqs = LocalSqsBuilder.create(); AmazonSQSRequester requester = AmazonSQSRequesterClientBuilder .standard() .withAmazonSQS(sqs) .build(); try { Map<String, CompletableFuture<Message>> futures = new LinkedHashMap<>(); for (int i = 0; i < 50; i++) { String uuid = UUID.randomUUID().toString(); SendMessageRequest request = new SendMessageRequest() .withMessageBody(uuid) .withQueueUrl(queueUrl); CompletableFuture<Message> responseMessage = requester.sendMessageAndGetResponseAsync(request, 60, TimeUnit.SECONDS); futures.put(uuid, responseMessage); } assertThat(futures).hasSize(50); futures.forEach((uuid, message) -> { long start = System.currentTimeMillis(); MessageContent responseContent = MessageContent.fromMessage(message.join()); System.out.println((System.currentTimeMillis() - start) / 1000.0); assertThat(responseContent.getMessageBody()).isEqualTo("★★★" + uuid + "★★★"); }); } finally { requester.shutdown(); sqs.shutdown(); } }); }
レスポンスの受信は非同期に。今回は、タイムアウトを長めに取らないとタイムアウトが多発しました…。
CompletableFuture<Message> responseMessage = requester.sendMessageAndGetResponseAsync(request, 60, TimeUnit.SECONDS);
送信するメッセージはUUIDにして
for (int i = 0; i < 50; i++) { String uuid = UUID.randomUUID().toString(); SendMessageRequest request = new SendMessageRequest() .withMessageBody(uuid) .withQueueUrl(queueUrl);
リクエストしたUUIDに対応する値が返ってくることを確認。
futures.forEach((uuid, message) -> { MessageContent responseContent = MessageContent.fromMessage(message.join()); assertThat(responseContent.getMessageBody()).isEqualTo("★★★" + uuid + "★★★"); });
とりあえず、RPCっぽく動いていることは確認できました。
なんですけど、今回の実装だとサーバー側が全部のメッセージを処理し終えないと、クライアントに最初のレスポンスが
返らないみたいなんですよね。かといって、サーバーのSQSMessageConsumer
のスレッド数を増やすと動作が
安定しなくなるという…。
これがAmazon SQS Java Temporary Queue Clientの使い方が悪いのか、ElasticMQの使い方が悪いのかは
微妙なところですが…。
まあ、とりあえず使い方と動作の簡単な確認はできたので良しとしますか。
まとめ
Amazon SQS互換のElasticMQを使って、Temporary QueueとRPCを試してみました。
AWS SDK for Java+Amazon SQS Java Temporary Queue Clientというよりは、LocalStackやElasticMQという互換のものに
ハマった意味合いも強いのですが…。
やっぱり、こういうのは互換サービスではなく実際に使う時の本物で確認しないと、って感じでしょうね。
とりあえず、感覚を掴む分にはよいかなという気はします。