Apache SparkのDataFrame API、Spark SQLで、通常のJDBCアクセス可能なデータベースに対しても操作ができそうな感じだったので、ちょっと試してみました。

- 作者: 下田倫大,師岡一成,今井雄太,石川有,田中裕一,小宮篤史,加嵜長門
- 出版社/メーカー: 技術評論社
- 発売日: 2016/04/29
- メディア: 大型本
- この商品を含むブログ (5件) を見る
Thrift JDBC/ODBC serverを使わない?パターンっぽいですが…。
お題
まず、対象のデータベースはMySQLとします。また、データのテーマは書籍で、bookテーブルとします。
このテーブルに対して、Apache Sparkでデータを読み書きしてみましょう。
テーブル定義は、こちら。
CREATE TABLE book( isbn VARCHAR(14), title VARCHAR(255), price INT, PRIMARY KEY(isbn) );
準備
ビルド定義は、このようにしました。
build.sbt
name := "spark-sql-mysql" version := "0.0.1-SNAPSHOT" scalaVersion := "2.11.8" organization := "org.littlewings" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) fork in Test := false libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided", "mysql" % "mysql-connector-java" % "6.0.2" % "runtime", "org.scalatest" %% "scalatest" % "2.2.6" % "test" )
Spark SQLは、providedにしておきました。また、テストコードで確認しようかなと思い、ScalaTestを加えています。
Case Classとテストコードの雛形
今回は、データを扱うのにCase Classを使うことにしました。
簡単にですが、Bookクラスを定義。
src/test/scala/org/littlewings/spark/mysql/Book.scala
package org.littlewings.spark.mysql case class Book(isbn: String, title: String, price: Int)
テストコードの雛形は、こんな感じにしました。お題の書籍データは、テストコード内に持っておきます。
src/test/scala/org/littlewings/spark/mysql/SparkSqlMySqlSpec.scala
package org.littlewings.spark.mysql import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} import org.scalatest.{FunSpec, Matchers} class SparkSqlMySqlSpec extends FunSpec with Matchers { val javaeeBook: Book = Book("978-4774127804", "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava", 4200) val springBootBook: Book = Book("978-4777518654", "はじめてのSpring Boot―「Spring Framework」で簡単Javaアプリ開発", 2500) val sparkBook: Book = Book("978-4774181240", "詳解 Apache Spark", 3888) val books: Array[Book] = Array(javaeeBook, springBootBook, sparkBook) describe("Spark SQL MySQL Spec") { // ここに、テストを書く! } }
ここに、テストコードを埋めていきます。
データを保存する
それでは、まずはデータを保存してみましょう。
できあがったコードは、このようになりました。
it("write to MySQL, from DataFrame") { val conf = new SparkConf() .setAppName("Spark SQL from MySQL") .setMaster("local[*]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) sqlContext.sql( """|CREATE TEMPORARY TABLE book |USING jdbc |OPTIONS ( | url 'jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false', | user 'kazuhira', | password 'password', | dbtable 'book' |)""".stripMargin) val bookDf = sqlContext.createDataFrame(books) bookDf.write.insertInto("book") sc.stop() }
最初にCREATE TEMPORARY TABLEする必要があるようです。OPTIONSでは、JDBCの接続URLやユーザー名、パスワードなどのJDBC接続関連のプロパティを設定します。また、dbtableというパラメーターも設定します。
sqlContext.sql( """|CREATE TEMPORARY TABLE book |USING jdbc |OPTIONS ( | url 'jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false', | user 'kazuhira', | password 'password', | dbtable 'book' |)""".stripMargin)
この状態のSQLContextに対して、あらかじめArrayとして作成しておいた書籍データをDataFrameに変換して保存します。
val bookDf = sqlContext.createDataFrame(books) bookDf.write.insertInto("book")
これでなんと、MySQLにデータが保存できました。
mysql> SELECT * FROM book; +----------------+---------------------------------------------------------------------------------+-------+ | isbn | title | price | +----------------+---------------------------------------------------------------------------------+-------+ | 978-4774127804 | Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava | 4200 | | 978-4774181240 | 詳解 Apache Spark | 3888 | | 978-4777518654 | はじめてのSpring Boot―「Spring Framework」で簡単Javaアプリ開発 | 2500 | +----------------+---------------------------------------------------------------------------------+-------+ 3 rows in set (0.00 sec)
DataFrame APIでデータを読み出す
では、今度は保存したデータを読み出してみましょう。ここでは、DataFrame APIを使います。
できあがったコードは、こんな感じ。
it("read from MySQL, as DataFrame") { val conf = new SparkConf() .setAppName("Spark SQL from MySQL") .setMaster("local[*]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val options = Map( "url" -> "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false", "user" -> "kazuhira", "password" -> "password", "dbtable" -> "book" ) val jdbcDf = sqlContext.read.format("jdbc").options(options).load() val resultBooks = jdbcDf .orderBy('price.asc) .map(row => Book(row.getString(0), row.getString(1), row.getInt(2))) .collect() resultBooks should have size(3) resultBooks(0) should be(springBootBook) resultBooks(1) should be(sparkBook) resultBooks(2) should be(javaeeBook) sc.stop() }
先ほどと変わったところは、SQLContext#read#formatで「jdbc」を指定し、そのあとoptionsに接続に関する情報はMapとして渡して接続します。こちらの場合は、CREATE TEMPORARY TABLEしなくても、このコードで動きました…。
val options = Map( "url" -> "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false", "user" -> "kazuhira", "password" -> "password", "dbtable" -> "book" ) val jdbcDf = sqlContext.read.format("jdbc").options(options).load()
あとは、DataFrame APIに沿って操作すればOKっぽいです。
val resultBooks = jdbcDf .orderBy('price.asc) .map(row => Book(row.getString(0), row.getString(1), row.getInt(2))) .collect()
WHERE句を使ったりもできます。
val jdbcDf = sqlContext.read.format("jdbc").options(options).load() val resultBooks = jdbcDf .where('price >= 4000) .orderBy('price.asc) .map(row => Book(row.getString(0), row.getString(1), row.getInt(2))) .collect() resultBooks should have size(1) resultBooks(0) should be(javaeeBook)
Spark SQLでデータを読み出す
最後は、Spark SQLで書いてみます。
コードは、こんな感じになりました。
it("read from MySQL, as SQL") { val conf = new SparkConf() .setAppName("Spark SQL from MySQL") .setMaster("local[*]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val options = Map( "url" -> "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false", "user" -> "kazuhira", "password" -> "password", "dbtable" -> "book" ) val jdbcDf = sqlContext.read.format("jdbc").options(options).load() jdbcDf.registerTempTable("book") val selectDf = sqlContext.sql("SELECT isbn, title, price FROM book ORDER BY price ASC") val resultBooks = selectDf .map(row => Book(row.getString(0), row.getString(1), row.getInt(2))) .collect() resultBooks should have size(3) resultBooks(0) should be(springBootBook) resultBooks(1) should be(sparkBook) resultBooks(2) should be(javaeeBook) sc.stop() }
接続あたりまではDataFrame APIの時とかなり近いですが、こちらの場合はregisterTempTableを行っておく必要があります。
jdbcDf.registerTempTable("book")
あとはSQLContext#sqlでクエリが投げられるので、ここからDataFrameを取得、操作します。
val selectDf = sqlContext.sql("SELECT isbn, title, price FROM book ORDER BY price ASC") val resultBooks = selectDf .map(row => Book(row.getString(0), row.getString(1), row.getInt(2))) .collect()
WHERE句を使っても大丈夫。
val jdbcDf = sqlContext.read.format("jdbc").options(options).load() jdbcDf.registerTempTable("book") val selectDf = sqlContext.sql("SELECT isbn, title, price FROM book WHERE price >= 4000 ORDER BY price ASC") val resultBooks = selectDf .map(row => Book(row.getString(0), row.getString(1), row.getInt(2))) .collect() resultBooks should have size(1) resultBooks(0) should be(javaeeBook)