CLOVER🍀

That was when it all began.

Infinispan 12.1でのMarshalling/Encodingと分散処理と

これは、なにをしたくて書いたもの?

Infinispan 10.0で、デフォルトのMarshallingの仕組みがProtoStream(Protocol Buffers)になりました。

このバージョン付近からHot Rodを前面に出す雰囲気になっていたので、あまりEmbedded Modeは扱ってこなかったのですが。

そういえば、Protocol BuffersをMarshallingの仕組みに使いつつ、分散処理を行った場合はMarshallingはどういった扱いに
なるのかな?と思って試してみることにしました。

結果を見ると、頑張ればすべてをProtoStreamでのMarshallingにまとめることはできそうな気はします。
ただ、扱うクラスや構成にけっこう引っ張られそうな気も…。

今回は、あくまでEmbedded Modeでの話です(Remote Cacheの場合は、クライアント側のMarshallerも関与するので)。

Marshalling and Encoding Data

気づいていなかったのですが、いつの間にかMarshallingまわりのドキュメントが独立していました。

Marshalling and Encoding Data

こちらのドキュメントに、Cacheへ格納する際のエンコーディングついて書かれています。

Cache Encoding

エンコーディングとはメディアタイプによって識別されるものであり、InfinispanがどのようにCacheにエントリ(キー、値)を
格納するかに影響します。

Marshalling and Encoding Data / Cache encoding

Remote Cacheの場合は、このようになります。

  • Infinispan Serverは、エントリをCacheに設定されたエンコーディングで保存する
  • Hot RodやREST Clientにはリクエストにメディアタイプが含まれており、Cacheの設定と異なるメディアタイプが指定された場合、Infinispan Serverはオンデマンドで変換を行う
    • Cacheにエンコーディング設定がない場合は、Infinispan Serverはbyte[]でエントリを保存する
      • Clinetからのデータ変換要求によっては、予期しない結果になることもある
  • 複数の種類のClinet(Hot Rod、REST、Infinispan Console、CLIなど)を使用する場合は、ProtoStreamエンコーディングを推奨

Embedded Cacheの場合は、このようになります。

  • クラスタ化されたCacheの場合、byte[]にマーシャリングする必要がある
    • ローカルモードの場合は、POJOとして保存する
  • 特にMarshallerを構成しない場合は、ProtoStream Marshallerでマーシャリングされる

Infinispanでは以下のエンコーディングを使うことができ、互換性のあるエンコーディングの間で変換を行うことができます。

エンコーディング メディアタイプ 変換可能なメディアタイプ 備考
ProtoStream / Protobuf encoding application/x-protostream application/json, ? Hot Rod、REST、Infinspan Consoleで相互運用可能
Text-based cache encoding text/plain application/xml

application/json

application/x-protostream

application/x-jboss-marshalling

application/x-java-serialized
JBoss Marshalling application/x-jboss-marshalling 現在は非推奨。シリアライズ可能なリストを設定する必要あり
Java Serialization application/x-java-serialized-object ProtoStreamよりパフォーマンスに劣る。シリアライズ可能なリストを設定する必要あり

?…の部分は、ドキュメント上でハッキリとした明示はありませんが、テキストベースのメディアタイプとは変換可能
と見てよいんでしょうね。

REST Clinetなどからの利用もありますし。

Client側でのMarshalling

Client側でのMarshallingについては、各メディアタイプやテーマごとにドキュメントが書かれています。

Text-based cache encoding / Clients and text-based encoding

Marshalled Java objects / Clients and marshalled objects

Plain Old Java Objects (POJO) / Clients and POJOs

Hot Rod Clientの場合は、ProtoStreamやJava Serialization、テキストベースのMarshallerを使用します。

REST Clientの場合は、JSONXML、テキストを使用します。

データ変換

さらに、Cacheからエントリを取得する際に、異なるメディアタイプに変換することも可能なようです。

Data conversion

今回は、ここは見ません。

分散処理では?

ところで、Infinispanといえば分散処理もあるでしょう。このあたりでしょうか。

このあたりの機能について書かれているところを見ると、Marshallingについては基本的にSerializableを想定して
書かれています。

Cluster Executorはドキュメント上は記載がありませんが、こちらもSerializableが想定されています。

ClusterExecutor (Infinispan JavaDoc 12.1.4.Final API)

