これは、なにをしたくて書いたもの?
Infinispan 10.0で、デフォルトのMarshallingの仕組みがProtoStream(Protocol Buffers)になりました。
このバージョン付近からHot Rodを前面に出す雰囲気になっていたので、あまりEmbedded Modeは扱ってこなかったのですが。
そういえば、Protocol BuffersをMarshallingの仕組みに使いつつ、分散処理を行った場合はMarshallingはどういった扱いに
なるのかな?と思って試してみることにしました。
結果を見ると、頑張ればすべてをProtoStreamでのMarshallingにまとめることはできそうな気はします。
ただ、扱うクラスや構成にけっこう引っ張られそうな気も…。
今回は、あくまでEmbedded Modeでの話です(Remote Cacheの場合は、クライアント側のMarshallerも関与するので)。
Marshalling and Encoding Data
気づいていなかったのですが、いつの間にかMarshallingまわりのドキュメントが独立していました。
こちらのドキュメントに、Cacheへ格納する際のエンコーディングついて書かれています。
Cache Encoding
エンコーディングとはメディアタイプによって識別されるものであり、InfinispanがどのようにCacheにエントリ(キー、値)を
格納するかに影響します。
Marshalling and Encoding Data / Cache encoding
Remote Cacheの場合は、このようになります。
- Infinispan Serverは、エントリをCacheに設定されたエンコーディングで保存する
- Hot RodやREST Clientにはリクエストにメディアタイプが含まれており、Cacheの設定と異なるメディアタイプが指定された場合、Infinispan Serverはオンデマンドで変換を行う
- Cacheにエンコーディング設定がない場合は、Infinispan Serverは
byte[]
でエントリを保存する- Clinetからのデータ変換要求によっては、予期しない結果になることもある
- Cacheにエンコーディング設定がない場合は、Infinispan Serverは
- 複数の種類のClinet(Hot Rod、REST、Infinispan Console、CLIなど)を使用する場合は、ProtoStreamエンコーディングを推奨
Embedded Cacheの場合は、このようになります。
- クラスタ化されたCacheの場合、
byte[]
にマーシャリングする必要がある- ローカルモードの場合は、POJOとして保存する
- 特にMarshallerを構成しない場合は、ProtoStream Marshallerでマーシャリングされる
Infinispanでは以下のエンコーディングを使うことができ、互換性のあるエンコーディングの間で変換を行うことができます。
エンコーディング | メディアタイプ | 変換可能なメディアタイプ | 備考 |
---|---|---|---|
ProtoStream / Protobuf encoding | application/x-protostream |
application/json , ? |
Hot Rod、REST、Infinspan Consoleで相互運用可能 |
Text-based cache encoding | text/plain |
application/xml application/json application/x-protostream application/x-jboss-marshalling application/x-java-serialized |
|
JBoss Marshalling | application/x-jboss-marshalling |
? | 現在は非推奨。シリアライズ可能なリストを設定する必要あり |
Java Serialization | application/x-java-serialized-object |
? | ProtoStreamよりパフォーマンスに劣る。シリアライズ可能なリストを設定する必要あり |
?…の部分は、ドキュメント上でハッキリとした明示はありませんが、テキストベースのメディアタイプとは変換可能
と見てよいんでしょうね。
REST Clinetなどからの利用もありますし。
Client側でのMarshalling
Client側でのMarshallingについては、各メディアタイプやテーマごとにドキュメントが書かれています。
Text-based cache encoding / Clients and text-based encoding
Marshalled Java objects / Clients and marshalled objects
Plain Old Java Objects (POJO) / Clients and POJOs
Hot Rod Clientの場合は、ProtoStreamやJava Serialization、テキストベースのMarshallerを使用します。
REST Clientの場合は、JSONやXML、テキストを使用します。
データ変換
さらに、Cacheからエントリを取得する際に、異なるメディアタイプに変換することも可能なようです。
今回は、ここは見ません。
分散処理では?
ところで、Infinispanといえば分散処理もあるでしょう。このあたりでしょうか。
- Distributed Stream
- Functional Map
- Cluster Executor
このあたりの機能について書かれているところを見ると、Marshallingについては基本的にSerializable
を想定して
書かれています。
- Streams / Serialization
AdvancedExternalizer
含む
- Marshalling of Functions
Cluster Executorはドキュメント上は記載がありませんが、こちらもSerializable
が想定されています。
ClusterExecutor (Infinispan JavaDoc 12.1.11.Final API)
よって、ProtoStream(Protocol Buffers)とはまた別ですね。
Cacheに格納するエントリはProtoStreamでMarshallingする場合、このあたりのとの関係はどうなるのでしょうか?
ということで試してみます。
お題
以下のお題で、試してみたいと思います。
- Cacheに格納する値には自分でクラスを用意し、ProtoStreamでMarshallingする
- ネタは書籍で
- Distributed StreamおよびFunctional Mapを使って、Nodeをまたぐ処理を使う
- ここで受け渡す処理の結果を表すクラスも、自分でクラスを用意する
この時に、Marshallingをどうしたらいいか?というのを試していきます。
環境
今回の環境は、こちら。
$ java --version openjdk 11.0.11 2021-04-20 OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04) OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-77-generic", arch: "amd64", family: "unix"
準備
Maven依存関係などは、こちら。
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-core</artifactId> <version>12.1.4.Final</version> </dependency> <dependency> <groupId>org.infinispan.protostream</groupId> <artifactId>protostream-processor</artifactId> <version>4.4.1.Final</version> <optional>true</optional> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.7.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.7.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.20.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.31</version> <scope>test</scope> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.2</version> </plugin> </plugins> </build>
Infinispanは12.1.4.Finalを使い、動作確認はテストコードで行います。
ログライブラリはSLF4J+Logbackとし、なにも設定しない場合のj.u.l.Loggerよりは見やすいログにしておきます。
src/test/resources/logback.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <root level="INFO"> <appender-ref ref="STDOUT"/> </root> </configuration>
Cacheに格納する値
Cacheに格納するのは、書籍をお題にこちらのクラスとします。
src/main/java/org/littlewings/infinispan/distexec/protostream/entity/ProtoBook.java
package org.littlewings.infinispan.distexec.protostream.entity; import org.infinispan.protostream.annotations.ProtoFactory; import org.infinispan.protostream.annotations.ProtoField; public class ProtoBook { @ProtoField(number = 1) String isbn; @ProtoField(number = 2) String title; @ProtoField(number = 3, defaultValue = "0") int price; @ProtoFactory public static ProtoBook create(String isbn, String title, int price) { ProtoBook book = new ProtoBook(); book.setIsbn(isbn); book.setTitle(title); book.setPrice(price); return book; } // getter/setterは省略 }
キーは、String
を使うことにしましょう。つまり、Cache<String, ProtoBook>
です。
自分で定義するクラスをProtoStreamでMarshallingする場合は、こちらのドキュメントに従ってアノテーションを付与したり
します。
Marshalling custom objects with ProtoStream
SerializationContextInitializer
の作成も必要です。
src/main/java/org/littlewings/infinispan/distexec/protostream/entity/EntityInitializer.java
package org.littlewings.infinispan.distexec.protostream.entity; import org.infinispan.protostream.SerializationContextInitializer; import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder; @AutoProtoSchemaBuilder( includeClasses = ProtoBook.class, schemaFileName = "entities.proto", schemaFilePath = "proto", schemaPackageName = "entity" ) public interface EntityInitializer extends SerializationContextInitializer { }
今回は、このProtoBook
をCacheに格納し、price
フィールドを扱う分散処理を書くことにしましょう。
テストコードの雛形
では、まずはテストコードの雛形を作成します。
src/test/java/org/littlewings/infinispan/distexec/protostream/ProtoStreamDistExecTest.java
package org.littlewings.infinispan.distexec.protostream; import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; import java.util.Set; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.infinispan.Cache; import org.infinispan.distribution.DistributionManager; import org.infinispan.distribution.LocalizedCacheTopology; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; import org.jboss.logging.Logger; import org.junit.jupiter.api.Test; import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook; import org.littlewings.infinispan.distexec.protostream.entity.ProtoSummary; import org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary; import org.littlewings.infinispan.distexec.protostream.entity.Summary; import static org.assertj.core.api.Assertions.assertThat; public class ProtoStreamDistExecTest { Logger logger = Logger.getLogger(ProtoStreamDistExecTest.class); List<ProtoBook> books = List.of( ProtoBook.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344), ProtoBook.create("978-1849518222", "Infinispan Data Grid Platform", 3608), ProtoBook.create("978-0359439379", "The Apache Ignite Book", 7686), ProtoBook.create("978-1365732355", "High Performance in-memory computing with Apache Ignite", 6342), ProtoBook.create("978-1789347531", "Apache Ignite Quick Start Guide: Distributed data caching and processing made easy", 3638), ProtoBook.create("978-1785285332", "Getting Started with Hazelcast - Second Edition: Get acquainted with the highly scalable data grid, Hazelcast, and learn how to bring its powerful in-memory features into your application", 4209), ProtoBook.create("978-1617295522", "Spark in Action, Second Edition: Covers Apache Spark 3 with Examples in Java, Python, and Scala", 6297), ProtoBook.create("978-1484257807", "Beginning Apache Spark Using Azure Databricks: Unleashing Large Cluster Analytics in the Cloud", 4817), ProtoBook.create("978-1788997829", "Apache Kafka Quick Start Guide: Leverage Apache Kafka 2.0 to simplify real-time data processing for distributed applications", 3516), ProtoBook.create("978-1491936160", "Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale", 4989) ); <K, V> void withCache(String cacheName, int numInstances, Consumer<Cache<K, V>> func) { List<EmbeddedCacheManager> managers = IntStream .rangeClosed(1, numInstances) .mapToObj(i -> { try { return new DefaultCacheManager("infinispan.xml"); } catch (IOException e) { throw new UncheckedIOException(e); } }) .collect(Collectors.toList()); managers.forEach(manager -> manager.getCache(cacheName)); try { Cache<K, V> cache = managers.get(0).getCache(cacheName); func.accept(cache); } finally { managers.forEach(manager -> manager.stop()); } } // ここに、テストを書く }
簡単にクラスタを構成できるメソッド付き。
infinispan.xml
の内容は、また後で書きます。
こんなメソッドで動作確認。ここで、bookCache
というのはownersが1のDistributed Cacheとします。
また、Nodeは3つにしてクラスタを構成しています。以降に出てくるテストコードも全部3 Nodeにします。
@Test public void simple() { this.<String, ProtoBook>withCache("bookCache", 3, cache -> { books.forEach(book -> cache.put(book.getIsbn(), book)); assertThat(cache).hasSize(books.size()); ProtoBook infinispanBook = cache.get("978-1782169970"); assertThat(infinispanBook.getTitle()).isEqualTo("Infinispan Data Grid Platform Definitive Guide"); assertThat(infinispanBook.getPrice()).isEqualTo(5344); DistributionManager dm = cache.getAdvancedCache().getDistributionManager(); cache.keySet().forEach(isbn -> { LocalizedCacheTopology cacheTopology = dm.getCacheTopology(); logger.infof("isbn = %s, members = %s", isbn, cacheTopology.getWriteOwners(isbn)); }); }); }
ログ出力しているので、このコードを動作させると、各キーがどのNodeに配置されているか確認できるでしょう。
Distributed Streamを使った処理を書く
では、Distributed Streamを使った処理を書いてみましょう。
src/main/java/org/littlewings/infinispan/distexec/protostream/StreamSummaryTask.java
package org.littlewings.infinispan.distexec.protostream; import java.io.Serializable; import java.util.stream.Collectors; import org.infinispan.Cache; import org.jboss.logging.Logger; import org.littlewings.infinispan.distexec.protostream.entity.Price; import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook; import org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary; public class StreamSummaryTask implements Serializable { private static final long serialVersionUID = 1L; Logger logger = Logger.getLogger(StreamSummaryTask.class); transient Cache<String, ProtoBook> cache; public StreamSummaryTask(Cache<String, ProtoBook> cache) { this.cache = cache; } public SerializableSummary execute() { return cache .values() .stream() .map(book -> { logger.infof("[execute] map %s", book.getIsbn()); return Price.create(book.getIsbn(), book.getPrice()); }) .collect(() -> Collectors.reducing( SerializableSummary.create(0), p -> { logger.infof("[execute] collect %s", p.getIsbn()); return SerializableSummary.create(p.getValue()); }, (s1, s2) -> { logger.infof("[execute] collect %d, %d", s1.getValue(), s2.getValue()); return SerializableSummary.create(s1.getValue() + s2.getValue()); }) ); } public SerializableSummary execute(int greaterThanPrice) { return cache .values() .stream() .filter(book -> { logger.infof("[execute, filter price] filter %s", book.getIsbn()); return book.getPrice() > greaterThanPrice; }) .map(book -> { logger.infof("[execute, filter price] map %s", book.getIsbn()); return Price.create(book.getIsbn(), book.getPrice()); }) .collect(() -> Collectors.reducing( SerializableSummary.create(0), p -> { logger.infof("[execute, filter price] collect %s", p.getIsbn()); return SerializableSummary.create(p.getValue()); }, (s1, s2) -> { logger.infof("[execute, filter price] collect %d, %d", s1.getValue(), s2.getValue()); return SerializableSummary.create(s1.getValue() + s2.getValue()); }) ); } }
いきなりでなんですが、このクラスがすでにSerializable
です。
public class StreamSummaryTask implements Serializable {
処理の内容自体は、Cacheに格納されているProtoBook
クラスのprice
を合算しているだけです。
public SerializableSummary execute() { return cache .values() .stream() .map(book -> { logger.infof("[execute] map %s", book.getIsbn()); return Price.create(book.getIsbn(), book.getPrice()); }) .collect(() -> Collectors.reducing( SerializableSummary.create(0), p -> { logger.infof("[execute] collect %s", p.getIsbn()); return SerializableSummary.create(p.getValue()); }, (s1, s2) -> { logger.infof("[execute] collect %d, %d", s1.getValue(), s2.getValue()); return SerializableSummary.create(s1.getValue() + s2.getValue()); }) ); }
CacheStream#filter
を使っているバージョンもありますが。
処理の内容は単純ですが、どこでSerializable
が求められるのかをわかるようにするために、各種中間・終端操作の
戻り値を自分で用意したクラスにしておきました。
Price
クラスは、こんな定義です。こちらはSerializable
ではありません。
src/main/java/org/littlewings/infinispan/distexec/protostream/entity/Price.java
package org.littlewings.infinispan.distexec.protostream.entity; public class Price { String isbn; int value; public static Price create(String isbn, int value) { Price price = new Price(); price.setIsbn(isbn); price.setValue(value); return price; } // getter/setterは省略 }
SerializableSummary
クラスは、Serializable
を実装します。
src/main/java/org/littlewings/infinispan/distexec/protostream/entity/SerializableSummary.java
package org.littlewings.infinispan.distexec.protostream.entity; import java.io.Serializable; public class SerializableSummary implements Serializable { private static final long serialVersionUID = 1L; int value; public static SerializableSummary create(int value) { SerializableSummary summary = new SerializableSummary(); summary.setValue(value); return summary; } // getter/setter }
つまり、CacheStream#collect
ではSerializable
が必要だ、ということになります。
※とはいえ、今回のソースコードでシリアライズが必要になるのはLogger
をフィールド(ローカル変数でも同じ)に持ち、それを分散処理内で参照しているからであり、このためにStreamSummaryTask
はSerializable
を実装しています
後にも書きますが、CacheStream#map
については今回はSerializable
は実は求められていません。
テストコードは、こちら。
@Test public void distributedStream() { this.<String, ProtoBook>withCache("bookCache", 3, cache -> { books.forEach(book -> cache.put(book.getIsbn(), book)); StreamSummaryTask summaryTask = new StreamSummaryTask(cache); SerializableSummary totalPriceSummary = summaryTask.execute(); assertThat(totalPriceSummary.getValue()).isEqualTo(50446); SerializableSummary filteredPriceSummary = summaryTask.execute(5000); assertThat(filteredPriceSummary.getValue()).isEqualTo(25669); }); }
このテストコードを動作させるための、Infinispanの設定ファイルはこちらです。
src/test/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:12.1 https://infinispan.org/schemas/infinispan-config-12.1.xsd" xmlns="urn:infinispan:config:12.1"> <cache-container shutdown-hook="REGISTER"> <transport cluster="ispn-cluster" stack="udp"/> <distributed-cache name="bookCache" owners="1"> <encoding> <!-- <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller"> を入れたら明示が必要 --> <key media-type="application/x-protostream"/> <value media-type="application/x-protostream"/> </encoding> </distributed-cache> <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller"> <!-- <context-initializer class="org.littlewings.infinispan.distexec.protostream.entity.EntityInitializerImpl"/> 自動生成されるので不要 --> <allow-list> <class>org.littlewings.infinispan.distexec.protostream.StreamSummaryTask</class> <class>org.littlewings.infinispan.distexec.protostream.FunctionalMapSummaryTask</class> <regex>org\.littlewings\.infinispan\.distexec\.protostream\.entity\.Serializable.+</regex> <regex>org\.jboss\.logging\..+</regex> </allow-list> </serialization> </cache-container> </infinispan>
今回のソースコードを、<serialization>
を設定しないまま動作させようとすると、Distributed Streamを使っているクラスを
Marshallingしようとして失敗します。
22:40:47.727 [main] WARN org.infinispan.PERSISTENCE - ISPN000559: Cannot marshall 'class org.infinispan.marshall.protostream.impl.MarshallableUserObject' java.lang.IllegalArgumentException: No marshaller registered for object of Java type org.littlewings.infinispan.distexec.protostream.StreamSummaryTask : org.littlewings.infinispan.distexec.protostream.StreamSummaryTask@161dd92a
このクラスを、ProtoStreamでシリアライズするのはさすがにできませんね…。
このため、MarshallerとしてJavaSerializationMarshaller
を指定し、Java標準のシリアライズの仕組みも使えるように
します。
<serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller"> <allow-list> <class>org.littlewings.infinispan.distexec.protostream.StreamSummaryTask</class> <class>org.littlewings.infinispan.distexec.protostream.FunctionalMapSummaryTask</class> <regex>org\.littlewings\.infinispan\.distexec\.protostream\.entity\.Serializable.+</regex> <regex>org\.jboss\.logging\..+</regex> </allow-list> </serialization>
この時、シリアライズできるクラスは明示的に指定する必要があり、regex
(正規表現)またはclass
(クラス名)で
指定します。
※次に使うクラスの名前(FunctionalMapSummaryTask
)も含まれていますが
ここに登録していないクラスは、Java標準のシリアライズを行う際にInfinispanから拒否されます。
ちなみに、SerializationContextInitializer
を自分で作成していますが、これはServiceLoader
の仕組みで自動登録されるため
今回は指定不要です。
Registering serialization context initializers
<serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller"> <!-- <context-initializer class="org.littlewings.infinispan.distexec.protostream.entity.EntityInitializerImpl"/> 自動生成されるので不要 -->
複数のSerializationContextInitializer
を使う場合、ひとつでも明示的に登録した場合は自動登録が無効になるため、
すべて明示的に指定する必要があるようです。
そして、JavaSerializationMarshaller
をMarshallerを指定した段階で、特になにもしないとCacheにもJava標準の
シリアライズの仕組みでエントリを登録しようとします。
<serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller">
つまり、ProtoBook
がSerializable
であることを求めてきます。
そうではなく、ProtoBook
、つまりCacheに格納するエントリはProtoStreamでMarshallingを行う場合は、エンコーディングを
明示します。
Encoding caches as ProtoStream
<distributed-cache name="bookCache" owners="1"> <encoding> <!-- <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller"> を入れたら明示が必要 --> <key media-type="application/x-protostream"/> <value media-type="application/x-protostream"/> </encoding> </distributed-cache>
これで、CacheにはProtoStreamでMarshallingしつつ、分散処理はJava標準のシリアライズの仕組みで動作するように
設定できました。
先ほどのテストもパスします。
Functional Map
次は、Functional Map。今回はReadOnlyMap
を使うことにしました。
src/main/java/org/littlewings/infinispan/distexec/protostream/FunctionalMapSummaryTask.java
package org.littlewings.infinispan.distexec.protostream; import java.io.Serializable; import java.util.Set; import java.util.stream.Collectors; import org.infinispan.Cache; import org.infinispan.functional.FunctionalMap; import org.infinispan.functional.Traversable; import org.infinispan.functional.impl.FunctionalMapImpl; import org.infinispan.functional.impl.ReadOnlyMapImpl; import org.jboss.logging.Logger; import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook; import org.littlewings.infinispan.distexec.protostream.entity.SerializablePrice; import org.littlewings.infinispan.distexec.protostream.entity.Summary; public class FunctionalMapSummaryTask implements Serializable { private static final long serialVersionUID = 1L; Logger logger = Logger.getLogger(FunctionalMapSummaryTask.class); transient Cache<String, ProtoBook> cache; public FunctionalMapSummaryTask(Cache<String, ProtoBook> cache) { this.cache = cache; } public Summary execute(Set<String> isbns) { FunctionalMapImpl<String, ProtoBook> functionalMap = FunctionalMapImpl.create(cache.getAdvancedCache()); FunctionalMap.ReadOnlyMap<String, ProtoBook> readOnlyMap = ReadOnlyMapImpl.create(functionalMap); Traversable<SerializablePrice> prices = readOnlyMap.evalMany(isbns, view -> { String isbn = view.key(); ProtoBook book = view.get(); logger.infof("[execute] get %s", isbn); return SerializablePrice.create(view.key(), book.getPrice()); }); return prices .collect( Collectors.reducing( Summary.create(0), p -> { logger.infof("[execute] collect %s", p.getIsbn()); return Summary.create(p.getValue()); }, (s1, s2) -> { logger.infof("[execute] collect %d, %d", s1.getValue(), s2.getValue()); return Summary.create(s1.getValue() + s2.getValue()); }) ); } }
Functional Mapの場合、eval〜
メソッドで扱う処理がSerializable
である必要があるようです。なので、メソッドの戻り値に
使う値は今回はSerializable
になりました。
src/main/java/org/littlewings/infinispan/distexec/protostream/entity/SerializablePrice.java
package org.littlewings.infinispan.distexec.protostream.entity; import java.io.Serializable; public class SerializablePrice implements Serializable { private static final long serialVersionUID = 1L; String isbn; int value; public static SerializablePrice create(String isbn, int value) { SerializablePrice price = new SerializablePrice(); price.setIsbn(isbn); price.setValue(value); return price; } // getter/setterは省略 }
Traversable#collect
を使っても、こちらはローカルで動作するようで、Serializable
は求められません。
src/main/java/org/littlewings/infinispan/distexec/protostream/entity/Summary.java
package org.littlewings.infinispan.distexec.protostream.entity; public class Summary { int value; public static Summary create(int value) { Summary summary = new Summary(); summary.setValue(value); return summary; } // getter/setterは省略 }
テストコードは、こちら。
@Test public void functionalMap() { this.<String, ProtoBook>withCache("bookCache", 3, cache -> { books.forEach(book -> cache.put(book.getIsbn(), book)); FunctionalMapSummaryTask summaryTask = new FunctionalMapSummaryTask(cache); Set<String> isbns = books.stream().map(ProtoBook::getIsbn).collect(Collectors.toSet()); Summary totalPriceSummary = summaryTask.execute(isbns); assertThat(totalPriceSummary.getValue()).isEqualTo(50446); }); }
Infinispanの設定は、Distributed Streamの時と同じです(含めています)。
もう少し
ところで、ここまでSerializable
なものがたくさん出てきました。
実は、もうちょっと気をつけると同じ処理でSerializable
であるものを減らすことができます。
Serializable
Distributed Streamを扱ったクラスを、もうちょっと変えてみましょう。
src/main/java/org/littlewings/infinispan/distexec/protostream/StreamSummaryReturnOnlySerializableTask.java
package org.littlewings.infinispan.distexec.protostream; import java.util.stream.Collectors; import org.infinispan.Cache; import org.jboss.logging.Logger; import org.littlewings.infinispan.distexec.protostream.entity.Price; import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook; import org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary; public class StreamSummaryReturnOnlySerializableTask { Cache<String, ProtoBook> cache; public StreamSummaryReturnOnlySerializableTask(Cache<String, ProtoBook> cache) { this.cache = cache; } public SerializableSummary execute() { return cache .values() .stream() .map(book -> { Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] map %s", book.getIsbn()); return Price.create(book.getIsbn(), book.getPrice()); }) .collect(() -> Collectors.reducing( SerializableSummary.create(0), p -> { Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute] collect %s", p.getIsbn()); return SerializableSummary.create(p.getValue()); }, (s1, s2) -> { Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute] collect %d, %d", s1.getValue(), s2.getValue()); return SerializableSummary.create(s1.getValue() + s2.getValue()); }) ); } public SerializableSummary execute(int greaterThanPrice) { return cache .values() .stream() .filter(book -> { Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] filter %s", book.getIsbn()); return book.getPrice() > greaterThanPrice; }) .map(book -> { Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] map %s", book.getIsbn()); return Price.create(book.getIsbn(), book.getPrice()); }) .collect(() -> Collectors.reducing( SerializableSummary.create(0), p -> { Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] collect %s", p.getIsbn()); return SerializableSummary.create(p.getValue()); }, (s1, s2) -> { Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] collect %d, %d", s1.getValue(), s2.getValue()); return SerializableSummary.create(s1.getValue() + s2.getValue()); }) ); } }
クラスの定義から、Serializable
を外しました。フィールドに定義しているCache
も、transient
ではなくなっています。
public class StreamSummaryReturnOnlySerializableTask { Cache<String, ProtoBook> cache;
また、ログ出力はCacheStream
のメソッド内でLogger
を取得して行うようにしています。
public SerializableSummary execute() { return cache .values() .stream() .map(book -> { Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute, filter price] map %s", book.getIsbn()); return Price.create(book.getIsbn(), book.getPrice()); }) .collect(() -> Collectors.reducing( SerializableSummary.create(0), p -> { Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute] collect %s", p.getIsbn()); return SerializableSummary.create(p.getValue()); }, (s1, s2) -> { Logger.getLogger(StreamSummaryReturnOnlySerializableTask.class).infof("[execute] collect %d, %d", s1.getValue(), s2.getValue()); return SerializableSummary.create(s1.getValue() + s2.getValue()); }) ); }
こうすると、CacheStream#collect
メソッドの戻り値だけがSerializable
であれば良くなります。
上記クラスを動かすためのInfinispanの設定は、こちら。
src/test/resources/infinispan-serializable.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:12.1 https://infinispan.org/schemas/infinispan-config-12.1.xsd" xmlns="urn:infinispan:config:12.1"> <cache-container shutdown-hook="REGISTER"> <transport cluster="ispn-cluster" stack="udp"/> <distributed-cache name="bookCache" owners="1"> <encoding> <key media-type="application/x-protostream"/> <value media-type="application/x-protostream"/> </encoding> </distributed-cache> <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller"> <allow-list> <class>org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary</class> </allow-list> </serialization> </cache-container> </infinispan>
テストコードは、最後にまとめて載せます。
ProtoStreamにする
ここまでくると、CacheStream#collect
の戻り値をProtoStreamにすることもできるのでは?という気もします。
できます。
src/main/java/org/littlewings/infinispan/distexec/protostream/StreamSummaryReturnOnlyProtoTask.java
package org.littlewings.infinispan.distexec.protostream; import java.util.stream.Collectors; import org.infinispan.Cache; import org.jboss.logging.Logger; import org.littlewings.infinispan.distexec.protostream.entity.Price; import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook; import org.littlewings.infinispan.distexec.protostream.entity.ProtoSummary; public class StreamSummaryReturnOnlyProtoTask { Cache<String, ProtoBook> cache; public StreamSummaryReturnOnlyProtoTask(Cache<String, ProtoBook> cache) { this.cache = cache; } public ProtoSummary execute() { return cache .values() .stream() .map(book -> { Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] map %s", book.getIsbn()); return Price.create(book.getIsbn(), book.getPrice()); }) .collect(() -> Collectors.reducing( ProtoSummary.create(0), p -> { Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute] collect %s", p.getIsbn()); return ProtoSummary.create(p.getValue()); }, (s1, s2) -> { Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute] collect %d, %d", s1.getValue(), s2.getValue()); return ProtoSummary.create(s1.getValue() + s2.getValue()); }) ); } public ProtoSummary execute(int greaterThanPrice) { return cache .values() .stream() .filter(book -> { Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] filter %s", book.getIsbn()); return book.getPrice() > greaterThanPrice; }) .map(book -> { Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] map %s", book.getIsbn()); return Price.create(book.getIsbn(), book.getPrice()); }) .collect(() -> Collectors.reducing( ProtoSummary.create(0), p -> { Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] collect %s", p.getIsbn()); return ProtoSummary.create(p.getValue()); }, (s1, s2) -> { Logger.getLogger(StreamSummaryReturnOnlyProtoTask.class).infof("[execute, filter price] collect %d, %d", s1.getValue(), s2.getValue()); return ProtoSummary.create(s1.getValue() + s2.getValue()); }) ); } }
CacheStream#collect
で使うクラスを、ProtoSummary
というクラスにしました。ProtoStreamでMarshallingすることを
想定したクラスです。
src/main/java/org/littlewings/infinispan/distexec/protostream/entity/ProtoSummary.java
package org.littlewings.infinispan.distexec.protostream.entity; import org.infinispan.protostream.annotations.ProtoFactory; import org.infinispan.protostream.annotations.ProtoField; public class ProtoSummary { @ProtoField(number = 1, defaultValue = "0") int value; @ProtoFactory public static ProtoSummary create(int value) { ProtoSummary summary = new ProtoSummary(); summary.setValue(value); return summary; } // getter/setterは省略 }
SerializationContextInitializer
で、このクラスも対象に追加。
src/main/java/org/littlewings/infinispan/distexec/protostream/entity/EntityInitializer.java
package org.littlewings.infinispan.distexec.protostream.entity; import org.infinispan.protostream.SerializationContextInitializer; import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder; @AutoProtoSchemaBuilder( includeClasses = {ProtoBook.class, ProtoSummary.class}, schemaFileName = "entities.proto", schemaFilePath = "proto", schemaPackageName = "entity" ) public interface EntityInitializer extends SerializationContextInitializer { }
こうすると、Infinispanの設定からserialization
を削除することができます。encoding
も外せます。
src/test/resources/infinispan-proto.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:12.1 https://infinispan.org/schemas/infinispan-config-12.1.xsd" xmlns="urn:infinispan:config:12.1"> <cache-container shutdown-hook="REGISTER"> <transport cluster="ispn-cluster" stack="udp"/> <distributed-cache name="bookCache" owners="1"/> </cache-container> </infinispan>
テストコード
ここまでの2種類のDistributed Streamsを扱うテストコードは、こちら。
src/test/java/org/littlewings/infinispan/distexec/protostream/ProtoStreamDistExecSimplyTest.java
package org.littlewings.infinispan.distexec.protostream; import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.infinispan.Cache; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; import org.junit.jupiter.api.Test; import org.littlewings.infinispan.distexec.protostream.entity.ProtoBook; import org.littlewings.infinispan.distexec.protostream.entity.ProtoSummary; import org.littlewings.infinispan.distexec.protostream.entity.SerializableSummary; import static org.assertj.core.api.Assertions.assertThat; public class ProtoStreamDistExecSimplyTest { List<ProtoBook> books = List.of( ProtoBook.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344), ProtoBook.create("978-1849518222", "Infinispan Data Grid Platform", 3608), ProtoBook.create("978-0359439379", "The Apache Ignite Book", 7686), ProtoBook.create("978-1365732355", "High Performance in-memory computing with Apache Ignite", 6342), ProtoBook.create("978-1789347531", "Apache Ignite Quick Start Guide: Distributed data caching and processing made easy", 3638), ProtoBook.create("978-1785285332", "Getting Started with Hazelcast - Second Edition: Get acquainted with the highly scalable data grid, Hazelcast, and learn how to bring its powerful in-memory features into your application", 4209), ProtoBook.create("978-1617295522", "Spark in Action, Second Edition: Covers Apache Spark 3 with Examples in Java, Python, and Scala", 6297), ProtoBook.create("978-1484257807", "Beginning Apache Spark Using Azure Databricks: Unleashing Large Cluster Analytics in the Cloud", 4817), ProtoBook.create("978-1788997829", "Apache Kafka Quick Start Guide: Leverage Apache Kafka 2.0 to simplify real-time data processing for distributed applications", 3516), ProtoBook.create("978-1491936160", "Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale", 4989) ); <K, V> void withCache(String configurationXmlPath, String cacheName, int numInstances, Consumer<Cache<K, V>> func) { List<EmbeddedCacheManager> managers = IntStream .rangeClosed(1, numInstances) .mapToObj(i -> { try { return new DefaultCacheManager(configurationXmlPath); } catch (IOException e) { throw new UncheckedIOException(e); } }) .collect(Collectors.toList()); managers.forEach(manager -> manager.getCache(cacheName)); try { Cache<K, V> cache = managers.get(0).getCache(cacheName); func.accept(cache); } finally { managers.forEach(manager -> manager.stop()); } } @Test public void returnOnlySerializable() { this.<String, ProtoBook>withCache("infinispan-serializable.xml", "bookCache", 3, cache -> { books.forEach(book -> cache.put(book.getIsbn(), book)); StreamSummaryReturnOnlySerializableTask summaryTask = new StreamSummaryReturnOnlySerializableTask(cache); SerializableSummary totalPriceSummary = summaryTask.execute(); assertThat(totalPriceSummary.getValue()).isEqualTo(50446); SerializableSummary filteredPriceSummary = summaryTask.execute(5000); assertThat(filteredPriceSummary.getValue()).isEqualTo(25669); }); } @Test public void returnOnlyProto() { this.<String, ProtoBook>withCache("infinispan-proto.xml", "bookCache", 3, cache -> { books.forEach(book -> cache.put(book.getIsbn(), book)); StreamSummaryReturnOnlyProtoTask summaryTask = new StreamSummaryReturnOnlyProtoTask(cache); ProtoSummary totalPriceSummary = summaryTask.execute(); assertThat(totalPriceSummary.getValue()).isEqualTo(50446); ProtoSummary filteredPriceSummary = summaryTask.execute(5000); assertThat(filteredPriceSummary.getValue()).isEqualTo(25669); }); } }
今回は、Cacheを使う時にInfinispanの設定ファイルも指定できるようにしています。
this.<String, ProtoBook>withCache("infinispan-serializable.xml", "bookCache", 3, cache -> { this.<String, ProtoBook>withCache("infinispan-proto.xml", "bookCache", 3, cache -> {
こんなところでしょうか。
まとめ
CacheにProtoStreamでMarshallingしたエントリを格納しつつ、分散処理を使う場合のMarshallingはどうなるのか?
ということをいくつかバリエーションを入れて確認してみました。
結果を見れば、分散処理に渡すLambda式(またはインスタンス)にMarshallingが必要になるものを絞り込み、
かつすべてProtoStreamでMarshalling可能なように作成すれば、ProtoStreamでMarshallingの仕組みを統一することも
できるということは言えそうです。
ただ、ProtoStreamでMarshallingするのが難しそうなものや、サードパーティ製のライブラリのクラスなどを扱う場合に
どうしようという感じでしょうか。
サードパーティ製のものであっても、可能なそうであれば@ProtoAdapter
アノテーションを使ってProtoStreamでの
Marshallingはできそうですけどね。
Creating ProtoStream adapter classes
いけるところまでは、ProtoStreamでMarshallingするように寄せていった方がいいのでしょうか?
このブログでは、いったんその方向で頑張ってみますか。
今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-distexec-protostream