CLOVER🍀

That was when it all began.

InfinispanのDistributed Streams(分散Stream API)を試す

先日、Infinispan 8.0.0.Finalがリリースされました(今は8.0.1.Finalですが…)。

Infinispan: Infinispan 8.0.0.Final

Infinispan: Infinispan 8.0.1.Final (and 7.2.5.Final)

8になってけっこう多くの機能が追加されているようで、個人的にはいろいろ試してみたいのですが、最近あんまり時間が取れない今日この頃です。

ちょっとずつですが、気長に試していこうと思います(先に8.1.0.Finalが出るのでないかという気もしますが…)。

で、今回はDistributed Streamsを試してみたいと思います。

Support for the Java 8's Streams API which, in the context of Infinispan, becomes fully distributed: parallel streams become truly parallel !

http://blog.infinispan.org/2015/08/infinispan-800final.html

要するに、Java 8で追加されたStream APIをInfinispanのCache上で実装しましたよ、と。分散実行も可能なようです。
似たような機能である既存のMap Reduce Frameworkとの関係ですが、既存のものは廃止してDistributed Streamsに置き換えていく方針のようです(以下にリンクを貼っているブログに、それっぽい記述があります)。

ドキュメントはブログとチュートリアルがあるだけのようですので、こちらを参考に試していってみたいと思います。

Infinispan: Distributed Streams

Streams Tutorial - Infinispan

準備

まずはMaven依存関係。

        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>8.0.1.Final</version>
        </dependency>

とりあえず、「infinispan-core」があればOKです。

ところで、このブログでInfinispanを扱う時はだいたいScala+sbtで書いていたのですが、今回はLambda式および交差型キャストを使う関係上、Javaで書くことにしました。まあ、Infinispan 8.0.0.Finalで追加された機能はLambdaが使えるものもぼちぼちありそうなので、今後もJavaで書くことになるかも。

設定

Infinispanの設定は、まずは最小構成で作ってみました。
src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:8.0 http://www.infinispan.org/schemas/infinispan-config-8.0.xsd"
        xmlns="urn:infinispan:config:8.0">
    <jgroups>
        <stack-file name="udp" path="jgroups.xml"/>
    </jgroups>

    <cache-container name="cacheManager" shutdown-hook="REGISTER">
        <transport cluster="cluster" stack="udp"/>
        <jmx duplicate-domains="true"/>

        <local-cache name="localCache"/>
        <distributed-cache name="distCache"/>
        <replicated-cache name="replCache"/>
        <invalidation-cache name="invalCache"/>
    </cache-container>
</infinispan>

JGroupsの設定も、デフォルト+αで用意。
src/main/resources/jgroups.xml

<?xml version="1.0" encoding="UTF-8"?>
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.6.xsd">
    <UDP
            mcast_addr="${jgroups.udp.mcast_addr:228.6.7.8}"
            mcast_port="${jgroups.udp.mcast_port:46655}"
            ucast_recv_buf_size="150k"
            ucast_send_buf_size="130k"
            mcast_recv_buf_size="150k"
            mcast_send_buf_size="130k"
            ip_ttl="${jgroups.ip_ttl:2}"
            thread_naming_pattern="pl"
            enable_diagnostics="false"

            thread_pool.min_threads="${jgroups.thread_pool.min_threads:2}"
            thread_pool.max_threads="${jgroups.thread_pool.max_threads:30}"
            thread_pool.keep_alive_time="60000"
            thread_pool.queue_enabled="false"

            internal_thread_pool.min_threads="${jgroups.internal_thread_pool.min_threads:5}"
            internal_thread_pool.max_threads="${jgroups.internal_thread_pool.max_threads:20}"
            internal_thread_pool.keep_alive_time="60000"
            internal_thread_pool.queue_enabled="true"
            internal_thread_pool.queue_max_size="500"

            oob_thread_pool.min_threads="${jgroups.oob_thread_pool.min_threads:20}"
            oob_thread_pool.max_threads="${jgroups.oob_thread_pool.max_threads:200}"
            oob_thread_pool.keep_alive_time="60000"
            oob_thread_pool.queue_enabled="false"
            />

    <PING/>
    <MERGE3 min_interval="10000"
            max_interval="30000"
            />
    <FD_SOCK/>
    <FD_ALL timeout="60000"
            interval="15000"
            timeout_check_interval="5000"
            />
    <VERIFY_SUSPECT timeout="5000"
            />

    <pbcast.NAKACK2 xmit_interval="1000"
                    xmit_table_num_rows="50"
                    xmit_table_msgs_per_row="1024"
                    xmit_table_max_compaction_time="30000"
                    max_msg_batch_size="100"
                    resend_last_seqno="true"
            />

    <UNICAST3 xmit_interval="500"
              xmit_table_num_rows="50"
              xmit_table_msgs_per_row="1024"
              xmit_table_max_compaction_time="30000"
              max_msg_batch_size="100"
              conn_expiry_timeout="0"
            />
    <pbcast.STABLE stability_delay="500"
                   desired_avg_gossip="5000"
                   max_bytes="1M"
            />
    <pbcast.GMS print_local_addr="true"
                join_timeout="15000"
            />

    <!-- <tom.TOA/> -->
    <!-- the TOA is only needed for total order transactions-->

    <UFC max_credits="2m"
         min_threshold="0.40"
            />
    <MFC max_credits="2m"
         min_threshold="0.40"
            />
    <FRAG2/>
