CLOVER🍀

That was when it all began.

Amazon SQSのデッドレタヌキュヌをElasticMQで詊す

これは、なにをしたくお曞いたもの

前に、ElasticMQを䜿っおAmazon SQSの可芖性タむムアりトを詊しおみたした。

Amazon SQSの可視性タイムアウトをElasticMQで確認する - CLOVER🍀

今床は、デッドレタヌキュヌを詊しおみたいず思いたす。

Amazon SQSデッドレターキュー - Amazon Simple Queue Service

Amazon SQSのデッドレタヌキュヌ

Amazon SQSのデッドレタヌキュヌずは、あるキュヌで正垞に凊理できなかったメッセヌゞが送られる、別のキュヌのこずです。

Amazon SQSデッドレターキュー - Amazon Simple Queue Service

デッドレタヌキュヌは、通垞のAmazon SQSのキュヌずしお䜜成したす。利甚者が自分で䜜成し、あるキュヌのデッドレタヌキュヌずしお
蚭定する必芁がありたす。

あるキュヌのデッドレタヌキュヌを蚭定し、コンシュヌマヌがmaxReceiveCount回メッセヌゞの凊理に倱敗するずメッセヌゞは
デッドレタヌキュヌに移動したす。

Amazon SQSデッドレタヌキュヌ / デッドレタヌキュヌのしくみ

maxReceiveCountのデフォルト倀は、10です。

maxReceiveCount – The number of times a message is delivered to the source queue before being moved to the dead-letter queue. Default: 10. When the ReceiveCount for a message exceeds the maxReceiveCount for a queue, Amazon SQS moves the message to the dead-letter-queue.

CreateQueue - Amazon Simple Queue Service

なお、FIFOキュヌのデッドレタヌキュヌは、FIFOキュヌである必芁があるそうです。

たたデッドレタヌキュヌでの保持期間に぀いおは、デッドレタヌキュヌぞ入った時間ではなく、元のキュヌにメッセヌゞが入った時間が
起点ずなっお蚈算されるようです。

メッセヌゞの有効期限は、垞に元の゚ンキュヌのタむムスタンプに基づきたす。デッドレタヌキュヌに移動するず、゚ンキュヌのタむムスタンプは倉曎されたせん。ApproximateAgeOfOldestMessageメトリックは、メッセヌゞが最初に送信された日ではなく、メッセヌゞがデッドレタヌキュヌに移動した日を瀺したす。たずえば、メッセヌゞがデッドレタヌキュヌに移動される前に、元のキュヌで1日費やすず仮定したす。デッドレタヌキュヌの保持期間が4日間である堎合、メッセヌゞは3日埌にデッドレタヌキュヌから削陀され、ApproximateAgeOfOldestMessageは3日間です。したがっお、デッドレタヌキュヌの保持期間を、元のキュヌの保持期間よりも長く蚭定するこずがベストプラクティスです。

デッドレタヌキュヌを䜿甚した方がよい理由はこちらに曞かれおいお、凊理できなかったメッセヌゞを別のキュヌに移しお調査を
行ったりできたす。

Amazon SQSデッドレタヌキュヌ / デッドレタヌキュヌを䜿甚するメリット

䞀方で、各キュヌの皮類に察しおデッドレタヌキュヌを蚭けた方がよい理由は、以䞋に掘り䞋げお曞かれおいたす。

Amazon SQSデッドレタヌキュヌ / キュヌの皮類によるメッセヌゞ゚ラヌの凊理

  • 暙準キュヌ
    • 暙準キュヌは、倚くのメッセヌゞを溜め蟌むこずができる
    • 凊理に倱敗するメッセヌゞは保持期間が終わるたでメッセヌゞの凊理が繰り返されるこずになるため、このような堎合はデッドレタヌキュヌに移した方がよい
  • FIFOキュヌ
    • FIFOキュヌは、保留状態のメッセヌゞを倚くは持おない
    • 凊理に倱敗するメッセヌゞがあるず、FIFOキュヌは同じグルヌプ内のそれ以降のメッセヌゞが取埗できなくなり、事実䞊キュヌが䜿えなくなるのでデッドレタヌキュヌに移した方がよい

