CLOVER🍀

That was when it all began.

HazelcastのDiscovery SPIを試して解説する

Hazelcast 3.6から、Discovery SPIというものが登場しました。

Discovery SPI

前に、自分がHazelcastの内部構造の紹介エントリ(ネットワーク編)を書いた時に、名前だけちょこっと紹介したものです。
*このエントリで書いた内容は、現在ではクラス構成といった情報が古くなりつつありますが、考え方はまだ使えるようです

Hazelcast Internal(ネットワーク編) - CLOVER

今回は、このDiscovery SPIを試しつつ、ちょっと解説してみたいと思います。

Discovery SPI?

Discovery SPIというのは、HazelcastのNodeを探索するための処理を実装するためのオプションです。

特にクラウド環境においてはマルチキャスト通信が行えないので、環境に応じたNode Discoveryの戦略を立てる必要があります。HazelcastはNode Discoveryの
方法としてはマルチキャストによる動的なNode検出(デフォルト)と、TCPによるユニキャストでの固定のNode検出の2つを持っています。

ここで、Node Discoveryでクラスタ参加対象となるNodeを検出する機能を実装できるようにしたのが、Discovery SPIです。

すでに、このAPIを使用したNode Discoveryの実装がいくつか存在します。

たとえば…

  • AWS Cloud Discovery
  • Apache jclouds Cloud Discovery
  • Azure Cloud Discovery
  • Zookeeper Cloud Discovery
  • Consul Cloud Discovery
  • etcd Cloud Discovery
  • Eureka Cloud Discovery
  • Heroku Cloud Discovery
  • Kubernetes Cloud Discovery

などなど。

詳しい一覧は、こちらへ。

Hazelcast IMDG Plugins

ドキュメントのサンプルでは、/etc/hostsファイルを使用してDiscovery SPIを使ったNode Discoveryの実装を書いています。

Extending The AbstractDiscoveryStrategy

Implementing Lookup

Mapping to DiscoveryNodes

GitHubでのコードは、こちら。
https://github.com/hazelcast/hazelcast-code-samples/tree/v3.9.3/spi/discovery

Discovery SPIの主要なAPI

Discovery SPIでは、いくつかのインターフェースやクラスを継承して、Node Discoveryの仕組みを作成します。

Discovery SPIを使って、実装する必要があるインターフェース(やクラス)はこちらです。

  • DiscoveryStrategy … Discovery SPIの主役で、リクエストに応じてNodeを探してその結果を一覧として返却します。通常、AbstractDiscoveryStrategyという抽象クラスを継承して作成します
  • DiscoveryStrategyFactory … DiscoveryStrategyのインスタンスを作成する、ファクトリクラスです。JavaのService Providerの仕組みを使用しており、この実装クラスの名前をMETA-INF/services/com.hazelcast.spi.discovery.DiscoveryStrategyFactoryファイルに記述する必要があります
  • NodeFilter … こちらは作成は任意になりますが、DiscoveryStrategyが検出したMemberをフィルタリングする役割を担います

あと、インターフェースとしては規程されていないものの、設定に関するクラスも作成したりします。

続いて、実装は作成しないものの、Discovery SPIの実装内で利用するもの。

  • DiscoveryNode … Discovery SPIにおける、Hazelcast Nodeを表します。DiscoveryStrategyからは、検出したNodeとして返却されることになります。また、DiscoveryStrategyFactoryへは、Embeddedに使っている時にはLocal Nodeが渡され、Clientとして動作している時にはnullが渡されます
  • SimpleDiscoveryNode … DiscoveryNodeのデフォルト実装です

最後に、Discovery SPIを使う側のクラス。

  • DiscoveryService … DiscoveryStrategyを駆動して、Node検出を行うインターフェースです。通常、DiscoveryServiceを作成する必要はなく、Hazelcastがデフォルト実装を提供します
  • DiscoveryServiceProvider …DiscoveryServiceを提供するインターフェースで、Configとして設定します
  • DiscoveryServiceSettings … DiscoveryServiceProviderの設定です
  • DiscoveryMode … DiscoveryServiceが、クラスタ内のMemberか、Clientとして動作しているかを表す列挙型