よって、ProtoStream(Protocol Buffers)とはまた別ですね。

Cacheに格納するエントリはProtoStreamでMarshallingする場合、このあたりのとの関係はどうなるのでしょうか?

ということで試してみます。

お題

以下のお題で、試してみたいと思います。

  • Cacheに格納する値には自分でクラスを用意し、ProtoStreamでMarshallingする
    • ネタは書籍で
  • Distributed StreamおよびFunctional Mapを使って、Nodeをまたぐ処理を使う
    • ここで受け渡す処理の結果を表すクラスも、自分でクラスを用意する

この時に、Marshallingをどうしたらいいか?というのを試していきます。

環境

今回の環境は、こちら。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-77-generic", arch: "amd64", family: "unix"

準備

Maven依存関係などは、こちら。

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>12.1.4.Final</version>
        </dependency>

        <dependency>
            <groupId>org.infinispan.protostream</groupId>
            <artifactId>protostream-processor</artifactId>
            <version>4.4.1.Final</version>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.7.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.7.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.20.2</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.31</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

Infinispanは12.1.4.Finalを使い、動作確認はテストコードで行います。

ログライブラリはSLF4J+Logbackとし、なにも設定しない場合のj.u.l.Loggerよりは見やすいログにしておきます。

src/test/resources/logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

Cacheに格納する値

Cacheに格納するのは、書籍をお題にこちらのクラスとします。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/ProtoBook.java

package org.littlewings.infinispan.distexec.protostream.entity;

import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;

public class ProtoBook {
    @ProtoField(number = 1)
    String isbn;

    @ProtoField(number = 2)
    String title;

    @ProtoField(number = 3, defaultValue = "0")
    int price;

    @ProtoFactory
    public static ProtoBook create(String isbn, String title, int price) {
        ProtoBook book = new ProtoBook();

        book.setIsbn(isbn);
        book.setTitle(title);
        book.setPrice(price);

        return book;
    }

    // getter/setterは省略
}

キーは、Stringを使うことにしましょう。つまり、Cache<String, ProtoBook>です。

自分で定義するクラスをProtoStreamでMarshallingする場合は、こちらのドキュメントに従ってアノテーションを付与したり
します。

Marshalling custom objects with ProtoStream

SerializationContextInitializerの作成も必要です。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/EntityInitializer.java

package org.littlewings.infinispan.distexec.protostream.entity;

import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;

@AutoProtoSchemaBuilder(
        includeClasses = ProtoBook.class,
        schemaFileName = "entities.proto",
        schemaFilePath = "proto",
        schemaPackageName = "entity"
)
public interface EntityInitializer extends SerializationContextInitializer {
}

今回は、このProtoBookをCacheに格納し、priceフィールドを扱う分散処理を書くことにしましょう。

テストコードの雛形

では、まずはテストコードの雛形を作成します。

src/test/java/org/littlewings/infinispan/distexec/protostream/ProtoStreamDistExecTest.java

package org.littlewings.infinispan.distexec.protostream;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.infinispan.Cache;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.Test;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoSummary;
import org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary;
import org.littlewings.infinispan.distexec.protostream.entity.Summary;

import static org.assertj.core.api.Assertions.assertThat;

public class ProtoStreamDistExecTest {
    Logger logger = Logger.getLogger(ProtoStreamDistExecTest.class);

    List<ProtoBook> books =
            List.of(
                    ProtoBook.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344),
                    ProtoBook.create("978-1849518222", "Infinispan Data Grid Platform", 3608),
                    ProtoBook.create("978-0359439379", "The Apache Ignite Book", 7686),
                    ProtoBook.create("978-1365732355", "High Performance in-memory computing with Apache Ignite", 6342),
                    ProtoBook.create("978-1789347531", "Apache Ignite Quick Start Guide: Distributed data caching and processing made easy", 3638),
                    ProtoBook.create("978-1785285332", "Getting Started with Hazelcast - Second Edition: Get acquainted with the highly scalable data grid, Hazelcast, and learn how to bring its powerful in-memory features into your application", 4209),
                    ProtoBook.create("978-1617295522", "Spark in Action, Second Edition: Covers Apache Spark 3 with Examples in Java, Python, and Scala", 6297),
                    ProtoBook.create("978-1484257807", "Beginning Apache Spark Using Azure Databricks: Unleashing Large Cluster Analytics in the Cloud", 4817),
                    ProtoBook.create("978-1788997829", "Apache Kafka Quick Start Guide: Leverage Apache Kafka 2.0 to simplify real-time data processing for distributed applications", 3516),
                    ProtoBook.create("978-1491936160", "Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale", 4989)
            );

    <K, V> void withCache(String cacheName, int numInstances, Consumer<Cache<K, V>> func) {
        List<EmbeddedCacheManager> managers =
                IntStream
                        .rangeClosed(1, numInstances)
                        .mapToObj(i -> {
                            try {
                                return new DefaultCacheManager("infinispan.xml");
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        })
                        .collect(Collectors.toList());

        managers.forEach(manager -> manager.getCache(cacheName));

        try {
            Cache<K, V> cache = managers.get(0).getCache(cacheName);

            func.accept(cache);
        } finally {
            managers.forEach(manager -> manager.stop());
        }
    }

    // ここに、テストを書く
}

