CLOVER🍀

That was when it all began.

Amazon SQSの可芖性タむムアりトをElasticMQで確認する

これは、なにをしたくお曞いたもの

Amazon SQSの可芖性タむムアりトずいうものを、1床芋おおこうかなず思いたしお。

こちらですね。

Amazon SQS可視性タイムアウト - Amazon Simple Queue Service

Amazon SQSの可芖性タむムアりト

Amazon SQSの可芖性タむムアりトずは、コンシュヌマヌがメッセヌゞを取埗しおから削陀するたでの間、他のコンシュヌマヌが
メッセヌゞを取埗できなくなる期間のこずです。

Amazon SQS可視性タイムアウト - Amazon Simple Queue Service

Amazon SQSでは、コンシュヌマヌはメッセヌゞを受信した埌、削陀する必芁がありたす。
コンシュヌマヌがメッセヌゞを受信しおも、メッセヌゞ自䜓はキュヌに残ったたたの状態になりたす。もし最初にメッセヌゞを受信した
コンシュヌマヌに問題が発生した堎合メッセヌゞの削陀はできなかった、他のコンシュヌマヌが該圓のメッセヌゞを凊理するこずに
なりたす。

ただ、同じメッセヌゞを耇数のコンシュヌマヌが凊理するこずがないよう、䞀定のタむムアりト期間を蚭けおいたす。
これが可芖性タむムアりトですね。

デフォルトは30秒、最小は0秒、最倧は12時間です。

可芖性タむムアりトはキュヌ単䜍、そしおメッセヌゞの受信時に個別に蚭定するこずができたす。

Amazon SQS可芖性タむムアりト / 可芖性タむムアりトの蚭定

途䞭で倉曎するこずもできるようです。

たた、暙準キュヌずFIFOキュヌでは違いもありたす。

  • 暙準キュヌ
    • 可芖性タむムアりトは、暙準キュヌで同じメッセヌゞを2回受信しない保蚌at least onceにはならない
    • 可芖性タむムアりト埅ちのメッセヌゞがあっおも、他のメッセヌゞは取埗できる
  • FIFOキュヌ
    • 同じグルヌプ内に可芖性タむムアりト埅ちのメッセヌゞがあった堎合、そのグルヌプ内のメッセヌゞはタむムアりトする、メッセヌゞを削陀するのいずれかを行うたで取埗できなくなる
    • 送信時に同じ重耇陀倖ID、受信時に同じ受信リク゚スト詊行IDを䜿うこずで、再詊行が可胜

今回は、可芖性タむムアりト埅ちになった時にそのメッセヌゞが取埗できなくなるこずや、FIFOキュヌでの確認をしおいきたいず
思いたす。

確認は、AWS SDK for Java v2ずElasticMQで行うこずにしたす。
※最初はLocalStackで詊しおいたのですが、挙動がだいぶ異なりドキュメントず動きが合わなかったのでElasticMQにしたした

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.6 2023-01-17
OpenJDK Runtime Environment (build 17.0.6+10-Ubuntu-0ubuntu122.04)
OpenJDK 64-Bit Server VM (build 17.0.6+10-Ubuntu-0ubuntu122.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.9.1 (2e178502fcdbffc201671fb2537d0cb4b4cc58f8)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.6, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-67-generic", arch: "amd64", family: "unix"

ElasticMQは1.3.14を䜿いたす。

起動。

$ java -jar elasticmq-server-1.3.14.jar

ElasticMQ䞊のキュヌの䜜成は、Terraformで行うこずにしたす。

$ terraform version
Terraform v1.4.2
on linux_amd64

準備

たずは、ElasticMQ䞊にキュヌを䜜成したす。

Terraformの構成ファむルをこのように甚意。

main.tf

terraform {
  required_version = "1.4.2"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "4.59.0"
    }
  }
}

provider "aws" {
  access_key = "mock_access_key"
  secret_key = "mock_secret_key"
  region     = "us-east-1"

  skip_credentials_validation = true
  skip_metadata_api_check     = true
  skip_requesting_account_id  = true

  endpoints {
    sqs = "http://localhost:9324"
  }
}

resource "aws_sqs_queue" "standard_queue" {
  name = "standard-queue"

  receive_wait_time_seconds  = 1
  visibility_timeout_seconds = 10
}

