CLOVER🍀

That was when it all began.

Amazon SQSにメッセージを送受信するアプリケーションをSpring Bootで書いてみる(AWS SDK for Java v2、LocalStack利用)

これは、なにをしたくて書いたもの?

前に、Amazon SQS(といってもLocalStack上のものですが)を使ってFIFOキューを試してみました。

LocalStackでAmazon SQSのFIFOキューを試してみる(AWS SDK for Javaを使用) - CLOVER🍀

今回は、常駐アプリケーションとして書いてみたらどうなるかな?ということで試してみました。

お題

Spring Bootを使って、以下のようなWebアプリケーションとして書いてみます。

  • AWS SDK for Java v2を使う(Spring Cloud AWSは使用しない)
  • Amazon SQSに送信するメッセージは、RestControllerで受け取ってキューに送信
  • Amazon SQSからメッセージを受信する機能は、アプリケーションが停止するまで延々とポーリングする

今回は、特に後者をポイントにしています。
※ちなみに、受信のみを対象にする場合はWebアプリケーションにしなくてもOKです

作成するキューは、前回同様FIFOキューにします。

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.4 2022-07-19
OpenJDK Runtime Environment (build 17.0.4+8-Ubuntu-120.04)
OpenJDK 64-Bit Server VM (build 17.0.4+8-Ubuntu-120.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.4, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-126-generic", arch: "amd64", family: "unix"

LocalStack。

$ python3 --version
Python 3.8.10


$ localstack --version
1.1.0

起動。

$ localstack start

Amazon SQSのキューの作成には、Terraformを使うことにします。

$ terraform version
Terraform v1.3.1
on linux_amd64

Amazon SQSのキューを作成する

まずはAmazon SQSのキューを作成します。

Terraformのリソース定義ファイルは、こんな感じで用意。

main.tf

terraform {
  required_version = "1.3.1"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "4.33.0"
    }
  }
}

provider "aws" {
  access_key                  = "mock_access_key"
  region                      = "us-east-1"
  secret_key                  = "mock_secret_key"
  skip_credentials_validation = true
  skip_metadata_api_check     = true
  skip_requesting_account_id  = true

  endpoints {
    sqs = "http://localhost:4566"
  }
}

resource "aws_sqs_queue" "queue" {
  name                        = "my-queue.fifo"
  fifo_queue                  = true
  content_based_deduplication = true
}

output "queue_url" {
  value = aws_sqs_queue.queue.url
}

initしてapply

$ terraform init
$ terraform apply

キューが作成されました。

Apply complete! Resources: 1 added, 0 changed, 0 destroyed.

Outputs:

queue_url = "http://localhost:4566/000000000000/my-queue.fifo"

このURLを覚えておきます。

Spring Bootアプリケーションを作成する

では、アプリケーションを作成しましょう。

まずはSpring Bootプロジェクトを作成。依存関係は、webのみにしています。

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.7.4 \
  -d javaVersion=17 \
  -d name=spring-boot-sdk-v2-sqs \
  -d groupId=org.littlewings \
  -d artifactId=spring-boot-sdk-v2-sqs \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.aws \
  -d dependencies=web \
  -d baseDir=spring-boot-sdk-v2-sqs | tar zxvf -

プロジェクト内に移動。

$ cd spring-boot-sdk-v2-sqs

生成されたソースコードは、削除しておきます。

$ rm src/main/java/org/littlewings/spring/aws/SpringBootSdkV2SqsApplication.java src/test/java/org/littlewings/spring/aws/SpringBootSdkV2SqsApplicationTests.java

Maven依存関係やプラグインの設定など。

        <properties>
                <java.version>17</java.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-web</artifactId>
                </dependency>

                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
        </dependencies>

        <build>
                <plugins>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
                        </plugin>
                </plugins>
        </build>

AWS SDK for Java v2を使うので、依存関係はこのように修正しました。

        <dependencyManagement>
                <dependencies>
                        <dependency>
                                <groupId>software.amazon.awssdk</groupId>
                                <artifactId>bom</artifactId>
                                <version>2.17.285</version>
                                <type>pom</type>
                                <scope>import</scope>
                        </dependency>
                </dependencies>
        </dependencyManagement>

        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-web</artifactId>
                </dependency>
                <dependency>
                        <groupId>software.amazon.awssdk</groupId>
                        <artifactId>sqs</artifactId>
                </dependency>

                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
        </dependencies>

