CLOVER🍀

That was when it all began.

Lambda式をシリアライズする

少し前にLambda式のシリアライズってどうなの?という話があって、Lambda式が表現するクラスまたはインターフェースがSerializableでなければシリアライズはできないという話ではあったのですが、ちょっと気になって調べてみました。

すでにシリアライズしてみたよ、という方がいらっしゃったので、こちらをまずは参考に。

Java 8 ラムダ式の実装メソッド名を取得 - SerializedLambda
http://fits.hatenablog.com/entry/2014/04/13/134406

シリアライズされたLambda式は、SerializedLambdaというクラスで表現されるそうです。

java.lang.invoke.SerializedLambda
http://docs.oracle.com/javase/jp/8/docs/api/java/lang/invoke/SerializedLambda.html

こんなクラスがあったんですね…。

さらに参考)
ラムダと invokedynamic の蜜月
http://www.slideshare.net/miyakawataku/lambda-meets-invokedynamic

先ほどの参照エントリは、このためにSerializableを拡張したインターフェースを定義していますが、このあたりを含めて自分で追っていきたいと思います。

なお、テストコードの形で示していきますが、テストコードではJUnitとAssertJを使用します。

FunctionインターフェースのLambda式を書く

まずは、単純なLambda式を書いてみます。

    @Test
    public void testLambda() {
        int x = 2;
        Function<Integer, Integer> doubling = y -> x * y;

        assertThat(doubling.apply(5))
                .isEqualTo(10);
    }

これ自体は普通ですね?

Functionインターフェースに対して表現したLambda式をシリアライズする

続いて、先ほどのコードをシリアライズしてみます。

    @Test
    public void testNoSerializationLambda() throws IOException {
        int x = 2;
        Function<Integer, Integer> doubling = y -> x * y;

        assertThatThrownBy(() -> {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
                oos.writeObject(doubling);
            }
        })
                .isInstanceOf(NotSerializableException.class)
                .hasMessageContaining("$$Lambda");
    }

すると、FunctionインターフェースはSerializableではないので、シリアライズに失敗し例外が飛んできます。

これをなんとかしようというのが、今回のテーマです。

Functionインターフェースに、Serializableインターフェースを追加する

これを解決するために単純に思いつくのは、FunctionインターフェースにSerializableインターフェースを追加した新たなインタフェースを追加すること。

    public interface SerializedFunction<A, B> extends Function<A, B>, Serializable { }

最初に出した参考エントリに記載されていたのが、この方法です。

この方法であれば、シリアライズ、デシリアライズともに成功します。

    @Test
    public void testSerializableMixInterface() throws IOException, ReflectiveOperationException {
        int x = 2;
        SerializedFunction<Integer, Integer> doubling = y -> x * y;

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(doubling);
        }

        byte[] binary = baos.toByteArray();

        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(binary))) {
            @SuppressWarnings("unchecked")
            Function<Integer, Integer> deserialized = (Function<Integer, Integer>) ois.readObject();
            assertThat(deserialized.apply(5))
                    .isEqualTo(10);
        }
    }

とはいえ、これだと型が増えてしまう…。

交差型キャストを利用する

ここで利用するのが、交差型(Intersection Type)キャストというものらしいです。

JavaSE8リリース記念!マイナーな言語仕様を紹介してみる(交差型キャスト,レシーバパラメータ(仮引数にthis))
http://d.hatena.ne.jp/bitter_fox/20140319/1395221764

Javaラムダ式 - シリアライズ
http://www.ne.jp/asahi/hishidama/home/tech/java/lambda.html#h_serialize

交差型キャスト…名前は聞いたことありましたけど、あんまり覚えてませんでした…。こういうところで使うんですね…。

交差型キャストを使うと、型を合成することができて、Lambda式で使う場合は合成後の型が関数型インターフェースであればよいらしい、と。

というわけで、使ってみます。

    @Test
    public void testSerializationLambda() throws IOException, ReflectiveOperationException {
        int x = 2;
        Function<Integer, Integer> doubling =
                (Function<Integer, Integer> & Serializable) y -> x * y;

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(doubling);
        }

        byte[] binary = baos.toByteArray();

        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(binary))) {
            @SuppressWarnings("unchecked")
            Function<Integer, Integer> deserialized = (Function<Integer, Integer>) ois.readObject();
            assertThat(deserialized.apply(5))
                    .isEqualTo(10);
        }
    }

おお、できました!

SerializedLambda

