CLOVER🍀

That was when it all began.

Apache Spark(スタンドアロンモード)のマスターノード、ワーカーノードをフォアグラウンドで起動する

これは、なにをしたくて書いたもの?

スタンドアロンモードのApache Sparkのマスターノード、ワーカーノードをふつうに起動するとバックグラウンドに
行ってしまうのですが。

これをフォアグラウンドで実行する方法はないのかな?と思いまして。

結果からいくと、SPARK_NO_DAEMONIZEという環境変数で制御できるようです。

環境

今回の環境は、こちらです。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK-11.0.11+9 (build 11.0.11+9, mixed mode)


$ python3 -V
Python 3.8.10

Apache Sparkは、3.1.2を使用します。

$ bin/pyspark --version
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
                        
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.11
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z
Revision de351e30a90dd988b133b3d00fa6218bfcaba8b8
Url https://github.com/apache/spark
Type --help for more information.

SPARK_NO_DAEMONIZE環境変数

SPARK_NO_DAEMONIZEという環境変数を使うことで、今回のお題をクリアすることができます。

ドキュメントには特に記載がないようですが、sbin/start-master.shsbin/start-worker.shが内部で使用している
sbin/spark-daemon.shに記述があります。

https://github.com/apache/spark/blob/v3.1.2/sbin/spark-daemon.sh#L31

# Runs a Spark command as a daemon.
#
# Environment Variables
#
#   SPARK_CONF_DIR  Alternate conf dir. Default is ${SPARK_HOME}/conf.
#   SPARK_LOG_DIR   Where log files are stored. ${SPARK_HOME}/logs by default.
#   SPARK_LOG_MAX_FILES Max log files of Spark daemons can rotate to. Default is 5.
#   SPARK_MASTER    host:path where spark code should be rsync'd from
#   SPARK_PID_DIR   The pid files are stored. /tmp by default.
#   SPARK_IDENT_STRING   A string representing this instance of spark. $USER by default
#   SPARK_NICENESS The scheduling priority for daemons. Defaults to 0.
#   SPARK_NO_DAEMONIZE   If set, will run the proposed command in the foreground. It will not output a PID file.
##

こちらですね。値を設定しておくと、フォアグラウンドで起動してくれるようです。

SPARK_NO_DAEMONIZE If set, will run the proposed command in the foreground. It will not output a PID file.

試してみましょう。

まずはSPARK_NO_DAEMONIZE環境変数を設定して、マスターノードを起動。

$ SPARK_NO_DAEMONIZE=1 sbin/start-master.sh

すると、フォアグラウンドでマスターノードが起動しました。

starting org.apache.spark.deploy.master.Master, logging to /opt/spark/logs/spark--org.apache.spark.deploy.master.Master-1-master.out
Spark Command: /opt/java/openjdk/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* -Xmx1g org.apache.spark.deploy.master.Master --host master --port 7077 --webui-port 8080
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/09/11 15:38:46 INFO Master: Started daemon with process name: 37@master
21/09/11 15:38:46 INFO SignalUtils: Registering signal handler for TERM
21/09/11 15:38:46 INFO SignalUtils: Registering signal handler for HUP
21/09/11 15:38:46 INFO SignalUtils: Registering signal handler for INT

〜省略〜

ワーカーノードも起動してみます。こちらも、SPARK_NO_DAEMONIZE環境変数を設定。

$ SPARK_NO_DAEMONIZE=1 sbin/start-worker.sh spark://[マスターノードのIPアドレス]:7077

こちらも、フォアグラウンドで起動しました。

starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark--org.apache.spark.deploy.worker.Worker-1-worker1.out
Spark Command: /opt/java/openjdk/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://172.17.0.2:7077
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/09/11 15:40:23 INFO Worker: Started daemon with process name: 19@worker1
21/09/11 15:40:23 INFO SignalUtils: Registering signal handler for TERM
21/09/11 15:40:23 INFO SignalUtils: Registering signal handler for HUP
21/09/11 15:40:23 INFO SignalUtils: Registering signal handler for INT

〜省略〜

知っておくと、Dockerコンテナ化する時などに便利かなと思います。

Apache Spark 3.1をスタンドアロンモード(ローカルのみ、クラスター構成)で動かす

これは、なにをしたくて書いたもの?

Apache Sparkのスタンドアロンモードでクラスタを構成してみようかな、と。

それから、ローカルで動かす時のlocal[〜]の意味をよく忘れるので、これもメモしておこうかなと。

