CLOVER🍀

That was when it all began.

Apache Geodeで単一JavaVMのまま(若干ウソ)簡単なクラスタを立てる

Apache GeodeでPartitioned Cacheを使った確認などをしている時は、クラスタを構成して複数メンバー間でのデータの配置状況などの確認をしたいものですが、Apache GeodeではひとつのJavaVMでは簡単にクラスタを作るのが難しそうな感じです。

どうも、単一のJavaVMで複数のApache Geodeインスタンスを起動することが厳しそうなんですよね。

Apache Geodeのテストコードや、
https://github.com/apache/incubator-geode/blob/rel/v1.0.0-incubating.M2/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/standalone/DUnitLauncher.java

Spring SessionでもServer側はJavaVMを別途プロセス起動しようとしています。
https://github.com/spring-projects/spring-session/blob/1.2.1.RELEASE/samples/httpsession-gemfire-clientserver/build.gradle#L33
https://github.com/spring-projects/spring-session/blob/1.2.1.RELEASE/samples/httpsession-gemfire-clientserver/src/main/java/sample/ServerConfig.java

なので、このアプローチ自体は取るべきなのでしょう。

ただ、動作確認の時にいちいちプロセスを起動して、それから目的のプログラムを実行〜というのは面倒なので、簡易的にクラスタを組む方法を考えてみました。

JUnitとかでテスト中に複数のApache Geodeのメンバーを(別プロセスとしながらも)起動し、最後にシャットダウンするようなプログラムですね。

では、いろいろ試行した結果を書いておきます。

準備

とりあえず、Maven依存関係。

        <dependency>
            <groupId>org.apache.geode</groupId>
            <artifactId>geode-lucene</artifactId>
            <version>1.0.0-incubating.M2</version>
        </dependency>

        <dependency>
            <groupId>org.zeroturnaround</groupId>
            <artifactId>zt-exec</artifactId>
            <version>1.9</version>
        </dependency>

Apache Geodeと、外部プロセス起動用にZT Process Executorを使います。

で、メインとなるServer以外はどうするか?

プログラム中でメインで操作するCache以外のメンバーは、いわばクラスタを構成することだけが目的のメンバーとなります。これらは、別プロセスで起動することにします。

で、いろいろ試した結果、どうもLocatorが必要なようなので、各メンバーの起動時には「start-locator」を指定して一緒にLocatorも起動するようにしました。

テスト中などでよく触りそうなオプションを設定できるように作成したクラスが、こちら。
src/main/java/org/littlewings/geode/easycluster/SimpleCacheServer.java

package org.littlewings.geode.easycluster;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import org.zeroturnaround.exec.ProcessExecutor;
import org.zeroturnaround.exec.StartedProcess;
import org.zeroturnaround.exec.stream.slf4j.Slf4jStream;

public class SimpleCacheServer {
    StartedProcess process;

    String gemfireProperties;
    String name;
    String cacheXmlFile;
    String startLocatorAddress = "localhost";
    int startLocatorPort;
    String locators;

    protected SimpleCacheServer() {
    }

