CLOVER🍀

That was when it all began.

WildFly Bootable JARを試す

これは、なにをしたくて書いたもの?

WildFly Bootable JARというものが便利そうだったので、ちょっと試してみることにしました。

WildFly Bootable JARとは?

WildFly Bootable JARとはWildFly Bootable JAR Maven Pluginを使って作成する、実行可能JARファイル(WildFly Bootable JAR)を
作る仕組みです。

GitHub - wildfly-extras/wildfly-jar-maven-plugin: WildFly Bootable JAR

WildFly Bootable JAR is final!

単純化して言うと、WARなどのアプリケーションにWildFlyを追加して、WildFlyを含めて実行可能なJARファイルにする
仕組みのようです。

Spring Bootみたいですね。

WildFly Bootable JAR Maven Plugiinを使って作成したWildFly Bootable JARは、通常のWildFlyと同様に動作するそうです。
WildFly CLIを使って操作できることもできるとか。

制限事項は、以下だそうです。

  • シャットダウン中のサーバーの再起動は不可
  • 実行中の変更(WildFly CLIでの更新)は保持されず、サーバーが終了するとその変更も失われる
  • サーバーを管理者モードで起動することはできない

ドキュメントは、こちらです。

WildFly Bootable JAR Documentation

WildFly側のドキュメントにも、記載があります。

Bootable JAR Guide

いずれも、WildFlyのドキュメントのトップからたどれますね。

WildFly Documentation

登場したのは、WildFly 20からのようです。

A bootable JAR for WildFly 20

ところで、こう書くとQuarkusがあるのでは?とも思ったりするのですが、Quarkusは直接はWildFlyの代わりにはならないので
あくまでJakarta EEを使用したアプリケーションを実行可能JARとしたい場合は、こちらの仕組みを使うのがよいのでしょうね。

できることを、もう少し?

WildFly Bootable JAR Maven Pluginを使ってできることを、もう少し書いてみましょう。

テストでの利用に関しては、Arquillianを使うようです。

https://github.com/wildfly/wildfly-arquillian/tree/3.0.1.Final/container-bootable

各種サンプルは、こちら。

https://github.com/wildfly-extras/wildfly-jar-maven-plugin/tree/5.0.1.Final/examples

というわけで、まずは使ってみましょう。

環境

今回の環境は、こちらです。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-77-generic", arch: "amd64", family: "unix"

サンプルアプリケーション

まずは、動作確認するためのサンプルアプリケーションを作成します。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>wildfly-bootable-jar-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <failOnMissingWebXml>false</failOnMissingWebXml>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>jakarta.platform</groupId>
                <artifactId>jakarta.jakartaee-bom</artifactId>
                <version>8.0.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>jakarta.ws.rs</groupId>
            <artifactId>jakarta.ws.rs-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>jakarta.enterprise</groupId>
            <artifactId>jakarta.enterprise.cdi-api</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>

簡単な、JAX-RS+CDIを使ったWebアプリケーションを用意しましょう。

src/main/java/org/littlewings/jakarta/wildfly/bootable/JaxrsActivator.java

package org.littlewings.jakarta.wildfly.bootable;

import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;

@ApplicationPath("/")
public class JaxrsActivator extends Application {
}

src/main/java/org/littlewings/jakarta/wildfly/bootable/MessageResource.java

package org.littlewings.jakarta.wildfly.bootable;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;

@ApplicationScoped
@Path("message")
public class MessageResource {
    @Inject
    MessageService messageService;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello(@QueryParam("word") @DefaultValue("World") String word) {
        return messageService.message(word);
    }
}

src/main/java/org/littlewings/jakarta/wildfly/bootable/MessageService.java

package org.littlewings.jakarta.wildfly.bootable;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MessageService {
    public String message(String word) {
        return String.format("Hello %s!!", word);
    }
}

パッケージングすると、WARファイルができます。

$ mvn package

$ ll -h target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT.war
-rw-rw-r-- 1 xxxxx xxxxx 4.9K  7月  3 19:01 target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT.war

まあ、ふつうです。

WildFly Bootable JAR Maven Pluginを追加する

このサンプルアプリケーションに、WildFly Bootable JAR Maven Pluginを追加してパッケージングしてみましょう。

シンプルには、こんな感じですね。

Adding the bootable JAR Maven plugin to your pom file

    <build>
        <plugins>
            <plugin>
                <groupId>org.wildfly.plugins</groupId>
                <artifactId>wildfly-jar-maven-plugin</artifactId>
                <version>5.0.1.Final</version>
                <configuration>
                    <feature-pack-location>wildfly@maven(org.jboss.universe:community-universe)#24.0.0.Final</feature-pack-location>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>package</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

feature-pack-locationで、使用するWildFlyのバージョンを指定します。今回は、24.0.0.Finalです。

Specifying the WildFly server version to use

                <configuration>
                    <feature-pack-location>wildfly@maven(org.jboss.universe:community-universe)#24.0.0.Final</feature-pack-location>
                </configuration>

バージョンを指定しない場合は、最新のWildFlyが使われます。

こちらは、mvn packageで動作させるためのものです。

                <executions>
                    <execution>
                        <goals>
                            <goal>package</goal>
                        </goals>
                    </execution>
                </executions>

この状態でパッケージングすると

$ mvn package

WildFly Bootable JAR Maven Pluginが動作し

