CLOVER🍀

That was when it all began.

Amazon SQSにメッセージを送受信するアプリケーションをSpring Bootで書いてみる(AWS SDK for Java v2、LocalStack利用)

これは、なにをしたくて書いたもの?

前に、Amazon SQS(といってもLocalStack上のものですが)を使ってFIFOキューを試してみました。

LocalStackでAmazon SQSのFIFOキューを試してみる(AWS SDK for Javaを使用) - CLOVER🍀

今回は、常駐アプリケーションとして書いてみたらどうなるかな?ということで試してみました。

お題

Spring Bootを使って、以下のようなWebアプリケーションとして書いてみます。

  • AWS SDK for Java v2を使う(Spring Cloud AWSは使用しない)
  • Amazon SQSに送信するメッセージは、RestControllerで受け取ってキューに送信
  • Amazon SQSからメッセージを受信する機能は、アプリケーションが停止するまで延々とポーリングする

今回は、特に後者をポイントにしています。
※ちなみに、受信のみを対象にする場合はWebアプリケーションにしなくてもOKです

作成するキューは、前回同様FIFOキューにします。

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.4 2022-07-19
OpenJDK Runtime Environment (build 17.0.4+8-Ubuntu-120.04)
OpenJDK 64-Bit Server VM (build 17.0.4+8-Ubuntu-120.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.4, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-126-generic", arch: "amd64", family: "unix"

LocalStack。

$ python3 --version
Python 3.8.10


$ localstack --version
1.1.0

起動。

$ localstack start

Amazon SQSのキューの作成には、Terraformを使うことにします。

$ terraform version
Terraform v1.3.1
on linux_amd64

Amazon SQSのキューを作成する

まずはAmazon SQSのキューを作成します。

Terraformのリソース定義ファイルは、こんな感じで用意。

main.tf

terraform {
  required_version = "1.3.1"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "4.33.0"
    }
  }
}

provider "aws" {
  access_key                  = "mock_access_key"
  region                      = "us-east-1"
  secret_key                  = "mock_secret_key"
  skip_credentials_validation = true
  skip_metadata_api_check     = true
  skip_requesting_account_id  = true

  endpoints {
    sqs = "http://localhost:4566"
  }
}

resource "aws_sqs_queue" "queue" {
  name                        = "my-queue.fifo"
  fifo_queue                  = true
  content_based_deduplication = true
}

output "queue_url" {
  value = aws_sqs_queue.queue.url
}

initしてapply。

$ terraform init
$ terraform apply

キューが作成されました。

Apply complete! Resources: 1 added, 0 changed, 0 destroyed.

Outputs:

queue_url = "http://localhost:4566/000000000000/my-queue.fifo"

このURLを覚えておきます。

Spring Bootアプリケーションを作成する

では、アプリケーションを作成しましょう。

まずはSpring Bootプロジェクトを作成。依存関係は、webのみにしています。

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.7.4 \
  -d javaVersion=17 \
  -d name=spring-boot-sdk-v2-sqs \
  -d groupId=org.littlewings \
  -d artifactId=spring-boot-sdk-v2-sqs \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.aws \
  -d dependencies=web \
  -d baseDir=spring-boot-sdk-v2-sqs | tar zxvf -

プロジェクト内に移動。

$ cd spring-boot-sdk-v2-sqs

生成されたソースコードは、削除しておきます。

$ rm src/main/java/org/littlewings/spring/aws/SpringBootSdkV2SqsApplication.java src/test/java/org/littlewings/spring/aws/SpringBootSdkV2SqsApplicationTests.java

Maven依存関係やプラグインの設定など。

        <properties>
                <java.version>17</java.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-web</artifactId>
                </dependency>

                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
        </dependencies>

        <build>
                <plugins>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
                        </plugin>
                </plugins>
        </build>

AWS SDK for Java v2を使うので、依存関係はこのように修正しました。

        <dependencyManagement>
                <dependencies>
                        <dependency>
                                <groupId>software.amazon.awssdk</groupId>
                                <artifactId>bom</artifactId>
                                <version>2.17.285</version>
                                <type>pom</type>
                                <scope>import</scope>
                        </dependency>
                </dependencies>
        </dependencyManagement>

        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-web</artifactId>
                </dependency>
                <dependency>
                        <groupId>software.amazon.awssdk</groupId>
                        <artifactId>sqs</artifactId>
                </dependency>

                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
        </dependencies>

