CLOVER🍀

That was when it all began.

Amazon SQSの可視性タイムアウトをElasticMQで確認する

これは、なにをしたくて書いたもの?

Amazon SQSの可視性タイムアウトというものを、1度見ておこうかなと思いまして。

こちらですね。

Amazon SQS可視性タイムアウト - Amazon Simple Queue Service

Amazon SQSの可視性タイムアウト

Amazon SQSの可視性タイムアウトとは、コンシューマーがメッセージを取得してから削除するまでの間、他のコンシューマーが
メッセージを取得できなくなる期間のことです。

Amazon SQS可視性タイムアウト - Amazon Simple Queue Service

Amazon SQSでは、コンシューマーはメッセージを受信した後、削除する必要があります。
コンシューマーがメッセージを受信しても、メッセージ自体はキューに残ったままの状態になります。もし最初にメッセージを受信した
コンシューマーに問題が発生した場合(=メッセージの削除はできなかった)、他のコンシューマーが該当のメッセージを処理することに
なります。

ただ、同じメッセージを複数のコンシューマーが処理することがないよう、一定のタイムアウト期間を設けています。
これが可視性タイムアウトですね。

デフォルトは30秒、最小は0秒、最大は12時間です。

可視性タイムアウトはキュー単位、そしてメッセージの受信時に個別に設定することができます。

Amazon SQS可視性タイムアウト / 可視性タイムアウトの設定

途中で変更することもできるようです。

また、標準キューとFIFOキューでは違いもあります。

  • 標準キュー
    • 可視性タイムアウトは、標準キューで同じメッセージを2回受信しない保証(at least once)にはならない
    • 可視性タイムアウト待ちのメッセージがあっても、他のメッセージは取得できる
  • FIFOキュー
    • 同じグループ内に可視性タイムアウト待ちのメッセージがあった場合、そのグループ内のメッセージはタイムアウトする、メッセージを削除するのいずれかを行うまで取得できなくなる
    • 送信時に同じ重複除外ID、受信時に同じ受信リクエスト試行IDを使うことで、再試行が可能

今回は、可視性タイムアウト待ちになった時にそのメッセージが取得できなくなることや、FIFOキューでの確認をしていきたいと
思います。

確認は、AWS SDK for Java v2とElasticMQで行うことにします。
※最初はLocalStackで試していたのですが、挙動がだいぶ異なりドキュメントと動きが合わなかったのでElasticMQにしました

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.6 2023-01-17
OpenJDK Runtime Environment (build 17.0.6+10-Ubuntu-0ubuntu122.04)
OpenJDK 64-Bit Server VM (build 17.0.6+10-Ubuntu-0ubuntu122.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.9.1 (2e178502fcdbffc201671fb2537d0cb4b4cc58f8)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.6, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-67-generic", arch: "amd64", family: "unix"

ElasticMQは1.3.14を使います。

起動。

$ java -jar elasticmq-server-1.3.14.jar

ElasticMQ上のキューの作成は、Terraformで行うことにします。

$ terraform version
Terraform v1.4.2
on linux_amd64

準備

まずは、ElasticMQ上にキューを作成します。

Terraformの構成ファイルをこのように用意。

main.tf

terraform {
  required_version = "1.4.2"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "4.59.0"
    }
  }
}

provider "aws" {
  access_key = "mock_access_key"
  secret_key = "mock_secret_key"
  region     = "us-east-1"

  skip_credentials_validation = true
  skip_metadata_api_check     = true
  skip_requesting_account_id  = true

  endpoints {
    sqs = "http://localhost:9324"
  }
}

resource "aws_sqs_queue" "standard_queue" {
  name = "standard-queue"

  receive_wait_time_seconds  = 1
  visibility_timeout_seconds = 10
}

resource "aws_sqs_queue" "fifo_queue" {
  name = "fifo-queue.fifo"

  fifo_queue = true

  receive_wait_time_seconds  = 1
  visibility_timeout_seconds = 10
}

output "standard_queue_url" {
  value = aws_sqs_queue.standard_queue.url
}

output "fifo_queue_url" {
  value = aws_sqs_queue.fifo_queue.url
}

標準キューとFIFOキューの両方を作成し、可視性タイムアウトはそれぞれ10秒としました。

適用。

$ terraform init
$ terraform apply

ElasticMQを対象に実行すると、なぜかタイムアウトするんですけどね…。

