CLOVER🍀

That was when it all began.

Apache Sparkで、HDFS上のファイルを読み書きする

Apache Sparkで、HDFS上のファイルに対して読み書きをしてみます。

といっても、SparkContext#textFileやRDD#saveAsTextFileへ渡すパスを、「hdfs://」から始まるものにすればよさそうです。

なお、HDFSとSparkですが、今回はCDH 5.4.4で構築してみました。なので、Apache Sparkは最新版の1.4系ではなく、1.3系になっています。

プログラム

LuceneのStanardAnalyzerを使い、SparkのREADME.mdのWord Countをするプログラムを書いてみます。

とりあえず、ビルド定義から。

build.sbt

name := "word-count-lucene-analyzer-cdh-hdfs"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.4"

organization := "org.littlewings"

scalacOptions ++= Seq("-target:jvm-1.7", "-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0-cdh5.4.4" % "provided",
  "org.apache.lucene" % "lucene-analyzers-common" % "5.2.1"
)

Clouderaのリポジトリを使うので、Resolverの追加と

resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

Sparkのバージョン指定がちょっと変わります。

  "org.apache.spark" %% "spark-core" % "1.3.0-cdh5.4.4" % "provided",

sbtプラグインの設定。assemblyを使います。
project/plugins.sbt

logLevel := Level.Warn

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

Word Countするプログラム。
src/main/scala/org/littlewings/spark/WordCount.scala

package org.littlewings.spark

import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit
  = {
    val (hostname, inputPath, outputPath) = args.toList match {
      case h :: in :: out :: Nil => (h, in, out)
      case _ =>
        println("Required hostname, input-path, output-path.")
        sys.exit(1)
    }

    val file = s"hdfs://$hostname:8020$inputPath"
    println(s"Input File => $file")

    val conf = new SparkConf().setAppName("Word Count Application")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(file)

    textFile
      .flatMap { line =>
      val analyzer = new StandardAnalyzer
      val tokenStream = analyzer.tokenStream("", line)
      val charAttr = tokenStream.addAttribute(classOf[CharTermAttribute])

      tokenStream.reset()

      try {
        Iterator
          .continually(tokenStream.incrementToken())
          .takeWhile(identity)
          .map(_ => charAttr.toString)
          .toVector
      } finally {
        tokenStream.end()
        tokenStream.close()
      }
    }
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
      .saveAsTextFile(s"hdfs://$hostname:8020$outputPath")
  }
}

起動引数に、HDFSにアクセスするためのホスト名(またはIPアドレス)、HDFS上の入力パス、HDFS上の出力先を指定するようにしています。

    val (hostname, inputPath, outputPath) = args.toList match {
      case h :: in :: out :: Nil => (h, in, out)
      case _ =>
        println("Required hostname, input-path, output-path.")
        sys.exit(1)
    }

入力。

    val file = s"hdfs://$hostname:8020$inputPath"

出力。

      .saveAsTextFile(s"hdfs://$hostname:8020$outputPath")

これをassemblyして

> assembly

JARを作成。

[info] Packaging /path/to/target/scala-2.10/word-count-lucene-analyzer-cdh-hdfs-assembly-0.0.1-SNAPSHOT.jar ...

では、動作させてみます。HDFSへのアクセス先のホストは、「cdh-server」とします。

# sudo -u spark spark-submit --class org.littlewings.spark.WordCount /path/to/word-count-lucene-analyzer-cdh-hdfs-assembly-0.0.1-SNAPSHOT.jar cdh-server /user/spark/input/README.md /user/spark/output/word-count

動いてる感じ。

15/08/02 06:58:59 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/08/02 06:58:59 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/08/02 06:58:59 INFO output.FileOutputCommitter: Saved output of task 'attempt_201508020658_0001_m_000001_3' to hdfs://cdh-server:8020/user/spark/output/word-count/_temporary/0/task_201508020658_0001_m_000001
15/08/02 06:58:59 INFO spark.SparkHadoopWriter: attempt_201508020658_0001_m_000001_3: Committed
15/08/02 06:58:59 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 3). 1828 bytes result sent to driver
15/08/02 06:58:59 INFO output.FileOutputCommitter: Saved output of task 'attempt_201508020658_0001_m_000000_2' to hdfs://cdh-server:8020/user/spark/output/word-count/_temporary/0/task_201508020658_0001_m_000000
15/08/02 06:58:59 INFO spark.SparkHadoopWriter: attempt_201508020658_0001_m_000000_2: Committed
15/08/02 06:58:59 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 2). 1828 bytes result sent to driver
15/08/02 06:58:59 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 582 ms on localhost (1/2)
15/08/02 06:58:59 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 585 ms on localhost (2/2)
15/08/02 06:58:59 INFO scheduler.DAGScheduler: Stage 1 (saveAsTextFile at WordCount.scala:45) finished in 0.586 s
15/08/02 06:58:59 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/08/02 06:58:59 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at WordCount.scala:45, took 2.008179 s

