CLOVER🍀

That was when it all began.

ProtoStreamが使えるようになった、Quarkus+Infinispan Client(Hot Rod) Extensionを試す

これは、なにをしたくて書いたもの?

QuarkusのInfinispan Client Extensionのガイドを読んでいて、ProtoStreamが使えるようになっていたことに気づきまして。

Quarkus - Infinispan Client

どうやら、0.23あたりからのようです。ちょっと試してみましょう。

ProtoStream?

Infinispan 10.0以降のデフォルトのMarshallerで、Protocol Buffers 2のスキーマ定義をアノテーションで書くことができます。

ProtoStream (Default)

Infinispan ProtoStream Serialization Library

アノテーションからProtocol Buffers 2のスキーマ定義とMarshallerの実装を、自動生成します。

Infinispan 10.0でMarshallingがリファクタリングされたという話(Embedded Mode) - CLOVER🍀

以前のQuarkusのInfinispan Client ExtensionではこのProtoStreamが使えず、自分でProtocol Buffers 2のスキーマ定義を書く必要が
あったのですが、今はそうではなくなりました、と。

環境

今回の環境は、こちらです。

$ java --version
openjdk 11.0.6 2020-01-14
OpenJDK Runtime Environment (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1)
OpenJDK 64-Bit Server VM (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1, mixed mode, sharing)


$ $GRAALVM_HOME/bin/java --version
openjdk 11.0.6 2020-01-14
OpenJDK Runtime Environment GraalVM CE 19.3.1 (build 11.0.6+9-jvmci-19.3-b07)
OpenJDK 64-Bit Server VM GraalVM CE 19.3.1 (build 11.0.6+9-jvmci-19.3-b07, mixed mode, sharing)


$ mvn --version
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.6, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-76-generic", arch: "amd64", family: "unix"

Quarkus 1.2.0.FinalでGraalVM 19.3.1に対応したので、ネイティブイメージを含めて、Java 11が使えるようになりましたね。

Quarkus 1.2.0.Final released - GraalVM 19.3.1 support, Metrics, Cache extension, and more

Infinispan Serverも使いますが、こちらはQuarkusが依存しているInfinispanに合わせて、10.0.0.Finalを使用しました。
Infinispan Serverが動作しているサーバーは、172.17.0.2とします。

お題

以下のことをやってみましょう。

  • 書籍をお題にして、Infinispan Serverにデータをput/get
  • Continuous Query+JAX-RS(Server Sent Event)を使って、putされた書籍データを、指定の条件にマッチしたら受信する
  • Ickle Query(index less)で検索

まずは、プロジェクトを作成します。Extensionは、RESTEasy JacksonとInfinispan Client。

$ mvn io.quarkus:quarkus-maven-plugin:1.2.0.Final:create \
    -DprojectGroupId=org.littlewings \
    -DprojectArtifactId=infinispan-client-protostream \
    -Dextensions="resteasy-jackson,infinispan-client"

作成したプロジェクト内に移動。

$ cd infinispan-client-protostream

Javaのバージョンは11にします。

$ perl -wpi -e 's!>1.8<!>11<!' pom.xml

pom.xmlに書かれた依存関係は、こちら。

    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-resteasy</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-junit5</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.rest-assured</groupId>
      <artifactId>rest-assured</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-resteasy-jackson</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-infinispan-client</artifactId>
    </dependency>

お題にあるContinuous Query+Server Sent Eventは、RxJava2を使って受け取ることにしましょう。

RESTEasy RxJava2を依存関係に追加します。

    <dependency>
      <groupId>org.jboss.resteasy</groupId>
      <artifactId>resteasy-rxjava2</artifactId>
    </dependency>

QuarkusのBOMに記載があるので、バージョン指定は不要です。

RxJava Support · Issue #4278 · quarkusio/quarkus · GitHub

では、ソースコードを作成していきます。

書籍クラス。
src/main/java/org/littlewings/quarkus/infinispan/Book.java

