CLOVER🍀

That was when it all began.

LocalStackでAmazon SQSのFIFOキューを試してみる(AWS SDK for Javaを使用)

これは、なにをしたくて書いたもの?

Amazon SQSをちょっと試しておきたいなと思ったのですが、動かす時にいろいろ考えた結果FIFOキューを試してみることにしました。

Amazon SQS自体は本物ではなく、LocalStackのものを使用します。

Amazon SQS

Amazon SQS自体はこちら。

Amazon SQS(サーバーレスアプリのためのメッセージキューサービス)| AWS

「What is Amazon Simple Queue Service? - Amazon Simple Queue Service

AWSが提供する、分散キューのようです。

設定や仕組みなど、ドキュメントにいろいろ書かれています。

Amazon SQS キューの設定 (コンソール) - Amazon Simple Queue Service

Amazon SQS の仕組み - Amazon Simple Queue Service

キューは標準キューとFIFOキューの2種類があるようです。

Amazon SQS 標準キュー - Amazon Simple Queue Service

Amazon SQS FIFO (先入れ先出し) キュー - Amazon Simple Queue Service

標準キューとFIFOキューの特徴は、こんな感じのようです。

  • 標準キュー
    • 1秒あたりほぼ無制限のAPIコールをサポート
    • 少なくとも1回のメッセージ配信をサポート
    • メッセージの複数のコピーが順不同で配信されることがある
  • FIFOキュー
    • 標準キューの機能をすべて持つ
    • メッセージの配信はFIFOで行われ、順序が厳密に維持される
    • スループットに制限がある
    • キューの名前は.fifoで終わる必要がある

今回は、FIFOキューを使ってみたいと思います。

TerraformでAmazon SQSのFIFOキューをLocalStack上に作成し、AWS SDK for Java v2を使ってアクセスすることをテーマにしてみたいと
思います。

FIFOキューは、こちらも参考に。

【新機能】Amazon SQSにFIFOが追加されました!(重複削除/単一実行/順序取得に対応) | DevelopersIO

環境

今回の環境は、こちら。

LocalStack。

$ localstack --version
1.0.3

起動。

$ localstack start

Terraform。

$ terraform version
Terraform v1.2.6
on linux_amd64

Java

$ java --version
openjdk 17.0.3 2022-04-19
OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-122-generic", arch: "amd64", family: "unix"

Amazon SQSのFIFOキューを作成する

では、まずAmazon SQSのFIFOキューを作成しましょう。

作成したTerraformの構成ファイルは、こちら。

main.tf

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "4.24.0"
    }
  }
}

provider "aws" {
  access_key                  = "mock_access_key"
  region                      = "us-east-1"
  secret_key                  = "mock_secret_key"
  skip_credentials_validation = true
  skip_metadata_api_check     = true
  skip_requesting_account_id  = true

  endpoints {
    sqs = "http://localhost:4566"
  }
}

resource "aws_sqs_queue" "queue" {
  name                        = "my-queue.fifo"
  fifo_queue                  = true
  content_based_deduplication = true
}

output "queue_url" {
  value = aws_sqs_queue.queue.url
}

TerraformでLocalStackに接続する設定は、こちらからAmazon SQSの部分のみ引用しています。

Custom Service Endpoint Configuration / Connecting to Local AWS Compatible Solutions / LocalStack

provider "aws" {
  access_key                  = "mock_access_key"
  region                      = "us-east-1"
  secret_key                  = "mock_secret_key"
  skip_credentials_validation = true
  skip_metadata_api_check     = true
  skip_requesting_account_id  = true

  endpoints {
    sqs = "http://localhost:4566"
  }
}

FIFOキューの定義は、こちらを使いました。

Resource: aws_sqs_queue / FIFO queue

content_based_deduplicationというのは、重複メッセージを防止できる設定のようです。

resource "aws_sqs_queue" "queue" {
  name                        = "my-queue.fifo"
  fifo_queue                  = true
  content_based_deduplication = true
}

こちらですね。

1 回だけの処理 - Amazon Simple Queue Service

5分以内は、重複する同じメッセージを排除するようですね。