Apache Maven プロジェクトの設定 / SDK を依存関係として宣言する

SqsClientはメッセージの送信でも受信でも使うので、Beanとして定義することにしました。

src/main/java/org/littlewings/spring/aws/AwsConfig.java

package org.littlewings.spring.aws;

import java.net.URI;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

@Configuration
public class AwsConfig {
    @Bean
    public SqsClient sqsClient() {
        AwsCredentials awsCredentials =
                AwsBasicCredentials.create("mock_access_key", "mock_secret_key");

        SqsClient sqsClient =
                SqsClient
                        .builder()
                        .defaultsMode(DefaultsMode.AUTO)
                        .credentialsProvider(StaticCredentialsProvider.create(awsCredentials))
                        .region(Region.US_EAST_1)
                        .endpointOverride(URI.create("http://localhost:4566"))
                        .build();

        return sqsClient;
    }
}

エンドポイントはLocalStackのものを設定し、Smart Configuration Defaultsはautoを選択。

AWS SDK for Java v2のSmart Configuration Defaultsの紹介 | Amazon Web Services ブログ

Amazon SQSキューへ、メッセージの送信を行うRestController

src/main/java/org/littlewings/spring/aws/SendMessageController.java

package org.littlewings.spring.aws;

import java.util.Map;
import java.util.UUID;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

@RestController
public class SendMessageController {
    SqsClient sqsClient;

    ObjectMapper objectMapper;

    public SendMessageController(SqsClient sqsClient, ObjectMapper objectMapper) {
        this.sqsClient = sqsClient;
        this.objectMapper = objectMapper;
    }

    @PostMapping(value = "send", consumes = "application/json")
    public String send(@RequestBody Map<String, Object> message) throws JsonProcessingException {
        String messageAsString = objectMapper.writeValueAsString(message);

        SendMessageRequest request =
                SendMessageRequest
                        .builder()
                        .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                        .messageDeduplicationId(UUID.randomUUID().toString())
                        .messageGroupId("group-1")
                        .messageBody(messageAsString)
                        .build();

        SendMessageResponse response = sqsClient.sendMessage(request);

        return String.format(
                "send message = [%s], message-id = [%s], sequence-number = [%s]",
                messageAsString,
                response.messageId(),
                response.sequenceNumber()
        );
    }
}

リクエストボディをそのままJSON文字列に変換して、メッセージのボディとしてキューへ送信します。

参考にしているのは、このあたりです。

メッセージの受信側。

src/main/java/org/littlewings/spring/aws/MessageReceiver.java

package org.littlewings.spring.aws;

import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

@Component
public class MessageReceiver {
    SqsClient sqsClient;

    TaskExecutor taskExecutor;

    AtomicBoolean run = new AtomicBoolean(true);
    CountDownLatch latch = new CountDownLatch(1);

    public MessageReceiver(SqsClient sqsClient, TaskExecutor taskExecutor) {
        this.sqsClient = sqsClient;
        this.taskExecutor = taskExecutor;
    }

    @PostConstruct
    public void receiveMessageWhile() {
        taskExecutor.execute(() -> {
            while (run.get()) {
                ReceiveMessageRequest request =
                        ReceiveMessageRequest
                                .builder()
                                .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                .receiveRequestAttemptId(UUID.randomUUID().toString())
                                .maxNumberOfMessages(3)
                                .waitTimeSeconds(5)
                                .build();

                ReceiveMessageResponse response = sqsClient.receiveMessage(request);

                response.messages().forEach(message -> {
                    System.out.printf(
                            "receive message = [%s], message-id = [%s], receipt-handle=[%s]%n",
                            message.body(),
                            message.messageId(),
                            message.receiptHandle()
                    );

                    DeleteMessageRequest deleteRequest =
                            DeleteMessageRequest
                                    .builder()
                                    .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                    .receiptHandle(message.receiptHandle())
                                    .build();

                    DeleteMessageResponse deleteResponse = sqsClient.deleteMessage(deleteRequest);
                    if (deleteResponse.sdkHttpResponse().isSuccessful()) {
                        System.out.printf("message[%s] deleted%n", message.messageId());
                    } else {
                        System.out.printf(
                                "delete message failure, code = [%d], text = [%s]%n",
                                deleteResponse.sdkHttpResponse().statusCode(),
                                deleteResponse.sdkHttpResponse().statusText()
                        );
                    }
                });

                if (run.get()) {
                    System.out.println("waiting, next messages...");
                } else {
                    System.out.println("enter shutdown sequence");
                    latch.countDown();
                }
            }
        });
    }

