CLOVER🍀

That was when it all began.

Apache Spark 3.1をスタンドアロンモード(ローカルのみ、クラスター構成)で動かす

これは、なにをしたくて書いたもの?

Apache Sparkのスタンドアロンモードでクラスタを構成してみようかな、と。

それから、ローカルで動かす時のlocal[〜]の意味をよく忘れるので、これもメモしておこうかなと。

環境

今回の環境は、こちらです。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK-11.0.11+9 (build 11.0.11+9, mixed mode)


$ python3 -V
Python 3.8.10

確認するテーマに応じて、pipでPySparkをインストールするか

$ pip3 install pyspark==3.1.2

Apache Sparkのディストリビューションそのものを使用します。

$ curl -LO https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

Apache Sparkのバージョンは、3.1.2です。

スタンドアロンモード

まずスタンドアロンモードとはなにか?からですが、これはクラスターマネージャーのひとつでApache Spark自体に
含まれているものです。

Cluster Mode Overview - Spark 3.1.2 Documentation

クラスターマネージャーにはスタンドアロンHadoop YARN、KubernetesApache Mesosなどがあります。

スタンドアロンモードのドキュメントは、こちら。

Spark Standalone Mode - Spark 3.1.2 Documentation

マスター、ワーカー構成を取り、以下のどちらかの形態で動作させます。

  • マスター、ワーカーでクラスターを構成する
  • (テスト目的のために)単一のマシン上でデーモンを構成する

また、Sparkアプリケーションはclientモード、clusterモードのデプロイモードのいずれかで動作します。

Spark Standalone Mode / Launching Spark Applications

SparkContextを生成するドライバーをローカルで動作させるのがclientモード、Sparkクラスター側で動作させるのが
clusterモードです。

clientモードだとSparkアプリケーションを送信するクライアントと同じプロセスでドライバーが起動され、完了を待ちます。
clusterモードの場合、クライアントはSparkアプリケーションをクラスターに送信したら完了を待たずに終了します。

この使い分けについては、こちらを参照しましょう。

Submitting Applications / Launching Applications with spark-submit

なんですけど、現時点のApache Sparkでは、Pythonアプリケーションの場合はclusterモードでのデプロイは
できないみたいです。

PySparkを使って、Sparkをローカルで実行する

まずは、pipを使ってインストールしたApache Sparkを使って、ローカルで実行します。

この時によく出てくるlocal[*]の意味を確認しておこうかな、と。

たとえば、pysparkコマンドを実行します。

$ pyspark

するとApache Sparkが起動してログが出力されるのですが、その時にこんな表示が見えます。

Spark context available as 'sc' (master = local[*], app id = local-1631357850789).

プログラムでmaster指定するような例もありますね。

spark = SparkSession.builder.master('local[2]').getOrCreate()

これは、spark.masterという設定でこちらに記載があります。

Spark Configuration / Application Properties

spark.masterに設定可能な値は、こちら。

Submitting Applications / Master URLs

このうち、local[〜]に関するものを抜粋します。

形式 意味
local ひとつのワーカースレッドを使い、Sparkをローカルで実行する
local[K] K個のワーカースレッドを使い、Sparkをローカルで実行する
local[K, F] K個のワーカースレッドを使い、Sparkをローカルで実行する。タスクは最大F - 1回の失敗を許容する
local[*] 動作しているマシンのコア数と同じ数のワーカースレッドを使い、Sparkをローカルで実行する
local[*, K] 動作しているマシンのコア数と同じ数のワーカースレッドを使い、Sparkをローカルで実行する。タスクは最大F - 1回の失敗を許容する

先ほどの例だと、pysparkを起動した時のspark.masterlocal[*]で、そのマシンのコアの数分だけワーカースレッドを
構成してローカル実行するという意味になりますね。

spark.masterの設定は、pysparkspark-submitspark-shellなどの--masterオプションで指定できます。

## pysparkの場合
$ pyspark --master local[2]

>>> spark.conf.get('spark.master')
'local[2]'


## spark-submitの場合
$ spark-submit --master local[2] [Application]

またpipでインストールしたApache Sparkからは脱線しますが、ディストリビューションからインストールしている場合は、
conf/spark-defaults.confファイルにデフォルトの値を記載しておくこともできます。

Spark Configuration / Dynamically Loading Spark Properties

確認してみましょう。こんな感じのWord Countプログラムを用意します。

word_count.py

from pyspark.rdd import RDD
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StringType, LongType

spark: SparkSession = SparkSession.builder.getOrCreate()

print(f"spark.master = {spark.conf.get('spark.master')}")