5 分間の重複排除間隔内に SendMessage アクションを再試行しても、Amazon SQS ではキューに重複を導入しません。

重複の排除は、以下のどちらかで行うようです。Terraformで有効にしているのは、前者ですね。

  • コンテンツベースの重複排除を有効にする
    • メッセージ本文を使用して (ただしメッセージの属性ではない) SHA-256 ハッシュでメッセージ重複排除 ID を生成するように指示
  • メッセージに明示的にメッセージ重複排除 ID を指定する

とりあえず、準備はできたのでTerraformでFIFOキューを作成します。

$ terraform init
$ terraform apply -auto-approve

LocalStackで作成するにしてはやや時間がかかりますが(20秒以上)、FIFOキューが作成されました。

Apply complete! Resources: 1 added, 0 changed, 0 destroyed.

Outputs:

queue_url = "http://localhost:4566/000000000000/my-queue.fifo"

こちらを使って、プログラムを作成していきます。

Amazon SQSのFIFOキューを使ったプログラムを作成する

次は、作成したAmazon SQSのFIFOキューを使ったプログラムを書いていきます。

まずは、Maven依存関係やプラグインの設定。

    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sqs</artifactId>
            <version>2.17.232</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.9.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.23.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

メッセージを受信するクラスを書き、メッセージの送信とその確認はテストコードで表現する、という形にしたいと思います。

メッセージを受信するプログラムは、こちら。

src/main/java/org/littlewings/aws/sqs/QueueSubscriber.java

package org.littlewings.aws.sqs;

import java.util.List;
import java.util.UUID;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

public class QueueSubscriber {
    SqsClient sqsClient;
    String queueUrl;

    public QueueSubscriber(SqsClient sqsClient, String queueUrl) {
        this.sqsClient = sqsClient;
        this.queueUrl = queueUrl;
    }

    public List<String> subscribe() {
        ReceiveMessageRequest receiveMessageRequest =
                ReceiveMessageRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .receiveRequestAttemptId(UUID.randomUUID().toString())
                        .maxNumberOfMessages(5)
                        .waitTimeSeconds(20)
                        .build();

        // receive message
        List<Message> receivedMessage = sqsClient.receiveMessage(receiveMessageRequest).messages();

        // process message
        List<String> messages = receivedMessage.stream().map(m -> m.body()).toList();

        // delete message
        receivedMessage.forEach(m -> {
            DeleteMessageRequest deleteMessageRequest =
                    DeleteMessageRequest
                            .builder()
                            .queueUrl(queueUrl)
                            .receiptHandle(m.receiptHandle())
                            .build();
            DeleteMessageResponse deleteMessageResponse = sqsClient.deleteMessage(deleteMessageRequest);

            if (!deleteMessageResponse.sdkHttpResponse().isSuccessful()) {
                throw new RuntimeException(
                        "Error: " + deleteMessageResponse.sdkHttpResponse().statusCode() + ", " + deleteMessageResponse.sdkHttpResponse().statusText()
                );
            }
        });

        return messages;
    }
}

このあたりを参考にしています。

Amazon SQSのクライアントと、キューのURLはコンストラクタで受け取るものとしておきます。

    public QueueSubscriber(SqsClient sqsClient, String queueUrl) {
        this.sqsClient = sqsClient;
        this.queueUrl = queueUrl;
    }

メッセージの受信。

        ReceiveMessageRequest receiveMessageRequest =
                ReceiveMessageRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .receiveRequestAttemptId(UUID.randomUUID().toString())
                        .maxNumberOfMessages(5)
                        .waitTimeSeconds(20)
                        .build();

        // receive message
        List<Message> receivedMessage = sqsClient.receiveMessage(receiveMessageRequest).messages();

重複排除用にreceiveRequestAttemptIdを設定し、

Amazon SQS 受信リクエスト試行 ID の使用 - Amazon Simple Queue Service

waitTimeSecondsを20秒にしてロングポーリングとしています。

Amazon SQS ショートポーリングとロングポーリング / ロングポーリングを使用したメッセージの消費

