CLOVER🍀

That was when it all began.

Spring Data for Apache Cassandra(Reactive)で遊ぶ

そろそろ、Spring WebFluxを使いつつ、データストアも組み合わせて覚えていきたいなと思いまして。

Reactiveな接続がサポートされているのは、Redis、Couchbase、Apache Cassandra、MongoDBですが、この中なら個人的には
Apache Cassandraで試しておきたいと。

利用するものとしては、以下の組み合わせで遊んでいきたいと思います。

テストも書く方向で。

お題

バックエンドに置いたApache Cassandraに対して、ReactiveにアクセスするREST APIを作成します。データのお題は書籍で、

  • 登録
  • 全件取得
  • Primary Key検索
  • タイトルで検索
  • 削除

といった操作ができるように、アプリケーションを作成します。

環境

今回の環境は、こちら。

$ java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-0ubuntu0.18.04.1-b11)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)

$ mvn -version
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_171, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-23-generic", arch: "amd64", family: "unix"

Apache Cassandraは、2 Node構成(172.18.0.2、172.18.0.3)とし、バージョンは3.11.2とします。

準備

Mavenの設定は、こちら。

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.0.2.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty-transport-native-epoll</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-cassandra-reactive</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.0.2.RELEASE</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19.1</version>
                <dependencies>
                    <dependency>
                        <groupId>org.junit.platform</groupId>
                        <artifactId>junit-platform-surefire-provider</artifactId>
                        <version>1.1.1</version>
                    </dependency>
                    <dependency>
                        <groupId>org.junit.jupiter</groupId>
                        <artifactId>junit-jupiter-engine</artifactId>
                        <version>5.1.1</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>

主要なライブラリとして、Spring WebFluxとSpring Data for Cassandra(Reactive)を追加。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty-transport-native-epoll</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-cassandra-reactive</artifactId>
        </dependency>

Nettyのepoll用のネイティブライブラリは、今回は(ロードに失敗してたので)外しました。

テストは、Spring Boot+JUnit 5(+AssertJ)でいきます。JUnit 5を使うので、spring-boot-starter-testからJUnit 4をexcludeしておきましょう。

参照したドキュメントは、こちら。

Web on Reactive Stack

Testing

Spring Data for Apache Cassandra - Reference Documentation

テーブル定義

Apache Cassandraのキースペース名は「mykeyspace」とし、こちらは実行時に作成することにします。

テーブルについてもテストコード中に作成するのですが、定義だけ載せておきます。

create table book(
  isbn varchar,
  title varchar,
  price int,
  primary key(isbn)
);

create index book_title_index on book (title);

タイトルで検索する関係上、titleカラムにインデックスを定義しておきます。

Entity定義

Apache Cassandra上に作成した、テーブルに対応するEntity定義は、こちら。
src/main/java/org/littlewings/spring/cassandra/entity/Book.java

package org.littlewings.spring.cassandra.entity;

import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;

@Table
public class Book {
    @PrimaryKey
    private String isbn;

    private String title;

    private int price;

    public static Book create(String isbn, String title, int price) {
        Book book = new Book();
        book.isbn = isbn;
        book.title = title;
        book.price = price;
        return book;
    }

    // getter/setterは省略
}

Primary Keyは、@PrimaryKeyアノテーションを指定します。

Working with Primary Keys

コンポジットなキーの場合は、@PrimaryKeyColumnや@PrimaryKeyClassを使うのだそうな。

Repository

Apache Cassandra上に作成した、テーブルを操作するためのRepositoryはこちら。
src/main/java/org/littlewings/spring/cassandra/repository/BookRepository.java

package org.littlewings.spring.cassandra.repository;

import org.littlewings.spring.cassandra.entity.Book;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;

public interface BookRepository extends ReactiveCrudRepository<Book, String> {
    Flux<Book> findAllBy();

    Flux<Book> findByTitle(String title);
}

CrudRepositoryを使うのですが

Cassandra Repositories

今回はReactiveなライブラリを使うので、ReactiveCrudRepositoryインターフェースを継承します。

Reactive Composition Libraries

ReactiveCrudRepositoryを使用すると、各メソッドの宣言にMonoやFluxが現れます。

Controller

Repositoryの操作は、今回は簡単にControllerから直接行うことにします。
src/main/java/org/littlewings/spring/cassandra/controller/BookController.java

package org.littlewings.spring.cassandra.controller;

import org.littlewings.spring.cassandra.entity.Book;
import org.littlewings.spring.cassandra.repository.BookRepository;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("book")
public class BookController {
    BookRepository bookRepository;