    public static void main(String... args) throws InterruptedException {
        Cache cache = new CacheFactory().create();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> cache.close()));

        while (true) {
            TimeUnit.SECONDS.sleep(1L);
        }
    }

    protected void startProcess(ProcessExecutor executor) {
        try {
            process = executor.start();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public void stop() {
        process.getProcess().destroy();
    }

    public static SimpleCacheServer newServer() {
        return new SimpleCacheServer();
    }

    public SimpleCacheServer name(String name) {
        this.name = name;
        return this;
    }

    public SimpleCacheServer gemfireProperties(String gemfireProperties) {
        this.gemfireProperties = gemfireProperties;
        return this;
    }

    public SimpleCacheServer cacheXmlFile(String cacheXmlFile) {
        this.cacheXmlFile = cacheXmlFile;
        return this;
    }

    public SimpleCacheServer startLocator(int startLocatorPort) {
        this.startLocatorPort = startLocatorPort;
        return this;
    }

    public SimpleCacheServer startLocator(String startLocatorAddress, int startLocatorPort) {
        this.startLocatorAddress = startLocatorAddress;
        this.startLocatorPort = startLocatorPort;
        return this;
    }

    public SimpleCacheServer locators(String locators) {
        this.locators = locators;
        return this;
    }

    protected String systemPropertyArgument(String key, Object value) {
        return String.format("-D%s=%s", key, value);
    }

    public SimpleCacheServer start() {
        List<String> commands = new ArrayList<>();
        commands.add("mvn");
        commands.add("exec:java");
        commands.add("-Dexec.mainClass=" + SimpleCacheServer.class.getName());

        commands.addAll(gemfireSystemPropertiesArguments());

        startProcess(new ProcessExecutor()
                .redirectOutput(Slf4jStream.of(SimpleCacheServer.class).asInfo())
                .command(commands));
        return this;
    }

    protected List<String> gemfireSystemPropertiesArguments() {
        List<String> arguments = new ArrayList<>();

        if (gemfireProperties != null) {
            arguments.add(systemPropertyArgument("gemfirePropertyFile", gemfireProperties));
        }

        if (name != null) {
            arguments.add(systemPropertyArgument("gemfire.name", name));
        }

        if (cacheXmlFile != null) {
            arguments.add(systemPropertyArgument("gemfire.cache-xml-file", cacheXmlFile));
        }

        if (startLocatorPort > 0) {
            arguments.add(systemPropertyArgument("gemfire.start-locator", startLocatorAddress + "[" + startLocatorPort + "]"));
        }

        if (locators != null) {
            arguments.add(systemPropertyArgument("gemfire.locators", locators));
        }

        return arguments;
    }
}

内部でプロセス起動(mvn exec:java使用)をして、Serverメンバーを起動しています。設定されたオプションなどは、システムプロパティとしてプロセス起動するプログラム側に渡して、設定値を上書きする形で使います。

使い方は、こんな感じです。
src/main/java/org/littlewings/geode/easycluster/App.java

package org.littlewings.geode.easycluster;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;

public class App {
    public static void main(String... args) throws InterruptedException {
        String cacheXmlFile = "cache.xml";
        int port = 10334;
        String locator = "localhost[" + port + "]";

        try (Cache cache = new CacheFactory().set("cache-xml-file", cacheXmlFile).set("start-locator", locator).create()) {
            List<SimpleCacheServer> servers =
                    IntStream
                            .rangeClosed(1, 2)
                            .mapToObj(i ->
                                    SimpleCacheServer
                                            .newServer()
                                            .name("server" + i)
                                            .cacheXmlFile(cacheXmlFile)
                                            .startLocator(port + i)
                                            .locators(locator)
                                            .start()
                            )
                            .collect(Collectors.toList());

            TimeUnit.SECONDS.sleep(20L);

            System.out.println("cluster other members = " + cache.getMembers() + ", cluster others size = " + cache.getMembers().size());

            servers.forEach(SimpleCacheServer::stop);
        }
    }
}

追加で必要なメンバーを、オプションを設定しながら構築してstart。

            List<SimpleCacheServer> servers =
                    IntStream
                            .rangeClosed(1, 2)
                            .mapToObj(i ->
                                    SimpleCacheServer
                                            .newServer()
                                            .name("server" + i)
                                            .cacheXmlFile(cacheXmlFile)
                                            .startLocator(port + i)
                                            .locators(locator)
                                            .start()
                            )
                            .collect(Collectors.toList());

使い終わったらシャットダウン。

            servers.forEach(SimpleCacheServer::stop);

これを、メインで操作するCacheを構成して最後にcloseします。

        try (Cache cache = new CacheFactory().set("cache-xml-file", cacheXmlFile).set("start-locator", locator).create()) {

なお、この時に用意したCacheの設定ファイルはこちら。
src/main/resources/cache.xml

<?xml version="1.0" encoding="UTF-8"?>
<cache
        xmlns="http://geode.apache.org/schema/cache"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://geode.apache.org/schema/cache
        http://geode.apache.org/schema/cache/cache-1.0.xsd"
        version="1.0">

    <region name="region" refid="PARTITION_REDUNDANT"/>
</cache>

ちゃんと、クラスタも構成できました。
※自分以外のメンバーが出力されています

cluster other members = [192.168.254.129(server2:81618)<ec><v1>:1026, 192.168.254.129(server1:81613)<ec><v1>:1025], cluster others size = 2

これから動作確認でクラスタを組む時は、こんな感じのプログラムを書いていきましょう。