とまあ、登場するのはこれくらいです。コードを書く範囲では見えないものもありますが。

では、紹介はこれくらいにして使っていってみましょう。

お題

Hazelcastのドキュメントでは/etc/hostsを使ったDiscovery SPIを作成していましたが、もうちょっとひねりたいなーと思い、こういうお題にしました。

  • Redisを使ってNode Discoveryを行う
  • Hazelcastは、Embedded Modeで動かす
  • RedisのSetに対して、Nodeの起動時にNodeを登録、Nodeの終了時に削除
  • 作成するDiscoveryStrategyの設定として、Redisへの接続URLが設定可能
  • Redisへの接続URLは任意設定で、省略時はデフォルト値が使われる

環境

実行環境。

$ java -version
openjdk version "1.8.0_151"
OpenJDK Runtime Environment (build 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

$ mvn -version
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: /usr/local/maven3/current
Java version: 1.8.0_151, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.4.0-104-generic", arch: "amd64", family: "unix"

Redisについては、起動。準備済みとします。今回使用したRedisは、4.0.8です。

準備

では、Maven依存関係を。まずはHazelcast。

        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>3.9.3</version>
        </dependency>

Redisへのアクセスには、Lettuceを使用しました。

        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>

あとは、テストで使うライブラリを。

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.1.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.9.1</version>
            <scope>test</scope>
        </dependency>

Discovery SPIを使って、Redis Discoveryを作ってみる

では、作っていきます。

設定に関するクラス

最初は、作成するDiscoveryStrategyの設定に関するクラスを作成します。クラスというか、インターフェースですね。
Hazelcastのサンプルでは、クラスとして実装されていましたが。
src/main/java/org/littlewings/hazelcast/discoveryspi/RedisDiscoveryConfiguration.java

package org.littlewings.hazelcast.discoveryspi;

import com.hazelcast.config.properties.PropertyDefinition;
import com.hazelcast.config.properties.PropertyTypeConverter;
import com.hazelcast.config.properties.SimplePropertyDefinition;

public interface RedisDiscoveryConfiguration {
    PropertyDefinition REDIS_URL =
            new SimplePropertyDefinition("redis-url", true, PropertyTypeConverter.STRING);
}

今回は、Redisへの接続URLのみを設定項目として設けました。

PropertyDefinition(の実装であるSimplePropertyDefinition)を使って、設定項目を作成します。

    PropertyDefinition REDIS_URL =
            new SimplePropertyDefinition("redis-url", true, PropertyTypeConverter.STRING);

最低限項目名と、期待する型が必要です。引数を3つ取るコンストラクタの場合は、第2引数をtrueにするとオプション扱いにすることができます。

DiscoveryStrategyFactory

続いて、作成した設定項目と、まだ登場していませんが実装したDiscoveryStrategyを使ったDiscoveryStrategyFactoryです。DiscoveryStrategyFactoryインターフェースを
実装します。
src/main/java/org/littlewings/hazelcast/discoveryspi/RedisDiscoveryStrategyFactory.java

package org.littlewings.hazelcast.discoveryspi;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import com.hazelcast.config.properties.PropertyDefinition;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.discovery.DiscoveryNode;
import com.hazelcast.spi.discovery.DiscoveryStrategy;
import com.hazelcast.spi.discovery.DiscoveryStrategyFactory;

public class RedisDiscoveryStrategyFactory implements DiscoveryStrategyFactory {
    @Override
    public Class<? extends DiscoveryStrategy> getDiscoveryStrategyType() {
        return RedisDiscoveryStrategy.class;
    }

    @Override
    public DiscoveryStrategy newDiscoveryStrategy(DiscoveryNode discoveryNode, ILogger logger, Map<String, Comparable> properties) {
        return new RedisDiscoveryStrategy(discoveryNode, logger, properties);
    }

    @Override
    public Collection<PropertyDefinition> getConfigurationProperties() {
        return Collections.singletonList(RedisDiscoveryConfiguration.REDIS_URL);
    }
}

DiscoveryStrategyのClassクラスを返すようにgetDiscoveryStrategyTypeメソッドを作成し

    @Override
    public Class<? extends DiscoveryStrategy> getDiscoveryStrategyType() {
        return RedisDiscoveryStrategy.class;
    }

newDiscoveryStrategyメソッドでは、DiscoveryStrategyのインスタンスを作成して返します。この時、DiscoveryNodeとLogger、そして設定項目がMapとして
渡ってきます。

    @Override
    public DiscoveryStrategy newDiscoveryStrategy(DiscoveryNode discoveryNode, ILogger logger, Map<String, Comparable> properties) {
        return new RedisDiscoveryStrategy(discoveryNode, logger, properties);
    }

このDiscoveryNodeが、Embeddedの時は自Node、Clientの時はnullということになるらしいですね。

そして、設定項目をCollectionとして返します。このPropertyDefinitionのCollectionは、設定項目のバリデーションに使用されます。

    @Override
    public Collection<PropertyDefinition> getConfigurationProperties() {
        return Collections.singletonList(RedisDiscoveryConfiguration.REDIS_URL);
    }

PropertyDefinitionを必須項目として作成していると、Hazelcastの設定で該当の項目を指定していないとエラーになります。今回は、任意項目に
していましたね。

このクラスを、META-INF/services/com.hazelcast.spi.discovery.DiscoveryStrategyFactoryというファイルを作成して、その中にクラス名を記載します。
src/main/resources/META-INF/services/com.hazelcast.spi.discovery.DiscoveryStrategyFactory

org.littlewings.hazelcast.discoveryspi.RedisDiscoveryStrategyFactory

これで、作成したDiscoveryStrategyFactoryがServiceLoaderによりロードされるようになります。ただ、これだけでは有効化されませんが。

DiscoveryStrategy

最後に、今回の主役であるDiscoveryStrategyです。
src/main/java/org/littlewings/hazelcast/discoveryspi/RedisDiscoveryStrategy.java

package org.littlewings.hazelcast.discoveryspi;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.spi.discovery.AbstractDiscoveryStrategy;
import com.hazelcast.spi.discovery.DiscoveryNode;
import com.hazelcast.spi.discovery.SimpleDiscoveryNode;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import reactor.core.publisher.Flux;

public class RedisDiscoveryStrategy extends AbstractDiscoveryStrategy {
    DiscoveryNode discoveryNode;
    RedisClient redisClient;
    StatefulRedisConnection<String, String> connection;

    String redisUrl;

    public RedisDiscoveryStrategy(DiscoveryNode discoveryNode, ILogger logger, Map<String, Comparable> properties) {
        super(logger, properties);

        this.discoveryNode = discoveryNode;

        redisUrl = getOrDefault(
                "redis.discovery",
                RedisDiscoveryConfiguration.REDIS_URL,
                "redis://redispass@172.17.0.2:6379/0"
        );
    }

    @Override
    public void start() {
        redisClient = RedisClient.create(redisUrl);
        connection = redisClient.connect();

        if (discoveryNode != null) {
            RedisReactiveCommands<String, String> commands = connection.reactive();
            commands.sadd("hazelcast-nodes", getLocalNodeAddress()).block();

            getLogger().info("register local-node[" + getLocalNodeAddress() + "]");
        }
    }

    String getLocalNodeAddress() {
        return discoveryNode.getPrivateAddress().getHost() + ":" + discoveryNode.getPrivateAddress().getPort();
    }

    @Override
    public Iterable<DiscoveryNode> discoverNodes() {
        RedisReactiveCommands<String, String> commands = connection.reactive();

        Flux<String> nodes = commands.smembers("hazelcast-nodes");

        getLogger().info("discovery nodes");

        return nodes
                .map(n -> {
                            getLogger().info("node: " + n);

                            String host = n.split(":")[0];
                            int port = Integer.parseInt(n.split(":")[1]);

                            try {
                                Map<String, Object> attributes = new HashMap<>();
                                attributes.put("url", redisUrl);
                                attributes.put("host", host);
                                attributes.put("port", port);

                                Address address = new Address(InetAddress.getByName(host), port);
                                return (DiscoveryNode) new SimpleDiscoveryNode(address, attributes);
                            } catch (UnknownHostException e) {
                                throw new RuntimeException(e);
                            }
                        }
                )
                .collectList()
                .block();
    }

    String getLocalNodeAddress() {
        return discoveryNode.getPrivateAddress().getHost() + ":" + discoveryNode.getPrivateAddress().getPort();
    }

    @Override
    public void destroy() {
        getLogger().info("shutdown redis discovery strategy...");

        try {
            if (discoveryNode != null) {
                RedisReactiveCommands<String, String> commands = connection.reactive();
                commands.srem("hazelcast-nodes", getLocalNodeAddress()).block();
            }
        } finally {
            connection.close();
            redisClient.shutdown();
        }
    }
}


コンストラクタでは、設定項目の値を取得しています。AbstractDiscoveryStrategy#getOrDefaultで、設定項目が指定されていなかった場合には
デフォルト値を返すようにすることができます。

    public RedisDiscoveryStrategy(DiscoveryNode discoveryNode, ILogger logger, Map<String, Comparable> properties) {
        super(logger, properties);

        this.discoveryNode = discoveryNode;

        redisUrl = getOrDefault(
                "redis.discovery",
                RedisDiscoveryConfiguration.REDIS_URL,
                "redis://redispass@172.17.0.2:6379/0"
        );
    }

getOrDefault以外にも、設定値を取得できるメソッドはいくつかあるので、確認してみるとよいでしょう。

最初に書いているのはprefixで、システムプロパティで指定する時には、このprefixを使って指定します。

つまり、今回の例だとシステムプロパティを使って指定すると

-Dredis.discovery.redis-url=redis://redispass@172.17.0.2:6379/0

となり、設定ファイルに書くと

                    <properties>
                        <property name="redis-url">redis://redispass@172.17.0.2:6379/0</property>
                    </properties>

となります。設定に書く時には、prefixは指定しません。設定ファイルの全貌は、また後で書きます。

DiscoveryStrategyは起動時にstart、終了時にdestroyが呼ばれるので、今回は起動/終了時にそれぞれRedisのSetに自Nodeを登録、削除するようにしてみました。

    @Override
    public void start() {
        redisClient = RedisClient.create(redisUrl);
        connection = redisClient.connect();

        if (discoveryNode != null) {
            RedisReactiveCommands<String, String> commands = connection.reactive();
            commands.sadd("hazelcast-nodes", getLocalNodeAddress()).block();

            getLogger().info("register local-node[" + getLocalNodeAddress() + "]");
        }
    }

    @Override
    public void destroy() {
        getLogger().info("shutdown redis discovery strategy...");

        try {
            if (discoveryNode != null) {
                RedisReactiveCommands<String, String> commands = connection.reactive();
                commands.srem("hazelcast-nodes", getLocalNodeAddress()).block();
            }
        } finally {
            connection.close();
            redisClient.shutdown();
        }
    }

「hazelcast-nodes」というキーで、IPアドレスとポートをペアで登録するようにしています。

IPアドレスとポートはDiscoveryNodeから取得していますが、なんかDiscoveryNode#getPrivateAddressとか使っています。

    String getLocalNodeAddress() {
        return discoveryNode.getPrivateAddress().getHost() + ":" + discoveryNode.getPrivateAddress().getPort();
    }

もうひとつ、DiscoveryNode#getPublicAddressというものもあります。こちらについては、また後で。

そして、DiscoveryStrategy#discoverNodes。このメソッドでは、クラスタ参加対象となる可能性のあるNodeを、IterableなDiscoveryNodeとして返すように実装します。

    @Override
    public Iterable<DiscoveryNode> discoverNodes() {
        RedisReactiveCommands<String, String> commands = connection.reactive();

        Flux<String> nodes = commands.smembers("hazelcast-nodes");

        getLogger().info("discovery nodes");

        return nodes
                .map(n -> {
                            getLogger().info("node: " + n);

                            String host = n.split(":")[0];
                            int port = Integer.parseInt(n.split(":")[1]);

                            try {
                                Map<String, Object> attributes = new HashMap<>();
                                attributes.put("url", redisUrl);
                                attributes.put("host", host);
                                attributes.put("port", port);

                                Address address = new Address(InetAddress.getByName(host), port);
                                return (DiscoveryNode) new SimpleDiscoveryNode(address, attributes);
                            } catch (UnknownHostException e) {
                                throw new RuntimeException(e);
                            }
                        }
                )
                .collectList()
                .block();
    }

RedisのSetに、Nodeが「IPアドレス:ポート」の形式で入っているので、それをsplitしてIPアドレスとポートに分解、

        Flux<String> nodes = commands.smembers("hazelcast-nodes");

        return nodes
                .map(n -> {
                            getLogger().info("node: " + n);

                            String host = n.split(":")[0];
                            int port = Integer.parseInt(n.split(":")[1]);

                            try {
                                Map<String, Object> attributes = new HashMap<>();
                                attributes.put("url", redisUrl);
                                attributes.put("host", host);
                                attributes.put("port", port);

                                Address address = new Address(InetAddress.getByName(host), port);
                                return (DiscoveryNode) new SimpleDiscoveryNode(address, attributes);
                            } catch (UnknownHostException e) {
                                throw new RuntimeException(e);
                            }
                        }
                )
                .collectList()
                .block();

Addressを生成して、SimpleDiscoveryNodeを作成します。

                                Address address = new Address(InetAddress.getByName(host), port);
                                return (DiscoveryNode) new SimpleDiscoveryNode(address, attributes);

SimpleDiscoveryNodeの作成にはAddressとMap(プロパティ)を使用しますが、プロパティの指定自体は任意で省略することもできます。好きにプロパティを
追加して、NodeFilterあたりで使うのでないかなと思います(通常のHazelcastのMemberからの情報としては残らないみたいなので)。

最後に、Redisに入っていたNodeの情報を、DiscoveryNodeのListとして返却します。

ここまでで、DiscoveryStrategyと関連するクラスの作成が終わりました。

確認してみる

それでは、動作確認してみましょう。

設定ファイルを作成する

最初に、設定ファイルを作成してみます。

作った設定ファイルは、こちら。
src/test/resources/hazelcast.xml

<?xml version="1.0" encoding="utf-8"?>
<hazelcast xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.hazelcast.com/schema/config
                               http://www.hazelcast.com/schema/config/hazelcast-config-3.9.xsd"
           xmlns="http://www.hazelcast.com/schema/config">
    <!-- activate Discovery SPI -->
    <properties>
        <property name="hazelcast.discovery.enabled">true</property>
    </properties>

    <network>
        <join>
            <multicast enabled="false"/>
            <tcp-ip enabled="false"/>

            <discovery-strategies>
                <discovery-strategy enabled="true"
                                    class="org.littlewings.hazelcast.discoveryspi.RedisDiscoveryStrategy">
                    <properties>
                        <property name="redis-url">redis://redispass@172.17.0.2:6379/0</property>
                    </properties>
                </discovery-strategy>
            </discovery-strategies>
        </join>
    </network>
</hazelcast>

ポイントは、Discovery SPIを有効にしていること。

    <properties>
        <property name="hazelcast.discovery.enabled">true</property>
    </properties>

通常のNode Discoveryの方法は、無効にしていること。

            <multicast enabled="false"/>
            <tcp-ip enabled="false"/>

「discovery-strategies」配下に「discovery-strategy」要素を並べ、有効なstrategyとクラスを指定します。discovery-strategyの「enabled」要素をtrueに
すると、有効なstrategyとなります。デフォルトはfalseで、falseにすると無視されることになります。

            <discovery-strategies>
                <discovery-strategy enabled="true"
                                    class="org.littlewings.hazelcast.discoveryspi.RedisDiscoveryStrategy">
                    <properties>
                        <property name="redis-url">redis://redispass@172.17.0.2:6379/0</property>
                    </properties>
                </discovery-strategy>
            </discovery-strategies>

また、discovery-strategyのclass属性に指定するのは、DiscoveryStrategyであることに注意してください。DiscoveryStrategyFactoryではありません。

プロパティを設定する場合は、properties/property要素で指定します。

テストを書く

ここまでの実装と設定ファイルを、テストコードを書いて確認してみましょう。

テストコードの雛形は、こちら。
src/test/java/org/littlewings/hazelcast/discoveryspi/RedisDiscoveryTest.java

package org.littlewings.hazelcast.discoveryspi;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class RedisDiscoveryTest {
    // ここに、テストを書く!!
}

Nodeを3つ起動して、クラスタが構成できるか確認してみます。

    @Test
    public void redisDiscoveryCluster() {
        List<HazelcastInstance> hazelcasts =
                IntStream
                        .rangeClosed(1, 3)
                        .mapToObj(i -> Hazelcast.newHazelcastInstance())
                        .collect(Collectors.toList());

        HazelcastInstance hazelcast = hazelcasts.get(0);

        // クラスタが構成できている
        assertThat(hazelcast.getCluster().getMembers()).hasSize(3);

        hazelcasts.forEach(HazelcastInstance::shutdown);
        Hazelcast.shutdownAll();
    }

テストコード上はOKになりました。Redisの方はどうなっているでしょう?

redis-cliで確認すると、テスト中にNodeが増減することが確認できます。

172.17.0.2:6379> SMEMBERS hazelcast-nodes
(empty list or set)

172.17.0.2:6379> SMEMBERS hazelcast-nodes
1) "172.22.0.1:5701"