resource "aws_sqs_queue" "fifo_queue" {
  name = "fifo-queue.fifo"

  fifo_queue = true

  receive_wait_time_seconds  = 1
  visibility_timeout_seconds = 10
}

output "standard_queue_url" {
  value = aws_sqs_queue.standard_queue.url
}

output "fifo_queue_url" {
  value = aws_sqs_queue.fifo_queue.url
}

暙準キュヌずFIFOキュヌの䞡方を䜜成し、可芖性タむムアりトはそれぞれ10秒ずしたした。

適甚。

$ terraform init
$ terraform apply

ElasticMQを察象に実行するず、なぜかタむムアりトするんですけどね 。

│ Error: waiting for SQS Queue (http://localhost:9324/000000000000/standard-queue) attributes create: timeout while waiting for state to become 'equal' (last state: 'notequal', timeout: 2m0s)
│
│   with aws_sqs_queue.standard_queue,
│   on main.tf line 26, in resource "aws_sqs_queue" "standard_queue":
│   26: resource "aws_sqs_queue" "standard_queue" {
│
╵
╷
│ Error: waiting for SQS Queue (http://localhost:9324/000000000000/fifo-queue.fifo) attributes create: timeout while waiting for state to become 'equal' (last state: 'notequal', timeout: 2m0s)
│
│   with aws_sqs_queue.fifo_queue,
│   on main.tf line 33, in resource "aws_sqs_queue" "fifo_queue":
│   33: resource "aws_sqs_queue" "fifo_queue" {
│
╵

䞀応、これでもキュヌはできおいるのでそのたた䜿いたす。

$ aws --endpoint-url http://localhost:9324 sqs list-queues
{
    "QueueUrls": [
        "http://localhost:9324/000000000000/standard-queue",
        "http://localhost:9324/000000000000/fifo-queue.fifo"
    ]
}

続いお、Java偎。Maven䟝存関係等はこのように蚭定。

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.20.30</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sqs</artifactId>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.9.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.24.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M7</version>
            </plugin>
        </plugins>
    </build>

動䜜確認はテストコヌドで行うこずにしたす。

プログラムを䜜成する

では、プログラムを䜜成しおいきたしょう。

たずはAmazon SQSにアクセスするクラむアントを䜜成するクラス。

src/main/java/org/littlewings/aws/sqs/LocalSqsClientBuilder.java

package org.littlewings.aws.sqs;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

import java.net.URI;

public class LocalSqsClientBuilder {
    public static SqsClient create() {
        return SqsClient
                .builder()
                .credentialsProvider(
                        StaticCredentialsProvider.create(
                                AwsBasicCredentials.create("mock_access_key", "mock_scret_key")
                        )
                )
                .region(Region.US_EAST_1)
                .defaultsMode(DefaultsMode.AUTO)
                .endpointOverride(URI.create("http://localhost:9324"))
                .build();
    }
}

゚ンドポむントをElasticMQに差し替えおいたす。

Amazon SQSぞメッセヌゞを送信するクラス。

src/main/java/org/littlewings/aws/sqs/SqsMessageSender.java

package org.littlewings.aws.sqs;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;

import java.util.List;
import java.util.UUID;

public class SqsMessageSender {
    String queueUrl;

    SqsClient sqsClient;

    boolean fifo;

    String messageGroupId;

    public static SqsMessageSender createStandard(String queueUrl) {
        SqsMessageSender sqsMessageSender = new SqsMessageSender();
        sqsMessageSender.queueUrl = queueUrl;
        sqsMessageSender.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageSender.fifo = false;

        return sqsMessageSender;
    }

    public static SqsMessageSender createFifo(String queueUrl, String messageGroupId) {
        SqsMessageSender sqsMessageSender = new SqsMessageSender();
        sqsMessageSender.queueUrl = queueUrl;
        sqsMessageSender.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageSender.fifo = true;
        sqsMessageSender.messageGroupId = messageGroupId;

        return sqsMessageSender;
    }

    public void sendMessages(List<String> messages) {
        List<SendMessageBatchRequestEntry> sendMessageBatchRequestEntries =
                messages
                        .stream()
                        .map(message ->
                                fifo ?
                                        // FIFOキュヌ
                                        SendMessageBatchRequestEntry
                                                .builder()
                                                .id(UUID.randomUUID().toString())
                                                .messageGroupId(messageGroupId)
                                                .messageDeduplicationId(UUID.randomUUID().toString())
                                                .messageBody(message)
                                                .build()
                                        :
                                        // 暙準キュヌ
                                        SendMessageBatchRequestEntry
                                                .builder()
                                                .id(UUID.randomUUID().toString())
                                                .messageBody(message)
                                                .build()
                        )
                        .toList();
        SendMessageBatchRequest sendMessageBatchRequest =
                SendMessageBatchRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .entries(sendMessageBatchRequestEntries.toArray(new SendMessageBatchRequestEntry[sendMessageBatchRequestEntries.size()]))
                        .build();

        sqsClient.sendMessageBatch(sendMessageBatchRequest);
    }
}

FIFOキュヌの堎合は、メッセヌゞグルヌプIDを指定できるようにしおいたす。

Amazon SQSからメッセヌゞを受信するクラス。

src/main/java/org/littlewings/aws/sqs/SqsMessageReceiver.java

package org.littlewings.aws.sqs;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

import java.util.List;
import java.util.UUID;

public class SqsMessageReceiver {
    String queueUrl;
    SqsClient sqsClient;

    boolean withDelete;

    public static SqsMessageReceiver create(String queueUrl, boolean withDelete) {
        SqsMessageReceiver sqsMessageReceiver = new SqsMessageReceiver();
        sqsMessageReceiver.queueUrl = queueUrl;
        sqsMessageReceiver.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageReceiver.withDelete = withDelete;

        return sqsMessageReceiver;
    }

    public List<String> receiveMessages(int maxNumberOfMessages) {
        ReceiveMessageRequest receiveMessageRequest =
                ReceiveMessageRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .maxNumberOfMessages(maxNumberOfMessages)
                        .waitTimeSeconds(1)
                        .build();

        ReceiveMessageResponse receiveMessageResponse = sqsClient.receiveMessage(receiveMessageRequest);

        List<String> messages =
                receiveMessageResponse
                        .messages()
                        .stream()
                        .map(message -> message.body())
                        .toList();

        if (withDelete) {
            List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntries =
                    receiveMessageResponse
                            .messages()
                            .stream()
                            .map(message ->
                                    DeleteMessageBatchRequestEntry
                                            .builder()
                                            .id(UUID.randomUUID().toString())
                                            .receiptHandle(message.receiptHandle())
                                            .build()
                            )
                            .toList();

            DeleteMessageBatchRequest deleteMessageBatchRequest =
                    DeleteMessageBatchRequest
                            .builder()
                            .queueUrl(queueUrl)
                            .entries(deleteMessageBatchRequestEntries.toArray(new DeleteMessageBatchRequestEntry[deleteMessageBatchRequestEntries.size()]))
                            .build();

            sqsClient.deleteMessageBatch(deleteMessageBatchRequest);
        }

        return messages;
    }
}

このクラスのむンスタンス䜜成時に、メッセヌゞを受信した時にメッセヌゞの削陀たで行うかどうかを、truefalseで指定できるように
しおいたす。

これらのクラスを䜿っお、動䜜確認しおいきたしょう。

テストコヌドで確認する

動䜜確認は、テストコヌドで行いたす。

テストコヌドの雛圢。

src/test/java/org/littlewings/aws/sqs/SqsVisibilityTimeoutTest.java

package org.littlewings.aws.sqs;

import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

public class SqsVisibilityTimeoutTest {
    // ここに、テストを曞く

    void sleep(long sleepSec) {
        try {
            TimeUnit.SECONDS.sleep(sleepSec);
        } catch (InterruptedException e) {
            // no-op
        }
    }
}

適宜スリヌプを入れるためのメ゜ッドも甚意。

ここからは、暙準キュヌ、FIFOキュヌそれぞれで確認しおいきたす。可芖性タむムアりトは、どちらも10秒です。

暙準キュヌの堎合

最初は暙準キュヌで確認したす。

    @Test
    void standardQueue() {
        String queueUrl = "http://localhost:9324/000000000000/standard-queue";

        SqsMessageSender sqsMessageSender = SqsMessageSender.createStandard(queueUrl);
        SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false);
        SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);

        sqsMessageSender
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());

        // 受信しお削陀しない
        // 登録したメッセヌゞのいずれかが返っおくる順序保蚌はない
        List<String> nonDeletedMessages = sqsMessageReceiverWithoutDelete.receiveMessages(3);
        assertThat(nonDeletedMessages)
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        sleep(1L);

        // 受信しお削陀
        // 最初に受信したメッセヌゞ以倖のメッセヌゞのいずれかが返っおくる順序保蚌はない
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsAnyOf(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]))
                .doesNotContainAnyElementsOf(nonDeletedMessages);

        sleep(1L);

        // 可芖性タむムアりト10秒経過しおいないので、受信しお削陀しおいないメッセヌゞが芋えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(1L);

        // 可芖性タむムアりト10秒経過しおいないので、受信しお削陀しおいないメッセヌゞが芋えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(4L); // 10秒以䞊経過

        // 削陀しなかったメッセヌゞが芋えるようになる同じ順序になるずは限らない
        // 受信しお削陀
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsOnly(nonDeletedMessages.toArray(new String[nonDeletedMessages.size()]));

        // 残メッセヌゞは0
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();
    }

