CLOVER🍀

That was when it all began.

ElasticMQのキュヌ定矩をTerraformで行っおみる

これは、なにをしたくお曞いたもの

ElasticMQを䜿っおいきたいず思うのですが、リ゜ヌス定矩をTerraformで行えないかなず思いたしお。

結果を芋るず、できるにはできるのですがちょっず難ありです。

ElasticMQ

ElasticMQは、Amazon SQS互換のむンタヌフェヌスを実装したメッセヌゞキュヌです。ScalaずAkkaを䜿っお実装されおいたす。

GitHub - softwaremill/elasticmq: In-memory message queue with an Amazon SQS-compatible interface. Runs stand-alone or embedded.

以前にも䜿ったこずがありたす。

Amazon SQS互換のElasticMQを使って、Temoporary Queue+RPCを試してみる - CLOVER🍀

この時はAWS CLIでキュヌを䜜成したのですが、できればTerraformで行いたいなず。

Terraform AWS Providerずカスタム゚ンドポむント

TerraformでAWS互換のサヌビスを利甚するには、カスタム゚ンドポむントを蚭定したす。Amazon DynamoDB LocalずLocalStackを䜿った䟋が、
TerraformのAWS Providerに曞かれおいたすね。

Custom Service Endpoint Configuration / Connecting to Local AWS Compatible Solutions

ElasticMQを䜿う時も、こちらに習っおみたいず思いたす。

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.4 2022-07-19
OpenJDK Runtime Environment (build 17.0.4+8-Ubuntu-120.04)
OpenJDK 64-Bit Server VM (build 17.0.4+8-Ubuntu-120.04, mixed mode, sharing)


$ terraform version
Terraform v1.3.1
on linux_amd64


$ aws --version
aws-cli/2.8.0 Python/3.9.11 Linux/5.4.0-126-generic exe/x86_64.ubuntu.20 prompt/off

ElasticMQをむンストヌルする

たずは、ElasticMQをむンストヌルしたす。こちらは簡単で、JARファむルをダりンロヌドしお

$ curl -LO https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-1.3.11.jar

java -jarで起動。

$ java -jar elasticmq-server-1.3.11.jar

以䞋のようなログを出力し぀぀、起動したす。

20:52:41.650 [main] INFO  org.elasticmq.server.Main$ - Starting ElasticMQ server (1.3.11) ...
20:52:42.104 [elasticmq-akka.actor.default-dispatcher-4] INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
20:52:42.843 [elasticmq-akka.actor.default-dispatcher-4] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address 0.0.0.0:9324, visible server address http://localhost:9324
20:52:42.909 [main] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Metrics MBean org.elasticmq:name=Queues successfully registered
20:52:42.949 [elasticmq-akka.actor.default-dispatcher-4] INFO  o.e.r.s.TheStatisticsRestServerBuilder - Started statistics rest server, bind address 0.0.0.0:9325
20:52:42.953 [main] INFO  org.elasticmq.server.Main$ - === ElasticMQ server (1.3.11) started in 1587 ms ===

ポヌトは9324ず9325を䜿いたすが、キュヌぞのアクセスに䜿甚するのは9324ポヌトです。

ちなみに、今回は䜿いたせんが、アプリケヌションに組み蟌んだりDockerコンテナで起動するこずもできたす。

TerraformでElasticMQにキュヌを䜜成する

では、TerraformでElasticMQにキュヌを䜜成したしょう。

Terraformのリ゜ヌス定矩ファむルは、こんな感じで䜜成。

main.tf

terraform {
  required_version = "1.3.1"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "4.33.0"
    }
  }
}

provider "aws" {
  access_key                  = "mock_access_key"
  region                      = "us-east-1"
  secret_key                  = "mock_secret_key"
  skip_credentials_validation = true
  skip_metadata_api_check     = true
  skip_requesting_account_id  = true

  endpoints {
    sqs = "http://localhost:9324"
  }
}

resource "aws_sqs_queue" "queue" {
  name = "my-queue"
}

resource "aws_sqs_queue" "queue_fifo" {
  name                        = "my-queue.fifo"
  fifo_queue                  = true
  content_based_deduplication = true
}

output "queue_url" {
  value = aws_sqs_queue.queue.url
}

output "queue_fifo_url" {
  value = aws_sqs_queue.queue_fifo.url
}

通垞のキュヌずFIFOキュヌを䜜成したす。

ポむントは、AWS Providerの蚭定ですね。

provider "aws" {
  access_key                  = "mock_access_key"
  region                      = "us-east-1"
  secret_key                  = "mock_secret_key"
  skip_credentials_validation = true
  skip_metadata_api_check     = true
  skip_requesting_account_id  = true

  endpoints {
    sqs = "http://localhost:9324"
  }
}

Amazon DynamoDB LocalやLocalStackの時ず同じように、ダミヌのクレデンシャルや゚ンドポむントを指定したす。

Custom Service Endpoint Configuration / Connecting to Local AWS Compatible Solutions

あずはinitしお

$ terraform init

planで確認。

$ terraform plan

Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
  + create

Terraform will perform the following actions:

  # aws_sqs_queue.queue will be created
  + resource "aws_sqs_queue" "queue" {
      + arn                               = (known after apply)
      + content_based_deduplication       = false
      + deduplication_scope               = (known after apply)
      + delay_seconds                     = 0
      + fifo_queue                        = false
      + fifo_throughput_limit             = (known after apply)
      + id                                = (known after apply)
      + kms_data_key_reuse_period_seconds = (known after apply)
      + max_message_size                  = 262144
      + message_retention_seconds         = 345600
      + name                              = "my-queue"
      + name_prefix                       = (known after apply)
      + policy                            = (known after apply)
      + receive_wait_time_seconds         = 0
      + redrive_allow_policy              = (known after apply)
      + redrive_policy                    = (known after apply)
      + tags_all                          = (known after apply)
      + url                               = (known after apply)
      + visibility_timeout_seconds        = 30
    }

  # aws_sqs_queue.queue_fifo will be created
  + resource "aws_sqs_queue" "queue_fifo" {
      + arn                               = (known after apply)
      + content_based_deduplication       = true
      + deduplication_scope               = (known after apply)
      + delay_seconds                     = 0
      + fifo_queue                        = true
      + fifo_throughput_limit             = (known after apply)
      + id                                = (known after apply)
      + kms_data_key_reuse_period_seconds = (known after apply)
      + max_message_size                  = 262144
      + message_retention_seconds         = 345600
      + name                              = "my-queue.fifo"
      + name_prefix                       = (known after apply)
      + policy                            = (known after apply)
      + receive_wait_time_seconds         = 0
      + redrive_allow_policy              = (known after apply)
      + redrive_policy                    = (known after apply)
      + tags_all                          = (known after apply)
      + url                               = (known after apply)
      + visibility_timeout_seconds        = 30
    }

Plan: 2 to add, 0 to change, 0 to destroy.