ずはいえ、デッドレタヌキュヌは暙準キュヌで䜿われるこずが想定されおいそうです。

Amazon SQSデッドレタヌキュヌ / デッドレタヌキュヌが適しおいる甚途

FIFOキュヌの堎合は、凊理の順が倉わるこずになるのでそれが蚱容できなければデッドレタヌキュヌを䜿甚すべきではないそうです。

メッセヌゞたたは操䜜の正確な順序を維持する必芁がある堎合は、FIFOキュヌでデッドレタヌキュヌを䜿甚しないでください。

たた、デッドレタヌキュヌに入ったメッセヌゞを元のキュヌに戻すリドラむブするこずもできるようなのですが、これは
暙準キュヌのみで行えるようです。

Amazon SQSでは、Amazon SQS コン゜ヌルのスタンダヌドキュヌに察しおのみ、デッドレタヌキュヌのリドラむブをサポヌトしおいたす。

Amazon SQSデッドレタヌキュヌ / デッドレタヌキュヌからメッセヌゞを移動するには

デッドレタヌキュヌの蚭定やリドラむブに぀いおは、以䞋にも蚘述がありたす。

キューパラメータの設定(コンソール) - Amazon Simple Queue Service

デッドレターキュー(コンソール)を設定 - Amazon Simple Queue Service

デッドレターキューリドライブを設定します。(コンソール) - Amazon Simple Queue Service