受信したメッセージからボディを取り出したら

        // process message
        List<String> messages = receivedMessage.stream().map(m -> m.body()).toList();

これをメソッドの戻り値とします。

        return messages;

この前に、メッセージをキューから削除しておきます。

        // delete message
        receivedMessage.forEach(m -> {
            DeleteMessageRequest deleteMessageRequest =
                    DeleteMessageRequest
                            .builder()
                            .queueUrl(queueUrl)
                            .receiptHandle(m.receiptHandle())
                            .build();
            DeleteMessageResponse deleteMessageResponse = sqsClient.deleteMessage(deleteMessageRequest);

            if (!deleteMessageResponse.sdkHttpResponse().isSuccessful()) {
                throw new RuntimeException(
                        "Error: " + deleteMessageResponse.sdkHttpResponse().statusCode() + ", " + deleteMessageResponse.sdkHttpResponse().statusText()
                );
            }
        });

こちらを参考にしています。

Amazon Simple Queue Service メッセージの送信、受信、削除 / 受信後にメッセージを削除する

Amazon SQSでは、メッセージを受信してもキューから消えるわけではなく、明示的に削除する必要があるようです。

Amazon SQS 可視性タイムアウト - Amazon Simple Queue Service

メッセージ受信後に一時的に受信できなくなるようですが、一定時間(設定可能)が経過した後にまた見えるようになるとか。
この時間を、可視性タイムアウトと呼ぶようです。

続いて、送信側。こちらはテストコード上で表現します。また、先ほどのメッセージを受信するクラスも使用します。

src/test/java/org/littlewings/aws/sqs/SqsClientTest.java

package org.littlewings.aws.sqs;

import java.net.URI;
import java.util.List;
import java.util.UUID;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;

import static org.assertj.core.api.Assertions.assertThat;

public class SqsClientTest {
    @Test
    public void sendReceive() {
        String queueUrl = "http://localhost:4566/000000000000/my-queue.fifo";

        AwsCredentials awsCredentials =
                AwsBasicCredentials.create("mock_access_key", "mock_secret_key");

        SqsClient sqsClient =
                SqsClient
                        .builder()
                        .credentialsProvider(StaticCredentialsProvider.create(awsCredentials))
                        .region(Region.US_EAST_1)
                        .endpointOverride(URI.create("http://localhost:4566"))
                        .build();

        QueueSubscriber subscriber = new QueueSubscriber(sqsClient, queueUrl);

        List<String> messages1 =
                List.of(
                        "Hello-SQS 1",
                        "Hello-SQS 2",
                        "Hello-SQS 3",
                        "Hello-SQS 4",
                        "Hello-SQS 5"
                );

        send(sqsClient, queueUrl, messages1, "group-1");

        // receive
        List<String> receivedMessages1 = subscriber.subscribe();
        assertThat(receivedMessages1).containsExactlyElementsOf(messages1);

        List<String> messages2 =
                List.of(
                        "Hello-SQS 6",
                        "Hello-SQS 7",
                        "Hello-SQS 8",
                        "Hello-SQS 9",
                        "Hello-SQS 10"
                );

        send(sqsClient, queueUrl, messages2, "group-1");

        // receive
        List<String> receivedMessages2 = subscriber.subscribe();
        assertThat(receivedMessages2).containsExactlyElementsOf(messages2);
    }

    private void send(SqsClient sqsClient, String queueUrl, List<String> messages, String groupId) {
        // batch request
        SendMessageBatchRequest sendMessageBatchRequest =
                SendMessageBatchRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .entries(
                                messages
                                        .stream()
                                        .map(m ->
                                                SendMessageBatchRequestEntry
                                                        .builder()
                                                        .id(UUID.randomUUID().toString())
                                                        .messageBody(m)
                                                        .messageGroupId(groupId)
                                                        .messageDeduplicationId(UUID.randomUUID().toString())
                                                        .build())
                                        .toList()
                        )
                        .build();

        // send
        SendMessageBatchResponse sendMessageBatchResponse = sqsClient.sendMessageBatch(sendMessageBatchRequest);
        assertThat(sendMessageBatchResponse.failed()).isEmpty();
    }
}

