CLOVER🍀

That was when it all began.

Apache Spark 3.1をスタンドアロンモヌドロヌカルのみ、クラスタヌ構成で動かす

これは、なにをしたくお曞いたもの

Apache Sparkのスタンドアロンモヌドでクラスタを構成しおみようかな、ず。

それから、ロヌカルで動かす時のlocal[〜]の意味をよく忘れるので、これもメモしおおこうかなず。

環境

今回の環境は、こちらです。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK-11.0.11+9 (build 11.0.11+9, mixed mode)


$ python3 -V
Python 3.8.10

確認するテヌマに応じお、pipでPySparkをむンストヌルするか

$ pip3 install pyspark==3.1.2

Apache Sparkのディストリビュヌションそのものを䜿甚したす。

$ curl -LO https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

Apache Sparkのバヌゞョンは、3.1.2です。

スタンドアロンモヌド

たずスタンドアロンモヌドずはなにかからですが、これはクラスタヌマネヌゞャヌのひず぀でApache Spark自䜓に
含たれおいるものです。

Cluster Mode Overview - Spark 3.1.2 Documentation

クラスタヌマネヌゞャヌにはスタンドアロン、Hadoop YARN、Kubernetes、Apache Mesosなどがありたす。

スタンドアロンモヌドのドキュメントは、こちら。

Spark Standalone Mode - Spark 3.1.2 Documentation

マスタヌ、ワヌカヌ構成を取り、以䞋のどちらかの圢態で動䜜させたす。

  • マスタヌ、ワヌカヌでクラスタヌを構成する
  • テスト目的のために単䞀のマシン䞊でデヌモンを構成する

たた、Sparkアプリケヌションはclientモヌド、clusterモヌドのデプロむモヌドのいずれかで動䜜したす。

Spark Standalone Mode / Launching Spark Applications

SparkContextを生成するドラむバヌをロヌカルで動䜜させるのがclientモヌド、Sparkクラスタヌ偎で動䜜させるのが
clusterモヌドです。

clientモヌドだずSparkアプリケヌションを送信するクラむアントず同じプロセスでドラむバヌが起動され、完了を埅ちたす。
clusterモヌドの堎合、クラむアントはSparkアプリケヌションをクラスタヌに送信したら完了を埅たずに終了したす。

この䜿い分けに぀いおは、こちらを参照したしょう。

Submitting Applications / Launching Applications with spark-submit

なんですけど、珟時点のApache Sparkでは、Pythonアプリケヌションの堎合はclusterモヌドでのデプロむは
できないみたいです。

PySparkを䜿っお、Sparkをロヌカルで実行する

たずは、pipを䜿っおむンストヌルしたApache Sparkを䜿っお、ロヌカルで実行したす。

この時によく出おくるlocal[*]の意味を確認しおおこうかな、ず。

たずえば、pysparkコマンドを実行したす。

$ pyspark

するずApache Sparkが起動しおログが出力されるのですが、その時にこんな衚瀺が芋えたす。

Spark context available as 'sc' (master = local[*], app id = local-1631357850789).

プログラムでmaster指定するような䟋もありたすね。

spark = SparkSession.builder.master('local[2]').getOrCreate()

これは、spark.masterずいう蚭定でこちらに蚘茉がありたす。

Spark Configuration / Application Properties

spark.masterに蚭定可胜な倀は、こちら。

Submitting Applications / Master URLs

このうち、local[〜]に関するものを抜粋したす。

圢匏 意味
local ひず぀のワヌカヌスレッドを䜿い、Sparkをロヌカルで実行する
local[K] K個のワヌカヌスレッドを䜿い、Sparkをロヌカルで実行する
local[K, F] K個のワヌカヌスレッドを䜿い、Sparkをロヌカルで実行する。タスクは最倧F - 1回の倱敗を蚱容する
local[*] 動䜜しおいるマシンのコア数ず同じ数のワヌカヌスレッドを䜿い、Sparkをロヌカルで実行する
local[*, K] 動䜜しおいるマシンのコア数ず同じ数のワヌカヌスレッドを䜿い、Sparkをロヌカルで実行する。タスクは最倧F - 1回の倱敗を蚱容する

先ほどの䟋だず、pysparkを起動した時のspark.masterはlocal[*]で、そのマシンのコアの数分だけワヌカヌスレッドを
構成しおロヌカル実行するずいう意味になりたすね。

spark.masterの蚭定は、pysparkやspark-submit、spark-shellなどの--masterオプションで指定できたす。

## pysparkの堎合
$ pyspark --master local[2]

>>> spark.conf.get('spark.master')
'local[2]'


## spark-submitの堎合
$ spark-submit --master local[2] [Application]

たたpipでむンストヌルしたApache Sparkからは脱線したすが、ディストリビュヌションからむンストヌルしおいる堎合は、
conf/spark-defaults.confファむルにデフォルトの倀を蚘茉しおおくこずもできたす。

Spark Configuration / Dynamically Loading Spark Properties

確認しおみたしょう。こんな感じのWord Countプログラムを甚意したす。

word_count.py

from pyspark.rdd import RDD
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StringType, LongType

spark: SparkSession = SparkSession.builder.getOrCreate()

