CLOVER🍀

That was when it all began.

SmallRye Mutiny Vert.x Bindingsを使って、Redisにアクセスしてみる(+AssertSubscriberを試す)

これは、なにをしたくて書いたもの?

SmallRye Mutiny Vert.x Bindingsを単体でも使ってみようかなと思い、Redis Clientを使って遊ぼうとしていたのですが。

AssertSubscriberを一緒に使っていたら思わずハマったので、まとめてメモ。

SmallRye Mutiny Vert.x Bindingsとは

以前、このブログで少し触れたことがあります。

Vert.x API Generationをちょっと調べつつ、Vert.xのRedis ClientをSmallRye Mutinyに対応させてみる - CLOVER🍀

この時はSmallRye Reactive Utilsという名前(リポジトリ名)だったのですが、気づくとSmallRye Mutiny Vert.x Bindings
という名前になっていました。

GitHub - smallrye/smallrye-mutiny-vertx-bindings: Smallrye Mutiny bindings for Eclipse Vert.x

現在のバージョンは2.13で、リポジトリ内には各種クライアントに対応するプロジェクトが含まれています。

https://github.com/smallrye/smallrye-mutiny-vertx-bindings/tree/2.13.0/vertx-mutiny-clients

といっても、このプロジェクトはVert.x API Generationを使ってソースコードを自動生成するので、リポジトリ内に
実体はほとんどありませんが。

ドキュメントもあるようです。

Smallrye Mutiny Vert.x Bindings project

Smallrye Mutiny Vert.x bindings

Overview (SmallRye Mutiny - Client APIs 2.13.0 API)

SmallRyeのWebサイトやSmallRye Mutinyのサイトから直接辿れない気はするのですが…。

SmallRye

Mutiny!

よく見ると、SmallRye MutinyのGetting Startedページに少し登場します。

Getting Started with Mutiny / Using Mutiny with Eclipse Vert.x

それで、今回はRedis Clientを使ってみようかなと思います。

Redis Client | Eclipse Vert.x

対応するSmallRye Mutiny Vert.x Bindingsのライブラリは、こちらです。

https://github.com/smallrye/smallrye-mutiny-vertx-bindings/tree/2.13.0/vertx-mutiny-clients/vertx-mutiny-redis-client

そういえば、このディレクトリってvertx-mutiny-clientsという名前なのに、Vert.x CoreのSmallRye Mutiny Vert.x Bindingsまで
ありますね。いいんですけど。

https://github.com/smallrye/smallrye-mutiny-vertx-bindings/tree/2.13.0/vertx-mutiny-clients/vertx-mutiny-core

環境

今回の環境は、こちらです。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.2 (ea98e05a04480131370aa0c110b8c54cf726c06f)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-84-generic", arch: "amd64", family: "unix"

Redisは6.2.5を使い、172.17.0.2のIPアドレスで動作しているものとします。また、パスワードはredispassで設定している
ものとします。

準備

Maven依存関係はこちら。

    <dependencies>
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-mutiny-vertx-redis-client</artifactId>
            <version>2.13.0</version>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.7.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.7.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.20.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

今回の主は、こちらのsmallrye-mutiny-vertx-redis-clientですね。

        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-mutiny-vertx-redis-client</artifactId>
            <version>2.13.0</version>
        </dependency>

よく見ると、SmallRye Mutinyって1.0.0になっていたんですね。

        <mutiny.version>1.0.0</mutiny.version>

https://github.com/smallrye/smallrye-mutiny-vertx-bindings/blob/2.13.0/pom.xml#L50

Release 1.0.0 · smallrye/smallrye-mutiny · GitHub

テストコードの雛形

確認は、テストコードで行います。まずは雛形を。

src/test/java/org/littlewings/smallrye/mutiny/RedisClientTest.java

package org.littlewings.smallrye.mutiny;

import java.util.List;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.RedisAPI;
import io.vertx.mutiny.redis.client.Response;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class RedisClientTest {

    // ここに、テストを書く!!
}