    public BookController(BookRepository bookRepository) {
        this.bookRepository = bookRepository;
    }

    @PutMapping
    @ResponseBody
    public Mono<Book> save(@RequestBody Book book) {
        return bookRepository.save(book);
    }

    @GetMapping
    @ResponseBody
    public Flux<Book> findAll() {
        return bookRepository.findAllBy();
    }

    @GetMapping("{isbn}")
    @ResponseBody
    public Mono<Book> find(@PathVariable String isbn) {
        return bookRepository.findById(isbn);
    }

    @GetMapping("title")
    @ResponseBody
    public Flux<Book> findByTitle(@RequestParam("title") String title) {
        return bookRepository.findByTitle(title);
    }

    @DeleteMapping("{isbn}")
    public Mono<Void> delete(@PathVariable String isbn) {
        return bookRepository.deleteById(isbn);
    }
}

こちらは、さらっと。

Apache Cassandraへの接続設定(Spring Data for Apache Cassandraの利用設定)

ここで、Apache Cassandraへの接続情報など、今回のmain側のソースコードで重要な、Configクラスを載せます。
src/main/java/org/littlewings/spring/cassandra/ReactiveCassandraConfig.java

package org.littlewings.spring.cassandra;

import java.util.Arrays;
import java.util.LinkedHashSet;

import com.datastax.driver.core.Session;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.cassandra.config.CassandraCqlClusterFactoryBean;
import org.springframework.data.cassandra.config.CassandraCqlSessionFactoryBean;
import org.springframework.data.cassandra.core.ReactiveCassandraTemplate;
import org.springframework.data.cassandra.core.cql.keyspace.CreateKeyspaceSpecification;
import org.springframework.data.cassandra.core.cql.keyspace.DefaultOption;
import org.springframework.data.cassandra.core.cql.keyspace.DropKeyspaceSpecification;
import org.springframework.data.cassandra.core.cql.keyspace.KeyspaceOption;
import org.springframework.data.cassandra.core.cql.keyspace.Option;
import org.springframework.data.cassandra.core.cql.session.DefaultBridgedReactiveSession;
import org.springframework.data.cassandra.repository.config.EnableReactiveCassandraRepositories;
import org.springframework.data.cassandra.util.MapBuilder;

@Configuration
@EnableReactiveCassandraRepositories
public class ReactiveCassandraConfig {  // AbstractReactiveCassandraConfigurationを継承して簡単に済ませてもよい
    @Bean
    public CassandraCqlClusterFactoryBean cluster() {
        CassandraCqlClusterFactoryBean cluster = new CassandraCqlClusterFactoryBean();
        cluster.setContactPoints("172.18.0.2,172.18.0.3");
         cluster.setPort(9042);

        cluster
                .setKeyspaceSpecifications(
                        new LinkedHashSet<>(
                                Arrays
                                        .asList(
                                                DropKeyspaceSpecification.dropKeyspace("mykeyspace").ifExists(),
                                                CreateKeyspaceSpecification
                                                        .createKeyspace("mykeyspace")
                                                        // 次と同じ
                                                        // .withSimpleReplication()
                                                        .with(KeyspaceOption.REPLICATION,
                                                                MapBuilder.map(Option.class, Object.class)
                                                                        .entry(new DefaultOption("class", String.class, true, false, true),
                                                                                KeyspaceOption.ReplicationStrategy.SIMPLE_STRATEGY.getValue())
                                                                        .entry(new DefaultOption("replication_factor", Long.class, true, false, false), 1)
                                                                        .build()
                                                        )
                                        )
                        )
                );


        return cluster;
    }

    @Bean
    public CassandraCqlSessionFactoryBean session() {
        CassandraCqlSessionFactoryBean session = new CassandraCqlSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName("mykeyspace");
        return session;
    }

    @Bean
    public ReactiveCassandraTemplate reactiveCassandraTemplate(Session session) {
        return new ReactiveCassandraTemplate(new DefaultBridgedReactiveSession(session));
    }
}

設定方法は、DataStax Driverのクラスを直接Beanとして返す方法や、Spring Data for Apache Cassandraの提供するFactoryBeanを使う方法、
同じくSpring Data for Apache Cassandraが提供する抽象クラスとして作成する方法などがあるようです。

Connecting to Cassandra with Spring

今回は、ふつうのクラスとして作成しました。

