CLOVER🍀

That was when it all began.

Spring Cloud StreamでSchema Evolution

Spring Cloud Streamのドキュメントで、Schema Evolutionというものが載っていて、ちょっと興味があったので
試してみることにしました。

Schema evolution support

Spring Cloud Stream Brooklyn.M1 is available

Schema Evolutionとは?

Spring Cloud Streamでは、SchemaベースのMessageConverterをサポートしていて、オブジェクトの
シリアライズ/デシリアライズ時に適用されます。

現在サポートされているのは、Apache Avroのみだそうです。

Welcome to Apache Avro!

Converterは2つあり、オブジェクトのシリアライズ/デシリアライズをする際のクラスの情報か、もしくはアプリケーションの起動時に
把握しているSchemaを使うもの。それからSchema Registryを使って実行時にSchemaを特定し、ドメインオブジェクトが
進化するに従い新しいSchemaを動的にSchema Registryに登録するものです。

Schema Evolution自体は、対象となるデータ構造のバージョンアップ(ダウン)に伴うSchema変更の際に、互換性を
もたせましょうという話みたいですね。

  • Backward Compatibility (Newer version can read old version)
  • Forward Compatibility (Older version can read new version)

Schema Evolution for Resilient Data microservices

Backward Compatibilityではフィールドのリネームに、Forward Compatibilityではフィールドのリネームと削除に耐えられる
ような感じですね。

参考)

GitHub - making-demo-scst/schema-evolution-demo: Schema Evolution with Spring Cloud Stream

GitHub - viniciusccarvalho/schema-evolution-samples: Samples for different schema evolution options

構成要素

この仕組みを構成する要素として、以下の2つがあります。

  • Schema Registry Server
  • Schema Registry Client

Schema Registry Serverは、Schemaを保存するサーバーで、RDBMSを使ってSchemaを保存します。デフォルトはインメモリ
データベース(H2 Database)を使用します。Spring Bootで作られているので、他のデータベースを使うようにカスタマイズ
することも可能です。

Schema Registry Server

Schema Registry Serverを使うには、「spring-cloud-stream-schema-server」というモジュールを使います。

内部的には、Spring Data JPAを使ってデータの管理を行い、またREST APIも提供します。

Schema Registry Server API

Schema Registry Clientは、Schema Registry Serverとのインターフェースを抽象化したものです。Schema Registry Clientを使う場合は、
「spring-cloud-stream-schema」モジュールを依存関係に追加します。

Spring Cloud Stream Reference Guide

Schema Registry Clientとなるのは、SourceやSink側になります。

お題

とまあ、前置きはこのくらいにして、Spring Cloud Streamの提供するSchema Evolutionを試してみましょう。

お題を書籍として、以下のようにSourceとSinkが把握しているSchemaのバージョンを変えていき、合わせてSourceに投入するデータが
表現するSchemaのバージョンも変えていってみましょう。

こんな感じで。

データ Ver. Source Known Schema Ver. Sink Known Schema Ver.
v1 v1 v1
v1 v1 v2
v1 v2 v2
v2 v2 v2
v2 v2 v1

データの方はSchemaに合わせたバージョンを、SourceとSinkはそれ自身が持つSchemaのバージョンを指します。

SourceとSinkは、バージョンごとにそれぞれMavenプロジェクトでのサブモジュールとします。

準備

まずは準備から。今回は、こういうMavenのマルチプロジェクト構成とします。

pom.xml
source-v1/pom.xml
source-v2/pom.xml
sink-v1/pom.xml
sink-v2/pom.xml
schema-registry-server/pom.xml

親pom.xmlがあって、SourceとSinkをサブモジュールとしてバージョンごとに用意します。それから、Schema Registry Server用にもひとつ。

親pom.xmlの内容は、こんな感じです。
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.littlewings</groupId>
    <artifactId>schema-evolution</artifactId>
    <packaging>pom</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <modules>
        <module>source-v1</module>
        <module>schema-registry-server</module>
        <module>sink-v1</module>
        <module>sink-v2</module>
        <module>source-v2</module>
    </modules>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>Chelsea.SR2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>1.5.4.RELEASE</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Spring Cloud StreamのBOMのimport、Spring BootのMaven Pluginの設定を入れているくらいです。Apache Avroなどの設定は、今回は個別に
