Spring Cloud Streamのドキュメントで、Schema Evolutionというものが載っていて、ちょっと興味があったので
試してみることにしました。
Spring Cloud Stream Brooklyn.M1 is available
Schema Evolutionとは?
Spring Cloud Streamでは、SchemaベースのMessageConverterをサポートしていて、オブジェクトの
シリアライズ/デシリアライズ時に適用されます。
現在サポートされているのは、Apache Avroのみだそうです。
Converterは2つあり、オブジェクトのシリアライズ/デシリアライズをする際のクラスの情報か、もしくはアプリケーションの起動時に
把握しているSchemaを使うもの。それからSchema Registryを使って実行時にSchemaを特定し、ドメインオブジェクトが
進化するに従い新しいSchemaを動的にSchema Registryに登録するものです。
Schema Evolution自体は、対象となるデータ構造のバージョンアップ(ダウン)に伴うSchema変更の際に、互換性を
もたせましょうという話みたいですね。
- Backward Compatibility (Newer version can read old version)
- Forward Compatibility (Older version can read new version)
Schema Evolution for Resilient Data microservices
Backward Compatibilityではフィールドのリネームに、Forward Compatibilityではフィールドのリネームと削除に耐えられる
ような感じですね。
参考)
GitHub - making-demo-scst/schema-evolution-demo: Schema Evolution with Spring Cloud Stream
GitHub - viniciusccarvalho/schema-evolution-samples: Samples for different schema evolution options
構成要素
この仕組みを構成する要素として、以下の2つがあります。
- Schema Registry Server
- Schema Registry Client
Schema Registry Serverは、Schemaを保存するサーバーで、RDBMSを使ってSchemaを保存します。デフォルトはインメモリ
データベース(H2 Database)を使用します。Spring Bootで作られているので、他のデータベースを使うようにカスタマイズ
することも可能です。
Schema Registry Serverを使うには、「spring-cloud-stream-schema-server」というモジュールを使います。
内部的には、Spring Data JPAを使ってデータの管理を行い、またREST APIも提供します。
Schema Registry Clientは、Schema Registry Serverとのインターフェースを抽象化したものです。Schema Registry Clientを使う場合は、
「spring-cloud-stream-schema」モジュールを依存関係に追加します。
Spring Cloud Stream Reference Guide
Schema Registry Clientとなるのは、SourceやSink側になります。
お題
とまあ、前置きはこのくらいにして、Spring Cloud Streamの提供するSchema Evolutionを試してみましょう。
お題を書籍として、以下のようにSourceとSinkが把握しているSchemaのバージョンを変えていき、合わせてSourceに投入するデータが
表現するSchemaのバージョンも変えていってみましょう。
こんな感じで。
データ Ver. | Source Known Schema Ver. | Sink Known Schema Ver. |
---|---|---|
v1 | v1 | v1 |
v1 | v1 | v2 |
v1 | v2 | v2 |
v2 | v2 | v2 |
v2 | v2 | v1 |
データの方はSchemaに合わせたバージョンを、SourceとSinkはそれ自身が持つSchemaのバージョンを指します。
準備
まずは準備から。今回は、こういうMavenのマルチプロジェクト構成とします。
pom.xml source-v1/pom.xml source-v2/pom.xml sink-v1/pom.xml sink-v2/pom.xml schema-registry-server/pom.xml
親pom.xmlがあって、SourceとSinkをサブモジュールとしてバージョンごとに用意します。それから、Schema Registry Server用にもひとつ。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.littlewings</groupId> <artifactId>schema-evolution</artifactId> <packaging>pom</packaging> <version>0.0.1-SNAPSHOT</version> <modules> <module>source-v1</module> <module>schema-registry-server</module> <module>sink-v1</module> <module>sink-v2</module> <module>source-v2</module> </modules> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-dependencies</artifactId> <version>Chelsea.SR2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <pluginManagement> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>1.5.4.RELEASE</version> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </pluginManagement> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Spring Cloud StreamのBOMのimport、Spring BootのMaven Pluginの設定を入れているくらいです。Apache Avroなどの設定は、今回は個別に
入れていくスタイルとします。
SourceとSinkのpom.xmlの主要な部分は、こんな感じです。
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-schema</artifactId> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.8.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.8.2</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> <goal>protocol</goal> <goal>idl-protocol</goal> </goals> <configuration> <sourceDirectory>src/main/resources/avro</sourceDirectory> </configuration> </execution> </executions> </plugin> </plugins> </build>
他はアーティファクトのIDが違うくらいなので、割愛…。
メッセージブローカーはApache Kafkaとするので、「spring-cloud-starter-stream-kafka」を加えます。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
Schema Evolutionのサポートを使うには、「spring-cloud-stream-schema」を追加して
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-schema</artifactId> </dependency>
Apache Avroへの依存関係と
<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.8.2</version> </dependency>
プラグインの設定を加えます。
<plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.8.2</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> <goal>protocol</goal> <goal>idl-protocol</goal> </goals> <configuration> <sourceDirectory>src/main/resources/avro</sourceDirectory> </configuration> </execution> </executions> </plugin>
これで、Apache AvroのIDLからJavaのソースコードを自動生成してくれるようになります。これは、SourceおよびSinkで
行います。IDLは、各プロジェクトの「src/main/resources/avro」ディレクトリに置くように設定しています。
Apache Kafkaは、起動済みとします。
Schema Registry Serverの作成
先に、Schema Registry Serverから作りましょう。ドキュメントに沿って作成すればOKです。
pom.xmlは、こんな感じです。
schema-registry-server/pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>schema-evolution</artifactId> <groupId>org.littlewings</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>schema-registry-server</artifactId> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-schema-server</artifactId> </dependency> </dependencies> </project>
「spring-cloud-stream-schema-server」を追加しました。
あとは、@EnableSchemaRegistryServerアノテーションを付与したSpring Bootの起動クラスを作成すれば完了です。
schema-registry-server/src/main/java/org/littlewings/spring/cloud/SchemaRegistryServer.java
package org.littlewings.spring.cloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.schema.server.EnableSchemaRegistryServer; @SpringBootApplication @EnableSchemaRegistryServer public class SchemaRegistryServer { public static void main(String... args) { SpringApplication.run(SchemaRegistryServer.class, args); } }
Source v1の作成
データを登録する、Sourceのversion 1を作成します。
ソースコードは、こんな感じ。
source-v1/src/main/java/org/littlewings/spring/cloud/SourceApp.java
package org.littlewings.spring.cloud; import java.time.LocalDateTime; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @EnableBinding(Source.class) @EnableSchemaRegistryClient @RestController public class SourceApp { public static void main(String... args) { SpringApplication.run(SourceApp.class, args); } Source source; public SourceApp(Source source) { this.source = source; } @PostMapping("register") public String register(@RequestBody Book book) { source.output().send(MessageBuilder.withPayload(book).build()); System.out.printf( "[%s] send book, isbn = %s, title = %s, price = %d, version = %s%n", LocalDateTime.now(), book.getIsbn(), book.getTitle(), book.getPrice(), book.getVersion() ); return "OK!!" + System.lineSeparator(); } }
@RestControllerとし、受け取ったデータをメッセージブローカーに送るわけですが、その時に内容を標準出力に表示するように
しています。特に意味はないのですが、あるバージョンのSchemaに対応したJavaクラスを使うという意味で…。
で、このBookクラスは、自分では実装しません。Apache AvroのIDLから、Maven Pluginで自動生成してもらいます。
作成したIDLはこちら。
source-v1/src/main/resources/avro/book.avsc
{ "type": "record", "name": "Book", "namespace": "org.littlewings.spring.cloud", "fields": [ { "name": "isbn", "type": "string" }, { "name": "title", "type": "string", "default": "" }, { "name": "price", "type": "int", "default": 0 }, { "name": "version", "type": "string", "default": "" } ] }
IDLは、こちらを見ながら作ってみました。
Schema Declaration / Complex Types / Records
namespaceは、パッケージになるみたいですねぇ…。
ビルドすると、今回の設定だと「source-v1/target/generated-sources/avro」にソースコードが出力されます。
source-v1/target/generated-sources/avro/org/littlewings/spring/cloud/Book.java
/** * Autogenerated by Avro * * DO NOT EDIT DIRECTLY */ package org.littlewings.spring.cloud; import org.apache.avro.specific.SpecificData; import org.apache.avro.message.BinaryMessageEncoder; import org.apache.avro.message.BinaryMessageDecoder; import org.apache.avro.message.SchemaStore; @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated public class Book extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { 〜省略〜
ちなみに、出力先のディレクトリを変えたかったらApache AvroのMaven Pluginで「outputDirectory」パラメーターで指定すれば
いいみたいですよ。
例えば、こんな感じ。
<configuration> <sourceDirectory>src/main/resources/avro</sourceDirectory> <outputDirectory>src/main/java/</outputDirectory> </configuration>
続いて、設定です。
source-v1/src/main/resources/application.properties
spring.cloud.stream.bindings.output.destination=book spring.cloud.stream.bindings.output.contentType=application/*+avro spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181 spring.cloud.stream.schemaRegistryClient.endpoint=http://localhost:8990
ここでのポイントは、「spring.cloud.stream.bindings.output.contentType」を「application/*+avro」としていることですね。
spring.cloud.stream.bindings.output.contentType=application/*+avro
これで、Apache Avroを使うMessageConverterを自動設定してくれます。
この時のContent-Typeは、このようになるのだとか。prefixは設定可能で、subjectはPayloadのタイプから決定されます。
application/[prefix].[subject].v[version]+avro
あとは、Schema Registry Serverへの接続先も設定しています。実はこれ、デフォルト値なのでこの値だと不要なのですが、
設定として把握しておこうかと。
spring.cloud.stream.schemaRegistryClient.endpoint=http://localhost:8990
ここまでで、Source v1の準備はおしまいです。
Sink v1
続いて、Sink version1。Sourceと似たり寄ったりで、@EnableSchemaRegistryClientアノテーションを付与してあげればOKです。
sink-v1/src/main/java/org/littlewings/spring/cloud/SinkApp.java
package org.littlewings.spring.cloud; import java.time.LocalDateTime; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient; @SpringBootApplication @EnableBinding(Sink.class) @EnableSchemaRegistryClient public class SinkApp { public static void main(String... args) { SpringApplication.run(SinkApp.class, args); } @StreamListener(Sink.INPUT) public void println(Book book) { System.out.printf( "[%s] received book, isbn = %s, title = %s, price = %d, version = %s%n", LocalDateTime.now(), book.getIsbn(), book.getTitle(), book.getPrice(), book.getVersion() ); } }
Apache AvroのMaven Plugin設定は、Sourceの時と同じでOKです。
Apache AvroのIDLは、Sourceとまったく同じものを使用するので割愛。
sink-v1/src/main/resources/avro/book.avsc
〜省略〜
設定。
sink-v1/src/main/resources/application.properties
spring.cloud.stream.bindings.input.destination=book spring.cloud.stream.bindings.input.group=book-group spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181 spring.cloud.stream.schemaRegistryClient.endpoint=http://localhost:8990 server.port=18080
Schema Registry Serverへの接続情報を書いている以外は、あまり新しい情報はありません。Sourceとかぶらないように、Listen Portを
ずらしておいたくらいですね。
Source v2/Sink v2
最後は、SourceとSinkのversion 2をまとめて。
IDLは、それぞれのプロジェクトで同じものを使用します。
IDLの定義自体は、追加フィールドで確認します。
source-v2/src/main/resources/avro/book.avsc
sink-v2/src/main/resources/avro/book.avsc
{ "type": "record", "name": "Book", "namespace": "org.littlewings.spring.cloud", "fields": [ { "name": "isbn", "type": "string" }, { "name": "title", "type": "string", "default": "" }, { "name": "price", "type": "int", "default": 0 }, { "name": "version", "type": "string", "default": "" }, { "name": "tags", "type": { "type": "array", "items": "string" }, "default": [] } ] }
「tags」というフィールドを追加しました。
SourceとSinkのコードには、増えたフィールドを使うコードに変更しておきます。
Source。
@SpringBootApplication @EnableBinding(Source.class) @EnableSchemaRegistryClient @RestController public class SourceApp { public static void main(String... args) { SpringApplication.run(SourceApp.class, args); } Source source; public SourceApp(Source source) { this.source = source; } @PostMapping("register") public String register(@RequestBody Book book) { source.output().send(MessageBuilder.withPayload(book).build()); System.out.printf( "[%s] received book, isbn = %s, title = %s, price = %d, version = %s, tags = %s%n", LocalDateTime.now(), book.getIsbn(), book.getTitle(), book.getPrice(), book.getVersion(), book.getTags() ); return "OK!!" + System.lineSeparator(); } }
Sink。
@SpringBootApplication @EnableBinding(Sink.class) @EnableSchemaRegistryClient public class SinkApp { public static void main(String... args) { SpringApplication.run(SinkApp.class, args); } @StreamListener(Sink.INPUT) public void println(Book book) { System.out.printf( "[%s] received book, isbn = %s, title = %s, price = %d, version = %s, tags = %s%n", LocalDateTime.now(), book.getIsbn(), book.getTitle(), book.getPrice(), book.getVersion(), book.getTags() ); } }
あとの設定は、version 1の時と同じです。
動作確認
それでは、パッケージングして動作確認してみましょう。
$ mvn package
最初に、Schema Registry Serverを起動しておきます。
$ java -jar schema-registry-server/target/schema-registry-server-0.0.1-SNAPSHOT.jar
Schema Registry Serverは、デフォルトで8990ポートでListenします。
Data v1 / Source v1 → Sink v1
では、SourceとSinkのversion 1を起動します。
## Source $ java -jar source-v1/target/source-v1-0.0.1-SNAPSHOT.jar ## Sink $ java -jar sink-v1/target/sink-v1-0.0.1-SNAPSHOT.jar
データをPOSTしてみます。curlで実行しようと思ったのですが、ちょっと面倒になってこういうスクリプトを用意。
post.sh
#!/bin/bash FILE=$1 curl -XPOST -H 'Content-Type: application/json' http://localhost:8080/register -d @$FILE
はい。
で、次のようなデータを2つ放り込みます。
book-v1-1.json
{ "isbn": "978-4798142470", "title": "Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発", "price": 4320, "version": "v1" }
book-v1-2.json
{ "isbn": "978-4774183169", "title": "パーフェクト Java EE", "price": 3456, "version": "v1" }
データのフォーマットは、version 1です。
登録。
$ ./post.sh book-v1-1.json OK!! $ ./post.sh book-v1-2.json OK!!
標準出力に、それぞれログが出力されます。
## Source [2017-07-06T22:52:22.243] send book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v1 [2017-07-06T22:52:24.662] send book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v1 ## Sink [2017-07-06T22:52:23.098] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v1 [2017-07-06T22:52:24.669] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v1
とりあえず、Apache Avroを使って動きました。
Data v1 / Source v1 → Sink v2
ここで、Sink v1を落としてSink v2を起動させます。
$ java -jar sink-v2/target/sink-v2-0.0.1-SNAPSHOT.jar
もう1度、version 1形式のデータを登録してみます。
$ ./post.sh book-v1-1.json OK!! $ ./post.sh book-v1-2.json OK!!
Sink側で、問題なく受け取れます。tagsの中身は、もちろん空ですが。
[2017-07-06T22:54:43.637] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v1, tags = [] [2017-07-06T22:54:44.542] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v1, tags = []
Data v1 / Source v2 → Sink v2
今度は、Source v1を落としてSource v2を起動します。
$ java -jar source-v2/target/source-v2-0.0.1-SNAPSHOT.jar
で、データを登録しようとすると、うまくいきま…せん。
$ ./post.sh book-v1-1.json {"timestamp":1499349527822,"status":500,"error":"Internal Server Error","exception":"org.springframework.messaging.MessageDeliveryException","message":"failed to send Message to channel 'output'; nested exception is java.lang.NullPointerException: null of array of org.littlewings.spring.cloud.Book","path":"/register"} $ ./post.sh book-v1-2.json {"timestamp":1499349545349,"status":500,"error":"Internal Server Error","exception":"org.springframework.messaging.MessageDeliveryException","message":"failed to send Message to channel 'output'; nested exception is java.lang.NullPointerException: null of array of org.littlewings.spring.cloud.Book","path":"/register"}
tagsがないからですと…。これは…仕方ないかな…。
Data v2 / Source v2 → Sink v2
というわけで、データの形式もversion 2にします。
book-v2-1.json
{ "isbn": "978-4798142470", "title": "Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発", "price": 4320, "version": "v2", "tags": [ "Java", "Spring" ] }
book-v2-2.json
{ "isbn": "978-4774183169", "title": "パーフェクト Java EE", "price": 3456, "version": "v2", "tags": [ "Java", "Java EE" ] }
登録。
$ ./post.sh book-v2-1.json OK!! $ ./post.sh book-v2-2.json OK!!
コンソールの出力結果。
## Source [2017-07-06T23:02:04.908] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v2, tags = [Java, Spring] [2017-07-06T23:02:07.321] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v2, tags = [Java, Java EE] ## Sink [2017-07-06T23:02:04.935] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v2, tags = [Java, Spring] [2017-07-06T23:02:07.327] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v2, tags = [Java, Java EE]
tagsの内容が渡っていますね。
Data v2 / Source v2 → Sink v1
最後に、Sinkをversion 1に戻してみます。
$ java -jar sink-v1/target/sink-v1-0.0.1-SNAPSHOT.jar
登録。
$ ./post.sh book-v2-1.json OK!! $ ./post.sh book-v2-2.json OK!!
こちらは、Sinkでも受け取れます。
[2017-07-06T23:05:37.345] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v2 [2017-07-06T23:05:37.899] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v2
tagsはなかったことになりますけれど。
Sourceでは、出力されています。
[2017-07-06T23:05:36.770] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v2, tags = [Java, Spring] [2017-07-06T23:05:37.893] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v2, tags = [Java, Java EE]
これで、SourceとSinkのSchema定義が異なるものであって、データの受け渡しができることが確認できました。
Schema Registry Serverをもう少し
Schema Registry Serverで保存されるSchema情報は、JPAのEntityとして表現されます。
https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-schema-server/src/main/java/org/springframework/cloud/stream/schema/server/model/Schema.java
また、Schema登録時にはSubjectとFormatが一致していれば、特定のバージョンにマッピングして扱われ、そうでなければバージョン1として
扱うようです。
https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-schema-server/src/main/java/org/springframework/cloud/stream/schema/server/controllers/ServerController.java
List<Schema> registeredEntities = this.repository.findBySubjectAndFormatOrderByVersion( schema.getSubject(), schema.getFormat()); if (registeredEntities == null || registeredEntities.size() == 0) { schema.setVersion(1); result = this.repository.save(schema); } else { result = validator.match(registeredEntities, schema.getDefinition()); if (result == null) { schema.setVersion( registeredEntities.get(registeredEntities.size() - 1).getVersion() + 1); result = this.repository.save(schema); } }
登録されているSchemaを、REST APIで見てみましょう。
登録は…いいかなと思うので、参照を。
まずは、こちら。
GET /{subject}/{format}
結果。
$ curl http://localhost:8990/book/avro | jq . { "definition": "{\"type\":\"record\",\"name\":\"Book\",\"namespace\":\"org.littlewings.spring.cloud\",\"fields\":[{\"name\":\"isbn\",\"type\":\"string\"},{\"name\":\"title\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"price\",\"type\":\"int\",\"default\":0},{\"name\":\"version\",\"type\":\"string\",\"default\":\"\"}]}", "format": "avro", "subject": "book", "version": 1, "id": 1 }, { "definition": "{\"type\":\"record\",\"name\":\"Book\",\"namespace\":\"org.littlewings.spring.cloud\",\"fields\":[{\"name\":\"isbn\",\"type\":\"string\"},{\"name\":\"title\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"price\",\"type\":\"int\",\"default\":0},{\"name\":\"version\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"tags\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"default\":[]}]}", "format": "avro", "subject": "book", "version": 2, "id": 2 } ]
確かに、定義したSchemaが入ってますね。
続いて、バージョンを指定して。
GET /{subject}/{format}/{version}
ここで気づく。
$ curl http://localhost:8990/book/avro/v1 | jq . { "definition": "{\"type\":\"record\",\"name\":\"Book\",\"namespace\":\"org.littlewings.spring.cloud\",\"fields\":[{\"name\":\"isbn\",\"type\":\"string\"},{\"name\":\"title\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"price\",\"type\":\"int\",\"default\":0},{\"name\":\"version\",\"type\":\"string\",\"default\":\"\"}]}", "format": "avro", "subject": "book", "version": 1, "id": 1 }
なんか、「v」って要るんですけど…。
http://localhost:8990/book/avro/v1
まあ、いいですけど…。
Schema Registry Server & Clientを使ったSchema登録&解決について
ドキュメントに、Schema登録と解決時の図があるのですが…
Schema Registration and Resolution
小さすぎて見えないので、ちょっと貼っておきます…。
Schema Registration Process (Serialization)
Schema Registration Process (Serialization)
登録…シリアライズの最初のプロセスは、Channel送信時にSpecificRecordやGenericRecordであればSchemaを取得できるのでそちらを使い、POJOであればプロパティ
「spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnable」がtrueであれば動的にSchemaを推論します。
プロパティ「spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled」は、デフォルトでtrueです。
https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-core-docs/src/main/asciidoc/images/schema_resolution.png
Schemaを取得後は、Schema Registry Serverよりメタデータ(バージョン)を取得します。次にキャッシュを参照し、見つからなければSchemaを
Schema Registry Serverに送信し、バージョン情報を取得します。Converterはその結果をキャッシュし、シリアライズのフェーズごとに
Schema Registry Serverを照会するオーバーヘッドを回避します。
https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-core-docs/src/main/asciidoc/images/registration.png
Schemaのバージョンは、Content-Typeヘッダーに含まれます、と。
Schema Resolution Process (Deserialization)
Schema Resolution Process (Deserialization)
デシリアライズ時のSchema解決について。Content-Typeヘッダーなどからバージョンを取得し、ConverterはメッセージのWriter Schemaを
取得するようにSchema Registry Serverにクエリを投げます(キャッシュされていなければ)。メッセージに対する正しいSchemaを
見つけたら、次にReader Schemaを取得しApache AvroのSchema解決サポートを利用してReader定義を読み込みます。
WriterとReaderで、異なるSchemaが使えるみたいですね。これは重要なことみたいです。
Spring Cloud Streamはどのようにメッセージを読むのかを決定するためにWriter Schemaをフェッチしますし、ちゃんとApache AvroのSchema Evolutionが
有効にするためにはReader Schemaがアプリケーションに適切に設定されている必要がありますと。
Apache AvroのSchema Resolutionについては、こちら。
「if both are records:」のところを見ればいいですね。
- フィールドの順序は異なっていてもよく、名前で照会される
- 両方のレコードで、同じ名前のフィールドは再帰的に解決される
- Writerのレコードに存在するフィールドで、Reader側のレコードに同じ名前のフィールドがない場合、Writer側のフィールドの値は無視される
- ReaderのレコードのSchemaにデフォルト値を含むフィールドがあり、WriterのSchemaに同じ名前のフィールドがない場合、Readerはそのデフォルト値を使用する
- ReaderのレコードのSchemaにデフォルト値のないフィールドがあり、WriterのSchemaに同じ名前のフィールドがない場合、エラーとなる
まとめ
Spring Cloud Streamで、Schema Evolutionを試してみました。
確認するのにまあまあ時間がかかりましたが、とりあえず動かせたのと理屈はある程度見れたのでいいかなと。
Apache Avro、知っておいた方がいいんでしょうかね。