簡単にクラスタを構成できるメソッド付き。

infinispan.xmlの内容は、また後で書きます。

こんなメソッドで動作確認。ここで、bookCacheというのはownersが1のDistributed Cacheとします。
また、Nodeは3つにしてクラスタを構成しています。以降に出てくるテストコードも全部3 Nodeにします。

    @Test
    public void simple() {
        this.<String, ProtoBook>withCache("bookCache", 3, cache -> {
            books.forEach(book -> cache.put(book.getIsbn(), book));

            assertThat(cache).hasSize(books.size());

            ProtoBook infinispanBook = cache.get("978-1782169970");
            assertThat(infinispanBook.getTitle()).isEqualTo("Infinispan Data Grid Platform Definitive Guide");
            assertThat(infinispanBook.getPrice()).isEqualTo(5344);

            DistributionManager dm = cache.getAdvancedCache().getDistributionManager();
            cache.keySet().forEach(isbn -> {
                LocalizedCacheTopology cacheTopology = dm.getCacheTopology();
                logger.infof("isbn = %s, members = %s", isbn, cacheTopology.getWriteOwners(isbn));
            });
        });
    }

ログ出力しているので、このコードを動作させると、各キーがどのNodeに配置されているか確認できるでしょう。

Distributed Streamを使った処理を書く

では、Distributed Streamを使った処理を書いてみましょう。

src/main/java/org/littlewings/infinispan/distexec/protostream/StreamSummaryTask.java

package org.littlewings.infinispan.distexec.protostream;

import java.io.Serializable;
import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.littlewings.infinispan.distexec.protostream.entity.Price;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook;
import org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary;

public class StreamSummaryTask implements Serializable {
    private static final long serialVersionUID = 1L;

    Logger logger = Logger.getLogger(StreamSummaryTask.class);

    transient Cache<String, ProtoBook> cache;

    public StreamSummaryTask(Cache<String, ProtoBook> cache) {
        this.cache = cache;
    }