│ Error: waiting for SQS Queue (http://localhost:9324/000000000000/standard-queue) attributes create: timeout while waiting for state to become 'equal' (last state: 'notequal', timeout: 2m0s)
│
│   with aws_sqs_queue.standard_queue,
│   on main.tf line 26, in resource "aws_sqs_queue" "standard_queue":
│   26: resource "aws_sqs_queue" "standard_queue" {
│
╵
╷
│ Error: waiting for SQS Queue (http://localhost:9324/000000000000/fifo-queue.fifo) attributes create: timeout while waiting for state to become 'equal' (last state: 'notequal', timeout: 2m0s)
│
│   with aws_sqs_queue.fifo_queue,
│   on main.tf line 33, in resource "aws_sqs_queue" "fifo_queue":
│   33: resource "aws_sqs_queue" "fifo_queue" {
│
╵

一応、これでもキューはできているのでそのまま使います。

$ aws --endpoint-url http://localhost:9324 sqs list-queues
{
    "QueueUrls": [
        "http://localhost:9324/000000000000/standard-queue",
        "http://localhost:9324/000000000000/fifo-queue.fifo"
    ]
}

続いて、Java側。Maven依存関係等はこのように設定。

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.20.30</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sqs</artifactId>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.9.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.24.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M7</version>
            </plugin>
        </plugins>
    </build>

動作確認はテストコードで行うことにします。

プログラムを作成する

では、プログラムを作成していきましょう。

まずはAmazon SQSにアクセスするクライアントを作成するクラス。

src/main/java/org/littlewings/aws/sqs/LocalSqsClientBuilder.java

package org.littlewings.aws.sqs;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

import java.net.URI;

public class LocalSqsClientBuilder {
    public static SqsClient create() {
        return SqsClient
                .builder()
                .credentialsProvider(
                        StaticCredentialsProvider.create(
                                AwsBasicCredentials.create("mock_access_key", "mock_scret_key")
                        )
                )
                .region(Region.US_EAST_1)
                .defaultsMode(DefaultsMode.AUTO)
                .endpointOverride(URI.create("http://localhost:9324"))
                .build();
    }
}

エンドポイントをElasticMQに差し替えています。

Amazon SQSへメッセージを送信するクラス。

src/main/java/org/littlewings/aws/sqs/SqsMessageSender.java

package org.littlewings.aws.sqs;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;

import java.util.List;
import java.util.UUID;

public class SqsMessageSender {
    String queueUrl;

    SqsClient sqsClient;

    boolean fifo;

    String messageGroupId;

    public static SqsMessageSender createStandard(String queueUrl) {
        SqsMessageSender sqsMessageSender = new SqsMessageSender();
        sqsMessageSender.queueUrl = queueUrl;
        sqsMessageSender.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageSender.fifo = false;

        return sqsMessageSender;
    }

    public static SqsMessageSender createFifo(String queueUrl, String messageGroupId) {
        SqsMessageSender sqsMessageSender = new SqsMessageSender();
        sqsMessageSender.queueUrl = queueUrl;
        sqsMessageSender.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageSender.fifo = true;
        sqsMessageSender.messageGroupId = messageGroupId;

        return sqsMessageSender;
    }

    public void sendMessages(List<String> messages) {
        List<SendMessageBatchRequestEntry> sendMessageBatchRequestEntries =
                messages
                        .stream()
                        .map(message ->
                                fifo ?
                                        // FIFOキュー
                                        SendMessageBatchRequestEntry
                                                .builder()
                                                .id(UUID.randomUUID().toString())
                                                .messageGroupId(messageGroupId)
                                                .messageDeduplicationId(UUID.randomUUID().toString())
                                                .messageBody(message)
                                                .build()
                                        :
                                        // 標準キュー
                                        SendMessageBatchRequestEntry
                                                .builder()
                                                .id(UUID.randomUUID().toString())
                                                .messageBody(message)
                                                .build()
                        )
                        .toList();
        SendMessageBatchRequest sendMessageBatchRequest =
                SendMessageBatchRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .entries(sendMessageBatchRequestEntries.toArray(new SendMessageBatchRequestEntry[sendMessageBatchRequestEntries.size()]))
                        .build();

        sqsClient.sendMessageBatch(sendMessageBatchRequest);
    }
}

FIFOキューの場合は、メッセージグループIDを指定できるようにしています。

Amazon SQSからメッセージを受信するクラス。

src/main/java/org/littlewings/aws/sqs/SqsMessageReceiver.java

package org.littlewings.aws.sqs;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

import java.util.List;
import java.util.UUID;

public class SqsMessageReceiver {
    String queueUrl;
    SqsClient sqsClient;

    boolean withDelete;

    public static SqsMessageReceiver create(String queueUrl, boolean withDelete) {
        SqsMessageReceiver sqsMessageReceiver = new SqsMessageReceiver();
        sqsMessageReceiver.queueUrl = queueUrl;
        sqsMessageReceiver.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageReceiver.withDelete = withDelete;

        return sqsMessageReceiver;
    }

    public List<String> receiveMessages(int maxNumberOfMessages) {
        ReceiveMessageRequest receiveMessageRequest =
                ReceiveMessageRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .maxNumberOfMessages(maxNumberOfMessages)
                        .waitTimeSeconds(1)
                        .build();

        ReceiveMessageResponse receiveMessageResponse = sqsClient.receiveMessage(receiveMessageRequest);

        List<String> messages =
                receiveMessageResponse
                        .messages()
                        .stream()
                        .map(message -> message.body())
                        .toList();

        if (withDelete) {
            List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntries =
                    receiveMessageResponse
                            .messages()
                            .stream()
                            .map(message ->
                                    DeleteMessageBatchRequestEntry
                                            .builder()
                                            .id(UUID.randomUUID().toString())
                                            .receiptHandle(message.receiptHandle())
                                            .build()
                            )
                            .toList();

            DeleteMessageBatchRequest deleteMessageBatchRequest =
                    DeleteMessageBatchRequest
                            .builder()
                            .queueUrl(queueUrl)
                            .entries(deleteMessageBatchRequestEntries.toArray(new DeleteMessageBatchRequestEntry[deleteMessageBatchRequestEntries.size()]))
                            .build();

            sqsClient.deleteMessageBatch(deleteMessageBatchRequest);
        }

        return messages;
    }
}

このクラスのインスタンス作成時に、メッセージを受信した時にメッセージの削除まで行うかどうかを、true/falseで指定できるように
しています。

これらのクラスを使って、動作確認していきましょう。

テストコードで確認する

動作確認は、テストコードで行います。

テストコードの雛形。

src/test/java/org/littlewings/aws/sqs/SqsVisibilityTimeoutTest.java

package org.littlewings.aws.sqs;

import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

public class SqsVisibilityTimeoutTest {
    // ここに、テストを書く!

    void sleep(long sleepSec) {
        try {
            TimeUnit.SECONDS.sleep(sleepSec);
        } catch (InterruptedException e) {
            // no-op
        }
    }
}

適宜スリープを入れるためのメソッドも用意。

ここからは、標準キュー、FIFOキューそれぞれで確認していきます。可視性タイムアウトは、どちらも10秒です。

標準キューの場合

最初は標準キューで確認します。

    @Test
    void standardQueue() {
        String queueUrl = "http://localhost:9324/000000000000/standard-queue";

        SqsMessageSender sqsMessageSender = SqsMessageSender.createStandard(queueUrl);
        SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false);
        SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);

        sqsMessageSender
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());

        // 受信して削除しない
        // 登録したメッセージのいずれかが返ってくる(順序保証はない)
        List<String> nonDeletedMessages = sqsMessageReceiverWithoutDelete.receiveMessages(3);
        assertThat(nonDeletedMessages)
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        sleep(1L);

        // 受信して削除
        // 最初に受信したメッセージ以外のメッセージのいずれかが返ってくる(順序保証はない)
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsAnyOf(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]))
                .doesNotContainAnyElementsOf(nonDeletedMessages);

        sleep(1L);

        // 可視性タイムアウト(10秒)経過していないので、受信して削除していないメッセージが見えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(1L);

        // 可視性タイムアウト(10秒)経過していないので、受信して削除していないメッセージが見えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(4L); // 10秒以上経過

        // 削除しなかったメッセージが見えるようになる(同じ順序になるとは限らない)
        // 受信して削除
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsOnly(nonDeletedMessages.toArray(new String[nonDeletedMessages.size()]));

        // 残メッセージは0
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();
    }

