CLOVER🍀

That was when it all began.

Amazon SQSのデッドレターキューをElasticMQで試す

これは、なにをしたくて書いたもの?

前に、ElasticMQを使ってAmazon SQSの可視性タイムアウトを試してみました。

Amazon SQSの可視性タイムアウトをElasticMQで確認する - CLOVER🍀

今度は、デッドレターキューを試してみたいと思います。

Amazon SQSデッドレターキュー - Amazon Simple Queue Service

Amazon SQSのデッドレターキュー

Amazon SQSのデッドレターキューとは、あるキューで正常に処理できなかったメッセージが送られる、別のキューのことです。

Amazon SQSデッドレターキュー - Amazon Simple Queue Service

デッドレターキューは、通常のAmazon SQSのキューとして作成します。利用者が自分で作成し、あるキューのデッドレターキューとして
設定する必要があります。

あるキューのデッドレターキューを設定し、コンシューマーがmaxReceiveCount回メッセージの処理に失敗するとメッセージは
デッドレターキューに移動します。

Amazon SQSデッドレターキュー / デッドレターキューのしくみ

maxReceiveCountのデフォルト値は、10です。

maxReceiveCount – The number of times a message is delivered to the source queue before being moved to the dead-letter queue. Default: 10. When the ReceiveCount for a message exceeds the maxReceiveCount for a queue, Amazon SQS moves the message to the dead-letter-queue.

CreateQueue - Amazon Simple Queue Service

なお、FIFOキューのデッドレターキューは、FIFOキューである必要があるそうです。

またデッドレターキューでの保持期間については、デッドレターキューへ入った時間ではなく、元のキューにメッセージが入った時間が
起点となって計算されるようです。

メッセージの有効期限は、常に元のエンキューのタイムスタンプに基づきます。デッドレターキューに移動すると、エンキューのタイムスタンプは変更されません。ApproximateAgeOfOldestMessageメトリックは、メッセージが最初に送信された日ではなく、メッセージがデッドレターキューに移動した日を示します。たとえば、メッセージがデッドレターキューに移動される前に、元のキューで1日費やすと仮定します。デッドレターキューの保持期間が4日間である場合、メッセージは3日後にデッドレターキューから削除され、ApproximateAgeOfOldestMessageは3日間です。したがって、デッドレターキューの保持期間を、元のキューの保持期間よりも長く設定することがベストプラクティスです。

デッドレターキューを使用した方がよい理由はこちらに書かれていて、処理できなかったメッセージを別のキューに移して調査を
行ったりできます。

Amazon SQSデッドレターキュー / デッドレターキューを使用するメリット

一方で、各キューの種類に対してデッドレターキューを設けた方がよい理由は、以下に掘り下げて書かれています。

Amazon SQSデッドレターキュー / キューの種類によるメッセージエラーの処理

  • 標準キュー
    • 標準キューは、多くのメッセージを溜め込むことができる
    • 処理に失敗するメッセージは保持期間が終わるまでメッセージの処理が繰り返されることになるため、このような場合はデッドレターキューに移した方がよい
  • FIFOキュー
    • FIFOキューは、保留状態のメッセージを多くは持てない
    • 処理に失敗するメッセージがあると、FIFOキューは同じグループ内のそれ以降のメッセージが取得できなくなり、事実上キューが使えなくなるのでデッドレターキューに移した方がよい

とはいえ、デッドレターキューは標準キューで使われることが想定されていそうです。

Amazon SQSデッドレターキュー / デッドレターキューが適している用途

FIFOキューの場合は、処理の順が変わることになるのでそれが許容できなければデッドレターキューを使用すべきではないそうです。

メッセージまたは操作の正確な順序を維持する必要がある場合は、FIFOキューでデッドレターキューを使用しないでください。

また、デッドレターキューに入ったメッセージを元のキューに戻す(リドライブ)することもできるようなのですが、これは
標準キューのみで行えるようです。

Amazon SQSでは、Amazon SQS コンソールのスタンダードキューに対してのみ、デッドレターキューのリドライブをサポートしています。

Amazon SQSデッドレターキュー / デッドレターキューからメッセージを移動するには

デッドレターキューの設定やリドライブについては、以下にも記述があります。

キューパラメータの設定(コンソール) - Amazon Simple Queue Service

デッドレターキュー(コンソール)を設定 - Amazon Simple Queue Service

デッドレターキューリドライブを設定します。(コンソール) - Amazon Simple Queue Service