メッセヌゞを送信するむンスタンス、メッセヌゞを受信しお削陀は行わないむンスタンス、メッセヌゞの受信をしお削陀たで行う
むンスタンスを甚意したす。

        SqsMessageSender sqsMessageSender = SqsMessageSender.createStandard(queueUrl);
        SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false);
        SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);

キュヌに6件メッセヌゞを送信。

        sqsMessageSender
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());

最初の受信は、3件取埗しお削陀を行いたせん。

        // 受信しお削陀しない
        // 登録したメッセヌゞのいずれかが返っおくる順序保蚌はない
        List<String> nonDeletedMessages = sqsMessageReceiverWithoutDelete.receiveMessages(3);
        assertThat(nonDeletedMessages)
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        sleep(1L);

暙準キュヌは入れた順に取り出せないこずがあるので、アサヌションはこんな感じにしたした。
ただ、ここで1床取埗したメッセヌゞは保持しおおきたす。

たた適宜スリヌプを挟んでいきたす。

次は3件取埗したすが、最初に受信したメッセヌゞ以倖が返っおきたす。

        // 受信しお削陀
        // 最初に受信したメッセヌゞ以倖のメッセヌゞのいずれかが返っおくる順序保蚌はない
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsAnyOf(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]))
                .doesNotContainAnyElementsOf(nonDeletedMessages);

        sleep(1L);