172.17.0.2:6379> SMEMBERS hazelcast-nodes
1) "172.22.0.1:5702"
2) "172.22.0.1:5701"

172.17.0.2:6379> SMEMBERS hazelcast-nodes
1) "172.22.0.1:5702"
2) "172.22.0.1:5703"
3) "172.22.0.1:5701"

172.17.0.2:6379> SMEMBERS hazelcast-nodes
1) "172.22.0.1:5702"
2) "172.22.0.1:5703"

172.17.0.2:6379> SMEMBERS hazelcast-nodes
1) "172.22.0.1:5703"

172.17.0.2:6379> SMEMBERS hazelcast-nodes
(empty list or set)

OKそうです。

次に、最初にNodeを3つ起動し、ひとつNodeを落として、それから3つNodeを起動してみます。

    @Test
    public void nodeUpDown() throws InterruptedException {
        List<HazelcastInstance> hazelcasts =
                new ArrayList<>(
                        IntStream
                                .rangeClosed(1, 3)
                                .mapToObj(i -> Hazelcast.newHazelcastInstance())
                                .collect(Collectors.toList())
                );

        HazelcastInstance hazelcast = hazelcasts.get(0);

        assertThat(hazelcast.getCluster().getMembers()).hasSize(3);

        // ひとつNode Down
        HazelcastInstance lastInstance = hazelcasts.get(2);
        lastInstance.shutdown();
        hazelcasts.remove(lastInstance);

        TimeUnit.SECONDS.sleep(3L);

        // Nodeが減っている
        assertThat(hazelcast.getCluster().getMembers()).hasSize(2);

        // Nodeを3つ追加
        hazelcasts.add(Hazelcast.newHazelcastInstance());
        hazelcasts.add(Hazelcast.newHazelcastInstance());
        hazelcasts.add(Hazelcast.newHazelcastInstance());

        // Nodeが追加されている
        assertThat(hazelcast.getCluster().getMembers()).hasSize(5);

        hazelcasts.forEach(HazelcastInstance::shutdown);
        Hazelcast.shutdownAll();
    }