</config>

ヘルパー的な

以後書いていくソースコードでは、頻繁にCacheの取得やクラスタの構成を行うので、このあたりを実行しやすくするクラスを用意。
src/main/java/org/littlewings/infinispan/distributedstreams/EmbeddedCacheSupport.java

package org.littlewings.infinispan.distributedstreams;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.infinispan.Cache;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;

public abstract class EmbeddedCacheSupport {
    protected <K, V> void withCache(String cacheName, Consumer<Cache<K, V>> fun) {
        withCache(cacheName, 1, fun);
    }

    protected <K, V> void withCache(String cacheName, int numInstances, Consumer<Cache<K, V>> fun) {
        List<EmbeddedCacheManager> managers =
                IntStream
                        .rangeClosed(1, numInstances)
                        .mapToObj(i -> {
                            try {
                                return new DefaultCacheManager("infinispan.xml");
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        })
                        .collect(Collectors.toList());

        managers.forEach(m -> m.<K, V>getCache(cacheName));

        try {
            fun.accept(managers.get(0).<K, V>getCache(cacheName));
        } finally {
            managers.forEach(m -> m.getCache(cacheName).stop());
            managers.forEach(m -> m.stop());
        }
    }
}

インスタンス数を与えた場合は、その数分だけのNodeを持ったクラスタの構成ができます。

以後作成するクラスは、こちらを継承して実装していきます。

Cacheごとに使ってみる

では、実際にDistributed Streamsを使ったコードを書いてみます。先ほど作成した、EmbeddedCacheSupportクラスを継承しておきます。
src/main/java/org/littlewings/infinispan/distributedstreams/EmbeddedDistributedStreamsSimple.java

package org.littlewings.infinispan.distributedstreams;

import java.io.Serializable;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.infinispan.CacheStream;
import org.infinispan.stream.CacheCollectors;

public class EmbeddedDistributedStreamsSimple extends EmbeddedCacheSupport {
    public static void main(String... args) {
        EmbeddedDistributedStreamsSimple eds = new EmbeddedDistributedStreamsSimple();
        eds.executeDistributedStreams();
    }

    public void executeDistributedStreams() {
      // ここに、いろいろ書く
    }
}

executeDistributedStreamsメソッドの中身は、以降で埋めていきます。

Local Cache

Local Cacheの場合。

