これは、なにをしたくて書いたもの?
以前、Amazon SQS Java Temporary Queue ClientとElasticMQを使って、一時キューとRPCを使ったエントリーを書いたことがあります。
Amazon SQS互換のElasticMQを使って、Temoporary Queue+RPCを試してみる - CLOVER🍀
このAmazon SQS Java Temporary Queue Clientですが、AWS SDK for Javaのバージョン1を使っていたのですが、2022年の5月に
AWS SDK for Java v2を使うようになった2.0.0がリリースされていたので、ちょっと試してみようかなと。
Amazon SQSの一時キューとRPC
そもそも、Amazon SQSの一時キューとRPCについて思い出すところから。
一時キューについてはこちらに記載があり、Amazon SQS Java Temporary Queue Clientを使うことで一時キューを作ることができると
書かれています。
Amazon SQS一時キュー - Amazon Simple Queue Service
Amazon SQS Java Temporary Queue ClientのGitHubリポジトリは、こちら。
そして、一時キューの最も一般的な使用例がリクエスト - レスポンス型式のメッセージングパターンだとされています。
例として、ログイン処理をAmazon SQSを介して別のアプリケーション(サーバー側)に処理させ、結果を受け取る(クライアント側)ような
ソースコードが書かれています。
同様のことが、AWSのブログエントリーにも書かれています。
Simple Two-way Messaging using the Amazon SQS Temporary Queue Client | AWS Compute Blog
それで、以前試した時にはAmazon SQS Java Temporary Queue ClientはAWS SDK for Java v1を使っており、v2に対してはissueがあるだけ
だったのですが、Amazon SQS Java Temporary Queue Client 2.0.0がリリースされたことで状況が変わったようです。
Two Way Messaging - Virtual Queues · Issue #1647 · aws/aws-sdk-java-v2 · GitHub
Amazon SQS Java Temporary Queue Client 2.0.0からは、AWS SDK for Java v2を使います。
せっかくなので、今回はAmazon SQS Java Temporary Queue Client 2.0.0とLocalStackを使って、前のエントリーで書いた内容を
書き直してみようと思います。
環境
今回の環境は、こちら。
LocalStack。
$ python3 -V Python 3.10.6 $ localstack --version 1.3.1
起動。
$ localstack start
AWS CLI。LocalStackの提供するものを重ねています。
$ awslocal --version aws-cli/2.9.21 Python/3.9.11 Linux/5.15.0-58-generic exe/x86_64.ubuntu.22 prompt/off
Java。
$ java --version openjdk 17.0.5 2022-10-18 OpenJDK Runtime Environment (build 17.0.5+8-Ubuntu-2ubuntu122.04) OpenJDK 64-Bit Server VM (build 17.0.5+8-Ubuntu-2ubuntu122.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.7 (b89d5959fcde851dcb1c8946a785a163f14e1e29) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 17.0.5, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.15.0-58-generic", arch: "amd64", family: "unix"
準備
まずは、キューを作成する必要がありますね。今回はmy-queue
という名前で作成しました。
$ awslocal sqs create-queue --queue-name my-queue { "QueueUrl": "http://localhost:4566/000000000000/my-queue" }
次に、Amazon SQS Java Temporary Queue Clientを使うためにMaven依存関係などの設定を行います。確認はテストコードで行うことに
するので、JUnit等も合わせて追加。
<properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <aws-java-sdk.version>2.19.31</aws-java-sdk.version> </properties> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>amazon-sqs-java-temporary-queues-client</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> <version>2.19.31</version> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.9.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.24.2</version> <scope>test</scope> </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>test-utils</artifactId> <version>2.19.31</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>3.0.0-M7</version> </plugin> </plugins> </build>
Amazon SQS Java Temporary Queue Clientは以下で、推移的依存関係の中にAWS SDK for Java v2も含まれてはいるのですが、
<dependency> <groupId>com.amazonaws</groupId> <artifactId>amazon-sqs-java-temporary-queues-client</artifactId> <version>2.0.0</version> </dependency>
ちょっと古かったので、明示的に新しいバージョンを指定しておきました。
<dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> <version>2.19.31</version> </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>test-utils</artifactId> <version>2.19.31</version> <scope>test</scope> </dependency>
簡単なEchoクライアント/サーバーを書く
では、Amazon SQSの一時キューを使ったプログラムを書くわけですが、前回のお題と同様に簡単なEchoクライアント/サーバーを
書いていこうと思います。
クライアントがサーバーにメッセージを送り、サーバーは受け取ったメッセージに簡単に装飾して返すというものにします。
まずは、LocalStack上のAmazon SQSにアクセスするためのSqsClient
を作成する部分。
src/test/java/org/littlewings/sqs/LocalSqsBuilder.java
package org.littlewings.sqs; import java.net.URI; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; public class LocalSqsBuilder { public static SqsClient create() { return SqsClient .builder() .credentialsProvider( StaticCredentialsProvider.create( AwsBasicCredentials.create("mock", "mock") ) ) .region(Region.US_EAST_1) .endpointOverride(URI.create("http://localhost:4566")) .build(); } }
クレデンシャルは固定、エンドポイントはLocalStackのものを指すようにしています。
次に、サーバーを作成。
src/test/java/org/littlewings/sqs/TemporaryQueueRpcServer.java
package org.littlewings.sqs; import java.util.Random; import java.util.concurrent.TimeUnit; import com.amazonaws.services.sqs.AmazonSQSResponder; import com.amazonaws.services.sqs.AmazonSQSResponderClientBuilder; import com.amazonaws.services.sqs.MessageContent; import com.amazonaws.services.sqs.util.SQSMessageConsumer; import com.amazonaws.services.sqs.util.SQSMessageConsumerBuilder; import software.amazon.awssdk.services.sqs.SqsClient; public class TemporaryQueueRpcServer { SqsClient sqsClient; AmazonSQSResponder responder; SQSMessageConsumer consumer; TemporaryQueueRpcServer() { } public static TemporaryQueueRpcServer create(String queueUrl) { TemporaryQueueRpcServer server = new TemporaryQueueRpcServer(); SqsClient sqsClient = LocalSqsBuilder.create(); AmazonSQSResponder responder = AmazonSQSResponderClientBuilder .standard() .withAmazonSQS(sqsClient) .build(); SQSMessageConsumer consumer = SQSMessageConsumerBuilder .standard() .withAmazonSQS(sqsClient) .withQueueUrl(queueUrl) .withConsumer(requestMessage -> { MessageContent requestContent = MessageContent.fromMessage(requestMessage); String requestBody = requestContent.getMessageBody(); MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★"); responder.sendResponseMessage(requestContent, responseContent); }) .build(); server.sqsClient = sqsClient; server.responder = responder; server.consumer = consumer; return server; } public void start() { consumer.start(); } public void stop() { consumer.close(); responder.shutdown(); sqsClient.close(); } }
こちらが受け取ったメッセージを送り返すためのAmazonSQSResponder
、
AmazonSQSResponder responder = AmazonSQSResponderClientBuilder .standard() .withAmazonSQS(sqsClient) .build();
こちらがメッセージを受信して処理を行うためのSQSMessageConsumer
です。
SQSMessageConsumer consumer = SQSMessageConsumerBuilder .standard() .withAmazonSQS(sqsClient) .withQueueUrl(queueUrl) .withConsumer(requestMessage -> { MessageContent requestContent = MessageContent.fromMessage(requestMessage); String requestBody = requestContent.getMessageBody(); MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★"); responder.sendResponseMessage(requestContent, responseContent); }) .build();
Amazon SQSではキューをポーリングしてメッセージの監視を行うので、SQSMessageConsumer
にはキューのURLを設定する必要が
あります。
SQSMessageConsumer#start
でキューの監視が始まります。
public void start() { consumer.start(); }
停止はこちら。
public void stop() { consumer.close(); responder.shutdown(); sqsClient.close(); }
クライアント側は、テストコードとして作成します。まずは雛形から。
src/test/java/org/littlewings/sqs/TemporaryQueueRpcTest.java
package org.littlewings.sqs; import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import com.amazonaws.services.sqs.AmazonSQSRequester; import com.amazonaws.services.sqs.AmazonSQSRequesterClientBuilder; import com.amazonaws.services.sqs.MessageContent; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import static org.assertj.core.api.Assertions.assertThat; public class TemporaryQueueRpcTest { interface ThrowableRunnable { void run() throws Exception; } void withServer(String queueUrl, ThrowableRunnable runnable) throws Exception { TemporaryQueueRpcServer server = TemporaryQueueRpcServer.create(queueUrl); server.start(); try { runnable.run(); } finally { server.stop(); } } // ここに、テストを書く!! }
先ほど書いたサーバーの、起動と停止を行うメソッドも付けています。
では、クライアント側を書いてみます。
@Test void simpleRpc() throws Exception { String queueUrl = "http://localhost:4566/000000000000/my-queue"; withServer(queueUrl, () -> { SqsClient sqsClient = LocalSqsBuilder.create(); AmazonSQSRequester requester = AmazonSQSRequesterClientBuilder .standard() .withAmazonSQS(sqsClient) .build(); try { SendMessageRequest request = SendMessageRequest .builder() .queueUrl(queueUrl) .messageBody("Hello World!!") .build(); Message responseMessage = requester.sendMessageAndGetResponse(request, 10, TimeUnit.SECONDS); MessageContent responseContent = MessageContent.fromMessage(responseMessage); System.out.println(responseMessage); System.out.println(responseContent.getMessageAttributes()); assertThat(responseContent.getMessageBody()).isEqualTo("★★★Hello World!!★★★"); } finally { requester.shutdown(); sqsClient.close(); } }); }
メッセージ送信を行うのは、AmazonSQSRequester
となります。
AmazonSQSRequester requester = AmazonSQSRequesterClientBuilder .standard() .withAmazonSQS(sqsClient) .build();
AmazonSQSRequester#sendMessageAndGetResponse
で、メッセージ送信。ここでは、同期呼び出しを行っています。
Message responseMessage = requester.sendMessageAndGetResponse(request, 10, TimeUnit.SECONDS);
結果はここで1度出力していますが、
System.out.println(responseMessage); System.out.println(responseContent.getMessageAttributes());
それぞれ、こちらと
Message(MessageId=2f0e38ea-8440-4247-ab9b-260306b29273, ReceiptHandle=NDA5OWM0ZWUtOTEzMi00YTUzLThlMDEtOTkyZGFiNGNiYzkxIGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6X19SZXF1ZXN0ZXJDbGllbnRRdWV1ZXNfX2ViOTNlNDkyLWFhOGYtNGNlNC04NGI1LWZiZDQwMzllMDRkNC0wIDJmMGUzOGVhLTg0NDAtNDI0Ny1hYjliLTI2MDMwNmIyOTI3MyAxNjc1NjA5Njk2LjE5MDY1OQ==, MD5OfBody=b14f6a41b31fd409988bec57f9a1a5cb, Body=★★★Hello World!!★★★, Attributes={ApproximateReceiveCount=1, SentTimestamp=1675609696169, SenderId=000000000000, ApproximateFirstReceiveTimestamp=1675609696190}, MD5OfMessageAttributes=d3d6adc048d9998de883abfd0e923eca, MessageAttributes={__AmazonSQSVirtualQueuesClient.QueueName=MessageAttributeValue(StringValue=__RequesterClientQueues__2a96b18c-ab99-4e99-a48f-d6c4b92043f1, DataType=String)})
こちらですね。
{__AmazonSQSVirtualQueuesClient.QueueName=MessageAttributeValue(StringValue=__RequesterClientQueues__2a96b18c-ab99-4e99-a48f-d6c4b92043f1, DataType=String)}
呼び出しが終わったら、それぞれ停止。
requester.shutdown(); sqsClient.close();
送信するメッセージを増やしてみる
次に、送信するメッセージを増やして、ちゃんとRPCとして動作できているか確認してみたいと思います。
まずはサーバー側。メッセージごとに、ランダムにスリープするようにしてみます。
src/test/java/org/littlewings/sqs/TemporaryQueueRpcSCerver.java
// 省略 public class TemporaryQueueRpcServer { // 省略 public static TemporaryQueueRpcServer createRandomSleep(String queueUrl) { Random random = new Random(); random.nextInt(10); TemporaryQueueRpcServer server = new TemporaryQueueRpcServer(); SqsClient sqsClient = LocalSqsBuilder.create(); AmazonSQSResponder responder = AmazonSQSResponderClientBuilder.standard() .withAmazonSQS(sqsClient) .build(); SQSMessageConsumer consumer = SQSMessageConsumerBuilder .standard() .withAmazonSQS(sqsClient) .withQueueUrl(queueUrl) .withPollingThreadCount(5) .withConsumer(requestMessage -> { MessageContent requestContent = MessageContent.fromMessage(requestMessage); String requestBody = requestContent.getMessageBody(); MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★"); try { int sleepTime = random.nextInt(10); System.out.printf("Server[%s]: %d sec sleep%n", Thread.currentThread().getName(), sleepTime); TimeUnit.SECONDS.sleep(sleepTime); } catch (InterruptedException e) { // ignore } responder.sendResponseMessage(requestContent, responseContent); }) .build(); server.sqsClient = sqsClient; server.responder = responder; server.consumer = consumer; return server; } // 省略 }
前回ElasticMQで試した時はポーリングするスレッド数を増やすと挙動が不安定になりましたが、今回はそうはなりませんでした。
SQSMessageConsumer consumer =
SQSMessageConsumerBuilder
.standard()
.withAmazonSQS(sqsClient)
.withQueueUrl(queueUrl)
.withPollingThreadCount(5)
メッセージを受信した後は、ランダムにスリープさせています。
.withConsumer(requestMessage -> { MessageContent requestContent = MessageContent.fromMessage(requestMessage); String requestBody = requestContent.getMessageBody(); MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★"); try { int sleepTime = random.nextInt(10); System.out.printf("Server[%s]: %d sec sleep%n", Thread.currentThread().getName(), sleepTime); TimeUnit.SECONDS.sleep(sleepTime); } catch (InterruptedException e) { // ignore }
続いて、クライアント側。50個のメッセージを送るようにしてみます。
src/test/java/org/littlewings/sqs/TemporaryQueueRpcTest.java
// 省略 public class TemporaryQueueRpcTest { // 省略 void withRandomSleepServer(String queueUrl, ThrowableRunnable runnable) throws Exception { TemporaryQueueRpcServer server = TemporaryQueueRpcServer.createRandomSleep(queueUrl); server.start(); try { runnable.run(); } finally { server.stop(); } } @Test public void concurrent() throws Exception { String queueUrl = "http://localhost:4566/000000000000/my-queue"; withRandomSleepServer(queueUrl, () -> { SqsClient sqsClient = LocalSqsBuilder.create(); AmazonSQSRequester requester = AmazonSQSRequesterClientBuilder .standard() .withAmazonSQS(sqsClient) .build(); try { Map<String, CompletableFuture<Message>> futures = new LinkedHashMap<>(); for (int i = 0; i < 50; i++) { String uuid = UUID.randomUUID().toString(); SendMessageRequest request = SendMessageRequest .builder() .queueUrl(queueUrl) .messageBody(uuid) .build(); CompletableFuture<Message> responseMessage = requester.sendMessageAndGetResponseAsync(request, 60, TimeUnit.SECONDS); futures.put(uuid, responseMessage); } assertThat(futures).hasSize(50); futures.forEach((uuid, message) -> { long start = System.currentTimeMillis(); MessageContent responseContent = MessageContent.fromMessage(message.join()); System.out.println("elapsed: " + (System.currentTimeMillis() - start) / 1000.0 + " sec"); assertThat(responseContent.getMessageBody()).isEqualTo("★★★" + uuid + "★★★"); }); } finally { requester.shutdown(); sqsClient.close(); } }); } }
送信するメッセージの内容は、リクエストごとに別々になるようにUUIDにしてみました。
String uuid = UUID.randomUUID().toString(); SendMessageRequest request = SendMessageRequest .builder() .queueUrl(queueUrl) .messageBody(uuid) .build();
メッセージの送信は、AmazonSQSRequester#sendMessageAndGetResponseAsync
を使って非同期に変更。
CompletableFuture<Message> responseMessage = requester.sendMessageAndGetResponseAsync(request, 60, TimeUnit.SECONDS);
これで、期待のメッセージが処理できているか確認。
assertThat(futures).hasSize(50); futures.forEach((uuid, message) -> { long start = System.currentTimeMillis(); MessageContent responseContent = MessageContent.fromMessage(message.join()); System.out.println("elapsed: " + (System.currentTimeMillis() - start) / 1000.0 + " sec"); assertThat(responseContent.getMessageBody()).isEqualTo("★★★" + uuid + "★★★"); });
リクエストとレスポンスの内容が対になっていたので、今回もRPCとして動作できていることは確認できました。
ただ、今回の環境ではメッセージをたくさん送るとサーバー側である程度メッセージを処理しきらないとクライアントにメッセージが
返らないようで、今回の書き方だと最初に送ったメッセージを受け取るまでの時間がかなり長くなったりします。
しかも安定していない感じがするので、このあたりは本当に使うならちゃんとしたAmazon SQSで挙動を確認した方が良さそうですね。
とりあえず、Amazon SQS Java Temporary Queue Clientの2.0.0と一時キューを使ってRPCを行うという目的は達成できたので、
良しとしますか。
まとめ
2.0.0になったAmazon SQS Java Temporary Queue ClientとLocalStackを使って、一時キューとRPCを試してみました。
前回はけっこうハマったのですが、今回はそうでもなかったですね。ElasticMQでなくても、LocalStackでも動きましたし。
ただ、たくさんメッセージを送った時の動きが不安定なのは今回も変わらなかったので、実際の挙動は本物を使って確認、というのが
正解だろうという感覚も変わりませんが、やりやすくなったのは良かったかなと思います。