うまくいきました。

ちなみに、存在しないテキトーなNodeをあらかじめRedisに登録して実行するとどうなるかというと

172.17.0.2:6379> SADD hazelcast-nodes "172.22.0.1:6000"
(integer) 1
172.17.0.2:6379> SMEMBERS hazelcast-nodes
1) "172.22.0.1:6000"

手動で追加したNodeの情報は入ったまま、動作します。

172.17.0.2:6379> SMEMBERS hazelcast-nodes
1) "172.22.0.1:5702"
2) "172.22.0.1:5703"
3) "172.22.0.1:6000"
4) "172.22.0.1:5701"

手動で追加したNode(172.22.0.1:6000)もDiscoveryStrategyに拾われるわけですが、接続できないNodeについては接続できないことが検出され、対象外となります。

情報: [172.22.0.1]:5701 [dev] [3.9.3] node: 172.22.0.1:6000
3 18, 2018 1:05:25 午前 com.hazelcast.nio.tcp.TcpIpConnector
情報: [172.22.0.1]:5701 [dev] [3.9.3] Connecting to /172.22.0.1:6000, timeout: 0, bind-any: true
3 18, 2018 1:05:25 午前 com.hazelcast.nio.tcp.TcpIpConnector
情報: [172.22.0.1]:5701 [dev] [3.9.3] Could not connect to: /172.22.0.1:6000. Reason: SocketException[接続を拒否されました to address /172.22.0.1:6000]
3 18, 2018 1:05:25 午前 com.hazelcast.internal.cluster.impl.DiscoveryJoiner
情報: [172.22.0.1]:5701 [dev] [3.9.3] [172.22.0.1]:6000 is added to the blacklist.

