CLOVER🍀

That was when it all began.

AWS SDK for Java v2に対応したAmazon SQS Java Temporary Queue ClientとLocalStackを使って、一時キュー+RPCを試してみる

これは、なにをしたくて書いたもの?

以前、Amazon SQS Java Temporary Queue ClientとElasticMQを使って、一時キューとRPCを使ったエントリーを書いたことがあります。

Amazon SQS互換のElasticMQを使って、Temoporary Queue+RPCを試してみる - CLOVER🍀

このAmazon SQS Java Temporary Queue Clientですが、AWS SDK for Javaのバージョン1を使っていたのですが、2022年の5月に
AWS SDK for Java v2を使うようになった2.0.0がリリースされていたので、ちょっと試してみようかなと。

Amazon SQSの一時キューとRPC

そもそも、Amazon SQSの一時キューとRPCについて思い出すところから。

一時キューについてはこちらに記載があり、Amazon SQS Java Temporary Queue Clientを使うことで一時キューを作ることができると
書かれています。

Amazon SQS一時キュー - Amazon Simple Queue Service

Amazon SQS Java Temporary Queue ClientのGitHubリポジトリは、こちら。

GitHub - awslabs/amazon-sqs-java-temporary-queues-client: An Amazon SQS client that supports creating lightweight, automatically-deleted temporary queues, for use in common messaging patterns such as Request/Response. See http://aws.amazon.com/sqs.

そして、一時キューの最も一般的な使用例がリクエスト - レスポンス型式のメッセージングパターンだとされています。

例として、ログイン処理をAmazon SQSを介して別のアプリケーション(サーバー側)に処理させ、結果を受け取る(クライアント側)ような
ソースコードが書かれています。

同様のことが、AWSのブログエントリーにも書かれています。

Simple Two-way Messaging using the Amazon SQS Temporary Queue Client | AWS Compute Blog

それで、以前試した時にはAmazon SQS Java Temporary Queue ClientはAWS SDK for Java v1を使っており、v2に対してはissueがあるだけ
だったのですが、Amazon SQS Java Temporary Queue Client 2.0.0がリリースされたことで状況が変わったようです。

Two Way Messaging - Virtual Queues · Issue #1647 · aws/aws-sdk-java-v2 · GitHub

Release Version 2.0.0 of the Amazon SQS Java Temporary Queues Client · awslabs/amazon-sqs-java-temporary-queues-client · GitHub

Amazon SQS Java Temporary Queue Client 2.0.0からは、AWS SDK for Java v2を使います。

せっかくなので、今回はAmazon SQS Java Temporary Queue Client 2.0.0とLocalStackを使って、前のエントリーで書いた内容を
書き直してみようと思います。

環境

今回の環境は、こちら。

LocalStack。

$ python3 -V
Python 3.10.6


$ localstack --version
1.3.1

起動。

$ localstack start

AWS CLI。LocalStackの提供するものを重ねています。

$ awslocal --version
aws-cli/2.9.21 Python/3.9.11 Linux/5.15.0-58-generic exe/x86_64.ubuntu.22 prompt/off

Java。