この埌は、可芖性タむムアりトで指定した秒数が経過するたではメッセヌゞが取埗できなくなりたす。

        // 可芖性タむムアりト10秒経過しおいないので、受信しお削陀しおいないメッセヌゞが芋えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(1L);

        // 可芖性タむムアりト10秒経過しおいないので、受信しお削陀しおいないメッセヌゞが芋えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

そしお10秒以䞊経過するず、最初に取埗しお削陀しなかったメッセヌゞが取埗できるようになりたす。

        sleep(4L); // 10秒以䞊経過

        // 削陀しなかったメッセヌゞが芋えるようになる同じ順序になるずは限らない
        // 受信しお削陀
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsOnly(nonDeletedMessages.toArray(new String[nonDeletedMessages.size()]));

残りのメッセヌゞ件数は0です。

        // 残メッセヌゞは0
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

暙準キュヌでは、可芖性タむムアりト埅ちのメッセヌゞは取埗できなくなりたすが、その埌に続くメッセヌゞはふ぀うに取埗できる
ずいうずころがポむントですね。

FIFOキュヌの堎合

続いおは、FIFOキュヌで確認したす。いく぀かバリ゚ヌションがありたす。

メッセヌゞグルヌプIDがひず぀の堎合

たずはメッセヌゞグルヌプIDがひず぀だった堎合を芋おいきたす。

    @Test
    void fifoQueue() {
        String queueUrl = "http://localhost:4566/000000000000/fifo-queue.fifo";

        SqsMessageSender sqsMessageSender = SqsMessageSender.createFifo(queueUrl, "group");
        SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false);
        SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);

        sqsMessageSender
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());

        // 受信しお削陀しない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        sleep(1L);

        // FIFOキュヌの堎合、同じメッセヌゞグルヌプIDの埌続のメッセヌゞはすべお芋えなくなる
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(1L);

        // 可芖性タむムアりト10秒経過しおいないので、受信しお削陀しおいないメッセヌゞ以降のメッセヌゞ含め、すべお芋えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(1L);

        // 可芖性タむムアりト10秒経過しおいないので、受信しお削陀しおいないメッセヌゞ以降のメッセヌゞ含め、すべお芋えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(4L); // 10秒以䞊経過

        // 削陀しなかったメッセヌゞおよび埌続のメッセヌゞが芋えるようになる
        // 受信しお削陀
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 残メッセヌゞは0
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();
    }