        this.<String, String>withCache("localCache", cache -> {
            System.out.println("Cache[" + cache.getName() + "] start.");

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value" + i));

            try (CacheStream<Map.Entry<String, String>> stream = cache.entrySet().stream()) {
                System.out.println(stream.getClass());

                int result =
                        stream
                                .map(e -> Integer.parseInt(e.getValue().substring("value".length())))
                                .collect(Collectors.summingInt(v -> v));

                System.out.println("Cache[" + cache.getName() + "] result = " + result);

                int expected = 5050;
                if (result != expected) {
                    throw new IllegalStateException("result must be [" + expected + "]");
                }
            }
        });

パッと見は普通にStream APIを使っているだけですね。現時点では意味がありませんが、CacheStreamという型で受けていますが。Streamのcloseは…しておいた方がいい気がします…。

CacheStreamを使うことで、いくつかStreamに追加の設定などができるようになります。何が利用できるかは、以下のブログの「CacheStream interface」のところを見るとよいでしょう。

Infinispan: Distributed Streams

一部System.out.printlnしていますが、その時の表示内容はこんな感じになります。

Cache[localCache] start.
class org.infinispan.stream.impl.local.LocalEntryCacheStream
Cache[localCache] result = 5050

Local Cache用のStreamってことですね。

Distributed Cache

Distributed Cacheの場合。

        this.<String, String>withCache("distCache", 3, cache -> {
            System.out.println("Cache[" + cache.getName() + "] start.");

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value" + i));

            try (CacheStream<Map.Entry<String, String>> stream = cache.entrySet().stream()) {
                System.out.println(stream.getClass());

                int result =
                        stream
                                .map((Serializable & Function<Map.Entry<String, String>, Integer>) e -> {
                                    return Integer.parseInt(e.getValue().substring("value".length()));
                                })
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(v -> v)));

                System.out.println("Cache[" + cache.getName() + "] result = " + result);

                int expected = 5050;
                if (result != expected) {
                    throw new IllegalStateException("result must be [" + expected + "]");
                }
            }
        });

このセクションでは、複数Nodeにまたがる系のCacheでのNode数は3とします。

        this.<String, String>withCache("distCache", 3, cache -> {

Distributed Cacheの場合、Nodeがひとつの時はいいのですが、複数NodeになるとLambda式で渡している内容がシリアライズできることが必要になります。

このため、交差型キャストを使用したり

                                .map((Serializable & Function<Map.Entry<String, String>, Integer>) e -> {
                                    return Integer.parseInt(e.getValue().substring("value".length()));
                                })

Collectorをラップして変換する必要があったりします。

                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(v -> v)));

交差型キャストは、地味に面倒です…。

System.out.printlnの方。

Cache[distCache] start.
class org.infinispan.interceptors.distribution.DistributionBulkInterceptor$BackingEntrySet$1
Cache[distCache] result = 5050

Distributed Cache用のものみたいです。

Replicated Cache/Invalidation Cache

現時点でのReplicated CacheとInvalidation Cacheに対してStream APIを使った場合は、Local Cacheと同じ挙動になりました。

        this.<String, String>withCache("replCache", 3, cache -> {
            System.out.println("Cache[" + cache.getName() + "] start.");

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value" + i));

            try (CacheStream<Map.Entry<String, String>> stream = cache.entrySet().stream()) {
                System.out.println(stream.getClass());

                /*
                int result =
                        stream
                                .map((Serializable & Function<Map.Entry<String, String>, Integer>) e -> {
                                    return Integer.parseInt(e.getValue().substring("value".length()));
                                })
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(v -> v)));
                                */
                int result =
                        stream
                                .map(e -> Integer.parseInt(e.getValue().substring("value".length())))
                                .collect(Collectors.summingInt(v -> v));

                System.out.println("Cache[" + cache.getName() + "] result = " + result);

                int expected = 5050;
                if (result != expected) {
                    throw new IllegalStateException("result must be [" + expected + "]");
                }
            }
        });

        this.<String, String>withCache("invalCache", 3, cache -> {
            System.out.println("Cache[" + cache.getName() + "] start.");

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value" + i));

            try (CacheStream<Map.Entry<String, String>> stream = cache.entrySet().stream()) {
                System.out.println(stream.getClass());

                /*
                int result =
                        stream
                                .map((Serializable & Function<Map.Entry<String, String>, Integer>) e -> {
                                    return Integer.parseInt(e.getValue().substring("value".length()));
                                })
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(v -> v)));
                                */
                int result =
                        stream
                                .map(e -> Integer.parseInt(e.getValue().substring("value".length())))
                                .collect(Collectors.summingInt(v -> v));

                System.out.println("Cache[" + cache.getName() + "] result = " + result);

                int expected = 5050;
                if (result != expected) {
                    throw new IllegalStateException("result must be [" + expected + "]");
                }
            }
        });