今回は、ElasticMQを使って処理に失敗したメッセージがデッドレターキューに移るところを確認してみようと思います。

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.6 2023-01-17
OpenJDK Runtime Environment (build 17.0.6+10-Ubuntu-0ubuntu122.04)
OpenJDK 64-Bit Server VM (build 17.0.6+10-Ubuntu-0ubuntu122.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.9.1 (2e178502fcdbffc201671fb2537d0cb4b4cc58f8)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.6, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-67-generic", arch: "amd64", family: "unix"

ElasticMQは1.3.14を使います。

準備

ElasticMQは、設定ファイルを用意して起動することにしました。Terraformでキューの構築を行おうと思ったのですが、キューの作成で
タイムアウトするので、デッドレターキューの設定などまでたどり着かなかったからです…。

設定ファイルは、以下を参考に作成。

ElasticMQ / Automatically creating queues on startup

こんな感じにしました。

elasticmq-config/elasticmq.conf

queues {
  standard-queue {
    defaultVisibilityTimeout = 3 seconds
    fifo = false

    deadLettersQueue {
      name = "standard-queue-dead-letter-queue"
      maxReceiveCount = 3
    }
  }

  standard-queue-dead-letter-queue {
    fifo = false
  }

  "fifo-queue.fifo" {
    defaultVisibilityTimeout = 3 seconds
    fifo = true

    deadLettersQueue {
      name = "fifo-queue-dead-letter-queue.fifo"
      maxReceiveCount = 3
    }
  }

  "fifo-queue-dead-letter-queue.fifo" {
    fifo = true
  }
}

標準キューの方から見ていきましょう。

主となるキューは、可視性タイムアウト3秒、3回メッセージの処理に失敗したらデッドレターキューへ送信するように設定。

  standard-queue {
    defaultVisibilityTimeout = 3 seconds
    fifo = false

    deadLettersQueue {
      name = "standard-queue-dead-letter-queue"
      maxReceiveCount = 3
    }
  }

可視性タイムアウトが3秒なので、3秒 × 3回で連続でメッセージを受信しようとして削除しない状態にすれば(受信に失敗した状態に
すれば)10秒ほどでデッドレターキューへ送信されることになります。

デッドレターキューはこちらですね。

  standard-queue-dead-letter-queue {
    fifo = false
  }

FIFOキューも用意しましたが、先ほどの標準キューとの設定上の違いはキューの名前とFIFOキューであることしかありません。

  "fifo-queue.fifo" {
    defaultVisibilityTimeout = 3 seconds
    fifo = true

    deadLettersQueue {
      name = "fifo-queue-dead-letter-queue.fifo"
      maxReceiveCount = 3
    }
  }

  "fifo-queue-dead-letter-queue.fifo" {
    fifo = true
  }

この設定ファイルを、-Dconfig.fileシステムプロパティで指定して起動。

$ java -Dconfig.file=/path/to/elasticmq.conf -jar elasticmq-server-1.3.14.jar

起動時に、キューを作成しているログが出力されます。

23:18:02.092 [elasticmq-akka.actor.default-dispatcher-5] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(fifo-queue-dead-letter-queue.fifo,None,None,None,Some(2023-03-29T23:18:01.041+09:00),Some(2023-03-29T23:18:01.041+09:00),None,true,false,None,None,Map())
23:18:02.115 [elasticmq-akka.actor.default-dispatcher-5] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(fifo-queue.fifo,Some(MillisVisibilityTimeout(3000)),None,None,Some(2023-03-29T23:18:01.041+09:00),Some(2023-03-29T23:18:01.041+09:00),Some(DeadLettersQueueData(fifo-queue-dead-letter-queue.fifo,3)),true,false,None,None,Map())
23:18:02.117 [elasticmq-akka.actor.default-dispatcher-5] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(standard-queue-dead-letter-queue,None,None,None,Some(2023-03-29T23:18:01.041+09:00),Some(2023-03-29T23:18:01.041+09:00),None,false,false,None,None,Map())
23:18:02.118 [elasticmq-akka.actor.default-dispatcher-5] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(standard-queue,Some(MillisVisibilityTimeout(3000)),None,None,Some(2023-03-29T23:18:01.041+09:00),Some(2023-03-29T23:18:01.041+09:00),Some(DeadLettersQueueData(standard-queue-dead-letter-queue,3)),false,false,None,None,Map())

ElasticMQの準備は、これで完了です。

次は、動作確認に使うアプリケーションの準備をしましょう。Javaで作成し、テストコードで動作確認することにします。

Maven依存関係等はこちら。

    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sqs</artifactId>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.9.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.24.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M7</version>
            </plugin>
        </plugins>
    </build>

ソースコードを作成する

では、動作確認用のソースコードを作成していきます。

Amazon SQSへアクセスするクライアントを作成するクラス。

src/main/java/org/littlewings/aws/sqs/LocalSqsClientBuilder.java

package org.littlewings.aws.sqs;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

import java.net.URI;

public class LocalSqsClientBuilder {
    public static SqsClient create() {
        return SqsClient
                .builder()
                .credentialsProvider(
                        StaticCredentialsProvider.create(
                                AwsBasicCredentials.create("mock_access_key", "mock_secret_key")
                        )
                )
                .region(Region.US_EAST_1)
                .defaultsMode(DefaultsMode.AUTO)
                .endpointOverride(URI.create("http://localhost:9324"))
                .build();
    }
}

エンドポイントは、ElasticMQに向けています。

Amazon SQSへメッセージを送信するクラス。

src/main/java/org/littlewings/aws/sqs/SqsMessageSender.java

package org.littlewings.aws.sqs;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;

import java.util.List;
import java.util.UUID;

public class SqsMessageSender {
    String queueUrl;

    SqsClient sqsClient;

    boolean fifo;

    String messageGroupId;

    public static SqsMessageSender createStandard(String queueUrl) {
        SqsMessageSender sqsMessageSender = new SqsMessageSender();
        sqsMessageSender.queueUrl = queueUrl;
        sqsMessageSender.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageSender.fifo = false;

        return sqsMessageSender;
    }

    public static SqsMessageSender createFifo(String queueUrl, String messageGroupId) {
        SqsMessageSender sqsMessageSender = new SqsMessageSender();
        sqsMessageSender.queueUrl = queueUrl;
        sqsMessageSender.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageSender.fifo = true;
        sqsMessageSender.messageGroupId = messageGroupId;

        return sqsMessageSender;
    }

    public void sendMessages(List<String> messages) {
        List<SendMessageBatchRequestEntry> sendMessageBatchRequestEntries =
                messages
                        .stream()
                        .map(message ->
                                fifo ?
                                        // FIFOキュー
                                        SendMessageBatchRequestEntry
                                                .builder()
                                                .id(UUID.randomUUID().toString())
                                                .messageGroupId(messageGroupId)
                                                .messageDeduplicationId(UUID.randomUUID().toString())
                                                .messageBody(message)
                                                .build()
                                        :
                                        // 標準キュー
                                        SendMessageBatchRequestEntry
                                                .builder()
                                                .id(UUID.randomUUID().toString())
                                                .messageBody(message)
                                                .build()
                        )
                        .toList();
        SendMessageBatchRequest sendMessageBatchRequest =
                SendMessageBatchRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .entries(sendMessageBatchRequestEntries.toArray(new SendMessageBatchRequestEntry[sendMessageBatchRequestEntries.size()]))
                        .build();

        sqsClient.sendMessageBatch(sendMessageBatchRequest);
    }
}

FIFOキューを対象とする場合は、メッセージグループIDを指定できるようにしています。

Amazon SQSからメッセージを受信するクラス。

src/main/java/org/littlewings/aws/sqs/SqsMessageReceiver.java

package org.littlewings.aws.sqs;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

import java.util.List;
import java.util.UUID;

public class SqsMessageReceiver {
    String queueUrl;
    SqsClient sqsClient;

    boolean withDelete;

    public static SqsMessageReceiver create(String queueUrl, boolean withDelete) {
        SqsMessageReceiver sqsMessageReceiver = new SqsMessageReceiver();
        sqsMessageReceiver.queueUrl = queueUrl;
        sqsMessageReceiver.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageReceiver.withDelete = withDelete;

        return sqsMessageReceiver;
    }

    public List<String> receiveMessages(int maxNumberOfMessages) {
        ReceiveMessageRequest receiveMessageRequest =
                ReceiveMessageRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .maxNumberOfMessages(maxNumberOfMessages)
                        .waitTimeSeconds(1)
                        .build();

        ReceiveMessageResponse receiveMessageResponse = sqsClient.receiveMessage(receiveMessageRequest);

        List<String> messages =
                receiveMessageResponse
                        .messages()
                        .stream()
                        .map(message -> message.body())
                        .toList();

        if (withDelete) {
            List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntries =
                    receiveMessageResponse
                            .messages()
                            .stream()
                            .map(message ->
                                    DeleteMessageBatchRequestEntry
                                            .builder()
                                            .id(UUID.randomUUID().toString())
                                            .receiptHandle(message.receiptHandle())
                                            .build()
                            )
                            .toList();

            DeleteMessageBatchRequest deleteMessageBatchRequest =
                    DeleteMessageBatchRequest
                            .builder()
                            .queueUrl(queueUrl)
                            .entries(deleteMessageBatchRequestEntries.toArray(new DeleteMessageBatchRequestEntry[deleteMessageBatchRequestEntries.size()]))
                            .build();

            sqsClient.deleteMessageBatch(deleteMessageBatchRequest);
        }

        return messages;
    }
}

このクラスのインスタンス作成時に、メッセージを受信した時にメッセージの削除まで行うかどうかを、true/falseで指定できるように しています。

これらのクラスを使って、動作確認していきましょう。

テストコードで確認する

動作確認は、テストコードで行います。

テストコードの雛形。

src/test/java/org/littlewings/aws/sqs/SqsDeadLetterQueueTest.java

package org.littlewings.aws.sqs;

import org.junit.jupiter.api.Test;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

public class SqsDeadLetterQueueTest {
    void sleep(long sleepSec) {
        try {
            TimeUnit.SECONDS.sleep(sleepSec);
        } catch (InterruptedException e) {
            // no-op
        }
    }

    // ここに、テストを書く
}

適宜スリープを入れるためのメソッドも用意。

ここからは、標準キュー、FIFOキューそれぞれで確認していきます。可視性タイムアウトは、どちらも3秒にしていました。

標準キューの場合

まずは、標準キューから。

    @Test
    void standardQueue() {
        String standardQueueUrl = "http://localhost:9324/000000000000/standard-queue";
        String deadLetterQueueUrl = "http://localhost:9324/000000000000/standard-queue-dead-letter-queue";

        SqsMessageSender senderFromStandardQueue = SqsMessageSender.createStandard(standardQueueUrl);
        SqsMessageReceiver receiverFromStandardQueueNoDelete = SqsMessageReceiver.create(standardQueueUrl, false);
        SqsMessageReceiver receiverFromStandardQueue = SqsMessageReceiver.create(standardQueueUrl, true);
        SqsMessageReceiver receiverFromDeadLetterQueue = SqsMessageReceiver.create(deadLetterQueueUrl, true);

        senderFromStandardQueue
                .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList());

        // 受信して削除しない
        assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可視性タイムアウト待ち
        sleep(3L);

        // デッドレターキューにはまだない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1度受信して、削除しない
        assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可視性タイムアウト待ち
        sleep(3L);

        // デッドレターキューにはまだない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1度受信して、削除しない
        assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可視性タイムアウト待ち
        sleep(3L);

        // 元のキューは空になる(先にこちらにアクセスする必要がある)
        assertThat(receiverFromStandardQueue.receiveMessages(3))
                .isEmpty();

        // 1度元のキューにアクセスると、デッドレターキューから取得できるようになる
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));
    }