package org.littlewings.quarkus.infinispan;

import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;

public class Book {
    @ProtoField(number = 1, required = true)
    String isbn;

    @ProtoField(number = 2, required = true)
    String title;

    @ProtoField(number = 3, defaultValue = "0")
    int price;

    @ProtoFactory
    public Book(String isbn, String title, int price) {
        this.isbn = isbn;
        this.title = title;
        this.price = price;
    }

    // getter/setterは省略
}

このクラスに対応する、SerializationContextInitializerインターフェースの実装を作成。
src/main/java/org/littlewings/quarkus/infinispan/BookContextInitializer.java

package org.littlewings.quarkus.infinispan;

import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;

@AutoProtoSchemaBuilder(includeClasses = {Book.class}, schemaPackageName = "sample")
public interface BookContextInitializer extends SerializationContextInitializer {
}

パッケージは、「sample」にしました…。

Quarkusでは、@AutoProtoSchemaBuilderアノテーションでのschemaFileNameおよびschemaFilePathは不要なようです。

In Quarkus the schemaFileName and schemaFilePath attributes should NOT be set on the AutoProtoSchemaBuilder annotation, setting either will cause native runtime to error.

Annotation based Serialization

JAX-RSリソースクラス。
src/main/java/org/littlewings/quarkus/infinispan/BookResource.java

package org.littlewings.quarkus.infinispan;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;

import io.quarkus.infinispan.client.Remote;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.Search;
import org.infinispan.commons.marshall.ProtoStreamMarshaller;
import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.remote.client.impl.MarshallerRegistration;
import org.jboss.resteasy.annotations.Stream;

@Path("book")
public class BookResource {
    @Inject
    @Remote("bookCache")
    RemoteCache<String, Book> bookCache;

    @PostConstruct
    public void init() {
        // Continuous Query work-around
        MarshallerRegistration.init(((ProtoStreamMarshaller) bookCache.getRemoteCacheManager().getMarshaller()).getSerializationContext());
    }

    // あとで
}

中身は、順に書いていきましょう。

まずは、put/get。一応、全件取得も。

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public List<Book> findAll() {
        return new ArrayList<>(bookCache.values());
    }

    @PUT
    @Path("{isbn}")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response register(@PathParam("isbn") String isbn, Book book, @Context UriInfo uriInfo) {
        bookCache.put(isbn, book);

        return Response.created(uriInfo.getRequestUriBuilder().build()).build();
    }

    @GET
    @Path("{isbn}")
    @Produces(MediaType.APPLICATION_JSON)
    public Book find(@PathParam("isbn") String isbn) {
        return bookCache.get(isbn);
    }

Continuous Query+Server Sent Event。

    @GET
    @Path("stream")
    @Produces(MediaType.APPLICATION_JSON)
    @Stream
    public Flowable<Book> stream() {
        ContinuousQuery<String, Book> cq = Search.getContinuousQuery(bookCache);

        Function<FlowableEmitter<Book>, ContinuousQueryListener<String, Book>> listenerFactory =
                emitter ->
                        new ContinuousQueryListener<>() {
                            @Override
                            public void resultJoining(String key, Book value) {
                                emitter.onNext(value);
                            }

                            @Override
                            public void resultUpdated(String key, Book value) {
                                emitter.onNext(value);
                            }
                        };

        AtomicReference<ContinuousQueryListener<String, Book>> listener = new AtomicReference<>();

        return Flowable
                .<Book>create(emitter -> {
                            listener.set(listenerFactory.apply(emitter));
                            cq.addContinuousQueryListener("from sample.Book where price >= 3500", listener.get());
                        },
                        BackpressureStrategy.BUFFER)
                .doOnTerminate(() -> cq.removeContinuousQueryListener(listener.get()));
    }

RxJava2をここで使います。

RESTEasy × RxJava 2を試す - CLOVER🍀