Apache Maven プロジェクトの設定 / SDK を依存関係として宣言する

SqsClientはメッセージの送信でも受信でも使うので、Beanとして定義することにしました。

src/main/java/org/littlewings/spring/aws/AwsConfig.java

package org.littlewings.spring.aws;

import java.net.URI;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

@Configuration
public class AwsConfig {
    @Bean
    public SqsClient sqsClient() {
        AwsCredentials awsCredentials =
                AwsBasicCredentials.create("mock_access_key", "mock_secret_key");

        SqsClient sqsClient =
                SqsClient
                        .builder()
                        .defaultsMode(DefaultsMode.AUTO)
                        .credentialsProvider(StaticCredentialsProvider.create(awsCredentials))
                        .region(Region.US_EAST_1)
                        .endpointOverride(URI.create("http://localhost:4566"))
                        .build();

        return sqsClient;
    }
}

エンドポイントはLocalStackのものを設定し、Smart Configuration Defaultsはautoを選択。

AWS SDK for Java v2のSmart Configuration Defaultsの紹介 | Amazon Web Services ブログ

Amazon SQSキューへ、メッセージの送信を行うRestController。

src/main/java/org/littlewings/spring/aws/SendMessageController.java

package org.littlewings.spring.aws;

import java.util.Map;
import java.util.UUID;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

@RestController
public class SendMessageController {
    SqsClient sqsClient;

    ObjectMapper objectMapper;

    public SendMessageController(SqsClient sqsClient, ObjectMapper objectMapper) {
        this.sqsClient = sqsClient;
        this.objectMapper = objectMapper;
    }

    @PostMapping(value = "send", consumes = "application/json")
    public String send(@RequestBody Map<String, Object> message) throws JsonProcessingException {
        String messageAsString = objectMapper.writeValueAsString(message);

        SendMessageRequest request =
                SendMessageRequest
                        .builder()
                        .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                        .messageDeduplicationId(UUID.randomUUID().toString())
                        .messageGroupId("group-1")
                        .messageBody(messageAsString)
                        .build();

        SendMessageResponse response = sqsClient.sendMessage(request);

        return String.format(
                "send message = [%s], message-id = [%s], sequence-number = [%s]",
                messageAsString,
                response.messageId(),
                response.sequenceNumber()
        );
    }
}

リクエストボディをそのままJSON文字列に変換して、メッセージのボディとしてキューへ送信します。

参考にしているのは、このあたりです。

メッセージの受信側。

src/main/java/org/littlewings/spring/aws/MessageReceiver.java

package org.littlewings.spring.aws;

import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

@Component
public class MessageReceiver {
    SqsClient sqsClient;

    TaskExecutor taskExecutor;

    AtomicBoolean run = new AtomicBoolean(true);
    CountDownLatch latch = new CountDownLatch(1);

    public MessageReceiver(SqsClient sqsClient, TaskExecutor taskExecutor) {
        this.sqsClient = sqsClient;
        this.taskExecutor = taskExecutor;
    }

    @PostConstruct
    public void receiveMessageWhile() {
        taskExecutor.execute(() -> {
            while (run.get()) {
                ReceiveMessageRequest request =
                        ReceiveMessageRequest
                                .builder()
                                .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                .receiveRequestAttemptId(UUID.randomUUID().toString())
                                .maxNumberOfMessages(3)
                                .waitTimeSeconds(5)
                                .build();

                ReceiveMessageResponse response = sqsClient.receiveMessage(request);

                response.messages().forEach(message -> {
                    System.out.printf(
                            "receive message = [%s], message-id = [%s], receipt-handle=[%s]%n",
                            message.body(),
                            message.messageId(),
                            message.receiptHandle()
                    );

                    DeleteMessageRequest deleteRequest =
                            DeleteMessageRequest
                                    .builder()
                                    .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                    .receiptHandle(message.receiptHandle())
                                    .build();

                    DeleteMessageResponse deleteResponse = sqsClient.deleteMessage(deleteRequest);
                    if (deleteResponse.sdkHttpResponse().isSuccessful()) {
                        System.out.printf("message[%s] deleted%n", message.messageId());
                    } else {
                        System.out.printf(
                                "delete message failure, code = [%d], text = [%s]%n",
                                deleteResponse.sdkHttpResponse().statusCode(),
                                deleteResponse.sdkHttpResponse().statusText()
                        );
                    }
                });

                if (run.get()) {
                    System.out.println("waiting, next messages...");
                } else {
                    System.out.println("enter shutdown sequence");
                    latch.countDown();
                }
            }
        });
    }