コメントアウトしていますが、Serializableでなくても動作します。が、今後のことを考えるとSerializableにしておいた方が無難かなとは思いますが。

Invalidation Cacheは、動作対象外かと思っていました。少なくとも、既存のMap Reduce FrameworkはInvalidation Cacheでは動作しないはずなので。

System.out.printlnの結果は、Replicated Cacheがこちらで

Cache[replCache] start.
class org.infinispan.stream.impl.local.LocalEntryCacheStream
Cache[replCache] result = 5050

Invalidation Cacheがこちら。

Cache[invalCache] start.
class org.infinispan.stream.impl.local.LocalEntryCacheStream
Cache[invalCache] result = 5050

どちらもLocal Cache的な感じで動いています…。この後、プロセスごと分けた環境で試してみたのですが、Replicated CacheでDistributed Streamsを使ってもどうも単一のNode(処理を依頼したNode)でしか実行されないようでした。

その他

ここまでmapとcollectを使ってきましたが、ふつうにfilterなども使えますし

        this.<String, String>withCache("distCache", 3, cache -> {
            System.out.println("Cache[" + cache.getName() + "] start.");

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value" + i));

            try (CacheStream<Map.Entry<String, String>> stream = cache.entrySet().stream()) {
                System.out.println(stream.getClass());

                int result =
                        stream
                                .filter((Serializable & Predicate<Map.Entry<String, String>>) e -> {
                                    return Integer.parseInt(e.getKey().substring("key".length())) % 2 == 0;
                                })
                                .map((Serializable & Function<Map.Entry<String, String>, Integer>) e -> {
                                    return Integer.parseInt(e.getValue().substring("value".length()));
                                })
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(v -> v)));

                System.out.println("Cache[" + cache.getName() + "] result = " + result);

                int expected = 2550;
                if (result != expected) {
                    throw new IllegalStateException("result must be [" + expected + "]");
                }
            }
        });

CacheStreamのメソッドを使えば、キーのSetを与えることでフィルタリングできたりします。

        this.<String, String>withCache("distCache", 3, cache -> {
            System.out.println("Cache[" + cache.getName() + "] start.");

            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, "value" + i));

            try (CacheStream<Map.Entry<String, String>> stream = cache.entrySet().stream()) {
                System.out.println(stream.getClass());

                Set<String> keys = new HashSet<>();
                keys.add("key1");
                keys.add("key10");

                int result =
                        stream
                                .filterKeys(keys)
                                .map((Serializable & Function<Map.Entry<String, String>, Integer>) e -> {
                                    return Integer.parseInt(e.getValue().substring("value".length()));
                                })
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(v -> v)));

                System.out.println("Cache[" + cache.getName() + "] result = " + result);

                int expected = 11;
                if (result != expected) {
                    throw new IllegalStateException("result must be [" + expected + "]");
                }
            }
        });

CacheStreamやSerializableの意識はする必要があるものの、通常のStream APIとそう変わらず使えそうですね。

分散実行状況の確認

ここまで、単一のJavaプロセス内に複数のNodeを起動して試していましたが、今度はプロセスを分けて試してみたいと思います。

こうなると、単に浮いているだけのNodeが欲しくなるので、以下のようなクラスを用意。
src/main/java/org/littlewings/infinispan/distributedstreams/EmbeddedCacheServer.java

package org.littlewings.infinispan.distributedstreams;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import org.infinispan.Cache;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;

public class EmbeddedCacheServer {
    public static void main(String... args) throws IOException {
        String cacheName = args[0];

        EmbeddedCacheManager manager = new DefaultCacheManager("infinispan.xml");

        Cache<?, ?> cache  = manager.getCache(cacheName);

        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        System.out.printf("[%s] EmbeddedCacheServer startup.%n", now.format(formatter));
        System.console().readLine("enter, stop...");

        manager.stop();
    }
}