環境

今回の環境は、こちらです。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK-11.0.11+9 (build 11.0.11+9, mixed mode)


$ python3 -V
Python 3.8.10

確認するテーマに応じて、pipでPySparkをインストールするか

$ pip3 install pyspark==3.1.2

Apache Sparkのディストリビューションそのものを使用します。

$ curl -LO https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

Apache Sparkのバージョンは、3.1.2です。

スタンドアロンモード

まずスタンドアロンモードとはなにか?からですが、これはクラスターマネージャーのひとつでApache Spark自体に
含まれているものです。

Cluster Mode Overview - Spark 3.1.2 Documentation

クラスターマネージャーにはスタンドアロンHadoop YARN、KubernetesApache Mesosなどがあります。

スタンドアロンモードのドキュメントは、こちら。

Spark Standalone Mode - Spark 3.1.2 Documentation

マスター、ワーカー構成を取り、以下のどちらかの形態で動作させます。

  • マスター、ワーカーでクラスターを構成する
  • (テスト目的のために)単一のマシン上でデーモンを構成する

また、Sparkアプリケーションはclientモード、clusterモードのデプロイモードのいずれかで動作します。

Spark Standalone Mode / Launching Spark Applications

SparkContextを生成するドライバーをローカルで動作させるのがclientモード、Sparkクラスター側で動作させるのが
clusterモードです。

clientモードだとSparkアプリケーションを送信するクライアントと同じプロセスでドライバーが起動され、完了を待ちます。
clusterモードの場合、クライアントはSparkアプリケーションをクラスターに送信したら完了を待たずに終了します。

この使い分けについては、こちらを参照しましょう。

Submitting Applications / Launching Applications with spark-submit

なんですけど、現時点のApache Sparkでは、Pythonアプリケーションの場合はclusterモードでのデプロイは
できないみたいです。

PySparkを使って、Sparkをローカルで実行する

まずは、pipを使ってインストールしたApache Sparkを使って、ローカルで実行します。

この時によく出てくるlocal[*]の意味を確認しておこうかな、と。

たとえば、pysparkコマンドを実行します。

$ pyspark

するとApache Sparkが起動してログが出力されるのですが、その時にこんな表示が見えます。

Spark context available as 'sc' (master = local[*], app id = local-1631357850789).

プログラムでmaster指定するような例もありますね。

spark = SparkSession.builder.master('local[2]').getOrCreate()

これは、spark.masterという設定でこちらに記載があります。

Spark Configuration / Application Properties

spark.masterに設定可能な値は、こちら。

Submitting Applications / Master URLs

このうち、local[〜]に関するものを抜粋します。

形式 意味
local ひとつのワーカースレッドを使い、Sparkをローカルで実行する
local[K] K個のワーカースレッドを使い、Sparkをローカルで実行する
local[K, F] K個のワーカースレッドを使い、Sparkをローカルで実行する。タスクは最大F - 1回の失敗を許容する
local[*] 動作しているマシンのコア数と同じ数のワーカースレッドを使い、Sparkをローカルで実行する
local[*, K] 動作しているマシンのコア数と同じ数のワーカースレッドを使い、Sparkをローカルで実行する。タスクは最大F - 1回の失敗を許容する

先ほどの例だと、pysparkを起動した時のspark.masterlocal[*]で、そのマシンのコアの数分だけワーカースレッドを
構成してローカル実行するという意味になりますね。

spark.masterの設定は、pysparkspark-submitspark-shellなどの--masterオプションで指定できます。

## pysparkの場合
$ pyspark --master local[2]

>>> spark.conf.get('spark.master')
'local[2]'


## spark-submitの場合
$ spark-submit --master local[2] [Application]

またpipでインストールしたApache Sparkからは脱線しますが、ディストリビューションからインストールしている場合は、
conf/spark-defaults.confファイルにデフォルトの値を記載しておくこともできます。

Spark Configuration / Dynamically Loading Spark Properties

確認してみましょう。こんな感じのWord Countプログラムを用意します。

word_count.py

from pyspark.rdd import RDD
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StringType, LongType

spark: SparkSession = SparkSession.builder.getOrCreate()

print(f"spark.master = {spark.conf.get('spark.master')}")

df: DataFrame = spark.read.text('README.md')
splitted: RDD = df.rdd.map(lambda x: x[0]).flatMap(lambda x: x.split(' '))
words: RDD = splitted.map(lambda x: (x, 1))
results: RDD = words.reduceByKey(lambda a, b: a + b)

