そろそろ、Spring WebFluxを使いつつ、データストアも組み合わせて覚えていきたいなと思いまして。
Reactiveな接続がサポートされているのは、Redis、Couchbase、Apache Cassandra、MongoDBですが、この中なら個人的には
Apache Cassandraで試しておきたいと。
利用するものとしては、以下の組み合わせで遊んでいきたいと思います。
- Spring Boot
- Spring WebFlux
- Spring Data for Apache Cassandra / Reactive
テストも書く方向で。
お題
バックエンドに置いたApache Cassandraに対して、ReactiveにアクセスするREST APIを作成します。データのお題は書籍で、
- 登録
- 全件取得
- Primary Key検索
- タイトルで検索
- 削除
といった操作ができるように、アプリケーションを作成します。
環境
今回の環境は、こちら。
$ java -version openjdk version "1.8.0_171" OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-0ubuntu0.18.04.1-b11) OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode) $ mvn -version Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 1.8.0_171, vendor: Oracle Corporation Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "4.15.0-23-generic", arch: "amd64", family: "unix"
Apache Cassandraは、2 Node構成(172.18.0.2、172.18.0.3)とし、バージョンは3.11.2とします。
準備
Mavenの設定は、こちら。
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <java.version>1.8</java.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.0.2.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> <exclusions> <exclusion> <groupId>io.netty</groupId> <artifactId>netty-transport-native-epoll</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-cassandra-reactive</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>junit</groupId> <artifactId>junit</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.0.2.RELEASE</version> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.19.1</version> <dependencies> <dependency> <groupId>org.junit.platform</groupId> <artifactId>junit-platform-surefire-provider</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.1.1</version> </dependency> </dependencies> </plugin> </plugins> </build>
主要なライブラリとして、Spring WebFluxとSpring Data for Cassandra(Reactive)を追加。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> <exclusions> <exclusion> <groupId>io.netty</groupId> <artifactId>netty-transport-native-epoll</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-cassandra-reactive</artifactId> </dependency>
Nettyのepoll用のネイティブライブラリは、今回は(ロードに失敗してたので)外しました。
テストは、Spring Boot+JUnit 5(+AssertJ)でいきます。JUnit 5を使うので、spring-boot-starter-testからJUnit 4をexcludeしておきましょう。
参照したドキュメントは、こちら。
テーブル定義
Apache Cassandraのキースペース名は「mykeyspace」とし、こちらは実行時に作成することにします。
テーブルについてもテストコード中に作成するのですが、定義だけ載せておきます。
create table book( isbn varchar, title varchar, price int, primary key(isbn) ); create index book_title_index on book (title);
タイトルで検索する関係上、titleカラムにインデックスを定義しておきます。
Entity定義
Apache Cassandra上に作成した、テーブルに対応するEntity定義は、こちら。
src/main/java/org/littlewings/spring/cassandra/entity/Book.java
package org.littlewings.spring.cassandra.entity; import org.springframework.data.cassandra.core.mapping.PrimaryKey; import org.springframework.data.cassandra.core.mapping.Table; @Table public class Book { @PrimaryKey private String isbn; private String title; private int price; public static Book create(String isbn, String title, int price) { Book book = new Book(); book.isbn = isbn; book.title = title; book.price = price; return book; } // getter/setterは省略 }
Primary Keyは、@PrimaryKeyアノテーションを指定します。
コンポジットなキーの場合は、@PrimaryKeyColumnや@PrimaryKeyClassを使うのだそうな。
Repository
Apache Cassandra上に作成した、テーブルを操作するためのRepositoryはこちら。
src/main/java/org/littlewings/spring/cassandra/repository/BookRepository.java
package org.littlewings.spring.cassandra.repository; import org.littlewings.spring.cassandra.entity.Book; import org.springframework.data.repository.reactive.ReactiveCrudRepository; import reactor.core.publisher.Flux; public interface BookRepository extends ReactiveCrudRepository<Book, String> { Flux<Book> findAllBy(); Flux<Book> findByTitle(String title); }
CrudRepositoryを使うのですが
今回はReactiveなライブラリを使うので、ReactiveCrudRepositoryインターフェースを継承します。
Reactive Composition Libraries
ReactiveCrudRepositoryを使用すると、各メソッドの宣言にMonoやFluxが現れます。
Controller
Repositoryの操作は、今回は簡単にControllerから直接行うことにします。
src/main/java/org/littlewings/spring/cassandra/controller/BookController.java
package org.littlewings.spring.cassandra.controller; import org.littlewings.spring.cassandra.entity.Book; import org.littlewings.spring.cassandra.repository.BookRepository; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @RestController @RequestMapping("book") public class BookController { BookRepository bookRepository; public BookController(BookRepository bookRepository) { this.bookRepository = bookRepository; } @PutMapping @ResponseBody public Mono<Book> save(@RequestBody Book book) { return bookRepository.save(book); } @GetMapping @ResponseBody public Flux<Book> findAll() { return bookRepository.findAllBy(); } @GetMapping("{isbn}") @ResponseBody public Mono<Book> find(@PathVariable String isbn) { return bookRepository.findById(isbn); } @GetMapping("title") @ResponseBody public Flux<Book> findByTitle(@RequestParam("title") String title) { return bookRepository.findByTitle(title); } @DeleteMapping("{isbn}") public Mono<Void> delete(@PathVariable String isbn) { return bookRepository.deleteById(isbn); } }
こちらは、さらっと。
Apache Cassandraへの接続設定(Spring Data for Apache Cassandraの利用設定)
ここで、Apache Cassandraへの接続情報など、今回のmain側のソースコードで重要な、Configクラスを載せます。
src/main/java/org/littlewings/spring/cassandra/ReactiveCassandraConfig.java
package org.littlewings.spring.cassandra; import java.util.Arrays; import java.util.LinkedHashSet; import com.datastax.driver.core.Session; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.cassandra.config.CassandraCqlClusterFactoryBean; import org.springframework.data.cassandra.config.CassandraCqlSessionFactoryBean; import org.springframework.data.cassandra.core.ReactiveCassandraTemplate; import org.springframework.data.cassandra.core.cql.keyspace.CreateKeyspaceSpecification; import org.springframework.data.cassandra.core.cql.keyspace.DefaultOption; import org.springframework.data.cassandra.core.cql.keyspace.DropKeyspaceSpecification; import org.springframework.data.cassandra.core.cql.keyspace.KeyspaceOption; import org.springframework.data.cassandra.core.cql.keyspace.Option; import org.springframework.data.cassandra.core.cql.session.DefaultBridgedReactiveSession; import org.springframework.data.cassandra.repository.config.EnableReactiveCassandraRepositories; import org.springframework.data.cassandra.util.MapBuilder; @Configuration @EnableReactiveCassandraRepositories public class ReactiveCassandraConfig { // AbstractReactiveCassandraConfigurationを継承して簡単に済ませてもよい @Bean public CassandraCqlClusterFactoryBean cluster() { CassandraCqlClusterFactoryBean cluster = new CassandraCqlClusterFactoryBean(); cluster.setContactPoints("172.18.0.2,172.18.0.3"); cluster.setPort(9042); cluster .setKeyspaceSpecifications( new LinkedHashSet<>( Arrays .asList( DropKeyspaceSpecification.dropKeyspace("mykeyspace").ifExists(), CreateKeyspaceSpecification .createKeyspace("mykeyspace") // 次と同じ // .withSimpleReplication() .with(KeyspaceOption.REPLICATION, MapBuilder.map(Option.class, Object.class) .entry(new DefaultOption("class", String.class, true, false, true), KeyspaceOption.ReplicationStrategy.SIMPLE_STRATEGY.getValue()) .entry(new DefaultOption("replication_factor", Long.class, true, false, false), 1) .build() ) ) ) ); return cluster; } @Bean public CassandraCqlSessionFactoryBean session() { CassandraCqlSessionFactoryBean session = new CassandraCqlSessionFactoryBean(); session.setCluster(cluster().getObject()); session.setKeyspaceName("mykeyspace"); return session; } @Bean public ReactiveCassandraTemplate reactiveCassandraTemplate(Session session) { return new ReactiveCassandraTemplate(new DefaultBridgedReactiveSession(session)); } }
設定方法は、DataStax Driverのクラスを直接Beanとして返す方法や、Spring Data for Apache Cassandraの提供するFactoryBeanを使う方法、
同じくSpring Data for Apache Cassandraが提供する抽象クラスとして作成する方法などがあるようです。
Connecting to Cassandra with Spring
今回は、ふつうのクラスとして作成しました。
@Configuration @EnableReactiveCassandraRepositories public class ReactiveCassandraConfig { // AbstractReactiveCassandraConfigurationを継承して簡単に済ませてもよい
コメントに記載のある通り、AbstractReactiveCassandraConfigurationクラスを継承して簡単に作成することもできます。
Repositoryを使うので、@EnableReactiveCassandraRepositoriesアノテーションを指定しているところもポイントです。
Reactive Composition Libraries
Connecting to Cassandra with Spring
接続先のApache CassandraへのIPアドレスは、こちらで設定。Keyspaceは、実行時に作成するようにしました。
@Bean public CassandraCqlClusterFactoryBean cluster() { CassandraCqlClusterFactoryBean cluster = new CassandraCqlClusterFactoryBean(); cluster.setContactPoints("172.18.0.2,172.18.0.3"); cluster.setPort(9042); cluster .setKeyspaceSpecifications( new LinkedHashSet<>( Arrays .asList( DropKeyspaceSpecification.dropKeyspace("mykeyspace").ifExists(), CreateKeyspaceSpecification .createKeyspace("mykeyspace") // 次と同じ // .withSimpleReplication() .with(KeyspaceOption.REPLICATION, MapBuilder.map(Option.class, Object.class) .entry(new DefaultOption("class", String.class, true, false, true), KeyspaceOption.ReplicationStrategy.SIMPLE_STRATEGY.getValue()) .entry(new DefaultOption("replication_factor", Long.class, true, false, false), 1) .build() ) ) ) ); return cluster; }
KeyspaceはSimpleStrategy、replication_factorは1と、オーソドックスな感じです。コメントの記載の通り、この設定であればwithSimpleReplicationメソッドを
使うことで簡単に記述することもできます。
Sessionでは、作成した「mykeyspace」を使うように設定。
@Bean public CassandraCqlSessionFactoryBean session() { CassandraCqlSessionFactoryBean session = new CassandraCqlSessionFactoryBean(); session.setCluster(cluster().getObject()); session.setKeyspaceName("mykeyspace"); return session; }
あと、テストコードでセットアップ的な処理でReactiveCassandraTemplateを使うので、こちらもBean定義。
@Bean public ReactiveCassandraTemplate reactiveCassandraTemplate(Session session) { return new ReactiveCassandraTemplate(new DefaultBridgedReactiveSession(session)); }
Introduction to ReactiveCassandraTemplate
ここまでで、だいたい準備完了です。
起動クラス
最後に、このアプリケーション全体の起動用のクラスも作成しておきます。
src/main/java/org/littlewings/spring/cassandra/App.java
package org.littlewings.spring.cassandra; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class App { public static void main(String... args) { SpringApplication.run(App.class, args); } }
テストコードの雛形
では、続いてテストコードを書いていきます。
テストコードの雛形は、こちら。
src/test/java/org/littlewings/spring/cassandra/SpringDataCassandraTest.java
package org.littlewings.spring.cassandra; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.littlewings.spring.cassandra.entity.Book; import org.littlewings.spring.cassandra.repository.BookRepository; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.cassandra.core.ReactiveCassandraTemplate; import org.springframework.data.cassandra.core.cql.ReactiveCqlOperations; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.test.web.reactive.server.WebTestClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import static org.assertj.core.api.Assertions.assertThat; @ExtendWith(SpringExtension.class) @SpringBootTest(classes = App.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class SpringDataCassandraTest { @Autowired ReactiveCassandraTemplate reactiveCassandraTemplate; @Autowired BookRepository bookRepository; @Autowired private WebTestClient webTestClient; // ここに、テストを書く!! }
JUnit 5を使い、Nettyを起動しつつランダムポートを使用するように設定。
@ExtendWith(SpringExtension.class) @SpringBootTest(classes = App.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
テーブルDROP&CREATE
今回は、テストを実行する度に、対象のテーブルをDROP&CREATEすることにしましょう。
@BeforeEach public void setUp() { ReactiveCqlOperations operations = reactiveCassandraTemplate.getReactiveCqlOperations(); operations .execute("drop table if exists book") .then(operations.execute("create table book(" + " isbn varchar," + " title varchar," + " price int," + " primary key(isbn))")) .then(operations.execute("create index book_title_index on book (title)")) .block(); }
これは、ReactiveCassandraTemplateから取得できるReactiveCqlOperationsを使って、操作を行います。
テストを書く
あとは、@AutowiredでインジェクションしたRepositoryで操作を行ったり、WebTestClientでControllerにアクセスして動作確認しましょう。
まずは、Repositoryの操作。
@Test public void cassandraRepositoryGettingStarted() { StepVerifier.create(Flux.merge( bookRepository.save(Book.create("978-4798045733", "RDB技術者のためのNoSQLガイド", 3672)), bookRepository.save(Book.create("978-4774192253", "ビッグデータを支える技術―刻々とデータが脈打つ自動化の世界 (WEB+DB PRESS plus)", 3197)), bookRepository.save(Book.create("978-4798053776", "アプリケーションエンジニアのためのApache Spark入門", 3672)) )) .expectNextCount(3L) .verifyComplete(); StepVerifier .create(bookRepository.findById("978-4798045733").map(Book::getTitle)) .expectNext("RDB技術者のためのNoSQLガイド") .verifyComplete(); StepVerifier .create(bookRepository.findByTitle("RDB技術者のためのNoSQLガイド").map(Book::getIsbn)) .expectNext("978-4798045733") .verifyComplete(); StepVerifier .create(bookRepository.count()) .expectNext(3L) .verifyComplete(); StepVerifier.create(Flux.merge( bookRepository.deleteById("978-4798045733"), bookRepository.deleteById("978-4774192253"), bookRepository.deleteById("978-4798053776") )) .expectNextCount(0L) .verifyComplete(); StepVerifier .create(bookRepository.count()) .expectNext(0L) .verifyComplete(); }
ここは、ReactorのAPIを使うことになるので、テストもReactor Testが提供するStepVerifierを利用。
続いて、WebTestClientを使って、Controllerへアクセス。
@Test public void webClient() { Arrays.asList( Book.create("978-4798045733", "RDB技術者のためのNoSQLガイド", 3672), Book.create("978-4774192253", "ビッグデータを支える技術―刻々とデータが脈打つ自動化の世界 (WEB+DB PRESS plus)", 3197), Book.create("978-4798053776", "アプリケーションエンジニアのためのApache Spark入門", 3672) ).forEach(book -> webTestClient .put() .uri("/book") .body(Mono.just(book), Book.class) .exchange() .expectStatus() .isOk() ); webTestClient .get() .uri("/book") .exchange() .expectBodyList(Book.class) .consumeWith(booksBody -> { List<Book> books = booksBody.getResponseBody(); assertThat(books).hasSize(3); assertThat(books.stream().map(Book::getIsbn).collect(Collectors.toList())) .containsExactlyInAnyOrder("978-4798045733", "978-4774192253", "978-4798053776"); }); assertThat(webTestClient .get() .uri("/book/{isbn}", "978-4774192253") .exchange() .expectBody(Book.class) .returnResult() .getResponseBody() .getTitle() ).isEqualTo("ビッグデータを支える技術―刻々とデータが脈打つ自動化の世界 (WEB+DB PRESS plus)"); webTestClient .get() .uri("/book/title?title={title}", "RDB技術者のためのNoSQLガイド") .exchange() .expectBodyList(Book.class) .consumeWith(booksBody -> { List<Book> books = booksBody.getResponseBody(); assertThat(books).hasSize(1); assertThat(books.get(0).getIsbn()).isEqualTo("978-4798045733"); }); Arrays .asList("978-4798045733", "978-4774192253", "978-4798053776") .forEach(isbn -> webTestClient .delete() .uri("/book/{isbn}", isbn) .exchange() .expectStatus() .isOk() ); webTestClient .get() .uri("/book") .exchange() .expectBodyList(Book.class) .hasSize(0); }
どちらも、それほど特筆することは…ないかな?
実行。
$ mvn test
結果。
Results : Tests run: 2, Failures: 0, Errors: 0, Skipped: 0
まとめ
Spring Data for Apache Cassandra(Reactive)を使って、Spring BootアプリケーションからApache Cassandraにアクセスしてみました。
実は、Reactive以前に、そもそも初めてSpring Data for Apache Cassandraを使うので調べ方にちょっと手間取ったり、WebTestClientも初めて使うので
割と混乱したりといろいろ苦労しましたが、なんとかなりました。
あ、Spring Boot 2でJUnit 5を使うのも初めてでしたね。
これから、ちょっとずつ慣れていきましょう。