[INFO] --- wildfly-jar-maven-plugin:5.0.1.Final:package (default) @ wildfly-bootable-jar-example ---
[INFO] Provisioning server configuration based on the standalone-microprofile.xml default configuration
[INFO] Building server based on [[wildfly@maven(org.jboss.universe:community-universe)#24.0.0.Final inherit-packages=false inheritConfigs=false includedConfigs [model=standalone name=standalone-microprofile.xml]]] galleon feature-packs
[INFO] Found boot artifact org.wildfly.core:wildfly-jar-boot:jar:16.0.0.Final:provided in org.wildfly:wildfly-ee-galleon-pack:24.0.0.Final
7月 03, 2021 7:06:42 午後 org.wildfly.core.embedded.LoggerContext$JBossLoggingModuleLogger greeting
INFO: JBoss Modules version 1.11.0.Final
7月 03, 2021 7:06:42 午後 org.jboss.msc.service.ServiceContainerImpl <clinit>
INFO: JBoss MSC version 1.4.12.Final
7月 03, 2021 7:06:42 午後 org.jboss.threads.Version <clinit>
INFO: JBoss Threads version 2.3.2.Final
7月 03, 2021 7:06:42 午後 org.jboss.as.server.ApplicationServerService start
INFO: WFLYSRV0049: WildFly Full 24.0.0.Final (WildFly Core 16.0.0.Final) starting
7月 03, 2021 7:06:43 午後 org.jboss.as.patching.installation.InstallationManagerService start
INFO: WFLYPAT0050: WildFly Full cumulative patch ID is: base, one-off patches include: none
7月 03, 2021 7:06:43 午後 org.jboss.as.server.suspend.SuspendController resume
INFO: WFLYSRV0212: Resuming server
7月 03, 2021 7:06:43 午後 org.jboss.as.server.BootstrapListener done
INFO: WFLYSRV0025: WildFly Full 24.0.0.Final (WildFly Core 16.0.0.Final) started in 1091ms - Started 29 of 32 services (3 services are lazy, passive or on-demand)
7月 03, 2021 7:06:43 午後 org.wildfly.security.Version <clinit>
INFO: ELY00001: WildFly Elytron version 1.16.0.Final
7月 03, 2021 7:06:45 午後 org.jboss.as.controller.AttributeDefinition validateAndSet
INFO: WFLYCTL0028: Attribute 'security-realm' in the resource at address '/subsystem=undertow/server=default-server/https-listener=https' is deprecated, and may be removed in a future version. See the attribute description in the output of the read-resource-description operation to learn more about the deprecation.
7月 03, 2021 7:06:45 午後 org.jboss.as.server.ApplicationServerService stop
INFO: WFLYSRV0050: WildFly Full 24.0.0.Final (WildFly Core 16.0.0.Final) stopped in 10ms
[INFO] Executing CLI, Server configuration
[INFO] CLI scripts execution done.

WARファイル以外に、-bootable.jarと付いたJARファイルが作成されます。125Mありますけど。

$ ll -h target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT*.*
-rw-rw-r-- 1 xxxxx xxxxx 125M  7月  3 19:07 target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT-bootable.jar
-rw-rw-r-- 1 xxxxx xxxxx 4.9K  7月  3 19:06 target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT.war

このJARファイルを実行すると

$ java -jar target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT-bootable.jar

文字通りWildFlyが起動します。

19:08:35,107 INFO  [org.jboss.as] (Controller Boot Thread) WFLYSRV0025: WildFly Full 24.0.0.Final (WildFly Core 16.0.0.Final) started in 4582ms - Started 318 of 463 services (221 services are lazy, passive or on-demand

WARファイルは、ROOT.warとしてデプロイされます。

19:08:35,068 INFO  [org.jboss.as.server] (Controller Boot Thread) WFLYSRV0010: Deployed "wildfly-bootable-jar-example-0.0.1-SNAPSHOT.war" (runtime-name : "ROOT.war")

このため、コンテキストパスは/でアクセスできます。

$ curl localhost:8080/message
Hello World!!


$ curl localhost:8080/message?word=WildFly
Hello WildFly!!

動作確認もできましたね。

Bootable JAR実行時に使える引数は、こちら。

Bootable JAR arguments

起動は、wildfly-jar:runやwildfly-jar:startでも行うことができます。

$ mvn package wildfly-jar:run

wildfly-jar:runとwildfly-jar:startの違いは、バックグラウンドで動作するかどうかとなります。

wildfly-jar:startで起動した場合は、wildfly-jar:shutdownで停止する必要があります。

$ mvn wildfly-jar:shutdown

コンテキストパスを変更する場合は、contextRootというWildFly Bootable JAR Maven Pluginの設定で行うようです。

URL context path of deployed application

executionを設定しない場合は?

以下のexecutionの指定を入れない場合、

                <executions>
                    <execution>
                        <goals>
                            <goal>package</goal>
                        </goals>
                    </execution>
                </executions>

Bootable JARを作成するためのコマンドはこうなります。

$ mvn package wildfly-jar:package

先にpackageゴールを指定しておくことが必須となり、wildfly-jar:package単体では動作しません。

あくまで、デプロイ対象を先に作っておくことがポイントになります。Hollow JARの場合は微妙ですが。

wildfly-jar:runをこの前提で書こうとすると、けっこう長くなります…。

$ mvn package wildfly-jar:package wildfly-jar:run

設定を行う

Bootable JARとして構成されるWildFlyの設定を変更する方法はいくつかありますが、基本はパッケージング時でしょうね。

WildFly CLIのスクリプトを使って設定することになります。

Configuring the server during packaging

今回は、扱いません…。

コンポーネントをカスタマイズする

次に、利用するコンポーネントをカスタマイズしてみましょう。

Galleon configuration

Composing custom server with Galleon layers

ここで出てくるGalleonというのは、複数のコンポーネントで構成されるソフトウェアを作成、プロビジョニングするための
ツールです。

Galleon Documentation

要するに、WildFly Bootable JAR Maven Pluginでは、このGalleonを使って利用するコンポーネントを選択することになります。

デフォルトでは、standalone-microprofile.xml相当の構成でBootable JARが作成されるようです。

Specifying the set of Galleon layers to use

こちらでしょうかね?

https://github.com/wildfly/wildfly/blob/24.0.0.Final/galleon-pack/galleon-content/src/main/resources/configs/standalone/standalone-microprofile.xml/config.xml

コンポーネントの設定を行うには、2つの書き方があるようです。

とはいえ、互いに排他的な関係ではなさそうですが。

ただ、feature-packを使った場合はfeature-pack-locationを使ったWildFlyのバージョン指定はできなくなり、
これもfeature-packとして指定することになります。

<feature-packs>
    <feature-pack>
        <location>wildfly@maven(org.jboss.universe:community-universe)#24.0.0.Final</location>
    </feature-pack>
    <feature-pack>
        <groupId>org.wildfly</groupId>
        <artifactId>wildfly-datasources-galleon-pack</artifactId>
        <version>1.0.6.Final</version>
    </feature-pack>
</feature-packs>

データソースを扱う場合などは、feature-packを使うことになります。

今回は、Galleon layerのみを使います。

Galleon layerにも2種類あり、基本なlayer(Basic layer)と、その組み合わせで表現されるFoundational layerがあります。

この2つのリストと、先ほどのstandalone-microprofile.xmlに書かれているlayersの内容を見ると、どのようなlayerが
選択されているのかがわかるでしょう。

    <layers>
        <include name="cloud-server"/>
        <include name="h2-default-datasource"/>
        <include name="microprofile-fault-tolerance"/>
        <include name="microprofile-health"/>
        <include name="microprofile-jwt"/>
        <include name="microprofile-metrics"/>
        <include name="microprofile-openapi"/>
        <include name="microprofile-opentracing"/>
        <include name="microprofile-rest-client"/>
        <include name="undertow-legacy-https"/>
        <exclude name="management-security-realm"/>
    </layers>

https://github.com/wildfly/wildfly/blob/24.0.0.Final/galleon-pack/galleon-content/src/main/resources/configs/standalone/standalone-microprofile.xml/config.xml

また、layerは除外することもできます。

Excluding Galleon layers

今回は、jaxrs-serverとmanagementを入れておきましょう。

                <configuration>
                    <feature-pack-location>wildfly@maven(org.jboss.universe:community-universe)#24.0.0.Final</feature-pack-location>
                    <layers>
                        <layer>jaxrs-server</layer>
                        <layer>management</layer>
                    </layers>
                </configuration>

パッケージングすると
※今回はexecutionの設定は外しています

$ mvn package wildfly-jar:package

最初よりも、10MBほど小さくなりました。

$ ll -h target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT*.*
-rw-rw-r-- 1 xxxxx xxxxx 114M  7月  3 20:02 target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT-bootable.jar
-rw-rw-r-- 1 xxxxx xxxxx 4.9K  7月  3 20:01 target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT.war

jaxrs-serverはFoundational layerであり、もう少し削れます。

Basic layerである、jaxrsとcdiにしてみましょう。

                <configuration>
                    <feature-pack-location>wildfly@maven(org.jboss.universe:community-universe)#24.0.0.Final</feature-pack-location>
                    <layers>
                        <layer>jaxrs</layer>
                        <layer>cdi</layer>
                        <layer>management</layer>
                    </layers>
                </configuration>

一気に半分くらいになりました。

$ ll -h target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT*.*
-rw-rw-r-- 1 xxxxx xxxxx  64M  7月  3 20:06 target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT-bootable.jar
-rw-rw-r-- 1 xxxxx xxxxx 4.9K  7月  3 20:06 target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT.war

こうやって、必要なコンポーネントを選んで、カスタマイズしていくことができます。

Basic layerには依存もあり、このあたりは自動的に解決してくれます。

ちなみに、management layerはリモートアクセスでの管理機能を提供するもので、WildFly Bootable JAR Maven Pluginでは
wildfly-jar:shutdownで使われるので、入れておいた方が良い気がします。

開発で使う

設定は、以下に1度戻します。

                <configuration>
                    <feature-pack-location>wildfly@maven(org.jboss.universe:community-universe)#24.0.0.Final</feature-pack-location>
                    <layers>
                        <layer>jaxrs-server</layer>
                        <layer>management</layer>
                    </layers>
                </configuration>

ここからは、開発用途での使い方を書いていきましょう。

JARファイルをスリムにする

ここまで見てきた通り、Bootable JARはそれなりのサイズになります。

実環境では使えなくなるとは思いますが、WildFlyのモジュールをローカルのMavenリポジトリから使用することで、
JARファイルのサイズを小さくすることができます。

Provisioning a slim bootable JAR

plugin-optionsとして、jboss-maven-distを追加します。

                    <plugin-options>
                        <jboss-maven-dist/>
                    </plugin-options>

configurationとしては、こうなります。

                <configuration>
                    <feature-pack-location>wildfly@maven(org.jboss.universe:community-universe)#24.0.0.Final</feature-pack-location>
                    <layers>
                        <layer>jaxrs-server</layer>
                        <layer>management</layer>
                    </layers>
                    <plugin-options>
                        <jboss-maven-dist/>
                    </plugin-options>
                </configuration>

これでパッケージングすると

$ mvn package wildfly-jar:package

Bootable JARファイルのサイズが、一気に数Mまで小さくなります。

$ ll -h target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT*.*
-rw-rw-r-- 1 xxxxx xxxxx 3.9M  7月  3 20:16 target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT-bootable.jar
-rw-rw-r-- 1 xxxxx xxxxx 4.9K  7月  3 20:16 target/wildfly-bootable-jar-example-0.0.1-SNAPSHOT.war

パッケージングなどにかかる時間も、短くなります。

ソースコードの監視を使った開発モード

次は、WildFly Bootable JAR Maven Pluginでの、開発中に便利なモードを使っていきましょう。

まずは、ソースコードの監視を使った開発モードです。

Development mode with source watching

使い方は簡単で、以下のコマンドを実行します。

$ mvn wildfly-jar:dev-watch

すると、空のサーバー(Hollow JAR)が起動します。

[INFO] Hollow jar, No application deployment added to server.

あとは、ソースコードの変更を検知すると自動で再コンパイル&デプロイを行い、変更をサーバーに反映してくれます。

targetディレクトリが空の状態で起動しても、勝手にパッケージング&デプロイします。

$ curl localhost:8080/message
Hello World!!

起動後にソースコードを変更すると

@ApplicationScoped
public class MessageService {
    public String message(String word) {
        return String.format("Hello %s!??", word);
    }
}

変更を検出して再デプロイが行われます。

[INFO] Changes detected - recompiling the module!
[INFO] Compiling 3 source files to /path/to/target/classes
[INFO] Exploding webapp
[INFO] Assembling webapp [wildfly-bootable-jar-example] in [/path/to/target/deployments/ROOT.war]
[INFO] Processing war project
[INFO] Webapp assembled in [5 msecs]
20:26:16,524 INFO  [org.wildfly.extension.undertow] (ServerService Thread Pool -- 9) WFLYUT0022: Unregistered web context: '/' from server 'default-server'
20:26:16,580 INFO  [org.jboss.as.server.deployment] (MSC service thread 1-3) WFLYSRV0028: Stopped deployment ROOT.war (runtime-name: ROOT.war) in 62ms
20:26:16,607 INFO  [org.jboss.as.server] (management-handler-thread - 1) WFLYSRV0009: Undeployed "ROOT.war" (runtime-name: "ROOT.war")
20:26:16,615 INFO  [org.jboss.as.server.deployment] (MSC service thread 1-2) WFLYSRV0027: Starting deployment of "ROOT.war" (runtime-name: "ROOT.war")
20:26:16,699 INFO  [org.jboss.weld.deployer] (MSC service thread 1-8) WFLYWELD0003: Processing weld deployment ROOT.war
20:26:17,025 INFO  [org.jboss.resteasy.resteasy_jaxrs.i18n] (ServerService Thread Pool -- 47) RESTEASY002225: Deploying javax.ws.rs.core.Application: class org.littlewings.jakarta.wildfly.bootable.JaxrsActivator
20:26:17,033 INFO  [org.wildfly.extension.undertow] (ServerService Thread Pool -- 47) WFLYUT0021: Registered web context: '/' for server 'default-server'
20:26:17,049 INFO  [org.jboss.as.server] (management-handler-thread - 1) WFLYSRV0010: Deployed "ROOT.war" (runtime-name : "ROOT.war")

確認。

$ curl localhost:8080/message
Hello World!??

なかなか便利ではないでしょうか?

いくつかポイント、注意事項があります。

  • サーバーはフォアグラウンドで起動するので、停止はCtrl-cで行う
  • WAR、JAR、EARをサポート
  • src/main/java(デフォルト)およびsrc/main/webapp、src/main/resourcesの変更を検知して、再デプロイする
  • pom.xmlのWildFly Bootable JAR Maven Pluginの設定変更は、サーバーの再起動が必要
  • pom.xmlのWildFly Bootable JAR Maven Plugin以外の設定変更は、再デプロイが行われる
  • コンパイルエラーでは停止しない
  • マルチモジュールはサポートしていない
  • リソースファイルのフィルタリング(include/exclude)はサポートされておらず、すべて監視対象となる
開発モードサーバー+再パッケージング

もうひとつは、先に開発モードのサーバーを起動しておき、パッケージングすると自動で再デプロイする方法です。

Development mode with repackaging

最初に、wildfly-jar:devで開発サーバーを起動します。

$ mvn wildfly-jar:dev

こちらもHollow JARです。

[INFO] Hollow jar, No application deployment added to server.

このサーバーはバックグラウンドで起動します。

この状態で-Ddevを指定してパッケージングを行うと、target/deploymentsにファイルが配置、デプロイが行われます。

$ mvn package wildfly-jar:package -Ddev

あとは、ソースコードを変更してから同じように再パッケージングすると、自動でデプロイされます。

$ mvn package wildfly-jar:package -Ddev

wildfly-jar:dev-watchとの違いは、明示的な再パッケージングがデプロイのトリガーになることですね。

なお、開発サーバーはバックグラウンドで起動したままなので、wildfly-jar:shutdownで停止します。

$ mvn wildfly-jar:shutdown

この時に、management layerが必要になります。

まとめ

WildFly Bootable JAR Maven Pluginを試してみました。けっこう便利ではないでしょうか?

実際に動かす環境ではWildFlyサーバーにデプロイするようなケースでも、手元では開発モードでBootable JARで動かして確認して、
実際の環境ではデプロイ時はWARファイルで、という使い方も良いのかなと思います。

覚えておきましょう。

Infinispan 12.1でのMarshalling/Encodingと分散処理と

これは、なにをしたくて書いたもの?

Infinispan 10.0で、デフォルトのMarshallingの仕組みがProtoStream(Protocol Buffers)になりました。

このバージョン付近からHot Rodを前面に出す雰囲気になっていたので、あまりEmbedded Modeは扱ってこなかったのですが。

そういえば、Protocol BuffersをMarshallingの仕組みに使いつつ、分散処理を行った場合はMarshallingはどういった扱いに
なるのかな?と思って試してみることにしました。

結果を見ると、頑張ればすべてをProtoStreamでのMarshallingにまとめることはできそうな気はします。
ただ、扱うクラスや構成にけっこう引っ張られそうな気も…。

今回は、あくまでEmbedded Modeでの話です(Remote Cacheの場合は、クライアント側のMarshallerも関与するので)。

Marshalling and Encoding Data

気づいていなかったのですが、いつの間にかMarshallingまわりのドキュメントが独立していました。

Marshalling and Encoding Data

こちらのドキュメントに、Cacheへ格納する際のエンコーディングついて書かれています。

Cache Encoding

エンコーディングとはメディアタイプによって識別されるものであり、InfinispanがどのようにCacheにエントリ(キー、値)を
格納するかに影響します。

Marshalling and Encoding Data / Cache encoding

Remote Cacheの場合は、このようになります。

  • Infinispan Serverは、エントリをCacheに設定されたエンコーディングで保存する
  • Hot RodやREST Clientにはリクエストにメディアタイプが含まれており、Cacheの設定と異なるメディアタイプが指定された場合、Infinispan Serverはオンデマンドで変換を行う
    • Cacheにエンコーディング設定がない場合は、Infinispan Serverはbyte[]でエントリを保存する
      • Clinetからのデータ変換要求によっては、予期しない結果になることもある
  • 複数の種類のClinet(Hot Rod、REST、Infinispan Console、CLIなど)を使用する場合は、ProtoStreamエンコーディングを推奨

Embedded Cacheの場合は、このようになります。

  • クラスタ化されたCacheの場合、byte[]にマーシャリングする必要がある
    • ローカルモードの場合は、POJOとして保存する
  • 特にMarshallerを構成しない場合は、ProtoStream Marshallerでマーシャリングされる

Infinispanでは以下のエンコーディングを使うことができ、互換性のあるエンコーディングの間で変換を行うことができます。

エンコーディング メディアタイプ 変換可能なメディアタイプ 備考
ProtoStream / Protobuf encoding application/x-protostream application/json, ? Hot Rod、REST、Infinspan Consoleで相互運用可能
Text-based cache encoding text/plain application/xml

application/json

application/x-protostream

application/x-jboss-marshalling

application/x-java-serialized
JBoss Marshalling application/x-jboss-marshalling ? 現在は非推奨。シリアライズ可能なリストを設定する必要あり
Java Serialization application/x-java-serialized-object ? ProtoStreamよりパフォーマンスに劣る。シリアライズ可能なリストを設定する必要あり

?…の部分は、ドキュメント上でハッキリとした明示はありませんが、テキストベースのメディアタイプとは変換可能
と見てよいんでしょうね。

REST Clinetなどからの利用もありますし。

Client側でのMarshalling

Client側でのMarshallingについては、各メディアタイプやテーマごとにドキュメントが書かれています。

Text-based cache encoding / Clients and text-based encoding

Marshalled Java objects / Clients and marshalled objects

Plain Old Java Objects (POJO) / Clients and POJOs

Hot Rod Clientの場合は、ProtoStreamやJava Serialization、テキストベースのMarshallerを使用します。

REST Clientの場合は、JSONやXML、テキストを使用します。

データ変換

さらに、Cacheからエントリを取得する際に、異なるメディアタイプに変換することも可能なようです。

Data conversion

今回は、ここは見ません。

分散処理では?

ところで、Infinispanといえば分散処理もあるでしょう。このあたりでしょうか。

このあたりの機能について書かれているところを見ると、Marshallingについては基本的にSerializableを想定して
書かれています。

Cluster Executorはドキュメント上は記載がありませんが、こちらもSerializableが想定されています。

ClusterExecutor (Infinispan JavaDoc 12.1.4.Final API)

よって、ProtoStream(Protocol Buffers)とはまた別ですね。

Cacheに格納するエントリはProtoStreamでMarshallingする場合、このあたりのとの関係はどうなるのでしょうか?

ということで試してみます。

お題

以下のお題で、試してみたいと思います。

  • Cacheに格納する値には自分でクラスを用意し、ProtoStreamでMarshallingする
    • ネタは書籍で
  • Distributed StreamおよびFunctional Mapを使って、Nodeをまたぐ処理を使う
    • ここで受け渡す処理の結果を表すクラスも、自分でクラスを用意する

この時に、Marshallingをどうしたらいいか?というのを試していきます。

環境

今回の環境は、こちら。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-77-generic", arch: "amd64", family: "unix"

準備

Maven依存関係などは、こちら。

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>12.1.4.Final</version>
        </dependency>

        <dependency>
            <groupId>org.infinispan.protostream</groupId>
            <artifactId>protostream-processor</artifactId>
            <version>4.4.1.Final</version>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.7.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.7.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.20.2</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.31</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

Infinispanは12.1.4.Finalを使い、動作確認はテストコードで行います。

ログライブラリはSLF4J+Logbackとし、なにも設定しない場合のj.u.l.Loggerよりは見やすいログにしておきます。

src/test/resources/logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

Cacheに格納する値

Cacheに格納するのは、書籍をお題にこちらのクラスとします。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/ProtoBook.java

package org.littlewings.infinispan.distexec.protostream.entity;

import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;

public class ProtoBook {
    @ProtoField(number = 1)
    String isbn;

    @ProtoField(number = 2)
    String title;

    @ProtoField(number = 3, defaultValue = "0")
    int price;

    @ProtoFactory
    public static ProtoBook create(String isbn, String title, int price) {
        ProtoBook book = new ProtoBook();

        book.setIsbn(isbn);
        book.setTitle(title);
        book.setPrice(price);

        return book;
    }

    // getter/setterは省略
}

キーは、Stringを使うことにしましょう。つまり、Cache<String, ProtoBook>です。

自分で定義するクラスをProtoStreamでMarshallingする場合は、こちらのドキュメントに従ってアノテーションを付与したり
します。

Marshalling custom objects with ProtoStream

SerializationContextInitializerの作成も必要です。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/EntityInitializer.java

package org.littlewings.infinispan.distexec.protostream.entity;

import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;

@AutoProtoSchemaBuilder(
        includeClasses = ProtoBook.class,
        schemaFileName = "entities.proto",
        schemaFilePath = "proto",
        schemaPackageName = "entity"
)
public interface EntityInitializer extends SerializationContextInitializer {
}

今回は、このProtoBookをCacheに格納し、priceフィールドを扱う分散処理を書くことにしましょう。

テストコードの雛形

では、まずはテストコードの雛形を作成します。

src/test/java/org/littlewings/infinispan/distexec/protostream/ProtoStreamDistExecTest.java

package org.littlewings.infinispan.distexec.protostream;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.infinispan.Cache;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.Test;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoSummary;
import org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary;
import org.littlewings.infinispan.distexec.protostream.entity.Summary;

import static org.assertj.core.api.Assertions.assertThat;

public class ProtoStreamDistExecTest {
    Logger logger = Logger.getLogger(ProtoStreamDistExecTest.class);

    List<ProtoBook> books =
            List.of(
                    ProtoBook.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344),
                    ProtoBook.create("978-1849518222", "Infinispan Data Grid Platform", 3608),
                    ProtoBook.create("978-0359439379", "The Apache Ignite Book", 7686),
                    ProtoBook.create("978-1365732355", "High Performance in-memory computing with Apache Ignite", 6342),
                    ProtoBook.create("978-1789347531", "Apache Ignite Quick Start Guide: Distributed data caching and processing made easy", 3638),
                    ProtoBook.create("978-1785285332", "Getting Started with Hazelcast - Second Edition: Get acquainted with the highly scalable data grid, Hazelcast, and learn how to bring its powerful in-memory features into your application", 4209),
                    ProtoBook.create("978-1617295522", "Spark in Action, Second Edition: Covers Apache Spark 3 with Examples in Java, Python, and Scala", 6297),
                    ProtoBook.create("978-1484257807", "Beginning Apache Spark Using Azure Databricks: Unleashing Large Cluster Analytics in the Cloud", 4817),
                    ProtoBook.create("978-1788997829", "Apache Kafka Quick Start Guide: Leverage Apache Kafka 2.0 to simplify real-time data processing for distributed applications", 3516),
                    ProtoBook.create("978-1491936160", "Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale", 4989)
            );

    <K, V> void withCache(String cacheName, int numInstances, Consumer<Cache<K, V>> func) {
        List<EmbeddedCacheManager> managers =
                IntStream
                        .rangeClosed(1, numInstances)
                        .mapToObj(i -> {
                            try {
                                return new DefaultCacheManager("infinispan.xml");
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        })
                        .collect(Collectors.toList());

        managers.forEach(manager -> manager.getCache(cacheName));

        try {
            Cache<K, V> cache = managers.get(0).getCache(cacheName);

            func.accept(cache);
        } finally {
            managers.forEach(manager -> manager.stop());
        }
    }

    // ここに、テストを書く
}

簡単にクラスタを構成できるメソッド付き。

infinispan.xmlの内容は、また後で書きます。

こんなメソッドで動作確認。ここで、bookCacheというのはownersが1のDistributed Cacheとします。
また、Nodeは3つにしてクラスタを構成しています。以降に出てくるテストコードも全部3 Nodeにします。

    @Test
    public void simple() {
        this.<String, ProtoBook>withCache("bookCache", 3, cache -> {
            books.forEach(book -> cache.put(book.getIsbn(), book));

            assertThat(cache).hasSize(books.size());

            ProtoBook infinispanBook = cache.get("978-1782169970");
            assertThat(infinispanBook.getTitle()).isEqualTo("Infinispan Data Grid Platform Definitive Guide");
            assertThat(infinispanBook.getPrice()).isEqualTo(5344);

            DistributionManager dm = cache.getAdvancedCache().getDistributionManager();
            cache.keySet().forEach(isbn -> {
                LocalizedCacheTopology cacheTopology = dm.getCacheTopology();
                logger.infof("isbn = %s, members = %s", isbn, cacheTopology.getWriteOwners(isbn));
            });
        });
    }

ログ出力しているので、このコードを動作させると、各キーがどのNodeに配置されているか確認できるでしょう。

Distributed Streamを使った処理を書く

では、Distributed Streamを使った処理を書いてみましょう。

src/main/java/org/littlewings/infinispan/distexec/protostream/StreamSummaryTask.java

package org.littlewings.infinispan.distexec.protostream;

import java.io.Serializable;
import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.littlewings.infinispan.distexec.protostream.entity.Price;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook;
import org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary;

public class StreamSummaryTask implements Serializable {
    private static final long serialVersionUID = 1L;

    Logger logger = Logger.getLogger(StreamSummaryTask.class);

    transient Cache<String, ProtoBook> cache;

    public StreamSummaryTask(Cache<String, ProtoBook> cache) {
        this.cache = cache;
    }

    public SerializableSummary execute() {
        return cache
                .values()
                .stream()
                .map(book -> {
                    logger.infof("[execute] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                SerializableSummary.create(0),
                                p -> {
                                    logger.infof("[execute] collect %s", p.getIsbn());
                                    return SerializableSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    logger.infof("[execute] collect %d, %d", s1.getValue(), s2.getValue());
                                    return SerializableSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }

    public SerializableSummary execute(int greaterThanPrice) {
        return cache
                .values()
                .stream()
                .filter(book -> {
                    logger.infof("[execute, filter price] filter %s", book.getIsbn());
                    return book.getPrice() > greaterThanPrice;
                })
                .map(book -> {
                    logger.infof("[execute, filter price] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                SerializableSummary.create(0),
                                p -> {
                                    logger.infof("[execute, filter price] collect %s", p.getIsbn());
                                    return SerializableSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    logger.infof("[execute, filter price] collect %d, %d", s1.getValue(), s2.getValue());
                                    return SerializableSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }
}

いきなりでなんですが、このクラスがすでにSerializableです。

public class StreamSummaryTask implements Serializable {

処理の内容自体は、Cacheに格納されているProtoBookクラスのpriceを合算しているだけです。

    public SerializableSummary execute() {
        return cache
                .values()
                .stream()
                .map(book -> {
                    logger.infof("[execute] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                SerializableSummary.create(0),
                                p -> {
                                    logger.infof("[execute] collect %s", p.getIsbn());
                                    return SerializableSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    logger.infof("[execute] collect %d, %d", s1.getValue(), s2.getValue());
                                    return SerializableSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }

CacheStream#filterを使っているバージョンもありますが。

処理の内容は単純ですが、どこでSerializableが求められるのかをわかるようにするために、各種中間・終端操作の
戻り値を自分で用意したクラスにしておきました。

Priceクラスは、こんな定義です。こちらはSerializableではありません。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/Price.java

package org.littlewings.infinispan.distexec.protostream.entity;

public class Price {
    String isbn;
    int value;

    public static Price create(String isbn, int value) {
        Price price = new Price();

        price.setIsbn(isbn);
        price.setValue(value);

        return price;
    }

    // getter/setterは省略
}

SerializableSummaryクラスは、Serializableを実装します。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/SerializableSummary.java

package org.littlewings.infinispan.distexec.protostream.entity;

import java.io.Serializable;

public class SerializableSummary implements Serializable {
    private static final long serialVersionUID = 1L;

    int value;

    public static SerializableSummary create(int value) {
        SerializableSummary summary = new SerializableSummary();

        summary.setValue(value);

        return summary;
    }

    // getter/setter
}

つまり、CacheStream#collectではSerializableが必要だ、ということになります。
※とはいえ、今回のソースコードでシリアライズが必要になるのはLoggerをフィールド(ローカル変数でも同じ)に持ち、それを分散処理ないで参照しているからであり、このためにStreamSummaryTaskはSerializableを実装しています

後にも書きますが、CacheStream#mapについては今回はSerializableは実は求められていません。

テストコードは、こちら。

    @Test
    public void distributedStream() {
        this.<String, ProtoBook>withCache("bookCache", 3, cache -> {
            books.forEach(book -> cache.put(book.getIsbn(), book));

            StreamSummaryTask summaryTask = new StreamSummaryTask(cache);

            SerializableSummary totalPriceSummary = summaryTask.execute();
            assertThat(totalPriceSummary.getValue()).isEqualTo(50446);

            SerializableSummary filteredPriceSummary = summaryTask.execute(5000);
            assertThat(filteredPriceSummary.getValue()).isEqualTo(25669);
        });
    }

このテストコードを動作させるための、Infinispanの設定ファイルはこちらです。

src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:12.1 https://infinispan.org/schemas/infinispan-config-12.1.xsd"
        xmlns="urn:infinispan:config:12.1">
    <cache-container shutdown-hook="REGISTER">
        <transport cluster="ispn-cluster" stack="udp"/>

        <distributed-cache name="bookCache" owners="1">
            <encoding>
                <!-- <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller"> を入れたら明示が必要 -->
                <key media-type="application/x-protostream"/>
                <value media-type="application/x-protostream"/>
            </encoding>
        </distributed-cache>

        <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller">
            <!-- <context-initializer class="org.littlewings.infinispan.distexec.protostream.entity.EntityInitializerImpl"/>  自動生成されるので不要 -->
            <allow-list>
                <class>org.littlewings.infinispan.distexec.protostream.StreamSummaryTask</class>
                <class>org.littlewings.infinispan.distexec.protostream.FunctionalMapSummaryTask</class>
                <regex>org\.littlewings\.infinispan\.distexec\.protostream\.entity\.Serializable.+</regex>
                <regex>org\.jboss\.logging\..+</regex>
            </allow-list>
        </serialization>
    </cache-container>
</infinispan>

今回のソースコードを、<serialization>を設定しないまま動作させようとすると、Distributed Streamを使っているクラスを
Marshallingしようとして失敗します。

22:40:47.727 [main] WARN  org.infinispan.PERSISTENCE - ISPN000559: Cannot marshall 'class org.infinispan.marshall.protostream.impl.MarshallableUserObject'
java.lang.IllegalArgumentException: No marshaller registered for object of Java type org.littlewings.infinispan.distexec.protostream.StreamSummaryTask : org.littlewings.infinispan.distexec.protostream.StreamSummaryTask@161dd92a

このクラスを、ProtoStreamでシリアライズするのはさすがにできませんね…。

このため、MarshallerとしてJavaSerializationMarshallerを指定し、Java標準のシリアライズの仕組みも使えるように
します。

        <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller">
            <allow-list>
                <class>org.littlewings.infinispan.distexec.protostream.StreamSummaryTask</class>
                <class>org.littlewings.infinispan.distexec.protostream.FunctionalMapSummaryTask</class>
                <regex>org\.littlewings\.infinispan\.distexec\.protostream\.entity\.Serializable.+</regex>
                <regex>org\.jboss\.logging\..+</regex>
            </allow-list>
        </serialization>

この時、シリアライズできるクラスは明示的に指定する必要があり、regex(正規表現)またはclass(クラス名)で
指定します。
※次に使うクラスの名前(FunctionalMapSummaryTask)も含まれていますが

Using Java serialization

ここに登録していないクラスは、Java標準のシリアライズを行う際にInfinispanから拒否されます。

ちなみに、SerializationContextInitializerを自分で作成していますが、これはServiceLoaderの仕組みで自動登録されるため
今回は指定不要です。

Registering serialization context initializers

        <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller">
            <!-- <context-initializer class="org.littlewings.infinispan.distexec.protostream.entity.EntityInitializerImpl"/>  自動生成されるので不要 -->

複数のSerializationContextInitializerを使う場合、ひとつでも明示的に登録した場合は自動登録が無効になるため、
すべて明示的に指定する必要があるようです。

そして、JavaSerializationMarshallerをMarshallerを指定した段階で、特になにもしないとCacheにもJava標準の
シリアライズの仕組みでエントリを登録しようとします。

        <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller">

つまり、ProtoBookがSerializableであることを求めてきます。

そうではなく、ProtoBook、つまりCacheに格納するエントリはProtoStreamでMarshallingを行う場合は、エンコーディングを
明示します。

Encoding caches as ProtoStream

        <distributed-cache name="bookCache" owners="1">
            <encoding>
                <!-- <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller"> を入れたら明示が必要 -->
                <key media-type="application/x-protostream"/>
                <value media-type="application/x-protostream"/>
            </encoding>
        </distributed-cache>

これで、CacheにはProtoStreamでMarshallingしつつ、分散処理はJava標準のシリアライズの仕組みで動作するように
設定できました。

先ほどのテストもパスします。

Functional Map

次は、Functional Map。今回はReadOnlyMapを使うことにしました。

Read-Only Map API

src/main/java/org/littlewings/infinispan/distexec/protostream/FunctionalMapSummaryTask.java

package org.littlewings.infinispan.distexec.protostream;

import java.io.Serializable;
import java.util.Set;
import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.infinispan.functional.FunctionalMap;
import org.infinispan.functional.Traversable;
import org.infinispan.functional.impl.FunctionalMapImpl;
import org.infinispan.functional.impl.ReadOnlyMapImpl;
import org.jboss.logging.Logger;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook;
import org.littlewings.infinispan.distexec.protostream.entity.SerializablePrice;
import org.littlewings.infinispan.distexec.protostream.entity.Summary;

public class FunctionalMapSummaryTask implements Serializable {
    private static final long serialVersionUID = 1L;

    Logger logger = Logger.getLogger(FunctionalMapSummaryTask.class);

    transient Cache<String, ProtoBook> cache;

    public FunctionalMapSummaryTask(Cache<String, ProtoBook> cache) {
        this.cache = cache;
    }

    public Summary execute(Set<String> isbns) {
        FunctionalMapImpl<String, ProtoBook> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
        FunctionalMap.ReadOnlyMap<String, ProtoBook> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);

        Traversable<SerializablePrice> prices = readOnlyMap.evalMany(isbns, view -> {
            String isbn = view.key();
            ProtoBook book = view.get();

            logger.infof("[execute] get %s", isbn);
            return SerializablePrice.create(view.key(), book.getPrice());
        });

        return prices
                .collect(
                        Collectors.reducing(
                                Summary.create(0),
                                p -> {
                                    logger.infof("[execute] collect %s", p.getIsbn());
                                    return Summary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    logger.infof("[execute] collect %d, %d", s1.getValue(), s2.getValue());
                                    return Summary.create(s1.getValue() + s2.getValue());
                                })
                );
    }
}

Functional Mapの場合、eval〜メソッドで扱う処理がSerializableである必要があるようです。なので、メソッドの戻り値に
使う値は今回はSerializableになりました。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/SerializablePrice.java

package org.littlewings.infinispan.distexec.protostream.entity;

import java.io.Serializable;

public class SerializablePrice implements Serializable {
    private static final long serialVersionUID = 1L;

    String isbn;
    int value;

    public static SerializablePrice create(String isbn, int value) {
        SerializablePrice price = new SerializablePrice();

        price.setIsbn(isbn);
        price.setValue(value);

        return price;
    }

    // getter/setterは省略
}

Traversable#collectを使っても、こちらはローカルで動作するようで、Serializableは求められません。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/Summary.java

package org.littlewings.infinispan.distexec.protostream.entity;

public class Summary {
    int value;

    public static Summary create(int value) {
        Summary summary = new Summary();

        summary.setValue(value);

        return summary;
    }

    // getter/setterは省略
}

テストコードは、こちら。

    @Test
    public void functionalMap() {
        this.<String, ProtoBook>withCache("bookCache", 3, cache -> {
            books.forEach(book -> cache.put(book.getIsbn(), book));

            FunctionalMapSummaryTask summaryTask = new FunctionalMapSummaryTask(cache);

            Set<String> isbns = books.stream().map(ProtoBook::getIsbn).collect(Collectors.toSet());

            Summary totalPriceSummary = summaryTask.execute(isbns);
            assertThat(totalPriceSummary.getValue()).isEqualTo(50446);
        });
    }

Infinispanの設定は、Distributed Streamの時と同じです(含めています)。

もう少し

ところで、ここまでSerializableなものがたくさん出てきました。

実は、もうちょっと気をつけると同じ処理でSerializableであるものを減らすことができます。

Serializable

Distributed Streamを扱ったクラスを、もうちょっと変えてみましょう。

src/main/java/org/littlewings/infinispan/distexec/protostream/StreamSummaryReturnOnlySerializableTask.java

package org.littlewings.infinispan.distexec.protostream;

import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.littlewings.infinispan.distexec.protostream.entity.Price;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook;
import org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary;

public class StreamSummaryReturnOnlySerializableTask {
    Cache<String, ProtoBook> cache;

    public StreamSummaryReturnOnlySerializableTask(Cache<String, ProtoBook> cache) {
        this.cache = cache;
    }

    public SerializableSummary execute() {
        return cache
                .values()
                .stream()
                .map(book -> {
                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                SerializableSummary.create(0),
                                p -> {
                                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute] collect %s", p.getIsbn());
                                    return SerializableSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute] collect %d, %d", s1.getValue(), s2.getValue());
                                    return SerializableSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }

    public SerializableSummary execute(int greaterThanPrice) {
        return cache
                .values()
                .stream()
                .filter(book -> {
                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] filter %s", book.getIsbn());
                    return book.getPrice() > greaterThanPrice;
                })
                .map(book -> {
                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                SerializableSummary.create(0),
                                p -> {
                                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] collect %s", p.getIsbn());
                                    return SerializableSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] collect %d, %d", s1.getValue(), s2.getValue());
                                    return SerializableSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }
}

クラスの定義から、Serializableを外しました。フィールドに定義しているCacheも、transientではなくなっています。

public class StreamSummaryReturnOnlySerializableTask {
    Cache<String, ProtoBook> cache;

また、ログ出力はCacheStreamのメソッド内でLoggerを取得して行うようにしています。

    public SerializableSummary execute() {
        return cache
                .values()
                .stream()
                .map(book -> {
                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                SerializableSummary.create(0),
                                p -> {
                                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute] collect %s", p.getIsbn());
                                    return SerializableSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute] collect %d, %d", s1.getValue(), s2.getValue());
                                    return SerializableSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }

こうすると、CacheStream#collectメソッドの戻り値だけがSerializableであれば良くなります。

上記クラスを動かすためのInfinispanの設定は、こちら。

src/test/resources/infinispan-serializable.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:12.1 https://infinispan.org/schemas/infinispan-config-12.1.xsd"
        xmlns="urn:infinispan:config:12.1">
    <cache-container shutdown-hook="REGISTER">
        <transport cluster="ispn-cluster" stack="udp"/>

        <distributed-cache name="bookCache" owners="1">
            <encoding>
                <key media-type="application/x-protostream"/>
                <value media-type="application/x-protostream"/>
            </encoding>
        </distributed-cache>

        <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller">
            <allow-list>
                <class>org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary</class>
            </allow-list>
        </serialization>
    </cache-container>
</infinispan>

テストコードは、最後にまとめて載せます。

ProtoStreamにする

ここまでくると、CacheStream#collectの戻り値をProtoStreamにすることもできるのでは?という気もします。

できます。

src/main/java/org/littlewings/infinispan/distexec/protostream/StreamSummaryReturnOnlyProtoTask.java

package org.littlewings.infinispan.distexec.protostream;

import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.littlewings.infinispan.distexec.protostream.entity.Price;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoSummary;

public class StreamSummaryReturnOnlyProtoTask {
    Cache<String, ProtoBook> cache;

    public StreamSummaryReturnOnlyProtoTask(Cache<String, ProtoBook> cache) {
        this.cache = cache;
    }

    public ProtoSummary execute() {
        return cache
                .values()
                .stream()
                .map(book -> {
                    Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                ProtoSummary.create(0),
                                p -> {
                                    Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute] collect %s", p.getIsbn());
                                    return ProtoSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute] collect %d, %d", s1.getValue(), s2.getValue());
                                    return ProtoSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }

    public ProtoSummary execute(int greaterThanPrice) {
        return cache
                .values()
                .stream()
                .filter(book -> {
                    Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] filter %s", book.getIsbn());
                    return book.getPrice() > greaterThanPrice;
                })
                .map(book -> {
                    Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                ProtoSummary.create(0),
                                p -> {
                                    Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] collect %s", p.getIsbn());
                                    return ProtoSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] collect %d, %d", s1.getValue(), s2.getValue());
                                    return ProtoSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }
}

CacheStream#collectで使うクラスを、ProtoSummaryというクラスにしました。ProtoStreamでMarshallingすることを
想定したクラスです。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/ProtoSummary.java

package org.littlewings.infinispan.distexec.protostream.entity;

import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;

public class ProtoSummary {
    @ProtoField(number = 1, defaultValue = "0")
    int value;

    @ProtoFactory
    public static ProtoSummary create(int value) {
        ProtoSummary summary = new ProtoSummary();

        summary.setValue(value);

        return summary;
    }

    // getter/setterは省略
}

SerializationContextInitializerで、このクラスも対象に追加。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/EntityInitializer.java

package org.littlewings.infinispan.distexec.protostream.entity;

import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;

@AutoProtoSchemaBuilder(
        includeClasses = {ProtoBook.class, ProtoSummary.class},
        schemaFileName = "entities.proto",
        schemaFilePath = "proto",
        schemaPackageName = "entity"
)
public interface EntityInitializer extends SerializationContextInitializer {
}

こうすると、Infinispanの設定からserializationを削除することができます。encodingも外せます。

src/test/resources/infinispan-proto.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:12.1 https://infinispan.org/schemas/infinispan-config-12.1.xsd"
        xmlns="urn:infinispan:config:12.1">
    <cache-container shutdown-hook="REGISTER">
        <transport cluster="ispn-cluster" stack="udp"/>

        <distributed-cache name="bookCache" owners="1"/>
    </cache-container>
</infinispan>
テストコード

ここまでの2種類のDistributed Streamsを扱うテストコードは、こちら。

src/test/java/org/littlewings/infinispan/distexec/protostream/ProtoStreamDistExecSimplyTest.java

package org.littlewings.infinispan.distexec.protostream;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.infinispan.Cache;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.junit.jupiter.api.Test;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoSummary;
import org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary;

import static org.assertj.core.api.Assertions.assertThat;

public class ProtoStreamDistExecSimplyTest {
    List<ProtoBook> books =
            List.of(
                    ProtoBook.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344),
                    ProtoBook.create("978-1849518222", "Infinispan Data Grid Platform", 3608),
                    ProtoBook.create("978-0359439379", "The Apache Ignite Book", 7686),
                    ProtoBook.create("978-1365732355", "High Performance in-memory computing with Apache Ignite", 6342),
                    ProtoBook.create("978-1789347531", "Apache Ignite Quick Start Guide: Distributed data caching and processing made easy", 3638),
                    ProtoBook.create("978-1785285332", "Getting Started with Hazelcast - Second Edition: Get acquainted with the highly scalable data grid, Hazelcast, and learn how to bring its powerful in-memory features into your application", 4209),
                    ProtoBook.create("978-1617295522", "Spark in Action, Second Edition: Covers Apache Spark 3 with Examples in Java, Python, and Scala", 6297),
                    ProtoBook.create("978-1484257807", "Beginning Apache Spark Using Azure Databricks: Unleashing Large Cluster Analytics in the Cloud", 4817),
                    ProtoBook.create("978-1788997829", "Apache Kafka Quick Start Guide: Leverage Apache Kafka 2.0 to simplify real-time data processing for distributed applications", 3516),
                    ProtoBook.create("978-1491936160", "Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale", 4989)
            );

    <K, V> void withCache(String configurationXmlPath, String cacheName, int numInstances, Consumer<Cache<K, V>> func) {
        List<EmbeddedCacheManager> managers =
                IntStream
                        .rangeClosed(1, numInstances)
                        .mapToObj(i -> {
                            try {
                                return new DefaultCacheManager(configurationXmlPath);
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        })
                        .collect(Collectors.toList());

        managers.forEach(manager -> manager.getCache(cacheName));

        try {
            Cache<K, V> cache = managers.get(0).getCache(cacheName);

            func.accept(cache);
        } finally {
            managers.forEach(manager -> manager.stop());
        }
    }

    @Test
    public void returnOnlySerializable() {
        this.<String, ProtoBook>withCache("infinispan-serializable.xml", "bookCache", 3, cache -> {
            books.forEach(book -> cache.put(book.getIsbn(), book));

            StreamSummaryReturnOnlySerializableTask summaryTask = new StreamSummaryReturnOnlySerializableTask(cache);

            SerializableSummary totalPriceSummary = summaryTask.execute();
            assertThat(totalPriceSummary.getValue()).isEqualTo(50446);

            SerializableSummary filteredPriceSummary = summaryTask.execute(5000);
            assertThat(filteredPriceSummary.getValue()).isEqualTo(25669);
        });
    }

    @Test
    public void returnOnlyProto() {
        this.<String, ProtoBook>withCache("infinispan-proto.xml", "bookCache", 3, cache -> {
            books.forEach(book -> cache.put(book.getIsbn(), book));

            StreamSummaryReturnOnlyProtoTask summaryTask = new StreamSummaryReturnOnlyProtoTask(cache);

            ProtoSummary totalPriceSummary = summaryTask.execute();
            assertThat(totalPriceSummary.getValue()).isEqualTo(50446);

            ProtoSummary filteredPriceSummary = summaryTask.execute(5000);
            assertThat(filteredPriceSummary.getValue()).isEqualTo(25669);
        });
    }
}

今回は、Cacheを使う時にInfinispanの設定ファイルも指定できるようにしています。

        this.<String, ProtoBook>withCache("infinispan-serializable.xml", "bookCache", 3, cache -> {


        this.<String, ProtoBook>withCache("infinispan-proto.xml", "bookCache", 3, cache -> {

こんなところでしょうか。

まとめ

CacheにProtoStreamでMarshallingしたエントリを格納しつつ、分散処理を使う場合のMarshallingはどうなるのか?
ということをいくつかバリエーションを入れて確認してみました。

結果を見れば、分散処理に渡すLambda式(またはインスタンス)にMarshallingが必要になるものを絞り込み、
かつすべてProtoStreamでMarshalling可能なように作成すれば、ProtoStreamでMarshallingの仕組みを統一することも
できるということは言えそうです。

ただ、ProtoStreamでMarshallingするのが難しそうなものや、サードパーティ製のライブラリのクラスなどを扱う場合に
どうしようという感じでしょうか。

サードパーティ製のものであっても、可能なそうであれば@ProtoAdapterアノテーションを使ってProtoStreamでの
Marshallingはできそうですけどね。

Creating ProtoStream adapter classes

いけるところまでは、ProtoStreamでMarshallingするように寄せていった方がいいのでしょうか?
このブログでは、いったんその方向で頑張ってみますか。

今回作成したソースコードは、こちらに置いています。

https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-distexec-protostream