メッセージを送信するインスタンス、メッセージを受信して削除は行わないインスタンス、メッセージの受信をして削除まで行う
インスタンスを用意します。

        SqsMessageSender sqsMessageSender = SqsMessageSender.createStandard(queueUrl);
        SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false);
        SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);

キューに6件メッセージを送信。

        sqsMessageSender
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());

最初の受信は、3件取得して削除を行いません。

        // 受信して削除しない
        // 登録したメッセージのいずれかが返ってくる(順序保証はない)
        List<String> nonDeletedMessages = sqsMessageReceiverWithoutDelete.receiveMessages(3);
        assertThat(nonDeletedMessages)
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        sleep(1L);

標準キューは入れた順に取り出せないことがあるので、アサーションはこんな感じにしました。
ただ、ここで1度取得したメッセージは保持しておきます。

また適宜スリープを挟んでいきます。

次は3件取得しますが、最初に受信したメッセージ以外が返ってきます。

        // 受信して削除
        // 最初に受信したメッセージ以外のメッセージのいずれかが返ってくる(順序保証はない)
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsAnyOf(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]))
                .doesNotContainAnyElementsOf(nonDeletedMessages);

        sleep(1L);

この後は、可視性タイムアウトで指定した秒数が経過するまではメッセージが取得できなくなります。

        // 可視性タイムアウト(10秒)経過していないので、受信して削除していないメッセージが見えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(1L);

        // 可視性タイムアウト(10秒)経過していないので、受信して削除していないメッセージが見えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