なんか、できたっぽいです。

# sudo -u spark hdfs dfs -ls /user/spark/output/word-count
Found 3 items
-rw-r--r--   1 spark supergroup          0 2015-08-02 06:58 /user/spark/output/word-count/_SUCCESS
-rw-r--r--   1 spark supergroup       1209 2015-08-02 06:58 /user/spark/output/word-count/part-00000
-rw-r--r--   1 spark supergroup       1125 2015-08-02 06:58 /user/spark/output/word-count/part-00001

OKそうですね。

# sudo -u spark hdfs dfs -cat /user/spark/output/word-count/part-00000
(scala,4)
(package,3)
(hive,2)
(guide,2)
(its,1)
(general,2)
(have,1)
(locally,3)
(big,1)
(changed,1)
(several,1)
(only,1)
(basic,1)
(first,1)
(documentation,5)
〜省略〜

2つ目も。

# sudo -u spark hdfs dfs -cat /user/spark/output/word-count/part-00001
(particular,3)
(spark,24)
(7077,1)
(computation,1)
(instance,1)
(spark.apache.org,6)
(library,1)
(through,1)
(following,2)
(range,1)
(which,2)
(once,1)
(threads,1)
〜省略〜

一応、できましたよっと。

オマケ:Docker

今回、HDFSを動かすサーバーおよび、Spark Submitを行うクライアントをそれぞれDockerコンテナで動かしました。

この前作った、こちらのDockerfileをベースにしています。

CDH 5.4.4で、HDFS+YARNのDockerイメージを作る
http://d.hatena.ne.jp/Kazuhira/20150730/1438260029

今回は、SparkおよびHiveを足して、このようなDockerfileにしました。
Dockerfile

FROM centos:6.6

ENV JDK_VERSION 7u79
ENV JDK_BUILD_NO b15
ENV JDK_RPM jdk-${JDK_VERSION}-linux-x64.rpm

ENV JAVA_HOME /usr/java/default

## HDFS Ports.
EXPOSE 8020 50010 50020 50070 50075 50090

## YARN Ports.
EXPOSE 8030 8031 8032 8033 8040 8042 8089 10020 10033 13562 19888

## Spark Ports.
EXPOSE 6066 7077 7078 18080 18081

# $ grep EXPOSE Dockerfile | perl -wp -e 's!EXPOSE!!; s!\r?\n!!; s!(\d+) ?!-p $1:$1 !g'

RUN yum install -y wget \
                   sudo && \
    sed -ri 's/Defaults    requiretty/Defaults:root    !requiretty/' /etc/sudoers && \
    wget -q -O /tmp/jdk.rpm --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/${JDK_VERSION}-${JDK_BUILD_NO}/${JDK_RPM} && \
    rpm -ivh /tmp/jdk.rpm && \
    wget -q -O /etc/yum.repos.d/cloudera-cdh5.repo http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/cloudera-cdh5.repo && \
    sed -ri 's/cdh\/5/cdh\/5.4.4/' /etc/yum.repos.d/cloudera-cdh5.repo && \
    yum install -y hadoop \
                   hadoop-hdfs \
                   hadoop-hdfs-namenode \
                   hadoop-hdfs-secondarynamenode \
                   hadoop-hdfs-datanode \
                   hadoop-yarn \
                   hadoop-yarn-resourcemanager \
                   hadoop-yarn-nodemanager \
                   hadoop-mapreduce \
                   hadoop-mapreduce-historyserver \
                   spark-core \
                   spark-master \
                   spark-worker \
                   hive \
                   hadoop-client

