これは、なにをしたくて書いたもの?
前に、Amazon SQS(といってもLocalStack上のものですが)を使ってFIFOキューを試してみました。
LocalStackでAmazon SQSのFIFOキューを試してみる(AWS SDK for Javaを使用) - CLOVER🍀
今回は、常駐アプリケーションとして書いてみたらどうなるかな?ということで試してみました。
お題
Spring Bootを使って、以下のようなWebアプリケーションとして書いてみます。
- AWS SDK for Java v2を使う(Spring Cloud AWSは使用しない)
- Amazon SQSに送信するメッセージは、
RestController
で受け取ってキューに送信 - Amazon SQSからメッセージを受信する機能は、アプリケーションが停止するまで延々とポーリングする
今回は、特に後者をポイントにしています。
※ちなみに、受信のみを対象にする場合はWebアプリケーションにしなくてもOKです
作成するキューは、前回同様FIFOキューにします。
環境
今回の環境は、こちら。
$ java --version openjdk 17.0.4 2022-07-19 OpenJDK Runtime Environment (build 17.0.4+8-Ubuntu-120.04) OpenJDK 64-Bit Server VM (build 17.0.4+8-Ubuntu-120.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 17.0.4, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-126-generic", arch: "amd64", family: "unix"
LocalStack。
$ python3 --version Python 3.8.10 $ localstack --version 1.1.0
起動。
$ localstack start
Amazon SQSのキューの作成には、Terraformを使うことにします。
$ terraform version Terraform v1.3.1 on linux_amd64
Amazon SQSのキューを作成する
まずはAmazon SQSのキューを作成します。
Terraformのリソース定義ファイルは、こんな感じで用意。
main.tf
terraform { required_version = "1.3.1" required_providers { aws = { source = "hashicorp/aws" version = "4.33.0" } } } provider "aws" { access_key = "mock_access_key" region = "us-east-1" secret_key = "mock_secret_key" skip_credentials_validation = true skip_metadata_api_check = true skip_requesting_account_id = true endpoints { sqs = "http://localhost:4566" } } resource "aws_sqs_queue" "queue" { name = "my-queue.fifo" fifo_queue = true content_based_deduplication = true } output "queue_url" { value = aws_sqs_queue.queue.url }
init
してapply
。
$ terraform init $ terraform apply
キューが作成されました。
Apply complete! Resources: 1 added, 0 changed, 0 destroyed. Outputs: queue_url = "http://localhost:4566/000000000000/my-queue.fifo"
このURLを覚えておきます。
Spring Bootアプリケーションを作成する
では、アプリケーションを作成しましょう。
まずはSpring Bootプロジェクトを作成。依存関係は、web
のみにしています。
$ curl -s https://start.spring.io/starter.tgz \ -d bootVersion=2.7.4 \ -d javaVersion=17 \ -d name=spring-boot-sdk-v2-sqs \ -d groupId=org.littlewings \ -d artifactId=spring-boot-sdk-v2-sqs \ -d version=0.0.1-SNAPSHOT \ -d packageName=org.littlewings.spring.aws \ -d dependencies=web \ -d baseDir=spring-boot-sdk-v2-sqs | tar zxvf -
プロジェクト内に移動。
$ cd spring-boot-sdk-v2-sqs
生成されたソースコードは、削除しておきます。
$ rm src/main/java/org/littlewings/spring/aws/SpringBootSdkV2SqsApplication.java src/test/java/org/littlewings/spring/aws/SpringBootSdkV2SqsApplicationTests.java
<properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
AWS SDK for Java v2を使うので、依存関係はこのように修正しました。
<dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.17.285</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
Apache Maven プロジェクトの設定 / SDK を依存関係として宣言する
SqsClient
はメッセージの送信でも受信でも使うので、Beanとして定義することにしました。
src/main/java/org/littlewings/spring/aws/AwsConfig.java
package org.littlewings.spring.aws; import java.net.URI; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; @Configuration public class AwsConfig { @Bean public SqsClient sqsClient() { AwsCredentials awsCredentials = AwsBasicCredentials.create("mock_access_key", "mock_secret_key"); SqsClient sqsClient = SqsClient .builder() .defaultsMode(DefaultsMode.AUTO) .credentialsProvider(StaticCredentialsProvider.create(awsCredentials)) .region(Region.US_EAST_1) .endpointOverride(URI.create("http://localhost:4566")) .build(); return sqsClient; } }
エンドポイントはLocalStackのものを設定し、Smart Configuration Defaultsはauto
を選択。
AWS SDK for Java v2のSmart Configuration Defaultsの紹介 | Amazon Web Services ブログ
Amazon SQSキューへ、メッセージの送信を行うRestController
。
src/main/java/org/littlewings/spring/aws/SendMessageController.java
package org.littlewings.spring.aws; import java.util.Map; import java.util.UUID; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import software.amazon.awssdk.services.sqs.model.SendMessageResponse; @RestController public class SendMessageController { SqsClient sqsClient; ObjectMapper objectMapper; public SendMessageController(SqsClient sqsClient, ObjectMapper objectMapper) { this.sqsClient = sqsClient; this.objectMapper = objectMapper; } @PostMapping(value = "send", consumes = "application/json") public String send(@RequestBody Map<String, Object> message) throws JsonProcessingException { String messageAsString = objectMapper.writeValueAsString(message); SendMessageRequest request = SendMessageRequest .builder() .queueUrl("http://localhost:4566/000000000000/my-queue.fifo") .messageDeduplicationId(UUID.randomUUID().toString()) .messageGroupId("group-1") .messageBody(messageAsString) .build(); SendMessageResponse response = sqsClient.sendMessage(request); return String.format( "send message = [%s], message-id = [%s], sequence-number = [%s]", messageAsString, response.messageId(), response.sequenceNumber() ); } }
リクエストボディをそのままJSON文字列に変換して、メッセージのボディとしてキューへ送信します。
参考にしているのは、このあたりです。
- Amazon Simple Queue Service メッセージの送信、受信、削除 / メッセージの送信
- Amazon SQS の例:Amazon SQSDK for Java 例 - AWS SDK for Java
メッセージの受信側。
src/main/java/org/littlewings/spring/aws/MessageReceiver.java
package org.littlewings.spring.aws; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.springframework.core.task.TaskExecutor; import org.springframework.stereotype.Component; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; @Component public class MessageReceiver { SqsClient sqsClient; TaskExecutor taskExecutor; AtomicBoolean run = new AtomicBoolean(true); CountDownLatch latch = new CountDownLatch(1); public MessageReceiver(SqsClient sqsClient, TaskExecutor taskExecutor) { this.sqsClient = sqsClient; this.taskExecutor = taskExecutor; } @PostConstruct public void receiveMessageWhile() { taskExecutor.execute(() -> { while (run.get()) { ReceiveMessageRequest request = ReceiveMessageRequest .builder() .queueUrl("http://localhost:4566/000000000000/my-queue.fifo") .receiveRequestAttemptId(UUID.randomUUID().toString()) .maxNumberOfMessages(3) .waitTimeSeconds(5) .build(); ReceiveMessageResponse response = sqsClient.receiveMessage(request); response.messages().forEach(message -> { System.out.printf( "receive message = [%s], message-id = [%s], receipt-handle=[%s]%n", message.body(), message.messageId(), message.receiptHandle() ); DeleteMessageRequest deleteRequest = DeleteMessageRequest .builder() .queueUrl("http://localhost:4566/000000000000/my-queue.fifo") .receiptHandle(message.receiptHandle()) .build(); DeleteMessageResponse deleteResponse = sqsClient.deleteMessage(deleteRequest); if (deleteResponse.sdkHttpResponse().isSuccessful()) { System.out.printf("message[%s] deleted%n", message.messageId()); } else { System.out.printf( "delete message failure, code = [%d], text = [%s]%n", deleteResponse.sdkHttpResponse().statusCode(), deleteResponse.sdkHttpResponse().statusText() ); } }); if (run.get()) { System.out.println("waiting, next messages..."); } else { System.out.println("enter shutdown sequence"); latch.countDown(); } } }); } @PreDestroy public void shutdown() throws InterruptedException { run.set(false); latch.await(); System.out.println("shutdown"); } }
こちらは、@Component
として定義。
@Component public class MessageReceiver {
メッセージの受信は、@PostConstruct
を使ってBeanが構築され次第開始するようにしています。
@PostConstruct public void receiveMessageWhile() { taskExecutor.execute(() -> { while (run.get()) { ReceiveMessageRequest request = ReceiveMessageRequest .builder() .queueUrl("http://localhost:4566/000000000000/my-queue.fifo") .receiveRequestAttemptId(UUID.randomUUID().toString()) .maxNumberOfMessages(3) .waitTimeSeconds(5) .build(); ReceiveMessageResponse response = sqsClient.receiveMessage(request); response.messages().forEach(message -> { System.out.printf( "receive message = [%s], message-id = [%s], receipt-handle=[%s]%n", message.body(), message.messageId(), message.receiptHandle() ); DeleteMessageRequest deleteRequest = DeleteMessageRequest .builder() .queueUrl("http://localhost:4566/000000000000/my-queue.fifo") .receiptHandle(message.receiptHandle()) .build(); DeleteMessageResponse deleteResponse = sqsClient.deleteMessage(deleteRequest); if (deleteResponse.sdkHttpResponse().isSuccessful()) { System.out.printf("message[%s] deleted%n", message.messageId()); } else { System.out.printf( "delete message failure, code = [%d], text = [%s]%n", deleteResponse.sdkHttpResponse().statusCode(), deleteResponse.sdkHttpResponse().statusText() ); } }); if (run.get()) { System.out.println("waiting, next messages..."); } else { System.out.println("enter shutdown sequence"); latch.countDown(); } } }); }
アプリケーションが起動している間は、メッセージの受信を継続するように無限ループにしています。
while (run.get()) { ReceiveMessageRequest request = ReceiveMessageRequest .builder() .queueUrl("http://localhost:4566/000000000000/my-queue.fifo") .receiveRequestAttemptId(UUID.randomUUID().toString()) .maxNumberOfMessages(3) .waitTimeSeconds(5) .build(); ReceiveMessageResponse response = sqsClient.receiveMessage(request);
この処理をそのまま書いてしまうとスレッドを専有してアプリケーションが起動しなくなってしまうので、TaskExecutor
を使って
別スレッドで実行するようにしました。
taskExecutor.execute(() -> {
while (run.get()) {
停止に関しては、受信したメッセージを処理している最中に止まって欲しくはありません。
そこで、今回はCoundDownLatch
を使いました。@PreDestroy
でwhile
の条件になっているAtomicBoolean
の変数をfalse
にして、
CountDownLatch
がカウントダウンするのを待ちます。
@PreDestroy public void shutdown() throws InterruptedException { run.set(false); latch.await(); System.out.println("shutdown"); }
カウントダウンは、先ほどのメッセージ受信のループ内で行われます。
if (run.get()) { System.out.println("waiting, next messages..."); } else { System.out.println("enter shutdown sequence"); latch.countDown(); }
そしてCountDownLatch
の値は1
で設定しているので、1回カウントダウンすればブロックが解除されます。
CountDownLatch latch = new CountDownLatch(1);
これで、処理しているメッセージがない状態になれば停止します。
メッセージの受信は、受信したメッセージを標準出力に書き出したら、メッセージを削除する流れで実装しています。
ReceiveMessageRequest request = ReceiveMessageRequest .builder() .queueUrl("http://localhost:4566/000000000000/my-queue.fifo") .receiveRequestAttemptId(UUID.randomUUID().toString()) .maxNumberOfMessages(3) .waitTimeSeconds(5) .build(); ReceiveMessageResponse response = sqsClient.receiveMessage(request); response.messages().forEach(message -> { System.out.printf( "receive message = [%s], message-id = [%s], receipt-handle=[%s]%n", message.body(), message.messageId(), message.receiptHandle() ); DeleteMessageRequest deleteRequest = DeleteMessageRequest .builder() .queueUrl("http://localhost:4566/000000000000/my-queue.fifo") .receiptHandle(message.receiptHandle()) .build(); DeleteMessageResponse deleteResponse = sqsClient.deleteMessage(deleteRequest); if (deleteResponse.sdkHttpResponse().isSuccessful()) { System.out.printf("message[%s] deleted%n", message.messageId()); } else { System.out.printf( "delete message failure, code = [%d], text = [%s]%n", deleteResponse.sdkHttpResponse().statusCode(), deleteResponse.sdkHttpResponse().statusText() ); }
やっぱり、このあたりを参考に。
- Amazon Simple Queue Service メッセージの送信、受信、削除 / メッセージの送信
- Amazon SQS の例:Amazon SQSDK for Java 例 - AWS SDK for Java
メッセージの受信はロングポーリングで行っていて、waitTimeSeconds
を5に設定しているので受信時に最大で5秒ブロックします。
ReceiveMessageRequest request = ReceiveMessageRequest .builder() .queueUrl("http://localhost:4566/000000000000/my-queue.fifo") .receiveRequestAttemptId(UUID.randomUUID().toString()) .maxNumberOfMessages(3) .waitTimeSeconds(5) .build();
Amazon SQS ショートポーリングとロングポーリング / ロングポーリングを使用したメッセージの消費
5秒経ってもメッセージがこなかったら諦めて次のループへ移り、待機中にメッセージが来たらすぐに処理を行って次のループへ移ります。
main
メソッドを持ったクラス。
src/main/java/org/littlewings/spring/aws/App.java
package org.littlewings.spring.aws; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableAsync; @SpringBootApplication public class App { public static void main(String... args) { SpringApplication.run(App.class, args); } }
アプリケーションの設定。なんとなく、Graceful Shutdownも有効にしておきました。
※設定しなくても、今回の動作確認は可能です
src/main/resources/application.properties
server.shutdown=graceful spring.lifecycle.timeout-per-shutdown-phase=15s
確認する
では、動かして確認してみましょう。
パッケージングして
$ mvn package
実行。
$ java -jar target/spring-boot-sdk-v2-sqs-0.0.1-SNAPSHOT.jar
アプリケーションが起動したら、5秒おきに標準出力にメッセージが流れます。
waiting, next messages... waiting, next messages...
メッセージを送ってみましょう。
$ curl -XPOST -H 'Content-Type: application/json' localhost:8080/send -d '{"messsage": "Hello World"}' send message = [{"messsage":"Hello World"}], message-id = [973ccd24-0fc6-4996-9d0a-78623c13eb15], sequence-number = [14300583401512501286]
メッセージIDやシーケンス番号も取得できています。
アプリケーション側の標準出力には、こんな内容が出力されます。
receive message = [{"messsage":"Hello World"}], message-id = [973ccd24-0fc6-4996-9d0a-78623c13eb15], receipt-handle=[ZjNiYjI3NTQtYzg1OS00YWRlLThlOTEtNDMwMmJlMWJiMzIxIGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bXktcXVldWUuZmlmbyA5NzNjY2QyNC0wZmM2LTQ5OTYtOWQwYS03ODYyM2MxM2ViMTUgMTY2NDgxNDE1My45NzAwOTk=] message[973ccd24-0fc6-4996-9d0a-78623c13eb15] deleted
そして、次のループへ移ります。
waiting, next messages... waiting, next messages...
この後にメッセージを送り続けてみます。
$ curl -XPOST -H 'Content-Type: application/json' localhost:8080/send -d '{"messsage": "Hello World"}' send message = [{"messsage":"Hello World"}], message-id = [581814e9-98d6-48f4-8878-24877f81e2eb], sequence-number = [14300583401512501287] $ curl -XPOST -H 'Content-Type: application/json' localhost:8080/send -d '{"messsage": "Hello World"}' send message = [{"messsage":"Hello World"}], message-id = [595dc1ff-0736-4efa-9c1b-80d5a5dd2214], sequence-number = [14300583401512501288] $ curl -XPOST -H 'Content-Type: application/json' localhost:8080/send -d '{"messsage": "Hello World"}' send message = [{"messsage":"Hello World"}], message-id = [a9a07acb-071e-46ce-bd6e-194ac33bf048], sequence-number = [14300583401512501289]
アプリケーション側は、メッセージの受信を続けます。
receive message = [{"messsage":"Hello World"}], message-id = [581814e9-98d6-48f4-8878-24877f81e2eb], receipt-handle=[OTgzODRmODItMTgyYy00YTRjLWIyZmEtMjY4YTMxZjM5YmFjIGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bXktcXVldWUuZmlmbyA1ODE4MTRlOS05OGQ2LTQ4ZjQtODg3OC0yNDg3N2Y4MWUyZWIgMTY2NDgxNDI1MS41NDc4MTE=] message[581814e9-98d6-48f4-8878-24877f81e2eb] deleted waiting, next messages... receive message = [{"messsage":"Hello World"}], message-id = [595dc1ff-0736-4efa-9c1b-80d5a5dd2214], receipt-handle=[NTQ4NDc1ZjEtMTE1ZC00ZTU3LWJmZTUtYjkyNTU2Y2IzOGI3IGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bXktcXVldWUuZmlmbyA1OTVkYzFmZi0wNzM2LTRlZmEtOWMxYi04MGQ1YTVkZDIyMTQgMTY2NDgxNDI1Mi4yNDUwMzQy] message[595dc1ff-0736-4efa-9c1b-80d5a5dd2214] deleted waiting, next messages... receive message = [{"messsage":"Hello World"}], message-id = [a9a07acb-071e-46ce-bd6e-194ac33bf048], receipt-handle=[YTNmZGVkZjMtYWFkZC00MTkzLWE0MDAtZmI1ZjFjMGI2YjE4IGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bXktcXVldWUuZmlmbyBhOWEwN2FjYi0wNzFlLTQ2Y2UtYmQ2ZS0xOTRhYzMzYmYwNDggMTY2NDgxNDI1Mi43OTI3MDc=] message[a9a07acb-071e-46ce-bd6e-194ac33bf048] deleted waiting, next messages...
OKみたいですね。
最後にCtrl-cでアプリケーションを停止。
2022-10-04 01:25:27.552 INFO 48599 --- [ionShutdownHook] o.s.b.w.e.tomcat.GracefulShutdown : Commencing graceful shutdown. Waiting for active requests to complete 2022-10-04 01:25:27.555 INFO 48599 --- [tomcat-shutdown] o.s.b.w.e.tomcat.GracefulShutdown : Graceful shutdown complete enter shutdown sequence shutdown
Beanの破棄時に、メッセージの受信を停止するように動作していることが確認できました。
OKですね。
オマケ
お題にも書きましたが、受信の部分だけであればspring-boot-starter-web
ではなくspring-boot-starter
でOKです。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency>
TaskExecutor
を使って処理を実行したままなので、これでもアプリケーションは終了せずに実行され続けます。
停止時の処理が行われるのも同じです。
まとめ
Amazon SQSを使って、メッセージを送受信するプログラムを書いてみました。
今回はずっとメッセージを待ち続けるアプリケーションにしたかったので、Webアプリケーションにしつつ、TaskExecutor
を使って
別スレッドでメッセージの受信を行う構成にしました。
なにも考えずに無限に受信処理を行うと、そこでアプリケーションがブロックして先に進まなくなるなどのお決まりを踏んだりをしましたが、
とりあえず目標は達成できたのでよしとしましょう。