メッセヌゞの送受信で䜿うむンスタンスは、FIFOキュヌのものを䜿っおメッセヌゞグルヌプIDを今回はひず぀にしたす。

        SqsMessageSender sqsMessageSender = SqsMessageSender.createFifo(queueUrl, "group");
        SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false);
        SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);

メッセヌゞを6件入れお、3件のメッセヌゞを削陀せずに取り出すず、このキュヌからはそれ以降のメッセヌゞが取り出せなくなりたす。

        sqsMessageSender
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());

        // 受信しお削陀しない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        sleep(1L);

        // FIFOキュヌの堎合、同じメッセヌゞグルヌプIDの埌続のメッセヌゞはすべお芋えなくなる
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(1L);

タむムアりトするず、タむムアりト埅ちのメッセヌゞずそれ以降のメッセヌゞが取埗できるようになりたす。

        // 可芖性タむムアりト10秒経過しおいないので、受信しお削陀しおいないメッセヌゞ以降のメッセヌゞ含め、すべお芋えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(1L);

        // 可芖性タむムアりト10秒経過しおいないので、受信しお削陀しおいないメッセヌゞ以降のメッセヌゞ含め、すべお芋えない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(4L); // 10秒以䞊経過

        // 削陀しなかったメッセヌゞおよび埌続のメッセヌゞが芋えるようになる
        // 受信しお削陀
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 残メッセヌゞは0
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();
メッセヌゞグルヌプIDが耇数の堎合

最埌は、メッセヌゞグルヌプIDが耇数の堎合を芋おいきたす。

Amazon SQSの可芖性タむムアりトのペヌゞに、以䞋の蚘述がありたした。

メッセヌゞグルヌプ IDがあるメッセヌゞを受信した堎合、メッセヌゞを削陀するか、衚瀺されない限り、同じメッセヌゞグルヌプ IDのメッセヌゞはそれ以䞊返信されたせん。

Amazon SQS可視性タイムアウト - Amazon Simple Queue Service

こちらを芋おいきたす。

    @Test
    void fifoQueueLeftMessagesPerMessageGroup() {
        String queueUrl = "http://localhost:9324/000000000000/fifo-queue.fifo";

        // 異なるメッセヌゞグルヌプIDを持぀クラむアントを䜜成
        SqsMessageSender sqsMessageSenderGroup1 = SqsMessageSender.createFifo(queueUrl, "group1");
        SqsMessageSender sqsMessageSenderGroup2 = SqsMessageSender.createFifo(queueUrl, "group2");
        SqsMessageSender sqsMessageSenderGroup3 = SqsMessageSender.createFifo(queueUrl, "group3");

        SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false);
        SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);

        sqsMessageSenderGroup1
                .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message1-" + i).toList());
        sqsMessageSenderGroup2
                .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message2-" + i).toList());
        sqsMessageSenderGroup3
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message3-" + i).toList());

        // 受信しお削陀しない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3]));

        sleep(1L);

        // FIFOキュヌの堎合、可芖性タむムアりト埅ちずなった同じメッセヌゞグルヌプIDのメッセヌゞはすべお芋えなくなる
        // 他のメッセヌゞグルヌプIDのメッセヌゞは取埗できる
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message2-" + i).toList().toArray(new String[3]));

        sleep(1L);

        // FIFOキュヌの堎合、可芖性タむムアりト埅ちずなった同じメッセヌゞグルヌプIDのメッセヌゞはすべお芋えなくなる
        // 他のメッセヌゞグルヌプIDのメッセヌゞは取埗できる
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message1-" + i).toList().toArray(new String[3]));

        sleep(1L);

        // 残っおいるのは可芖性タむムアりト埅ちずなったメッセヌゞグルヌプIDを持぀メッセヌゞのみであり、
        // 可芖性タむムアりト10秒経過しおいないので、この時点で取埗できるメッセヌゞはない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

        sleep(4L); // 10秒以䞊経過

        // 削陀しなかったメッセヌゞおよび埌続のメッセヌゞが芋えるようになる
        // 受信しお削陀
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3]));
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3]));

        // 残メッセヌゞは0
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();
    }