3,500円以上の書籍が登録、更新された返すようにしました。

        return Flowable
                .<Book>create(emitter -> {
                            listener.set(listenerFactory.apply(emitter));
                            cq.addContinuousQueryListener("from sample.Book where price >= 3500", listener.get());
                        },
                        BackpressureStrategy.BUFFER)
                .doOnTerminate(() -> cq.removeContinuousQueryListener(listener.get()));

検索。titleをLIKE検索、priceを指定以上の値で検索することにしました。

    @GET
    @Path("search")
    @Produces(MediaType.APPLICATION_JSON)
    public List<Book> search(@QueryParam("title") String title, @QueryParam("price") int price) {
        Query query =
                Search
                        .getQueryFactory(bookCache)
                        .create(String.format("from %s where title like :title and price >= :price order by price desc", "sample.Book"));

        Map<String, Object> parameters = Map.of("title", "%" + title + "%", "price", price);
        query.setParameters(parameters);

        return query.list();
    }

Full Text Searchは、今回はパス。

これで、アプリケーションの作成は完了です。

Infinispan ServerにCacheを作成する

先ほど作成したJAX-RSリソースクラスでは、「bookCache」という名前のCacheがInfinispan Serverに必要です。

@Path("book")
public class BookResource {
    @Inject
    @Remote("bookCache")
    RemoteCache<String, Book> bookCache;

こちらを、Infinispan Serverに付属しているCLIで作成します。

Using the Infinispan Command Line Interface

Infinispan Serverに接続。

$ bin/cli.sh 
[disconnected]> connect 172.17.0.2:11222
[infinispan-server-50607@cluster//containers/default]>

Cacheの作成。今回は、デフォルトのDistributed Cacheのテンプレートから作成します。

[infinispan-server-50607@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC bookCache

確認。

[infinispan-server-50607@cluster//containers/default]> describe caches/bookCache  # もしくは describe /containers/default/caches/bookCache
{
  "distributed-cache" : {
    "mode" : "SYNC",
    "remote-timeout" : 17500,
    "state-transfer" : {
      "timeout" : 60000
    },
    "transaction" : {
      "mode" : "NONE"
    },
    "locking" : {
      "concurrency-level" : 1000,
      "acquire-timeout" : 15000,
      "striping" : false
    },
    "statistics" : true
  }
}

これで、準備完了です。

確認する

では、パッケージングして確認しましょう。

$ mvn package

# ネイティブイメージの場合
$ mvn package -P native
$ java -jar target/infinispan-client-protostream-1.0-SNAPSHOT-runner.jar

# ネイティブイメージの場合
$ ./target/infinispan-client-protostream-1.0-SNAPSHOT-runner

まず、Continous Query+Server Sent Eventでデータを受け取るための接続をします。

$ curl -i localhost:8080/book/stream
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/x-stream-general;element-type="application/json"

そのまま応答待ちになります。

別のターミナルから、データを登録。

$ curl -i -XPUT -H 'Content-Type: application/json' localhost:8080/book/978-4798124605 -d '{"isbn": "978-4798124605", "title": "Beginning Java EE 6", "price": 3891}'


$ curl -i -XPUT -H 'Content-Type: application/json' localhost:8080/book/978-4774183169 -d '{"isbn": "978-4774183169", "title": "パーフェクト Java EE", "price": 3456}'


$ curl -i -XPUT -H 'Content-Type: application/json' localhost:8080/book/978-4798140926 -d '{"isbn": "978-4798140926", "title": "Java EE 7徹底入門", "price": 4104}'

この時、先ほど応答待ちになっていたターミナルに、データが表示されます。価格が3,500円を下回るものについては、表示されていませんね。

data: {"isbn":"978-4798124605","title":"Beginning Java EE 6","price":3891}

data: {"isbn":"978-4798140926","title":"Java EE 7徹底入門","price":4104}

1件取得。

$ curl -s localhost:8080/book/978-4798124605 | jq
{
  "isbn": "978-4798124605",
  "title": "Beginning Java EE 6",
  "price": 3891
}

全件取得。

$ curl -s localhost:8080/book | jq
[
  {
    "isbn": "978-4798124605",
    "title": "Beginning Java EE 6",
    "price": 3891
  },
  {
    "isbn": "978-4798140926",
    "title": "Java EE 7徹底入門",
    "price": 4104
  },
  {
    "isbn": "978-4774183169",
    "title": "パーフェクト Java EE",
    "price": 3456
  }
]

検索。

$ curl -s 'localhost:8080/book/search?title=Beginning&price=3000' | jq
[
  {
    "isbn": "978-4798124605",
    "title": "Beginning Java EE 6",
    "price": 3891
  }
]


$ curl -s 'localhost:8080/book/search?title=Java&price=3000' | jq
[
  {
    "isbn": "978-4798140926",
    "title": "Java EE 7徹底入門",
    "price": 4104
  },
  {
    "isbn": "978-4798124605",
    "title": "Beginning Java EE 6",
    "price": 3891
  },
  {
    "isbn": "978-4774183169",
    "title": "パーフェクト Java EE",
    "price": 3456
  }
]


$ curl -s 'localhost:8080/book/search?title=Java&price=4000' | jq
[
  {
    "isbn": "978-4798140926",
    "title": "Java EE 7徹底入門",
    "price": 4104
  }
]

ひととおり、確認できました、と。

Continuous Queryでちょっとハマったこと

実は、Continous Queryでちょっとハマりまして。

JAX-RSリソースクラスに書いていた、こちら。

    @PostConstruct
    public void init() {
        // Continuous Query work-around
        MarshallerRegistration.init(((ProtoStreamMarshaller) bookCache.getRemoteCacheManager().getMarshaller()).getSerializationContext());
    }

現状、これを入れずに上記の手順をそのまま実行すると、Continuous Queryで条件に合致したデータを受信した際に、こんな
エラーを見ることになります。

Caused by: java.lang.IllegalArgumentException: No marshaller registered for Protobuf type org.infinispan.query.remote.client.ContinuousQueryResult
    at org.infinispan.protostream.impl.SerializationContextImpl.getMarshallerDelegate(SerializationContextImpl.java:267)
    at org.infinispan.protostream.WrappedMessage.readMessage(WrappedMessage.java:386)
    at org.infinispan.protostream.ProtobufUtil.fromWrappedByteArray(ProtobufUtil.java:163)
    at org.infinispan.protostream.ProtobufUtil.fromWrappedByteArray(ProtobufUtil.java:158)
    at org.infinispan.client.hotrod.event.impl.ContinuousQueryImpl$ClientEntryListener.handleEvent(ContinuousQueryImpl.java:118)
    ... 32 more

ContinuousQueryResultに対する、Marshallerがない、と…。

該当するMarshallerは、こちら。

https://github.com/infinispan/infinispan/blob/10.0.0.Final/remote-query/remote-query-client/src/main/java/org/infinispan/query/remote/client/impl/ContinuousQueryResult.java#L105

Queryに関するMarshallerは、こちらのクラスでSerializationContextに登録されます。

https://github.com/infinispan/infinispan/blob/10.0.0.Final/remote-query/remote-query-client/src/main/java/org/infinispan/query/remote/client/impl/MarshallerRegistration.java#L34-L42

で、それがいつ登録されるかというと、RemoteQueryFactoryのインスタンスが作られる時。

https://github.com/infinispan/infinispan/blob/10.0.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/query/RemoteQueryFactory.java#L32

Search#getQueryFactoryが実行される時であり、Search#getContinuousQueryの時ではありません。

https://github.com/infinispan/infinispan/blob/10.0.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Search.java#L23

つまり、Ickle QueryやQuery DSLを実行した後であれば、Continuous Queryはうまくいきますし、今回のサンプルのように先に
Continuous Queryを実行してしまうとMarshallerがないので失敗します。

というわけで、今回は自分で登録しました、と。

あとでInfinispan側のコードも、もうちょっと見てましょうかね…。