df: DataFrame = spark.read.text('README.md')
splitted: RDD = df.rdd.map(lambda x: x[0]).flatMap(lambda x: x.split(' '))
words: RDD = splitted.map(lambda x: (x, 1))
results: RDD = words.reduceByKey(lambda a, b: a + b)

wordCountSchema: StructType = StructType().add('word', StringType()).add('count', LongType())

results.toDF(wordCountSchema).select('word', 'count').write.format('csv').save('/tmp/word_count_results')

print('end!!')

途中で、spark.masterの値を出力するようにしました。

print(f"spark.master = {spark.conf.get('spark.master')}")

Word Countする対象は、Apache SparkのREADME.mdとします。

$ curl -LO https://github.com/apache/spark/raw/v3.1.2/README.md

実行。

$ spark-submit word_count.py

この時、spark.masterlocal[*]となっています。

spark.master = local[*]

結果は、今回のプログラムでは/tmp/word_count_resultsディレクトリに出力されます。

結果の例。

$ head -n 10 /tmp/word_count_results/part-00000-b64679f0-8e18-4c2e-880c-f7d57db2b3fc-c000.csv
"#",1
Apache,1
Spark,14
"",73
is,7
a,9
unified,1
analytics,1
engine,2
for,12

再実行する場合は、出力対象のディレクトリが存在すると実行に失敗するので、1度ディレクトリを削除しておくとよいでしょう。

$ rm -rf /tmp/word_count_results

local[2]で実行。

$ spark-submit --master local[2] word_count.py

設定に反映されていることが確認できます。

spark.master = local[2]

また、spark.masterは前述の通りプログラム内でも記載できますが、通常はコマンドラインオプションや設定ファイルで
指定した方がよいでしょうね。

spark: SparkSession = SparkSession.builder.master('local[2]').getOrCreate()

スタンドアロンクラスターを構成する

続いては、スタンドアロンクラスターを構成してみます。

172.17.0.2〜5の計4つのノードを用意し、172.17.0.2はマスター、それ以外はワーカーという構成にしたいと思います。

各ノードには、Apache Sparkをインストールしておきます。

$ curl -LO https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
$ tar xf spark-3.1.2-bin-hadoop3.2.tgz
$ cd spark-3.1.2-bin-hadoop3.2

クラスターは、以下の手順とスクリプトに従って構成します。

Spark Standalone Mode / Starting a Cluster Manually

Spark Standalone Mode / Cluster Launch Scripts

SSHを使って一気にノードを起動する方法もあるようですが、SSHの設定が面倒なので…今回は個々に起動していきます。

マスターは、以下のstart-master.shで起動します。

$ sbin/start-master.sh

ホスト名は-hまたは--hostオプションで指定できます。今回は指定して起動します。

$ sbin/start-master.sh -h 172.17.0.2

Web UIを見てみましょう。http://[マスターのIPアドレス]:8080にアクセスしてみます。

f:id:Kazuhira:20210911222609p:plain

現時点ではワーカーノードもありません。

次に、ワーカーを起動しましょう。マスターノードのURLを指定して、start-worker.shを実行します。

$ sbin/start-worker.sh spark://172.17.0.2:7077


$ sbin/start-worker.sh spark://172.17.0.2:7077


$ sbin/start-worker.sh spark://172.17.0.2:7077

起動後、Web UIを見るとワーカーが追加されています。

f:id:Kazuhira:20210911222637p:plain

では、--masterを指定してspark-submitを実行。ここで--masterの指定を忘れると、単にローカルで動くだけになります…。

$ bin/spark-submit --master spark://172.17.0.2:7077 /host/word_count.py

実行中。

f:id:Kazuhira:20210911222717p:plain

終了。

f:id:Kazuhira:20210911223403p:plain

今回の実装方法の場合、結果のファイルはワーカーノードのいずれかに出力されるようです。

今回は、2つ目に起動したワーカーノード上にありました。

$ head -n 10 /tmp/word_count_results/_temporary/0/task_202109111327078853503780902620221_0001_m_000000/part-00000-1bdd6215-c471-4fb4-a0ea-ca8d32416b7d-c000.csv 
"#",1
Apache,1
Spark,14
"",73
is,7
a,9
unified,1
analytics,1
engine,2
for,12

こういうことになりたくなければ、データストアやオブジェクトストレージに結果を保存することになるんでしょうね。

ワーカーやマスターを停止するコマンドは、こちら。

$ sbin/stop-worker.sh


$ sbin/stop-master.sh

これで今回確認したかったことはひととおりできた感じです。

まとめ

Apache Sparkで、スタンドアロンクラスターマネージャーでちょっと遊んでみました。

ローカルで動かした時、クラスターで動かした時の初歩が確認できた感じですね。