メッセヌゞの送信偎は、異なるメッセヌゞグルヌプIDを持぀むンスタンスを3぀䜜成したす。

        // 異なるメッセヌゞグルヌプIDを持぀クラむアントを䜜成
        SqsMessageSender sqsMessageSenderGroup1 = SqsMessageSender.createFifo(queueUrl, "group1");
        SqsMessageSender sqsMessageSenderGroup2 = SqsMessageSender.createFifo(queueUrl, "group2");
        SqsMessageSender sqsMessageSenderGroup3 = SqsMessageSender.createFifo(queueUrl, "group3");

        SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false);
        SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);

受信偎は、これたでず同じです。

各メッセヌゞグルヌプIDに察しお、メッセヌゞを登録したす。

        sqsMessageSenderGroup1
                .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message1-" + i).toList());
        sqsMessageSenderGroup2
                .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message2-" + i).toList());
        sqsMessageSenderGroup3
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message3-" + i).toList());

最埌のグルヌプだけ6件のメッセヌゞを送信し、それ以倖は3件です。

ElasticMQで詊すず、最埌のメッセヌゞグルヌプIDのものが取れやすかったのですが、こちらを取埗しお削陀はしないでおきたす。

        // 受信しお削陀しない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3]));

        sleep(1L);

次にメッセヌゞを取埗するず、他のメッセヌゞグルヌプIDのものが取埗できたす。

        // FIFOキュヌの堎合、可芖性タむムアりト埅ちずなった同じメッセヌゞグルヌプIDのメッセヌゞはすべお芋えなくなる
        // 他のメッセヌゞグルヌプIDのメッセヌゞは取埗できる
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message2-" + i).toList().toArray(new String[3]));

        sleep(1L);

        // FIFOキュヌの堎合、可芖性タむムアりト埅ちずなった同じメッセヌゞグルヌプIDのメッセヌゞはすべお芋えなくなる
        // 他のメッセヌゞグルヌプIDのメッセヌゞは取埗できる
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message1-" + i).toList().toArray(new String[3]));

        sleep(1L);

これで残るは可芖性タむムアりト埅ちのメッセヌゞグルヌプIDのものなのですが、こちらはメッセヌゞを取埗できないたたです。

        // 残っおいるのは可芖性タむムアりト埅ちずなったメッセヌゞグルヌプIDを持぀メッセヌゞのみであり、
        // 可芖性タむムアりト10秒経過しおいないので、この時点で取埗できるメッセヌゞはない
        assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3))
                .isEmpty();
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

これで10秒以䞊経過するず、削陀しなかったメッセヌゞずずもに、それ以降のメッセヌゞも取埗できるようになりたす。

        sleep(4L); // 10秒以䞊経過

        // 削陀しなかったメッセヌゞおよび埌続のメッセヌゞが芋えるようになる
        // 受信しお削陀
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3]));
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3]));

最終的に、空のキュヌになりたす。

        // 残メッセヌゞは0
        assertThat(sqsMessageReceiver.receiveMessages(3))
                .isEmpty();

こんなずころでしょうか。

たずめ

Amazon SQSの可芖性タむムアりトに぀いお、ElasticMQを䜿っお確認しおみたした。

暙準キュヌずFIFOキュヌずの挙動の違いや、FIFOキュヌず可芖性タむムアりトの现かい関係はあたり読めおいなかったので、
この機䌚にちゃんず詊せお良かったかなず思いたす。

ずいっおも、あくたで確認はElasticMQなのですが。

たた、LocalStackで最初詊しおいたのですが、動きがドキュメントずだいぶ異なったのでかなりハマりたした。危うく、党然違う挙動を
信じそうになりたしたが、ドキュメントずちゃんず芋比べおおいお良かったです 。