入れていくスタイルとします。

SourceとSinkのpom.xmlの主要な部分は、こんな感じです。

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-schema</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>src/main/resources/avro</sourceDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

他はアーティファクトのIDが違うくらいなので、割愛…。

メッセージブローカーはApache Kafkaとするので、「spring-cloud-starter-stream-kafka」を加えます。

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

Schema Evolutionのサポートを使うには、「spring-cloud-stream-schema」を追加して

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-schema</artifactId>
        </dependency>

Apache Avroへの依存関係と

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.2</version>
        </dependency>

プラグインの設定を加えます。

            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>src/main/resources/avro</sourceDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

これで、Apache AvroのIDLからJavaソースコードを自動生成してくれるようになります。これは、SourceおよびSinkで
行います。IDLは、各プロジェクトの「src/main/resources/avro」ディレクトリに置くように設定しています。

Apache Kafkaは、起動済みとします。

Schema Registry Serverの作成

先に、Schema Registry Serverから作りましょう。ドキュメントに沿って作成すればOKです。

Schema Registry Server

pom.xmlは、こんな感じです。
schema-registry-server/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>schema-evolution</artifactId>
        <groupId>org.littlewings</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>schema-registry-server</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-schema-server</artifactId>
        </dependency>
    </dependencies>
</project>

「spring-cloud-stream-schema-server」を追加しました。

あとは、@EnableSchemaRegistryServerアノテーションを付与したSpring Bootの起動クラスを作成すれば完了です。
schema-registry-server/src/main/java/org/littlewings/spring/cloud/SchemaRegistryServer.java

package org.littlewings.spring.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.schema.server.EnableSchemaRegistryServer;

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServer {
    public static void main(String... args) {
        SpringApplication.run(SchemaRegistryServer.class, args);
    }
}

Source v1の作成

データを登録する、Sourceのversion 1を作成します。

ソースコードは、こんな感じ。
source-v1/src/main/java/org/littlewings/spring/cloud/SourceApp.java

package org.littlewings.spring.cloud;

import java.time.LocalDateTime;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
@RestController
public class SourceApp {
    public static void main(String... args) {
        SpringApplication.run(SourceApp.class, args);
    }

    Source source;

    public SourceApp(Source source) {
        this.source = source;
    }

    @PostMapping("register")
    public String register(@RequestBody Book book) {
        source.output().send(MessageBuilder.withPayload(book).build());

        System.out.printf(
                "[%s] send book, isbn = %s, title = %s, price = %d, version = %s%n",
                LocalDateTime.now(),
                book.getIsbn(),
                book.getTitle(),
                book.getPrice(),
                book.getVersion()
        );

        return "OK!!" + System.lineSeparator();
    }
}

@RestControllerとし、受け取ったデータをメッセージブローカーに送るわけですが、その時に内容を標準出力に表示するように
しています。特に意味はないのですが、あるバージョンのSchemaに対応したJavaクラスを使うという意味で…。

で、このBookクラスは、自分では実装しません。Apache AvroのIDLから、Maven Pluginで自動生成してもらいます。

作成したIDLはこちら。
source-v1/src/main/resources/avro/book.avsc

{
  "type": "record",
  "name": "Book",
  "namespace": "org.littlewings.spring.cloud",
  "fields": [
    { "name": "isbn", "type": "string" },
    { "name": "title", "type": "string", "default": "" },
    { "name": "price", "type": "int", "default": 0 },
    { "name": "version", "type": "string", "default": "" }
  ]
}

IDLは、こちらを見ながら作ってみました。

Schema Declaration / Complex Types / Records

namespaceは、パッケージになるみたいですねぇ…。

ビルドすると、今回の設定だと「source-v1/target/generated-sources/avro」にソースコードが出力されます。
source-v1/target/generated-sources/avro/org/littlewings/spring/cloud/Book.java

/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package org.littlewings.spring.cloud;