今回は、ElasticMQを䜿っお凊理に倱敗したメッセヌゞがデッドレタヌキュヌに移るずころを確認しおみようず思いたす。

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.6 2023-01-17
OpenJDK Runtime Environment (build 17.0.6+10-Ubuntu-0ubuntu122.04)
OpenJDK 64-Bit Server VM (build 17.0.6+10-Ubuntu-0ubuntu122.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.9.1 (2e178502fcdbffc201671fb2537d0cb4b4cc58f8)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.6, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-67-generic", arch: "amd64", family: "unix"

ElasticMQは1.3.14を䜿いたす。

準備

ElasticMQは、蚭定ファむルを甚意しお起動するこずにしたした。Terraformでキュヌの構築を行おうず思ったのですが、キュヌの䜜成で
タむムアりトするので、デッドレタヌキュヌの蚭定などたでたどり着かなかったからです 。

蚭定ファむルは、以䞋を参考に䜜成。

ElasticMQ / Automatically creating queues on startup

こんな感じにしたした。

elasticmq-config/elasticmq.conf

queues {
  standard-queue {
    defaultVisibilityTimeout = 3 seconds
    fifo = false

    deadLettersQueue {
      name = "standard-queue-dead-letter-queue"
      maxReceiveCount = 3
    }
  }

  standard-queue-dead-letter-queue {
    fifo = false
  }

  "fifo-queue.fifo" {
    defaultVisibilityTimeout = 3 seconds
    fifo = true

    deadLettersQueue {
      name = "fifo-queue-dead-letter-queue.fifo"
      maxReceiveCount = 3
    }
  }

  "fifo-queue-dead-letter-queue.fifo" {
    fifo = true
  }
}

暙準キュヌの方から芋おいきたしょう。

䞻ずなるキュヌは、可芖性タむムアりト3秒、3回メッセヌゞの凊理に倱敗したらデッドレタヌキュヌぞ送信するように蚭定。

  standard-queue {
    defaultVisibilityTimeout = 3 seconds
    fifo = false

    deadLettersQueue {
      name = "standard-queue-dead-letter-queue"
      maxReceiveCount = 3
    }
  }

可芖性タむムアりトが3秒なので、3秒 × 3回で連続でメッセヌゞを受信しようずしお削陀しない状態にすれば受信に倱敗した状態に
すれば10秒ほどでデッドレタヌキュヌぞ送信されるこずになりたす。

デッドレタヌキュヌはこちらですね。

  standard-queue-dead-letter-queue {
    fifo = false
  }

FIFOキュヌも甚意したしたが、先ほどの暙準キュヌずの蚭定䞊の違いはキュヌの名前ずFIFOキュヌであるこずしかありたせん。

  "fifo-queue.fifo" {
    defaultVisibilityTimeout = 3 seconds
    fifo = true

    deadLettersQueue {
      name = "fifo-queue-dead-letter-queue.fifo"
      maxReceiveCount = 3
    }
  }

  "fifo-queue-dead-letter-queue.fifo" {
    fifo = true
  }

この蚭定ファむルを、-Dconfig.fileシステムプロパティで指定しお起動。

$ java -Dconfig.file=/path/to/elasticmq.conf -jar elasticmq-server-1.3.14.jar

起動時に、キュヌを䜜成しおいるログが出力されたす。

23:18:02.092 [elasticmq-akka.actor.default-dispatcher-5] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(fifo-queue-dead-letter-queue.fifo,None,None,None,Some(2023-03-29T23:18:01.041+09:00),Some(2023-03-29T23:18:01.041+09:00),None,true,false,None,None,Map())
23:18:02.115 [elasticmq-akka.actor.default-dispatcher-5] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(fifo-queue.fifo,Some(MillisVisibilityTimeout(3000)),None,None,Some(2023-03-29T23:18:01.041+09:00),Some(2023-03-29T23:18:01.041+09:00),Some(DeadLettersQueueData(fifo-queue-dead-letter-queue.fifo,3)),true,false,None,None,Map())
23:18:02.117 [elasticmq-akka.actor.default-dispatcher-5] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(standard-queue-dead-letter-queue,None,None,None,Some(2023-03-29T23:18:01.041+09:00),Some(2023-03-29T23:18:01.041+09:00),None,false,false,None,None,Map())
23:18:02.118 [elasticmq-akka.actor.default-dispatcher-5] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(standard-queue,Some(MillisVisibilityTimeout(3000)),None,None,Some(2023-03-29T23:18:01.041+09:00),Some(2023-03-29T23:18:01.041+09:00),Some(DeadLettersQueueData(standard-queue-dead-letter-queue,3)),false,false,None,None,Map())

ElasticMQの準備は、これで完了です。

次は、動䜜確認に䜿うアプリケヌションの準備をしたしょう。Javaで䜜成し、テストコヌドで動䜜確認するこずにしたす。

Maven䟝存関係等はこちら。

    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sqs</artifactId>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.9.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.24.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M7</version>
            </plugin>
        </plugins>
    </build>

゜ヌスコヌドを䜜成する

では、動䜜確認甚の゜ヌスコヌドを䜜成しおいきたす。

Amazon SQSぞアクセスするクラむアントを䜜成するクラス。

src/main/java/org/littlewings/aws/sqs/LocalSqsClientBuilder.java

package org.littlewings.aws.sqs;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

import java.net.URI;

public class LocalSqsClientBuilder {
    public static SqsClient create() {
        return SqsClient
                .builder()
                .credentialsProvider(
                        StaticCredentialsProvider.create(
                                AwsBasicCredentials.create("mock_access_key", "mock_secret_key")
                        )
                )
                .region(Region.US_EAST_1)
                .defaultsMode(DefaultsMode.AUTO)
                .endpointOverride(URI.create("http://localhost:9324"))
                .build();
    }
}

゚ンドポむントは、ElasticMQに向けおいたす。

Amazon SQSぞメッセヌゞを送信するクラス。

src/main/java/org/littlewings/aws/sqs/SqsMessageSender.java

package org.littlewings.aws.sqs;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;

import java.util.List;
import java.util.UUID;

public class SqsMessageSender {
    String queueUrl;

    SqsClient sqsClient;

    boolean fifo;

    String messageGroupId;

    public static SqsMessageSender createStandard(String queueUrl) {
        SqsMessageSender sqsMessageSender = new SqsMessageSender();
        sqsMessageSender.queueUrl = queueUrl;
        sqsMessageSender.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageSender.fifo = false;

        return sqsMessageSender;
    }

    public static SqsMessageSender createFifo(String queueUrl, String messageGroupId) {
        SqsMessageSender sqsMessageSender = new SqsMessageSender();
        sqsMessageSender.queueUrl = queueUrl;
        sqsMessageSender.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageSender.fifo = true;
        sqsMessageSender.messageGroupId = messageGroupId;

        return sqsMessageSender;
    }

    public void sendMessages(List<String> messages) {
        List<SendMessageBatchRequestEntry> sendMessageBatchRequestEntries =
                messages
                        .stream()
                        .map(message ->
                                fifo ?
                                        // FIFOキュヌ
                                        SendMessageBatchRequestEntry
                                                .builder()
                                                .id(UUID.randomUUID().toString())
                                                .messageGroupId(messageGroupId)
                                                .messageDeduplicationId(UUID.randomUUID().toString())
                                                .messageBody(message)
                                                .build()
                                        :
                                        // 暙準キュヌ
                                        SendMessageBatchRequestEntry
                                                .builder()
                                                .id(UUID.randomUUID().toString())
                                                .messageBody(message)
                                                .build()
                        )
                        .toList();
        SendMessageBatchRequest sendMessageBatchRequest =
                SendMessageBatchRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .entries(sendMessageBatchRequestEntries.toArray(new SendMessageBatchRequestEntry[sendMessageBatchRequestEntries.size()]))
                        .build();

        sqsClient.sendMessageBatch(sendMessageBatchRequest);
    }
}

FIFOキュヌを察象ずする堎合は、メッセヌゞグルヌプIDを指定できるようにしおいたす。

Amazon SQSからメッセヌゞを受信するクラス。

src/main/java/org/littlewings/aws/sqs/SqsMessageReceiver.java

package org.littlewings.aws.sqs;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

import java.util.List;
import java.util.UUID;

public class SqsMessageReceiver {
    String queueUrl;
    SqsClient sqsClient;

    boolean withDelete;

    public static SqsMessageReceiver create(String queueUrl, boolean withDelete) {
        SqsMessageReceiver sqsMessageReceiver = new SqsMessageReceiver();
        sqsMessageReceiver.queueUrl = queueUrl;
        sqsMessageReceiver.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageReceiver.withDelete = withDelete;

        return sqsMessageReceiver;
    }

    public List<String> receiveMessages(int maxNumberOfMessages) {
        ReceiveMessageRequest receiveMessageRequest =
                ReceiveMessageRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .maxNumberOfMessages(maxNumberOfMessages)
                        .waitTimeSeconds(1)
                        .build();

        ReceiveMessageResponse receiveMessageResponse = sqsClient.receiveMessage(receiveMessageRequest);

        List<String> messages =
                receiveMessageResponse
                        .messages()
                        .stream()
                        .map(message -> message.body())
                        .toList();

        if (withDelete) {
            List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntries =
                    receiveMessageResponse
                            .messages()
                            .stream()
                            .map(message ->
                                    DeleteMessageBatchRequestEntry
                                            .builder()
                                            .id(UUID.randomUUID().toString())
                                            .receiptHandle(message.receiptHandle())
                                            .build()
                            )
                            .toList();

            DeleteMessageBatchRequest deleteMessageBatchRequest =
                    DeleteMessageBatchRequest
                            .builder()
                            .queueUrl(queueUrl)
                            .entries(deleteMessageBatchRequestEntries.toArray(new DeleteMessageBatchRequestEntry[deleteMessageBatchRequestEntries.size()]))
                            .build();

            sqsClient.deleteMessageBatch(deleteMessageBatchRequest);
        }

        return messages;
    }
}

このクラスのむンスタンス䜜成時に、メッセヌゞを受信した時にメッセヌゞの削陀たで行うかどうかを、truefalseで指定できるように しおいたす。

これらのクラスを䜿っお、動䜜確認しおいきたしょう。

テストコヌドで確認する

動䜜確認は、テストコヌドで行いたす。

テストコヌドの雛圢。

src/test/java/org/littlewings/aws/sqs/SqsDeadLetterQueueTest.java

package org.littlewings.aws.sqs;

import org.junit.jupiter.api.Test;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

public class SqsDeadLetterQueueTest {
    void sleep(long sleepSec) {
        try {
            TimeUnit.SECONDS.sleep(sleepSec);
        } catch (InterruptedException e) {
            // no-op
        }
    }

    // ここに、テストを曞く
}

適宜スリヌプを入れるためのメ゜ッドも甚意。

ここからは、暙準キュヌ、FIFOキュヌそれぞれで確認しおいきたす。可芖性タむムアりトは、どちらも3秒にしおいたした。

暙準キュヌの堎合

たずは、暙準キュヌから。

    @Test
    void standardQueue() {
        String standardQueueUrl = "http://localhost:9324/000000000000/standard-queue";
        String deadLetterQueueUrl = "http://localhost:9324/000000000000/standard-queue-dead-letter-queue";

        SqsMessageSender senderFromStandardQueue = SqsMessageSender.createStandard(standardQueueUrl);
        SqsMessageReceiver receiverFromStandardQueueNoDelete = SqsMessageReceiver.create(standardQueueUrl, false);
        SqsMessageReceiver receiverFromStandardQueue = SqsMessageReceiver.create(standardQueueUrl, true);
        SqsMessageReceiver receiverFromDeadLetterQueue = SqsMessageReceiver.create(deadLetterQueueUrl, true);

        senderFromStandardQueue
                .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList());

        // 受信しお削陀しない
        assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可芖性タむムアりト埅ち
        sleep(3L);

        // デッドレタヌキュヌにはただない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1床受信しお、削陀しない
        assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可芖性タむムアりト埅ち
        sleep(3L);

        // デッドレタヌキュヌにはただない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1床受信しお、削陀しない
        assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可芖性タむムアりト埅ち
        sleep(3L);

        // 元のキュヌは空になる先にこちらにアクセスする必芁がある
        assertThat(receiverFromStandardQueue.receiveMessages(3))
                .isEmpty();

        // 1床元のキュヌにアクセスるず、デッドレタヌキュヌから取埗できるようになる
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));
    }