wordCountSchema: StructType = StructType().add('word', StringType()).add('count', LongType())

results.toDF(wordCountSchema).select('word', 'count').write.format('csv').save('/tmp/word_count_results')

print('end!!')

途中で、spark.masterの値を出力するようにしました。

print(f"spark.master = {spark.conf.get('spark.master')}")

Word Countする対象は、Apache SparkのREADME.mdとします。

$ curl -LO https://github.com/apache/spark/raw/v3.1.2/README.md

実行。

$ spark-submit word_count.py

この時、spark.masterlocal[*]となっています。

spark.master = local[*]

結果は、今回のプログラムでは/tmp/word_count_resultsディレクトリに出力されます。

結果の例。

$ head -n 10 /tmp/word_count_results/part-00000-b64679f0-8e18-4c2e-880c-f7d57db2b3fc-c000.csv
"#",1
Apache,1
Spark,14
"",73
is,7
a,9
unified,1
analytics,1
engine,2
for,12

再実行する場合は、出力対象のディレクトリが存在すると実行に失敗するので、1度ディレクトリを削除しておくとよいでしょう。

$ rm -rf /tmp/word_count_results

local[2]で実行。

$ spark-submit --master local[2] word_count.py

設定に反映されていることが確認できます。

spark.master = local[2]

また、spark.masterは前述の通りプログラム内でも記載できますが、通常はコマンドラインオプションや設定ファイルで
指定した方がよいでしょうね。

spark: SparkSession = SparkSession.builder.master('local[2]').getOrCreate()

スタンドアロンクラスターを構成する

続いては、スタンドアロンクラスターを構成してみます。

172.17.0.2〜5の計4つのノードを用意し、172.17.0.2はマスター、それ以外はワーカーという構成にしたいと思います。

各ノードには、Apache Sparkをインストールしておきます。

$ curl -LO https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
$ tar xf spark-3.1.2-bin-hadoop3.2.tgz
$ cd spark-3.1.2-bin-hadoop3.2

クラスターは、以下の手順とスクリプトに従って構成します。

Spark Standalone Mode / Starting a Cluster Manually

Spark Standalone Mode / Cluster Launch Scripts

SSHを使って一気にノードを起動する方法もあるようですが、SSHの設定が面倒なので…今回は個々に起動していきます。

マスターは、以下のstart-master.shで起動します。

$ sbin/start-master.sh

ホスト名は-hまたは--hostオプションで指定できます。今回は指定して起動します。

$ sbin/start-master.sh -h 172.17.0.2

Web UIを見てみましょう。http://[マスターのIPアドレス]:8080にアクセスしてみます。

f:id:Kazuhira:20210911222609p:plain

現時点ではワーカーノードもありません。

次に、ワーカーを起動しましょう。マスターノードのURLを指定して、start-worker.shを実行します。

$ sbin/start-worker.sh spark://172.17.0.2:7077


$ sbin/start-worker.sh spark://172.17.0.2:7077


$ sbin/start-worker.sh spark://172.17.0.2:7077

起動後、Web UIを見るとワーカーが追加されています。

f:id:Kazuhira:20210911222637p:plain

では、--masterを指定してspark-submitを実行。ここで--masterの指定を忘れると、単にローカルで動くだけになります…。

$ bin/spark-submit --master spark://172.17.0.2:7077 /host/word_count.py

実行中。

f:id:Kazuhira:20210911222717p:plain

終了。

f:id:Kazuhira:20210911223403p:plain

今回の実装方法の場合、結果のファイルはワーカーノードのいずれかに出力されるようです。

今回は、2つ目に起動したワーカーノード上にありました。

$ head -n 10 /tmp/word_count_results/_temporary/0/task_202109111327078853503780902620221_0001_m_000000/part-00000-1bdd6215-c471-4fb4-a0ea-ca8d32416b7d-c000.csv 
"#",1
Apache,1
Spark,14
"",73
is,7
a,9
unified,1
analytics,1
engine,2
for,12

こういうことになりたくなければ、データストアやオブジェクトストレージに結果を保存することになるんでしょうね。

ワーカーやマスターを停止するコマンドは、こちら。

$ sbin/stop-worker.sh


$ sbin/stop-master.sh

これで今回確認したかったことはひととおりできた感じです。

まとめ

Apache Sparkで、スタンドアロンクラスターマネージャーでちょっと遊んでみました。

ローカルで動かした時、クラスターで動かした時の初歩が確認できた感じですね。