    @PreDestroy
    public void shutdown() throws InterruptedException {
        run.set(false);

        latch.await();
        System.out.println("shutdown");
    }
}

こちらは、@Componentとして定義。

@Component
public class MessageReceiver {

メッセージの受信は、@PostConstructを使ってBeanが構築され次第開始するようにしています。

    @PostConstruct
    public void receiveMessageWhile() {
        taskExecutor.execute(() -> {
            while (run.get()) {
                ReceiveMessageRequest request =
                        ReceiveMessageRequest
                                .builder()
                                .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                .receiveRequestAttemptId(UUID.randomUUID().toString())
                                .maxNumberOfMessages(3)
                                .waitTimeSeconds(5)
                                .build();

                ReceiveMessageResponse response = sqsClient.receiveMessage(request);

                response.messages().forEach(message -> {
                    System.out.printf(
                            "receive message = [%s], message-id = [%s], receipt-handle=[%s]%n",
                            message.body(),
                            message.messageId(),
                            message.receiptHandle()
                    );

                    DeleteMessageRequest deleteRequest =
                            DeleteMessageRequest
                                    .builder()
                                    .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                    .receiptHandle(message.receiptHandle())
                                    .build();

                    DeleteMessageResponse deleteResponse = sqsClient.deleteMessage(deleteRequest);
                    if (deleteResponse.sdkHttpResponse().isSuccessful()) {
                        System.out.printf("message[%s] deleted%n", message.messageId());
                    } else {
                        System.out.printf(
                                "delete message failure, code = [%d], text = [%s]%n",
                                deleteResponse.sdkHttpResponse().statusCode(),
                                deleteResponse.sdkHttpResponse().statusText()
                        );
                    }
                });

                if (run.get()) {
                    System.out.println("waiting, next messages...");
                } else {
                    System.out.println("enter shutdown sequence");
                    latch.countDown();
                }
            }
        });
    }

アプリケーションが起動している間は、メッセージの受信を継続するように無限ループにしています。