print(f"spark.master = {spark.conf.get('spark.master')}")

df: DataFrame = spark.read.text('README.md')
splitted: RDD = df.rdd.map(lambda x: x[0]).flatMap(lambda x: x.split(' '))
words: RDD = splitted.map(lambda x: (x, 1))
results: RDD = words.reduceByKey(lambda a, b: a + b)

wordCountSchema: StructType = StructType().add('word', StringType()).add('count', LongType())

results.toDF(wordCountSchema).select('word', 'count').write.format('csv').save('/tmp/word_count_results')

print('end!!')

途䞭で、spark.masterの倀を出力するようにしたした。

print(f"spark.master = {spark.conf.get('spark.master')}")

Word Countする察象は、Apache SparkのREADME.mdずしたす。

$ curl -LO https://github.com/apache/spark/raw/v3.1.2/README.md

実行。

$ spark-submit word_count.py

この時、spark.masterはlocal[*]ずなっおいたす。

spark.master = local[*]

結果は、今回のプログラムでは/tmp/word_count_resultsディレクトリに出力されたす。

結果の䟋。

$ head -n 10 /tmp/word_count_results/part-00000-b64679f0-8e18-4c2e-880c-f7d57db2b3fc-c000.csv
"#",1
Apache,1
Spark,14
"",73
is,7
a,9
unified,1
analytics,1
engine,2
for,12

再実行する堎合は、出力察象のディレクトリが存圚するず実行に倱敗するので、1床ディレクトリを削陀しおおくずよいでしょう。

$ rm -rf /tmp/word_count_results

local[2]で実行。

$ spark-submit --master local[2] word_count.py

蚭定に反映されおいるこずが確認できたす。

spark.master = local[2]

たた、spark.masterは前述の通りプログラム内でも蚘茉できたすが、通垞はコマンドラむンオプションや蚭定ファむルで
指定した方がよいでしょうね。

spark: SparkSession = SparkSession.builder.master('local[2]').getOrCreate()

スタンドアロンクラスタヌを構成する

続いおは、スタンドアロンクラスタヌを構成しおみたす。

172.17.0.2〜5の蚈4぀のノヌドを甚意し、172.17.0.2はマスタヌ、それ以倖はワヌカヌずいう構成にしたいず思いたす。

各ノヌドには、Apache Sparkをむンストヌルしおおきたす。

$ curl -LO https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
$ tar xf spark-3.1.2-bin-hadoop3.2.tgz
$ cd spark-3.1.2-bin-hadoop3.2

クラスタヌは、以䞋の手順ずスクリプトに埓っお構成したす。

Spark Standalone Mode / Starting a Cluster Manually

Spark Standalone Mode / Cluster Launch Scripts

SSHを䜿っお䞀気にノヌドを起動する方法もあるようですが、SSHの蚭定が面倒なので 今回は個々に起動しおいきたす。

マスタヌは、以䞋のstart-master.shで起動したす。

$ sbin/start-master.sh

ホスト名は-hたたは--hostオプションで指定できたす。今回は指定しお起動したす。

$ sbin/start-master.sh -h 172.17.0.2

Web UIを芋おみたしょう。http://[マスタヌのIPアドレス]:8080にアクセスしおみたす。

f:id:Kazuhira:20210911222609p:plain

珟時点ではワヌカヌノヌドもありたせん。

次に、ワヌカヌを起動したしょう。マスタヌノヌドのURLを指定しお、start-worker.shを実行したす。

$ sbin/start-worker.sh spark://172.17.0.2:7077


$ sbin/start-worker.sh spark://172.17.0.2:7077


$ sbin/start-worker.sh spark://172.17.0.2:7077

起動埌、Web UIを芋るずワヌカヌが远加されおいたす。

f:id:Kazuhira:20210911222637p:plain

では、--masterを指定しおspark-submitを実行。ここで--masterの指定を忘れるず、単にロヌカルで動くだけになりたす 。

$ bin/spark-submit --master spark://172.17.0.2:7077 /host/word_count.py

実行䞭。

f:id:Kazuhira:20210911222717p:plain

終了。

f:id:Kazuhira:20210911223403p:plain

今回の実装方法の堎合、結果のファむルはワヌカヌノヌドのいずれかに出力されるようです。

今回は、2぀目に起動したワヌカヌノヌド䞊にありたした。

$ head -n 10 /tmp/word_count_results/_temporary/0/task_202109111327078853503780902620221_0001_m_000000/part-00000-1bdd6215-c471-4fb4-a0ea-ca8d32416b7d-c000.csv 
"#",1
Apache,1
Spark,14
"",73
is,7
a,9
unified,1
analytics,1
engine,2
for,12

こういうこずになりたくなければ、デヌタストアやオブゞェクトストレヌゞに結果を保存するこずになるんでしょうね。

ワヌカヌやマスタヌを停止するコマンドは、こちら。

$ sbin/stop-worker.sh


$ sbin/stop-master.sh

これで今回確認したかったこずはひずずおりできた感じです。

たずめ

Apache Sparkで、スタンドアロンクラスタヌマネヌゞャヌでちょっず遊んでみたした。

ロヌカルで動かした時、クラスタヌで動かした時の初歩が確認できた感じですね。