CLOVER🍀

That was when it all began.

Apache Spark(DataFrame API/Spark SQL)で、MySQLのデータを読み書きする

Apache SparkのDataFrame API、Spark SQLで、通常のJDBCアクセス可能なデータベースに対しても操作ができそうな感じだったので、ちょっと試してみました。

詳解 Apache Spark

詳解 Apache Spark

Thrift JDBC/ODBC serverを使わない?パターンっぽいですが…。

JDBC To Other Databases

お題

まず、対象のデータベースはMySQLとします。また、データのテーマは書籍で、bookテーブルとします。

このテーブルに対して、Apache Sparkでデータを読み書きしてみましょう。

テーブル定義は、こちら。

CREATE TABLE book(
  isbn VARCHAR(14),
  title VARCHAR(255),
  price INT,
  PRIMARY KEY(isbn)
);

準備

ビルド定義は、このようにしました。
build.sbt

name := "spark-sql-mysql"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.8"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

fork in Test := false

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",
  "mysql" % "mysql-connector-java" % "6.0.2" % "runtime",
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

Spark SQLは、providedにしておきました。また、テストコードで確認しようかなと思い、ScalaTestを加えています。

Case Classとテストコードの雛形

今回は、データを扱うのにCase Classを使うことにしました。

簡単にですが、Bookクラスを定義。
src/test/scala/org/littlewings/spark/mysql/Book.scala

package org.littlewings.spark.mysql

case class Book(isbn: String, title: String, price: Int)

テストコードの雛形は、こんな感じにしました。お題の書籍データは、テストコード内に持っておきます。
src/test/scala/org/littlewings/spark/mysql/SparkSqlMySqlSpec.scala

package org.littlewings.spark.mysql

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{FunSpec, Matchers}

class SparkSqlMySqlSpec extends FunSpec with Matchers {
  val javaeeBook: Book = Book("978-4774127804", "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava", 4200)
  val springBootBook: Book = Book("978-4777518654", "はじめてのSpring Boot―「Spring Framework」で簡単Javaアプリ開発", 2500)
  val sparkBook: Book = Book("978-4774181240", "詳解 Apache Spark", 3888)

  val books: Array[Book] = Array(javaeeBook, springBootBook, sparkBook)

  describe("Spark SQL MySQL Spec") {
    // ここに、テストを書く!
  }
}

ここに、テストコードを埋めていきます。

データを保存する

それでは、まずはデータを保存してみましょう。

できあがったコードは、このようになりました。

    it("write to MySQL, from DataFrame") {
      val conf =
        new SparkConf()
          .setAppName("Spark SQL from MySQL")
          .setMaster("local[*]")

      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)

      sqlContext.sql(
        """|CREATE TEMPORARY TABLE book
          |USING jdbc
          |OPTIONS (
          | url 'jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false',
          | user 'kazuhira',
          | password 'password',
          | dbtable 'book'
          |)""".stripMargin)

      val bookDf = sqlContext.createDataFrame(books)
      bookDf.write.insertInto("book")

      sc.stop()
    }

最初にCREATE TEMPORARY TABLEする必要があるようです。OPTIONSでは、JDBCの接続URLやユーザー名、パスワードなどのJDBC接続関連のプロパティを設定します。また、dbtableというパラメーターも設定します。

      sqlContext.sql(
        """|CREATE TEMPORARY TABLE book
          |USING jdbc
          |OPTIONS (
          | url 'jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false',
          | user 'kazuhira',
          | password 'password',
          | dbtable 'book'
          |)""".stripMargin)

この状態のSQLContextに対して、あらかじめArrayとして作成しておいた書籍データをDataFrameに変換して保存します。

      val bookDf = sqlContext.createDataFrame(books)
      bookDf.write.insertInto("book")

これでなんと、MySQLにデータが保存できました。

mysql> SELECT * FROM book;
+----------------+---------------------------------------------------------------------------------+-------+
| isbn           | title                                                                           | price |
+----------------+---------------------------------------------------------------------------------+-------+
| 978-4774127804 | Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava                     |  4200 |
| 978-4774181240 | 詳解 Apache Spark                                                               |  3888 |
| 978-4777518654 | はじめてのSpring Boot―「Spring Framework」で簡単Javaアプリ開発                  |  2500 |
+----------------+---------------------------------------------------------------------------------+-------+
3 rows in set (0.00 sec)

DataFrame APIでデータを読み出す

では、今度は保存したデータを読み出してみましょう。ここでは、DataFrame APIを使います。