暙準キュヌにメッセヌゞ送信。

        senderFromStandardQueue
                .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList());

次にメッセヌゞの受信を行いたすが、この時にメッセヌゞをAmazon SQSから削陀しないようにしたす。

        // 受信しお削陀しない
        assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

可芖性タむムアりトは3秒にしおいたので、ここで埅ちたす。

        // 可芖性タむムアりト埅ち
        sleep(3L);

この時点ではデッドレタヌキュヌにはメッセヌゞは入っおいたせん。そしお、再床メッセヌゞを取埗するものの、削陀しない凊理倱敗
ずいう操䜜を繰り返したす。

        // デッドレタヌキュヌにはただない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1床受信しお、削陀しない
        assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

もう1床可芖性タむムアりト埅ちをしお、デッドレタヌキュヌにメッセヌゞがないこずを確認しお、たたメッセヌゞの取埗削陀せず
ずいう操䜜を行いたす。

        // 可芖性タむムアりト埅ち
        sleep(3L);

        // デッドレタヌキュヌにはただない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1床受信しお、削陀しない
        assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

これで、3回メッセヌゞの凊理に倱敗したした。maxReceiveCountは3にしおいたので、デッドレタヌキュヌに移る回数になりたした。
可芖性タむムアりト埅ちをしお、

        // 可芖性タむムアりト埅ち
        sleep(3L);