$ java --version
openjdk 17.0.5 2022-10-18
OpenJDK Runtime Environment (build 17.0.5+8-Ubuntu-2ubuntu122.04)
OpenJDK 64-Bit Server VM (build 17.0.5+8-Ubuntu-2ubuntu122.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.7 (b89d5959fcde851dcb1c8946a785a163f14e1e29)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.5, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-58-generic", arch: "amd64", family: "unix"

準備

まずは、キューを作成する必要がありますね。今回はmy-queueという名前で作成しました。

$ awslocal sqs create-queue --queue-name my-queue
{
    "QueueUrl": "http://localhost:4566/000000000000/my-queue"
}

次に、Amazon SQS Java Temporary Queue Clientを使うためにMaven依存関係などの設定を行います。確認はテストコードで行うことに
するので、JUnit等も合わせて追加。

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <aws-java-sdk.version>2.19.31</aws-java-sdk.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-sqs-java-temporary-queues-client</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sqs</artifactId>
            <version>2.19.31</version>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.9.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.24.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>test-utils</artifactId>
            <version>2.19.31</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M7</version>
            </plugin>
        </plugins>
    </build>

Amazon SQS Java Temporary Queue Clientは以下で、推移的依存関係の中にAWS SDK for Java v2も含まれてはいるのですが、

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-sqs-java-temporary-queues-client</artifactId>
            <version>2.0.0</version>
        </dependency>

ちょっと古かったので、明示的に新しいバージョンを指定しておきました。

        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sqs</artifactId>
            <version>2.19.31</version>
        </dependency>


        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>test-utils</artifactId>
            <version>2.19.31</version>
            <scope>test</scope>
        </dependency>

簡単なEchoクライアント/サーバーを書く

では、Amazon SQSの一時キューを使ったプログラムを書くわけですが、前回のお題と同様に簡単なEchoクライアント/サーバーを
書いていこうと思います。

クライアントがサーバーにメッセージを送り、サーバーは受け取ったメッセージに簡単に装飾して返すというものにします。

まずは、LocalStack上のAmazon SQSにアクセスするためのSqsClientを作成する部分。

src/test/java/org/littlewings/sqs/LocalSqsBuilder.java

package org.littlewings.sqs;

import java.net.URI;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

public class LocalSqsBuilder {
    public static SqsClient create() {
        return SqsClient
                .builder()
                .credentialsProvider(
                        StaticCredentialsProvider.create(
                                AwsBasicCredentials.create("mock", "mock")
                        )
                )
                .region(Region.US_EAST_1)
                .endpointOverride(URI.create("http://localhost:4566"))
                .build();
    }
}

クレデンシャルは固定、エンドポイントはLocalStackのものを指すようにしています。

次に、サーバーを作成。

src/test/java/org/littlewings/sqs/TemporaryQueueRpcServer.java

package org.littlewings.sqs;

import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.sqs.AmazonSQSResponder;
import com.amazonaws.services.sqs.AmazonSQSResponderClientBuilder;
import com.amazonaws.services.sqs.MessageContent;
import com.amazonaws.services.sqs.util.SQSMessageConsumer;
import com.amazonaws.services.sqs.util.SQSMessageConsumerBuilder;
import software.amazon.awssdk.services.sqs.SqsClient;

public class TemporaryQueueRpcServer {
    SqsClient sqsClient;
    AmazonSQSResponder responder;
    SQSMessageConsumer consumer;

    TemporaryQueueRpcServer() {
    }

    public static TemporaryQueueRpcServer create(String queueUrl) {
        TemporaryQueueRpcServer server = new TemporaryQueueRpcServer();

        SqsClient sqsClient = LocalSqsBuilder.create();

        AmazonSQSResponder responder =
                AmazonSQSResponderClientBuilder
                        .standard()
                        .withAmazonSQS(sqsClient)
                        .build();

        SQSMessageConsumer consumer =
                SQSMessageConsumerBuilder
                        .standard()
                        .withAmazonSQS(sqsClient)
                        .withQueueUrl(queueUrl)
                        .withConsumer(requestMessage -> {
                            MessageContent requestContent = MessageContent.fromMessage(requestMessage);
                            String requestBody = requestContent.getMessageBody();

                            MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★");

                            responder.sendResponseMessage(requestContent, responseContent);
                        })
                        .build();

        server.sqsClient = sqsClient;
        server.responder = responder;
        server.consumer = consumer;

        return server;
    }

    public void start() {
        consumer.start();
    }

    public void stop() {
        consumer.close();
        responder.shutdown();
        sqsClient.close();
    }
}

こちらが受け取ったメッセージを送り返すためのAmazonSQSResponder、

        AmazonSQSResponder responder =
                AmazonSQSResponderClientBuilder
                        .standard()
                        .withAmazonSQS(sqsClient)
                        .build();

こちらがメッセージを受信して処理を行うためのSQSMessageConsumerです。

        SQSMessageConsumer consumer =
                SQSMessageConsumerBuilder
                        .standard()
                        .withAmazonSQS(sqsClient)
                        .withQueueUrl(queueUrl)
                        .withConsumer(requestMessage -> {
                            MessageContent requestContent = MessageContent.fromMessage(requestMessage);
                            String requestBody = requestContent.getMessageBody();

                            MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★");

                            responder.sendResponseMessage(requestContent, responseContent);
                        })
                        .build();

Amazon SQSではキューをポーリングしてメッセージの監視を行うので、SQSMessageConsumerにはキューのURLを設定する必要が
あります。

SQSMessageConsumer#startでキューの監視が始まります。

    public void start() {
        consumer.start();
    }

停止はこちら。

    public void stop() {
        consumer.close();
        responder.shutdown();
        sqsClient.close();
    }

クライアント側は、テストコードとして作成します。まずは雛形から。

src/test/java/org/littlewings/sqs/TemporaryQueueRpcTest.java

package org.littlewings.sqs;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.sqs.AmazonSQSRequester;
import com.amazonaws.services.sqs.AmazonSQSRequesterClientBuilder;
import com.amazonaws.services.sqs.MessageContent;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

import static org.assertj.core.api.Assertions.assertThat;

public class TemporaryQueueRpcTest {
    interface ThrowableRunnable {
        void run() throws Exception;
    }

    void withServer(String queueUrl, ThrowableRunnable runnable) throws Exception {
        TemporaryQueueRpcServer server = TemporaryQueueRpcServer.create(queueUrl);
        server.start();

        try {
            runnable.run();
        } finally {
            server.stop();
        }
    }

    // ここに、テストを書く!!
}

先ほど書いたサーバーの、起動と停止を行うメソッドも付けています。

では、クライアント側を書いてみます。

    @Test
    void simpleRpc() throws Exception {
        String queueUrl = "http://localhost:4566/000000000000/my-queue";

        withServer(queueUrl, () -> {
            SqsClient sqsClient = LocalSqsBuilder.create();

            AmazonSQSRequester requester =
                    AmazonSQSRequesterClientBuilder
                            .standard()
                            .withAmazonSQS(sqsClient)
                            .build();

            try {
                SendMessageRequest request =
                        SendMessageRequest
                                .builder()
                                .queueUrl(queueUrl)
                                .messageBody("Hello World!!")
                                .build();

                Message responseMessage = requester.sendMessageAndGetResponse(request, 10, TimeUnit.SECONDS);
                MessageContent responseContent = MessageContent.fromMessage(responseMessage);

                System.out.println(responseMessage);
                System.out.println(responseContent.getMessageAttributes());

                assertThat(responseContent.getMessageBody()).isEqualTo("★★★Hello World!!★★★");
            } finally {
                requester.shutdown();
                sqsClient.close();
            }
        });
    }

メッセージ送信を行うのは、AmazonSQSRequesterとなります。

            AmazonSQSRequester requester =
                    AmazonSQSRequesterClientBuilder
                            .standard()
                            .withAmazonSQS(sqsClient)
                            .build();

AmazonSQSRequester#sendMessageAndGetResponseで、メッセージ送信。ここでは、同期呼び出しを行っています。

                Message responseMessage = requester.sendMessageAndGetResponse(request, 10, TimeUnit.SECONDS);

結果はここで1度出力していますが、

                System.out.println(responseMessage);
                System.out.println(responseContent.getMessageAttributes());

それぞれ、こちらと

Message(MessageId=2f0e38ea-8440-4247-ab9b-260306b29273, ReceiptHandle=NDA5OWM0ZWUtOTEzMi00YTUzLThlMDEtOTkyZGFiNGNiYzkxIGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6X19SZXF1ZXN0ZXJDbGllbnRRdWV1ZXNfX2ViOTNlNDkyLWFhOGYtNGNlNC04NGI1LWZiZDQwMzllMDRkNC0wIDJmMGUzOGVhLTg0NDAtNDI0Ny1hYjliLTI2MDMwNmIyOTI3MyAxNjc1NjA5Njk2LjE5MDY1OQ==, MD5OfBody=b14f6a41b31fd409988bec57f9a1a5cb, Body=★★★Hello World!!★★★, Attributes={ApproximateReceiveCount=1, SentTimestamp=1675609696169, SenderId=000000000000, ApproximateFirstReceiveTimestamp=1675609696190}, MD5OfMessageAttributes=d3d6adc048d9998de883abfd0e923eca, MessageAttributes={__AmazonSQSVirtualQueuesClient.QueueName=MessageAttributeValue(StringValue=__RequesterClientQueues__2a96b18c-ab99-4e99-a48f-d6c4b92043f1, DataType=String)})

こちらですね。

{__AmazonSQSVirtualQueuesClient.QueueName=MessageAttributeValue(StringValue=__RequesterClientQueues__2a96b18c-ab99-4e99-a48f-d6c4b92043f1, DataType=String)}

呼び出しが終わったら、それぞれ停止。

                requester.shutdown();
                sqsClient.close();

送信するメッセージを増やしてみる

次に、送信するメッセージを増やして、ちゃんとRPCとして動作できているか確認してみたいと思います。

まずはサーバー側。メッセージごとに、ランダムにスリープするようにしてみます。

src/test/java/org/littlewings/sqs/TemporaryQueueRpcSCerver.java

// 省略

public class TemporaryQueueRpcServer {

    // 省略

    public static TemporaryQueueRpcServer createRandomSleep(String queueUrl) {
        Random random = new Random();
        random.nextInt(10);

        TemporaryQueueRpcServer server = new TemporaryQueueRpcServer();

        SqsClient sqsClient = LocalSqsBuilder.create();

        AmazonSQSResponder responder =
                AmazonSQSResponderClientBuilder.standard()
                        .withAmazonSQS(sqsClient)
                        .build();

        SQSMessageConsumer consumer =
                SQSMessageConsumerBuilder
                        .standard()
                        .withAmazonSQS(sqsClient)
                        .withQueueUrl(queueUrl)
                        .withPollingThreadCount(5)
                        .withConsumer(requestMessage -> {
                            MessageContent requestContent = MessageContent.fromMessage(requestMessage);
                            String requestBody = requestContent.getMessageBody();

                            MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★");

                            try {
                                int sleepTime = random.nextInt(10);
                                System.out.printf("Server[%s]: %d sec sleep%n", Thread.currentThread().getName(), sleepTime);
                                TimeUnit.SECONDS.sleep(sleepTime);
                            } catch (InterruptedException e) {
                                // ignore
                            }

                            responder.sendResponseMessage(requestContent, responseContent);
                        })
                        .build();

        server.sqsClient = sqsClient;
        server.responder = responder;
        server.consumer = consumer;

        return server;
    }

    // 省略
}

前回ElasticMQで試した時はポーリングするスレッド数を増やすと挙動が不安定になりましたが、今回はそうはなりませんでした。

        SQSMessageConsumer consumer =
                SQSMessageConsumerBuilder
                        .standard()
                        .withAmazonSQS(sqsClient)
                        .withQueueUrl(queueUrl)
                        .withPollingThreadCount(5)

メッセージを受信した後は、ランダムにスリープさせています。

                        .withConsumer(requestMessage -> {
                            MessageContent requestContent = MessageContent.fromMessage(requestMessage);
                            String requestBody = requestContent.getMessageBody();

                            MessageContent responseContent = new MessageContent("★★★" + requestBody + "★★★");

                            try {
                                int sleepTime = random.nextInt(10);
                                System.out.printf("Server[%s]: %d sec sleep%n", Thread.currentThread().getName(), sleepTime);
                                TimeUnit.SECONDS.sleep(sleepTime);
                            } catch (InterruptedException e) {
                                // ignore
                            }

続いて、クライアント側。50個のメッセージを送るようにしてみます。

src/test/java/org/littlewings/sqs/TemporaryQueueRpcTest.java

// 省略

public class TemporaryQueueRpcTest {

    // 省略

    void withRandomSleepServer(String queueUrl, ThrowableRunnable runnable) throws Exception {
        TemporaryQueueRpcServer server = TemporaryQueueRpcServer.createRandomSleep(queueUrl);
        server.start();

        try {
            runnable.run();
        } finally {
            server.stop();
        }
    }

    @Test
    public void concurrent() throws Exception {
        String queueUrl = "http://localhost:4566/000000000000/my-queue";

        withRandomSleepServer(queueUrl, () -> {
            SqsClient sqsClient = LocalSqsBuilder.create();

            AmazonSQSRequester requester =
                    AmazonSQSRequesterClientBuilder
                            .standard()
                            .withAmazonSQS(sqsClient)
                            .build();

            try {
                Map<String, CompletableFuture<Message>> futures = new LinkedHashMap<>();

                for (int i = 0; i < 50; i++) {
                    String uuid = UUID.randomUUID().toString();
                    SendMessageRequest request =
                            SendMessageRequest
                                    .builder()
                                    .queueUrl(queueUrl)
                                    .messageBody(uuid)
                                    .build();

                    CompletableFuture<Message> responseMessage = requester.sendMessageAndGetResponseAsync(request, 60, TimeUnit.SECONDS);

                    futures.put(uuid, responseMessage);
                }

                assertThat(futures).hasSize(50);

                futures.forEach((uuid, message) -> {
                    long start = System.currentTimeMillis();
                    MessageContent responseContent = MessageContent.fromMessage(message.join());
                    System.out.println("elapsed: " + (System.currentTimeMillis() - start) / 1000.0 + " sec");
                    assertThat(responseContent.getMessageBody()).isEqualTo("★★★" + uuid + "★★★");
                });
            } finally {
                requester.shutdown();
                sqsClient.close();
            }
        });
    }
}

送信するメッセージの内容は、リクエストごとに別々になるようにUUIDにしてみました。

                    String uuid = UUID.randomUUID().toString();
                    SendMessageRequest request =
                            SendMessageRequest
                                    .builder()
                                    .queueUrl(queueUrl)
                                    .messageBody(uuid)
                                    .build();

メッセージの送信は、AmazonSQSRequester#sendMessageAndGetResponseAsyncを使って非同期に変更。

                    CompletableFuture<Message> responseMessage = requester.sendMessageAndGetResponseAsync(request, 60, TimeUnit.SECONDS);

これで、期待のメッセージが処理できているか確認。

                assertThat(futures).hasSize(50);

                futures.forEach((uuid, message) -> {
                    long start = System.currentTimeMillis();
                    MessageContent responseContent = MessageContent.fromMessage(message.join());
                    System.out.println("elapsed: " + (System.currentTimeMillis() - start) / 1000.0 + " sec");
                    assertThat(responseContent.getMessageBody()).isEqualTo("★★★" + uuid + "★★★");
                });

リクエストとレスポンスの内容が対になっていたので、今回もRPCとして動作できていることは確認できました。

ただ、今回の環境ではメッセージをたくさん送るとサーバー側である程度メッセージを処理しきらないとクライアントにメッセージが
返らないようで、今回の書き方だと最初に送ったメッセージを受け取るまでの時間がかなり長くなったりします。
しかも安定していない感じがするので、このあたりは本当に使うならちゃんとしたAmazon SQSで挙動を確認した方が良さそうですね。

とりあえず、Amazon SQS Java Temporary Queue Clientの2.0.0と一時キューを使ってRPCを行うという目的は達成できたので、
良しとしますか。

まとめ

2.0.0になったAmazon SQS Java Temporary Queue ClientとLocalStackを使って、一時キューとRPCを試してみました。
前回はけっこうハマったのですが、今回はそうでもなかったですね。ElasticMQでなくても、LocalStackでも動きましたし。

ただ、たくさんメッセージを送った時の動きが不安定なのは今回も変わらなかったので、実際の挙動は本物を使って確認、というのが
正解だろうという感覚も変わりませんが、やりやすくなったのは良かったかなと思います。

AWS SDK for Java v1の制約もなくなりましたし。

Infinispan 14.0で実験的にサポートされた、io_uringを有効にしてみる

これは、なにをしたくて書いたもの?

前に、NettyのincubatorプロジェクトであるNetty io_uringを試してみるエントリーを書きました。

TCP Echo Server/Clientを書いて、Netty io_uringを試してみる - CLOVER🍀

Infinispanでも14.0.0.Finalから、Netty io_uringを使えるようになっているということなので、試してみたいと思います。

Infinispan 14.0.0.Final

io_uringについて

io_uringは、Linuxカーネル5.1から導入された、非同期IO用のAPIです。こちらにも、少し書いておきました。

TCP Echo Server/Clientを書いて、Netty io_uringを試してみる - CLOVER🍀

Infinispan 14.0とNetty io_uring

Infinispan 14.0.0.Finalのリリース時のブログエントリーを見てみると、Infinispan Serverで実験的にio_uringをサポートしたことが
書かれています。

Server

  • Experimental IO_Uring support

Infinispan 14.0.0.Final

情報は、例によってこれだけです。どうやったらio_uringを使うようになるかですら、情報がありません。

とりあえず、Infinispan Serverをダウンロードしてみましょう。

$ curl -LO https://downloads.jboss.org/infinispan/14.0.6.Final/infinispan-server-14.0.6.Final.zip
$ unzip infinispan-server-14.0.6.Final.zip
$ cd infinispan-server-14.0.6.Final

libディレクトリには、Netty io_uringのライブラリがありました。

$ ll lib/*io_uring*
-rw-r--r-- 1 xxxxx xxxxx 99211  1月 19 03:23 lib/netty-incubator-transport-classes-io_uring-0.0.14.Final.jar
-rw-r--r-- 1 xxxxx xxxxx  5563  1月 19 03:23 lib/netty-incubator-transport-native-io_uring-0.0.14.Final.jar
-rw-r--r-- 1 xxxxx xxxxx 37477  1月 19 03:23 lib/netty-incubator-transport-native-io_uring-linux-aarch_64-0.0.14.Final.jar
-rw-r--r-- 1 xxxxx xxxxx 36251  1月 19 03:23 lib/netty-incubator-transport-native-io_uring-linux-x86_64-0.0.14.Final.jar

というわけで、Infinispan ServerでもNetty io_uringを使うようですね。

なにも考えずに、Infinispan Serverを起動してみます。

$ bin/server.sh

すると、ログの中にこんな表記が混じっていることに気づきます。

2023-02-05 18:24:12,669 INFO  (main) [org.infinispan.SERVER] Using transport: Epoll

デフォルトではepollを使うようなので、これをio_uringに変更してみるのが今回のお題です。

環境

今回の環境は、こちら。

Ubuntu Linux 22.04 LTSで、カーネルは5.15です。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 22.04.1 LTS
Release:        22.04
Codename:       jammy


$ uname -srmvpio
Linux 5.15.0-58-generic #64-Ubuntu SMP Thu Jan 5 11:43:13 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux

Java。

$ java --version
openjdk 17.0.5 2022-10-18
OpenJDK Runtime Environment (build 17.0.5+8-Ubuntu-2ubuntu122.04)
OpenJDK 64-Bit Server VM (build 17.0.5+8-Ubuntu-2ubuntu122.04, mixed mode, sharing)

Infinispan ServerでNetty io_uringを使う

Infinispan Serverのソースコードを見ていると、以下の3つの条件を満たしているとNetty io_uringを使ってくれるようですね。

  • epollを使わないようにしている(システムプロパティinfinispan.server.channel.epollをfalseにする)
  • システムプロパティinfinispan.server.channel.iouringがtrueになっている(デフォルト値)
  • 実行環境(OS)がLinuxである
  • io.netty.incubator.channel.uring.IOUringクラスが、クラスパス上に存在する
  • Netty io_uringがio_uringを使える状態だと認識している

https://github.com/infinispan/infinispan/blob/14.0.6.Final/server/core/src/main/java/org/infinispan/server/core/transport/NativeTransport.java#L19

https://github.com/infinispan/infinispan/blob/14.0.6.Final/server/core/src/main/java/org/infinispan/server/core/transport/NativeTransport.java#L22

https://github.com/infinispan/infinispan/blob/14.0.6.Final/server/core/src/main/java/org/infinispan/server/core/transport/NativeTransport.java#L48-L50

優先順位を見てみると、EpollServerSocketChannel → IOUringServerSocketChannel → NioServerSocketChannelとなっているので、
epollを無効にしないとio_uringが選ばれないことになります。

https://github.com/infinispan/infinispan/blob/14.0.6.Final/server/core/src/main/java/org/infinispan/server/core/transport/NativeTransport.java#L65-L75

これらの条件を満たしていると、IOUringEventLoopGroupやIOUringServerSocketChannelを使ってくれることになります。

https://github.com/infinispan/infinispan/blob/14.0.6.Final/server/core/src/main/java/org/infinispan/server/core/transport/IOURingNativeTransport.java

というわけで、システムプロパティinfinispan.server.channel.epollをfalseにして、epollを使わないようにして起動してみましょう。

$ bin/server.sh -Dinfinispan.server.channel.epoll=false

すると、ログから以下のようにNetty io_uringを使うように変わったことがわかります。

2023-02-05 18:38:35,690 INFO  (main) [org.infinispan.SERVER] Using transport: IOUring

確認のために、straceでシステムコールを見てみましょう。

$ strace -f -tt bin/server.sh -Dinfinispan.server.channel.epoll=false 2>&1 | grep io_uring

見てみると、io_uringのシステムコールを使っていることが確認できます。

[pid  7926] 18:44:13.963512 io_uring_setup(4096, {flags=0, sq_thread_cpu=0, sq_thread_idle=0, sq_entries=4096, cq_entries=8192, features=IORING_FEAT_SINGLE_MMAP|IORING_FEAT_NODROP|IORING_FEAT_SUBMIT_STABLE|IORING_FEAT_RW_CUR_POS|IORING_FEAT_CUR_PERSONALITY|IORING_FEAT_FAST_POLL|IORING_FEAT_POLL_32BITS|IORING_FEAT_SQPOLL_NONFIXED|IORING_FEAT_EXT_ARG|IORING_FEAT_NATIVE_WORKERS|IORING_FEAT_RSRC_TAGS, sq_off={head=0, tail=64, ring_mask=256, ring_entries=264, flags=276, dropped=272, array=131392}, cq_off={head=128, tail=192, ring_mask=260, ring_entries=268, overflow=284, cqes=320, flags=280}}) = 185
[pid  7926] 18:44:13.970822 io_uring_register(185, IORING_REGISTER_PROBE, {last_op=IORING_OP_LINKAT, ops_len=40, ops=[{op=IORING_OP_NOP, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_READV, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_WRITEV, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_FSYNC, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_READ_FIXED, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_WRITE_FIXED, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_POLL_ADD, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_POLL_REMOVE, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_SYNC_FILE_RANGE, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_SENDMSG, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_RECVMSG, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_TIMEOUT, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_TIMEOUT_REMOVE, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_ACCEPT, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_ASYNC_CANCEL, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_LINK_TIMEOUT, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_CONNECT, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_FALLOCATE, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_OPENAT, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_CLOSE, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_FILES_UPDATE, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_STATX, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_READ, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_WRITE, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_FADVISE, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_MADVISE, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_SEND, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_RECV, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_OPENAT2, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_EPOLL_CTL, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_SPLICE, flags=IO_URING_OP_SUPPORTED}, {op=IORING_OP_PROVIDE_BUFFERS, flags=IO_URING_OP_SUPPORTED}, ...]}, 256) = 0
[pid  7926] 18:44:14.041339 io_uring_setup(4096, {flags=0, sq_thread_cpu=0, sq_thread_idle=0, sq_entries=4096, cq_entries=8192, features=IORING_FEAT_SINGLE_MMAP|IORING_FEAT_NODROP|IORING_FEAT_SUBMIT_STABLE|IORING_FEAT_RW_CUR_POS|IORING_FEAT_CUR_PERSONALITY|IORING_FEAT_FAST_POLL|IORING_FEAT_POLL_32BITS|IORING_FEAT_SQPOLL_NONFIXED|IORING_FEAT_EXT_ARG|IORING_FEAT_NATIVE_WORKERS|IORING_FEAT_RSRC_TAGS, sq_off={head=0, tail=64, ring_mask=256, ring_entries=264, flags=276, dropped=272, array=131392}, cq_off={head=128, tail=192, ring_mask=260, ring_entries=268, overflow=284, cqes=320, flags=280}}) = 185
[pid  7926] 18:44:14.042557 io_uring_setup(4096, {flags=0, sq_thread_cpu=0, sq_thread_idle=0 <unfinished ...>
[pid  7926] 18:44:14.043615 <... io_uring_setup resumed>, sq_entries=4096, cq_entries=8192, features=IORING_FEAT_SINGLE_MMAP|IORING_FEAT_NODROP|IORING_FEAT_SUBMIT_STABLE|IORING_FEAT_RW_CUR_POS|IORING_FEAT_CUR_PERSONALITY|IORING_FEAT_FAST_POLL|IORING_FEAT_POLL_32BITS|IORING_FEAT_SQPOLL_NONFIXED|IORING_FEAT_EXT_ARG|IORING_FEAT_NATIVE_WORKERS|IORING_FEAT_RSRC_TAGS, sq_off={head=0, tail=64, ring_mask=256, ring_entries=264, flags=276, dropped=272, array=131392}, cq_off={head=128, tail=192, ring_mask=260, ring_entries=268, overflow=284, cqes=320, flags=280}}) = 188
[pid  7926] 18:44:14.044349 io_uring_setup(4096, {flags=0, sq_thread_cpu=0, sq_thread_idle=0, sq_entries=4096, cq_entries=8192, features=IORING_FEAT_SINGLE_MMAP|IORING_FEAT_NODROP|IORING_FEAT_SUBMIT_STABLE|IORING_FEAT_RW_CUR_POS|IORING_FEAT_CUR_PERSONALITY|IORING_FEAT_FAST_POLL|IORING_FEAT_POLL_32BITS|IORING_FEAT_SQPOLL_NONFIXED|IORING_FEAT_EXT_ARG|IORING_FEAT_NATIVE_WORKERS|IORING_FEAT_RSRC_TAGS, sq_off={head=0, tail=64, ring_mask=256, ring_entries=264, flags=276, dropped=272, array=131392}, cq_off={head=128, tail=192, ring_mask=260, ring_entries=268, overflow=284, cqes=320, flags=280}}) = 189
[pid  7926] 18:44:14.045776 io_uring_setup(4096, {flags=0, sq_thread_cpu=0, sq_thread_idle=0, sq_entries=4096, cq_entries=8192, features=IORING_FEAT_SINGLE_MMAP|IORING_FEAT_NODROP|IORING_FEAT_SUBMIT_STABLE|IORING_FEAT_RW_CUR_POS|IORING_FEAT_CUR_PERSONALITY|IORING_FEAT_FAST_POLL|IORING_FEAT_POLL_32BITS|IORING_FEAT_SQPOLL_NONFIXED|IORING_FEAT_EXT_ARG|IORING_FEAT_NATIVE_WORKERS|IORING_FEAT_RSRC_TAGS, sq_off={head=0, tail=64, ring_mask=256, ring_entries=264, flags=276, dropped=272, array=131392}, cq_off={head=128, tail=192, ring_mask=260, ring_entries=268, overflow=284, cqes=320, flags=280}}) = 191
[pid  7957] 18:44:18.154084 io_uring_enter(185, 1, 1, IORING_ENTER_GETEVENTS, NULL, 139637976727560 <unfinished ...>
[pid  7958] 18:44:18.164140 io_uring_enter(188, 1, 1, IORING_ENTER_GETEVENTS, NULL, 139637976727560 <unfinished ...>
[pid  7959] 18:44:18.443682 io_uring_enter(189, 1, 1, IORING_ENTER_GETEVENTS, NULL, 139637976727560 <unfinished ...>
[pid  7961] 18:44:18.445582 io_uring_enter(191, 1, 1, IORING_ENTER_GETEVENTS, NULL, 139637976727560 <unfinished ...>
[pid  7957] 18:44:19.048567 <... io_uring_enter resumed>) = 1
[pid  7957] 18:44:19.048858 io_uring_enter(185, 1, 1, IORING_ENTER_GETEVENTS, NULL, 139637976727560 <unfinished ...>
[pid  7958] 18:44:19.156755 <... io_uring_enter resumed>) = 1
[pid  7958] 18:44:19.157795 io_uring_enter(188, 1, 1, IORING_ENTER_GETEVENTS, NULL, 139637976727560 <unfinished ...>
[pid  7959] 18:44:19.160497 <... io_uring_enter resumed>) = 1
[pid  7959] 18:44:19.160678 io_uring_enter(189, 1, 1, IORING_ENTER_GETEVENTS, NULL, 139637976727560 <unfinished ...>
[pid  7961] 18:44:19.391462 <... io_uring_enter resumed>) = 1
[pid  7957] 18:44:19.464994 <... io_uring_enter resumed>) = 1
[pid  7957] 18:44:19.465651 io_uring_enter(185, 1, 1, IORING_ENTER_GETEVENTS, NULL, 139637976727560 <unfinished ...>

ちなみに、興味本位ですが、AlmaLinux 8のような、カーネルが5.1よりも低いLinuxで試すとこうなりました。

$ uname -srvmpio
Linux 4.18.0-425.3.1.el8.x86_64 #1 SMP Tue Nov 8 14:08:25 EST 2022 x86_64 x86_64 x86_64 GNU/Linux


$ bin/server.sh -Dinfinispan.server.channel.epoll=false

NioServerSocketChannelが選ばれています。

2023-02-05 18:53:47,839 INFO  (main) [org.infinispan.SERVER] Using transport: NIO

これは、io_uringが使える環境ではないとNetty io_uringが判定するからですね。

https://github.com/infinispan/infinispan/blob/14.0.6.Final/server/core/src/main/java/org/infinispan/server/core/transport/NativeTransport.java#L49

https://github.com/netty/netty-incubator-transport-io_uring/blob/netty-incubator-transport-parent-io_uring-0.0.14.Final/transport-classes-io_uring/src/main/java/io/netty/incubator/channel/uring/IOUring.java#L25-L58

Hot Rod Clientは?

ところで、Infinispanのリポジトリをio_uringで調べていると、どうもInfinispan Server以外にもNetty io_uringを使っていると思われる箇所が
見つかります。

Hot Rod Clientですね。

https://github.com/infinispan/infinispan/blob/14.0.6.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NativeTransport.java

というわけで、こちらも試してみましょう。

使用するMavenのバージョン。

$ mvn --version
Apache Maven 3.8.7 (b89d5959fcde851dcb1c8946a785a163f14e1e29)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.5, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-58-generic", arch: "amd64", family: "unix"

Infinispan Serverは、172.18.0.2〜172.18.0.4で3ノード起動させ、クラスターを構成しておくことにします。

起動コマンドは、こちら。

$ bin/server.sh \
    -b 0.0.0.0 \
    -Djgroups.tcp.address=$(hostname -i) \
    -Dinfinispan.server.channel.epoll=false

各Infinispan Serverには、管理用ユーザー、アプリケーション用ユーザーをそれぞれ作成しておきます。

$ bin/cli.sh user create -g admin -p password ispn-admin
$ bin/cli.sh user create -g application -p password ispn-user

Maven依存関係等は、まずはこんな感じで用意。

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-client-hotrod</artifactId>
            <version>14.0.6.Final</version>
        </dependency>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>14.0.6.Final</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.9.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.24.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M7</version>
            </plugin>
        </plugins>
    </build>

簡単なテストコードを書いて、確認してみます。

src/test/java/org/littlewings/infinspan/remote/iouring/HotRodClientIoUringTest.java

package org.littlewings.infinspan.remote.iouring;

import java.util.function.Consumer;
import java.util.stream.IntStream;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.RemoteCacheManagerAdmin;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class HotRodClientIoUringTest {
    static String createUri(String userName, String password) {
        return String.format(
                "hotrod://%s:%s@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222",
                userName,
                password
        );
    }

    @BeforeAll
    static void setUpAll() {
        String uri = createUri("ispn-admin", "password");

        try (RemoteCacheManager manager = new RemoteCacheManager(uri)) {
            RemoteCacheManagerAdmin admin = manager.administration();

            admin.removeCache("distCache");

            org.infinispan.configuration.cache.Configuration configuration =
                    new org.infinispan.configuration.cache.ConfigurationBuilder()
                            .clustering()
                            .cacheMode(org.infinispan.configuration.cache.CacheMode.DIST_SYNC)
                            .encoding().key().mediaType("application/x-protostream")
                            .encoding().value().mediaType("application/x-protostream")
                            .build();

            admin.getOrCreateCache("distCache", configuration);
        }
    }

    <K, V> void withCache(String cacheName, Consumer<RemoteCache<K, V>> func) {
        String uri = createUri("ispn-user", "password");

        try (RemoteCacheManager manager = new RemoteCacheManager(uri)) {
            RemoteCache<K, V> cache = manager.getCache(cacheName);

            func.accept(cache);
        }
    }

    @Test
    void transportIoUring() {
        this.<String, String>withCache("distCache", cache -> {
            IntStream
                    .rangeClosed(1, 100)
                    .forEach(i -> cache.put("key" + i, "value" + i));

            IntStream
                    .rangeClosed(1, 100)
                    .forEach(i -> assertThat(cache.get("key" + i)).isEqualTo("value" + i));
        });
    }
}

確認。

$ mvn test

このテスト自体は動くのですが、io_uringは使えないといった感じのログが出ます。

2月 05, 2023 7:37:16 午後 org.infinispan.client.hotrod.impl.transport.netty.NativeTransport useNativeIOUring
INFO: ISPN004108: Native IOUring transport not available, using NIO instead: io.netty.incubator.channel.uring.IOUring

どうやらクラスパス上にio.netty.incubator.channel.uring.IOUringクラスがなく、かつHot Rod Clientのpom.xmlを見ると
Netty io_uringはオプションになっているようです。

https://github.com/infinispan/infinispan/blob/14.0.6.Final/client/hotrod-client/pom.xml#L103-L121

というわけで、依存関係にNetty io_uringを追加します。

        <dependency>
            <groupId>io.netty.incubator</groupId>
            <artifactId>netty-incubator-transport-native-io_uring</artifactId>
            <version>0.0.14.Final</version>
            <classifier>linux-x86_64</classifier>
        </dependency>

バージョンは、Infinispanが使用しているものに合わせておきます。

https://github.com/infinispan/infinispan/blob/14.0.6.Final/build/configuration/pom.xml#L184

再度実行。

$ mvn test

transportに関するログがなくなったので、io_uringが使われているかどうかもわからなくなりました…。
Infinispan Serverと違って、どのトランスポートを選択したのかを示すログは出力されないんですよね。

straceで確認すると、io_uringを使っていなさそうだったので

$ strace -f mvn test 2>&1 | grep io_uring

ソースコードの感じからして、やっぱりepollを無効にする必要がありそうです。

https://github.com/infinispan/infinispan/blob/14.0.6.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NativeTransport.java#L64-L72

システムプロパティで指定して実行。

$ mvn test -Dinfinispan.server.channel.epoll=false

やっぱり見かけ上はわからないので、straceで確認すると

$ strace -f mvn test -Dinfinispan.server.channel.epoll=false 2>&1 | grep io_uring

io_uringに関するシステムコールが使われていることが確認できます。

[pid 25604] <... io_uring_enter resumed>) = 2
[pid 25604] io_uring_enter(93, 2, 1, IORING_ENTER_GETEVENTS, NULL, 140217797312520 <unfinished ...>
[pid 25604] <... io_uring_enter resumed>) = 2
[pid 25604] io_uring_enter(93, 2, 1, IORING_ENTER_GETEVENTS, NULL, 140217797312520 <unfinished ...>
[pid 25604] <... io_uring_enter resumed>) = 2
[pid 25604] io_uring_enter(93, 0, 1, IORING_ENTER_GETEVENTS, NULL, 30064771080 <unfinished ...>
[pid 25604] <... io_uring_enter resumed>) = 0

最終的なMaven依存関係などを書くと、こうなりました。

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-client-hotrod</artifactId>
            <version>14.0.6.Final</version>
        </dependency>
        <dependency>
            <groupId>io.netty.incubator</groupId>
            <artifactId>netty-incubator-transport-native-io_uring</artifactId>
            <version>0.0.14.Final</version>
            <classifier>linux-x86_64</classifier>
        </dependency>

        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>14.0.6.Final</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.9.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.24.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M7</version>
            </plugin>
        </plugins>
    </build>

それから、システムプロパティの指定は今回の例だとソースコードにそのまま書いても良いかなと。

public class HotRodClientIoUringTest {
    static {
        System.setProperty("infinispan.server.channel.epoll", "false");
    }

とりあえず、io_uringを使うようにはできたので、良しとしましょう。

まとめ

Infinispanで、io_uringを使うようにしてみました。

Infinispan Serverだけかなと思っていたのですが、Hot Rod Client側も対応していたんですね。ただ、新しいHot Rod Clientの方は対応して
いないようでしたが。

今回作成したソースコードは、こちらに置いています。

https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-experimental-io_uring