なので、接続できないNodeが入ったりしていても、うまく起動しているNodeだけでクラスタが構成できるというわけですね。

とりあえず、基本的な使い方を確認することができたのではないでしょうか。

飛ばした話

では、ここまでで飛ばした話について触れていってみましょう。

Public Address

DiscoveryStrategyを作った時に、起動時にNodeの情報をRedisに登録するためにDiscoveryNode#getPrivateAddressをしていました。そして、getPublicAddressもあるという
話も書きました。

    String getLocalNodeAddress() {
        return discoveryNode.getPrivateAddress().getHost() + ":" + discoveryNode.getPrivateAddress().getPort();
    }

両者の違いなのですが、まずPrivate Addressというのは必ず設定される(=nullにならない)ものです。これに対して、Public Addressというのは
オプション扱いのものになります。

Public Addressというのは、主にNATを想定した設定のようです。NAT上で見せる、IPアドレスとポートのペアをHazelcastの設定として行います。
Public Address

Discovery SPIにおけるPublic Addressは、システムプロパティで有効にします。
System Properties

hazelcast.discovery.public.ip.enabled

なにに使われるかというと、DiscoveryStrategyが検出したNodeのうち、自Nodeを除去する際に比較する方法として使います。
https://github.com/hazelcast/hazelcast/blob/v3.9.3/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/DiscoveryJoiner.java#L78-L81