第1引数でCacheの名前をもらい、CacheManagerからCacheを取得した後はEnterを打つまでずっと浮いているサーバーです。

これに対して、起動時の引数でDistributed Streamの設定を変えて実行するようなプログラムを用意。
src/main/java/org/littlewings/infinispan/distributedstreams/EmbeddedDistributedStreamsParallel.java

package org.littlewings.infinispan.distributedstreams;

import java.io.Serializable;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.infinispan.CacheStream;
import org.infinispan.stream.CacheCollectors;

public class EmbeddedDistributedStreamsParallel extends EmbeddedCacheSupport {
    public static void main(String... args) {
        EmbeddedDistributedStreamsParallel eds = new EmbeddedDistributedStreamsParallel();

        switch (args[0]) {
            case "dist-simple":
                eds.distSimple();
                break;
            case "dist-parallel":
                eds.distParallel();
                break;
            case "dist-parallelDistribution":
                eds.distParallelDistribution();
                break;
            case "dist-sequential":
                eds.distSequential();
                break;
            case "dist-sequentialDistribution":
                eds.distSequentialDistribution();
                break;
            case "repl-simple":
                eds.replSimple();
                break;
            case "repl-parallel":
                eds.replParallel();
                break;
            case "repl-parallelDistribution":
                eds.replParallelDistribution();
                break;
            default:
                System.out.printf("unknown option[%s].%n", args[0]);
                System.exit(1);
        }
    }

    // switchの先は、後で
}

コードとしてはReplicated Cacheの分も用意しているのですが、LocalなNodeでしか動作しなかったので、コードと結果は端折ります。ご興味があれば、ソースはGitHubに置いておくので見てみてください…。

プログラムの起動前に、Distributed Cacheの設定でNodeを2つ起動しておきます。以降、Server Node Xと書きます。

## Server Node 1
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer -Dexec.args=distCache

## Server Node 2
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer -Dexec.args=distCache

クライアント側は、以下のような実行コマンドとします。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel -Dexec.args=[switchで書いている内容]
デフォルト

まずはデフォルト状態のDistributed Streamsから。

    public void distSimple() {
        this.<String, String>withCache("distCache", cache -> {
            System.out.println("Cache[" + cache.getName() + "] start.");

            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, "value" + i));

            try (CacheStream<Map.Entry<String, String>> stream = cache.entrySet().stream()) {
                int result =
                        stream
                                .map((Serializable & Function<Map.Entry<String, String>, Integer>) e -> {
                                    System.out.println("map phase Thread[" + Thread.currentThread() + "]");
                                    return Integer.parseInt(e.getValue().substring("value".length()));
                                })
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(v -> v)));

                System.out.println("Cache[" + cache.getName() + "] result = " + result);

                int expected = 55;
                if (result != expected) {
                    throw new IllegalStateException("result must be [" + expected + "]");
                }
            }
        });
    }

どこで実行しているかわかるように、System.out.printlnをmapメソッド内に入れています。

また、あんまり数が多くなってもわかりにくいので、範囲は1〜10にしました。

            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, "value" + i));

実行。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel -Dexec.args=dist-simple

コンソール上のmapの実行状態は、以下のように。

## main
map phase Thread[Thread[org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel.main(),5,org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel]]
map phase Thread[Thread[org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel.main(),5,org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel]]


## Server Node 1
map phase Thread[Thread[remote-thread--p2-t1,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t1,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t1,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t1,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t1,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]


## Server Node 2
map phase Thread[Thread[remote-thread--p2-t2,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t2,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t2,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]

分散実行されているっぽいですね。

parallel

Stream#parallelを呼び出した場合。

    public void distParallel() {
        this.<String, String>withCache("distCache", cache -> {
            System.out.println("Cache[" + cache.getName() + "] start.");

            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, "value" + i));

            try (CacheStream<Map.Entry<String, String>> stream = cache.entrySet().stream()) {
                int result =
                        stream
                                .parallel()
                                .map((Serializable & Function<Map.Entry<String, String>, Integer>) e -> {
                                    System.out.println("map phase Thread[" + Thread.currentThread() + "]");
                                    return Integer.parseInt(e.getValue().substring("value".length()));
                                })
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(v -> v)));

                System.out.println("Cache[" + cache.getName() + "] result = " + result);

                int expected = 55;
                if (result != expected) {
                    throw new IllegalStateException("result must be [" + expected + "]");
                }
            }
        });