そして10秒以上経過すると、最初に取得して削除しなかったメッセージが取得できるようになります。

        sleep(4L); // 10秒以上経過

        // 削除しなかったメッセージが見えるようになる(同じ順序になるとは限らない)
        // 受信して削除
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsOnly(nonDeletedMessages.toArray(new String[nonDeletedMessages.size()]));

残りのメッセージ件数は0です。

        // 残メッセージは0
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

標準キューでは、可視性タイムアウト待ちのメッセージは取得できなくなりますが、その後に続くメッセージはふつうに取得できる
というところがポイントですね。

FIFOキューの場合

続いては、FIFOキューで確認します。いくつかバリエーションがあります。

メッセージグループIDがひとつの場合

まずはメッセージグループIDがひとつだった場合を見ていきます。

    @Test
    void fifoQueue() {
        String queueUrl = "http://localhost:4566/000000000000/fifo-queue.fifo";

        SqsMessageSender sqsMessageSender = SqsMessageSender.createFifo(queueUrl, "group");
        SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false);
        SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);

        sqsMessageSender
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());

        // 受信して削除しない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        sleep(1L);

        // FIFOキューの場合、同じメッセージグループIDの後続のメッセージはすべて見えなくなる
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(1L);

        // 可視性タイムアウト(10秒)経過していないので、受信して削除していないメッセージ以降のメッセージ含め、すべて見えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(1L);

        // 可視性タイムアウト(10秒)経過していないので、受信して削除していないメッセージ以降のメッセージ含め、すべて見えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(4L); // 10秒以上経過

        // 削除しなかったメッセージおよび後続のメッセージが見えるようになる
        // 受信して削除
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 残メッセージは0
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();
    }

メッセージの送受信で使うインスタンスは、FIFOキューのものを使ってメッセージグループIDを今回はひとつにします。

        SqsMessageSender sqsMessageSender = SqsMessageSender.createFifo(queueUrl, "group");
        SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false);
        SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);

メッセージを6件入れて、3件のメッセージを削除せずに取り出すと、このキューからはそれ以降のメッセージが取り出せなくなります。

        sqsMessageSender
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());

        // 受信して削除しない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        sleep(1L);

        // FIFOキューの場合、同じメッセージグループIDの後続のメッセージはすべて見えなくなる
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(1L);

タイムアウトすると、タイムアウト待ちのメッセージとそれ以降のメッセージが取得できるようになります。

        // 可視性タイムアウト(10秒)経過していないので、受信して削除していないメッセージ以降のメッセージ含め、すべて見えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(1L);

        // 可視性タイムアウト(10秒)経過していないので、受信して削除していないメッセージ以降のメッセージ含め、すべて見えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(4L); // 10秒以上経過

        // 削除しなかったメッセージおよび後続のメッセージが見えるようになる
        // 受信して削除
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 残メッセージは0
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();
メッセージグループIDが複数の場合

最後は、メッセージグループIDが複数の場合を見ていきます。

Amazon SQSの可視性タイムアウトのページに、以下の記述がありました。

メッセージグループ IDがあるメッセージを受信した場合、メッセージを削除するか、表示されない限り、同じメッセージグループ IDのメッセージはそれ以上返信されません。

Amazon SQS可視性タイムアウト - Amazon Simple Queue Service

