これは、なにをしたくて書いたもの?
Amazon SQSをちょっと試しておきたいなと思ったのですが、動かす時にいろいろ考えた結果FIFOキューを試してみることにしました。
Amazon SQS自体は本物ではなく、LocalStackのものを使用します。
Amazon SQS
Amazon SQS自体はこちら。
Amazon SQS(サーバーレスアプリのためのメッセージキューサービス)| AWS
「What is Amazon Simple Queue Service? - Amazon Simple Queue Service
AWSが提供する、分散キューのようです。
設定や仕組みなど、ドキュメントにいろいろ書かれています。
Amazon SQS キューの設定 (コンソール) - Amazon Simple Queue Service
Amazon SQS の仕組み - Amazon Simple Queue Service
キューは標準キューとFIFOキューの2種類があるようです。
Amazon SQS 標準キュー - Amazon Simple Queue Service
Amazon SQS FIFO (先入れ先出し) キュー - Amazon Simple Queue Service
標準キューとFIFOキューの特徴は、こんな感じのようです。
今回は、FIFOキューを使ってみたいと思います。
TerraformでAmazon SQSのFIFOキューをLocalStack上に作成し、AWS SDK for Java v2を使ってアクセスすることをテーマにしてみたいと
思います。
FIFOキューは、こちらも参考に。
【新機能】Amazon SQSにFIFOが追加されました!(重複削除/単一実行/順序取得に対応) | DevelopersIO
環境
今回の環境は、こちら。
LocalStack。
$ localstack --version 1.0.3
起動。
$ localstack start
Terraform。
$ terraform version Terraform v1.2.6 on linux_amd64
Java。
$ java --version openjdk 17.0.3 2022-04-19 OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1) OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing) $ mvn --version Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-122-generic", arch: "amd64", family: "unix"
Amazon SQSのFIFOキューを作成する
では、まずAmazon SQSのFIFOキューを作成しましょう。
作成したTerraformの構成ファイルは、こちら。
main.tf
terraform { required_providers { aws = { source = "hashicorp/aws" version = "4.24.0" } } } provider "aws" { access_key = "mock_access_key" region = "us-east-1" secret_key = "mock_secret_key" skip_credentials_validation = true skip_metadata_api_check = true skip_requesting_account_id = true endpoints { sqs = "http://localhost:4566" } } resource "aws_sqs_queue" "queue" { name = "my-queue.fifo" fifo_queue = true content_based_deduplication = true } output "queue_url" { value = aws_sqs_queue.queue.url }
TerraformでLocalStackに接続する設定は、こちらからAmazon SQSの部分のみ引用しています。
Custom Service Endpoint Configuration / Connecting to Local AWS Compatible Solutions / LocalStack
provider "aws" { access_key = "mock_access_key" region = "us-east-1" secret_key = "mock_secret_key" skip_credentials_validation = true skip_metadata_api_check = true skip_requesting_account_id = true endpoints { sqs = "http://localhost:4566" } }
FIFOキューの定義は、こちらを使いました。
Resource: aws_sqs_queue / FIFO queue
content_based_deduplication
というのは、重複メッセージを防止できる設定のようです。
resource "aws_sqs_queue" "queue" { name = "my-queue.fifo" fifo_queue = true content_based_deduplication = true }
こちらですね。
1 回だけの処理 - Amazon Simple Queue Service
5分以内は、重複する同じメッセージを排除するようですね。
5 分間の重複排除間隔内に SendMessage アクションを再試行しても、Amazon SQS ではキューに重複を導入しません。
重複の排除は、以下のどちらかで行うようです。Terraformで有効にしているのは、前者ですね。
- コンテンツベースの重複排除を有効にする
- メッセージ本文を使用して (ただしメッセージの属性ではない) SHA-256 ハッシュでメッセージ重複排除 ID を生成するように指示
- メッセージに明示的にメッセージ重複排除 ID を指定する
とりあえず、準備はできたのでTerraformでFIFOキューを作成します。
$ terraform init $ terraform apply -auto-approve
LocalStackで作成するにしてはやや時間がかかりますが(20秒以上)、FIFOキューが作成されました。
Apply complete! Resources: 1 added, 0 changed, 0 destroyed. Outputs: queue_url = "http://localhost:4566/000000000000/my-queue.fifo"
こちらを使って、プログラムを作成していきます。
Amazon SQSのFIFOキューを使ったプログラムを作成する
次は、作成したAmazon SQSのFIFOキューを使ったプログラムを書いていきます。
<dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> <version>2.17.232</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.36</version> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.9.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.23.1</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.2</version> </plugin> </plugins> </build>
メッセージを受信するクラスを書き、メッセージの送信とその確認はテストコードで表現する、という形にしたいと思います。
メッセージを受信するプログラムは、こちら。
src/main/java/org/littlewings/aws/sqs/QueueSubscriber.java
package org.littlewings.aws.sqs; import java.util.List; import java.util.UUID; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; public class QueueSubscriber { SqsClient sqsClient; String queueUrl; public QueueSubscriber(SqsClient sqsClient, String queueUrl) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; } public List<String> subscribe() { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest .builder() .queueUrl(queueUrl) .receiveRequestAttemptId(UUID.randomUUID().toString()) .maxNumberOfMessages(5) .waitTimeSeconds(20) .build(); // receive message List<Message> receivedMessage = sqsClient.receiveMessage(receiveMessageRequest).messages(); // process message List<String> messages = receivedMessage.stream().map(m -> m.body()).toList(); // delete message receivedMessage.forEach(m -> { DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest .builder() .queueUrl(queueUrl) .receiptHandle(m.receiptHandle()) .build(); DeleteMessageResponse deleteMessageResponse = sqsClient.deleteMessage(deleteMessageRequest); if (!deleteMessageResponse.sdkHttpResponse().isSuccessful()) { throw new RuntimeException( "Error: " + deleteMessageResponse.sdkHttpResponse().statusCode() + ", " + deleteMessageResponse.sdkHttpResponse().statusText() ); } }); return messages; } }
このあたりを参考にしています。
- Amazon Simple Queue Service メッセージの送信、受信、削除 / メッセージを取得する
- Amazon SQS for Java 2.x を使用SDK for Java 2.x - AWS SDK for Java
Amazon SQSのクライアントと、キューのURLはコンストラクタで受け取るものとしておきます。
public QueueSubscriber(SqsClient sqsClient, String queueUrl) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; }
メッセージの受信。
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest .builder() .queueUrl(queueUrl) .receiveRequestAttemptId(UUID.randomUUID().toString()) .maxNumberOfMessages(5) .waitTimeSeconds(20) .build(); // receive message List<Message> receivedMessage = sqsClient.receiveMessage(receiveMessageRequest).messages();
重複排除用にreceiveRequestAttemptId
を設定し、
Amazon SQS 受信リクエスト試行 ID の使用 - Amazon Simple Queue Service
waitTimeSeconds
を20秒にしてロングポーリングとしています。
Amazon SQS ショートポーリングとロングポーリング / ロングポーリングを使用したメッセージの消費
受信したメッセージからボディを取り出したら
// process message
List<String> messages = receivedMessage.stream().map(m -> m.body()).toList();
これをメソッドの戻り値とします。
return messages;
この前に、メッセージをキューから削除しておきます。
// delete message receivedMessage.forEach(m -> { DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest .builder() .queueUrl(queueUrl) .receiptHandle(m.receiptHandle()) .build(); DeleteMessageResponse deleteMessageResponse = sqsClient.deleteMessage(deleteMessageRequest); if (!deleteMessageResponse.sdkHttpResponse().isSuccessful()) { throw new RuntimeException( "Error: " + deleteMessageResponse.sdkHttpResponse().statusCode() + ", " + deleteMessageResponse.sdkHttpResponse().statusText() ); } });
こちらを参考にしています。
Amazon Simple Queue Service メッセージの送信、受信、削除 / 受信後にメッセージを削除する
Amazon SQSでは、メッセージを受信してもキューから消えるわけではなく、明示的に削除する必要があるようです。
Amazon SQS 可視性タイムアウト - Amazon Simple Queue Service
メッセージ受信後に一時的に受信できなくなるようですが、一定時間(設定可能)が経過した後にまた見えるようになるとか。
この時間を、可視性タイムアウトと呼ぶようです。
続いて、送信側。こちらはテストコード上で表現します。また、先ほどのメッセージを受信するクラスも使用します。
src/test/java/org/littlewings/aws/sqs/SqsClientTest.java
package org.littlewings.aws.sqs; import java.net.URI; import java.util.List; import java.util.UUID; import org.junit.jupiter.api.Test; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; import static org.assertj.core.api.Assertions.assertThat; public class SqsClientTest { @Test public void sendReceive() { String queueUrl = "http://localhost:4566/000000000000/my-queue.fifo"; AwsCredentials awsCredentials = AwsBasicCredentials.create("mock_access_key", "mock_secret_key"); SqsClient sqsClient = SqsClient .builder() .credentialsProvider(StaticCredentialsProvider.create(awsCredentials)) .region(Region.US_EAST_1) .endpointOverride(URI.create("http://localhost:4566")) .build(); QueueSubscriber subscriber = new QueueSubscriber(sqsClient, queueUrl); List<String> messages1 = List.of( "Hello-SQS 1", "Hello-SQS 2", "Hello-SQS 3", "Hello-SQS 4", "Hello-SQS 5" ); send(sqsClient, queueUrl, messages1, "group-1"); // receive List<String> receivedMessages1 = subscriber.subscribe(); assertThat(receivedMessages1).containsExactlyElementsOf(messages1); List<String> messages2 = List.of( "Hello-SQS 6", "Hello-SQS 7", "Hello-SQS 8", "Hello-SQS 9", "Hello-SQS 10" ); send(sqsClient, queueUrl, messages2, "group-1"); // receive List<String> receivedMessages2 = subscriber.subscribe(); assertThat(receivedMessages2).containsExactlyElementsOf(messages2); } private void send(SqsClient sqsClient, String queueUrl, List<String> messages, String groupId) { // batch request SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest .builder() .queueUrl(queueUrl) .entries( messages .stream() .map(m -> SendMessageBatchRequestEntry .builder() .id(UUID.randomUUID().toString()) .messageBody(m) .messageGroupId(groupId) .messageDeduplicationId(UUID.randomUUID().toString()) .build()) .toList() ) .build(); // send SendMessageBatchResponse sendMessageBatchResponse = sqsClient.sendMessageBatch(sendMessageBatchRequest); assertThat(sendMessageBatchResponse.failed()).isEmpty(); } }
AWSのクレデンシャルの設定と、Amazon SQSのクライアントを作成。
String queueUrl = "http://localhost:4566/000000000000/my-queue.fifo"; AwsCredentials awsCredentials = AwsBasicCredentials.create("mock_access_key", "mock_secret_key"); SqsClient sqsClient = SqsClient .builder() .credentialsProvider(StaticCredentialsProvider.create(awsCredentials)) .region(Region.US_EAST_1) .endpointOverride(URI.create("http://localhost:4566")) .build();
先ほどの、メッセージを受信するクラスのインスタンスを作成。
QueueSubscriber subscriber = new QueueSubscriber(sqsClient, queueUrl);
あとはメッセージを2回に分けて送り、送信したメッセージがすべて受信できたことを確認する、というテストにしました。
List<String> messages1 = List.of( "Hello-SQS 1", "Hello-SQS 2", "Hello-SQS 3", "Hello-SQS 4", "Hello-SQS 5" ); send(sqsClient, queueUrl, messages1, "group-1"); // receive List<String> receivedMessages1 = subscriber.subscribe(); assertThat(receivedMessages1).containsExactlyElementsOf(messages1); List<String> messages2 = List.of( "Hello-SQS 6", "Hello-SQS 7", "Hello-SQS 8", "Hello-SQS 9", "Hello-SQS 10" ); send(sqsClient, queueUrl, messages2, "group-1"); // receive List<String> receivedMessages2 = subscriber.subscribe(); assertThat(receivedMessages2).containsExactlyElementsOf(messages2);
メッセージ送信部分を見ていきます。
private void send(SqsClient sqsClient, String queueUrl, List<String> messages, String groupId) { // batch request SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest .builder() .queueUrl(queueUrl) .entries( messages .stream() .map(m -> SendMessageBatchRequestEntry .builder() .id(UUID.randomUUID().toString()) .messageBody(m) .messageGroupId(groupId) .messageDeduplicationId(UUID.randomUUID().toString()) .build()) .toList() ) .build(); // send SendMessageBatchResponse sendMessageBatchResponse = sqsClient.sendMessageBatch(sendMessageBatchRequest); assertThat(sendMessageBatchResponse.failed()).isEmpty(); }
メッセージは、バッチ送信することにしました。
- Amazon Simple Queue Service メッセージの送信、受信、削除 / リクエストで複数のメッセージを送信する
- Amazon SQS for Java 2.x を使用SDK for Java 2.x - AWS SDK for Java
// batch request
SendMessageBatchRequest sendMessageBatchRequest =
SendMessageBatchRequest
.builder()
.queueUrl(queueUrl)
.entries(
messages
.stream()
.map(m ->
SendMessageBatchRequestEntry
.builder()
.id(UUID.randomUUID().toString())
.messageBody(m)
.messageGroupId(groupId)
.messageDeduplicationId(UUID.randomUUID().toString())
.build())
.toList()
)
.build();
グループIDと、重複排除用のIDを指定しています。
.messageGroupId(groupId) .messageDeduplicationId(UUID.randomUUID().toString())
Amazon SQS メッセージグループ ID の使用 - Amazon Simple Queue Service
Amazon SQS メッセージ重複排除 ID の使用 - Amazon Simple Queue Service
送信結果にエラーがないか確認。
// send
SendMessageBatchResponse sendMessageBatchResponse = sqsClient.sendMessageBatch(sendMessageBatchRequest);
assertThat(sendMessageBatchResponse.failed()).isEmpty();
まずはこんな感じでしょうか。
まとめ
Amazon SQSのFIFOキューをLocalStackで試してみました。
Amazon SQS自体をあまり知らないので、用語の確認等でいろいろ学ぶことになりましたが、もうちょっと情報を追っておきたいですね。
今回はこんなところで。