AWSのクレデンシャルの設定と、Amazon SQSのクライアントを作成。

        String queueUrl = "http://localhost:4566/000000000000/my-queue.fifo";

        AwsCredentials awsCredentials =
                AwsBasicCredentials.create("mock_access_key", "mock_secret_key");

        SqsClient sqsClient =
                SqsClient
                        .builder()
                        .credentialsProvider(StaticCredentialsProvider.create(awsCredentials))
                        .region(Region.US_EAST_1)
                        .endpointOverride(URI.create("http://localhost:4566"))
                        .build();

先ほどの、メッセージを受信するクラスのインスタンスを作成。

        QueueSubscriber subscriber = new QueueSubscriber(sqsClient, queueUrl);

あとはメッセージを2回に分けて送り、送信したメッセージがすべて受信できたことを確認する、というテストにしました。

        List<String> messages1 =
                List.of(
                        "Hello-SQS 1",
                        "Hello-SQS 2",
                        "Hello-SQS 3",
                        "Hello-SQS 4",
                        "Hello-SQS 5"
                );

        send(sqsClient, queueUrl, messages1, "group-1");

        // receive
        List<String> receivedMessages1 = subscriber.subscribe();
        assertThat(receivedMessages1).containsExactlyElementsOf(messages1);

        List<String> messages2 =
                List.of(
                        "Hello-SQS 6",
                        "Hello-SQS 7",
                        "Hello-SQS 8",
                        "Hello-SQS 9",
                        "Hello-SQS 10"
                );

        send(sqsClient, queueUrl, messages2, "group-1");

        // receive
        List<String> receivedMessages2 = subscriber.subscribe();
        assertThat(receivedMessages2).containsExactlyElementsOf(messages2);

メッセージ送信部分を見ていきます。

    private void send(SqsClient sqsClient, String queueUrl, List<String> messages, String groupId) {
        // batch request
        SendMessageBatchRequest sendMessageBatchRequest =
                SendMessageBatchRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .entries(
                                messages
                                        .stream()
                                        .map(m ->
                                                SendMessageBatchRequestEntry
                                                        .builder()
                                                        .id(UUID.randomUUID().toString())
                                                        .messageBody(m)
                                                        .messageGroupId(groupId)
                                                        .messageDeduplicationId(UUID.randomUUID().toString())
                                                        .build())
                                        .toList()
                        )
                        .build();

        // send
        SendMessageBatchResponse sendMessageBatchResponse = sqsClient.sendMessageBatch(sendMessageBatchRequest);
        assertThat(sendMessageBatchResponse.failed()).isEmpty();
    }

メッセージは、バッチ送信することにしました。

        // batch request
        SendMessageBatchRequest sendMessageBatchRequest =
                SendMessageBatchRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .entries(
                                messages
                                        .stream()
                                        .map(m ->
                                                SendMessageBatchRequestEntry
                                                        .builder()
                                                        .id(UUID.randomUUID().toString())
                                                        .messageBody(m)
                                                        .messageGroupId(groupId)
                                                        .messageDeduplicationId(UUID.randomUUID().toString())
                                                        .build())
                                        .toList()
                        )
                        .build();

グループIDと、重複排除用のIDを指定しています。

                                                        .messageGroupId(groupId)
                                                        .messageDeduplicationId(UUID.randomUUID().toString())

Amazon SQS メッセージグループ ID の使用 - Amazon Simple Queue Service

Amazon SQS メッセージ重複排除 ID の使用 - Amazon Simple Queue Service

送信結果にエラーがないか確認。

        // send
        SendMessageBatchResponse sendMessageBatchResponse = sqsClient.sendMessageBatch(sendMessageBatchRequest);
        assertThat(sendMessageBatchResponse.failed()).isEmpty();

まずはこんな感じでしょうか。

まとめ

Amazon SQSのFIFOキューをLocalStackで試してみました。

Amazon SQS自体をあまり知らないので、用語の確認等でいろいろ学ぶことになりましたが、もうちょっと情報を追っておきたいですね。
今回はこんなところで。