これは、なにをしたくて書いたもの?
Apache Sparkのスタンドアロンモードでクラスタを構成してみようかな、と。
それから、ローカルで動かす時のlocal[〜]
の意味をよく忘れるので、これもメモしておこうかなと。
環境
今回の環境は、こちらです。
$ java --version openjdk 11.0.11 2021-04-20 OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9) OpenJDK 64-Bit Server VM AdoptOpenJDK-11.0.11+9 (build 11.0.11+9, mixed mode) $ python3 -V Python 3.8.10
確認するテーマに応じて、pipでPySparkをインストールするか
$ pip3 install pyspark==3.1.2
Apache Sparkのディストリビューションそのものを使用します。
$ curl -LO https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
Apache Sparkのバージョンは、3.1.2です。
スタンドアロンモード
まずスタンドアロンモードとはなにか?からですが、これはクラスターマネージャーのひとつでApache Spark自体に
含まれているものです。
Cluster Mode Overview - Spark 3.1.2 Documentation
クラスターマネージャーにはスタンドアロン、Hadoop YARN、Kubernetes、Apache Mesosなどがあります。
スタンドアロンモードのドキュメントは、こちら。
Spark Standalone Mode - Spark 3.1.2 Documentation
マスター、ワーカー構成を取り、以下のどちらかの形態で動作させます。
- マスター、ワーカーでクラスターを構成する
- (テスト目的のために)単一のマシン上でデーモンを構成する
また、Sparkアプリケーションはclientモード、clusterモードのデプロイモードのいずれかで動作します。
Spark Standalone Mode / Launching Spark Applications
SparkContext
を生成するドライバーをローカルで動作させるのがclientモード、Sparkクラスター側で動作させるのが
clusterモードです。
clientモードだとSparkアプリケーションを送信するクライアントと同じプロセスでドライバーが起動され、完了を待ちます。
clusterモードの場合、クライアントはSparkアプリケーションをクラスターに送信したら完了を待たずに終了します。
この使い分けについては、こちらを参照しましょう。
Submitting Applications / Launching Applications with spark-submit
なんですけど、現時点のApache Sparkでは、Pythonアプリケーションの場合はclusterモードでのデプロイは
できないみたいです。
PySparkを使って、Sparkをローカルで実行する
まずは、pip
を使ってインストールしたApache Sparkを使って、ローカルで実行します。
この時によく出てくるlocal[*]
の意味を確認しておこうかな、と。
たとえば、pyspark
コマンドを実行します。
$ pyspark
するとApache Sparkが起動してログが出力されるのですが、その時にこんな表示が見えます。
Spark context available as 'sc' (master = local[*], app id = local-1631357850789).
プログラムでmaster
指定するような例もありますね。
spark = SparkSession.builder.master('local[2]').getOrCreate()
これは、spark.master
という設定でこちらに記載があります。
Spark Configuration / Application Properties
spark.master
に設定可能な値は、こちら。
Submitting Applications / Master URLs
このうち、local[〜]
に関するものを抜粋します。
形式 | 意味 |
---|---|
local | ひとつのワーカースレッドを使い、Sparkをローカルで実行する |
local[K] | K個のワーカースレッドを使い、Sparkをローカルで実行する |
local[K, F] | K個のワーカースレッドを使い、Sparkをローカルで実行する。タスクは最大F - 1回の失敗を許容する |
local[*] | 動作しているマシンのコア数と同じ数のワーカースレッドを使い、Sparkをローカルで実行する |
local[*, K] | 動作しているマシンのコア数と同じ数のワーカースレッドを使い、Sparkをローカルで実行する。タスクは最大F - 1回の失敗を許容する |
先ほどの例だと、pyspark
を起動した時のspark.master
はlocal[*]
で、そのマシンのコアの数分だけワーカースレッドを
構成してローカル実行するという意味になりますね。
spark.master
の設定は、pyspark
やspark-submit
、spark-shell
などの--master
オプションで指定できます。
## pysparkの場合 $ pyspark --master local[2] >>> spark.conf.get('spark.master') 'local[2]' ## spark-submitの場合 $ spark-submit --master local[2] [Application]
またpipでインストールしたApache Sparkからは脱線しますが、ディストリビューションからインストールしている場合は、
conf/spark-defaults.conf
ファイルにデフォルトの値を記載しておくこともできます。
Spark Configuration / Dynamically Loading Spark Properties
確認してみましょう。こんな感じのWord Countプログラムを用意します。
word_count.py
from pyspark.rdd import RDD from pyspark.sql import SparkSession, DataFrame from pyspark.sql.types import StructType, StringType, LongType spark: SparkSession = SparkSession.builder.getOrCreate() print(f"spark.master = {spark.conf.get('spark.master')}") df: DataFrame = spark.read.text('README.md') splitted: RDD = df.rdd.map(lambda x: x[0]).flatMap(lambda x: x.split(' ')) words: RDD = splitted.map(lambda x: (x, 1)) results: RDD = words.reduceByKey(lambda a, b: a + b) wordCountSchema: StructType = StructType().add('word', StringType()).add('count', LongType()) results.toDF(wordCountSchema).select('word', 'count').write.format('csv').save('/tmp/word_count_results') print('end!!')
途中で、spark.master
の値を出力するようにしました。
print(f"spark.master = {spark.conf.get('spark.master')}")
Word Countする対象は、Apache SparkのREADME.md
とします。
$ curl -LO https://github.com/apache/spark/raw/v3.1.2/README.md
実行。
$ spark-submit word_count.py
この時、spark.master
はlocal[*]
となっています。
spark.master = local[*]
結果は、今回のプログラムでは/tmp/word_count_results
ディレクトリに出力されます。
結果の例。
$ head -n 10 /tmp/word_count_results/part-00000-b64679f0-8e18-4c2e-880c-f7d57db2b3fc-c000.csv "#",1 Apache,1 Spark,14 "",73 is,7 a,9 unified,1 analytics,1 engine,2 for,12
再実行する場合は、出力対象のディレクトリが存在すると実行に失敗するので、1度ディレクトリを削除しておくとよいでしょう。
$ rm -rf /tmp/word_count_results
local[2]
で実行。
$ spark-submit --master local[2] word_count.py
設定に反映されていることが確認できます。
spark.master = local[2]
また、spark.master
は前述の通りプログラム内でも記載できますが、通常はコマンドラインオプションや設定ファイルで
指定した方がよいでしょうね。
spark: SparkSession = SparkSession.builder.master('local[2]').getOrCreate()
スタンドアロンクラスターを構成する
172.17.0.2〜5の計4つのノードを用意し、172.17.0.2
はマスター、それ以外はワーカーという構成にしたいと思います。
各ノードには、Apache Sparkをインストールしておきます。
$ curl -LO https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz $ tar xf spark-3.1.2-bin-hadoop3.2.tgz $ cd spark-3.1.2-bin-hadoop3.2
Spark Standalone Mode / Starting a Cluster Manually
Spark Standalone Mode / Cluster Launch Scripts
SSHを使って一気にノードを起動する方法もあるようですが、SSHの設定が面倒なので…今回は個々に起動していきます。
マスターは、以下のstart-master.sh
で起動します。
$ sbin/start-master.sh
ホスト名は-h
または--host
オプションで指定できます。今回は指定して起動します。
$ sbin/start-master.sh -h 172.17.0.2
Web UIを見てみましょう。http://[マスターのIPアドレス]:8080
にアクセスしてみます。
現時点ではワーカーノードもありません。
次に、ワーカーを起動しましょう。マスターノードのURLを指定して、start-worker.sh
を実行します。
$ sbin/start-worker.sh spark://172.17.0.2:7077 $ sbin/start-worker.sh spark://172.17.0.2:7077 $ sbin/start-worker.sh spark://172.17.0.2:7077
起動後、Web UIを見るとワーカーが追加されています。
では、--master
を指定してspark-submit
を実行。ここで--master
の指定を忘れると、単にローカルで動くだけになります…。
$ bin/spark-submit --master spark://172.17.0.2:7077 /host/word_count.py
実行中。
終了。
今回の実装方法の場合、結果のファイルはワーカーノードのいずれかに出力されるようです。
今回は、2つ目に起動したワーカーノード上にありました。
$ head -n 10 /tmp/word_count_results/_temporary/0/task_202109111327078853503780902620221_0001_m_000000/part-00000-1bdd6215-c471-4fb4-a0ea-ca8d32416b7d-c000.csv "#",1 Apache,1 Spark,14 "",73 is,7 a,9 unified,1 analytics,1 engine,2 for,12
こういうことになりたくなければ、データストアやオブジェクトストレージに結果を保存することになるんでしょうね。
ワーカーやマスターを停止するコマンドは、こちら。
$ sbin/stop-worker.sh $ sbin/stop-master.sh
これで今回確認したかったことはひととおりできた感じです。
まとめ
Apache Sparkで、スタンドアロンクラスターマネージャーでちょっと遊んでみました。
ローカルで動かした時、クラスターで動かした時の初歩が確認できた感じですね。