冒頭でSerializedLambdaというクラスがあるよという話だったのですが、これについて書いてみます。

Lambda式でSerializableを含めるようにすると、Lambda式で定義されたクラスがwriteReplaceメソッドを持つようになります。

java.io.Serializable
http://docs.oracle.com/javase/jp/8/docs/api/java/io/Serializable.html

writeReplaceメソッドはprivateスコープなので、外からアクセスできないのですが、リフレクションで呼び出すことで強引にSerializedLambdaを取得できます。

    @Test
    public void testAboutSerializableLambda() throws ReflectiveOperationException {
        int x = 2;
        Function<Integer, Integer> doubling =
                (Function<Integer, Integer> & Serializable) y -> x * y;

        Class<?> clazz = doubling.getClass();

        assertThat(clazz)
                .isNotInstanceOfAny(SerializedLambda.class);

        Method writeReplaceMethod = clazz.getDeclaredMethod("writeReplace");
        writeReplaceMethod.setAccessible(true);

        SerializedLambda serializedLambda = (SerializedLambda) writeReplaceMethod.invoke(doubling);

        assertThat(serializedLambda.getClass())
                .isAssignableFrom(SerializedLambda.class);

        assertThat(serializedLambda.getCapturedArgCount())
                .isEqualTo(1);
        assertThat(serializedLambda.getCapturingClass())
                .isEqualTo(LambdaSerializationTest.class.getName().replace('.', '/'));

        assertThat(serializedLambda.getFunctionalInterfaceClass())
                .isEqualTo(Function.class.getName().replace('.', '/'));
        assertThat(serializedLambda.getFunctionalInterfaceMethodName())
                .isEqualTo("apply");
        assertThat(serializedLambda.getFunctionalInterfaceMethodSignature())
                .isEqualTo("(Ljava/lang/Object;)Ljava/lang/Object;");

        assertThat(serializedLambda.getImplClass())
                .isEqualTo(LambdaSerializationTest.class.getName().replace('.', '/'));
        assertThat(serializedLambda.getImplMethodName())
                .contains("lambda$testAboutSerializableLambda$");
        assertThat(serializedLambda.getImplMethodSignature())
                .isEqualTo("(ILjava/lang/Integer;)Ljava/lang/Integer;");

        assertThat(serializedLambda.getInstantiatedMethodType())
                .isEqualTo("(Ljava/lang/Integer;)Ljava/lang/Integer;");
    }

SerializedLambdaから取得できる値を見ることで、どのクラスで定義されていたのかということや、メソッドのシグネチャなどを知ることができます。なお、クラス名が返ってくるものについては、パッケージの区切りが「.」ではなくて「/」になるようです。

なお、Lambda式そのものはSerializedLambdaではありません。

        Function<Integer, Integer> doubling =
                (Function<Integer, Integer> & Serializable) y -> x * y;

        Class<?> clazz = doubling.getClass();

        assertThat(clazz)
                .isNotInstanceOfAny(SerializedLambda.class);

Lambda式に、シリアライズできないものが含まれていると?

ちょっと気になるのが、Lambda式で取り込んだ中にシリアライズ不可のものが含まれていた場合にどうなるか、です。だいたい予想できますが、これはシリアライズに失敗します。

ここは、地味にSerializableでなくてたまに踏む人がいると思われる、ArrayList#subListに登場していただきます。

    @Test
    public void testSerializableLambdaWithNonSerializableClass() throws IOException, ReflectiveOperationException {
        List<String> tokens =
                new ArrayList<>(Arrays.asList("Hello", "World")).subList(0, 2);

        Supplier<String> factory =
                (Supplier<String> & Serializable) () -> tokens.stream().collect(Collectors.joining(", "));

        assertThatThrownBy(() -> {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
                oos.writeObject(factory);
            }
        })
                .isInstanceOf(NotSerializableException.class)
                .hasMessageContaining("java.util.ArrayList$SubList");
    }

予想に違わず、失敗です。

もちろん、ArrayListならばOKです。

    @Test
    public void testSerializableLambdaWithSerializableClass() throws IOException, ReflectiveOperationException {
        List<String> tokens = new ArrayList<>(Arrays.asList("Hello", "World"));

        Supplier<String> factory =
                (Supplier<String> & Serializable) () -> tokens.stream().collect(Collectors.joining(", "));

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(factory);
        }

        byte[] binary = baos.toByteArray();

        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(binary))) {
            @SuppressWarnings("unchecked")
            Supplier<String>  deserialized = (Supplier<String> ) ois.readObject();
            assertThat(deserialized.get())
                    .isEqualTo("Hello, World");
        }
    }

