これは、なにをしたくて書いたもの?
前に、QuarkusのGraphQL Extensionで初めてGraphQLを使ってみました。
この時はQueryとMutationのみだったので、今回はSubscription(通知)を使ってみたいと思います。
Quarkus GraphQL Extension
Quarkus GraphQL Extensionのドキュメントは、こちら。
今回も、サーバー側のみです。
Subscriptionについての記載はこちら。
Implementing GraphQL Services / Subscriptions
Subscriptionは、まだ実験的な位置づけのようです。
Subscription is currently still considered experimental.
説明も書かれていますが、以下の実装になるようです。
WebSocketを使うんですね。quarkus.smallrye-graphql.websocket-subprotocols
プロパティでサブプロトコルを指定することができ、
graphql-ws
かgraphql-transport-ws
を使うようです(デフォルトは両方有効)。
Quarkus GraphQL Extensionは通常の型であってもリアクティブな型(Uni
やMulti
など)であってもGraphQLを扱えましたが、Subscritpionに
関してはSmallRye Mutinyのリアクティブな型を使うことが求められるようです。
QueryやMutationはリアクティブな型である必要はありませんが、こちらも同じようにリアクティブな型に合わせておくことにします。
ドキュメント内で登場するBroadcastProcessor
というのは、SmallRye Mutinyのドキュメントに説明があります。
こちらは、Hot Stream(サブスクライバーが不在の時でもストリームを作成する)を作るためのもので、名前のとおり複数の
サブスクライバーを紐付けることができます。
では、実際に使っていってみましょう。
環境
今回の環境は、こちらです。
$ java --version openjdk 17.0.2 2022-01-18 OpenJDK Runtime Environment (build 17.0.2+8-Ubuntu-120.04) OpenJDK 64-Bit Server VM (build 17.0.2+8-Ubuntu-120.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 17.0.2, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-105-generic", arch: "amd64", family: "unix"
準備
では、プロジェクトを作成します。
$ mvn io.quarkus.platform:quarkus-maven-plugin:2.7.5.Final:create \ -DprojectGroupId=org.littlewings \ -DprojectArtifactId=graphql-subscription \ -DprojectVersion=0.0.1-SNAPSHOT \ -Dextensions="resteasy-reactive,graphql" \ -DnoCode
Extension Codestartによるアプリケーションコード生成は含みません。
選択されたExtensionとCodestart。
[INFO] selected extensions: - io.quarkus:quarkus-resteasy-reactive - io.quarkus:quarkus-smallrye-graphql [INFO] applying codestarts... [INFO] 📚 java 🔨 maven 📦 quarkus 📝 config-properties 🔧 dockerfiles 🔧 maven-wrapper
プロジェクト内に移動。
$ cd graphql-subscription
Maven依存関係はこちらなのですが、前回のエントリーよりRESTEasyは不要なことがわかっているので外しておきます。
<dependencies> <!-- <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy-reactive</artifactId> </dependency> --> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-smallrye-graphql</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-arc</artifactId> </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-junit5</artifactId> <scope>test</scope> </dependency> </dependencies>
プログラムを作成する
今回のお題で使うエンティティは、前回同様に以下とします。
- カテゴリー
- 書籍
書籍は、なんらかのカテゴリーに属するものとします。
カテゴリー、書籍でそれぞれエンティティとリポジトリーを作成。
カテゴリー。
src/main/java/org/littlewings/quarkus/graphql/Category.java
package org.littlewings.quarkus.graphql; public class Category { Integer id; String name; // getter/setterは省略 }
src/main/java/org/littlewings/quarkus/graphql/CategoryRepository.java
package org.littlewings.quarkus.graphql; import java.util.Comparator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import javax.enterprise.context.ApplicationScoped; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @ApplicationScoped public class CategoryRepository { ConcurrentMap<Integer, Category> categories = new ConcurrentHashMap<>(); public Uni<Category> save(Category category) { return Uni .createFrom() .item(categories.put(category.getId(), category)) .onItem() .transform(v -> category); } public Uni<Category> findById(Integer id) { return Uni .createFrom() .item(categories.get(id)); } public Multi<Category> findAll() { return Multi .createFrom() .iterable(categories.values().stream().sorted(Comparator.comparing(Category::getId)).toList()); } }
データは、インメモリで保持することにします。検索用のメソッドは、id
指定のものしか動作確認では使用しません(全件取得の方は、
デバッグ用途でした)。
書籍。
src/main/java/org/littlewings/quarkus/graphql/Book.java
package org.littlewings.quarkus.graphql; public class Book { String isbn; String title; Integer price; Integer categoryId; // getter/setterは省略 }
src/main/java/org/littlewings/quarkus/graphql/BookRepository.java
package org.littlewings.quarkus.graphql; import java.util.Comparator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import javax.enterprise.context.ApplicationScoped; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @ApplicationScoped public class BookRepository { ConcurrentMap<String, Book> books = new ConcurrentHashMap<>(); public Uni<Book> save(Book book) { return Uni .createFrom() .item(books.put(book.getIsbn(), book)) .onItem() .transform(v -> book); } public Uni<Book> findByIsbn(String isbn) { return Uni .createFrom() .item(books.get(isbn)); } public Multi<Book> findAll() { return Multi .createFrom() .iterable(books.values().stream().sorted(Comparator.comparing(Book::getPrice).reversed()).toList()); } }
では、GraphQL APIリソースクラスを作成します。
QueryやMutationを使う時と同じく、SmallRye GraphQL(というかEclipse MicroProfile GraphQL)を使用していきます。
まずはカテゴリーから。
src/main/java/org/littlewings/quarkus/graphql/CategoryResource.java
package org.littlewings.quarkus.graphql; import java.util.List; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import io.smallrye.graphql.api.Subscription; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; import org.eclipse.microprofile.graphql.GraphQLApi; import org.eclipse.microprofile.graphql.Mutation; import org.eclipse.microprofile.graphql.Query; import org.jboss.logging.Logger; @GraphQLApi @ApplicationScoped public class CategoryResource { Logger logger = Logger.getLogger(CategoryResource.class); @Inject CategoryRepository categoryRepository; BroadcastProcessor<Category> categoryBroadcastProcessor = BroadcastProcessor.create(); @Mutation public Uni<Category> createCategory(Category category) { logger.infof("mutation create category, id = %d", category.getId()); return categoryRepository.save(category).onItem().invoke(categoryBroadcastProcessor::onNext); } @Query public Uni<Category> category(Integer id) { logger.infof("query category, id = %d", id); return categoryRepository.findById(id); } @Query public Uni<List<Category>> categories() { logger.infof("query categories"); return categoryRepository .findAll() .collect() .asList(); } @Subscription public Multi<Category> categoryCreated() { logger.infof("subscribe category created"); return categoryBroadcastProcessor; } }
書籍。
src/main/java/org/littlewings/quarkus/graphql/BookResource.java
package org.littlewings.quarkus.graphql; import java.util.List; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import io.smallrye.graphql.api.Subscription; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; import org.eclipse.microprofile.graphql.GraphQLApi; import org.eclipse.microprofile.graphql.Mutation; import org.eclipse.microprofile.graphql.Query; import org.eclipse.microprofile.graphql.Source; import org.jboss.logging.Logger; @GraphQLApi @ApplicationScoped public class BookResource { Logger logger = Logger.getLogger(BookResource.class); @Inject BookRepository bookRepository; @Inject CategoryRepository categoryRepository; BroadcastProcessor<Book> bookBroadcastProcessor = BroadcastProcessor.create(); @Mutation public Uni<Book> createBook(Book book) { logger.infof("mutation create book, isbn = %s", book.getIsbn()); return bookRepository.save(book).onItem().invoke(bookBroadcastProcessor::onNext); } @Query public Uni<Book> book(String isbn) { logger.infof("query book, isbn = %s", isbn); return bookRepository.findByIsbn(isbn); } @Query public Uni<List<Book>> books() { logger.infof("query books"); return bookRepository.findAll().collect().asList(); } public Category category(@Source Book book) { logger.infof("source book, isbn = %s, title = %s", book.getIsbn(), book.getTitle()); return categoryRepository.findById(book.getCategoryId()).await().indefinitely(); } @Subscription("simpleBookCreated") public Multi<Book> bookCreated() { logger.infof("subscribe book created"); return bookBroadcastProcessor; } @Subscription public Multi<Book> bookCreatedFilterCategory(Integer categoryId) { logger.infof("subscribe book created, category id = %d", categoryId); return bookBroadcastProcessor.select().where(b -> b.getCategoryId().equals(categoryId)); } }
まずはプログラムを載せましたが、説明を順に書いていきます。同じ説明で済むところは、カテゴリー側のコードを挙げていきます。
Subscriptionでのポイントになるのは、BroadcastProcessor
です。
BroadcastProcessor<Category> categoryBroadcastProcessor = BroadcastProcessor.create();
Mutationでデータが登録された時に、BroadcastProcessor#onNext
を呼び出して対象のデータを渡します。
@Mutation public Uni<Category> createCategory(Category category) { logger.infof("mutation create category, id = %d", category.getId()); return categoryRepository.save(category).onItem().invoke(categoryBroadcastProcessor::onNext); }
このBroadcastProcessor
を@Subscription
アノテーションが付与されたメソッドの戻り値とすることで、Subscriptionが実現できます。
@Subscription public Multi<Category> categoryCreated() { logger.infof("subscribe category created"); return categoryBroadcastProcessor; }
BroadcastProcessor
についてですが、SmallRye Mutinyの説明をもう1度見てみます。
Hot Streamでは、以下の挙動になるようです。
- サブスクライバーがストリームを監視していなくても、イベントを発行する
- サブスクライバーがいない場合は、アイテムは破棄される
- サブスクライバーは、サブスクライブを開始した後に発行されたアイテムを受け取り、それ以前のアイテムは受け取らない
In a hot stream, the stream exists before subscribers subscribe. The stream emits items even if no subscribers observe the stream. If there are no subscribers, the items are just dropped. Subscribers only get items emitted after their subscription, meaning that any previous items would not be received.
こう説明するとシンプルなのですが、Quarkus GraphQL Extensionのドキュメントに沿って書くと見事にハマってしまい。
この構成とする場合、GraphQL APIリソースクラスには@ApplicationScoped
アノテーションを付与しておく必要があります。
@GraphQLApi @ApplicationScoped public class CategoryResource {
こうしないと、GraphQL APIリソースクラスがリクエストの度にインスタンスが破棄され、新しいインスタンスが作られる挙動になるの
ですが、そうなってしまうとSubscriptionが継続できないからです。
書籍のGraphQL APIリソースクラスには、@Subscription
アノテーションを付与したメソッドにもう少し変更を入れています。
@Query
アノテーションなどと同じように、@Subscription
アノテーションもvalue
属性に値を指定することで名前を指定することが
できます。
@Subscription("simpleBookCreated") public Multi<Book> bookCreated() { logger.infof("subscribe book created"); return bookBroadcastProcessor; }
デフォルトでは、メソッド名がそのまま使われるのも@Query
アノテーションなどと同じです。
また、Subscriptionにもパラメーターを取ることができるので、書籍ではこちらの例も追加してみました。
@Subscription public Multi<Book> bookCreatedFilterCategory(Integer categoryId) { logger.infof("subscribe book created, category id = %d", categoryId); return bookBroadcastProcessor.select().where(b -> b.getCategoryId().equals(categoryId)); }
このコードでは、通知する書籍データを特定のカテゴリーのIDのものにフィルタリングしています。
@Source
アノテーションを付与したメソッドも加えて、関連するデータも取得してみましょう。
public Category category(@Source Book book) { logger.infof("source book, isbn = %s, title = %s", book.getIsbn(), book.getTitle()); return categoryRepository.findById(book.getCategoryId()).await().indefinitely(); }
スキーマ定義を見てみる
ここでパッケージングして
$ mvn package
起動。
$ java -jar target/quarkus-app/quarkus-run.jar
スキーマ定義を見てみます。
localhost:8080/graphql/schema.graphql type Book { category: Category categoryId: Int isbn: String price: Int title: String } type Category { id: Int name: String } "Mutation root" type Mutation { createBook(book: BookInput): Book createCategory(category: CategoryInput): Category } "Query root" type Query { book(isbn: String): Book books: [Book] categories: [Category] category(id: Int): Category } "Subscription root" type Subscription { bookCreatedFilterCategory(categoryId: Int): Book categoryCreated: Category simpleBookCreated: Book } input BookInput { categoryId: Int isbn: String price: Int title: String } input CategoryInput { id: Int name: String }
Subscriptionは、この部分ですね。
"Subscription root" type Subscription { bookCreatedFilterCategory(categoryId: Int): Book categoryCreated: Category simpleBookCreated: Book }
simpleBookCreated
は@Subscription
アノテーションのvalue
属性で指定した値が名前になっており、それ以外はメソッド名がそのまま
反映されていることが確認できます。
動作確認する
動作確認には、Altair GraphQL ClientのGoogle Chrome向けextensionを使うことにします。
Altair GraphQL Client - Chrome ウェブストア
デフォルトの設定のまま起動しているので、GraphQLのエンドポイントはhttp://localhost:8080/graphql
になります。
とりあえず、スキーマを認識してもらいましょう。
次に、SubscriptionのURLを登録します。
`
URLは、ws://localhost:8080/graphql
になります。他のGraphQLのエンドポイントと同じですね。これが最初わからずに、情報を探し回ったり
しましたけど…。
では、Subscriptionに登録してみます。
subscription subscribeCategory { categoryCreated { id name } }
使用するSubscriptionは、CategoryResource
の以下のメソッドですね。
@Subscription public Multi<Category> categoryCreated() { logger.infof("subscribe category created"); return categoryBroadcastProcessor; }
ここで、別のウィンドウを開いてデータを登録してみます。
mutation createJavaCategory { createCategory(category: { id: 1 name: "java" }) { id name } }
すると、Subscriptionを登録した側にデータが表示されます。
動作しましたね。
実際のデータは、こんな感じです。
{ "data": { "categoryCreated": { "id": 1, "name": "java" } } }
ちなみに、この時に使われているWebSocketのサブプロトコルはgraphql-ws
だったようです。
sec-websocket-protocol: graphql-ws
ちなみに、以下のようにプロトコルを切り替えると
graphql-transport-ws
になります。
sec-websocket-protocol: graphql-transport-ws
ここで、もうひとつSubscriptionを登録。
subscription subscribeCategory2 { categoryCreated { id name } }
データをもう1件登録してみます。
mutation createMysqlCategory { createCategory(category: { id: 2 name: "mysql" }) { id name } }
すると、先に登録しておいた方には前のデータと含めて2件、
新しく登録した方には、Subscription登録後にのみ追加したデータだけが表示されます。
これで、ひとつのSubscriptionに対して複数サブスクライブできることを確認できました。
次に、書籍向けのSubscriptionを使ってみます。カテゴリーのデータは登録したままです。
まずは、書籍が登録されればどのカテゴリーに属する書籍であっても通知されるSubscription。
subscription subscribeAllBooks { simpleBookCreated { isbn title price categoryId category { id name } } }
カテゴリーのデータも取得しましょう。
BookResource
でのソースコードとしては、こちらですね。
public Category category(@Source Book book) { logger.infof("source book, isbn = %s, title = %s", book.getIsbn(), book.getTitle()); return categoryRepository.findById(book.getCategoryId()).await().indefinitely(); } @Subscription("simpleBookCreated") public Multi<Book> bookCreated() { logger.infof("subscribe book created"); return bookBroadcastProcessor; }
次に、指定のカテゴリーに属する書籍であれば通知されるSubscription。
subscription subscribeBooksFilterJavaCategory {
bookCreatedFilterCategory(categoryId: 1) {
isbn
title
price
categoryId
category {
id
name
}
}
}
カテゴリーのIDは1にしたので、java
カテゴリーですね。
BookResource
でのソースコードとしてはこちらで、先ほどと同様に関連するカテゴリーのデータも取得します。
public Category category(@Source Book book) { logger.infof("source book, isbn = %s, title = %s", book.getIsbn(), book.getTitle()); return categoryRepository.findById(book.getCategoryId()).await().indefinitely(); } // 省略 @Subscription public Multi<Book> bookCreatedFilterCategory(Integer categoryId) { logger.infof("subscribe book created, category id = %d", categoryId); return bookBroadcastProcessor.select().where(b -> b.getCategoryId().equals(categoryId)); }
では、書籍を登録します。
java
カテゴリーで3件登録。
mutation createJavaBook1 { createBook(book: { isbn: "978-4621303252" title: "Effective Java 第3版" price: 4400 categoryId: 1 }) { isbn title price categoryId } }
mutation createJavaBook2 { createBook(book: { isbn: "978-4774189093" title: "Java本格入門 ~モダンスタイルによる基礎からオブジェクト指向・実用ライブラリまで" price: 3278 categoryId: 1 }) { isbn title price categoryId } }
mutation createJavaBook3 { createBook(book: { isbn: "978-4295008477" title: "新世代Javaプログラミングガイド[Java SE 10/11/12/13と言語拡張プロジェクト]" price: 2860 categoryId: 1 }) { isbn title price categoryId } }
mysql
カテゴリーで2件。
mutation createMysqlBook1 { createBook(book: { isbn: "978-4798161488" title: "MySQL徹底入門 第4版" price: 4180 categoryId: 2 }) { isbn title price categoryId } }
mutation createMysqlBook2 { createBook(book: { isbn: "978-4798147406" title: "詳解MySQL 5.7 止まらぬ進化に乗り遅れないためのテクニカルガイド" price: 3960 categoryId: 2 }) { isbn title price categoryId } }
結果はこちら。
どのカテゴリーの書籍でも通知されるSubscription。
java
カテゴリーのみに絞ったSubscription。
データも載せておきましょう。
どのカテゴリーの書籍でも通知されるSubscription。
{ "data": { "simpleBookCreated": { "isbn": "978-4621303252", "title": "Effective Java 第3版", "price": 4400, "categoryId": 1, "category": { "id": 1, "name": "java" } } } } { "data": { "simpleBookCreated": { "isbn": "978-4774189093", "title": "Java本格入門 ~モダンスタイルによる基礎からオブジェクト指向・実用ライブラリまで", "price": 3278, "categoryId": 1, "category": { "id": 1, "name": "java" } } } } { "data": { "simpleBookCreated": { "isbn": "978-4295008477", "title": "新世代Javaプログラミングガイド[Java SE 10/11/12/13と言語拡張プロジェクト]", "price": 2860, "categoryId": 1, "category": { "id": 1, "name": "java" } } } } { "data": { "simpleBookCreated": { "isbn": "978-4798161488", "title": "MySQL徹底入門 第4版", "price": 4180, "categoryId": 2, "category": { "id": 2, "name": "mysql" } } } } { "data": { "simpleBookCreated": { "isbn": "978-4798147406", "title": "詳解MySQL 5.7 止まらぬ進化に乗り遅れないためのテクニカルガイド", "price": 3960, "categoryId": 2, "category": { "id": 2, "name": "mysql" } } } }
java
カテゴリーのみに絞ったSubscription。
{ "data": { "bookCreatedFilterCategory": { "isbn": "978-4621303252", "title": "Effective Java 第3版", "price": 4400, "categoryId": 1, "category": { "id": 1, "name": "java" } } } } { "data": { "bookCreatedFilterCategory": { "isbn": "978-4774189093", "title": "Java本格入門 ~モダンスタイルによる基礎からオブジェクト指向・実用ライブラリまで", "price": 3278, "categoryId": 1, "category": { "id": 1, "name": "java" } } } } { "data": { "bookCreatedFilterCategory": { "isbn": "978-4295008477", "title": "新世代Javaプログラミングガイド[Java SE 10/11/12/13と言語拡張プロジェクト]", "price": 2860, "categoryId": 1, "category": { "id": 1, "name": "java" } } } }
OKですね。カテゴリーのデータも取得できているので、@Source
を使った部分も動作していることが確認できました。
実際、ログとしてはこんな感じになっています。
2022-03-29 01:40:48,901 INFO [org.lit.qua.gra.BookResource] (executor-thread-0) mutation create book, isbn = 978-4621303252 2022-03-29 01:40:48,905 INFO [org.lit.qua.gra.BookResource] (executor-thread-2) source book, isbn = 978-4621303252, title = Effective Java 第3版 2022-03-29 01:40:49,116 INFO [org.lit.qua.gra.BookResource] (executor-thread-2) source book, isbn = 978-4621303252, title = Effective Java 第3版 2022-03-29 01:41:02,539 INFO [org.lit.qua.gra.BookResource] (executor-thread-0) mutation create book, isbn = 978-4774189093 2022-03-29 01:41:02,540 INFO [org.lit.qua.gra.BookResource] (executor-thread-2) source book, isbn = 978-4774189093, title = Java本格入門 ~モダンスタイルによる基礎からオブジ ェクト指向・実用ライブラリまで 2022-03-29 01:41:02,543 INFO [org.lit.qua.gra.BookResource] (executor-thread-2) source book, isbn = 978-4774189093, title = Java本格入門 ~モダンスタイルによる基礎からオブジ ェクト指向・実用ライブラリまで 2022-03-29 01:41:14,243 INFO [org.lit.qua.gra.BookResource] (executor-thread-0) mutation create book, isbn = 978-4295008477 2022-03-29 01:41:14,244 INFO [org.lit.qua.gra.BookResource] (executor-thread-2) source book, isbn = 978-4295008477, title = 新世代Javaプログラミングガイド[Java SE 10/11/12/13と言語拡張プロジェクト] 2022-03-29 01:41:14,247 INFO [org.lit.qua.gra.BookResource] (executor-thread-2) source book, isbn = 978-4295008477, title = 新世代Javaプログラミングガイド[Java SE 10/11/12/13と言語拡張プロジェクト] 2022-03-29 01:41:38,821 INFO [org.lit.qua.gra.BookResource] (executor-thread-0) mutation create book, isbn = 978-4798161488 2022-03-29 01:41:38,822 INFO [org.lit.qua.gra.BookResource] (executor-thread-2) source book, isbn = 978-4798161488, title = MySQL徹底入門 第4版 2022-03-29 01:41:45,125 INFO [org.lit.qua.gra.BookResource] (executor-thread-0) mutation create book, isbn = 978-4798147406 2022-03-29 01:41:45,127 INFO [org.lit.qua.gra.BookResource] (executor-thread-2) source book, isbn = 978-4798147406, title = 詳解MySQL 5.7 止まらぬ進化に乗り遅れないためのテ クニカルガイド
WebSocketの実装はどこに?
ところで、mvn dependency:tree
で見てみてもWebSocketの実装と思われるものは見当たらなかったのですが。
どこに含まれているのでしょう?
WebSocketのハンドラーは、SmallRye GraphQLに含まれています。
このハンドラーは、QuarkusのSmallRye GraphQL Extensionの方でサブプロトコルに応じて使い分けるようになっています。
そして、仕組みはVert.x Webに乗ったものです。
ところでVert.xにはGraphQL向けの機能があるのですが、Quarkusで使っているわけではなさそうですね。
まとめ
Quarkus GraphQL Extensioonを使って、今回はSubscriptionを試してみました。
CDI管理Beanとしての扱いでやや手間取りましたが、それ以外はあまり大きなハマりどころはありませんでした。
これで、GraphQLを使ったサーバーサイドの実装の基礎的なことはつかめた感じでしょうか。