Changes to Outputs:
  + queue_fifo_url = (known after apply)
  + queue_url      = (known after apply)

──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────

Note: You didn't use the -out option to save this plan, so Terraform can't guarantee to take exactly these actions if you run "terraform apply" now.

うたくいきそうな感じなので、apply。

$ terraform apply

なのですが、キュヌの䜜成が終わらずタむムアりトしたす。

Changes to Outputs:
  + queue_fifo_url = (known after apply)
  + queue_url      = (known after apply)
aws_sqs_queue.queue_fifo: Creating...
aws_sqs_queue.queue: Creating...
aws_sqs_queue.queue_fifo: Still creating... [10s elapsed]
aws_sqs_queue.queue: Still creating... [10s elapsed]
aws_sqs_queue.queue: Still creating... [20s elapsed]
aws_sqs_queue.queue_fifo: Still creating... [20s elapsed]
aws_sqs_queue.queue_fifo: Still creating... [30s elapsed]
aws_sqs_queue.queue: Still creating... [30s elapsed]
aws_sqs_queue.queue: Still creating... [40s elapsed]
aws_sqs_queue.queue_fifo: Still creating... [40s elapsed]
aws_sqs_queue.queue_fifo: Still creating... [50s elapsed]
aws_sqs_queue.queue: Still creating... [50s elapsed]
aws_sqs_queue.queue: Still creating... [1m0s elapsed]
aws_sqs_queue.queue_fifo: Still creating... [1m0s elapsed]
aws_sqs_queue.queue: Still creating... [1m10s elapsed]
aws_sqs_queue.queue_fifo: Still creating... [1m10s elapsed]
aws_sqs_queue.queue: Still creating... [1m20s elapsed]
aws_sqs_queue.queue_fifo: Still creating... [1m20s elapsed]
aws_sqs_queue.queue_fifo: Still creating... [1m30s elapsed]
aws_sqs_queue.queue: Still creating... [1m30s elapsed]
aws_sqs_queue.queue: Still creating... [1m40s elapsed]
aws_sqs_queue.queue_fifo: Still creating... [1m40s elapsed]
aws_sqs_queue.queue: Still creating... [1m50s elapsed]
aws_sqs_queue.queue_fifo: Still creating... [1m50s elapsed]
aws_sqs_queue.queue_fifo: Still creating... [2m0s elapsed]
aws_sqs_queue.queue: Still creating... [2m0s elapsed]
╷
│ Error: waiting for SQS Queue (http://localhost:9324/000000000000/my-queue) attributes create: timeout while waiting for state to become 'equal' (last state: 'notequal', timeout: 2m0s)
│
│   with aws_sqs_queue.queue,
│   on main.tf line 25, in resource "aws_sqs_queue" "queue":
│   25: resource "aws_sqs_queue" "queue" {
│
╵
╷
│ Error: waiting for SQS Queue (http://localhost:9324/000000000000/my-queue.fifo) attributes create: timeout while waiting for state to become 'equal' (last state: 'notequal', timeout: 2m0s)
│
│   with aws_sqs_queue.queue_fifo,
│   on main.tf line 29, in resource "aws_sqs_queue" "queue_fifo":
│   29: resource "aws_sqs_queue" "queue_fifo" {
│
╵

outputも空です。

$ terraform output
╷
│ Warning: No outputs found
│
│ The state file either has no outputs defined, or all the defined outputs are empty. Please define an output in your configuration with the `output` keyword and run
│ `terraform refresh` for it to become available. If you are using interpolation, please verify the interpolated value is not empty. You can use the `terraform console`
│ command to assist.
╵

なお、destroyしようずするず

$ terraform destroy

2぀のリ゜ヌスの砎棄を聞かれたす。

aws_sqs_queue.queue_fifo: Refreshing state... [id=http://localhost:9324/000000000000/my-queue.fifo]
aws_sqs_queue.queue: Refreshing state... [id=http://localhost:9324/000000000000/my-queue]

Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
  - destroy

Terraform will perform the following actions:

  # aws_sqs_queue.queue will be destroyed
  - resource "aws_sqs_queue" "queue" {
      - arn                               = "arn:aws:sqs:elasticmq:000000000000:my-queue" -> null
      - content_based_deduplication       = false -> null
      - delay_seconds                     = 0 -> null
      - fifo_queue                        = false -> null
      - id                                = "http://localhost:9324/000000000000/my-queue" -> null
      - kms_data_key_reuse_period_seconds = 300 -> null
      - max_message_size                  = 0 -> null
      - message_retention_seconds         = 0 -> null
      - name                              = "my-queue" -> null
      - receive_wait_time_seconds         = 0 -> null
      - sqs_managed_sse_enabled           = false -> null
      - tags                              = {} -> null
      - tags_all                          = {} -> null
      - url                               = "http://localhost:9324/000000000000/my-queue" -> null
      - visibility_timeout_seconds        = 30 -> null
    }

  # aws_sqs_queue.queue_fifo will be destroyed
  - resource "aws_sqs_queue" "queue_fifo" {
      - arn                               = "arn:aws:sqs:elasticmq:000000000000:my-queue.fifo" -> null
      - content_based_deduplication       = true -> null
      - delay_seconds                     = 0 -> null
      - fifo_queue                        = true -> null
      - id                                = "http://localhost:9324/000000000000/my-queue.fifo" -> null
      - kms_data_key_reuse_period_seconds = 300 -> null
      - max_message_size                  = 0 -> null
      - message_retention_seconds         = 0 -> null
      - name                              = "my-queue.fifo" -> null
      - receive_wait_time_seconds         = 0 -> null
      - sqs_managed_sse_enabled           = false -> null
      - tags                              = {} -> null
      - tags_all                          = {} -> null
      - url                               = "http://localhost:9324/000000000000/my-queue.fifo" -> null
      - visibility_timeout_seconds        = 30 -> null
    }

Plan: 0 to add, 0 to change, 2 to destroy.

Do you really want to destroy all resources?
  Terraform will destroy all your managed infrastructure, as shown above.
  There is no undo. Only 'yes' will be accepted to confirm.

  Enter a value:

ここで「yes」ずするず、キュヌが砎棄されたす。

ずいうわけで、タむムアりトはしおいるのですが、実はキュヌはできおいたす。

このような゚ラヌメッセヌゞが出るたで埅たずに途䞭でCtrl-cで止めたりしおも、裏でキュヌはできおいたした。

〜省略〜

aws_sqs_queue.queue: Still creating... [1m40s elapsed]
aws_sqs_queue.queue_fifo: Still creating... [1m40s elapsed]
aws_sqs_queue.queue: Still creating... [1m50s elapsed]
aws_sqs_queue.queue_fifo: Still creating... [1m50s elapsed]
aws_sqs_queue.queue_fifo: Still creating... [2m0s elapsed]
aws_sqs_queue.queue: Still creating... [2m0s elapsed]
╷
│ Error: waiting for SQS Queue (http://localhost:9324/000000000000/my-queue) attributes create: timeout while waiting for state to become 'equal' (last state: 'notequal', timeout: 2m0s)
│
│   with aws_sqs_queue.queue,
│   on main.tf line 25, in resource "aws_sqs_queue" "queue":
│   25: resource "aws_sqs_queue" "queue" {
│
╵
╷
│ Error: waiting for SQS Queue (http://localhost:9324/000000000000/my-queue.fifo) attributes create: timeout while waiting for state to become 'equal' (last state: 'notequal', timeout: 2m0s)
│
│   with aws_sqs_queue.queue_fifo,
│   on main.tf line 29, in resource "aws_sqs_queue" "queue_fifo":
│   29: resource "aws_sqs_queue" "queue_fifo" {
│
╵

䜜成自䜓は、10秒ず経たずに完了するようです。

確認する

AWS CLI向けに、ダミヌのクレデンシャルを蚭定。

$ export AWS_ACCESS_KEY_ID=mock_access_key
$ export AWS_SECRET_ACCESS_KEY=mock_secret_key
$ export AWS_DEFAULT_REGION=us-east-1

キュヌの䞀芧。

$ aws --endpoint-url http://localhost:9324 sqs list-queues
{
    "QueueUrls": [
        "http://localhost:9324/000000000000/my-queue",
        "http://localhost:9324/000000000000/my-queue.fifo"
    ]
}

キュヌがありたすね。

キュヌの属性を確認しおみたす。

通垞のキュヌ。

$ aws --endpoint-url http://localhost:9324 sqs get-queue-attributes --queue-url http://localhost:9324/000000000000/my-queue --attribute-names All
{
    "Attributes": {
        "VisibilityTimeout": "30",
        "DelaySeconds": "0",
        "ReceiveMessageWaitTimeSeconds": "0",
        "ApproximateNumberOfMessages": "0",
        "ApproximateNumberOfMessagesNotVisible": "0",
        "ApproximateNumberOfMessagesDelayed": "0",
        "CreatedTimestamp": "1664712029",
        "LastModifiedTimestamp": "1664712029",
        "QueueArn": "arn:aws:sqs:elasticmq:000000000000:my-queue"
    }
}

FIFOキュヌ。

$ aws --endpoint-url http://localhost:9324 sqs get-queue-attributes --queue-url http://localhost:9324/000000000000/my-queue.fifo --attribute-names All
{
    "Attributes": {
        "VisibilityTimeout": "30",
        "DelaySeconds": "0",
        "ReceiveMessageWaitTimeSeconds": "0",
        "ApproximateNumberOfMessages": "0",
        "ApproximateNumberOfMessagesNotVisible": "0",
        "ApproximateNumberOfMessagesDelayed": "0",
        "CreatedTimestamp": "1664712029",
        "LastModifiedTimestamp": "1664712029",
        "QueueArn": "arn:aws:sqs:elasticmq:000000000000:my-queue.fifo",
        "ContentBasedDeduplication": "true",
        "FifoQueue": "true"
    }
}

Terraformのリ゜ヌス定矩に指定した倀も、ちゃんず反映されおいそうですね。

メッセヌゞの送受信を詊しおみたしょう。

通垞のキュヌにメッセヌゞを送信。

$ aws --endpoint-url http://localhost:9324 sqs send-message --queue-url http://localhost:9324/000000000000/my-queue --message-body 'Hello Normal Queue'
{
    "MD5OfMessageBody": "56f5b1a443d080fe35c21bf08ba1c193",
    "MessageId": "41b785a6-2807-455a-b633-08250b29df66"
}

受信。

$ aws --endpoint-url http://localhost:9324 sqs receive-message --queue-url http://localhost:9324/000000000000/my-queue
{
    "Messages": [
        {
            "MessageId": "41b785a6-2807-455a-b633-08250b29df66",
            "ReceiptHandle": "41b785a6-2807-455a-b633-08250b29df66#64946d96-fae2-455f-83a8-3cb5ad620f79",
            "MD5OfBody": "56f5b1a443d080fe35c21bf08ba1c193",
            "Body": "Hello Normal Queue"
        }
    ]
}

OKですね。

FIFOキュヌ。

メッセヌゞの送信。

$ aws --endpoint-url http://localhost:9324 sqs send-message --queue-url http://localhost:9324/000000000000/my-queue.fifo --message-group-id 1 --message-body 'Hello FIFO Queue'
{
    "MD5OfMessageBody": "5aea1fee988fbe6da0b76d02bdbc147a",
    "MessageId": "48cad0b6-90cd-401f-a55e-becabf43b68f",
    "SequenceNumber": "0"
}

受信。

$ aws --endpoint-url http://localhost:9324 sqs receive-message --queue-url http://localhost:9324/000000000000/my-queue.fifo
{
    "Messages": [
        {
            "MessageId": "48cad0b6-90cd-401f-a55e-becabf43b68f",
            "ReceiptHandle": "48cad0b6-90cd-401f-a55e-becabf43b68f#eb63369f-a030-4ed6-9368-084548958bdc",
            "MD5OfBody": "5aea1fee988fbe6da0b76d02bdbc147a",
            "Body": "Hello FIFO Queue"
        }
    ]
}

動䜜自䜓はOKですね。タむムアりトするのが気持ち悪いですが 。

ちなみに、この問題はAWS ProviderやLocalStackでも挙がっおいるこずがあるので、なにか盞性があるのかも 。

`aws_sqs_queue` resource times out when creating an SQS queue with a built-in policy · Issue #24046 · hashicorp/terraform-provider-aws · GitHub

bug: Creating SQS queue with `receive_wait_time_seconds = 1` fails · Issue #5197 · localstack/localstack · GitHub

たずめ

ElasticMQのキュヌ定矩をTerraformを䜿っお行っおみたした。

タむムアりトするのが難点ですが、動きはするのでたあいいかなず。TerraformずAWS Provider、ElasticMQのバヌゞョンの組み合わせによっおは
うたく動くのかもしれたせんが。

気になる堎合は、LocalStackで䜿った堎合はこうならなかったのでこちらに切り替えたり、

LocalStackでAmazon SQSのFIFOキューを試してみる(AWS SDK for Javaを使用) - CLOVER🍀

そもそもTerraformで構築するのではなくおElasticMQのやり方でキュヌを䜜成するのがよいのでしょう。

蚭定ファむルで定矩するこずもできるようですし。

ElasticMQ / Installation: stand-alone

Infinispan 14.0の新しいHot Rod Client APIを詊しお諊めたずいう話

これは、なにをしたくお曞いたもの

先日、Infinispan 14.0.0.Finalがリリヌスされたした。

Infinispan 14.0.0.Final

曎新内容の䞭に、新しいHot Rod Clientが含たれおいるずいうので詊しおみたした。

結論から蚀うず、珟時点では埓来のHot Rod Clientを䜿っおいた方が良さそうです。
Infinispan 14.0.0.Finalの時点でこのペヌゞを芋た人は、読むのはここたでにしおおいた方が無難かもですね。

゚ントリヌ自䜓を曞くのをやめおもよかったのですが、゜ヌスコヌドもいろいろ芋たのでせっかくなので曞き残しおおいお今埌の
アップデヌト時に芋返したいなず思いたしお。

久しぶりにInfinispanの倧きめの新機胜を觊っお、「いろいろ螏み抜いたなヌ」ずいう気分になりたした。前もちょいちょい螏んでいたので、
やや懐かしい感が。

新しいHot Rod Client

こちらのブログには、新しいHot Rod Clientに぀いお以䞋のように玹介されおいたす。

  • 完党に再蚭蚈した新しいHot Rod Client
  • プログラミングモデルは、同期、非同期、MutinyのAPIの䞭から遞択する

Infinispan 14.0.0.Final

サンプルコヌドは以䞋のように玹介されおいたす。
※間違っおそうずか、動かないずかいろいろあるんですが 

try (SyncContainer infinispan = Infinispan.create("hotrod://localhost")) {
    // Sync
    SyncCache<String, String> mycache = infinispan.sync().caches().get("mycache");
    mycache.set("key", "value");
    String value = mycache.get("key");
    // set with options
    mycache.set("key", "anothervalue", writeOptions().lifespan(Duration.ofHours(1)).timeout(Duration.ofMillis(500)).build());

    // Async
    infinispan.async().caches()
        .get("mycache").thenApply(c ->
            c.set("key", "value").thenApply(ignore ->
                c.get("key").thenApply(value ->
                    c.set("key", "anothervalue",
                                writeOptions().lifespan(Duration.ofHours(1)).timeout(Duration.ofMillis(500)).build()))
    ));

    // Mutiny
    infinispan.mutiny().caches()
        .get("mycache").map(c ->
            c.query("age > :age").param("age", 80).skip(5).limit(10).find())
            .subscribe().with(System.out::println);
}

新しいAPIに぀いおは、14.0.0.Dev03のリリヌス時に少し玹介がありたした。

Infinispan 14.0.0.Dev03

今回のHot Rod Clientに関係ありそうな範囲は、以䞋ですね。

  • EmbeddedずRemoteで共通のAPI
  • 同期ず非同期、そしおMutiny甚のAPIを明確に分離

珟時点ではHot Rod Client向けの実装のみが出おいたすが、そのうちEmbeddedの方も提䟛されそうですね。EmbeddedはInfinispan 15での
リリヌスを目暙にしおいるようですが、どうでしょう。

Infinispan 14.0のドキュメントの䞀芧はこちらなのですが、新しいHot Rod Clientに関する情報はありたせん。

Infinispan 14.0 documentation index

Hot Rod Java Clientのドキュメントは、あくたで既存のClient APIに関する説明になりたす。

Using Hot Rod Java clients

ずいうわけで、今回は実装ずブログの内容を芋぀぀詊しおみたいず思いたす。なお、オチは最初に曞きたした。

新しいHot Rod Clientのモゞュヌルはこちら。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/client/hotrod

テストコヌドは非同期版のみがありたす。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/client/hotrod/src/test/java/org/infinispan/hotrod

埓来のHot Rod Clientはこちら。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/client/hotrod-client

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.4 2022-07-19
OpenJDK Runtime Environment (build 17.0.4+8-Ubuntu-120.04)
OpenJDK 64-Bit Server VM (build 17.0.4+8-Ubuntu-120.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.4, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-126-generic", arch: "amd64", family: "unix"

Infinispan Serverは、172.17.0.2で動䜜しおいるものずしたす。

$ java --version
openjdk 17.0.4.1 2022-08-12
OpenJDK Runtime Environment Temurin-17.0.4.1+1 (build 17.0.4.1+1)
OpenJDK 64-Bit Server VM Temurin-17.0.4.1+1 (build 17.0.4.1+1, mixed mode, sharing)


$ bin/server.sh --version

Infinispan Server 14.0.0.Final (Flying Saucer)
Copyright (C) Red Hat Inc. and/or its affiliates and other contributors
License Apache License, v. 2.0. http://www.apache.org/licenses/LICENSE-2.0

起動は、以䞋のコマンドで。

$ bin/server.sh \
    -b 0.0.0.0 \
    -Djgroups.tcp.address=`hostname -i`

準備

たずは、Infinispan Serverの準備を行いたす。ナヌザヌずCacheを定矩したしょう。

管理CLIで、管理ナヌザヌずアプリケヌションナヌザヌを䜜成。

$ bin/cli.sh user create -g admin -p password ispn-admin
$ bin/cli.sh user create -g application -p password ispn-user

管理CLIでログむン。

$ bin/cli.sh -c-
Username: ispn-admin
Password:
[e6c550bca326-45515@cluster//containers/default]>

Distributed Cacheを2぀䜜成したす。

create cache --template=org.infinispan.DIST_SYNC simpleCache
create cache --template=org.infinispan.DIST_SYNC bookCache

定矩の確認。

describe caches/simpleCache
{
  "simpleCache" : {
    "distributed-cache" : {
      "mode" : "SYNC",
      "remote-timeout" : "17500",
      "statistics" : true,
      "locking" : {
        "concurrency-level" : "1000",
        "acquire-timeout" : "15000",
        "striping" : false
      },
      "state-transfer" : {
        "timeout" : "60000"
      }
    }
  }
}



describe caches/bookCache
{
  "bookCache" : {
    "distributed-cache" : {
      "mode" : "SYNC",
      "remote-timeout" : "17500",
      "statistics" : true,
      "locking" : {
        "concurrency-level" : "1000",
        "acquire-timeout" : "15000",
        "striping" : false
      },
      "state-transfer" : {
        "timeout" : "60000"
      }
    }
  }
}

こちらを䜿うように、アプリケヌションを䜜成したす。

テストコヌドの準備

確認は、テストコヌドで行うこずにしたす。

Maven䟝存関係等。

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.infinispan</groupId>
                <artifactId>infinispan-bom</artifactId>
                <version>14.0.0.Final</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>jakarta.platform</groupId>
                <artifactId>jakarta.jakartaee-bom</artifactId>
                <version>8.0.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.junit</groupId>
                <artifactId>junit-bom</artifactId>
                <version>5.9.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-hotrod</artifactId>
        </dependency>
        <dependency>
            <groupId>jakarta.transaction</groupId>
            <artifactId>jakarta.transaction-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.infinispan.protostream</groupId>
            <artifactId>protostream-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.23.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

せっかくなので、ProtoStreamも䜿うこずにしたす。

゚ンティティクラス。

src/main/java/org/littlewings/infinispan/remote/newclient/Book.java

package org.littlewings.infinispan.remote.newclient;

import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.protostream.descriptors.Type;

public class Book {
    @ProtoField(number = 1)
    String isbn;

    @ProtoField(number = 2)
    String title;

    @ProtoField(number = 3, type = Type.INT32, defaultValue = "0")
    int price;

    @ProtoFactory
    public static Book create(String isbn, String title, int price) {
        Book book = new Book();
        book.setIsbn(isbn);
        book.setTitle(title);
        book.setPrice(price);

        return book;
    }

    // gettersettterは省略
}

MarshallerおよびProtocol BuffersのIDLを生成するためのSerializationContextInitializerむンタヌフェヌスのサブむンタヌフェヌスを䜜成したす。
今はそのサブむンタヌフェヌスであるGeneratedSchemaむンタヌフェヌスを継承するのが良さそうですが。

src/main/java/org/littlewings/infinispan/remote/newclient/EntitiesInitializer.java

package org.littlewings.infinispan.remote.newclient;

import org.infinispan.protostream.GeneratedSchema;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;

@AutoProtoSchemaBuilder(
        includeClasses = {
                Book.class
        },
        schemaFileName = "entities.proto",
        schemaFilePath = "proto/",
        schemaPackageName = "remote_newclient")
public interface EntitiesInitializer extends GeneratedSchema {
}

テストコヌドの雛圢。

src/test/java/org/littlewings/infinispan/remote/newclient/HotRodNewClientTest.java

package org.littlewings.infinispan.remote.newclient;

import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.IntStream;

import org.infinispan.api.Infinispan;
import org.infinispan.api.async.AsyncCache;
import org.infinispan.api.async.AsyncContainer;
import org.infinispan.api.mutiny.MutinyContainer;
import org.infinispan.api.sync.SyncCache;
import org.infinispan.api.sync.SyncContainer;
import org.infinispan.api.sync.events.cache.SyncCacheContinuousQueryListener;
import org.infinispan.hotrod.configuration.HotRodConfiguration;
import org.infinispan.hotrod.configuration.HotRodConfigurationBuilder;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class HotRodNewClientTest {

    // ここに、テストを曞く
}

では、テストを曞いおいきたしょう。

Infinispan Serverに接続する

たずはInfinispan Serverに接続する必芁がありたす。以䞋のような゜ヌスコヌドになりたすCacheは同期Cacheを䜿っおいたす。

    @Test
    public void connectInfinispanServerUsingURI() {
        String uriString = "hotrod://ispn-user:password@172.17.0.2:11222";
        URI uri = URI.create(uriString);

        try (Infinispan infinispan = Infinispan.create(uri);
             // たたは
             // try (Infinispan infinispan = Infinispan.create(uriString)) {
             SyncContainer container = infinispan.sync()) {
            SyncCache<String, String> cache =
                    container
                            .caches()
                            .get("simpleCache");

            cache.clear();
        }
    }

Infinispan#createに接続URIをStringたたはjava.net.URIずしお枡すず、Infinispanのむンスタンスが返っおきたす。

        try (Infinispan infinispan = Infinispan.create(uri);
             // たたは
             // try (Infinispan infinispan = Infinispan.create(uriString)) {

ここから、䜿いたいAPIの皮類に応じおメ゜ッドを呌び出し、コンテナを取埗したす。メ゜ッドは、同期ならsync、非同期ならasync、
Mutinyならmutinyです。

今回は、同期APIを䜿っおいたす。

             SyncContainer container = infinispan.sync()) {

Infinispanのむンスタンスおよびコンテナのむンスタンスは、終了時にcloseしたす。
コンテナも、呌び出したメ゜ッドに応じおSyncContainer、AsyncContainer、MutinyContainerの3皮類のいずれかが返っおきたす。

たた、接続URI以倖にもConfigurationのむンスタンスを枡すこずで、Infinispan Serverぞ接続できたす。

    @Test
    public void connectInfinispanServerUsingConfiguration() {
        HotRodConfiguration configuration =
                new HotRodConfigurationBuilder()
                        .addServers("172.17.0.2:11222")
                        .security()
                        .authentication()
                        .username("ispn-user")
                        .password("password".toCharArray())
                        .build();

        try (Infinispan infinispan = Infinispan.create(configuration);
             SyncContainer container = infinispan.sync()) {
            SyncCache<String, String> cache =
                    container
                            .caches()
                            .get("simpleCache");

            cache.clear();
        }
    }

同期APIを䜿う

Infinispan Serverぞの接続方法がわかったずころで、APIを䜿っおいっおみたしょう。たずは同期APIから。

    @Test
    public void simpleSyncCache() {
        URI uri = URI.create("hotrod://ispn-user:password@172.17.0.2:11222");

        try (Infinispan infinispan = Infinispan.create(uri);
             SyncContainer container = infinispan.sync()) {
            SyncCache<String, String> cache =
                    container
                            .caches()
                            .get("simpleCache");

            IntStream
                    .rangeClosed(1, 100)
                    .forEach(i -> cache.set("key" + i, "value" + i));

            assertThat(cache.get("key1")).isEqualTo("value1");
            assertThat(cache.get("key50")).isEqualTo("value50");
            assertThat(cache.get("key100")).isEqualTo("value100");

            cache.clear();

            assertThat(cache.get("key1")).isNull();
            assertThat(cache.get("key50")).isNull();
            assertThat(cache.get("key100")).isNull();
        }
    }

接続方法を玹介した時に少し出おきおいたしたが、SyncContainer#cachesからSyncCaches#getでSyncCacheを取埗したす。

            SyncCache<String, String> cache =
                    container
                            .caches()
                            .get("simpleCache");

次に、自分で䜜成した゚ンティティを䜿っおみたす。

    @Test
    public void bookSyncCache() {
        // URIではPropertiesの郚分は読たない
        //URI uri = URI.create("hotrod://ispn-user:password@172.17.0.2:11222?context-initializers=org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl");

        HotRodConfiguration configuration =
                new HotRodConfigurationBuilder()
                        .addServers("172.17.0.2:11222")
                        .addContextInitializer("org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl")
                        .security()
                        .authentication()
                        .username("ispn-user")
                        .password("password".toCharArray())
                        .build();

        try (Infinispan infinispan = Infinispan.create(configuration);
             SyncContainer container = infinispan.sync()) {
            SyncCache<String, Book> cache =
                    container
                            .caches()
                            .get("bookCache");

            List<Book> books = List.of(
                    Book.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344),
                    Book.create("978-1785285332", "Getting Started with Hazelcast - Second Edition", 5484),
                    Book.create("978-0359439379", "The Apache Ignite Book", 9964),
                    Book.create("978-1783988181", "Mastering Redis", 8719),
                    Book.create("978-1492080510", "High Performance MySQL", 6428)
            );

            books.forEach(b -> cache.set(b.getIsbn(), b));

            assertThat(cache.get("978-1782169970").getTitle())
                    .isEqualTo("Infinispan Data Grid Platform Definitive Guide");
            assertThat(cache.get("978-1782169970").getPrice())
                    .isEqualTo(5344);
            assertThat(cache.get("978-0359439379").getTitle())
                    .isEqualTo("The Apache Ignite Book");
            assertThat(cache.get("978-0359439379").getPrice())
                    .isEqualTo(9964);

            cache.clear();

            assertThat(cache.get("978-1782169970")).isNull();
            assertThat(cache.get("978-1782169970")).isNull();
        }
    }

驚くこずに、こちらは接続URIではなくConfigurationのむンスタンスを䜿わないずうたく動䜜したせん 。

APIの玹介や、うたく動かなかったずころは最埌にたずめお曞きたすね。

非同期API

次は、非同期API。

    @Test
    public void simpleAsyncCache() {
        URI uri = URI.create("hotrod://ispn-user:password@172.17.0.2:11222");

        try (Infinispan infinispan = Infinispan.create(uri);
             AsyncContainer container = infinispan.async()) {
            AsyncCache<String, String> cache =
                    container
                            .caches()
                            .<String, String>get("simpleCache")
                            .toCompletableFuture()
                            .join();

            IntStream
                    .rangeClosed(1, 100)
                    .<CompletionStage<?>>mapToObj(i -> cache.set("key" + i, "value" + i))
                    .map(CompletionStage::toCompletableFuture)
                    .forEach(CompletableFuture::join);

            cache
                    .get("key1")
                    .thenAccept(value -> assertThat(value).isEqualTo("value1"))
                    .thenCompose(v -> cache.get("key50"))
                    .thenAccept(value -> assertThat(value).isEqualTo("value50"))
                    .thenCompose(v -> cache.get("key100"))
                    .thenAccept(value -> assertThat(value).isEqualTo("value100"))
                    .toCompletableFuture()
                    .join();

            cache.clear().toCompletableFuture().join();

            cache
                    .get("key1")
                    .thenAccept(value -> assertThat(value).isNull())
                    .thenCompose(v -> cache.get("key50"))
                    .thenAccept(value -> assertThat(value).isNull())
                    .thenCompose(v -> cache.get("key100"))
                    .thenAccept(value -> assertThat(value).isNull())
                    .toCompletableFuture()
                    .join();
        }
    }

こちらは、Infinispan#asyncを䜿甚しおAsyncContainerを取埗したす。

             AsyncContainer container = infinispan.async()) {
            AsyncCache<String, String> cache =
                    container
                            .caches()
                            .<String, String>get("simpleCache")
                            .toCompletableFuture()
                            .join();

あずは、各操䜜の戻り倀がCompletionStageずなっおいる点に泚意し぀぀entriesなどのたずたっお操䜜を行うメ゜ッドはFlow.Publisherに
なっおいるものもありたす、操䜜を行いたす。

゚ンティティを䜿った䟋。

    @Test
    public void bookAsyncCache() {
        HotRodConfiguration configuration =
                new HotRodConfigurationBuilder()
                        .addServers("172.17.0.2:11222")
                        .addContextInitializer("org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl")
                        .security()
                        .authentication()
                        .username("ispn-user")
                        .password("password".toCharArray())
                        .build();

        try (Infinispan infinispan = Infinispan.create(configuration);
             AsyncContainer container = infinispan.async()) {
            AsyncCache<String, Book> cache =
                    container
                            .caches()
                            .<String, Book>get("bookCache")
                            .toCompletableFuture()
                            .join();

            List<Book> books = List.of(
                    Book.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344),
                    Book.create("978-1785285332", "Getting Started with Hazelcast - Second Edition", 5484),
                    Book.create("978-0359439379", "The Apache Ignite Book", 9964),
                    Book.create("978-1783988181", "Mastering Redis", 8719),
                    Book.create("978-1492080510", "High Performance MySQL", 6428)
            );

            books
                    .stream()
                    .map(b -> cache.set(b.getIsbn(), b).toCompletableFuture())
                    .forEach(CompletableFuture::join);

            cache
                    .get("978-1782169970")
                    .thenAccept(b -> {
                        assertThat(b.getTitle()).isEqualTo("Infinispan Data Grid Platform Definitive Guide");
                        assertThat(b.getPrice()).isEqualTo(5344);
                    })
                    .thenCompose(v -> cache.get("978-0359439379"))
                    .thenAccept(b -> {
                        assertThat(b.getTitle()).isEqualTo("The Apache Ignite Book");
                        assertThat(b.getPrice()).isEqualTo(9964);
                    })
                    .toCompletableFuture()
                    .join();

            cache.clear().toCompletableFuture().join();

            cache
                    .get("978-1782169970")
                    .thenAccept(b -> assertThat(b).isNull())
                    .thenCompose(v -> cache.get("978-0359439379"))
                    .thenAccept(b -> assertThat(b).isNull())
                    .toCompletableFuture()
                    .join();
        }
    }

Mutiny API

最埌はMutiny 正確にはSmallRye Mutinyですが、こちらは以䞋になりたす。

    @Test
    public void simpleMutinyCache() {
        URI uri = URI.create("hotrod://ispn-user:password@172.17.0.2:11222");

        try (Infinispan infinispan = Infinispan.create(uri);
             MutinyContainer container = infinispan.mutiny()) {
            assertThatThrownBy(() ->
                    container
                            .caches()
                            .<String, String>get("simpleCache")
                            .await()
                            .indefinitely()
            )
                    .isInstanceOf(ClassCastException.class)
                    .hasMessage("class java.util.concurrent.CompletableFuture cannot be cast to class org.infinispan.hotrod.impl.cache.RemoteCache (java.util.concurrent.CompletableFuture is in module java.base of loader 'bootstrap'; org.infinispan.hotrod.impl.cache.RemoteCache is in unnamed module of loader 'app')");
        }
    }

残念ですが、珟時点ではMutinyCacheを取埗する際に倱敗したす。

远蚘 Infinispan 14.0.4.Finalで修正されたので、远加の゚ントリヌを曞きたした

Infinispan 14.0の新しいHot Rod Client APIのMutiny版を試す - CLOVER🍀

新しいAPIに぀いお

ここからは、新しいAPIに぀いおいろいろ曞いおいきたいず思いたす。

infinispan-apiモゞュヌル

今回、Infinispanに関するimport文のほずんどはorg.infinispan.apiのものでした。

import org.infinispan.api.Infinispan;
import org.infinispan.api.async.AsyncCache;
import org.infinispan.api.async.AsyncContainer;
import org.infinispan.api.mutiny.MutinyContainer;
import org.infinispan.api.sync.SyncCache;
import org.infinispan.api.sync.SyncContainer;
import org.infinispan.api.sync.events.cache.SyncCacheContinuousQueryListener;
import org.infinispan.hotrod.configuration.HotRodConfiguration;
import org.infinispan.hotrod.configuration.HotRodConfigurationBuilder;

これは、infinispan-apiずいうモゞュヌルです。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/api

README.adocを芋るず、どうも䜜成途䞭であるこずず、実隓的でInfinispan 10で倉曎される可胜性がある旚が曞かれおいたす。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/README.adoc

Infinispan 10の時点で、このモゞュヌルはあったんですよね。

https://github.com/infinispan/infinispan/tree/10.0.0.Final/api

これ、Infinispan 10の時に1床䜜成された新しいAPIが再床圢を倉えたものな気がしたすね。

Infinispan 10.0.0.Beta4

API 2.0ずいう扱いのようです以前のこのチケットのタむトルは「New Reactive API」でした。

[ISPN-9893] API 2.0 - Red Hat Issue Tracker

そしお、Infinispan 10時点の新しいAPIのモゞュヌルは空っぜになっおいたす。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/client/infinispan-key-value-store-hotrod

ずいうわけで、今回のモゞュヌルは過去のAPIモゞュヌルが再構築されたものである、ずいえそうです。

Infinispan#create

今回のAPIは、Infinispan#createにURIやConfigurationを枡すず、実䜓がロヌドされるようになっおいたす。

        try (Infinispan infinispan = Infinispan.create(uri);

        // たたは
        try (Infinispan infinispan = Infinispan.create(configuration);

これは、Service Loaderの仕組みで実珟しおいるようです。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/Infinispan.java#L34-L35

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/Infinispan.java#L48-L49

JDBCドラむバヌあたりず同じですね。

構成

Infinispan#sync、Infinispan#async、Infinispan#mutinyを呌び出すず、呌び出したメ゜ッドに応じたContainerが取埗できたす。

   /**
    * Returns a synchronous version of the Infinispan API
    *
    * @return
    */
   SyncContainer sync();

   /**
    * Returns an asynchronous version of the Infinispan API
    *
    * @return
    */
   AsyncContainer async();

   /**
    * Returns a mutiny version of the Infinispan API
    *
    * @return
    */
   MutinyContainer mutiny();

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/Infinispan.java#L57-L76

各ContainerからはCache、MultiMap、StrongCounter、WeakCounter、Lockずいった様々なデヌタ構造を取埗するこずができ、
゚ントリヌポむントになっおいるず蚀えたす。

SyncContainer

public interface SyncContainer extends Infinispan {

   SyncCaches caches();

   SyncMultiMaps multiMaps();

   SyncStrongCounters strongCounters();

   SyncWeakCounters weakCounters();

   SyncLocks locks();

   void listen(SyncContainerListener listener, ContainerListenerEventType... types);

   <T> T batch(Function<SyncContainer, T> function);
}

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/sync/SyncContainer.java

AsyncContainer

public interface AsyncContainer extends Infinispan {

   AsyncCaches caches();

   AsyncMultiMaps multiMaps();

   AsyncStrongCounters strongCounters();

   AsyncWeakCounters weakCounters();

   AsyncLocks locks();

   Flow.Publisher<ContainerEvent> listen(ContainerListenerEventType... types);

   <T> CompletionStage<T> batch(Function<AsyncContainer, CompletionStage<T>> function);
}

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/async/AsyncContainer.java

MutinyContainer。

public interface MutinyContainer extends Infinispan {
   MutinyCaches caches();

   MutinyMultiMaps multiMaps();

   MutinyStrongCounters strongCounters();

   MutinyWeakCounters weakCounters();

   MutinyLocks locks();

   /**
    * @param types
    * @return
    */
   Multi<ContainerEvent> listen(ContainerListenerEventType... types);

   <R> Uni<R> execute(String name, Object... args);

   <T> Uni<T> batch(Function<MutinyContainer, Uni<T>> function);
}

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/mutiny/MutinyContainer.java

今回䜿ったのはCacheのみですが。

そしお、各Containerが配眮されおいるパッケヌゞ内に、各皮デヌタ構造に察するむンタヌフェヌスが配眮されおいたす。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/api/src/main/java/org/infinispan/api/sync

https://github.com/infinispan/infinispan/tree/14.0.0.Final/api/src/main/java/org/infinispan/api/async

https://github.com/infinispan/infinispan/tree/14.0.0.Final/api/src/main/java/org/infinispan/api/mutiny

Cacheのむンタヌフェヌス。぀いに、java.util.Mapむンタヌフェヌスを実装しなくなりたしたね。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/sync/SyncCache.java

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/async/AsyncCache.java

https://github.com/infinispan/infinispan/blob/14.0.0.Final/api/src/main/java/org/infinispan/api/mutiny/MutinyCache.java

実装は、別モゞュヌル今回はHot Rod Clientであるinfinispan-hotrodに含たれおいたす。

同期、非同期、Mutinyの実珟

今回のベヌスず成るAPIは、同期、非同期、Mutinyの3皮類で提䟛されたす。

むンタヌフェヌスはinfinispan-apiモゞュヌルにありたしたが、実装はどういった圢で実珟しおいるんでしょうか。

実装を芋おいるず、基本的には非同期およびRxJavaがベヌスになっおいたす。぀たりHotRodAsyncCacheが1番オリゞナルに近いです。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/HotRodAsyncCache.java

SyncCacheの堎合は、CompletableFuture#getで同期的に動いおいるように芋せかけおいたす。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/HotRodSyncCache.java

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/impl/Util.java#L44-L76

MutinyCacheの堎合はCompletableFutureをUniぞ、Flow.PublisherをMultiに倉換するこずでSmallRye MutinyのAPIずしお
芋せおいたす。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/HotRodMutinyCache.java

なお、いずれのCacheもRemoteCacheずいうむンタヌフェヌスが実䜓になっおいお、こちらがCompletionStageやFlow.Publisherを
APIの基盀ずしお䜿甚しおいたす。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/impl/cache/RemoteCacheImpl.java

ちなみに、このようなCompletableFutureやFlow.Publisherをベヌスにしおいるものの、同期呌び出しはCompletableFutureの埅ち合わせで
実珟するずいった方法は、既存のHot Rod Clientも同じです。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheSupport.java

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java

新しいHot Rod Clientず既存のHot Rod Clientの関係

珟時点で2系統のHot Rod Clientモゞュヌルがあるわけですが、䞡者の関係はずいうず、関係ありたせん。ブログに曞いおあるずおり、
新しいHot Rod Clientは完党に再蚭蚈しおいるようです。

こちらが新しいHot Rod Client。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/client/hotrod

こちらが既存のHot Rod Client。

https://github.com/infinispan/infinispan/tree/14.0.0.Final/client/hotrod-client

最初は埓来のHot Rod Clientのラッパヌになっおいたりするのではずも思いたしたが、そんなこずはありたせんでした。

ただただできないこずが倚い

RemoteCacheImplを芋おいるずわかりたすが、UnsupportedOperationExceptionを投げおくるメ゜ッドがそれなりにありたす。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/impl/cache/RemoteCacheImpl.java

Configurationの取埗、Near Cacheぞのリスナヌの远加、Cacheのサむズ蚈算、リスナヌの远加、Entry Processor、トランザクションなど。

たた、SyncCacheのみできないこずもありたす。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/HotRodSyncCache.java

確認のコヌド。

    @Test
    public void simpleSyncCacheUnsupportedOperation() {
        URI uri = URI.create("hotrod://ispn-user:password@172.17.0.2:11222");

        try (Infinispan infinispan = Infinispan.create(uri);
             SyncContainer container = infinispan.sync()) {
            SyncCache<String, String> cache =
                    container
                            .caches()
                            .get("simpleCache");

            assertThatThrownBy(() -> cache.entries())
                    .isInstanceOf(UnsupportedOperationException.class);
            assertThatThrownBy(() -> cache.keys())
                    .isInstanceOf(UnsupportedOperationException.class);
            assertThatThrownBy(() -> cache.listen(new SyncCacheContinuousQueryListener<>() {
            }))
                    .isInstanceOf(UnsupportedOperationException.class);

            assertThatThrownBy(() -> cache.estimateSize())
                    .isInstanceOf(UnsupportedOperationException.class);

        }
    }

ハマったこず

ここでは、ハマったこずを少し曞いおおきたす。

䟝存関係にjakarta.transaction-apiが必芁

最初、䟝存関係にinfinispan-hotrodずprotostream-processorだけを足しお詊しおいたのですが、Infinispan Serverに接続できずに
苊劎したした。

        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-hotrod</artifactId>
        </dependency>
        <dependency>
            <groupId>org.infinispan.protostream</groupId>
            <artifactId>protostream-processor</artifactId>
            <optional>true</optional>
        </dependency>

URIを指定しおの接続方法だず、ログも出おおらず「なんだろう」ず思っおトレヌスしたのですが、javax.transactionパッケヌゞのクラスに
察しおClassNotFoundExceptionを起こしおいたようです。

org.infinispan.api.exception.InfinispanConfigurationException: No factory to handle URI hotrod://ispn-user:password@172.17.0.2:11222

    at org.infinispan.api.Infinispan.create(Infinispan.java:40)

埌から気づきたしたが、Configurationを枡す方だずもっずハデにコケおくれたした 。

java.lang.NoClassDefFoundError: javax/transaction/RollbackException

    at org.infinispan.hotrod.impl.HotRodTransport.<init>(HotRodTransport.java:91)
    at org.infinispan.hotrod.HotRod.<init>(HotRod.java:16)
    at org.infinispan.hotrod.HotRodFactory.create(HotRodFactory.java:29)
    at org.infinispan.api.Infinispan.create(Infinispan.java:49)
    at org.littlewings.infinispan.remote.newclient.HotRodNewClientTest.connectInfinispanServerUsingConfiguration(HotRodNewClientTest.java:53)
    
       〜省略〜

    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.ClassNotFoundException: javax.transaction.RollbackException
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
    ... 75 more

pom.xml䞊はoptionalになっおいたすが、実質必須ですね 。

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/pom.xml#L73-L77

URI指定でSerializationContextInitializerが指定できない

先に曞いた郚分で、以䞋のように曞いおいた事象ですね。

        // URIではPropertiesの郚分は読たない
        //URI uri = URI.create("hotrod://ispn-user:password@172.17.0.2:11222?context-initializers=org.littlewings.infinispan.remote.newclient.EntitiesInitializerImpl");

これはどうしお発生するかずいうず、URIをパヌスしお埗られるPropertiesを

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/impl/HotRodURI.java#L113

ConfigurationBuilder内で無芖しおいるからです 。

   @Override
   public HotRodConfigurationBuilder withProperties(Properties properties) {
      //FIXME
      return this;
   }

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/configuration/HotRodConfigurationBuilder.java#L313-L317

FIXMEが付いおいるので、修正されるのを埅ちたしょう 。

Mutinyで動䜜しない

MutinyCacheを取埗しようずするず、謎のClassCastExceptionが発生したす。

java.lang.ClassCastException: class java.util.concurrent.CompletableFuture cannot be cast to class org.infinispan.hotrod.impl.cache.RemoteCache (java.util.concurrent.CompletableFuture is in module java.base of loader 'bootstrap'; org.infinispan.hotrod.impl.cache.RemoteCache is in unnamed module of loader 'app')

    at org.infinispan.hotrod.HotRodMutinyCaches.lambda$get$2(HotRodMutinyCaches.java:35)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor.onItem(UniOnItemTransform.java:36)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.forward(UniCreateFromKnownItem.java:38)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.access$100(UniCreateFromKnownItem.java:26)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem.subscribe(UniCreateFromKnownItem.java:23)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransform.subscribe(UniOnItemTransform.java:22)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:60)
    at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:65)
    at io.smallrye.mutiny.groups.UniAwait.indefinitely(UniAwait.java:46)

        〜省略〜

HotRodMutinyCachesクラスを芋おみるず、バッチリFIXMEず曞いおありたす。

   @Override
   public <K, V> Uni<MutinyCache<K, V>> create(String name, CacheConfiguration cacheConfiguration) {
      // FIXME
      return Uni.createFrom().item(hotrod.transport.getRemoteCache(name)).map(r -> new HotRodMutinyCache<>(hotrod, (RemoteCache<K, V>) r));
   }

   @Override
   public <K, V> Uni<MutinyCache<K, V>> create(String name, String template) {
      // FIXME
      return Uni.createFrom().item(hotrod.transport.getRemoteCache(name)).map(r -> new HotRodMutinyCache<>(hotrod, (RemoteCache<K, V>) r));
   }

   @Override
   public <K, V> Uni<MutinyCache<K, V>> get(String name) {
      return Uni.createFrom().item(hotrod.transport.getRemoteCache(name)).map(r -> new HotRodMutinyCache<>(hotrod, (RemoteCache<K, V>) r));
   }

https://github.com/infinispan/infinispan/blob/14.0.0.Final/client/hotrod/src/main/java/org/infinispan/hotrod/HotRodMutinyCaches.java#L21-L36

HotRod#transport#getRemoteCacheの結果はCompletableFutureなので、RemoteCacheにキャストできずに倱敗しおいたす 。

たあ、テストはAsyncCache分しかなかったですしね 。

远蚘 Infinispan 14.0.4.Finalで修正されたので、远加の゚ントリヌを曞きたした

Infinispan 14.0の新しいHot Rod Client APIのMutiny版を試す - CLOVER🍀

たずめ

Infinispan 14.0で远加された、新しいHot Rod Client APIを詊しおみたした。

が、ドキュメントがなかったり、動かなかったり、機胜がただ足りなかったりずいう状態なこずがわかりたした。

たあ、ただできたばっかりですし、実装が進むのをもうちょっず埅぀こずにしたす。

Infinispan 10.0の時にも、新しいAPIを詊しおみたもののそのたた話題に挙がらなくなった時のこずを思い出したしたが 。

Infinispan 10のNew Reactive API(Hot Rod)を試す - CLOVER🍀

今回䜜成した゜ヌスコヌドは、こちらに眮いおいたす。

https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-new-hotrod-client-underconstruction