import文がちょっとポイントですが、Vert.xのクラスを含め、SmallRye Mutinyのものを使います。

import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.RedisAPI;
import io.vertx.mutiny.redis.client.Response;

簡単にset/get/delしてみる

SmallRye Mutiny Vert.x Bindingsを使って、Redisにアクセスしてみます。簡単にset/get/delしてみましょう。

    @Test
    public void gettingStarted() {
        Vertx vertx = Vertx.vertx();

        Redis redis = Redis.createClient(vertx, "redis://:redispass@172.17.0.2:6379/0");
        RedisAPI redisApi = RedisAPI.api(redis);

        Uni<Response> setResponse = redisApi.set(List.of("key1", "value1"));
        Uni<Response> getResponse =
                setResponse
                        .onItem()
                        .transformToUni(r -> redisApi.get("key1"));

        UniAssertSubscriber<Response> getAssertSubscriber =
                getResponse
                        .subscribe()
                        .withSubscriber(UniAssertSubscriber.create());

        Response actualGetResponse =
                getAssertSubscriber
                        .awaitItem()
                        .assertCompleted()
                        .getItem();

        assertThat(actualGetResponse).asString().isEqualTo("value1");

        Uni<Response> delResponse = redisApi.del(List.of("key1"));
        Uni<Response> emptyResponse =
                delResponse
                        .onItem()
                        .transformToUni(r -> redisApi.get("key1"));

        UniAssertSubscriber<Response> emptyAssertSubscriber =
                emptyResponse
                        .subscribe()
                        .withSubscriber(UniAssertSubscriber.create());

        Response actualEmptyResponse =
                emptyAssertSubscriber
                        .awaitItem()
                        .assertCompleted()
                        .getItem();

        assertThat(actualEmptyResponse).isNull();

        redisApi.close();

        vertx.closeAndAwait();
    }

set、get。

        Uni<Response> setResponse = redisApi.set(List.of("key1", "value1"));
        Uni<Response> getResponse =
                setResponse
                        .onItem()
                        .transformToUni(r -> redisApi.get("key1"));

getの結果はUniで返ってくるので、確認にはUniAssertSubscriberを使いましょう。

        UniAssertSubscriber<Response> getAssertSubscriber =
                getResponse
                        .subscribe()
                        .withSubscriber(UniAssertSubscriber.create());

        Response actualGetResponse =
                getAssertSubscriber
                        .awaitItem()
                        .assertCompleted()
                        .getItem();

        assertThat(actualGetResponse).asString().isEqualTo("value1");

ちょっとハマったのがawaitItemで、これをつけないと動かなかったのですが…なにがマズいんでしょうね?

オマケ:Multiで試す

UniAssertSubscriberでちょっとハマったので、Multiも扱ってみることにしました。

Redis Clientで直接Multiを返すものはなかったので、mgetの結果をMultiに変換して扱うことにします。
まあ、これはこれでハマる理由になったのですが。

作成したテストコードは、こちら。

    @Test
    public void assertMulti() {
        Vertx vertx = Vertx.vertx();

        Redis redis = Redis.createClient(vertx, "redis://:redispass@172.17.0.2:6379/0");
        RedisAPI redisApi = RedisAPI.api(redis);

        Uni<Response> msetResponse = redisApi.mset(List.of("key1", "value1", "key2", "value2", "key3", "value3"));
        Uni<Response> mgetResponse =
                msetResponse.onItem().transformToUni(r -> redisApi.mget(List.of("key1", "key2", "key3")));

        Multi<String> multiMgetResponse =
                mgetResponse
                        .onItem()
                        .transformToMulti(item -> item.toMulti()).onItem().transform(item -> item.toString());

        AssertSubscriber<String> mgetAssertSubscriber =
                multiMgetResponse
                        .subscribe()
                        .withSubscriber(AssertSubscriber.create(10));

        List<String> actualMgetResponse =
                mgetAssertSubscriber
                        .awaitItems(3)
                        .assertCompleted()
                        .getItems();

        assertThat(actualMgetResponse).hasSize(3).containsExactly("value1", "value2", "value3");

        Uni<Response> delResponse = redisApi.del(List.of("key1", "key2", "key3"));
        Uni<Response> emptyResponse =
                delResponse.onItem().transformToUni(r -> redisApi.mget(List.of("key1", "key2", "key3")));

        UniAssertSubscriber<Response> emptyAssertSubscriber =
                emptyResponse
                        .subscribe()
                        .withSubscriber(UniAssertSubscriber.create());

        Response actualEmptyResponse =
                emptyAssertSubscriber
                        .awaitItem()
                        .assertCompleted()
                        .getItem();

        assertThat(actualEmptyResponse)
                .hasSize(3)
                .containsOnlyNulls();

        redisApi.close();

        vertx.closeAndAwait();
    }