こちらを見ていきます。

    @Test
    void fifoQueueLeftMessagesPerMessageGroup() {
        String queueUrl = "http://localhost:9324/000000000000/fifo-queue.fifo";

        // 異なるメッセージグループIDを持つクライアントを作成
        SqsMessageSender sqsMessageSenderGroup1 = SqsMessageSender.createFifo(queueUrl, "group1");
        SqsMessageSender sqsMessageSenderGroup2 = SqsMessageSender.createFifo(queueUrl, "group2");
        SqsMessageSender sqsMessageSenderGroup3 = SqsMessageSender.createFifo(queueUrl, "group3");

        SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false);
        SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);

        sqsMessageSenderGroup1
                .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message1-" + i).toList());
        sqsMessageSenderGroup2
                .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message2-" + i).toList());
        sqsMessageSenderGroup3
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message3-" + i).toList());

        // 受信して削除しない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3]));

        sleep(1L);

        // FIFOキューの場合、可視性タイムアウト待ちとなった同じメッセージグループIDのメッセージはすべて見えなくなる
        // 他のメッセージグループIDのメッセージは取得できる
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message2-" + i).toList().toArray(new String[3]));

        sleep(1L);

        // FIFOキューの場合、可視性タイムアウト待ちとなった同じメッセージグループIDのメッセージはすべて見えなくなる
        // 他のメッセージグループIDのメッセージは取得できる
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message1-" + i).toList().toArray(new String[3]));

        sleep(1L);

        // 残っているのは可視性タイムアウト待ちとなったメッセージグループIDを持つメッセージのみであり、
        // 可視性タイムアウト(10秒)経過していないので、この時点で取得できるメッセージはない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(4L); // 10秒以上経過

        // 削除しなかったメッセージおよび後続のメッセージが見えるようになる
        // 受信して削除
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3]));
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3]));

        // 残メッセージは0
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();
    }

メッセージの送信側は、異なるメッセージグループIDを持つインスタンスを3つ作成します。

        // 異なるメッセージグループIDを持つクライアントを作成
        SqsMessageSender sqsMessageSenderGroup1 = SqsMessageSender.createFifo(queueUrl, "group1");
        SqsMessageSender sqsMessageSenderGroup2 = SqsMessageSender.createFifo(queueUrl, "group2");
        SqsMessageSender sqsMessageSenderGroup3 = SqsMessageSender.createFifo(queueUrl, "group3");

        SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false);
        SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);

受信側は、これまでと同じです。

各メッセージグループIDに対して、メッセージを登録します。

        sqsMessageSenderGroup1
                .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message1-" + i).toList());
        sqsMessageSenderGroup2
                .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message2-" + i).toList());
        sqsMessageSenderGroup3
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message3-" + i).toList());

最後のグループだけ6件のメッセージを送信し、それ以外は3件です。

ElasticMQで試すと、最後のメッセージグループIDのものが取れやすかったのですが、こちらを取得して削除はしないでおきます。

        // 受信して削除しない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3]));

        sleep(1L);

次にメッセージを取得すると、他のメッセージグループIDのものが取得できます。

        // FIFOキューの場合、可視性タイムアウト待ちとなった同じメッセージグループIDのメッセージはすべて見えなくなる
        // 他のメッセージグループIDのメッセージは取得できる
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message2-" + i).toList().toArray(new String[3]));

        sleep(1L);

        // FIFOキューの場合、可視性タイムアウト待ちとなった同じメッセージグループIDのメッセージはすべて見えなくなる
        // 他のメッセージグループIDのメッセージは取得できる
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message1-" + i).toList().toArray(new String[3]));

        sleep(1L);

これで残るは可視性タイムアウト待ちのメッセージグループIDのものなのですが、こちらはメッセージを取得できないままです。

        // 残っているのは可視性タイムアウト待ちとなったメッセージグループIDを持つメッセージのみであり、
        // 可視性タイムアウト(10秒)経過していないので、この時点で取得できるメッセージはない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

これで10秒以上経過すると、削除しなかったメッセージとともに、それ以降のメッセージも取得できるようになります。

        sleep(4L); // 10秒以上経過

        // 削除しなかったメッセージおよび後続のメッセージが見えるようになる
        // 受信して削除
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3]));
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3]));

最終的に、空のキューになります。

        // 残メッセージは0
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

こんなところでしょうか。

まとめ

Amazon SQSの可視性タイムアウトについて、ElasticMQを使って確認してみました。

標準キューとFIFOキューとの挙動の違いや、FIFOキューと可視性タイムアウトの細かい関係はあまり読めていなかったので、
この機会にちゃんと試せて良かったかなと思います。

といっても、あくまで確認はElasticMQなのですが。

また、LocalStackで最初試していたのですが、動きがドキュメントとだいぶ異なったのでかなりハマりました。危うく、全然違う挙動を
信じそうになりましたが、ドキュメントとちゃんと見比べておいて良かったです…。