Apache Sparkで、HDFS上のファイルに対して読み書きをしてみます。
といっても、SparkContext#textFileやRDD#saveAsTextFileへ渡すパスを、「hdfs://」から始まるものにすればよさそうです。
なお、HDFSとSparkですが、今回はCDH 5.4.4で構築してみました。なので、Apache Sparkは最新版の1.4系ではなく、1.3系になっています。
プログラム
LuceneのStanardAnalyzerを使い、SparkのREADME.mdのWord Countをするプログラムを書いてみます。
とりあえず、ビルド定義から。
build.sbt
name := "word-count-lucene-analyzer-cdh-hdfs" version := "0.0.1-SNAPSHOT" scalaVersion := "2.10.4" organization := "org.littlewings" scalacOptions ++= Seq("-target:jvm-1.7", "-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.3.0-cdh5.4.4" % "provided", "org.apache.lucene" % "lucene-analyzers-common" % "5.2.1" )
Clouderaのリポジトリを使うので、Resolverの追加と
resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
Sparkのバージョン指定がちょっと変わります。
"org.apache.spark" %% "spark-core" % "1.3.0-cdh5.4.4" % "provided",
sbtプラグインの設定。assemblyを使います。
project/plugins.sbt
logLevel := Level.Warn addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
Word Countするプログラム。
src/main/scala/org/littlewings/spark/WordCount.scala
package org.littlewings.spark import org.apache.lucene.analysis.standard.StandardAnalyzer import org.apache.lucene.analysis.tokenattributes.CharTermAttribute import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { val (hostname, inputPath, outputPath) = args.toList match { case h :: in :: out :: Nil => (h, in, out) case _ => println("Required hostname, input-path, output-path.") sys.exit(1) } val file = s"hdfs://$hostname:8020$inputPath" println(s"Input File => $file") val conf = new SparkConf().setAppName("Word Count Application") val sc = new SparkContext(conf) val textFile = sc.textFile(file) textFile .flatMap { line => val analyzer = new StandardAnalyzer val tokenStream = analyzer.tokenStream("", line) val charAttr = tokenStream.addAttribute(classOf[CharTermAttribute]) tokenStream.reset() try { Iterator .continually(tokenStream.incrementToken()) .takeWhile(identity) .map(_ => charAttr.toString) .toVector } finally { tokenStream.end() tokenStream.close() } } .map(word => (word, 1)) .reduceByKey((a, b) => a + b) .saveAsTextFile(s"hdfs://$hostname:8020$outputPath") } }
起動引数に、HDFSにアクセスするためのホスト名(またはIPアドレス)、HDFS上の入力パス、HDFS上の出力先を指定するようにしています。
val (hostname, inputPath, outputPath) = args.toList match { case h :: in :: out :: Nil => (h, in, out) case _ => println("Required hostname, input-path, output-path.") sys.exit(1) }
入力。
val file = s"hdfs://$hostname:8020$inputPath"
出力。
.saveAsTextFile(s"hdfs://$hostname:8020$outputPath")
これをassemblyして
> assembly
JARを作成。
[info] Packaging /path/to/target/scala-2.10/word-count-lucene-analyzer-cdh-hdfs-assembly-0.0.1-SNAPSHOT.jar ...
では、動作させてみます。HDFSへのアクセス先のホストは、「cdh-server」とします。
# sudo -u spark spark-submit --class org.littlewings.spark.WordCount /path/to/word-count-lucene-analyzer-cdh-hdfs-assembly-0.0.1-SNAPSHOT.jar cdh-server /user/spark/input/README.md /user/spark/output/word-count
動いてる感じ。
15/08/02 06:58:59 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1 15/08/02 06:58:59 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1 15/08/02 06:58:59 INFO output.FileOutputCommitter: Saved output of task 'attempt_201508020658_0001_m_000001_3' to hdfs://cdh-server:8020/user/spark/output/word-count/_temporary/0/task_201508020658_0001_m_000001 15/08/02 06:58:59 INFO spark.SparkHadoopWriter: attempt_201508020658_0001_m_000001_3: Committed 15/08/02 06:58:59 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 3). 1828 bytes result sent to driver 15/08/02 06:58:59 INFO output.FileOutputCommitter: Saved output of task 'attempt_201508020658_0001_m_000000_2' to hdfs://cdh-server:8020/user/spark/output/word-count/_temporary/0/task_201508020658_0001_m_000000 15/08/02 06:58:59 INFO spark.SparkHadoopWriter: attempt_201508020658_0001_m_000000_2: Committed 15/08/02 06:58:59 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 2). 1828 bytes result sent to driver 15/08/02 06:58:59 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 582 ms on localhost (1/2) 15/08/02 06:58:59 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 585 ms on localhost (2/2) 15/08/02 06:58:59 INFO scheduler.DAGScheduler: Stage 1 (saveAsTextFile at WordCount.scala:45) finished in 0.586 s 15/08/02 06:58:59 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 15/08/02 06:58:59 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at WordCount.scala:45, took 2.008179 s
なんか、できたっぽいです。
# sudo -u spark hdfs dfs -ls /user/spark/output/word-count Found 3 items -rw-r--r-- 1 spark supergroup 0 2015-08-02 06:58 /user/spark/output/word-count/_SUCCESS -rw-r--r-- 1 spark supergroup 1209 2015-08-02 06:58 /user/spark/output/word-count/part-00000 -rw-r--r-- 1 spark supergroup 1125 2015-08-02 06:58 /user/spark/output/word-count/part-00001
OKそうですね。
# sudo -u spark hdfs dfs -cat /user/spark/output/word-count/part-00000 (scala,4) (package,3) (hive,2) (guide,2) (its,1) (general,2) (have,1) (locally,3) (big,1) (changed,1) (several,1) (only,1) (basic,1) (first,1) (documentation,5) 〜省略〜
2つ目も。
# sudo -u spark hdfs dfs -cat /user/spark/output/word-count/part-00001 (particular,3) (spark,24) (7077,1) (computation,1) (instance,1) (spark.apache.org,6) (library,1) (through,1) (following,2) (range,1) (which,2) (once,1) (threads,1) 〜省略〜
一応、できましたよっと。
オマケ:Docker
今回、HDFSを動かすサーバーおよび、Spark Submitを行うクライアントをそれぞれDockerコンテナで動かしました。
この前作った、こちらのDockerfileをベースにしています。
CDH 5.4.4で、HDFS+YARNのDockerイメージを作る
http://d.hatena.ne.jp/Kazuhira/20150730/1438260029
今回は、SparkおよびHiveを足して、このようなDockerfileにしました。
Dockerfile
FROM centos:6.6 ENV JDK_VERSION 7u79 ENV JDK_BUILD_NO b15 ENV JDK_RPM jdk-${JDK_VERSION}-linux-x64.rpm ENV JAVA_HOME /usr/java/default ## HDFS Ports. EXPOSE 8020 50010 50020 50070 50075 50090 ## YARN Ports. EXPOSE 8030 8031 8032 8033 8040 8042 8089 10020 10033 13562 19888 ## Spark Ports. EXPOSE 6066 7077 7078 18080 18081 # $ grep EXPOSE Dockerfile | perl -wp -e 's!EXPOSE!!; s!\r?\n!!; s!(\d+) ?!-p $1:$1 !g' RUN yum install -y wget \ sudo && \ sed -ri 's/Defaults requiretty/Defaults:root !requiretty/' /etc/sudoers && \ wget -q -O /tmp/jdk.rpm --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/${JDK_VERSION}-${JDK_BUILD_NO}/${JDK_RPM} && \ rpm -ivh /tmp/jdk.rpm && \ wget -q -O /etc/yum.repos.d/cloudera-cdh5.repo http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/cloudera-cdh5.repo && \ sed -ri 's/cdh\/5/cdh\/5.4.4/' /etc/yum.repos.d/cloudera-cdh5.repo && \ yum install -y hadoop \ hadoop-hdfs \ hadoop-hdfs-namenode \ hadoop-hdfs-secondarynamenode \ hadoop-hdfs-datanode \ hadoop-yarn \ hadoop-yarn-resourcemanager \ hadoop-yarn-nodemanager \ hadoop-mapreduce \ hadoop-mapreduce-historyserver \ spark-core \ spark-master \ spark-worker \ hive \ hadoop-client RUN cp -Rp /etc/hadoop/conf.empty /etc/hadoop/conf.mine && \ alternatives --install /etc/hadoop/conf hadoop-conf /etc/hadoop/conf.mine 50 && \ alternatives --set hadoop-conf /etc/hadoop/conf.mine && \ mkdir -p /var/lib/hadoop-hdfs/cache/hdfs/name && \ chown -R hdfs.hdfs /var/lib/hadoop-hdfs/cache/hdfs/name && \ chmod 775 /var/lib/hadoop-hdfs/cache/hdfs/name && \ mkdir -p /var/lib/hadoop-hdfs/cache/hdfs/dfs && \ chown -R hdfs.hdfs /var/lib/hadoop-hdfs/cache/hdfs/dfs && \ chmod 775 /var/lib/hadoop-hdfs/cache/hdfs/dfs && \ mkdir -p /var/lib/hadoop-hdfs/cache/hdfs/tmp && \ chown -R hdfs.hdfs /var/lib/hadoop-hdfs/cache/hdfs/dfs && \ chmod 775 /var/lib/hadoop-hdfs/cache/hdfs/dfs && \ cp -Rp /etc/hive/conf.dist /etc/hive/conf.mine && \ alternatives --install /etc/hive/conf hive-conf /etc/hive/conf.mine 50 && \ alternatives --set hive-conf /etc/hive/conf.mine && \ cp -Rp /etc/spark/conf.dist /etc/spark/conf.mine && \ alternatives --install /etc/spark/conf spark-conf /etc/spark/conf.mine 50 && \ alternatives --set spark-conf /etc/spark/conf.mine ADD core-site.xml /etc/hadoop/conf.mine/core-site.xml ADD hdfs-site.xml /etc/hadoop/conf.mine/hdfs-site.xml ADD mapred-site.xml /etc/hadoop/conf.mine/mapred-site.xml ADD yarn-site.xml /etc/hadoop/conf.mine/yarn-site.xml ADD create-user-hdfs.sh create-user-hdfs.sh ADD fix-hostname.sh fix-hostname.sh ADD init-and-start.sh init-and-start.sh RUN chmod a+x create-user-hdfs.sh \ fix-hostname.sh \ init-and-start.sh CMD ./init-and-start.sh && tail -f /dev/null
※「create-user-hdfs.sh」とかいうスクリプトは、今回関係ないので無視してください…。
Hiveは現時点では使わないのですが、Spark Shellを動かした時にHiveがないと例外が出る(とはいえ、Spark SQLが動かなくなるだけ)のがちょっと気持ち悪かったので、入れています。
HDFSの設定ファイルなどは、前回のままです。また、前回はYARNのResourceManager、NodeManagerも動かしていましたが、今回はHDFSのNameNodeおよびDataNodeのみにしています。
こちらで、Dockerイメージをビルド…。
$ docker build -t kazuhira/centos6-cdh5:5.4.4 .
HDFSのサーバー側は、コンテナ名「cdh-server」、ホスト名「cdh-server」で動作させます。
$ docker run -d --name cdh-server -h cdh-server -p 8020:8020 -p 50010:50010 -p 50020:50020 -p 50070:50070 -p 50075:50075 -p 50090:50090 -p 8030:8030 -p 8031:8031 -p 8032:8032 -p 8033:8033 -p 8040:8040 -p 8042:8042 -p 8089:8089 -p 10020:10020 -p 10033:10033 -p 13562:13562 -p 19888:19888 -p 6066:6066 -p 7077:7077 -p 7078:7078 -p 18080:18080 -p 18081:18081 kazuhira/centos6-cdh5:5.4.4
同じDockerイメージを使って、クライアントを起動します。この時、「cdh-server」とリンクさせておきます。
$ docker run -it --rm --link cdh-server -v /path/to/word-count-lucene-analyzer-cdh-hdfs/target/scala-2.10:/opt/spark-jar:ro kazuhira/centos6-cdh5:5.4.4 bash
また、ホスト側のディレクトリをData Volumeとして共有しておきます。これは、ホスト側でアプリケーションを書いて、コマンドの実行がコンテナ内だからです…。
マウント先は、こちら。
/opt/spark-jar
クライアント側では、設定ファイルのアクセス先ホスト名を「cdh-servert」と揃えた後、HDFS上にディレクトリと入力ファイルの作成。
# su - hdfs $ hdfs dfs -mkdir -p /user/spark $ hdfs dfs -chown spark /user/spark
今回は、sparkユーザーで作業することにしましょう。
入力ディレクトリ、出力ディレクトリの作成と、README.mdの登録…。
# sudo -u spark hdfs dfs -mkdir /user/spark/input # sudo -u spark hdfs dfs -mkdir /user/spark/output # sudo -u spark hdfs dfs -put /opt/spark-jar/README.md /user/spark/input
README.mdは、ホスト側からマウント先にcpで突っ込みました…。
あとは、このクライアント側からSpark Submitを実行しています。
# sudo -u spark spark-submit --class org.littlewings.spark.WordCount /path/to/word-count-lucene-analyzer-cdh-hdfs-assembly-0.0.1-SNAPSHOT.jar cdh-server /user/spark/input/README.md /user/spark/output/word-count
というわけで、こんな感じで今回の環境は作っています。Sparkそのものよりも、この環境準備の方に時間がかかっているような…。