このプロパティを有効にすると、Public Addressの値で自Nodeを判別することになります。DiscoveryStrategy#discoverNodesの結果として返ってくる
IterableなDiscoveryNodeのうち、自NodeのAddressとDiscoveryNode#getXxxAddressを比較して自Nodeを除去するのですが、自Nodeを判別するために
getPrivateAddressを使うのかgetPublicAddressを使うのかがここで分かれます。

といっても、DiscoveryStrategy側でどちらのアドレスを見るように設定されているかはわからないので、微妙な感じもするのですけどね…。

NodeFilter

そういえば、今回はNodeFilterは使いませんでした。NodeFilterはこういうインターフェースで

public interface NodeFilter {
    boolean test(DiscoveryNode candidate);
}

DiscoveryNodeを引数にして呼び出される、testメソッドを実装します。

このNodeFilterはDiscoveryStrategy#discoverNodesの結果に対して適用され、このtestメソッドがtrueを返すとそのNodeは残り、falseを返すと対象のNodeは
Node Discoveryの対象から除去されます。

DiscoveryStrategyによって検出されたNodeに対して、さらに絞り込みができるというわけです。

XMLの設定ファイルに書く場合は、discovery-strategy要素よりも前に配置します。

            <discovery-strategies>
                <node-filter class="com.example.my.NodeFilterImpl"/>

                <discovery-strategy enabled="true"
                                    class="org.littlewings.hazelcast.discoveryspi.RedisDiscoveryStrategy">
                    <properties>
                        <property name="redis-url">redis://redispass@172.17.0.2:6379/0</property>
                    </properties>
                </discovery-strategy>
            </discovery-strategies>