先ほどとの違いは、Stream#parallelを呼んでいるかどうかだけです。

                int result =
                        stream
                                .parallel()

実行結果。

## main
map phase Thread[Thread[org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel.main(),5,org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel]]
map phase Thread[Thread[org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel.main(),5,org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel]]


## Server Node 1
map phase Thread[Thread[ForkJoinPool.commonPool-worker-3,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t1,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[ForkJoinPool.commonPool-worker-4,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[ForkJoinPool.commonPool-worker-1,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[ForkJoinPool.commonPool-worker-2,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]


## Server Node 2
map phase Thread[Thread[ForkJoinPool.commonPool-worker-3,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[ForkJoinPool.commonPool-worker-1,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[ForkJoinPool.commonPool-worker-2,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]

なんか、ForkJoinPoolが出てきましたね。分散先で、ForkJoinを使って実行されるようになるみたいです。ということは、分散先でさらに並列実行される感じですね。

parallelDistribution

CacheStreamのparallelDistributionを呼び出した場合。

    public void distParallelDistribution() {
        this.<String, String>withCache("distCache", cache -> {
            System.out.println("Cache[" + cache.getName() + "] start.");

            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, "value" + i));

            try (CacheStream<Map.Entry<String, String>> stream = cache.entrySet().stream()) {
                int result =
                        stream
                                .parallelDistribution()
                                .map((Serializable & Function<Map.Entry<String, String>, Integer>) e -> {
                                    System.out.println("map phase Thread[" + Thread.currentThread() + "]");
                                    return Integer.parseInt(e.getValue().substring("value".length()));
                                })
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(v -> v)));

                System.out.println("Cache[" + cache.getName() + "] result = " + result);

                int expected = 55;
                if (result != expected) {
                    throw new IllegalStateException("result must be [" + expected + "]");
                }
            }
        });
    }

結果。

## main
map phase Thread[Thread[org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel.main(),5,org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel]]
map phase Thread[Thread[org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel.main(),5,org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel]]
map phase Thread[Thread[org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel.main(),5,org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel]]
map phase Thread[Thread[org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel.main(),5,org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel]]


## Server Node 1
map phase Thread[Thread[remote-thread--p2-t1,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t1,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t1,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t1,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]


## Server Node 2
map phase Thread[Thread[remote-thread--p2-t3,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t3,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]

実は、デフォルト状態と同じです。

よくよくCacheStreamのJavadocを読むと

his would enable sending requests to all other remote nodes when a terminal operator is performed. This requires additional overhead as it must process results concurrently from various nodes, but should perform faster in the majority of cases.

Parallel distribution is enabled by default except for iterator() & spliterator()

https://docs.jboss.org/infinispan/8.0/apidocs/org/infinispan/CacheStream.html#parallelDistribution--

デフォルトで有効って書いてる…。そりゃあ同じになりますよね…。

sequential

Stream#sequentialを呼び出した場合。

    public void distSequential() {
        this.<String, String>withCache("distCache", cache -> {
            System.out.println("Cache[" + cache.getName() + "] start.");

            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, "value" + i));

            try (CacheStream<Map.Entry<String, String>> stream = cache.entrySet().stream()) {
                int result =
                        stream
                                .sequential()
                                .map((Serializable & Function<Map.Entry<String, String>, Integer>) e -> {
                                    System.out.println("map phase Thread[" + Thread.currentThread() + "]");
                                    return Integer.parseInt(e.getValue().substring("value".length()));
                                })
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(v -> v)));

                System.out.println("Cache[" + cache.getName() + "] result = " + result);

                int expected = 55;
                if (result != expected) {
                    throw new IllegalStateException("result must be [" + expected + "]");
                }
            }
        });
    }

結果。

## main
map phase Thread[Thread[org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel.main(),5,org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel]]
map phase Thread[Thread[org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel.main(),5,org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel]]
map phase Thread[Thread[org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel.main(),5,org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel]]
map phase Thread[Thread[org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel.main(),5,org.littlewings.infinispan.distributedstreams.EmbeddedDistributedStreamsParallel]]


## Server Node 1
map phase Thread[Thread[remote-thread--p2-t3,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t3,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t3,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t3,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]


## Server Node 2
map phase Thread[Thread[remote-thread--p2-t6,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t6,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]

まあ、parallelをオフにするだけみたいなので、変わりません、と。

sequentialDistribution

CacheStream#sequentialDistributionを呼び出した場合。

    public void distSequentialDistribution() {
        this.<String, String>withCache("distCache", cache -> {
            System.out.println("Cache[" + cache.getName() + "] start.");

            IntStream.rangeClosed(1, 10).forEach(i -> cache.put("key" + i, "value" + i));

            try (CacheStream<Map.Entry<String, String>> stream = cache.entrySet().stream()) {
                int result =
                        stream
                                .sequentialDistribution()
                                .map((Serializable & Function<Map.Entry<String, String>, Integer>) e -> {
                                    System.out.println("map phase Thread[" + Thread.currentThread() + "]");
                                    return Integer.parseInt(e.getValue().substring("value".length()));
                                })
                                .collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(v -> v)));

                System.out.println("Cache[" + cache.getName() + "] result = " + result);

                int expected = 55;
                if (result != expected) {
                    throw new IllegalStateException("result must be [" + expected + "]");
                }
            }
        });
    }

結果。

## main
map phase Thread[Thread[remote-thread--p2-t4,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t4,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t4,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t4,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]


## Server Node 1
map phase Thread[Thread[remote-thread--p2-t4,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t4,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t4,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t4,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]


## Server Node 2
map phase Thread[Thread[remote-thread--p2-t7,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]
map phase Thread[Thread[remote-thread--p2-t7,5,org.littlewings.infinispan.distributedstreams.EmbeddedCacheServer]]

なにか変わったように見えない…。

実装的には、parallelDistributionをオフにするだけみたいです。
https://github.com/infinispan/infinispan/blob/8.0.1.Final/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java#L815

じゃあ、parallelDistributionってどう使われるの?ってところを追ってみると、ここで使われるみたいです。
https://github.com/infinispan/infinispan/blob/8.0.1.Final/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManagerImpl.java#L83

こんな感じ。

         if (parallelDistribution) {
            submitAsyncTasks(uuid, targets, keysToExclude, parallelStream, keysToInclude, includeLoader, type,
                    operation);
         } else {
            for (Map.Entry<Address, Set<Integer>> targetInfo : targets.entrySet()) {
               // TODO: what if this throws exception?
               Set<Integer> targetSegments = targetInfo.getValue();
               Set<K> keysExcluded = determineExcludedKeys(keysToExclude, targetSegments);
               rpc.invokeRemotely(Collections.singleton(targetInfo.getKey()),
                       factory.buildStreamRequestCommand(uuid, parallelStream, type, targetSegments, keysToInclude,
                               keysExcluded, includeLoader, operation), rpc.getDefaultRpcOptions(true));
            }
         }

コンソール上の結果だけを見るとわかりませんが、parallelDistribtionがオンの場合は「各Remote Nodeに非同期に実行を依頼する」っぽくて、parallelDistributionがオフの場合は「各Remote Nodeに順次同期的に処理を依頼する」ってことみたいですね。

submitAsyncTasksメソッドの中では、CompletableFutureが登場します。

なるほどー。

まとめ

というわけで、Infinispan 8.0.0.Finalで追加されたDistributed Streamsを使って、ちょっとした動作確認と分散処理の確認をやってみました。

Replicated Cacheが分散しなくていいのかなとは思いますが、今後そのうち変わったりするのかな??それとも、ローカルで頑張る感じなのかな?

Lambda式で書く場合には交差型キャストがまあ面倒ですが、お手軽に分散処理ができるのはいいなぁと思います。既存のMap Reduce Frameworkを使うよりは、確かに簡単に使えますし。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-distributed-streams