RUN cp -Rp /etc/hadoop/conf.empty /etc/hadoop/conf.mine && \
    alternatives --install /etc/hadoop/conf hadoop-conf /etc/hadoop/conf.mine 50 && \
    alternatives --set hadoop-conf /etc/hadoop/conf.mine && \
    mkdir -p /var/lib/hadoop-hdfs/cache/hdfs/name && \
    chown -R hdfs.hdfs /var/lib/hadoop-hdfs/cache/hdfs/name && \
    chmod 775 /var/lib/hadoop-hdfs/cache/hdfs/name && \
    mkdir -p /var/lib/hadoop-hdfs/cache/hdfs/dfs && \
    chown -R hdfs.hdfs /var/lib/hadoop-hdfs/cache/hdfs/dfs && \
    chmod 775 /var/lib/hadoop-hdfs/cache/hdfs/dfs && \
    mkdir -p /var/lib/hadoop-hdfs/cache/hdfs/tmp && \
    chown -R hdfs.hdfs /var/lib/hadoop-hdfs/cache/hdfs/dfs && \
    chmod 775 /var/lib/hadoop-hdfs/cache/hdfs/dfs && \
    cp -Rp /etc/hive/conf.dist /etc/hive/conf.mine && \
    alternatives --install /etc/hive/conf hive-conf /etc/hive/conf.mine 50 && \
    alternatives --set hive-conf /etc/hive/conf.mine && \
    cp -Rp /etc/spark/conf.dist /etc/spark/conf.mine && \
    alternatives --install /etc/spark/conf spark-conf /etc/spark/conf.mine 50 && \
    alternatives --set spark-conf /etc/spark/conf.mine


ADD core-site.xml /etc/hadoop/conf.mine/core-site.xml
ADD hdfs-site.xml /etc/hadoop/conf.mine/hdfs-site.xml
ADD mapred-site.xml /etc/hadoop/conf.mine/mapred-site.xml
ADD yarn-site.xml /etc/hadoop/conf.mine/yarn-site.xml
ADD create-user-hdfs.sh create-user-hdfs.sh
ADD fix-hostname.sh fix-hostname.sh
ADD init-and-start.sh init-and-start.sh

RUN chmod a+x create-user-hdfs.sh \
              fix-hostname.sh \
              init-and-start.sh

CMD ./init-and-start.sh && tail -f /dev/null

※「create-user-hdfs.sh」とかいうスクリプトは、今回関係ないので無視してください…。

Hiveは現時点では使わないのですが、Spark Shellを動かした時にHiveがないと例外が出る(とはいえ、Spark SQLが動かなくなるだけ)のがちょっと気持ち悪かったので、入れています。

HDFSの設定ファイルなどは、前回のままです。また、前回はYARNのResourceManager、NodeManagerも動かしていましたが、今回はHDFSのNameNodeおよびDataNodeのみにしています。

こちらで、Dockerイメージをビルド…。

$ docker build -t kazuhira/centos6-cdh5:5.4.4 .

HDFSのサーバー側は、コンテナ名「cdh-server」、ホスト名「cdh-server」で動作させます。

$ docker run -d --name cdh-server -h cdh-server -p 8020:8020 -p 50010:50010 -p 50020:50020 -p 50070:50070 -p 50075:50075 -p 50090:50090  -p 8030:8030 -p 8031:8031 -p 8032:8032 -p 8033:8033 -p 8040:8040 -p 8042:8042 -p 8089:8089 -p 10020:10020 -p 10033:10033 -p 13562:13562 -p 19888:19888  -p 6066:6066 -p 7077:7077 -p 7078:7078 -p 18080:18080 -p 18081:18081 kazuhira/centos6-cdh5:5.4.4

同じDockerイメージを使って、クライアントを起動します。この時、「cdh-server」とリンクさせておきます。

$ docker run -it --rm --link cdh-server -v /path/to/word-count-lucene-analyzer-cdh-hdfs/target/scala-2.10:/opt/spark-jar:ro kazuhira/centos6-cdh5:5.4.4 bash

また、ホスト側のディレクトリをData Volumeとして共有しておきます。これは、ホスト側でアプリケーションを書いて、コマンドの実行がコンテナ内だからです…。

マウント先は、こちら。

/opt/spark-jar

クライアント側では、設定ファイルのアクセス先ホスト名を「cdh-servert」と揃えた後、HDFS上にディレクトリと入力ファイルの作成。

# su - hdfs
$ hdfs dfs -mkdir -p /user/spark
$ hdfs dfs -chown spark /user/spark

今回は、sparkユーザーで作業することにしましょう。

入力ディレクトリ、出力ディレクトリの作成と、README.mdの登録…。

# sudo -u spark hdfs dfs -mkdir /user/spark/input
# sudo -u spark hdfs dfs -mkdir /user/spark/output
# sudo -u spark hdfs dfs -put /opt/spark-jar/README.md /user/spark/input

README.mdは、ホスト側からマウント先にcpで突っ込みました…。

あとは、このクライアント側からSpark Submitを実行しています。

# sudo -u spark spark-submit --class org.littlewings.spark.WordCount /path/to/word-count-lucene-analyzer-cdh-hdfs-assembly-0.0.1-SNAPSHOT.jar cdh-server /user/spark/input/README.md /user/spark/output/word-count

というわけで、こんな感じで今回の環境は作っています。Sparkそのものよりも、この環境準備の方に時間がかかっているような…。