            while (run.get()) {
                ReceiveMessageRequest request =
                        ReceiveMessageRequest
                                .builder()
                                .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                .receiveRequestAttemptId(UUID.randomUUID().toString())
                                .maxNumberOfMessages(3)
                                .waitTimeSeconds(5)
                                .build();

                ReceiveMessageResponse response = sqsClient.receiveMessage(request);

この処理をそのまま書いてしまうとスレッドを専有してアプリケーションが起動しなくなってしまうので、TaskExecutorを使って
別スレッドで実行するようにしました。

        taskExecutor.execute(() -> {
            while (run.get()) {

Integration / Task Execution and Scheduling / The Spring TaskExecutor Abstraction / Using a TaskExecutor

停止に関しては、受信したメッセージを処理している最中に止まって欲しくはありません。

そこで、今回はCoundDownLatchを使いました。@PreDestroywhileの条件になっているAtomicBooleanの変数をfalseにして、
CountDownLatchがカウントダウンするのを待ちます。

    @PreDestroy
    public void shutdown() throws InterruptedException {
        run.set(false);

        latch.await();
        System.out.println("shutdown");
    }

カウントダウンは、先ほどのメッセージ受信のループ内で行われます。

                if (run.get()) {
                    System.out.println("waiting, next messages...");
                } else {
                    System.out.println("enter shutdown sequence");
                    latch.countDown();
                }

そしてCountDownLatchの値は1で設定しているので、1回カウントダウンすればブロックが解除されます。

    CountDownLatch latch = new CountDownLatch(1);

これで、処理しているメッセージがない状態になれば停止します。

メッセージの受信は、受信したメッセージを標準出力に書き出したら、メッセージを削除する流れで実装しています。

                ReceiveMessageRequest request =
                        ReceiveMessageRequest
                                .builder()
                                .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                .receiveRequestAttemptId(UUID.randomUUID().toString())
                                .maxNumberOfMessages(3)
                                .waitTimeSeconds(5)
                                .build();

                ReceiveMessageResponse response = sqsClient.receiveMessage(request);

                response.messages().forEach(message -> {
                    System.out.printf(
                            "receive message = [%s], message-id = [%s], receipt-handle=[%s]%n",
                            message.body(),
                            message.messageId(),
                            message.receiptHandle()
                    );

                    DeleteMessageRequest deleteRequest =
                            DeleteMessageRequest
                                    .builder()
                                    .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                    .receiptHandle(message.receiptHandle())
                                    .build();

                    DeleteMessageResponse deleteResponse = sqsClient.deleteMessage(deleteRequest);
                    if (deleteResponse.sdkHttpResponse().isSuccessful()) {
                        System.out.printf("message[%s] deleted%n", message.messageId());
                    } else {
                        System.out.printf(
                                "delete message failure, code = [%d], text = [%s]%n",
                                deleteResponse.sdkHttpResponse().statusCode(),
                                deleteResponse.sdkHttpResponse().statusText()
                        );
                    }

やっぱり、このあたりを参考に。

メッセージの受信はロングポーリングで行っていて、waitTimeSecondsを5に設定しているので受信時に最大で5秒ブロックします。

                ReceiveMessageRequest request =
                        ReceiveMessageRequest
                                .builder()
                                .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                .receiveRequestAttemptId(UUID.randomUUID().toString())
                                .maxNumberOfMessages(3)
                                .waitTimeSeconds(5)
                                .build();

Amazon SQS ショートポーリングとロングポーリング / ロングポーリングを使用したメッセージの消費

5秒経ってもメッセージがこなかったら諦めて次のループへ移り、待機中にメッセージが来たらすぐに処理を行って次のループへ移ります。

mainメソッドを持ったクラス。

src/main/java/org/littlewings/spring/aws/App.java

package org.littlewings.spring.aws;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

アプリケーションの設定。なんとなく、Graceful Shutdownも有効にしておきました。
※設定しなくても、今回の動作確認は可能です

src/main/resources/application.properties

server.shutdown=graceful
spring.lifecycle.timeout-per-shutdown-phase=15s

確認する

では、動かして確認してみましょう。

パッケージングして

$ mvn package

実行。

$ java -jar target/spring-boot-sdk-v2-sqs-0.0.1-SNAPSHOT.jar

アプリケーションが起動したら、5秒おきに標準出力にメッセージが流れます。

waiting, next messages...
waiting, next messages...

メッセージを送ってみましょう。

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080/send -d '{"messsage": "Hello World"}'
send message = [{"messsage":"Hello World"}], message-id = [973ccd24-0fc6-4996-9d0a-78623c13eb15], sequence-number = [14300583401512501286]

メッセージIDやシーケンス番号も取得できています。

アプリケーション側の標準出力には、こんな内容が出力されます。

receive message = [{"messsage":"Hello World"}], message-id = [973ccd24-0fc6-4996-9d0a-78623c13eb15], receipt-handle=[ZjNiYjI3NTQtYzg1OS00YWRlLThlOTEtNDMwMmJlMWJiMzIxIGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bXktcXVldWUuZmlmbyA5NzNjY2QyNC0wZmM2LTQ5OTYtOWQwYS03ODYyM2MxM2ViMTUgMTY2NDgxNDE1My45NzAwOTk=]
message[973ccd24-0fc6-4996-9d0a-78623c13eb15] deleted

そして、次のループへ移ります。

waiting, next messages...
waiting, next messages...

この後にメッセージを送り続けてみます。

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080/send -d '{"messsage": "Hello World"}'
send message = [{"messsage":"Hello World"}], message-id = [581814e9-98d6-48f4-8878-24877f81e2eb], sequence-number = [14300583401512501287]


$ curl -XPOST -H 'Content-Type: application/json' localhost:8080/send -d '{"messsage": "Hello World"}'
send message = [{"messsage":"Hello World"}], message-id = [595dc1ff-0736-4efa-9c1b-80d5a5dd2214], sequence-number = [14300583401512501288]


$ curl -XPOST -H 'Content-Type: application/json' localhost:8080/send -d '{"messsage": "Hello World"}'
send message = [{"messsage":"Hello World"}], message-id = [a9a07acb-071e-46ce-bd6e-194ac33bf048], sequence-number = [14300583401512501289]

アプリケーション側は、メッセージの受信を続けます。

receive message = [{"messsage":"Hello World"}], message-id = [581814e9-98d6-48f4-8878-24877f81e2eb], receipt-handle=[OTgzODRmODItMTgyYy00YTRjLWIyZmEtMjY4YTMxZjM5YmFjIGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bXktcXVldWUuZmlmbyA1ODE4MTRlOS05OGQ2LTQ4ZjQtODg3OC0yNDg3N2Y4MWUyZWIgMTY2NDgxNDI1MS41NDc4MTE=]
message[581814e9-98d6-48f4-8878-24877f81e2eb] deleted
waiting, next messages...
receive message = [{"messsage":"Hello World"}], message-id = [595dc1ff-0736-4efa-9c1b-80d5a5dd2214], receipt-handle=[NTQ4NDc1ZjEtMTE1ZC00ZTU3LWJmZTUtYjkyNTU2Y2IzOGI3IGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bXktcXVldWUuZmlmbyA1OTVkYzFmZi0wNzM2LTRlZmEtOWMxYi04MGQ1YTVkZDIyMTQgMTY2NDgxNDI1Mi4yNDUwMzQy]
message[595dc1ff-0736-4efa-9c1b-80d5a5dd2214] deleted
waiting, next messages...
receive message = [{"messsage":"Hello World"}], message-id = [a9a07acb-071e-46ce-bd6e-194ac33bf048], receipt-handle=[YTNmZGVkZjMtYWFkZC00MTkzLWE0MDAtZmI1ZjFjMGI2YjE4IGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bXktcXVldWUuZmlmbyBhOWEwN2FjYi0wNzFlLTQ2Y2UtYmQ2ZS0xOTRhYzMzYmYwNDggMTY2NDgxNDI1Mi43OTI3MDc=]
message[a9a07acb-071e-46ce-bd6e-194ac33bf048] deleted
waiting, next messages...

OKみたいですね。

最後にCtrl-cでアプリケーションを停止。

2022-10-04 01:25:27.552  INFO 48599 --- [ionShutdownHook] o.s.b.w.e.tomcat.GracefulShutdown        : Commencing graceful shutdown. Waiting for active requests to complete
2022-10-04 01:25:27.555  INFO 48599 --- [tomcat-shutdown] o.s.b.w.e.tomcat.GracefulShutdown        : Graceful shutdown complete
enter shutdown sequence
shutdown

Beanの破棄時に、メッセージの受信を停止するように動作していることが確認できました。

OKですね。

オマケ

お題にも書きましたが、受信の部分だけであればspring-boot-starter-webではなくspring-boot-starterでOKです。

     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

TaskExecutorを使って処理を実行したままなので、これでもアプリケーションは終了せずに実行され続けます。

停止時の処理が行われるのも同じです。

まとめ

Amazon SQSを使って、メッセージを送受信するプログラムを書いてみました。

今回はずっとメッセージを待ち続けるアプリケーションにしたかったので、Webアプリケーションにしつつ、TaskExecutorを使って
別スレッドでメッセージの受信を行う構成にしました。

なにも考えずに無限に受信処理を行うと、そこでアプリケーションがブロックして先に進まなくなるなどのお決まりを踏んだりをしましたが、
とりあえず目標は達成できたのでよしとしましょう。