    public SerializableSummary execute() {
        return cache
                .values()
                .stream()
                .map(book -> {
                    logger.infof("[execute] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                SerializableSummary.create(0),
                                p -> {
                                    logger.infof("[execute] collect %s", p.getIsbn());
                                    return SerializableSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    logger.infof("[execute] collect %d, %d", s1.getValue(), s2.getValue());
                                    return SerializableSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }

    public SerializableSummary execute(int greaterThanPrice) {
        return cache
                .values()
                .stream()
                .filter(book -> {
                    logger.infof("[execute, filter price] filter %s", book.getIsbn());
                    return book.getPrice() > greaterThanPrice;
                })
                .map(book -> {
                    logger.infof("[execute, filter price] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                SerializableSummary.create(0),
                                p -> {
                                    logger.infof("[execute, filter price] collect %s", p.getIsbn());
                                    return SerializableSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    logger.infof("[execute, filter price] collect %d, %d", s1.getValue(), s2.getValue());
                                    return SerializableSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }
}

いきなりでなんですが、このクラスがすでにSerializableです。

public class StreamSummaryTask implements Serializable {

処理の内容自体は、Cacheに格納されているProtoBookクラスのpriceを合算しているだけです。

    public SerializableSummary execute() {
        return cache
                .values()
                .stream()
                .map(book -> {
                    logger.infof("[execute] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                SerializableSummary.create(0),
                                p -> {
                                    logger.infof("[execute] collect %s", p.getIsbn());
                                    return SerializableSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    logger.infof("[execute] collect %d, %d", s1.getValue(), s2.getValue());
                                    return SerializableSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }

CacheStream#filterを使っているバージョンもありますが。

処理の内容は単純ですが、どこでSerializableが求められるのかをわかるようにするために、各種中間・終端操作の
戻り値を自分で用意したクラスにしておきました。

Priceクラスは、こんな定義です。こちらはSerializableではありません。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/Price.java

package org.littlewings.infinispan.distexec.protostream.entity;

public class Price {
    String isbn;
    int value;

    public static Price create(String isbn, int value) {
        Price price = new Price();

        price.setIsbn(isbn);
        price.setValue(value);

        return price;
    }

    // getter/setterは省略
}

SerializableSummaryクラスは、Serializableを実装します。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/SerializableSummary.java

package org.littlewings.infinispan.distexec.protostream.entity;

import java.io.Serializable;

public class SerializableSummary implements Serializable {
    private static final long serialVersionUID = 1L;

    int value;

    public static SerializableSummary create(int value) {
        SerializableSummary summary = new SerializableSummary();

        summary.setValue(value);

        return summary;
    }

    // getter/setter
}

つまり、CacheStream#collectではSerializableが必要だ、ということになります。
※とはいえ、今回のソースコードシリアライズが必要になるのはLoggerをフィールド(ローカル変数でも同じ)に持ち、それを分散処理ないで参照しているからであり、このためにStreamSummaryTaskSerializableを実装しています

後にも書きますが、CacheStream#mapについては今回はSerializableは実は求められていません。

テストコードは、こちら。

    @Test
    public void distributedStream() {
        this.<String, ProtoBook>withCache("bookCache", 3, cache -> {
            books.forEach(book -> cache.put(book.getIsbn(), book));

            StreamSummaryTask summaryTask = new StreamSummaryTask(cache);

            SerializableSummary totalPriceSummary = summaryTask.execute();
            assertThat(totalPriceSummary.getValue()).isEqualTo(50446);

            SerializableSummary filteredPriceSummary = summaryTask.execute(5000);
            assertThat(filteredPriceSummary.getValue()).isEqualTo(25669);
        });
    }

このテストコードを動作させるための、Infinispanの設定ファイルはこちらです。

src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:12.1 https://infinispan.org/schemas/infinispan-config-12.1.xsd"
        xmlns="urn:infinispan:config:12.1">
    <cache-container shutdown-hook="REGISTER">
        <transport cluster="ispn-cluster" stack="udp"/>

        <distributed-cache name="bookCache" owners="1">
            <encoding>
                <!-- <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller"> を入れたら明示が必要 -->
                <key media-type="application/x-protostream"/>
                <value media-type="application/x-protostream"/>
            </encoding>
        </distributed-cache>

        <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller">
            <!-- <context-initializer class="org.littlewings.infinispan.distexec.protostream.entity.EntityInitializerImpl"/>  自動生成されるので不要 -->
            <allow-list>
                <class>org.littlewings.infinispan.distexec.protostream.StreamSummaryTask</class>
                <class>org.littlewings.infinispan.distexec.protostream.FunctionalMapSummaryTask</class>
                <regex>org\.littlewings\.infinispan\.distexec\.protostream\.entity\.Serializable.+</regex>
                <regex>org\.jboss\.logging\..+</regex>
            </allow-list>
        </serialization>
    </cache-container>
</infinispan>

今回のソースコードを、<serialization>を設定しないまま動作させようとすると、Distributed Streamを使っているクラスを
Marshallingしようとして失敗します。

22:40:47.727 [main] WARN  org.infinispan.PERSISTENCE - ISPN000559: Cannot marshall 'class org.infinispan.marshall.protostream.impl.MarshallableUserObject'
java.lang.IllegalArgumentException: No marshaller registered for object of Java type org.littlewings.infinispan.distexec.protostream.StreamSummaryTask : org.littlewings.infinispan.distexec.protostream.StreamSummaryTask@161dd92a

このクラスを、ProtoStreamでシリアライズするのはさすがにできませんね…。

このため、MarshallerとしてJavaSerializationMarshallerを指定し、Java標準のシリアライズの仕組みも使えるように
します。

        <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller">
            <allow-list>
                <class>org.littlewings.infinispan.distexec.protostream.StreamSummaryTask</class>
                <class>org.littlewings.infinispan.distexec.protostream.FunctionalMapSummaryTask</class>
                <regex>org\.littlewings\.infinispan\.distexec\.protostream\.entity\.Serializable.+</regex>
                <regex>org\.jboss\.logging\..+</regex>
            </allow-list>
        </serialization>

この時、シリアライズできるクラスは明示的に指定する必要があり、regex正規表現)またはclass(クラス名)で
指定します。
※次に使うクラスの名前(FunctionalMapSummaryTask)も含まれていますが

Using Java serialization

ここに登録していないクラスは、Java標準のシリアライズを行う際にInfinispanから拒否されます。

ちなみに、SerializationContextInitializerを自分で作成していますが、これはServiceLoaderの仕組みで自動登録されるため
今回は指定不要です。

Registering serialization context initializers

        <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller">
            <!-- <context-initializer class="org.littlewings.infinispan.distexec.protostream.entity.EntityInitializerImpl"/>  自動生成されるので不要 -->

複数のSerializationContextInitializerを使う場合、ひとつでも明示的に登録した場合は自動登録が無効になるため、
すべて明示的に指定する必要があるようです。

そして、JavaSerializationMarshallerをMarshallerを指定した段階で、特になにもしないとCacheにもJava標準の
シリアライズの仕組みでエントリを登録しようとします。

        <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller">

つまり、ProtoBookSerializableであることを求めてきます。

そうではなく、ProtoBook、つまりCacheに格納するエントリはProtoStreamでMarshallingを行う場合は、エンコーディング
明示します。

Encoding caches as ProtoStream

        <distributed-cache name="bookCache" owners="1">
            <encoding>
                <!-- <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller"> を入れたら明示が必要 -->
                <key media-type="application/x-protostream"/>
                <value media-type="application/x-protostream"/>
            </encoding>
        </distributed-cache>

これで、CacheにはProtoStreamでMarshallingしつつ、分散処理はJava標準のシリアライズの仕組みで動作するように
設定できました。

先ほどのテストもパスします。

Functional Map

次は、Functional Map。今回はReadOnlyMapを使うことにしました。

Read-Only Map API

src/main/java/org/littlewings/infinispan/distexec/protostream/FunctionalMapSummaryTask.java

package org.littlewings.infinispan.distexec.protostream;

import java.io.Serializable;
import java.util.Set;
import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.infinispan.functional.FunctionalMap;
import org.infinispan.functional.Traversable;
import org.infinispan.functional.impl.FunctionalMapImpl;
import org.infinispan.functional.impl.ReadOnlyMapImpl;
import org.jboss.logging.Logger;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook;
import org.littlewings.infinispan.distexec.protostream.entity.SerializablePrice;
import org.littlewings.infinispan.distexec.protostream.entity.Summary;

public class FunctionalMapSummaryTask implements Serializable {
    private static final long serialVersionUID = 1L;

    Logger logger = Logger.getLogger(FunctionalMapSummaryTask.class);

    transient Cache<String, ProtoBook> cache;

    public FunctionalMapSummaryTask(Cache<String, ProtoBook> cache) {
        this.cache = cache;
    }

    public Summary execute(Set<String> isbns) {
        FunctionalMapImpl<String, ProtoBook> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache());
        FunctionalMap.ReadOnlyMap<String, ProtoBook> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);

        Traversable<SerializablePrice> prices = readOnlyMap.evalMany(isbns, view -> {
            String isbn = view.key();
            ProtoBook book = view.get();

            logger.infof("[execute] get %s", isbn);
            return SerializablePrice.create(view.key(), book.getPrice());
        });

        return prices
                .collect(
                        Collectors.reducing(
                                Summary.create(0),
                                p -> {
                                    logger.infof("[execute] collect %s", p.getIsbn());
                                    return Summary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    logger.infof("[execute] collect %d, %d", s1.getValue(), s2.getValue());
                                    return Summary.create(s1.getValue() + s2.getValue());
                                })
                );
    }
}

Functional Mapの場合、eval〜メソッドで扱う処理がSerializableである必要があるようです。なので、メソッドの戻り値に
使う値は今回はSerializableになりました。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/SerializablePrice.java

package org.littlewings.infinispan.distexec.protostream.entity;

import java.io.Serializable;

public class SerializablePrice implements Serializable {
    private static final long serialVersionUID = 1L;

    String isbn;
    int value;

    public static SerializablePrice create(String isbn, int value) {
        SerializablePrice price = new SerializablePrice();

        price.setIsbn(isbn);
        price.setValue(value);

        return price;
    }

    // getter/setterは省略
}

Traversable#collectを使っても、こちらはローカルで動作するようで、Serializableは求められません。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/Summary.java

package org.littlewings.infinispan.distexec.protostream.entity;

public class Summary {
    int value;

    public static Summary create(int value) {
        Summary summary = new Summary();

        summary.setValue(value);

        return summary;
    }

    // getter/setterは省略
}

テストコードは、こちら。

    @Test
    public void functionalMap() {
        this.<String, ProtoBook>withCache("bookCache", 3, cache -> {
            books.forEach(book -> cache.put(book.getIsbn(), book));

            FunctionalMapSummaryTask summaryTask = new FunctionalMapSummaryTask(cache);

            Set<String> isbns = books.stream().map(ProtoBook::getIsbn).collect(Collectors.toSet());

            Summary totalPriceSummary = summaryTask.execute(isbns);
            assertThat(totalPriceSummary.getValue()).isEqualTo(50446);
        });
    }

Infinispanの設定は、Distributed Streamの時と同じです(含めています)。

もう少し

ところで、ここまでSerializableなものがたくさん出てきました。

実は、もうちょっと気をつけると同じ処理でSerializableであるものを減らすことができます。

Serializable

Distributed Streamを扱ったクラスを、もうちょっと変えてみましょう。

src/main/java/org/littlewings/infinispan/distexec/protostream/StreamSummaryReturnOnlySerializableTask.java

package org.littlewings.infinispan.distexec.protostream;

import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.littlewings.infinispan.distexec.protostream.entity.Price;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook;
import org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary;

public class StreamSummaryReturnOnlySerializableTask {
    Cache<String, ProtoBook> cache;

    public StreamSummaryReturnOnlySerializableTask(Cache<String, ProtoBook> cache) {
        this.cache = cache;
    }

    public SerializableSummary execute() {
        return cache
                .values()
                .stream()
                .map(book -> {
                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                SerializableSummary.create(0),
                                p -> {
                                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute] collect %s", p.getIsbn());
                                    return SerializableSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute] collect %d, %d", s1.getValue(), s2.getValue());
                                    return SerializableSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }

    public SerializableSummary execute(int greaterThanPrice) {
        return cache
                .values()
                .stream()
                .filter(book -> {
                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] filter %s", book.getIsbn());
                    return book.getPrice() > greaterThanPrice;
                })
                .map(book -> {
                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                SerializableSummary.create(0),
                                p -> {
                                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] collect %s", p.getIsbn());
                                    return SerializableSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] collect %d, %d", s1.getValue(), s2.getValue());
                                    return SerializableSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }
}

クラスの定義から、Serializableを外しました。フィールドに定義しているCacheも、transientではなくなっています。

public class StreamSummaryReturnOnlySerializableTask {
    Cache<String, ProtoBook> cache;

また、ログ出力はCacheStreamのメソッド内でLoggerを取得して行うようにしています。

    public SerializableSummary execute() {
        return cache
                .values()
                .stream()
                .map(book -> {
                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                SerializableSummary.create(0),
                                p -> {
                                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute] collect %s", p.getIsbn());
                                    return SerializableSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute] collect %d, %d", s1.getValue(), s2.getValue());
                                    return SerializableSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }

こうすると、CacheStream#collectメソッドの戻り値だけがSerializableであれば良くなります。

上記クラスを動かすためのInfinispanの設定は、こちら。

src/test/resources/infinispan-serializable.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:12.1 https://infinispan.org/schemas/infinispan-config-12.1.xsd"
        xmlns="urn:infinispan:config:12.1">
    <cache-container shutdown-hook="REGISTER">
        <transport cluster="ispn-cluster" stack="udp"/>

        <distributed-cache name="bookCache" owners="1">
            <encoding>
                <key media-type="application/x-protostream"/>
                <value media-type="application/x-protostream"/>
            </encoding>
        </distributed-cache>

        <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller">
            <allow-list>
                <class>org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary</class>
            </allow-list>
        </serialization>
    </cache-container>
</infinispan>

テストコードは、最後にまとめて載せます。

ProtoStreamにする

ここまでくると、CacheStream#collectの戻り値をProtoStreamにすることもできるのでは?という気もします。

できます。

src/main/java/org/littlewings/infinispan/distexec/protostream/StreamSummaryReturnOnlyProtoTask.java

package org.littlewings.infinispan.distexec.protostream;

import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.littlewings.infinispan.distexec.protostream.entity.Price;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoSummary;

public class StreamSummaryReturnOnlyProtoTask {
    Cache<String, ProtoBook> cache;

    public StreamSummaryReturnOnlyProtoTask(Cache<String, ProtoBook> cache) {
        this.cache = cache;
    }

    public ProtoSummary execute() {
        return cache
                .values()
                .stream()
                .map(book -> {
                    Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                ProtoSummary.create(0),
                                p -> {
                                    Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute] collect %s", p.getIsbn());
                                    return ProtoSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute] collect %d, %d", s1.getValue(), s2.getValue());
                                    return ProtoSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }

    public ProtoSummary execute(int greaterThanPrice) {
        return cache
                .values()
                .stream()
                .filter(book -> {
                    Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] filter %s", book.getIsbn());
                    return book.getPrice() > greaterThanPrice;
                })
                .map(book -> {
                    Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] map %s", book.getIsbn());
                    return Price.create(book.getIsbn(), book.getPrice());
                })
                .collect(() ->
                        Collectors.reducing(
                                ProtoSummary.create(0),
                                p -> {
                                    Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] collect %s", p.getIsbn());
                                    return ProtoSummary.create(p.getValue());
                                },
                                (s1, s2) -> {
                                    Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] collect %d, %d", s1.getValue(), s2.getValue());
                                    return ProtoSummary.create(s1.getValue() + s2.getValue());
                                })
                );
    }
}

CacheStream#collectで使うクラスを、ProtoSummaryというクラスにしました。ProtoStreamでMarshallingすることを
想定したクラスです。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/ProtoSummary.java

package org.littlewings.infinispan.distexec.protostream.entity;

import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;

public class ProtoSummary {
    @ProtoField(number = 1, defaultValue = "0")
    int value;

    @ProtoFactory
    public static ProtoSummary create(int value) {
        ProtoSummary summary = new ProtoSummary();

        summary.setValue(value);

        return summary;
    }

    // getter/setterは省略
}

SerializationContextInitializerで、このクラスも対象に追加。

src/main/java/org/littlewings/infinispan/distexec/protostream/entity/EntityInitializer.java

package org.littlewings.infinispan.distexec.protostream.entity;

import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;

@AutoProtoSchemaBuilder(
        includeClasses = {ProtoBook.class, ProtoSummary.class},
        schemaFileName = "entities.proto",
        schemaFilePath = "proto",
        schemaPackageName = "entity"
)
public interface EntityInitializer extends SerializationContextInitializer {
}

こうすると、Infinispanの設定からserializationを削除することができます。encodingも外せます。

src/test/resources/infinispan-proto.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:12.1 https://infinispan.org/schemas/infinispan-config-12.1.xsd"
        xmlns="urn:infinispan:config:12.1">
    <cache-container shutdown-hook="REGISTER">
        <transport cluster="ispn-cluster" stack="udp"/>

        <distributed-cache name="bookCache" owners="1"/>
    </cache-container>
</infinispan>
テストコード

ここまでの2種類のDistributed Streamsを扱うテストコードは、こちら。

src/test/java/org/littlewings/infinispan/distexec/protostream/ProtoStreamDistExecSimplyTest.java

package org.littlewings.infinispan.distexec.protostream;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.infinispan.Cache;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.junit.jupiter.api.Test;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook;
import org.littlewings.infinispan.distexec.protostream.entity.ProtoSummary;
import org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary;

import static org.assertj.core.api.Assertions.assertThat;

public class ProtoStreamDistExecSimplyTest {
    List<ProtoBook> books =
            List.of(
                    ProtoBook.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344),
                    ProtoBook.create("978-1849518222", "Infinispan Data Grid Platform", 3608),
                    ProtoBook.create("978-0359439379", "The Apache Ignite Book", 7686),
                    ProtoBook.create("978-1365732355", "High Performance in-memory computing with Apache Ignite", 6342),
                    ProtoBook.create("978-1789347531", "Apache Ignite Quick Start Guide: Distributed data caching and processing made easy", 3638),
                    ProtoBook.create("978-1785285332", "Getting Started with Hazelcast - Second Edition: Get acquainted with the highly scalable data grid, Hazelcast, and learn how to bring its powerful in-memory features into your application", 4209),
                    ProtoBook.create("978-1617295522", "Spark in Action, Second Edition: Covers Apache Spark 3 with Examples in Java, Python, and Scala", 6297),
                    ProtoBook.create("978-1484257807", "Beginning Apache Spark Using Azure Databricks: Unleashing Large Cluster Analytics in the Cloud", 4817),
                    ProtoBook.create("978-1788997829", "Apache Kafka Quick Start Guide: Leverage Apache Kafka 2.0 to simplify real-time data processing for distributed applications", 3516),
                    ProtoBook.create("978-1491936160", "Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale", 4989)
            );

    <K, V> void withCache(String configurationXmlPath, String cacheName, int numInstances, Consumer<Cache<K, V>> func) {
        List<EmbeddedCacheManager> managers =
                IntStream
                        .rangeClosed(1, numInstances)
                        .mapToObj(i -> {
                            try {
                                return new DefaultCacheManager(configurationXmlPath);
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        })
                        .collect(Collectors.toList());

        managers.forEach(manager -> manager.getCache(cacheName));

        try {
            Cache<K, V> cache = managers.get(0).getCache(cacheName);

            func.accept(cache);
        } finally {
            managers.forEach(manager -> manager.stop());
        }
    }

    @Test
    public void returnOnlySerializable() {
        this.<String, ProtoBook>withCache("infinispan-serializable.xml", "bookCache", 3, cache -> {
            books.forEach(book -> cache.put(book.getIsbn(), book));

            StreamSummaryReturnOnlySerializableTask summaryTask = new StreamSummaryReturnOnlySerializableTask(cache);

            SerializableSummary totalPriceSummary = summaryTask.execute();
            assertThat(totalPriceSummary.getValue()).isEqualTo(50446);

            SerializableSummary filteredPriceSummary = summaryTask.execute(5000);
            assertThat(filteredPriceSummary.getValue()).isEqualTo(25669);
        });
    }

    @Test
    public void returnOnlyProto() {
        this.<String, ProtoBook>withCache("infinispan-proto.xml", "bookCache", 3, cache -> {
            books.forEach(book -> cache.put(book.getIsbn(), book));

            StreamSummaryReturnOnlyProtoTask summaryTask = new StreamSummaryReturnOnlyProtoTask(cache);

            ProtoSummary totalPriceSummary = summaryTask.execute();
            assertThat(totalPriceSummary.getValue()).isEqualTo(50446);

            ProtoSummary filteredPriceSummary = summaryTask.execute(5000);
            assertThat(filteredPriceSummary.getValue()).isEqualTo(25669);
        });
    }
}

今回は、Cacheを使う時にInfinispanの設定ファイルも指定できるようにしています。

        this.<String, ProtoBook>withCache("infinispan-serializable.xml", "bookCache", 3, cache -> {


        this.<String, ProtoBook>withCache("infinispan-proto.xml", "bookCache", 3, cache -> {

こんなところでしょうか。

まとめ

CacheにProtoStreamでMarshallingしたエントリを格納しつつ、分散処理を使う場合のMarshallingはどうなるのか?
ということをいくつかバリエーションを入れて確認してみました。

結果を見れば、分散処理に渡すLambda式(またはインスタンス)にMarshallingが必要になるものを絞り込み、
かつすべてProtoStreamでMarshalling可能なように作成すれば、ProtoStreamでMarshallingの仕組みを統一することも
できるということは言えそうです。

ただ、ProtoStreamでMarshallingするのが難しそうなものや、サードパーティ製のライブラリのクラスなどを扱う場合に
どうしようという感じでしょうか。

サードパーティ製のものであっても、可能なそうであれば@ProtoAdapterアノテーションを使ってProtoStreamでの
Marshallingはできそうですけどね。

Creating ProtoStream adapter classes

いけるところまでは、ProtoStreamでMarshallingするように寄せていった方がいいのでしょうか?
このブログでは、いったんその方向で頑張ってみますか。

今回作成したソースコードは、こちらに置いています。

https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-distexec-protostream