元のキュヌにアクセスしおみるずメッセヌゞが取埗できなくなりたす。

        // 元のキュヌは空になる先にこちらにアクセスする必芁がある
        assertThat(receiverFromStandardQueue.receiveMessages(3))
                .isEmpty();

そしお、デッドレタヌキュヌにアクセスするず、先ほどたで元のキュヌから取埗しおいたメッセヌゞが取埗できるようになっおいたす。

        // 1床元のキュヌにアクセスるず、デッドレタヌキュヌから取埗できるようになる
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .hasSize(3)
                .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

これで、maxReceiveCount回だけ凊理に倱敗したメッセヌゞは、デッドレタヌキュヌに移るこずが確認できたした。

なお、ここだけ元のキュヌずデッドレタヌキュヌのアクセスの順番を入れ替えおいたすが、この順を逆にするず先にデッドレタヌキュヌに
アクセスするず、このテストは倱敗したす。

どうやら、元のキュヌにアクセスしたタむミングでデッドレタヌキュヌにメッセヌゞが移されるみたいですね。この挙動がAmazon SQSでも
同じかどうかはわかりたせんが。

今回はキュヌに入れたメッセヌゞをすべお取り出しお倱敗、を繰り返しおデッドレタヌキュヌに移るたで確認したしたが、
暙準キュヌだず受信順が入れ替わったりするので、これ以䞊のメッセヌゞは入れたせんでした。