Lambda式をシリアライズするのはどうなの?

JSR-335に、シリアライズについてちょっと書いてあります。

JSR 335: Lambda Expressions for the Java Programming Language
https://jcp.org/en/jsr/detail?id=335

ここに書いている内容を見ていると、Lambda式で表現されるクラスの名前がコンパイラ依存だったり、ソースコードの変更が簡単にデシリアライズに影響する、またシリアライズのメカニズムをコントロールすることもできないため、けっこう消極的に見えます。

実際にシリアライズするとなると、やっぱりちゃんとクラスとして定義すべきな印象を受けました。

インメモリ・データグリッドにLambda式を放り込む

で、これだけで終わるのもなんなので、シリアライズしてLambda式を転送するとなるとやっぱり分散系ですよね。

ということで、インメモリ・データグリッドにLambda式を放り込んでみました。

ここでは、HazelcastのDistributed Executorを使用します。

Hazelcast
http://hazelcast.org/

Hazelcastのインスタンスを複数起動し、このNodo間でシリアライズされたCallableを転送し、結果を受け取ろうというもの。

Hazelcastを使用するための、Maven依存関係。

        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>3.4.2</version>
            <scope>test</scope>
        </dependency>

テストコードの雛形は、このように用意。
src/test/java/org/littlewings/lambda/HazelcastDistributedExecLambdaTest.java

package org.littlewings.lambda;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.Member;
import com.hazelcast.nio.serialization.HazelcastSerializationException;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class HazelcastDistributedExecLambdaTest {
    // ここに、テストを書く!

    protected void withHazelcast(int clusterSize, Consumer<HazelcastInstance> f) {
        List<HazelcastInstance> hazelcastInstances =
                IntStream
                        .rangeClosed(1, clusterSize)
                        .mapToObj(i -> Hazelcast.newHazelcastInstance())
                        .collect(Collectors.toList());

        try {
            f.accept(hazelcastInstances.get(0));
        } finally {
            hazelcastInstances.forEach(h -> h.getLifecycleService().shutdown());
            Hazelcast.shutdownAll();
        }
    }
}

単一のJava VMでクラスタが構成可能な、簡易メソッドを含んでいます。

では、まずは非SerializableなCallableを使って確認。ここでは、Node数は2つとしました。

    @Test
    public void testNoSerializedCall() {
        int clusterSize = 2;

        assertThatThrownBy(() -> {
            withHazelcast(clusterSize, hazelcast -> {
                IExecutorService executorService = hazelcast.getExecutorService("default");
                executorService.submitToAllMembers(() -> "Hello Cluster Callable!");
            });
        })
                .isInstanceOfAny(HazelcastSerializationException.class);
    }

例外はHazelcast独自のものですが、実行には失敗します。

今度は、交差型キャストを使用して実行。クラスタ内のNode数は、4つとしました。

    @Test
    public void testSerializedCall() {
        int clusterSize = 4;

        withHazelcast(clusterSize, hazelcast -> {
            IExecutorService executorService = hazelcast.getExecutorService("default");
            Map<Member, Future<String>> futures =
                    executorService.submitToAllMembers(
                            (Callable<String> & Serializable) () -> "Hello Cluster Callable!"
                    );

            List<String> messages =
                    futures
                            .entrySet()
                            .stream()
                            .map(entry -> {
                                try {
                                    return entry.getValue().get();
                                } catch (ExecutionException | InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                            })
                            .collect(Collectors.toList());

            assertThat(messages)
                    .hasSize(clusterSize)
                    .containsSequence("Hello Cluster Callable!");
        });
    }

各Nodeから、同じメッセージが返ってきますが、Node数と同じ4つが返ってきたことが確認できます。

ということで、とりあえずクラスタでCallableを投げるというサンプルを書いてみましたが…やっぱりちょっと微妙です。Node内でグリッドのリソースにアクセスしたりすることを通常書くと思うのですが、(Hazelcastのインターフェースでは)Lambda式で表現することはできないですし、Map Reduceも今回はインターフェースが合いませんでした。

あくまでインターフェースの話なので、他の分散処理系のフレームワークを持ってきたらまた事情が変わると思いますが、あまり長々とLambda式に書くものでもないと思いますので、ご利用はほどほどに、というか積極的に使う理由はないと思います。