    @PreDestroy
    public void shutdown() throws InterruptedException {
        run.set(false);

        latch.await();
        System.out.println("shutdown");
    }
}

こちらは、@Componentとして定義。

@Component
public class MessageReceiver {

メッセージの受信は、@PostConstructを使ってBeanが構築され次第開始するようにしています。

    @PostConstruct
    public void receiveMessageWhile() {
        taskExecutor.execute(() -> {
            while (run.get()) {
                ReceiveMessageRequest request =
                        ReceiveMessageRequest
                                .builder()
                                .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                .receiveRequestAttemptId(UUID.randomUUID().toString())
                                .maxNumberOfMessages(3)
                                .waitTimeSeconds(5)
                                .build();

                ReceiveMessageResponse response = sqsClient.receiveMessage(request);

                response.messages().forEach(message -> {
                    System.out.printf(
                            "receive message = [%s], message-id = [%s], receipt-handle=[%s]%n",
                            message.body(),
                            message.messageId(),
                            message.receiptHandle()
                    );

                    DeleteMessageRequest deleteRequest =
                            DeleteMessageRequest
                                    .builder()
                                    .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                    .receiptHandle(message.receiptHandle())
                                    .build();

                    DeleteMessageResponse deleteResponse = sqsClient.deleteMessage(deleteRequest);
                    if (deleteResponse.sdkHttpResponse().isSuccessful()) {
                        System.out.printf("message[%s] deleted%n", message.messageId());
                    } else {
                        System.out.printf(
                                "delete message failure, code = [%d], text = [%s]%n",
                                deleteResponse.sdkHttpResponse().statusCode(),
                                deleteResponse.sdkHttpResponse().statusText()
                        );
                    }
                });

                if (run.get()) {
                    System.out.println("waiting, next messages...");
                } else {
                    System.out.println("enter shutdown sequence");
                    latch.countDown();
                }
            }
        });
    }

アプリケーションが起動している間は、メッセージの受信を継続するように無限ループにしています。