できあがったコードは、こんな感じ。

    it("read from MySQL, as DataFrame") {
      val conf =
        new SparkConf()
          .setAppName("Spark SQL from MySQL")
          .setMaster("local[*]")

      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)

      import sqlContext.implicits._

      val options = Map(
        "url" -> "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false",
        "user" -> "kazuhira",
        "password" -> "password",
        "dbtable" -> "book"
      )

      val jdbcDf = sqlContext.read.format("jdbc").options(options).load()
      val resultBooks =
        jdbcDf
          .orderBy('price.asc)
          .map(row => Book(row.getString(0), row.getString(1), row.getInt(2)))
          .collect()

      resultBooks should have size(3)
      resultBooks(0) should be(springBootBook)
      resultBooks(1) should be(sparkBook)
      resultBooks(2) should be(javaeeBook)

      sc.stop()
    }

先ほどと変わったところは、SQLContext#read#formatで「jdbc」を指定し、そのあとoptionsに接続に関する情報はMapとして渡して接続します。こちらの場合は、CREATE TEMPORARY TABLEしなくても、このコードで動きました…。

      val options = Map(
        "url" -> "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false",
        "user" -> "kazuhira",
        "password" -> "password",
        "dbtable" -> "book"
      )

      val jdbcDf = sqlContext.read.format("jdbc").options(options).load()

あとは、DataFrame APIに沿って操作すればOKっぽいです。

      val resultBooks =
        jdbcDf
          .orderBy('price.asc)
          .map(row => Book(row.getString(0), row.getString(1), row.getInt(2)))
          .collect()

WHERE句を使ったりもできます。

      val jdbcDf = sqlContext.read.format("jdbc").options(options).load()
      val resultBooks =
        jdbcDf
          .where('price >= 4000)
          .orderBy('price.asc)
          .map(row => Book(row.getString(0), row.getString(1), row.getInt(2)))
          .collect()

      resultBooks should have size(1)
      resultBooks(0) should be(javaeeBook)

Spark SQLでデータを読み出す

最後は、Spark SQLで書いてみます。

コードは、こんな感じになりました。

    it("read from MySQL, as SQL") {
      val conf =
        new SparkConf()
          .setAppName("Spark SQL from MySQL")
          .setMaster("local[*]")

      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)

      val options = Map(
        "url" -> "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false",
        "user" -> "kazuhira",
        "password" -> "password",
        "dbtable" -> "book"
      )

      val jdbcDf = sqlContext.read.format("jdbc").options(options).load()
      jdbcDf.registerTempTable("book")

      val selectDf =
        sqlContext.sql("SELECT isbn, title, price FROM book ORDER BY price ASC")

      val resultBooks =
        selectDf
          .map(row => Book(row.getString(0), row.getString(1), row.getInt(2)))
          .collect()

      resultBooks should have size(3)
      resultBooks(0) should be(springBootBook)
      resultBooks(1) should be(sparkBook)
      resultBooks(2) should be(javaeeBook)

      sc.stop()
    }

接続あたりまではDataFrame APIの時とかなり近いですが、こちらの場合はregisterTempTableを行っておく必要があります。

      jdbcDf.registerTempTable("book")

あとはSQLContext#sqlでクエリが投げられるので、ここからDataFrameを取得、操作します。

      val selectDf =
        sqlContext.sql("SELECT isbn, title, price FROM book ORDER BY price ASC")

      val resultBooks =
        selectDf
          .map(row => Book(row.getString(0), row.getString(1), row.getInt(2)))
          .collect()

WHERE句を使っても大丈夫。

      val jdbcDf = sqlContext.read.format("jdbc").options(options).load()
      jdbcDf.registerTempTable("book")

      val selectDf =
        sqlContext.sql("SELECT isbn, title, price FROM book WHERE price >= 4000 ORDER BY price ASC")

      val resultBooks =
        selectDf
          .map(row => Book(row.getString(0), row.getString(1), row.getInt(2)))
          .collect()

      resultBooks should have size(1)
      resultBooks(0) should be(javaeeBook)

まとめ

JDBC接続可能なデータベース(今回はMySQL)に対して、Apache Sparkで接続してDataFrame API/Spark SQLでいろいろ操作しました。

あんまりふつうのRDBMSにつなげるような例を見かけなかったので、動かすまでにはそれなりに苦労しましたが、まあなんとかなってよかったです。

今回のコードを書くにあたっては、最初に載せたオフィシャルのドキュメント以外に、以下のページを参考にしています。

Apache Spark の JdbcRDD を使ってみた結果

Spark SQL MySQL Example with JDBC