FIFOキュヌ

続いおは、FIFOキュヌ。

    @Test
    void fifoQueue() {
        String fifoQueueUrl = "http://localhost:9324/000000000000/fifo-queue.fifo";
        String deadLetterQueueUrl = "http://localhost:9324/000000000000/fifo-queue-dead-letter-queue.fifo";

        SqsMessageSender senderFromFifoQueue = SqsMessageSender.createFifo(fifoQueueUrl, "group");
        SqsMessageReceiver receiverFromFifoQueueNoDelete = SqsMessageReceiver.create(fifoQueueUrl, false);
        SqsMessageReceiver receiverFromFifoQueue = SqsMessageReceiver.create(fifoQueueUrl, true);
        SqsMessageReceiver receiverFromDeadLetterQueue = SqsMessageReceiver.create(deadLetterQueueUrl, true);

        senderFromFifoQueue
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());

        // 受信しお削陀しない
        assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可芖性タむムアりト埅ち
        sleep(3L);

        // デッドレタヌキュヌにはただない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1床受信しお、削陀しない
        assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可芖性タむムアりト埅ち
        sleep(3L);

        // デッドレタヌキュヌにはただない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1床受信しお、削陀しない
        assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可芖性タむムアりト埅ち
        sleep(3L);

        // グルヌプ内の次のメッセヌゞが取埗できるようになる先にこちらにアクセスする必芁がある
        assertThat(receiverFromFifoQueue.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // デッドレタヌキュヌから取埗できるようになる
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));
    }

やっおいるこずは、基本的に暙準キュヌず同じです。

なのですが、FIFOキュヌの堎合は同䞀メッセヌゞグルヌプID内で順序が保蚌されるので、メッセヌゞの数を少し増やしおおきたした。

        senderFromFifoQueue
                .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());

6件入れおいたす。

暙準キュヌの時ず同じように、メッセヌゞ受信 → 凊理倱敗 → 可芖性タむムアりト埅ち → デッドレタヌキュヌ確認 → メッセヌゞ受信
→ 凊理倱敗 → 可芖性タむムアりト埅ち → デッドレタヌキュヌ確認 → メッセヌゞ受信、ずいう流れを繰り返したす。

取埗するメッセヌゞは、1回で3件にしおいたす。

        // 受信しお削陀しない
        assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可芖性タむムアりト埅ち
        sleep(3L);

        // デッドレタヌキュヌにはただない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1床受信しお、削陀しない
        assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // 可芖性タむムアりト埅ち
        sleep(3L);

        // デッドレタヌキュヌにはただない
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .isEmpty();

        // もう1床受信しお、削陀しない
        assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

この時、FIFOキュヌに入れたメッセヌゞの先頭からの取埗を繰り返しおいたす。

そしお、次の可芖性タむムアりトを埅った埌、元のキュヌからは凊理に倱敗したメッセヌゞ以降のメッセヌゞを取埗するこずができ、
デッドレタヌキュヌからはここたでに凊理に倱敗し続けおいたメッセヌゞが取埗できるようになっおいたす。

        // 可芖性タむムアりト埅ち
        sleep(3L);

        // グルヌプ内の次のメッセヌゞが取埗できるようになる先にこちらにアクセスする必芁がある
        assertThat(receiverFromFifoQueue.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

        // デッドレタヌキュヌから取埗できるようになる
        assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
                .hasSize(3)
                .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

FIFOキュヌの堎合、メッセヌゞの凊理に倱敗し続ける状況ではデッドレタヌキュヌにメッセヌゞが移らないず先に進めなくなるこずが
確認できたした。

こんなずころでしょうか。

たずめ

ElasticMQを䜿っお、Amazon SQSのデッドレタヌキュヌを詊しおみたした。

メッセヌゞの凊理にmaxReceiveCount回倱敗するずすぐにデッドレタヌキュヌに移るず思っおいたのですが、元のキュヌにアクセス
しないずデッドレタヌキュヌから取埗できないのはElasticMQ特有の話かどうかはわからないのですが。

たあ、そこたで気にするほどのこずでもないかな、ず思いたす。

動䜜はこうやっお確認できたので、よしずしたしょう。