import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Book extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {

〜省略〜

ちなみに、出力先のディレクトリを変えたかったらApache AvroのMaven Pluginで「outputDirectory」パラメーターで指定すれば
いいみたいですよ。

例えば、こんな感じ。

      <configuration>
        <sourceDirectory>src/main/resources/avro</sourceDirectory>
        <outputDirectory>src/main/java/</outputDirectory>
      </configuration>

続いて、設定です。
source-v1/src/main/resources/application.properties

spring.cloud.stream.bindings.output.destination=book
spring.cloud.stream.bindings.output.contentType=application/*+avro

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

spring.cloud.stream.schemaRegistryClient.endpoint=http://localhost:8990

ここでのポイントは、「spring.cloud.stream.bindings.output.contentType」を「application/*+avro」としていることですね。

spring.cloud.stream.bindings.output.contentType=application/*+avro

これで、Apache Avroを使うMessageConverterを自動設定してくれます。

この時のContent-Typeは、このようになるのだとか。prefixは設定可能で、subjectはPayloadのタイプから決定されます。

application/[prefix].[subject].v[version]+avro

あとは、Schema Registry Serverへの接続先も設定しています。実はこれ、デフォルト値なのでこの値だと不要なのですが、
設定として把握しておこうかと。

spring.cloud.stream.schemaRegistryClient.endpoint=http://localhost:8990

ここまでで、Source v1の準備はおしまいです。

Sink v1

続いて、Sink version1。Sourceと似たり寄ったりで、@EnableSchemaRegistryClientアノテーションを付与してあげればOKです。
sink-v1/src/main/java/org/littlewings/spring/cloud/SinkApp.java

package org.littlewings.spring.cloud;

import java.time.LocalDateTime;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient;

@SpringBootApplication
@EnableBinding(Sink.class)
@EnableSchemaRegistryClient
public class SinkApp {
    public static void main(String... args) {
        SpringApplication.run(SinkApp.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void println(Book book) {
        System.out.printf(
                "[%s] received book, isbn = %s, title = %s, price = %d, version = %s%n",
                LocalDateTime.now(),
                book.getIsbn(),
                book.getTitle(),
                book.getPrice(),
                book.getVersion()
        );
    }
}

Apache AvroのMaven Plugin設定は、Sourceの時と同じでOKです。

Apache AvroのIDLは、Sourceとまったく同じものを使用するので割愛。

sink-v1/src/main/resources/avro/book.avsc

〜省略〜

設定。
sink-v1/src/main/resources/application.properties

spring.cloud.stream.bindings.input.destination=book
spring.cloud.stream.bindings.input.group=book-group

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

spring.cloud.stream.schemaRegistryClient.endpoint=http://localhost:8990

server.port=18080

Schema Registry Serverへの接続情報を書いている以外は、あまり新しい情報はありません。Sourceとかぶらないように、Listen Portを
ずらしておいたくらいですね。

Source v2/Sink v2

最後は、SourceとSinkのversion 2をまとめて。

IDLは、それぞれのプロジェクトで同じものを使用します。

IDLの定義自体は、追加フィールドで確認します。
source-v2/src/main/resources/avro/book.avsc
sink-v2/src/main/resources/avro/book.avsc

{
  "type": "record",
  "name": "Book",
  "namespace": "org.littlewings.spring.cloud",
  "fields": [
    { "name": "isbn", "type": "string" },
    { "name": "title", "type": "string", "default": "" },
    { "name": "price", "type": "int", "default": 0 },
    { "name": "version", "type": "string", "default": "" },
    { "name": "tags", "type": { "type": "array", "items": "string" }, "default": [] }
  ]
}

「tags」というフィールドを追加しました。

SourceとSinkのコードには、増えたフィールドを使うコードに変更しておきます。

Source。

@SpringBootApplication
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
@RestController
public class SourceApp {
    public static void main(String... args) {
        SpringApplication.run(SourceApp.class, args);
    }

    Source source;

    public SourceApp(Source source) {
        this.source = source;
    }

    @PostMapping("register")
    public String register(@RequestBody Book book) {
        source.output().send(MessageBuilder.withPayload(book).build());

        System.out.printf(
                "[%s] received book, isbn = %s, title = %s, price = %d, version = %s, tags = %s%n",
                LocalDateTime.now(),
                book.getIsbn(),
                book.getTitle(),
                book.getPrice(),
                book.getVersion(),
                book.getTags()
        );

        return "OK!!" + System.lineSeparator();
    }
}

Sink。

@SpringBootApplication
@EnableBinding(Sink.class)
@EnableSchemaRegistryClient
public class SinkApp {
    public static void main(String... args) {
        SpringApplication.run(SinkApp.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void println(Book book) {
        System.out.printf(
                "[%s] received book, isbn = %s, title = %s, price = %d, version = %s, tags = %s%n",
                LocalDateTime.now(),
                book.getIsbn(),
                book.getTitle(),
                book.getPrice(),
                book.getVersion(),
                book.getTags()
        );
    }
}

あとの設定は、version 1の時と同じです。

動作確認

それでは、パッケージングして動作確認してみましょう。

$ mvn package

最初に、Schema Registry Serverを起動しておきます。

$ java -jar schema-registry-server/target/schema-registry-server-0.0.1-SNAPSHOT.jar

Schema Registry Serverは、デフォルトで8990ポートでListenします。

Data v1 / Source v1 → Sink v1

では、SourceとSinkのversion 1を起動します。

## Source
$ java -jar source-v1/target/source-v1-0.0.1-SNAPSHOT.jar

## Sink
$ java -jar sink-v1/target/sink-v1-0.0.1-SNAPSHOT.jar

データをPOSTしてみます。curlで実行しようと思ったのですが、ちょっと面倒になってこういうスクリプトを用意。
post.sh

#!/bin/bash

FILE=$1
curl -XPOST -H 'Content-Type: application/json' http://localhost:8080/register -d @$FILE

はい。

で、次のようなデータを2つ放り込みます。
book-v1-1.json

{
  "isbn": "978-4798142470",
  "title": "Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発",
  "price": 4320,
  "version": "v1"
}

book-v1-2.json

{
  "isbn": "978-4774183169",
  "title": "パーフェクト Java EE",
  "price": 3456,
  "version": "v1"
}

データのフォーマットは、version 1です。

登録。

$ ./post.sh book-v1-1.json
OK!!
$ ./post.sh book-v1-2.json
OK!!

標準出力に、それぞれログが出力されます。

## Source
[2017-07-06T22:52:22.243] send book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v1
[2017-07-06T22:52:24.662] send book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v1

## Sink
[2017-07-06T22:52:23.098] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v1
[2017-07-06T22:52:24.669] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v1

とりあえず、Apache Avroを使って動きました。

Data v1 / Source v1 → Sink v2

ここで、Sink v1を落としてSink v2を起動させます。

$ java -jar sink-v2/target/sink-v2-0.0.1-SNAPSHOT.jar

もう1度、version 1形式のデータを登録してみます。

$ ./post.sh book-v1-1.json
OK!!
$ ./post.sh book-v1-2.json
OK!!

Sink側で、問題なく受け取れます。tagsの中身は、もちろん空ですが。

[2017-07-06T22:54:43.637] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v1, tags = []
[2017-07-06T22:54:44.542] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v1, tags = []
Data v1 / Source v2 → Sink v2

今度は、Source v1を落としてSource v2を起動します。

$ java -jar source-v2/target/source-v2-0.0.1-SNAPSHOT.jar

で、データを登録しようとすると、うまくいきま…せん。

$ ./post.sh book-v1-1.json 
{"timestamp":1499349527822,"status":500,"error":"Internal Server Error","exception":"org.springframework.messaging.MessageDeliveryException","message":"failed to send Message to channel 'output'; nested exception is java.lang.NullPointerException: null of array of org.littlewings.spring.cloud.Book","path":"/register"}

$ ./post.sh book-v1-2.json 
{"timestamp":1499349545349,"status":500,"error":"Internal Server Error","exception":"org.springframework.messaging.MessageDeliveryException","message":"failed to send Message to channel 'output'; nested exception is java.lang.NullPointerException: null of array of org.littlewings.spring.cloud.Book","path":"/register"}

tagsがないからですと…。これは…仕方ないかな…。

Data v2 / Source v2 → Sink v2

というわけで、データの形式もversion 2にします。

book-v2-1.json

{
  "isbn": "978-4798142470",
  "title": "Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発",
  "price": 4320,
  "version": "v2",
  "tags": [
    "Java", "Spring"
  ]
}

book-v2-2.json

{
  "isbn": "978-4774183169",
  "title": "パーフェクト Java EE",
  "price": 3456,
  "version": "v2",
  "tags": [
    "Java", "Java EE"
  ]
}

登録。

$ ./post.sh book-v2-1.json
OK!!
$ ./post.sh book-v2-2.json
OK!!

コンソールの出力結果。

## Source
[2017-07-06T23:02:04.908] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v2, tags = [Java, Spring]
[2017-07-06T23:02:07.321] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v2, tags = [Java, Java EE]

## Sink
[2017-07-06T23:02:04.935] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v2, tags = [Java, Spring]
[2017-07-06T23:02:07.327] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v2, tags = [Java, Java EE]

tagsの内容が渡っていますね。

Data v2 / Source v2 → Sink v1

最後に、Sinkをversion 1に戻してみます。

$ java -jar sink-v1/target/sink-v1-0.0.1-SNAPSHOT.jar

登録。

$ ./post.sh book-v2-1.json
OK!!
$ ./post.sh book-v2-2.json
OK!!

こちらは、Sinkでも受け取れます。

[2017-07-06T23:05:37.345] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v2
[2017-07-06T23:05:37.899] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v2

tagsはなかったことになりますけれど。

Sourceでは、出力されています。

[2017-07-06T23:05:36.770] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v2, tags = [Java, Spring]
[2017-07-06T23:05:37.893] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v2, tags = [Java, Java EE]

これで、SourceとSinkのSchema定義が異なるものであって、データの受け渡しができることが確認できました。

Schema Registry Serverをもう少し

Schema Registry Serverで保存されるSchema情報は、JPAのEntityとして表現されます。
https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-schema-server/src/main/java/org/springframework/cloud/stream/schema/server/model/Schema.java

また、Schema登録時にはSubjectとFormatが一致していれば、特定のバージョンにマッピングして扱われ、そうでなければバージョン1として
扱うようです。
https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-schema-server/src/main/java/org/springframework/cloud/stream/schema/server/controllers/ServerController.java

		List<Schema> registeredEntities = this.repository.findBySubjectAndFormatOrderByVersion(
				schema.getSubject(), schema.getFormat());
		if (registeredEntities == null || registeredEntities.size() == 0) {
			schema.setVersion(1);
			result = this.repository.save(schema);
		}
		else {
			result = validator.match(registeredEntities, schema.getDefinition());
			if (result == null) {
				schema.setVersion(
						registeredEntities.get(registeredEntities.size() - 1).getVersion()
								+ 1);
				result = this.repository.save(schema);
			}

		}

登録されているSchemaを、REST APIで見てみましょう。

Schema Registry Server API

登録は…いいかなと思うので、参照を。

まずは、こちら。

GET /{subject}/{format}

結果。

$ curl http://localhost:8990/book/avro | jq .
  {
    "definition": "{\"type\":\"record\",\"name\":\"Book\",\"namespace\":\"org.littlewings.spring.cloud\",\"fields\":[{\"name\":\"isbn\",\"type\":\"string\"},{\"name\":\"title\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"price\",\"type\":\"int\",\"default\":0},{\"name\":\"version\",\"type\":\"string\",\"default\":\"\"}]}",
    "format": "avro",
    "subject": "book",
    "version": 1,
    "id": 1
  },
  {
    "definition": "{\"type\":\"record\",\"name\":\"Book\",\"namespace\":\"org.littlewings.spring.cloud\",\"fields\":[{\"name\":\"isbn\",\"type\":\"string\"},{\"name\":\"title\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"price\",\"type\":\"int\",\"default\":0},{\"name\":\"version\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"tags\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"default\":[]}]}",
    "format": "avro",
    "subject": "book",
    "version": 2,
    "id": 2
  }
]

確かに、定義したSchemaが入ってますね。

続いて、バージョンを指定して。

GET /{subject}/{format}/{version}

ここで気づく。

$ curl http://localhost:8990/book/avro/v1 | jq .
{
  "definition": "{\"type\":\"record\",\"name\":\"Book\",\"namespace\":\"org.littlewings.spring.cloud\",\"fields\":[{\"name\":\"isbn\",\"type\":\"string\"},{\"name\":\"title\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"price\",\"type\":\"int\",\"default\":0},{\"name\":\"version\",\"type\":\"string\",\"default\":\"\"}]}",
  "format": "avro",
  "subject": "book",
  "version": 1,
  "id": 1
}

なんか、「v」って要るんですけど…。

http://localhost:8990/book/avro/v1

https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-schema-server/src/main/java/org/springframework/cloud/stream/schema/server/controllers/ServerController.java#L114

まあ、いいですけど…。

Schema Registry Server & Clientを使ったSchema登録&解決について

ドキュメントに、Schema登録と解決時の図があるのですが…

Schema Registration and Resolution

小さすぎて見えないので、ちょっと貼っておきます…。

Schema Registration Process (Serialization)

Schema Registration Process (Serialization)

登録…シリアライズの最初のプロセスは、Channel送信時にSpecificRecordやGenericRecordであればSchemaを取得できるのでそちらを使い、POJOであればプロパティ
「spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnable」がtrueであれば動的にSchemaを推論します。

プロパティ「spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled」は、デフォルトでtrueです。

https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-core-docs/src/main/asciidoc/images/schema_resolution.png

Schemaを取得後は、Schema Registry Serverよりメタデータバージョン)を取得します。次にキャッシュを参照し、見つからなければSchemaを
Schema Registry Serverに送信し、バージョン情報を取得します。Converterはその結果をキャッシュし、シリアライズのフェーズごとに
Schema Registry Serverを照会するオーバーヘッドを回避します。

https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-core-docs/src/main/asciidoc/images/registration.png

Schemaのバージョンは、Content-Typeヘッダーに含まれます、と。

Schema Resolution Process (Deserialization)

Schema Resolution Process (Deserialization)

シリアライズ時のSchema解決について。Content-Typeヘッダーなどからバージョンを取得し、ConverterはメッセージのWriter Schemaを
取得するようにSchema Registry Serverにクエリを投げます(キャッシュされていなければ)。メッセージに対する正しいSchemaを
見つけたら、次にReader Schemaを取得しApache AvroのSchema解決サポートを利用してReader定義を読み込みます。

https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-core-docs/src/main/asciidoc/images/schema_reading.png

WriterとReaderで、異なるSchemaが使えるみたいですね。これは重要なことみたいです。

Spring Cloud Streamはどのようにメッセージを読むのかを決定するためにWriter Schemaをフェッチしますし、ちゃんとApache AvroのSchema Evolutionが
有効にするためにはReader Schemaがアプリケーションに適切に設定されている必要がありますと。

Apache AvroのSchema Resolutionについては、こちら。

Schema Resolution

「if both are records:」のところを見ればいいですね。

  • フィールドの順序は異なっていてもよく、名前で照会される
  • 両方のレコードで、同じ名前のフィールドは再帰的に解決される
  • Writerのレコードに存在するフィールドで、Reader側のレコードに同じ名前のフィールドがない場合、Writer側のフィールドの値は無視される
  • ReaderのレコードのSchemaにデフォルト値を含むフィールドがあり、WriterのSchemaに同じ名前のフィールドがない場合、Readerはそのデフォルト値を使用する
  • ReaderのレコードのSchemaにデフォルト値のないフィールドがあり、WriterのSchemaに同じ名前のフィールドがない場合、エラーとなる

まとめ

Spring Cloud Streamで、Schema Evolutionを試してみました。

確認するのにまあまあ時間がかかりましたが、とりあえず動かせたのと理屈はある程度見れたのでいいかなと。

Apache Avro、知っておいた方がいいんでしょうかね。