discovery-strategy要素の後に書いたりすると、XML設定ファイルのXMLスキーマバリデーションでエラーになります。

もっと中身を

最後に、もうちょっと中身を追ってみます。

Discovery SPIを有効にすると、Node Discoveryを行うJoinerとしてDiscoveryJoinerが使用されるようになります。
https://github.com/hazelcast/hazelcast/blob/v3.9.3/hazelcast/src/main/java/com/hazelcast/instance/Node.java#L713

DiscoveryJoinerは、TcpIpJoinerを継承したクラスとして作成されています。
https://github.com/hazelcast/hazelcast/blob/v3.9.3/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/DiscoveryJoiner.java#L37-L38

これらのインターフェースであるJoinerについてはすでにdeprecatedになっているのですが、その実装としてDiscovery SPI用のJoinerが存在するのは
面白いです。

DiscoveryStrategyを扱うインターフェースであるDiscoveryServiceの実装は、デフォルトでは以下のProviderと実装が使用されます。実際のDiscoveryStrategyが
どのように使われるかは、このDefaultDiscoveryServiceの実装を見るとよいでしょう。
https://github.com/hazelcast/hazelcast/blob/v3.9.3/hazelcast/src/main/java/com/hazelcast/spi/discovery/impl/DefaultDiscoveryServiceProvider.java
https://github.com/hazelcast/hazelcast/blob/v3.9.3/hazelcast/src/main/java/com/hazelcast/spi/discovery/impl/DefaultDiscoveryService.java