@Configuration
@EnableReactiveCassandraRepositories
public class ReactiveCassandraConfig {  // AbstractReactiveCassandraConfigurationを継承して簡単に済ませてもよい

コメントに記載のある通り、AbstractReactiveCassandraConfigurationクラスを継承して簡単に作成することもできます。

Repositoryを使うので、@EnableReactiveCassandraRepositoriesアノテーションを指定しているところもポイントです。

Reactive Composition Libraries

Connecting to Cassandra with Spring

接続先のApache CassandraへのIPアドレスは、こちらで設定。Keyspaceは、実行時に作成するようにしました。

    @Bean
    public CassandraCqlClusterFactoryBean cluster() {
        CassandraCqlClusterFactoryBean cluster = new CassandraCqlClusterFactoryBean();
        cluster.setContactPoints("172.18.0.2,172.18.0.3");
         cluster.setPort(9042);

        cluster
                .setKeyspaceSpecifications(
                        new LinkedHashSet<>(
                                Arrays
                                        .asList(
                                                DropKeyspaceSpecification.dropKeyspace("mykeyspace").ifExists(),
                                                CreateKeyspaceSpecification
                                                        .createKeyspace("mykeyspace")
                                                        // 次と同じ
                                                        // .withSimpleReplication()
                                                        .with(KeyspaceOption.REPLICATION,
                                                                MapBuilder.map(Option.class, Object.class)
                                                                        .entry(new DefaultOption("class", String.class, true, false, true),
                                                                                KeyspaceOption.ReplicationStrategy.SIMPLE_STRATEGY.getValue())
                                                                        .entry(new DefaultOption("replication_factor", Long.class, true, false, false), 1)
                                                                        .build()
                                                        )
                                        )
                        )
                );


        return cluster;
    }

KeyspaceはSimpleStrategy、replication_factorは1と、オーソドックスな感じです。コメントの記載の通り、この設定であればwithSimpleReplicationメソッドを
使うことで簡単に記述することもできます。

Sessionでは、作成した「mykeyspace」を使うように設定。

    @Bean
    public CassandraCqlSessionFactoryBean session() {
        CassandraCqlSessionFactoryBean session = new CassandraCqlSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName("mykeyspace");
        return session;
    }

あと、テストコードでセットアップ的な処理でReactiveCassandraTemplateを使うので、こちらもBean定義。

    @Bean
    public ReactiveCassandraTemplate reactiveCassandraTemplate(Session session) {
        return new ReactiveCassandraTemplate(new DefaultBridgedReactiveSession(session));
    }

Introduction to ReactiveCassandraTemplate

ここまでで、だいたい準備完了です。

起動クラス

最後に、このアプリケーション全体の起動用のクラスも作成しておきます。
src/main/java/org/littlewings/spring/cassandra/App.java

package org.littlewings.spring.cassandra;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

テストコードの雛形

では、続いてテストコードを書いていきます。

テストコードの雛形は、こちら。
src/test/java/org/littlewings/spring/cassandra/SpringDataCassandraTest.java

package org.littlewings.spring.cassandra;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.littlewings.spring.cassandra.entity.Book;
import org.littlewings.spring.cassandra.repository.BookRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.cassandra.core.ReactiveCassandraTemplate;
import org.springframework.data.cassandra.core.cql.ReactiveCqlOperations;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import static org.assertj.core.api.Assertions.assertThat;

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = App.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class SpringDataCassandraTest {
    @Autowired
    ReactiveCassandraTemplate reactiveCassandraTemplate;

    @Autowired
    BookRepository bookRepository;

    @Autowired
    private WebTestClient webTestClient;

    // ここに、テストを書く!!
}

JUnit 5を使い、Nettyを起動しつつランダムポートを使用するように設定。

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = App.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)

テーブルDROP&CREATE

今回は、テストを実行する度に、対象のテーブルをDROP&CREATEすることにしましょう。

    @BeforeEach
    public void setUp() {
        ReactiveCqlOperations operations = reactiveCassandraTemplate.getReactiveCqlOperations();

        operations
                .execute("drop table if exists book")
                .then(operations.execute("create table book(" +
                        "  isbn varchar," +
                        "  title varchar," +
                        "  price int," +
                        "  primary key(isbn))"))
                .then(operations.execute("create index book_title_index on book (title)"))
                .block();
    }

これは、ReactiveCassandraTemplateから取得できるReactiveCqlOperationsを使って、操作を行います。

テストを書く

あとは、@AutowiredでインジェクションしたRepositoryで操作を行ったり、WebTestClientでControllerにアクセスして動作確認しましょう。