標準キューにメッセージ送信。

        senderFromStandardQueue
                .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList());

次にメッセージの受信を行いますが、この時にメッセージをAmazon SQSから削除しないようにします。

        // 受信して削除しない
        assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

可視性タイムアウトは3秒にしていたので、ここで待ちます。

        // 可視性タイムアウト待ち
        sleep(3L);

この時点ではデッドレターキューにはメッセージは入っていません。そして、再度メッセージを取得するものの、削除しない(処理失敗)
という操作を繰り返します。

        // デッドレターキューにはまだない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1度受信して、削除しない
        assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

もう1度可視性タイムアウト待ちをして、デッドレターキューにメッセージがないことを確認して、またメッセージの取得&削除せず
という操作を行います。

        // 可視性タイムアウト待ち
        sleep(3L);

        // デッドレターキューにはまだない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1度受信して、削除しない
        assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

これで、3回メッセージの処理に失敗しました。maxReceiveCountは3にしていたので、デッドレターキューに移る回数になりました。
可視性タイムアウト待ちをして、

        // 可視性タイムアウト待ち
        sleep(3L);

元のキューにアクセスしてみるとメッセージが取得できなくなります。

        // 元のキューは空になる(先にこちらにアクセスする必要がある)
        assertThat(receiverFromStandardQueue.receiveMessages(3))
                .isEmpty();