DiscoveryStrategyFactoryのロードから、DiscoveryStrategyのstart/stopといったライフサイクル管理や、DiscoveryStrategyを使ったNode Discovery、
NodeFilterの利用などはほぼすべてここに書かれています。

DiscoveryJoinerは、DiscoveryService越しにDiscoveryStrategyを操作することになります。DiscoveryStrategyそのものの存在は、DiscoveryJoinerは
意識しません。
https://github.com/hazelcast/hazelcast/blob/v3.9.3/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/DiscoveryJoiner.java#L70

このDiscoveryJoinerによって、DiscoveryService#discoverNodes(さらに内部はDiscoveryStrategy)が呼び出され、その結果がTcpJoinerの
アドレス探索ロジックの代わりとして使われるわけです。
https://github.com/hazelcast/hazelcast/blob/v3.9.3/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/DiscoveryJoiner.java#L55-L56
https://github.com/hazelcast/hazelcast/blob/v3.9.3/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/DiscoveryJoiner.java#L68-L69

TcpJoinerの場合は、Hazelcastの設定(TCPネットワークの設定)から対象となるMemberをリストアップして探索するわけですが、この部分がごっそりと
DiscoveryServiceに置き換わることになります、と。
https://github.com/hazelcast/hazelcast/blob/v3.9.3/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java#L402-L455

あとはTCPによるNode Discoveryと同じ流れに乗ることになります。
https://github.com/hazelcast/hazelcast/blob/v3.9.3/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java#L131
https://github.com/hazelcast/hazelcast/blob/v3.9.3/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java#L255-L274

ClusterJoinManagerを使ってMaster検出および、クラスタ参加ですね。

つまり、TCPによるNode Discovery時の利用可能なNodeのリストアップを、設定ではなくDiscoveryServiceとDiscoveryStategyによって行えるようにしたのが
現在のDiscover SPIだということです。

また、設定項目を取得する際に書いていたprefixは、システムプロパティや環境変数から取得する時に使われるもので、設定ファイルに書く時に使うものではありません。
https://github.com/hazelcast/hazelcast/blob/v3.9.3/hazelcast/src/main/java/com/hazelcast/spi/discovery/AbstractDiscoveryStrategy.java#L191-L195

設定から取得する時には、プロパティ名そのものから取得します。
https://github.com/hazelcast/hazelcast/blob/v3.9.3/hazelcast/src/main/java/com/hazelcast/spi/discovery/AbstractDiscoveryStrategy.java#L179

この点は、設定を書く時に説明しました。

まとめ

ちょっと前から存在していたDiscovery SPIですが、今頃やっと試してみました。

TCPを使ったNode Discoveryの仕組みとうまい具合に統合されており、良い仕掛けだな〜と思いました。

これで、Node Discoveryを自分で作成できるようになりますね。

今回作成したソースコードは、こちらに置いています。

https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-discovery-spi