まずは、Repositoryの操作。

    @Test
    public void cassandraRepositoryGettingStarted() {
        StepVerifier.create(Flux.merge(
                bookRepository.save(Book.create("978-4798045733", "RDB技術者のためのNoSQLガイド", 3672)),
                bookRepository.save(Book.create("978-4774192253", "ビッグデータを支える技術―刻々とデータが脈打つ自動化の世界 (WEB+DB PRESS plus)", 3197)),
                bookRepository.save(Book.create("978-4798053776", "アプリケーションエンジニアのためのApache Spark入門", 3672))
        ))
                .expectNextCount(3L)
                .verifyComplete();

        StepVerifier
                .create(bookRepository.findById("978-4798045733").map(Book::getTitle))
                .expectNext("RDB技術者のためのNoSQLガイド")
                .verifyComplete();

        StepVerifier
                .create(bookRepository.findByTitle("RDB技術者のためのNoSQLガイド").map(Book::getIsbn))
                .expectNext("978-4798045733")
                .verifyComplete();

        StepVerifier
                .create(bookRepository.count())
                .expectNext(3L)
                .verifyComplete();

        StepVerifier.create(Flux.merge(
                bookRepository.deleteById("978-4798045733"),
                bookRepository.deleteById("978-4774192253"),
                bookRepository.deleteById("978-4798053776")
        ))
                .expectNextCount(0L)
                .verifyComplete();

        StepVerifier
                .create(bookRepository.count())
                .expectNext(0L)
                .verifyComplete();
    }

ここは、ReactorのAPIを使うことになるので、テストもReactor Testが提供するStepVerifierを利用。

続いて、WebTestClientを使って、Controllerへアクセス。

WebTestClient

    @Test
    public void webClient() {
        Arrays.asList(
                Book.create("978-4798045733", "RDB技術者のためのNoSQLガイド", 3672),
                Book.create("978-4774192253", "ビッグデータを支える技術―刻々とデータが脈打つ自動化の世界 (WEB+DB PRESS plus)", 3197),
                Book.create("978-4798053776", "アプリケーションエンジニアのためのApache Spark入門", 3672)
        ).forEach(book ->
                webTestClient
                        .put()
                        .uri("/book")
                        .body(Mono.just(book), Book.class)
                        .exchange()
                        .expectStatus()
                        .isOk()
        );

        webTestClient
                .get()
                .uri("/book")
                .exchange()
                .expectBodyList(Book.class)
                .consumeWith(booksBody -> {
                    List<Book> books = booksBody.getResponseBody();
                    assertThat(books).hasSize(3);
                    assertThat(books.stream().map(Book::getIsbn).collect(Collectors.toList()))
                            .containsExactlyInAnyOrder("978-4798045733", "978-4774192253", "978-4798053776");
                });

        assertThat(webTestClient
                .get()
                .uri("/book/{isbn}", "978-4774192253")
                .exchange()
                .expectBody(Book.class)
                .returnResult()
                .getResponseBody()
                .getTitle()
        ).isEqualTo("ビッグデータを支える技術―刻々とデータが脈打つ自動化の世界 (WEB+DB PRESS plus)");

        webTestClient
                .get()
                .uri("/book/title?title={title}", "RDB技術者のためのNoSQLガイド")
                .exchange()
                .expectBodyList(Book.class)
                .consumeWith(booksBody -> {
                    List<Book> books = booksBody.getResponseBody();
                    assertThat(books).hasSize(1);
                    assertThat(books.get(0).getIsbn()).isEqualTo("978-4798045733");
                });

        Arrays
                .asList("978-4798045733", "978-4774192253", "978-4798053776")
                .forEach(isbn ->
                        webTestClient
                                .delete()
                                .uri("/book/{isbn}", isbn)
                                .exchange()
                                .expectStatus()
                                .isOk()
                );

        webTestClient
                .get()
                .uri("/book")
                .exchange()
                .expectBodyList(Book.class)
                .hasSize(0);
    }

どちらも、それほど特筆することは…ないかな?

実行。

$ mvn test

結果。

Results :

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

まとめ

Spring Data for Apache Cassandra(Reactive)を使って、Spring BootアプリケーションからApache Cassandraにアクセスしてみました。

実は、Reactive以前に、そもそも初めてSpring Data for Apache Cassandraを使うので調べ方にちょっと手間取ったり、WebTestClientも初めて使うので
割と混乱したりといろいろ苦労しましたが、なんとかなりました。

あ、Spring Boot 2でJUnit 5を使うのも初めてでしたね。

これから、ちょっとずつ慣れていきましょう。