これは、なにをしたくて書いたもの?
QuarkusのInfinispan Client Extensionのガイドを読んでいて、ProtoStreamが使えるようになっていたことに気づきまして。
どうやら、0.23あたりからのようです。ちょっと試してみましょう。
ProtoStream?
Infinispan 10.0以降のデフォルトのMarshallerで、Protocol Buffers 2のスキーマ定義をアノテーションで書くことができます。
Infinispan ProtoStream Serialization Library
アノテーションからProtocol Buffers 2のスキーマ定義とMarshallerの実装を、自動生成します。
Infinispan 10.0でMarshallingがリファクタリングされたという話(Embedded Mode) - CLOVER🍀
以前のQuarkusのInfinispan Client ExtensionではこのProtoStreamが使えず、自分でProtocol Buffers 2のスキーマ定義を書く必要が
あったのですが、今はそうではなくなりました、と。
環境
今回の環境は、こちらです。
$ java --version openjdk 11.0.6 2020-01-14 OpenJDK Runtime Environment (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1) OpenJDK 64-Bit Server VM (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1, mixed mode, sharing) $ $GRAALVM_HOME/bin/java --version openjdk 11.0.6 2020-01-14 OpenJDK Runtime Environment GraalVM CE 19.3.1 (build 11.0.6+9-jvmci-19.3-b07) OpenJDK 64-Bit Server VM GraalVM CE 19.3.1 (build 11.0.6+9-jvmci-19.3-b07, mixed mode, sharing) $ mvn --version Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 11.0.6, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "4.15.0-76-generic", arch: "amd64", family: "unix"
Quarkus 1.2.0.FinalでGraalVM 19.3.1に対応したので、ネイティブイメージを含めて、Java 11が使えるようになりましたね。
Quarkus 1.2.0.Final released - GraalVM 19.3.1 support, Metrics, Cache extension, and more
Infinispan Serverも使いますが、こちらはQuarkusが依存しているInfinispanに合わせて、10.0.0.Finalを使用しました。
Infinispan Serverが動作しているサーバーは、172.17.0.2とします。
お題
以下のことをやってみましょう。
- 書籍をお題にして、Infinispan Serverにデータをput/get
- Continuous Query+JAX-RS(Server Sent Event)を使って、putされた書籍データを、指定の条件にマッチしたら受信する
- Ickle Query(index less)で検索
まずは、プロジェクトを作成します。Extensionは、RESTEasy JacksonとInfinispan Client。
$ mvn io.quarkus:quarkus-maven-plugin:1.2.0.Final:create \ -DprojectGroupId=org.littlewings \ -DprojectArtifactId=infinispan-client-protostream \ -Dextensions="resteasy-jackson,infinispan-client"
作成したプロジェクト内に移動。
$ cd infinispan-client-protostream
Javaのバージョンは11にします。
$ perl -wpi -e 's!>1.8<!>11<!' pom.xml
pom.xmlに書かれた依存関係は、こちら。
<dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-junit5</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>io.rest-assured</groupId> <artifactId>rest-assured</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy-jackson</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-infinispan-client</artifactId> </dependency>
お題にあるContinuous Query+Server Sent Eventは、RxJava2を使って受け取ることにしましょう。
RESTEasy RxJava2を依存関係に追加します。
<dependency> <groupId>org.jboss.resteasy</groupId> <artifactId>resteasy-rxjava2</artifactId> </dependency>
QuarkusのBOMに記載があるので、バージョン指定は不要です。
RxJava Support · Issue #4278 · quarkusio/quarkus · GitHub
では、ソースコードを作成していきます。
書籍クラス。
src/main/java/org/littlewings/quarkus/infinispan/Book.java
package org.littlewings.quarkus.infinispan; import org.infinispan.protostream.annotations.ProtoFactory; import org.infinispan.protostream.annotations.ProtoField; public class Book { @ProtoField(number = 1, required = true) String isbn; @ProtoField(number = 2, required = true) String title; @ProtoField(number = 3, defaultValue = "0") int price; @ProtoFactory public Book(String isbn, String title, int price) { this.isbn = isbn; this.title = title; this.price = price; } // getter/setterは省略 }
このクラスに対応する、SerializationContextInitializerインターフェースの実装を作成。
src/main/java/org/littlewings/quarkus/infinispan/BookContextInitializer.java
package org.littlewings.quarkus.infinispan; import org.infinispan.protostream.SerializationContextInitializer; import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder; @AutoProtoSchemaBuilder(includeClasses = {Book.class}, schemaPackageName = "sample") public interface BookContextInitializer extends SerializationContextInitializer { }
パッケージは、「sample」にしました…。
Quarkusでは、@AutoProtoSchemaBuilderアノテーションでのschemaFileNameおよびschemaFilePathは不要なようです。
In Quarkus the schemaFileName and schemaFilePath attributes should NOT be set on the AutoProtoSchemaBuilder annotation, setting either will cause native runtime to error.
Annotation based Serialization
JAX-RSリソースクラス。
src/main/java/org/littlewings/quarkus/infinispan/BookResource.java
package org.littlewings.quarkus.infinispan; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import javax.annotation.PostConstruct; import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; import io.quarkus.infinispan.client.Remote; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import io.reactivex.FlowableEmitter; import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.Search; import org.infinispan.commons.marshall.ProtoStreamMarshaller; import org.infinispan.query.api.continuous.ContinuousQuery; import org.infinispan.query.api.continuous.ContinuousQueryListener; import org.infinispan.query.dsl.Query; import org.infinispan.query.remote.client.impl.MarshallerRegistration; import org.jboss.resteasy.annotations.Stream; @Path("book") public class BookResource { @Inject @Remote("bookCache") RemoteCache<String, Book> bookCache; @PostConstruct public void init() { // Continuous Query work-around MarshallerRegistration.init(((ProtoStreamMarshaller) bookCache.getRemoteCacheManager().getMarshaller()).getSerializationContext()); } // あとで }
中身は、順に書いていきましょう。
まずは、put/get。一応、全件取得も。
@GET @Produces(MediaType.APPLICATION_JSON) public List<Book> findAll() { return new ArrayList<>(bookCache.values()); } @PUT @Path("{isbn}") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) public Response register(@PathParam("isbn") String isbn, Book book, @Context UriInfo uriInfo) { bookCache.put(isbn, book); return Response.created(uriInfo.getRequestUriBuilder().build()).build(); } @GET @Path("{isbn}") @Produces(MediaType.APPLICATION_JSON) public Book find(@PathParam("isbn") String isbn) { return bookCache.get(isbn); }
Continuous Query+Server Sent Event。
@GET @Path("stream") @Produces(MediaType.APPLICATION_JSON) @Stream public Flowable<Book> stream() { ContinuousQuery<String, Book> cq = Search.getContinuousQuery(bookCache); Function<FlowableEmitter<Book>, ContinuousQueryListener<String, Book>> listenerFactory = emitter -> new ContinuousQueryListener<>() { @Override public void resultJoining(String key, Book value) { emitter.onNext(value); } @Override public void resultUpdated(String key, Book value) { emitter.onNext(value); } }; AtomicReference<ContinuousQueryListener<String, Book>> listener = new AtomicReference<>(); return Flowable .<Book>create(emitter -> { listener.set(listenerFactory.apply(emitter)); cq.addContinuousQueryListener("from sample.Book where price >= 3500", listener.get()); }, BackpressureStrategy.BUFFER) .doOnTerminate(() -> cq.removeContinuousQueryListener(listener.get())); }
RxJava2をここで使います。
RESTEasy × RxJava 2を試す - CLOVER🍀
3,500円以上の書籍が登録、更新された返すようにしました。
return Flowable .<Book>create(emitter -> { listener.set(listenerFactory.apply(emitter)); cq.addContinuousQueryListener("from sample.Book where price >= 3500", listener.get()); }, BackpressureStrategy.BUFFER) .doOnTerminate(() -> cq.removeContinuousQueryListener(listener.get()));
検索。titleをLIKE検索、priceを指定以上の値で検索することにしました。
@GET @Path("search") @Produces(MediaType.APPLICATION_JSON) public List<Book> search(@QueryParam("title") String title, @QueryParam("price") int price) { Query query = Search .getQueryFactory(bookCache) .create(String.format("from %s where title like :title and price >= :price order by price desc", "sample.Book")); Map<String, Object> parameters = Map.of("title", "%" + title + "%", "price", price); query.setParameters(parameters); return query.list(); }
Full Text Searchは、今回はパス。
これで、アプリケーションの作成は完了です。
Infinispan ServerにCacheを作成する
先ほど作成したJAX-RSリソースクラスでは、「bookCache」という名前のCacheがInfinispan Serverに必要です。
@Path("book") public class BookResource { @Inject @Remote("bookCache") RemoteCache<String, Book> bookCache;
こちらを、Infinispan Serverに付属しているCLIで作成します。
Using the Infinispan Command Line Interface
Infinispan Serverに接続。
$ bin/cli.sh [disconnected]> connect 172.17.0.2:11222 [infinispan-server-50607@cluster//containers/default]>
Cacheの作成。今回は、デフォルトのDistributed Cacheのテンプレートから作成します。
[infinispan-server-50607@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC bookCache
確認。
[infinispan-server-50607@cluster//containers/default]> describe caches/bookCache # もしくは describe /containers/default/caches/bookCache { "distributed-cache" : { "mode" : "SYNC", "remote-timeout" : 17500, "state-transfer" : { "timeout" : 60000 }, "transaction" : { "mode" : "NONE" }, "locking" : { "concurrency-level" : 1000, "acquire-timeout" : 15000, "striping" : false }, "statistics" : true } }
これで、準備完了です。
確認する
では、パッケージングして確認しましょう。
$ mvn package # ネイティブイメージの場合 $ mvn package -P native
$ java -jar target/infinispan-client-protostream-1.0-SNAPSHOT-runner.jar # ネイティブイメージの場合 $ ./target/infinispan-client-protostream-1.0-SNAPSHOT-runner
まず、Continous Query+Server Sent Eventでデータを受け取るための接続をします。
$ curl -i localhost:8080/book/stream HTTP/1.1 200 OK transfer-encoding: chunked Content-Type: application/x-stream-general;element-type="application/json"
そのまま応答待ちになります。
別のターミナルから、データを登録。
$ curl -i -XPUT -H 'Content-Type: application/json' localhost:8080/book/978-4798124605 -d '{"isbn": "978-4798124605", "title": "Beginning Java EE 6", "price": 3891}' $ curl -i -XPUT -H 'Content-Type: application/json' localhost:8080/book/978-4774183169 -d '{"isbn": "978-4774183169", "title": "パーフェクト Java EE", "price": 3456}' $ curl -i -XPUT -H 'Content-Type: application/json' localhost:8080/book/978-4798140926 -d '{"isbn": "978-4798140926", "title": "Java EE 7徹底入門", "price": 4104}'
この時、先ほど応答待ちになっていたターミナルに、データが表示されます。価格が3,500円を下回るものについては、表示されていませんね。
data: {"isbn":"978-4798124605","title":"Beginning Java EE 6","price":3891} data: {"isbn":"978-4798140926","title":"Java EE 7徹底入門","price":4104}
1件取得。
$ curl -s localhost:8080/book/978-4798124605 | jq { "isbn": "978-4798124605", "title": "Beginning Java EE 6", "price": 3891 }
全件取得。
$ curl -s localhost:8080/book | jq [ { "isbn": "978-4798124605", "title": "Beginning Java EE 6", "price": 3891 }, { "isbn": "978-4798140926", "title": "Java EE 7徹底入門", "price": 4104 }, { "isbn": "978-4774183169", "title": "パーフェクト Java EE", "price": 3456 } ]
検索。
$ curl -s 'localhost:8080/book/search?title=Beginning&price=3000' | jq [ { "isbn": "978-4798124605", "title": "Beginning Java EE 6", "price": 3891 } ] $ curl -s 'localhost:8080/book/search?title=Java&price=3000' | jq [ { "isbn": "978-4798140926", "title": "Java EE 7徹底入門", "price": 4104 }, { "isbn": "978-4798124605", "title": "Beginning Java EE 6", "price": 3891 }, { "isbn": "978-4774183169", "title": "パーフェクト Java EE", "price": 3456 } ] $ curl -s 'localhost:8080/book/search?title=Java&price=4000' | jq [ { "isbn": "978-4798140926", "title": "Java EE 7徹底入門", "price": 4104 } ]
ひととおり、確認できました、と。
Continuous Queryでちょっとハマったこと
実は、Continous Queryでちょっとハマりまして。
JAX-RSリソースクラスに書いていた、こちら。
@PostConstruct public void init() { // Continuous Query work-around MarshallerRegistration.init(((ProtoStreamMarshaller) bookCache.getRemoteCacheManager().getMarshaller()).getSerializationContext()); }
現状、これを入れずに上記の手順をそのまま実行すると、Continuous Queryで条件に合致したデータを受信した際に、こんな
エラーを見ることになります。
Caused by: java.lang.IllegalArgumentException: No marshaller registered for Protobuf type org.infinispan.query.remote.client.ContinuousQueryResult at org.infinispan.protostream.impl.SerializationContextImpl.getMarshallerDelegate(SerializationContextImpl.java:267) at org.infinispan.protostream.WrappedMessage.readMessage(WrappedMessage.java:386) at org.infinispan.protostream.ProtobufUtil.fromWrappedByteArray(ProtobufUtil.java:163) at org.infinispan.protostream.ProtobufUtil.fromWrappedByteArray(ProtobufUtil.java:158) at org.infinispan.client.hotrod.event.impl.ContinuousQueryImpl$ClientEntryListener.handleEvent(ContinuousQueryImpl.java:118) ... 32 more
ContinuousQueryResultに対する、Marshallerがない、と…。
該当するMarshallerは、こちら。
Queryに関するMarshallerは、こちらのクラスでSerializationContextに登録されます。
で、それがいつ登録されるかというと、RemoteQueryFactoryのインスタンスが作られる時。
Search#getQueryFactoryが実行される時であり、Search#getContinuousQueryの時ではありません。
つまり、Ickle QueryやQuery DSLを実行した後であれば、Continuous Queryはうまくいきますし、今回のサンプルのように先に
Continuous Queryを実行してしまうとMarshallerがないので失敗します。
というわけで、今回は自分で登録しました、と。
あとでInfinispan側のコードも、もうちょっと見てましょうかね…。