ããã¯ããªã«ãããããŠæžãããã®ïŒ
Amazon SQSã®å¯èŠæ§ã¿ã€ã ã¢ãŠããšãããã®ãã1床èŠãŠãããããªãšæããŸããŠã
ãã¡ãã§ããã
Amazon SQS可視性タイムアウト - Amazon Simple Queue Service
Amazon SQSã®å¯èŠæ§ã¿ã€ã ã¢ãŠã
Amazon SQSã®å¯èŠæ§ã¿ã€ã ã¢ãŠããšã¯ãã³ã³ã·ã¥ãŒããŒãã¡ãã»ãŒãžãååŸããŠããåé€ãããŸã§ã®éãä»ã®ã³ã³ã·ã¥ãŒããŒã
ã¡ãã»ãŒãžãååŸã§ããªããªãæéã®ããšã§ãã
Amazon SQS可視性タイムアウト - Amazon Simple Queue Service
Amazon SQSã§ã¯ãã³ã³ã·ã¥ãŒããŒã¯ã¡ãã»ãŒãžãåä¿¡ããåŸãåé€ããå¿
èŠããããŸãã
ã³ã³ã·ã¥ãŒããŒãã¡ãã»ãŒãžãåä¿¡ããŠããã¡ãã»ãŒãžèªäœã¯ãã¥ãŒã«æ®ã£ããŸãŸã®ç¶æ
ã«ãªããŸããããæåã«ã¡ãã»ãŒãžãåä¿¡ãã
ã³ã³ã·ã¥ãŒããŒã«åé¡ãçºçããå ŽåïŒïŒã¡ãã»ãŒãžã®åé€ã¯ã§ããªãã£ãïŒãä»ã®ã³ã³ã·ã¥ãŒããŒã該åœã®ã¡ãã»ãŒãžãåŠçããããšã«
ãªããŸãã
ãã ãåãã¡ãã»ãŒãžãè€æ°ã®ã³ã³ã·ã¥ãŒããŒãåŠçããããšããªããããäžå®ã®ã¿ã€ã ã¢ãŠãæéãèšããŠããŸãã
ãããå¯èŠæ§ã¿ã€ã ã¢ãŠãã§ããã
ããã©ã«ãã¯30ç§ãæå°ã¯0ç§ãæå€§ã¯12æéã§ãã
å¯èŠæ§ã¿ã€ã ã¢ãŠãã¯ãã¥ãŒåäœããããŠã¡ãã»ãŒãžã®åä¿¡æã«åå¥ã«èšå®ããããšãã§ããŸãã
Amazon SQSå¯èŠæ§ã¿ã€ã ã¢ãŠã / å¯èŠæ§ã¿ã€ã ã¢ãŠãã®èšå®
éäžã§å€æŽããããšãã§ããããã§ãã
- Amazon SQSå¯èŠæ§ã¿ã€ã ã¢ãŠã / ã¡ãã»ãŒãžã®å¯èŠæ§ã¿ã€ã ã¢ãŠãã®å€æŽ
- Amazon SQSå¯èŠæ§ã¿ã€ã ã¢ãŠã / ã¡ãã»ãŒãžã®å¯èŠæ§ã¿ã€ã ã¢ãŠãã®çµäº
ãŸããæšæºãã¥ãŒãšFIFOãã¥ãŒã§ã¯éãããããŸãã
- æšæºãã¥ãŒ
- å¯èŠæ§ã¿ã€ã ã¢ãŠãã¯ãæšæºãã¥ãŒã§åãã¡ãã»ãŒãžã2ååä¿¡ããªãä¿èšŒïŒat least onceïŒã«ã¯ãªããªã
- å¯èŠæ§ã¿ã€ã ã¢ãŠãåŸ ã¡ã®ã¡ãã»ãŒãžããã£ãŠããä»ã®ã¡ãã»ãŒãžã¯ååŸã§ãã
- FIFOãã¥ãŒ
- åãã°ã«ãŒãå ã«å¯èŠæ§ã¿ã€ã ã¢ãŠãåŸ ã¡ã®ã¡ãã»ãŒãžããã£ãå Žåããã®ã°ã«ãŒãå ã®ã¡ãã»ãŒãžã¯ã¿ã€ã ã¢ãŠããããã¡ãã»ãŒãžãåé€ããã®ãããããè¡ããŸã§ååŸã§ããªããªã
- éä¿¡æã«åãéè€é€å€IDãåä¿¡æã«åãåä¿¡ãªã¯ãšã¹ã詊è¡IDã䜿ãããšã§ãå詊è¡ãå¯èœ
ä»åã¯ãå¯èŠæ§ã¿ã€ã ã¢ãŠãåŸ
ã¡ã«ãªã£ãæã«ãã®ã¡ãã»ãŒãžãååŸã§ããªããªãããšããFIFOãã¥ãŒã§ã®ç¢ºèªãããŠãããããš
æããŸãã
確èªã¯ãAWS SDK for Java v2ãšElasticMQã§è¡ãããšã«ããŸãã
â»æåã¯LocalStackã§è©ŠããŠããã®ã§ãããæåãã ãã¶ç°ãªãããã¥ã¡ã³ããšåããåããªãã£ãã®ã§ElasticMQã«ããŸãã
ç°å¢
ä»åã®ç°å¢ã¯ããã¡ãã
$ java --version openjdk 17.0.6 2023-01-17 OpenJDK Runtime Environment (build 17.0.6+10-Ubuntu-0ubuntu122.04) OpenJDK 64-Bit Server VM (build 17.0.6+10-Ubuntu-0ubuntu122.04, mixed mode, sharing) $ mvn --version Apache Maven 3.9.1 (2e178502fcdbffc201671fb2537d0cb4b4cc58f8) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 17.0.6, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.15.0-67-generic", arch: "amd64", family: "unix"
ElasticMQã¯1.3.14ã䜿ããŸãã
èµ·åã
$ java -jar elasticmq-server-1.3.14.jar
ElasticMQäžã®ãã¥ãŒã®äœæã¯ãTerraformã§è¡ãããšã«ããŸãã
$ terraform version Terraform v1.4.2 on linux_amd64
æºå
ãŸãã¯ãElasticMQäžã«ãã¥ãŒãäœæããŸãã
Terraformã®æ§æãã¡ã€ã«ããã®ããã«çšæã
main.tf
terraform { required_version = "1.4.2" required_providers { aws = { source = "hashicorp/aws" version = "4.59.0" } } } provider "aws" { access_key = "mock_access_key" secret_key = "mock_secret_key" region = "us-east-1" skip_credentials_validation = true skip_metadata_api_check = true skip_requesting_account_id = true endpoints { sqs = "http://localhost:9324" } } resource "aws_sqs_queue" "standard_queue" { name = "standard-queue" receive_wait_time_seconds = 1 visibility_timeout_seconds = 10 } resource "aws_sqs_queue" "fifo_queue" { name = "fifo-queue.fifo" fifo_queue = true receive_wait_time_seconds = 1 visibility_timeout_seconds = 10 } output "standard_queue_url" { value = aws_sqs_queue.standard_queue.url } output "fifo_queue_url" { value = aws_sqs_queue.fifo_queue.url }
æšæºãã¥ãŒãšFIFOãã¥ãŒã®äž¡æ¹ãäœæããå¯èŠæ§ã¿ã€ã ã¢ãŠãã¯ãããã10ç§ãšããŸããã
é©çšã
$ terraform init $ terraform apply
ElasticMQã察象ã«å®è¡ãããšããªããã¿ã€ã ã¢ãŠããããã§ããã©ãâŠã
â Error: waiting for SQS Queue (http://localhost:9324/000000000000/standard-queue) attributes create: timeout while waiting for state to become 'equal' (last state: 'notequal', timeout: 2m0s) â â with aws_sqs_queue.standard_queue, â on main.tf line 26, in resource "aws_sqs_queue" "standard_queue": â 26: resource "aws_sqs_queue" "standard_queue" { â âµ â· â Error: waiting for SQS Queue (http://localhost:9324/000000000000/fifo-queue.fifo) attributes create: timeout while waiting for state to become 'equal' (last state: 'notequal', timeout: 2m0s) â â with aws_sqs_queue.fifo_queue, â on main.tf line 33, in resource "aws_sqs_queue" "fifo_queue": â 33: resource "aws_sqs_queue" "fifo_queue" { â âµ
äžå¿ãããã§ããã¥ãŒã¯ã§ããŠããã®ã§ãã®ãŸãŸäœ¿ããŸãã
$ aws --endpoint-url http://localhost:9324 sqs list-queues { "QueueUrls": [ "http://localhost:9324/000000000000/standard-queue", "http://localhost:9324/000000000000/fifo-queue.fifo" ] }
ç¶ããŠãJavaåŽãMavenäŸåé¢ä¿çã¯ãã®ããã«èšå®ã
<properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.20.30</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.9.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.24.2</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>3.0.0-M7</version> </plugin> </plugins> </build>
åäœç¢ºèªã¯ãã¹ãã³ãŒãã§è¡ãããšã«ããŸãã
ããã°ã©ã ãäœæãã
ã§ã¯ãããã°ã©ã ãäœæããŠãããŸãããã
ãŸãã¯Amazon SQSã«ã¢ã¯ã»ã¹ããã¯ã©ã€ã¢ã³ããäœæããã¯ã©ã¹ã
src/main/java/org/littlewings/aws/sqs/LocalSqsClientBuilder.java
package org.littlewings.aws.sqs; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import java.net.URI; public class LocalSqsClientBuilder { public static SqsClient create() { return SqsClient .builder() .credentialsProvider( StaticCredentialsProvider.create( AwsBasicCredentials.create("mock_access_key", "mock_scret_key") ) ) .region(Region.US_EAST_1) .defaultsMode(DefaultsMode.AUTO) .endpointOverride(URI.create("http://localhost:9324")) .build(); } }
ãšã³ããã€ã³ããElasticMQã«å·®ãæ¿ããŠããŸãã
Amazon SQSãžã¡ãã»ãŒãžãéä¿¡ããã¯ã©ã¹ã
src/main/java/org/littlewings/aws/sqs/SqsMessageSender.java
package org.littlewings.aws.sqs; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; import java.util.List; import java.util.UUID; public class SqsMessageSender { String queueUrl; SqsClient sqsClient; boolean fifo; String messageGroupId; public static SqsMessageSender createStandard(String queueUrl) { SqsMessageSender sqsMessageSender = new SqsMessageSender(); sqsMessageSender.queueUrl = queueUrl; sqsMessageSender.sqsClient = LocalSqsClientBuilder.create(); sqsMessageSender.fifo = false; return sqsMessageSender; } public static SqsMessageSender createFifo(String queueUrl, String messageGroupId) { SqsMessageSender sqsMessageSender = new SqsMessageSender(); sqsMessageSender.queueUrl = queueUrl; sqsMessageSender.sqsClient = LocalSqsClientBuilder.create(); sqsMessageSender.fifo = true; sqsMessageSender.messageGroupId = messageGroupId; return sqsMessageSender; } public void sendMessages(List<String> messages) { List<SendMessageBatchRequestEntry> sendMessageBatchRequestEntries = messages .stream() .map(message -> fifo ? // FIFOãã¥ãŒ SendMessageBatchRequestEntry .builder() .id(UUID.randomUUID().toString()) .messageGroupId(messageGroupId) .messageDeduplicationId(UUID.randomUUID().toString()) .messageBody(message) .build() : // æšæºãã¥ãŒ SendMessageBatchRequestEntry .builder() .id(UUID.randomUUID().toString()) .messageBody(message) .build() ) .toList(); SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest .builder() .queueUrl(queueUrl) .entries(sendMessageBatchRequestEntries.toArray(new SendMessageBatchRequestEntry[sendMessageBatchRequestEntries.size()])) .build(); sqsClient.sendMessageBatch(sendMessageBatchRequest); } }
FIFOãã¥ãŒã®å Žåã¯ãã¡ãã»ãŒãžã°ã«ãŒãIDãæå®ã§ããããã«ããŠããŸãã
Amazon SQSããã¡ãã»ãŒãžãåä¿¡ããã¯ã©ã¹ã
src/main/java/org/littlewings/aws/sqs/SqsMessageReceiver.java
package org.littlewings.aws.sqs; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; import java.util.List; import java.util.UUID; public class SqsMessageReceiver { String queueUrl; SqsClient sqsClient; boolean withDelete; public static SqsMessageReceiver create(String queueUrl, boolean withDelete) { SqsMessageReceiver sqsMessageReceiver = new SqsMessageReceiver(); sqsMessageReceiver.queueUrl = queueUrl; sqsMessageReceiver.sqsClient = LocalSqsClientBuilder.create(); sqsMessageReceiver.withDelete = withDelete; return sqsMessageReceiver; } public List<String> receiveMessages(int maxNumberOfMessages) { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest .builder() .queueUrl(queueUrl) .maxNumberOfMessages(maxNumberOfMessages) .waitTimeSeconds(1) .build(); ReceiveMessageResponse receiveMessageResponse = sqsClient.receiveMessage(receiveMessageRequest); List<String> messages = receiveMessageResponse .messages() .stream() .map(message -> message.body()) .toList(); if (withDelete) { List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntries = receiveMessageResponse .messages() .stream() .map(message -> DeleteMessageBatchRequestEntry .builder() .id(UUID.randomUUID().toString()) .receiptHandle(message.receiptHandle()) .build() ) .toList(); DeleteMessageBatchRequest deleteMessageBatchRequest = DeleteMessageBatchRequest .builder() .queueUrl(queueUrl) .entries(deleteMessageBatchRequestEntries.toArray(new DeleteMessageBatchRequestEntry[deleteMessageBatchRequestEntries.size()])) .build(); sqsClient.deleteMessageBatch(deleteMessageBatchRequest); } return messages; } }
ãã®ã¯ã©ã¹ã®ã€ã³ã¹ã¿ã³ã¹äœææã«ãã¡ãã»ãŒãžãåä¿¡ããæã«ã¡ãã»ãŒãžã®åé€ãŸã§è¡ããã©ããããtrue
ïŒfalse
ã§æå®ã§ããããã«
ããŠããŸãã
ãããã®ã¯ã©ã¹ã䜿ã£ãŠãåäœç¢ºèªããŠãããŸãããã
ãã¹ãã³ãŒãã§ç¢ºèªãã
åäœç¢ºèªã¯ããã¹ãã³ãŒãã§è¡ããŸãã
ãã¹ãã³ãŒãã®é圢ã
src/test/java/org/littlewings/aws/sqs/SqsVisibilityTimeoutTest.java
package org.littlewings.aws.sqs; import org.junit.jupiter.api.Test; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; public class SqsVisibilityTimeoutTest { // ããã«ããã¹ããæžãïŒ void sleep(long sleepSec) { try { TimeUnit.SECONDS.sleep(sleepSec); } catch (InterruptedException e) { // no-op } } }
é©å®ã¹ãªãŒããå ¥ããããã®ã¡ãœãããçšæã
ããããã¯ãæšæºãã¥ãŒãFIFOãã¥ãŒããããã§ç¢ºèªããŠãããŸããå¯èŠæ§ã¿ã€ã ã¢ãŠãã¯ãã©ã¡ãã10ç§ã§ãã
æšæºãã¥ãŒã®å Žå
æåã¯æšæºãã¥ãŒã§ç¢ºèªããŸãã
@Test void standardQueue() { String queueUrl = "http://localhost:9324/000000000000/standard-queue"; SqsMessageSender sqsMessageSender = SqsMessageSender.createStandard(queueUrl); SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false); SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true); sqsMessageSender .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList()); // åä¿¡ããŠåé€ããªã // ç»é²ããã¡ãã»ãŒãžã®ãããããè¿ã£ãŠããïŒé åºä¿èšŒã¯ãªãïŒ List<String> nonDeletedMessages = sqsMessageReceiverWithoutDelete.receiveMessages(3); assertThat(nonDeletedMessages) .hasSize(3) .containsAnyOf(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3])); sleep(1L); // åä¿¡ããŠåé€ // æåã«åä¿¡ããã¡ãã»ãŒãžä»¥å€ã®ã¡ãã»ãŒãžã®ãããããè¿ã£ãŠããïŒé åºä¿èšŒã¯ãªãïŒ assertThat(sqsMessageReceiver.receiveMessages(3)) .containsAnyOf(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3])) .doesNotContainAnyElementsOf(nonDeletedMessages); sleep(1L); // å¯èŠæ§ã¿ã€ã ã¢ãŠãïŒ10ç§ïŒçµéããŠããªãã®ã§ãåä¿¡ããŠåé€ããŠããªãã¡ãã»ãŒãžãèŠããªã assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3)) .isEmpty(); assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty(); sleep(1L); // å¯èŠæ§ã¿ã€ã ã¢ãŠãïŒ10ç§ïŒçµéããŠããªãã®ã§ãåä¿¡ããŠåé€ããŠããªãã¡ãã»ãŒãžãèŠããªã assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3)) .isEmpty(); assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty(); sleep(4L); // 10ç§ä»¥äžçµé // åé€ããªãã£ãã¡ãã»ãŒãžãèŠããããã«ãªãïŒåãé åºã«ãªããšã¯éããªãïŒ // åä¿¡ããŠåé€ assertThat(sqsMessageReceiver.receiveMessages(3)) .containsOnly(nonDeletedMessages.toArray(new String[nonDeletedMessages.size()])); // æ®ã¡ãã»ãŒãžã¯0 assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty(); }
ã¡ãã»ãŒãžãéä¿¡ããã€ã³ã¹ã¿ã³ã¹ãã¡ãã»ãŒãžãåä¿¡ããŠåé€ã¯è¡ããªãã€ã³ã¹ã¿ã³ã¹ãã¡ãã»ãŒãžã®åä¿¡ãããŠåé€ãŸã§è¡ã
ã€ã³ã¹ã¿ã³ã¹ãçšæããŸãã
SqsMessageSender sqsMessageSender = SqsMessageSender.createStandard(queueUrl); SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false); SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);
ãã¥ãŒã«6ä»¶ã¡ãã»ãŒãžãéä¿¡ã
sqsMessageSender .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList());
æåã®åä¿¡ã¯ã3ä»¶ååŸããŠåé€ãè¡ããŸããã
// åä¿¡ããŠåé€ããªã // ç»é²ããã¡ãã»ãŒãžã®ãããããè¿ã£ãŠããïŒé åºä¿èšŒã¯ãªãïŒ List<String> nonDeletedMessages = sqsMessageReceiverWithoutDelete.receiveMessages(3); assertThat(nonDeletedMessages) .hasSize(3) .containsAnyOf(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3])); sleep(1L);
æšæºãã¥ãŒã¯å
¥ããé ã«åãåºããªãããšãããã®ã§ãã¢ãµãŒã·ã§ã³ã¯ãããªæãã«ããŸããã
ãã ãããã§1床ååŸããã¡ãã»ãŒãžã¯ä¿æããŠãããŸãã
ãŸãé©å®ã¹ãªãŒããæãã§ãããŸãã
次ã¯3ä»¶ååŸããŸãããæåã«åä¿¡ããã¡ãã»ãŒãžä»¥å€ãè¿ã£ãŠããŸãã
// åä¿¡ããŠåé€ // æåã«åä¿¡ããã¡ãã»ãŒãžä»¥å€ã®ã¡ãã»ãŒãžã®ãããããè¿ã£ãŠããïŒé åºä¿èšŒã¯ãªãïŒ assertThat(sqsMessageReceiver.receiveMessages(3)) .containsAnyOf(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3])) .doesNotContainAnyElementsOf(nonDeletedMessages); sleep(1L);
ãã®åŸã¯ãå¯èŠæ§ã¿ã€ã ã¢ãŠãã§æå®ããç§æ°ãçµéãããŸã§ã¯ã¡ãã»ãŒãžãååŸã§ããªããªããŸãã
// å¯èŠæ§ã¿ã€ã ã¢ãŠãïŒ10ç§ïŒçµéããŠããªãã®ã§ãåä¿¡ããŠåé€ããŠããªãã¡ãã»ãŒãžãèŠããªã assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3)) .isEmpty(); assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty(); sleep(1L); // å¯èŠæ§ã¿ã€ã ã¢ãŠãïŒ10ç§ïŒçµéããŠããªãã®ã§ãåä¿¡ããŠåé€ããŠããªãã¡ãã»ãŒãžãèŠããªã assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3)) .isEmpty(); assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty();
ãããŠ10ç§ä»¥äžçµéãããšãæåã«ååŸããŠåé€ããªãã£ãã¡ãã»ãŒãžãååŸã§ããããã«ãªããŸãã
sleep(4L); // 10ç§ä»¥äžçµé // åé€ããªãã£ãã¡ãã»ãŒãžãèŠããããã«ãªãïŒåãé åºã«ãªããšã¯éããªãïŒ // åä¿¡ããŠåé€ assertThat(sqsMessageReceiver.receiveMessages(3)) .containsOnly(nonDeletedMessages.toArray(new String[nonDeletedMessages.size()]));
æ®ãã®ã¡ãã»ãŒãžä»¶æ°ã¯0ã§ãã
// æ®ã¡ãã»ãŒãžã¯0 assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty();
æšæºãã¥ãŒã§ã¯ãå¯èŠæ§ã¿ã€ã ã¢ãŠãåŸ
ã¡ã®ã¡ãã»ãŒãžã¯ååŸã§ããªããªããŸããããã®åŸã«ç¶ãã¡ãã»ãŒãžã¯ãµã€ãã«ååŸã§ãã
ãšãããšããããã€ã³ãã§ããã
FIFOãã¥ãŒã®å Žå
ç¶ããŠã¯ãFIFOãã¥ãŒã§ç¢ºèªããŸããããã€ãããªãšãŒã·ã§ã³ããããŸãã
ã¡ãã»ãŒãžã°ã«ãŒãIDãã²ãšã€ã®å Žå
ãŸãã¯ã¡ãã»ãŒãžã°ã«ãŒãIDãã²ãšã€ã ã£ãå ŽåãèŠãŠãããŸãã
@Test void fifoQueue() { String queueUrl = "http://localhost:4566/000000000000/fifo-queue.fifo"; SqsMessageSender sqsMessageSender = SqsMessageSender.createFifo(queueUrl, "group"); SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false); SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true); sqsMessageSender .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList()); // åä¿¡ããŠåé€ããªã assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3])); sleep(1L); // FIFOãã¥ãŒã®å Žåãåãã¡ãã»ãŒãžã°ã«ãŒãIDã®åŸç¶ã®ã¡ãã»ãŒãžã¯ãã¹ãŠèŠããªããªã assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty(); sleep(1L); // å¯èŠæ§ã¿ã€ã ã¢ãŠãïŒ10ç§ïŒçµéããŠããªãã®ã§ãåä¿¡ããŠåé€ããŠããªãã¡ãã»ãŒãžä»¥éã®ã¡ãã»ãŒãžå«ãããã¹ãŠèŠããªã assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3)) .isEmpty(); assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty(); sleep(1L); // å¯èŠæ§ã¿ã€ã ã¢ãŠãïŒ10ç§ïŒçµéããŠããªãã®ã§ãåä¿¡ããŠåé€ããŠããªãã¡ãã»ãŒãžä»¥éã®ã¡ãã»ãŒãžå«ãããã¹ãŠèŠããªã assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3)) .isEmpty(); assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty(); sleep(4L); // 10ç§ä»¥äžçµé // åé€ããªãã£ãã¡ãã»ãŒãžããã³åŸç¶ã®ã¡ãã»ãŒãžãèŠããããã«ãªã // åä¿¡ããŠåé€ assertThat(sqsMessageReceiver.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3])); assertThat(sqsMessageReceiver.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3])); // æ®ã¡ãã»ãŒãžã¯0 assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty(); }
ã¡ãã»ãŒãžã®éåä¿¡ã§äœ¿ãã€ã³ã¹ã¿ã³ã¹ã¯ãFIFOãã¥ãŒã®ãã®ã䜿ã£ãŠã¡ãã»ãŒãžã°ã«ãŒãIDãä»åã¯ã²ãšã€ã«ããŸãã
SqsMessageSender sqsMessageSender = SqsMessageSender.createFifo(queueUrl, "group"); SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false); SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);
ã¡ãã»ãŒãžã6ä»¶å ¥ããŠã3ä»¶ã®ã¡ãã»ãŒãžãåé€ããã«åãåºããšããã®ãã¥ãŒããã¯ãã以éã®ã¡ãã»ãŒãžãåãåºããªããªããŸãã
sqsMessageSender .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message-" + i).toList()); // åä¿¡ããŠåé€ããªã assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3])); sleep(1L); // FIFOãã¥ãŒã®å Žåãåãã¡ãã»ãŒãžã°ã«ãŒãIDã®åŸç¶ã®ã¡ãã»ãŒãžã¯ãã¹ãŠèŠããªããªã assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty(); sleep(1L);
ã¿ã€ã ã¢ãŠããããšãã¿ã€ã ã¢ãŠãåŸ ã¡ã®ã¡ãã»ãŒãžãšãã以éã®ã¡ãã»ãŒãžãååŸã§ããããã«ãªããŸãã
// å¯èŠæ§ã¿ã€ã ã¢ãŠãïŒ10ç§ïŒçµéããŠããªãã®ã§ãåä¿¡ããŠåé€ããŠããªãã¡ãã»ãŒãžä»¥éã®ã¡ãã»ãŒãžå«ãããã¹ãŠèŠããªã assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3)) .isEmpty(); assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty(); sleep(1L); // å¯èŠæ§ã¿ã€ã ã¢ãŠãïŒ10ç§ïŒçµéããŠããªãã®ã§ãåä¿¡ããŠåé€ããŠããªãã¡ãã»ãŒãžä»¥éã®ã¡ãã»ãŒãžå«ãããã¹ãŠèŠããªã assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3)) .isEmpty(); assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty(); sleep(4L); // 10ç§ä»¥äžçµé // åé€ããªãã£ãã¡ãã»ãŒãžããã³åŸç¶ã®ã¡ãã»ãŒãžãèŠããããã«ãªã // åä¿¡ããŠåé€ assertThat(sqsMessageReceiver.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message-" + i).toList().toArray(new String[3])); assertThat(sqsMessageReceiver.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message-" + i).toList().toArray(new String[3])); // æ®ã¡ãã»ãŒãžã¯0 assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty();
ã¡ãã»ãŒãžã°ã«ãŒãIDãè€æ°ã®å Žå
æåŸã¯ãã¡ãã»ãŒãžã°ã«ãŒãIDãè€æ°ã®å ŽåãèŠãŠãããŸãã
Amazon SQSã®å¯èŠæ§ã¿ã€ã ã¢ãŠãã®ããŒãžã«ã以äžã®èšè¿°ããããŸããã
ã¡ãã»ãŒãžã°ã«ãŒã IDãããã¡ãã»ãŒãžãåä¿¡ããå Žåãã¡ãã»ãŒãžãåé€ãããã衚瀺ãããªãéããåãã¡ãã»ãŒãžã°ã«ãŒã IDã®ã¡ãã»ãŒãžã¯ãã以äžè¿ä¿¡ãããŸããã
Amazon SQS可視性タイムアウト - Amazon Simple Queue Service
ãã¡ããèŠãŠãããŸãã
@Test void fifoQueueLeftMessagesPerMessageGroup() { String queueUrl = "http://localhost:9324/000000000000/fifo-queue.fifo"; // ç°ãªãã¡ãã»ãŒãžã°ã«ãŒãIDãæã€ã¯ã©ã€ã¢ã³ããäœæ SqsMessageSender sqsMessageSenderGroup1 = SqsMessageSender.createFifo(queueUrl, "group1"); SqsMessageSender sqsMessageSenderGroup2 = SqsMessageSender.createFifo(queueUrl, "group2"); SqsMessageSender sqsMessageSenderGroup3 = SqsMessageSender.createFifo(queueUrl, "group3"); SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false); SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true); sqsMessageSenderGroup1 .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message1-" + i).toList()); sqsMessageSenderGroup2 .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message2-" + i).toList()); sqsMessageSenderGroup3 .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message3-" + i).toList()); // åä¿¡ããŠåé€ããªã assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3])); sleep(1L); // FIFOãã¥ãŒã®å Žåãå¯èŠæ§ã¿ã€ã ã¢ãŠãåŸ ã¡ãšãªã£ãåãã¡ãã»ãŒãžã°ã«ãŒãIDã®ã¡ãã»ãŒãžã¯ãã¹ãŠèŠããªããªã // ä»ã®ã¡ãã»ãŒãžã°ã«ãŒãIDã®ã¡ãã»ãŒãžã¯ååŸã§ãã assertThat(sqsMessageReceiver.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message2-" + i).toList().toArray(new String[3])); sleep(1L); // FIFOãã¥ãŒã®å Žåãå¯èŠæ§ã¿ã€ã ã¢ãŠãåŸ ã¡ãšãªã£ãåãã¡ãã»ãŒãžã°ã«ãŒãIDã®ã¡ãã»ãŒãžã¯ãã¹ãŠèŠããªããªã // ä»ã®ã¡ãã»ãŒãžã°ã«ãŒãIDã®ã¡ãã»ãŒãžã¯ååŸã§ãã assertThat(sqsMessageReceiver.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message1-" + i).toList().toArray(new String[3])); sleep(1L); // æ®ã£ãŠããã®ã¯å¯èŠæ§ã¿ã€ã ã¢ãŠãåŸ ã¡ãšãªã£ãã¡ãã»ãŒãžã°ã«ãŒãIDãæã€ã¡ãã»ãŒãžã®ã¿ã§ããã // å¯èŠæ§ã¿ã€ã ã¢ãŠãïŒ10ç§ïŒçµéããŠããªãã®ã§ããã®æç¹ã§ååŸã§ããã¡ãã»ãŒãžã¯ãªã assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3)) .isEmpty(); assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty(); sleep(4L); // 10ç§ä»¥äžçµé // åé€ããªãã£ãã¡ãã»ãŒãžããã³åŸç¶ã®ã¡ãã»ãŒãžãèŠããããã«ãªã // åä¿¡ããŠåé€ assertThat(sqsMessageReceiver.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3])); assertThat(sqsMessageReceiver.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3])); // æ®ã¡ãã»ãŒãžã¯0 assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty(); }
ã¡ãã»ãŒãžã®éä¿¡åŽã¯ãç°ãªãã¡ãã»ãŒãžã°ã«ãŒãIDãæã€ã€ã³ã¹ã¿ã³ã¹ã3ã€äœæããŸãã
// ç°ãªãã¡ãã»ãŒãžã°ã«ãŒãIDãæã€ã¯ã©ã€ã¢ã³ããäœæ SqsMessageSender sqsMessageSenderGroup1 = SqsMessageSender.createFifo(queueUrl, "group1"); SqsMessageSender sqsMessageSenderGroup2 = SqsMessageSender.createFifo(queueUrl, "group2"); SqsMessageSender sqsMessageSenderGroup3 = SqsMessageSender.createFifo(queueUrl, "group3"); SqsMessageReceiver sqsMessageReceiverWithoutDelete = SqsMessageReceiver.create(queueUrl, false); SqsMessageReceiver sqsMessageReceiver = SqsMessageReceiver.create(queueUrl, true);
åä¿¡åŽã¯ããããŸã§ãšåãã§ãã
åã¡ãã»ãŒãžã°ã«ãŒãIDã«å¯ŸããŠãã¡ãã»ãŒãžãç»é²ããŸãã
sqsMessageSenderGroup1 .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message1-" + i).toList()); sqsMessageSenderGroup2 .sendMessages(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message2-" + i).toList()); sqsMessageSenderGroup3 .sendMessages(IntStream.rangeClosed(1, 6).mapToObj(i -> "Message3-" + i).toList());
æåŸã®ã°ã«ãŒãã ã6ä»¶ã®ã¡ãã»ãŒãžãéä¿¡ãããã以å€ã¯3ä»¶ã§ãã
ElasticMQã§è©ŠããšãæåŸã®ã¡ãã»ãŒãžã°ã«ãŒãIDã®ãã®ãåããããã£ãã®ã§ããããã¡ããååŸããŠåé€ã¯ããªãã§ãããŸãã
// åä¿¡ããŠåé€ããªã assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3])); sleep(1L);
次ã«ã¡ãã»ãŒãžãååŸãããšãä»ã®ã¡ãã»ãŒãžã°ã«ãŒãIDã®ãã®ãååŸã§ããŸãã
// FIFOãã¥ãŒã®å Žåãå¯èŠæ§ã¿ã€ã ã¢ãŠãåŸ ã¡ãšãªã£ãåãã¡ãã»ãŒãžã°ã«ãŒãIDã®ã¡ãã»ãŒãžã¯ãã¹ãŠèŠããªããªã // ä»ã®ã¡ãã»ãŒãžã°ã«ãŒãIDã®ã¡ãã»ãŒãžã¯ååŸã§ãã assertThat(sqsMessageReceiver.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message2-" + i).toList().toArray(new String[3])); sleep(1L); // FIFOãã¥ãŒã®å Žåãå¯èŠæ§ã¿ã€ã ã¢ãŠãåŸ ã¡ãšãªã£ãåãã¡ãã»ãŒãžã°ã«ãŒãIDã®ã¡ãã»ãŒãžã¯ãã¹ãŠèŠããªããªã // ä»ã®ã¡ãã»ãŒãžã°ã«ãŒãIDã®ã¡ãã»ãŒãžã¯ååŸã§ãã assertThat(sqsMessageReceiver.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message1-" + i).toList().toArray(new String[3])); sleep(1L);
ããã§æ®ãã¯å¯èŠæ§ã¿ã€ã ã¢ãŠãåŸ ã¡ã®ã¡ãã»ãŒãžã°ã«ãŒãIDã®ãã®ãªã®ã§ããããã¡ãã¯ã¡ãã»ãŒãžãååŸã§ããªããŸãŸã§ãã
// æ®ã£ãŠããã®ã¯å¯èŠæ§ã¿ã€ã ã¢ãŠãåŸ ã¡ãšãªã£ãã¡ãã»ãŒãžã°ã«ãŒãIDãæã€ã¡ãã»ãŒãžã®ã¿ã§ããã // å¯èŠæ§ã¿ã€ã ã¢ãŠãïŒ10ç§ïŒçµéããŠããªãã®ã§ããã®æç¹ã§ååŸã§ããã¡ãã»ãŒãžã¯ãªã assertThat(sqsMessageReceiverWithoutDelete.receiveMessages(3)) .isEmpty(); assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty();
ããã§10ç§ä»¥äžçµéãããšãåé€ããªãã£ãã¡ãã»ãŒãžãšãšãã«ããã以éã®ã¡ãã»ãŒãžãååŸã§ããããã«ãªããŸãã
sleep(4L); // 10ç§ä»¥äžçµé // åé€ããªãã£ãã¡ãã»ãŒãžããã³åŸç¶ã®ã¡ãã»ãŒãžãèŠããããã«ãªã // åä¿¡ããŠåé€ assertThat(sqsMessageReceiver.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(1, 3).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3])); assertThat(sqsMessageReceiver.receiveMessages(3)) .containsExactly(IntStream.rangeClosed(4, 6).mapToObj(i -> "Message3-" + i).toList().toArray(new String[3]));
æçµçã«ã空ã®ãã¥ãŒã«ãªããŸãã
// æ®ã¡ãã»ãŒãžã¯0 assertThat(sqsMessageReceiver.receiveMessages(3)) .isEmpty();
ãããªãšããã§ããããã
ãŸãšã
Amazon SQSã®å¯èŠæ§ã¿ã€ã ã¢ãŠãã«ã€ããŠãElasticMQã䜿ã£ãŠç¢ºèªããŠã¿ãŸããã
æšæºãã¥ãŒãšFIFOãã¥ãŒãšã®æåã®éãããFIFOãã¥ãŒãšå¯èŠæ§ã¿ã€ã ã¢ãŠãã®çްããé¢ä¿ã¯ããŸãèªããŠããªãã£ãã®ã§ã
ãã®æ©äŒã«ã¡ãããšè©ŠããŠè¯ãã£ãããªãšæããŸãã
ãšãã£ãŠãããããŸã§ç¢ºèªã¯ElasticMQãªã®ã§ããã
ãŸããLocalStackã§æå詊ããŠããã®ã§ãããåããããã¥ã¡ã³ããšã ãã¶ç°ãªã£ãã®ã§ããªãããããŸãããå±ãããå
šç¶éãæåã
ä¿¡ãããã«ãªããŸããããããã¥ã¡ã³ããšã¡ãããšèŠæ¯ã¹ãŠãããŠè¯ãã£ãã§ãâŠã