そして、デッドレターキューにアクセスすると、先ほどまで元のキューから取得していたメッセージが取得できるようになっています。

        // 1度元のキューにアクセスると、デッドレターキューから取得できるようになる
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

これで、maxReceiveCount回だけ処理に失敗したメッセージは、デッドレターキューに移ることが確認できました。

なお、ここだけ元のキューとデッドレターキューのアクセスの順番を入れ替えていますが、この順を逆にすると(先にデッドレターキューに
アクセスすると)、このテストは失敗します。

どうやら、元のキューにアクセスしたタイミングでデッドレターキューにメッセージが移されるみたいですね。この挙動がAmazon SQSでも
同じかどうかはわかりませんが。

今回はキューに入れたメッセージをすべて取り出して失敗、を繰り返してデッドレターキューに移るまで確認しましたが、
標準キューだと受信順が入れ替わったりするので、これ以上のメッセージは入れませんでした。

FIFOキュー

続いては、FIFOキュー。

    @Test
    void fifoQueue() {
        String fifoQueueUrl = "http://localhost:9324/000000000000/fifo-queue.fifo";
        String deadLetterQueueUrl = "http://localhost:9324/000000000000/fifo-queue-dead-letter-queue.fifo";

        SqsMessageSender senderFromFifoQueue = SqsMessageSender.createFifo(fifoQueueUrl, "group");
        SqsMessageReceiver receiverFromFifoQueueNoDelete = SqsMessageReceiver.create(fifoQueueUrl, false);
        SqsMessageReceiver receiverFromFifoQueue = SqsMessageReceiver.create(fifoQueueUrl, true);
        SqsMessageReceiver receiverFromDeadLetterQueue = SqsMessageReceiver.create(deadLetterQueueUrl, true);

        senderFromFifoQueue
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());

        // 受信して削除しない
        assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可視性タイムアウト待ち
        sleep(3L);

        // デッドレターキューにはまだない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1度受信して、削除しない
        assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可視性タイムアウト待ち
        sleep(3L);

        // デッドレターキューにはまだない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1度受信して、削除しない
        assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可視性タイムアウト待ち
        sleep(3L);

        // グループ内の次のメッセージが取得できるようになる(先にこちらにアクセスする必要がある)
        assertThat(receiverFromFifoQueue.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // デッドレターキューから取得できるようになる
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));
    }

