What is this post about?
Previously, I used ElasticMQ to try out the Amazon SQS visibility timeout.
Checking the Amazon SQS visibility timeout with ElasticMQ - CLOVER🍀
This time, I want to try out dead-letter queues.
Amazon SQS dead-letter queues - Amazon Simple Queue Service
Amazon SQS dead-letter queues
An Amazon SQS dead-letter queue is a separate queue that receives messages which could not be processed successfully in another queue.
A dead-letter queue is created as an ordinary Amazon SQS queue. You have to create it yourself and then configure it as the dead-letter queue of another queue.
Once a queue has a dead-letter queue configured, a message is moved to the dead-letter queue after consumers fail to process it maxReceiveCount times.
Amazon SQS dead-letter queues / How do dead-letter queues work?
The default value of maxReceiveCount is 10.
maxReceiveCount - The number of times a message is delivered to the source queue before being moved to the dead-letter queue. Default: 10. When the ReceiveCount for a message exceeds the maxReceiveCount for a queue, Amazon SQS moves the message to the dead-letter-queue.
CreateQueue - Amazon Simple Queue Service
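For reference, on Amazon SQS itself a dead-letter queue is attached through the source queue's RedrivePolicy attribute, a JSON string carrying deadLetterTargetArn and maxReceiveCount. The sketch below only builds that string with the JDK. The class name, the helper name, and the ARN are made up for illustration, and passing the result to CreateQueue or SetQueueAttributes is left out.

```java
class RedrivePolicyExample {

    // Builds the JSON document used as the value of the RedrivePolicy queue attribute.
    // No escaping is done here, so this sketch assumes the ARN contains no quotes.
    static String redrivePolicy(String deadLetterTargetArn, int maxReceiveCount) {
        return "{\"deadLetterTargetArn\":\"" + deadLetterTargetArn + "\","
                + "\"maxReceiveCount\":\"" + maxReceiveCount + "\"}";
    }

    public static void main(String[] args) {
        // Placeholder ARN for illustration only; it does not refer to a real queue.
        String policy = redrivePolicy(
                "arn:aws:sqs:us-east-1:000000000000:standard-queue-dead-letter-queue", 3);
        System.out.println(policy);
    }
}
```

ElasticMQ's configuration file expresses the same two values through its deadLettersQueue block, as shown later in this post.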
Note that the dead-letter queue of a FIFO queue apparently has to be a FIFO queue as well.
Also, the retention period in a dead-letter queue is apparently calculated from the time the message entered the original queue, not from the time it entered the dead-letter queue.
The expiration of a message is always based on its original enqueue timestamp. When a message is moved to a dead-letter queue, the enqueue timestamp is unchanged. The ApproximateAgeOfOldestMessage metric indicates when the message moved to the dead-letter queue, not when the message was originally sent. For example, assume that a message spends 1 day in the original queue before it is moved to a dead-letter queue. If the dead-letter queue's retention period is 4 days, the message is deleted from the dead-letter queue after 3 days and the ApproximateAgeOfOldestMessage is 3 days. Thus, it is a best practice to set the retention period of a dead-letter queue to be longer than the retention period of the original queue.
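The arithmetic in that example can be written out as a small sketch. The class and method names are hypothetical and the timestamp is arbitrary; it only restates the rule that expiry is counted from the original enqueue time.

```java
import java.time.Duration;
import java.time.Instant;

class DlqRetentionExample {

    // Time a message has left in the dead-letter queue, measured from the moment it is moved.
    // Expiry is counted from the original enqueue timestamp, so the time already spent in the
    // source queue is effectively subtracted from the dead-letter queue's retention period.
    static Duration remainingInDeadLetterQueue(Instant enqueuedAt, Instant movedAt, Duration dlqRetention) {
        Instant expiresAt = enqueuedAt.plus(dlqRetention);
        Duration remaining = Duration.between(movedAt, expiresAt);
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }

    public static void main(String[] args) {
        Instant enqueuedAt = Instant.parse("2023-03-01T00:00:00Z"); // arbitrary example timestamp
        Instant movedAt = enqueuedAt.plus(Duration.ofDays(1));      // 1 day spent in the source queue
        Duration remaining = remainingInDeadLetterQueue(enqueuedAt, movedAt, Duration.ofDays(4));
        System.out.println(remaining.toDays()); // prints 3
    }
}
```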
The reasons to use a dead-letter queue are described here: for example, you can move messages that could not be processed to a separate queue and investigate them.
Amazon SQS dead-letter queues / What are the benefits of dead-letter queues?
Meanwhile, the reasons a dead-letter queue is worth setting up for each queue type are explored in more depth below.
Amazon SQS dead-letter queues / How do different queue types handle message failure?
- Standard queues
  - A standard queue can hold a large number of messages
  - A message that fails processing keeps being reprocessed until its retention period expires, so in that case it is better to move it to a dead-letter queue
- FIFO queues
  - A FIFO queue cannot hold many pending messages
  - When a message fails processing, later messages in the same message group can no longer be retrieved and the queue becomes effectively unusable, so it is better to move the message to a dead-letter queue
That said, dead-letter queues appear to be intended mainly for use with standard queues.
Amazon SQS dead-letter queues / When should I use a dead-letter queue?
With a FIFO queue, using a dead-letter queue changes the processing order, so if you cannot accept that, you apparently should not use one.
Don't use a dead-letter queue with a FIFO queue if you need to keep the exact order of messages or operations.
It also appears that messages in a dead-letter queue can be returned to the original queue (redrive), but only for standard queues.
Amazon SQS supports dead-letter queue redrive only for standard queues in the Amazon SQS console.
The following pages also describe dead-letter queue configuration and redrive.
Configuring queue parameters (console) - Amazon Simple Queue Service
Configuring a dead-letter queue (console) - Amazon Simple Queue Service
Configuring a dead-letter queue redrive (console) - Amazon Simple Queue Service
In this post, I use ElasticMQ to confirm that messages which fail processing are moved to a dead-letter queue.
Environment
Here is the environment used this time.
$ java --version
openjdk 17.0.6 2023-01-17
OpenJDK Runtime Environment (build 17.0.6+10-Ubuntu-0ubuntu122.04)
OpenJDK 64-Bit Server VM (build 17.0.6+10-Ubuntu-0ubuntu122.04, mixed mode, sharing)

$ mvn --version
Apache Maven 3.9.1 (2e178502fcdbffc201671fb2537d0cb4b4cc58f8)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.6, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-67-generic", arch: "amd64", family: "unix"
I use ElasticMQ 1.3.14.
Preparation
I decided to start ElasticMQ with a configuration file. I had intended to build the queues with Terraform, but queue creation timed out, so I never got as far as configuring the dead-letter queue...
I wrote the configuration file with reference to the following.
ElasticMQ / Automatically creating queues on startup
It ended up like this.
elasticmq-config/elasticmq.conf
queues {
  standard-queue {
    defaultVisibilityTimeout = 3 seconds
    fifo = false
    deadLettersQueue {
      name = "standard-queue-dead-letter-queue"
      maxReceiveCount = 3
    }
  }
  standard-queue-dead-letter-queue {
    fifo = false
  }
  "fifo-queue.fifo" {
    defaultVisibilityTimeout = 3 seconds
    fifo = true
    deadLettersQueue {
      name = "fifo-queue-dead-letter-queue.fifo"
      maxReceiveCount = 3
    }
  }
  "fifo-queue-dead-letter-queue.fifo" {
    fifo = true
  }
}
Let's look at the standard queue first.
The main queue is configured with a visibility timeout of 3 seconds and sends a message to the dead-letter queue after it has failed processing 3 times.

standard-queue {
  defaultVisibilityTimeout = 3 seconds
  fifo = false
  deadLettersQueue {
    name = "standard-queue-dead-letter-queue"
    maxReceiveCount = 3
  }
}

Because the visibility timeout is 3 seconds, receiving a message 3 times in a row without deleting it (that is, leaving it in a failed state) takes 3 seconds × 3, so the message should be sent to the dead-letter queue after roughly 10 seconds.
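As a quick check of that estimate, here is a minimal sketch of the arithmetic. The class and method names are hypothetical, and it assumes the consumer receives the message again the instant it becomes visible; the polling wait on top of the resulting 9 seconds accounts for the "roughly 10 seconds".

```java
import java.time.Duration;

class DeadLetterTimingExample {

    // Earliest point at which a message that is received but never deleted has used up
    // all of its allowed receives, assuming it is received again immediately each time
    // the visibility timeout expires.
    static Duration timeUntilReceivesExhausted(Duration visibilityTimeout, int maxReceiveCount) {
        return visibilityTimeout.multipliedBy(maxReceiveCount);
    }

    public static void main(String[] args) {
        Duration total = timeUntilReceivesExhausted(Duration.ofSeconds(3), 3);
        System.out.println(total.getSeconds()); // prints 9
    }
}
```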
Here is the dead-letter queue.

standard-queue-dead-letter-queue {
  fifo = false
}

I also prepared a FIFO queue. The only configuration differences from the standard queue are the queue names and the fact that these are FIFO queues.

"fifo-queue.fifo" {
  defaultVisibilityTimeout = 3 seconds
  fifo = true
  deadLettersQueue {
    name = "fifo-queue-dead-letter-queue.fifo"
    maxReceiveCount = 3
  }
}
"fifo-queue-dead-letter-queue.fifo" {
  fifo = true
}

Start ElasticMQ with this configuration file specified through the -Dconfig.file system property.
$ java -Dconfig.file=/path/to/elasticmq.conf -jar elasticmq-server-1.3.14.jar
On startup, log lines are printed as the queues are created.

23:18:02.092 [elasticmq-akka.actor.default-dispatcher-5] INFO o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(fifo-queue-dead-letter-queue.fifo,None,None,None,Some(2023-03-29T23:18:01.041+09:00),Some(2023-03-29T23:18:01.041+09:00),None,true,false,None,None,Map())
23:18:02.115 [elasticmq-akka.actor.default-dispatcher-5] INFO o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(fifo-queue.fifo,Some(MillisVisibilityTimeout(3000)),None,None,Some(2023-03-29T23:18:01.041+09:00),Some(2023-03-29T23:18:01.041+09:00),Some(DeadLettersQueueData(fifo-queue-dead-letter-queue.fifo,3)),true,false,None,None,Map())
23:18:02.117 [elasticmq-akka.actor.default-dispatcher-5] INFO o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(standard-queue-dead-letter-queue,None,None,None,Some(2023-03-29T23:18:01.041+09:00),Some(2023-03-29T23:18:01.041+09:00),None,false,false,None,None,Map())
23:18:02.118 [elasticmq-akka.actor.default-dispatcher-5] INFO o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(standard-queue,Some(MillisVisibilityTimeout(3000)),None,None,Some(2023-03-29T23:18:01.041+09:00),Some(2023-03-29T23:18:01.041+09:00),Some(DeadLettersQueueData(standard-queue-dead-letter-queue,3)),false,false,None,None,Map())
ElasticMQ is now ready.
Next, let's prepare the application used for verification. I write it in Java and verify the behavior with test code.
Here are the Maven dependencies and related settings.
<dependencies>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>sqs</artifactId>
    </dependency>

    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>5.9.2</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.assertj</groupId>
        <artifactId>assertj-core</artifactId>
        <version>3.24.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>3.0.0-M7</version>
        </plugin>
    </plugins>
</build>
Writing the source code
Now let's write the source code used for verification.
A class that creates the client for accessing Amazon SQS.
src/main/java/org/littlewings/aws/sqs/LocalSqsClientBuilder.java
package org.littlewings.aws.sqs;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

import java.net.URI;

public class LocalSqsClientBuilder {
    public static SqsClient create() {
        return SqsClient
                .builder()
                .credentialsProvider(
                        StaticCredentialsProvider.create(
                                AwsBasicCredentials.create("mock_access_key", "mock_secret_key")
                        )
                )
                .region(Region.US_EAST_1)
                .defaultsMode(DefaultsMode.AUTO)
                .endpointOverride(URI.create("http://localhost:9324"))
                .build();
    }
}
The endpoint points at ElasticMQ.
A class that sends messages to Amazon SQS.
src/main/java/org/littlewings/aws/sqs/SqsMessageSender.java
package org.littlewings.aws.sqs;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;

import java.util.List;
import java.util.UUID;

public class SqsMessageSender {
    String queueUrl;
    SqsClient sqsClient;
    boolean fifo;
    String messageGroupId;

    public static SqsMessageSender createStandard(String queueUrl) {
        SqsMessageSender sqsMessageSender = new SqsMessageSender();
        sqsMessageSender.queueUrl = queueUrl;
        sqsMessageSender.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageSender.fifo = false;
        return sqsMessageSender;
    }

    public static SqsMessageSender createFifo(String queueUrl, String messageGroupId) {
        SqsMessageSender sqsMessageSender = new SqsMessageSender();
        sqsMessageSender.queueUrl = queueUrl;
        sqsMessageSender.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageSender.fifo = true;
        sqsMessageSender.messageGroupId = messageGroupId;
        return sqsMessageSender;
    }

    public void sendMessages(List<String> messages) {
        List<SendMessageBatchRequestEntry> sendMessageBatchRequestEntries =
                messages
                        .stream()
                        .map(message ->
                                fifo ?
                                        // FIFO queue: needs a message group ID and a deduplication ID
                                        SendMessageBatchRequestEntry
                                                .builder()
                                                .id(UUID.randomUUID().toString())
                                                .messageGroupId(messageGroupId)
                                                .messageDeduplicationId(UUID.randomUUID().toString())
                                                .messageBody(message)
                                                .build() :
                                        // Standard queue
                                        SendMessageBatchRequestEntry
                                                .builder()
                                                .id(UUID.randomUUID().toString())
                                                .messageBody(message)
                                                .build()
                        )
                        .toList();

        SendMessageBatchRequest sendMessageBatchRequest =
                SendMessageBatchRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .entries(sendMessageBatchRequestEntries.toArray(new SendMessageBatchRequestEntry[sendMessageBatchRequestEntries.size()]))
                        .build();

        sqsClient.sendMessageBatch(sendMessageBatchRequest);
    }
}
When the target is a FIFO queue, a message group ID can be specified.
A class that receives messages from Amazon SQS.
src/main/java/org/littlewings/aws/sqs/SqsMessageReceiver.java
package org.littlewings.aws.sqs;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

import java.util.List;
import java.util.UUID;

public class SqsMessageReceiver {
    String queueUrl;
    SqsClient sqsClient;
    boolean withDelete;

    public static SqsMessageReceiver create(String queueUrl, boolean withDelete) {
        SqsMessageReceiver sqsMessageReceiver = new SqsMessageReceiver();
        sqsMessageReceiver.queueUrl = queueUrl;
        sqsMessageReceiver.sqsClient = LocalSqsClientBuilder.create();
        sqsMessageReceiver.withDelete = withDelete;
        return sqsMessageReceiver;
    }

    public List<String> receiveMessages(int maxNumberOfMessages) {
        ReceiveMessageRequest receiveMessageRequest =
                ReceiveMessageRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .maxNumberOfMessages(maxNumberOfMessages)
                        .waitTimeSeconds(1)
                        .build();

        ReceiveMessageResponse receiveMessageResponse = sqsClient.receiveMessage(receiveMessageRequest);

        List<String> messages =
                receiveMessageResponse
                        .messages()
                        .stream()
                        .map(message -> message.body())
                        .toList();

        // Amazon SQS rejects an empty delete batch, so skip the delete when nothing was received
        if (withDelete && !receiveMessageResponse.messages().isEmpty()) {
            List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntries =
                    receiveMessageResponse
                            .messages()
                            .stream()
                            .map(message ->
                                    DeleteMessageBatchRequestEntry
                                            .builder()
                                            .id(UUID.randomUUID().toString())
                                            .receiptHandle(message.receiptHandle())
                                            .build()
                            )
                            .toList();

            DeleteMessageBatchRequest deleteMessageBatchRequest =
                    DeleteMessageBatchRequest
                            .builder()
                            .queueUrl(queueUrl)
                            .entries(deleteMessageBatchRequestEntries.toArray(new DeleteMessageBatchRequestEntry[deleteMessageBatchRequestEntries.size()]))
                            .build();

            sqsClient.deleteMessageBatch(deleteMessageBatchRequest);
        }

        return messages;
    }
}
When creating an instance of this class, you can specify with true / false whether a message is also deleted when it is received.
Let's use these classes to verify the behavior.
Verifying with test code
Verification is done with test code.
The skeleton of the test code.
src/test/java/org/littlewings/aws/sqs/SqsDeadLetterQueueTest.java
package org.littlewings.aws.sqs;

import org.junit.jupiter.api.Test;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

public class SqsDeadLetterQueueTest {
    void sleep(long sleepSec) {
        try {
            TimeUnit.SECONDS.sleep(sleepSec);
        } catch (InterruptedException e) {
            // no-op
        }
    }

    // Tests go here
}

I also prepared a method for sleeping where needed.
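The tests in this post rely on fixed sleeps that match the visibility timeout, which is enough here. Where timing is less predictable, a common alternative is to poll with a deadline. The following is a minimal JDK-only sketch of that idea; `PollingExample` and `awaitUntil` are hypothetical names and are not used by the tests in this post.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class PollingExample {

    // Repeatedly checks the condition until it holds or the timeout elapses.
    // Returns true if the condition held, false on timeout or interruption.
    static boolean awaitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // Hypothetical condition: becomes true once 200 ms have passed
        boolean ok = awaitUntil(
                () -> System.nanoTime() - start >= Duration.ofMillis(200).toNanos(),
                Duration.ofSeconds(2),
                Duration.ofMillis(20));
        System.out.println(ok);
    }
}
```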
From here, I verify the standard queue and the FIFO queue in turn. The visibility timeout was set to 3 seconds for both.
Standard queue
Let's start with the standard queue.
@Test
void standardQueue() {
    String standardQueueUrl = "http://localhost:9324/000000000000/standard-queue";
    String deadLetterQueueUrl = "http://localhost:9324/000000000000/standard-queue-dead-letter-queue";

    SqsMessageSender senderFromStandardQueue = SqsMessageSender.createStandard(standardQueueUrl);
    SqsMessageReceiver receiverFromStandardQueueNoDelete = SqsMessageReceiver.create(standardQueueUrl, false);
    SqsMessageReceiver receiverFromStandardQueue = SqsMessageReceiver.create(standardQueueUrl, true);
    SqsMessageReceiver receiverFromDeadLetterQueue = SqsMessageReceiver.create(deadLetterQueueUrl, true);

    senderFromStandardQueue
            .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList());

    // Receive without deleting
    assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
            .hasSize(3)
            .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

    // Wait for the visibility timeout
    sleep(3L);

    // Nothing in the dead-letter queue yet
    assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
            .isEmpty();

    // Receive again without deleting
    assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
            .hasSize(3)
            .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

    // Wait for the visibility timeout
    sleep(3L);

    // Nothing in the dead-letter queue yet
    assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
            .isEmpty();

    // Receive again without deleting
    assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
            .hasSize(3)
            .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

    // Wait for the visibility timeout
    sleep(3L);

    // The original queue is now empty (it has to be accessed first)
    assertThat(receiverFromStandardQueue.receiveMessages(3))
            .isEmpty();

    // Once the original queue has been accessed, the messages can be received from the dead-letter queue
    assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
            .hasSize(3)
            .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));
}
Send messages to the standard queue.

senderFromStandardQueue
        .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList());

Next, receive the messages, but without deleting them from Amazon SQS.

// Receive without deleting
assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
        .hasSize(3)
        .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

The visibility timeout is 3 seconds, so wait here.

// Wait for the visibility timeout
sleep(3L);
At this point there are no messages in the dead-letter queue. Then I repeat the operation of receiving the messages again without deleting them (a processing failure).

// Nothing in the dead-letter queue yet
assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
        .isEmpty();

// Receive again without deleting
assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
        .hasSize(3)
        .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

Wait for the visibility timeout once more, confirm that the dead-letter queue is still empty, and again receive the messages without deleting them.

// Wait for the visibility timeout
sleep(3L);

// Nothing in the dead-letter queue yet
assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
        .isEmpty();

// Receive again without deleting
assertThat(receiverFromStandardQueueNoDelete.receiveMessages(3))
        .hasSize(3)
        .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));
Processing has now failed 3 times. Since maxReceiveCount is 3, the messages have reached the count at which they move to the dead-letter queue.
After waiting for the visibility timeout,

// Wait for the visibility timeout
sleep(3L);

accessing the original queue no longer returns any messages.

// The original queue is now empty (it has to be accessed first)
assertThat(receiverFromStandardQueue.receiveMessages(3))
        .isEmpty();

And when the dead-letter queue is accessed, the messages that had been coming from the original queue can now be received there.

// Once the original queue has been accessed, the messages can be received from the dead-letter queue
assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
        .hasSize(3)
        .containsAnyOf(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));
This confirms that a message which fails processing maxReceiveCount times is moved to the dead-letter queue.
Note that only at this point did I swap the order in which the original queue and the dead-letter queue are accessed. If the order is reversed (the dead-letter queue is accessed first), this test fails.
Apparently the messages are moved to the dead-letter queue at the moment the original queue is accessed. I don't know whether Amazon SQS behaves the same way, though.
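One way to picture the behavior observed here is the toy model below. It is only an in-memory illustration of "move on the next receive", not how ElasticMQ or Amazon SQS is actually implemented. The class and method names are made up, and visibility timeouts are ignored: each receive is assumed to happen after the previous timeout has expired.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

class MoveOnReceiveModel {

    static final class Message {
        final String body;
        int receiveCount;
        Message(String body) { this.body = body; }
    }

    final Deque<Message> source = new ArrayDeque<>();
    final Deque<Message> deadLetters = new ArrayDeque<>();
    final int maxReceiveCount;

    MoveOnReceiveModel(int maxReceiveCount) { this.maxReceiveCount = maxReceiveCount; }

    void send(String body) { source.addLast(new Message(body)); }

    // Receives from the source queue without deleting. A message that has already been
    // received maxReceiveCount times is moved to the dead-letter queue instead of delivered.
    List<String> receiveFromSource() {
        List<String> delivered = new ArrayList<>();
        Iterator<Message> it = source.iterator();
        while (it.hasNext()) {
            Message m = it.next();
            if (m.receiveCount >= maxReceiveCount) {
                it.remove();
                deadLetters.addLast(m);
            } else {
                m.receiveCount++;
                delivered.add(m.body);
            }
        }
        return delivered;
    }

    // Receives and deletes everything currently in the dead-letter queue.
    List<String> receiveFromDeadLetters() {
        List<String> bodies = new ArrayList<>();
        for (Message m : deadLetters) {
            bodies.add(m.body);
        }
        deadLetters.clear();
        return bodies;
    }

    public static void main(String[] args) {
        MoveOnReceiveModel model = new MoveOnReceiveModel(3);
        for (int i = 1; i <= 3; i++) {
            model.send("Message-" + i);
        }
        for (int i = 0; i < 3; i++) {
            System.out.println(model.receiveFromSource().size());   // 3 messages each time
        }
        System.out.println(model.receiveFromDeadLetters().size());  // 0: nothing moved yet
        System.out.println(model.receiveFromSource().size());       // 0: this access moves them
        System.out.println(model.receiveFromDeadLetters().size());  // 3
    }
}
```

In this model, checking the dead-letter queue before the fourth access to the source queue finds nothing, which matches the test failure described above when the order is reversed.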
This time I took out every message put into the queue, let them fail, and repeated that until they moved to the dead-letter queue. Since a standard queue can deliver messages out of order, I did not put in any more messages than that.
FIFO queue
Next, the FIFO queue.
@Test
void fifoQueue() {
    String fifoQueueUrl = "http://localhost:9324/000000000000/fifo-queue.fifo";
    String deadLetterQueueUrl = "http://localhost:9324/000000000000/fifo-queue-dead-letter-queue.fifo";

    SqsMessageSender senderFromFifoQueue = SqsMessageSender.createFifo(fifoQueueUrl, "group");
    SqsMessageReceiver receiverFromFifoQueueNoDelete = SqsMessageReceiver.create(fifoQueueUrl, false);
    SqsMessageReceiver receiverFromFifoQueue = SqsMessageReceiver.create(fifoQueueUrl, true);
    SqsMessageReceiver receiverFromDeadLetterQueue = SqsMessageReceiver.create(deadLetterQueueUrl, true);

    senderFromFifoQueue
            .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());

    // Receive without deleting
    assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
            .hasSize(3)
            .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

    // Wait for the visibility timeout
    sleep(3L);

    // Nothing in the dead-letter queue yet
    assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
            .isEmpty();

    // Receive again without deleting
    assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
            .hasSize(3)
            .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

    // Wait for the visibility timeout
    sleep(3L);

    // Nothing in the dead-letter queue yet
    assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
            .isEmpty();

    // Receive again without deleting
    assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
            .hasSize(3)
            .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

    // Wait for the visibility timeout
    sleep(3L);

    // The next messages in the group can now be received (this queue has to be accessed first)
    assertThat(receiverFromFifoQueue.receiveMessages(3))
            .hasSize(3)
            .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

    // The failed messages can now be received from the dead-letter queue
    assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
            .hasSize(3)
            .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));
}
What this does is basically the same as for the standard queue.
However, because a FIFO queue guarantees ordering within the same message group ID, I increased the number of messages a little.

senderFromFifoQueue
        .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());

Six messages are put in.
As with the standard queue, I repeat the flow: receive messages → processing failure → wait for the visibility timeout → check the dead-letter queue → receive messages → processing failure → wait for the visibility timeout → check the dead-letter queue → receive messages.
Each receive fetches 3 messages.

// Receive without deleting
assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
        .hasSize(3)
        .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

// Wait for the visibility timeout
sleep(3L);

// Nothing in the dead-letter queue yet
assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
        .isEmpty();

// Receive again without deleting
assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
        .hasSize(3)
        .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

// Wait for the visibility timeout
sleep(3L);

// Nothing in the dead-letter queue yet
assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
        .isEmpty();

// Receive again without deleting
assertThat(receiverFromFifoQueueNoDelete.receiveMessages(3))
        .hasSize(3)
        .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));
During this phase, each receive returns the same messages from the head of the FIFO queue.
Then, after waiting for the next visibility timeout, the original queue returns the messages that follow the failed ones, and the dead-letter queue returns the messages that had kept failing up to that point.

// Wait for the visibility timeout
sleep(3L);

// The next messages in the group can now be received (this queue has to be accessed first)
assertThat(receiverFromFifoQueue.receiveMessages(3))
        .hasSize(3)
        .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));

// The failed messages can now be received from the dead-letter queue
assertThat(receiverFromDeadLetterQueue.receiveMessages(3))
        .hasSize(3)
        .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3]));
With a FIFO queue, I confirmed that when messages keep failing processing, the queue cannot move forward unless those messages are moved to a dead-letter queue.
That's about it.
Summary
I tried out Amazon SQS dead-letter queues using ElasticMQ.
I had expected a message to move to the dead-letter queue as soon as it failed processing maxReceiveCount times. I don't know whether having to access the original queue before the message can be received from the dead-letter queue is specific to ElasticMQ, but I don't think it matters all that much.
I was able to confirm the behavior this way, so I'll call it good.