mset/mgetして、mgetの結果は個々の要素をバラしてUniからMultiに変換。

        Uni<Response> msetResponse = redisApi.mset(List.of("key1", "value1", "key2", "value2", "key3", "value3"));
        Uni<Response> mgetResponse =
                msetResponse.onItem().transformToUni(r -> redisApi.mget(List.of("key1", "key2", "key3")));

        Multi<String> multiMgetResponse =
                mgetResponse
                        .onItem()
                        .transformToMulti(item -> item.toMulti()).onItem().transform(item -> item.toString());

Multiの場合は、AssertSubscriberを使います。

        AssertSubscriber<String> mgetAssertSubscriber =
                multiMgetResponse
                        .subscribe()
                        .withSubscriber(AssertSubscriber.create(10));

        List<String> actualMgetResponse =
                mgetAssertSubscriber
                        .awaitItems(3)
                        .assertCompleted()
                        .getItems();

        assertThat(actualMgetResponse).hasSize(3).containsExactly("value1", "value2", "value3");

ポイントはAssertSubscriber#createAssertSubscriber#awaitItemsです。

AssertSubscriber#createに与える値は、少なくとも期待する要素数以上を指定する必要があります。今回はMulti
3つの要素が含まれるので、3以上であればOKです。

                        .withSubscriber(AssertSubscriber.create(10));

AssertSubscriber#awaitItemsについては、期待する数をきっちり指定する必要があります。今回は、3以外だとうまくいかない
ことになります。

                mgetAssertSubscriber
                        .awaitItems(3)

ところで、AssertSubscriber#awaitItemsが必要な理由もやっぱりよくわかりませんでした…。

delしてmget。このコードの場合、データは削除した後なのでmgetの結果はすべてnullになるのですが、この状態の
UniMultiに変換しようとするとうまくいかない(nullを「失敗」と見なす模様)のでそのままUniにすることに
しました…。

        Uni<Response> delResponse = redisApi.del(List.of("key1", "key2", "key3"));
        Uni<Response> emptyResponse =
                delResponse.onItem().transformToUni(r -> redisApi.mget(List.of("key1", "key2", "key3")));

        UniAssertSubscriber<Response> emptyAssertSubscriber =
                emptyResponse
                        .subscribe()
                        .withSubscriber(UniAssertSubscriber.create());

        Response actualEmptyResponse =
                emptyAssertSubscriber
                        .awaitItem()
                        .assertCompleted()
                        .getItem();

        assertThat(actualEmptyResponse)
                .hasSize(3)
                .containsOnlyNulls();

これはこれで、ずいぶんハマりましたが…。

まとめ

SmallRye Mutiny Vert.x Bindingsの最近の情報を調べつつ、Redis Clientで遊び、AssertSubscriberでハマってみました。

いつの間にか、SmallRye Mutinyも1.0.0になっていたんですね。

SmallRye Mutiny Vert.x Bindingsの方も含めて、実装、ドキュメントともにいろいろ増えてきて良いなと思います。
もうちょっと扱って慣れていかないと…。

Apache Spark(スタンドアロンモード)のマスターノード、ワーカーノードをフォアグラウンドで起動する