            while (run.get()) {
                ReceiveMessageRequest request =
                        ReceiveMessageRequest
                                .builder()
                                .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                .receiveRequestAttemptId(UUID.randomUUID().toString())
                                .maxNumberOfMessages(3)
                                .waitTimeSeconds(5)
                                .build();

                ReceiveMessageResponse response = sqsClient.receiveMessage(request);

この処理をそのまま書いてしまうとスレッドを専有してアプリケーションが起動しなくなってしまうので、TaskExecutorを使って
別スレッドで実行するようにしました。

        taskExecutor.execute(() -> {
            while (run.get()) {

Integration / Task Execution and Scheduling / The Spring TaskExecutor Abstraction / Using a TaskExecutor

停止に関しては、受信したメッセージを処理している最中に止まって欲しくはありません。

そこで、今回はCoundDownLatchを使いました。@PreDestroyでwhileの条件になっているAtomicBooleanの変数をfalseにして、
CountDownLatchがカウントダウンするのを待ちます。

    @PreDestroy
    public void shutdown() throws InterruptedException {
        run.set(false);

        latch.await();
        System.out.println("shutdown");
    }

カウントダウンは、先ほどのメッセージ受信のループ内で行われます。

                if (run.get()) {
                    System.out.println("waiting, next messages...");
                } else {
                    System.out.println("enter shutdown sequence");
                    latch.countDown();
                }

そしてCountDownLatchの値は1で設定しているので、1回カウントダウンすればブロックが解除されます。

    CountDownLatch latch = new CountDownLatch(1);

これで、処理しているメッセージがない状態になれば停止します。

メッセージの受信は、受信したメッセージを標準出力に書き出したら、メッセージを削除する流れで実装しています。

                ReceiveMessageRequest request =
                        ReceiveMessageRequest
                                .builder()
                                .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                .receiveRequestAttemptId(UUID.randomUUID().toString())
                                .maxNumberOfMessages(3)
                                .waitTimeSeconds(5)
                                .build();

                ReceiveMessageResponse response = sqsClient.receiveMessage(request);

                response.messages().forEach(message -> {
                    System.out.printf(
                            "receive message = [%s], message-id = [%s], receipt-handle=[%s]%n",
                            message.body(),
                            message.messageId(),
                            message.receiptHandle()
                    );

                    DeleteMessageRequest deleteRequest =
                            DeleteMessageRequest
                                    .builder()
                                    .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                    .receiptHandle(message.receiptHandle())
                                    .build();

                    DeleteMessageResponse deleteResponse = sqsClient.deleteMessage(deleteRequest);
                    if (deleteResponse.sdkHttpResponse().isSuccessful()) {
                        System.out.printf("message[%s] deleted%n", message.messageId());
                    } else {
                        System.out.printf(
                                "delete message failure, code = [%d], text = [%s]%n",
                                deleteResponse.sdkHttpResponse().statusCode(),
                                deleteResponse.sdkHttpResponse().statusText()
                        );
                    }

やっぱり、このあたりを参考に。

メッセージの受信はロングポーリングで行っていて、waitTimeSecondsを5に設定しているので受信時に最大で5秒ブロックします。

                ReceiveMessageRequest request =
                        ReceiveMessageRequest
                                .builder()
                                .queueUrl("http://localhost:4566/000000000000/my-queue.fifo")
                                .receiveRequestAttemptId(UUID.randomUUID().toString())
                                .maxNumberOfMessages(3)
                                .waitTimeSeconds(5)
                                .build();

Amazon SQS ショートポーリングとロングポーリング / ロングポーリングを使用したメッセージの消費

5秒経ってもメッセージがこなかったら諦めて次のループへ移り、待機中にメッセージが来たらすぐに処理を行って次のループへ移ります。

mainメソッドを持ったクラス。

src/main/java/org/littlewings/spring/aws/App.java

package org.littlewings.spring.aws;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

アプリケーションの設定。なんとなく、Graceful Shutdownも有効にしておきました。
※設定しなくても、今回の動作確認は可能です

src/main/resources/application.properties

server.shutdown=graceful
spring.lifecycle.timeout-per-shutdown-phase=15s

確認する

では、動かして確認してみましょう。

パッケージングして

$ mvn package

実行。

$ java -jar target/spring-boot-sdk-v2-sqs-0.0.1-SNAPSHOT.jar

アプリケーションが起動したら、5秒おきに標準出力にメッセージが流れます。

waiting, next messages...
waiting, next messages...

メッセージを送ってみましょう。

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080/send -d '{"messsage": "Hello World"}'
send message = [{"messsage":"Hello World"}], message-id = [973ccd24-0fc6-4996-9d0a-78623c13eb15], sequence-number = [14300583401512501286]

メッセージIDやシーケンス番号も取得できています。

アプリケーション側の標準出力には、こんな内容が出力されます。

receive message = [{"messsage":"Hello World"}], message-id = [973ccd24-0fc6-4996-9d0a-78623c13eb15], receipt-handle=[ZjNiYjI3NTQtYzg1OS00YWRlLThlOTEtNDMwMmJlMWJiMzIxIGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bXktcXVldWUuZmlmbyA5NzNjY2QyNC0wZmM2LTQ5OTYtOWQwYS03ODYyM2MxM2ViMTUgMTY2NDgxNDE1My45NzAwOTk=]
message[973ccd24-0fc6-4996-9d0a-78623c13eb15] deleted

そして、次のループへ移ります。

waiting, next messages...
waiting, next messages...

この後にメッセージを送り続けてみます。

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080/send -d '{"messsage": "Hello World"}'
send message = [{"messsage":"Hello World"}], message-id = [581814e9-98d6-48f4-8878-24877f81e2eb], sequence-number = [14300583401512501287]


$ curl -XPOST -H 'Content-Type: application/json' localhost:8080/send -d '{"messsage": "Hello World"}'
send message = [{"messsage":"Hello World"}], message-id = [595dc1ff-0736-4efa-9c1b-80d5a5dd2214], sequence-number = [14300583401512501288]


$ curl -XPOST -H 'Content-Type: application/json' localhost:8080/send -d '{"messsage": "Hello World"}'
send message = [{"messsage":"Hello World"}], message-id = [a9a07acb-071e-46ce-bd6e-194ac33bf048], sequence-number = [14300583401512501289]

アプリケーション側は、メッセージの受信を続けます。

receive message = [{"messsage":"Hello World"}], message-id = [581814e9-98d6-48f4-8878-24877f81e2eb], receipt-handle=[OTgzODRmODItMTgyYy00YTRjLWIyZmEtMjY4YTMxZjM5YmFjIGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bXktcXVldWUuZmlmbyA1ODE4MTRlOS05OGQ2LTQ4ZjQtODg3OC0yNDg3N2Y4MWUyZWIgMTY2NDgxNDI1MS41NDc4MTE=]
message[581814e9-98d6-48f4-8878-24877f81e2eb] deleted
waiting, next messages...
receive message = [{"messsage":"Hello World"}], message-id = [595dc1ff-0736-4efa-9c1b-80d5a5dd2214], receipt-handle=[NTQ4NDc1ZjEtMTE1ZC00ZTU3LWJmZTUtYjkyNTU2Y2IzOGI3IGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bXktcXVldWUuZmlmbyA1OTVkYzFmZi0wNzM2LTRlZmEtOWMxYi04MGQ1YTVkZDIyMTQgMTY2NDgxNDI1Mi4yNDUwMzQy]
message[595dc1ff-0736-4efa-9c1b-80d5a5dd2214] deleted
waiting, next messages...
receive message = [{"messsage":"Hello World"}], message-id = [a9a07acb-071e-46ce-bd6e-194ac33bf048], receipt-handle=[YTNmZGVkZjMtYWFkZC00MTkzLWE0MDAtZmI1ZjFjMGI2YjE4IGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bXktcXVldWUuZmlmbyBhOWEwN2FjYi0wNzFlLTQ2Y2UtYmQ2ZS0xOTRhYzMzYmYwNDggMTY2NDgxNDI1Mi43OTI3MDc=]
message[a9a07acb-071e-46ce-bd6e-194ac33bf048] deleted
waiting, next messages...

OKみたいですね。

最後にCtrl-cでアプリケーションを停止。

2022-10-04 01:25:27.552  INFO 48599 --- [ionShutdownHook] o.s.b.w.e.tomcat.GracefulShutdown        : Commencing graceful shutdown. Waiting for active requests to complete
2022-10-04 01:25:27.555  INFO 48599 --- [tomcat-shutdown] o.s.b.w.e.tomcat.GracefulShutdown        : Graceful shutdown complete
enter shutdown sequence
shutdown

Beanの破棄時に、メッセージの受信を停止するように動作していることが確認できました。

OKですね。

オマケ

お題にも書きましたが、受信の部分だけであればspring-boot-starter-webではなくspring-boot-starterでOKです。

     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

TaskExecutorを使って処理を実行したままなので、これでもアプリケーションは終了せずに実行され続けます。

停止時の処理が行われるのも同じです。

まとめ

Amazon SQSを使って、メッセージを送受信するプログラムを書いてみました。

今回はずっとメッセージを待ち続けるアプリケーションにしたかったので、Webアプリケーションにしつつ、TaskExecutorを使って
別スレッドでメッセージの受信を行う構成にしました。

なにも考えずに無限に受信処理を行うと、そこでアプリケーションがブロックして先に進まなくなるなどのお決まりを踏んだりをしましたが、
とりあえず目標は達成できたのでよしとしましょう。

Infinispan 14.0のRESPエンドポイントをRedis CLIで試す

これは、なにをしたくて書いたもの?

Infinispan 14.0で「RESP endpoint」というものが実装され、Redis互換のエンドポイントをサポートするようになったというのでちょっと
試してみました。

Infinispan 14.0.0.Final

なお、このブログには書かれていませんが、現時点でRESPエンドポイントは「実験的モジュール」扱いです。

RESPエンドポイント

RESPエンドポイントに関するドキュメントは、こちら。

Using the RESP protocol endpoint with Infinispan

以下のようなドキュメントの一覧ページには載っていないのですが、

Guides

Infinispan 14.0 documentation index

各ドキュメント内の「Document Index」の中から「RESP protocol endpoint」を選択すると見ることができます。

ドキュメントによると、RESPエンドポイントはRESP3を実装した実験的モジュールのようです。RESPエンドポイントを使うと、
Infinispan ServerをバックエンドとするRESPサーバーに対してRedisクライアントから接続し、Cache操作を行うことができます。

Infinispan Server includes an experimental module that implements the RESP3 protocol. The RESP endpoint allows Redis clients to connect to one or more one or several Infinispan-backed RESP servers and perform cache operations.

Using the RESP endpoint

RESPエンドポイントのモジュールは、こちら。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/server/resp

RESP3自体については、RedisのGitHubリポジトリに記載があります。

https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md

RESPというのは、「Redis Serialization Protocol」の略のようです。

Redis Serialization Protocol (RESP)

https://github.com/redis/redis-specifications/blob/master/protocol/README.md

RESPとは、クライアントとサーバーの間のリクエスト、レスポンスをどのように扱うかを設計したプロトコルです。クライアントがなんらかの
リクエストを実行し、サーバーがなんらかのデータを返すようなものです。

The protocol is designed to handle request-response chats between clients and servers, where the client performs some kind of request, and the server replies with some data.

RESP3 overview

RESP3では、以下のデータ型を扱えます。

  • 配列
  • Blob
  • 文字列
  • エラー
  • Number
  • Null
  • Double
  • Boolean
  • Blobエラー
  • Verbatim string
  • Map
  • Set
  • Attribute
  • Push
  • Hello

RESP3 types

Infinispan ServerのRESP3エンドポイントでサポートされているコマンドは、以下になります。

  • AUTH
  • DECR
  • DEL
  • ECHO
  • GET
  • HELLO
  • INCR
  • MGET
  • MSET
  • PING
  • PUBLISH
  • QUIT
  • RESET
  • SET
  • SUBSCRIBE
  • UNSUBSCRIBE

Using the RESP protocol endpoint with Infinispan / Redis commands

Redis本家と比べると、まだまだ少ないですね。

Commands | Redis

InfinispanとRedis

RESPエンドポイントに対するチケットはこちらです。

[ISPN-12439] Add server endpoint that implements RESP protocol from Redis - Red Hat Issue Tracker

このチケットにはInfinispan ServerにRESPを使ってRedisクライアントから接続できれば便利だ、と書かれているのですが。

It could be beneficial to have a server endpoint that we can enable to allow a Redis client to connect to Infinispan Server using the RESP protocol to perform cache operations.

Redisとして使えた方が、Infinispan Serverのユースケースが増えるんですかね…?
Redisの代替として使いたいというリクエストが多かったのでしょうか?

個人的には「今、この機能を追加するんだ…?」と思ったので、もう少し背景が知りたかったですね。

それはさておき、ちょっと試してみるとしましょう。

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.4.1 2022-08-12
OpenJDK Runtime Environment Temurin-17.0.4.1+1 (build 17.0.4.1+1)
OpenJDK 64-Bit Server VM Temurin-17.0.4.1+1 (build 17.0.4.1+1, mixed mode, sharing)


$ bin/server.sh --version

Infinispan Server 14.0.0.Final (Flying Saucer)
Copyright (C) Red Hat Inc. and/or its affiliates and other contributors
License Apache License, v. 2.0. http://www.apache.org/licenses/LICENSE-2.0

Infinispan Serverは、172.17.0.2で動作しているものとします。

起動は、以下のコマンドで行っておきます。

$ bin/server.sh \
    -b 0.0.0.0 \
    -Djgroups.tcp.address=`hostname -i`

RedisはCLIのみ使います。

$ bin/redis-cli --version
redis-cli 7.0.5

RESPエンドポイントにRedis CLIから接続する

では、RESPエンドポイントにRedis CLIから接続してみましょう。

なお、Infinispan Serverを起動すると気づきますが、RESPエンドポイントはデフォルトで有効になっているようです。

2022-10-02 14:44:12,868 INFO  (ForkJoinPool.commonPool-worker-1) [org.infinispan.SERVER] ISPN080018: Started connector Resp (internal)

デフォルトの状態では、認証情報なしで接続しようとするとエラーになります。

$ bin/redis-cli -h 172.17.0.2 -p 11222
172.17.0.2:11222> set key1 value1
(error) WRONGPASS invalid username-password pair or user is disabled.

そこで、まずはInfinispan Serverにユーザーを追加します。今回は、管理ユーザーとアプリケーションユーザーを追加しました。

$ bin/cli.sh user create -g admin -p password ispn-admin
$ bin/cli.sh user create -g application -p password ispn-user

アプリケーションユーザーのユーザー名、パスワードを使って、もう1度Redis CLIから接続。

$ bin/redis-cli -h 172.17.0.2 -p 11222 --user ispn-user --pass password
Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.

今度は接続できたので、setおよびgetコマンドで確認。

172.17.0.2:11222> set key1 value1
OK

172.17.0.2:11222> get key1
"value1"

すごく、あっさり動きました…。

このデータが永続化されるかどうかなどは、RedisのふりをしているInfinispan ServerのCacheの設定次第ということになるわけですが。

RESPエンドポイントが使用するCacheについて

こうあっさり動いてしまうと、いろいろ気になります。まず、データはどのCacheに保存されているのでしょうか?

Infinispanの管理CLIでログイン。

$ bin/cli.sh -c-
Username: ispn-admin
Password:

Cacheを見ると、respCacheという見慣れないCacheがあります。

[c6b2176b23ba-18766@cluster//containers/default]> ls caches
___script_cache
respCache

データを見てみます。

[c6b2176b23ba-18766@cluster//containers/default]> ls caches/respCache
Key
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
key1


[c6b2176b23ba-18766@cluster//containers/default]> cache respCache
[c6b2176b23ba-18766@cluster//containers/default]> get key1
value1

確かに、REPSエンドポイントで使われているCacheのようです。

Cacheの種類は、Replicated Cacheでした。

describe caches/respCache
{
  "respCache" : {
    "replicated-cache" : {
      "mode" : "SYNC",
      "encoding" : {
        "key" : {
          "media-type" : "text/plain"
        },
        "value" : {
          "media-type" : "application/octet-stream"
        }
      }
    }
  }
}

このCache名は、RESPエンドポイントのドキュメントには記載がありませんでしたが、XML Schemaの方には記載がありました。

Names the cache that the RESP connector exposes. The default cache name is respCache.

urn:infinispan:server:14.0

respCacheというのは、RESPエンドポイントが使うデフォルトのCache名のようです。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/server/resp/src/main/java/org/infinispan/server/resp/configuration/RespServerConfiguration.java#L23

このReplicated Cacheは、起動時に作成するようです。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/server/resp/src/main/java/org/infinispan/server/resp/RespServer.java#L43-L56

そして、クラスタリングが有効になっている場合はReplicated Cacheに設定されます。

            if (cacheManager.getCacheManagerConfiguration().isClustered()) { // We are running in clustered mode
               builder.clustering().cacheMode(CacheMode.REPL_SYNC);
            }

メディアタイプは、明示的に指定されていますね。キーはtext/plain、値はapplication/octet-streamですね。

            builder.encoding().key().mediaType(MediaType.TEXT_PLAIN_TYPE);
            builder.encoding().value().mediaType(MediaType.APPLICATION_OCTET_STREAM_TYPE);

RESPエンドポイントの設定を変えてみる

ドキュメントを見ていると、resp-connectorのcache属性で使用するCacheを設定できそうです。

Using the RESP protocol endpoint with Infinispan / Enabling the RESP endpoint

といっても、起動時にCacheを作成してしまうのでした。

実際、デフォルトのinfinispan.xmlにはRESPエンドポイント用のCacheの定義はありません。

   <cache-container name="default" statistics="true">
      <transport cluster="${infinispan.cluster.name:cluster}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/>
      <security>
         <authorization/>
      </security>
   </cache-container>

なんなら、エンドポイントの定義もなにもないわけですが。

      <endpoints socket-binding="default" security-realm="default" />

ということは、Cacheを明示的に定義する場合はInfinispan Serverの起動前に行う必要がありますね。

まずは、ドキュメントに習ってRESPエンドポイントを設定してみます。myRespCacheというCacheを使うように設定してみました。

      <endpoints socket-binding="default" security-realm="default">
        <endpoint>
          <resp-connector cache="myRespCache">
            <authentication/>
          </resp-connector>
          <hotrod-connector>
            <authentication>
              <sasl mechanisms="SCRAM-SHA-512 SCRAM-SHA-384 SCRAM-SHA-256
                                SCRAM-SHA-1 DIGEST-SHA-512 DIGEST-SHA-384
                                DIGEST-SHA-256 DIGEST-SHA DIGEST-MD5 PLAIN"
                    server-name="infinispan"
                    qop="auth"/>
            </authentication>
          </hotrod-connector>
          <rest-connector>
            <authentication mechanisms="DIGEST BASIC"/>
          </rest-connector>
        </endpoint>
      </endpoints>

Hot RodエンドポイントとRESTエンドポイントは明示的に設定が必要です。

認証が有効になっているInfinispan Serverでは、Hot RodエンドポイントまたはRESTエンドポイントの少なくともどちらかは認証の設定が
行われている必要があるようです。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/server/runtime/src/main/java/org/infinispan/server/configuration/endpoint/EndpointsConfigurationBuilder.java#L77-L86

単にRESPエンドポイントの設定だけすれば良いと思って、以下のような状態にしてしまうと

      <endpoints socket-binding="default" security-realm="default">
        <endpoint>
          <resp-connector cache="myRespCache">
            <authentication/>
          </resp-connector>
        </endpoint>
      </endpoints>

Infinispan Serverが起動に失敗します。

2022-10-02 16:09:28,754 FATAL (main) [org.infinispan.SERVER] ISPN080028: Infinispan Server failed to start org.infinispan.commons.CacheConfigurationException: ISPN080070: The cache container requires authorization, but none of the connectors enable authentication

endpointを定義したら、Hot RodエンドポイントもRESTエンドポイントも明示的に設定しなければいけません。

気を取り直して、Infinispan Serverを再起動します。

すると、RESPエンドポイントに指定した名前でCacheが作成されていました。

Redis CLIでデータを保存したりするとこのCacheに格納されますが、確認は省略。

次に、infinispan.xmlに明示的にCacheを定義して、そちらを使うようにしてみましょう。

今回はDistributed Cacheを定義。

   <cache-container name="default" statistics="true">
      <transport cluster="${infinispan.cluster.name:cluster}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/>

      <distributed-cache name="distributedRespCache">
        <encoding>
          <key media-type="text/plain"/>
          <value media-type="application/octet-stream"/>
        </encoding>
      </distributed-cache>

      <security>
         <authorization/>
      </security>
   </cache-container>

RESPエンドポイントは、distributedRespCacheを参照するように設定。

      <endpoints socket-binding="default" security-realm="default">
        <endpoint>
          <resp-connector cache="distributedRespCache">
            <authentication/>
          </resp-connector>
          <hotrod-connector>
            <authentication>
              <sasl mechanisms="SCRAM-SHA-512 SCRAM-SHA-384 SCRAM-SHA-256
                                SCRAM-SHA-1 DIGEST-SHA-512 DIGEST-SHA-384
                                DIGEST-SHA-256 DIGEST-SHA DIGEST-MD5 PLAIN"
                    server-name="infinispan"
                    qop="auth"/>
            </authentication>
          </hotrod-connector>
          <rest-connector>
            <authentication mechanisms="DIGEST BASIC"/>
          </rest-connector>
        </endpoint>
      </endpoints>

これで、Infinispan Serverを再起動。

定義したDistributed Cacheが現れます。

[3f666276f239-57422@cluster//containers/default]> ls caches
distributedRespCache
___script_cache

定義の確認。

[3f666276f239-57422@cluster//containers/default]> describe caches/distributedRespCache
{
  "distributedRespCache" : {
    "distributed-cache" : {
      "mode" : "SYNC",
      "remote-timeout" : "17500",
      "statistics" : true,
      "encoding" : {
        "key" : {
          "media-type" : "text/plain"
        },
        "value" : {
          "media-type" : "application/octet-stream"
        }
      },
      "locking" : {
        "concurrency-level" : "1000",
        "acquire-timeout" : "15000",
        "striping" : false
      },
      "state-transfer" : {
        "timeout" : "60000"
      }
    }
  }
}

Redis CLIから、再度データを保存してみましょう。

172.17.0.2:11222> set key1 value1
OK
172.17.0.2:11222> get key1
"value1"

Infinispan側からも確認してみます。

[3f666276f239-57422@cluster//containers/default]> ls caches/distributedRespCache
Key
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
key1
[3f666276f239-57422@cluster//containers/default]> cache distributedRespCache
[3f666276f239-57422@cluster//containers/default/caches/distributedRespCache]> get key1
value1

OKですね。

オマケ

RESPエンドポイントで実装されているコマンドを確認するは、このあたりを見るとよさそうです。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/server/resp/src/main/java/org/infinispan/server/resp/Resp3AuthHandler.java

https://github.com/infinispan/infinispan/blob/14.0.0.Final/server/resp/src/main/java/org/infinispan/server/resp/Resp3Handler.java

https://github.com/infinispan/infinispan/blob/14.0.0.Final/server/resp/src/main/java/org/infinispan/server/resp/SubscriberHandler.java

また、内部的にはLettuceも使われていたりして、ちょっと面白いです。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/server/resp/src/main/java/org/infinispan/server/resp/RespLettuceHandler.java

まとめ

Infinispan 14.0で追加された、RESPエンドポイントを試してみました。

割と簡単に使えるのですが、ちょっと動きを変えようとすると癖がある…というかドキュメントがあまり書かれていない感じがするので
ちょっと注意ですね。

個人的には、この機能の使いどころが気になるところですが…実験的モジュールのようなので、もう少し様子をみましょうか。