やっていることは、基本的に標準キューと同じです。

なのですが、FIFOキューの場合は同一メッセージグループID内で順序が保証されるので、メッセージの数を少し増やしておきました。

        senderFromFifoQueue
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());

6件入れています。

標準キューの時と同じように、メッセージ受信 → 処理失敗 → 可視性タイムアウト待ち → デッドレターキュー確認 → メッセージ受信
→ 処理失敗 → 可視性タイムアウト待ち → デッドレターキュー確認 → メッセージ受信、という流れを繰り返します。

取得するメッセージは、1回で3件にしています。

        // 受信して削除しない
        assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可視性タイムアウト待ち
        sleep(3L);

        // デッドレターキューにはまだない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1度受信して、削除しない
        assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可視性タイムアウト待ち
        sleep(3L);

        // デッドレターキューにはまだない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1度受信して、削除しない
        assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

この時、FIFOキューに入れたメッセージの先頭からの取得を繰り返しています。

そして、次の可視性タイムアウトを待った後、元のキューからは処理に失敗したメッセージ以降のメッセージを取得することができ、
デッドレターキューからはここまでに処理に失敗し続けていたメッセージが取得できるようになっています。

        // 可視性タイムアウト待ち
        sleep(3L);

        // グループ内の次のメッセージが取得できるようになる(先にこちらにアクセスする必要がある)
        assertThat(receiverFromFifoQueue.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // デッドレターキューから取得できるようになる
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

FIFOキューの場合、メッセージの処理に失敗し続ける状況ではデッドレターキューにメッセージが移らないと先に進めなくなることが
確認できました。

こんなところでしょうか。

まとめ

ElasticMQを使って、Amazon SQSのデッドレターキューを試してみました。

メッセージの処理にmaxReceiveCount回失敗するとすぐにデッドレターキューに移ると思っていたのですが、元のキューにアクセス
しないとデッドレターキューから取得できないのはElasticMQ特有の話かどうかはわからないのですが。

まあ、そこまで気にするほどのことでもないかな、と思います。

動作はこうやって確認できたので、よしとしましょう。