CLOVER🍀

That was when it all began.

Payara Server 4.1.2.172で、Hazelcast(JCache)の分散処理が使えるようになったという話

Payaraのソースコードを見ていて、こういうクラスがいるのにふと気づきました。

https://github.com/payara/Payara/blob/payara-server-4.1.2.172/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/hazelcast/PayaraHazelcastSerializer.java

Payara 4.1.2.172のリリースで、改善された内容です。

PAYARA-1006 Hazelcast serialization and JCache improvements (#1424) · payara/Payara@67b848f · GitHub

Release Notes

Payara Server 4.1.2.172 Release Notes

PayaraHazelcastSerializerがなにをしてくれるかというと、HazelcastのSerializerをラップすることでシリアライズ/デシリアライズの処理の際に、
ClassLoaderをアプリケーション側のものに一時的に差し替えてくれるものみたいです。

Payara+Hazelcastで分散処理ができなかったという話

Payaraは、サーバーリソースの一部としてHazelcastを管理しますが、セッションレプリケーションやJCacheによる
キャッシュを分散管理するのにはいいのですが、Hazelcastの機能のひとつである分散処理を実行しようとすると
次のような問題を持っていました。

  • HazelcastをロードするのはPayara
  • 分散処理は、アプリケーション側に定義される
  • Remote Nodeでタスクが実行された際に、Hazelcastがアプリケーション内にあるタスクを見つけることができない
  • 結果として、ClassNotFoundExceptionがスローされ実行に失敗する

分散処理を実行した場合にはアプリケーション側のクラスがシリアライズして使われるので、別のNodeにタスクを
転送(例えばJCacheのEntryProcessorや、RunnableやCallable)した際に、こうなってしまう、と。

似たような問題をClustered CDI EventBusも抱えていたのですが、これは過去のPayaraでは初期化処理時に
アプリケーション側のClassLoaderをキャプチャして一時的に差し替えることで回避していました。

Payara 4.1.1.171でClustered CDI EventBusについては改善されましたが、それ以外も少し状況が変わったみたいです。

ただ、この話題はPayara Serverのみです。Payara Microについては、同様のことをすると依然としてClassNotFoundExceptionに
なります。

確認してみる

それでは、今回Payara Serverを使ってJCacheのEntryProcessorやHazelcastのExecutor Serviceが分散実行
できるか確認してみましょう。

この2つを簡単に実装して確認してみます。

準備

pom.xmlは、こんな感じに。

    <packaging>war</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <failOnMissingWebXml>false</failOnMissingWebXml>
    </properties>

    <dependencies>
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>7.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.cache</groupId>
            <artifactId>cache-api</artifactId>
            <version>1.0.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>3.8</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>app</finalName>
    </build>

サンプルアプリケーションの実装

JAX-RSの有効化

まずは、JAX-RSの有効化から。
src/main/java/org/littlewings/payara/hazelcast/distexec/JaxrsActivator.java

package org.littlewings.payara.hazelcast.distexec;

import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;

@ApplicationPath("")
public class JaxrsActivator extends Application {
}
EntryProcessor(JCache)

続いて、JCacheのEntryProcessorを作ってみます。EntryProcessorのキックは、JAX-RSのリソースクラス内で行うように実装しました。
src/main/java/org/littlewings/payara/hazelcast/distexec/CacheResource.java

package org.littlewings.payara.hazelcast.distexec;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;
import javax.annotation.PostConstruct;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.configuration.Configuration;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("cache")
@ApplicationScoped
public class CacheResource {
    @Inject
    CacheManager cacheManager;

    @PostConstruct
    public void init() {
        Configuration<String, Integer> configuration =
                new MutableConfiguration<String, Integer>()
                        .setTypes(String.class, Integer.class);
        Cache<String, Integer> cache = cacheManager.createCache("simpleCache", configuration);

        IntStream
                .rangeClosed(1, 100)
                .forEach(i -> cache.put("key" + i, i));

        System.out.println("cache initialized.");
    }

    @GET
    @Path("entry-processor/{id}")
    @Produces(MediaType.TEXT_PLAIN)
    public String entryProcessor(@PathParam("id") int id) {
        String key = "key" + id;

        Cache<String, Integer> cache =
                cacheManager.getCache("simpleCache", String.class, Integer.class);
        int result =
                cache.invoke(key, new MyEntryProcessor());

        Map<String, Object> map = new HashMap<>();
        map.put("requested-id", id);
        map.put("key", key);
        map.put("result", result);

        return map.toString();
    }

    public static class MyEntryProcessor implements EntryProcessor<String, Integer, Integer>, Serializable {
        @Override
        public Integer process(MutableEntry<String, Integer> entry, Object... arguments) throws EntryProcessorException {
            System.out.println("execute!!");
            return entry.getValue() * 2;
        }
    }
}

JCacheのCacheManagerを@Injectし、初期化処理としてデータの登録。

    @Inject
    CacheManager cacheManager;

    @PostConstruct
    public void init() {
        Configuration<String, Integer> configuration =
                new MutableConfiguration<String, Integer>()
                        .setTypes(String.class, Integer.class);
        Cache<String, Integer> cache = cacheManager.createCache("simpleCache", configuration);

        IntStream
                .rangeClosed(1, 100)
                .forEach(i -> cache.put("key" + i, i));

        System.out.println("cache initialized.");
    }

クラスタ環境下だと、同じことをNode数分だけやる可能性がありますが、そこは今回は置いておきます。

EntryProcessorは、別Nodeで実行される場合はシリアライズされて転送されることになるので、Serializableインターフェースを実装しておきます。

    public static class MyEntryProcessor implements EntryProcessor<String, Integer, Integer>, Serializable {
        @Override
        public Integer process(MutableEntry<String, Integer> entry, Object... arguments) throws EntryProcessorException {
            System.out.println("execute!!");
            return entry.getValue() * 2;
        }
    }

実行されたNodeがわかるように、標準出力にメッセージ出力するくらいは仕込んでおきました。

あとは、EntryProcessorを実行するだけですね。どのエントリーに対して行うかは、JAX-RSの@PathParamで決めることにしました。

    @GET
    @Path("entry-processor/{id}")
    @Produces(MediaType.TEXT_PLAIN)
    public String entryProcessor(@PathParam("id") int id) {
        String key = "key" + id;

        Cache<String, Integer> cache =
                cacheManager.getCache("simpleCache", String.class, Integer.class);
        int result =
                cache.invoke(key, new MyEntryProcessor());

        Map<String, Object> map = new HashMap<>();
        map.put("requested-id", id);
        map.put("key", key);
        map.put("result", result);

        return map.toString();
    }
Executor Service(Hazelcast)

続いては、Hazelcastの分散処理のひとつである、ExecutorServiceを使います。

Executor Service

今回は、単純に「Hello World!!」を返すCallableなタスクを全Nodeで実行します。
src/main/java/org/littlewings/payara/hazelcast/distexec/HazelcastDistExecResource.java

package org.littlewings.payara.hazelcast.distexec;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Resource;
import javax.faces.bean.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.Member;

@Path("hazelcast-distexec")
@ApplicationScoped
public class HazelcastDistExecResource {
    @Resource(lookup = "payara/Hazelcast")
    HazelcastInstance hazelcast;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String distExec() throws ExecutionException, InterruptedException {
        IExecutorService es = hazelcast.getExecutorService("default");
        Map<Member, Future<String>> resultFutures = es.submitToAllMembers(new MyCallable());

        Map<Member, String> results = new HashMap<>();
        for (Map.Entry<Member, Future<String>> entry : resultFutures.entrySet()) {
            results.put(entry.getKey(), entry.getValue().get());
        }

        return results.toString();
    }

    public static class MyCallable implements Callable<String>, Serializable {
        @Override
        public String call() throws Exception {
            return "Hello World!!";
        }
    }
}

Callableについても、Serializableである必要があります。

    public static class MyCallable implements Callable<String>, Serializable {
        @Override
        public String call() throws Exception {
            return "Hello World!!";
        }
    }

HazelcastInstanceは、JNDIルックアップして取得します。そして、ExecutorServiceを取得して全NodeでCallableを実行。処理結果をJAX-RS
リソースメソッドの結果として戻します。この時、各Hazelcast Nodeの名前が含まれます。

    @Resource(lookup = "payara/Hazelcast")
    HazelcastInstance hazelcast;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String distExec() throws ExecutionException, InterruptedException {
        IExecutorService es = hazelcast.getExecutorService("default");
        Map<Member, Future<String>> resultFutures = es.submitToAllMembers(new MyCallable());

        Map<Member, String> results = new HashMap<>();
        for (Map.Entry<Member, Future<String>> entry : resultFutures.entrySet()) {
            results.put(entry.getKey(), entry.getValue().get());
        }

        return results.toString();
    }

パッケージング

パッケージングは、ふつうにMavenで。

$ mvn package

「app.war」というファイルができます。

$ ls -1 target/app.war
target/app.war

Payara Serverの準備

デプロイ先の、Payara Serverを用意します。今回は、簡単にDockerイメージを使うことにしました。

https://hub.docker.com/r/payara/server-full/:title:Image for the full profile of Payara Server

payara1〜payara3という名前で、3つのPayara Serverを立ち上げます。「-v」オプションで、ホスト側のディレクトリをDockerイメージ内に
マウントしておきましょう。

## Node 1
$ docker run -it --rm --name payara1 -v /[今回作成したプロジェクト]/target:/target payara/server-full:172

## Node 2
$ docker run -it --rm --name payara2 -v /[今回作成したプロジェクト]/target:/target payara/server-full:172

## Node 3
$ docker run -it --rm --name payara3 -v /[今回作成したプロジェクト]/target:/target payara/server-full:172

Hazelcastの有効化とデプロイ

Payara Serverでは、Hazelcastではデフォルトでは無効化されているので、有効化するとともに作成したアプリケーションをデプロイします。

$ docker exec -it [payara1〜payara3] bash

## 以下の内容を、3 Node分繰り返す
$ perl -wpi -e 's!<hazelcast-runtime-configuration></hazelcast-runtime-configuration>!<hazelcast-runtime-configuration enabled="true" multicastGroup="224.2.2.4" multicastPort="2904" generate-names="true"></hazelcast-runtime-configuration>!' glassfish/domains/domain1/config/domain.xml
$ bin/asadmin restart-domain domain1
Successfully restarted the domain
Command restart-domain executed successfully.
$ bin/asadmin deploy /target/app.war
Enter admin user name>  admin
Enter admin password for user "admin"> 
Application deployed with name app.
Command deploy executed successfully.

domain.xmlの内容をPerlで書き換えていますが、今回はとりあえずPayara Microの内容をそのまま使いました。

$ perl -wpi -e 's!<hazelcast-runtime-configuration></hazelcast-runtime-configuration>!<hazelcast-runtime-configuration enabled="true" multicastGroup="224.2.2.4" multicastPort="2904" generate-names="true"></hazelcast-runtime-configuration>!' glassfish/domains/domain1/config/domain.xml

https://github.com/payara/Payara/blob/payara-server-4.1.2.172/appserver/extras/payara-micro/payara-micro-boot/src/main/resources/MICRO-INF/domain/domain.xml#L155

うまくいけば、Hazelcastクラスタが構成されます。

Members [3] {
	Member [172.17.0.2]:5900 - 3fd10db4-03df-4f7a-a15a-1b0ad86651f8 this
	Member [172.17.0.3]:5900 - 236e0f7f-1afd-40cb-a394-c3aa2d45d6e7
	Member [172.17.0.4]:5900 - 4d77f7ac-b636-4f4b-8dc2-b9db5bcac008
}
|#]

確認

それでは、確認してみましょう。

まずはJCacheのEntryProcessorから。リクエストは、全部Node 1に投げています。

$ curl http://172.17.0.2:8080/app/cache/entry-processor/1
{requested-id=1, result=2, key=key1}

$ curl http://172.17.0.2:8080/app/cache/entry-processor/20
{requested-id=20, result=40, key=key20}

$ curl http://172.17.0.2:8080/app/cache/entry-processor/35
{requested-id=35, result=70, key=key35}

$ curl http://172.17.0.2:8080/app/cache/entry-processor/55
{requested-id=55, result=110, key=key55}

$ curl http://172.17.0.2:8080/app/cache/entry-processor/72
{requested-id=72, result=144, key=key72}

$ curl http://172.17.0.2:8080/app/cache/entry-processor/80
{requested-id=80, result=160, key=key80}

OKそうです。

Payara Server側では、Node 1以外でも以下のようなログが出力されれば、分散実行に成功しています。
※データのオーナーのNodeで実行されます。

[#|2017-06-03T12:34:23.567+0000|INFO|Payara 4.1||_ThreadID=53;_ThreadName=hz._hzInstance_1_development.partition-operation.thread-2;_TimeMillis=1496493263567;_LevelValue=800;|
  execute!!|#]

続いて、Executor Service。

$ curl http://172.17.0.2:8080/app/hazelcast-distexec
{Member [172.17.0.2]:5900 - 3fd10db4-03df-4f7a-a15a-1b0ad86651f8 this=Hello World!!, Member [172.17.0.3]:5900 - 236e0f7f-1afd-40cb-a394-c3aa2d45d6e7=Hello World!!, Member [172.17.0.4]:5900 - 4d77f7ac-b636-4f4b-8dc2-b9db5bcac008=Hello World!!}

3つのNodeから結果が返っていることが確認できます。

JCacheのEntryProcessorとExecutor Serviceで、処理がNodeをまたがって実行できていることが確認できましたね。

Payara Microは?

これ、Payara Microでもできてもいい気がするのですが、残念ながらうまくいきません。

過去のPayaraでは動かせなかったこの機能、PayaraHazelcastSerializerでシリアライズ/デシリアライズ時にアプリケーション側のClassLoaderを
差し替えることで実現しているのですが、
https://github.com/payara/Payara/blob/payara-server-4.1.2.172/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/hazelcast/PayaraHazelcastSerializer.java#L61-L72

その条件として、ApplicationRegistryからアプリケーションの情報が取得できることが必要です。
https://github.com/payara/Payara/blob/7fa0583907c423dfc3b8a4001b50b98f279e3171/appserver/payara-appserver-modules/payara-micro-service/src/main/java/fish/payara/appserver/context/JavaEEContextUtilImpl.java#L157-L180

Payara Microでデプロイした場合、ApplicationRegistryにアプリケーションの情報が登録されないみたいで、実質PayaraHazelcastSerializerが
機能せずに、ClassNotFoundExceptionとなります。
※そのままPayara側のClassLoaderで動作してしまう

ApplicationLifecycleがApplicationRegistryに登録するみたいなので、Payar Microの場合はこれが動いていない…?
https://github.com/payara/Payara/blob/payara-server-4.1.2.172/nucleus/core/kernel/src/main/java/com/sun/enterprise/v3/server/ApplicationLifecycle.java

まとめ

Payara Serverで、JCacheのEntryProcessorとHazelcastのExecutor Serviceを使って(どっちもHazelcastですが)分散処理ができるように
なったことを、簡単に確認してみました。

今回のものは分散処理というほどのものでもないですが、これと同じ理屈でAggregationsやMapReduceも使えるでしょう。

Payara Microで現状使えないのはちょっと残念ですが、このあたりができるようになったのは個人的には大きいかなと思います。