これは、なにをしたくて書いたもの?

スタンドアロンモードのApache Sparkのマスターノード、ワーカーノードをふつうに起動するとバックグラウンドに
行ってしまうのですが。

これをフォアグラウンドで実行する方法はないのかな?と思いまして。

結果からいくと、SPARK_NO_DAEMONIZEという環境変数で制御できるようです。

環境

今回の環境は、こちらです。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK-11.0.11+9 (build 11.0.11+9, mixed mode)


$ python3 -V
Python 3.8.10

Apache Sparkは、3.1.2を使用します。

$ bin/pyspark --version
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
                        
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.11
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z
Revision de351e30a90dd988b133b3d00fa6218bfcaba8b8
Url https://github.com/apache/spark
Type --help for more information.

SPARK_NO_DAEMONIZE環境変数

SPARK_NO_DAEMONIZEという環境変数を使うことで、今回のお題をクリアすることができます。

ドキュメントには特に記載がないようですが、sbin/start-master.shsbin/start-worker.shが内部で使用している
sbin/spark-daemon.shに記述があります。

https://github.com/apache/spark/blob/v3.1.2/sbin/spark-daemon.sh#L31

# Runs a Spark command as a daemon.
#
# Environment Variables
#
#   SPARK_CONF_DIR  Alternate conf dir. Default is ${SPARK_HOME}/conf.
#   SPARK_LOG_DIR   Where log files are stored. ${SPARK_HOME}/logs by default.
#   SPARK_LOG_MAX_FILES Max log files of Spark daemons can rotate to. Default is 5.
#   SPARK_MASTER    host:path where spark code should be rsync'd from
#   SPARK_PID_DIR   The pid files are stored. /tmp by default.
#   SPARK_IDENT_STRING   A string representing this instance of spark. $USER by default
#   SPARK_NICENESS The scheduling priority for daemons. Defaults to 0.
#   SPARK_NO_DAEMONIZE   If set, will run the proposed command in the foreground. It will not output a PID file.
##

こちらですね。値を設定しておくと、フォアグラウンドで起動してくれるようです。

SPARK_NO_DAEMONIZE If set, will run the proposed command in the foreground. It will not output a PID file.

試してみましょう。

まずはSPARK_NO_DAEMONIZE環境変数を設定して、マスターノードを起動。

$ SPARK_NO_DAEMONIZE=1 sbin/start-master.sh

すると、フォアグラウンドでマスターノードが起動しました。

starting org.apache.spark.deploy.master.Master, logging to /opt/spark/logs/spark--org.apache.spark.deploy.master.Master-1-master.out
Spark Command: /opt/java/openjdk/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* -Xmx1g org.apache.spark.deploy.master.Master --host master --port 7077 --webui-port 8080
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/09/11 15:38:46 INFO Master: Started daemon with process name: 37@master
21/09/11 15:38:46 INFO SignalUtils: Registering signal handler for TERM
21/09/11 15:38:46 INFO SignalUtils: Registering signal handler for HUP
21/09/11 15:38:46 INFO SignalUtils: Registering signal handler for INT

〜省略〜

ワーカーノードも起動してみます。こちらも、SPARK_NO_DAEMONIZE環境変数を設定。

$ SPARK_NO_DAEMONIZE=1 sbin/start-worker.sh spark://[マスターノードのIPアドレス]:7077

こちらも、フォアグラウンドで起動しました。

starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark--org.apache.spark.deploy.worker.Worker-1-worker1.out
Spark Command: /opt/java/openjdk/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://172.17.0.2:7077
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/09/11 15:40:23 INFO Worker: Started daemon with process name: 19@worker1
21/09/11 15:40:23 INFO SignalUtils: Registering signal handler for TERM
21/09/11 15:40:23 INFO SignalUtils: Registering signal handler for HUP
21/09/11 15:40:23 INFO SignalUtils: Registering signal handler for INT

〜省略〜

知っておくと、Dockerコンテナ化する時などに便利かなと思います。