CLOVER🍀

That was when it all began.

Apache Spark 3.1(PySpark)で、Pythonの実行パスを指定する

これは、なにをしたくて書いたもの?

Apache SparkでPythonプログラムを扱う時(要はPySpark)に、どのPythonを使用するのかがちょっと気になりまして。

調べてみることにしました。

環境

今回の環境は、こちら。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.3 LTS
Release:    20.04
Codename:   focal


$ uname -srvmpio
Linux 5.4.0-81-generic #91-Ubuntu SMP Thu Jul 15 19:09:17 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux


$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)


$ python3 -V
Python 3.8.10

Ubuntu Linux 20.04 LTSです。

Apache Sparkは3.1.2を使います。

また、pythonというコマンドはインストールされていません。

$ python

コマンド 'python' が見つかりません。もしかして:

  command 'python3' from deb python3
  command 'python' from deb python-is-python3

Apache Sparkをインストールする

まずは、Apache Sparkをインストールします。Pythonで扱う場合はpipでもインストール可能ですが、今回はふつうに
tar.gzファイルをダウンロードしてくることにします。

$ curl -OL https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
$ tar xf spark-3.1.2-bin-hadoop3.2.tgz
$ cd spark-3.1.2-bin-hadoop3.2

PySparkを起動。

$ bin/pyspark
Python 3.8.10 (default, Jun  2 2021, 10:49:15) 
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
21/09/06 22:21:10 WARN Utils: Your hostname, ubuntu2004.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.121.6 instead (on interface eth0)
21/09/06 22:21:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/path/to/spark-3.1.2-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/09/06 22:21:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.8.10 (default, Jun  2 2021 10:49:15)
Spark context Web UI available at http://192.168.121.6:4040
Spark context available as 'sc' (master = local[*], app id = local-1630934476222).
SparkSession available as 'spark'.
>>> 

この時点でpython3コマンドを使っているような雰囲気がありますね。

バージョンを確認。

>>> import sys
>>> sys.version
'3.8.10 (default, Jun  2 2021, 10:49:15) \n[GCC 9.4.0]'

実際、このあたりを見ているとpython3コマンドを使うようになっているみたいですね。

https://github.com/apache/spark/blob/v3.1.2/bin/pyspark#L42

https://github.com/apache/spark/blob/v3.1.2/bin/find-spark-home#L38

とりあえず、README.mdでWord Countして動作確認。

>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.read.text('README.md')
>>> counts = df.rdd.map(lambda x: x[0]).flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b).collect()
>>> for (word, count) in sorted(counts, reverse=True, key=lambda x: x[1]):
...   print(f'{word}: {count}')

結果の抜粋。

: 73
the: 23
to: 16
Spark: 14
for: 12
a: 9
and: 9
##: 9
is: 7
on: 7
run: 7
can: 6
in: 5
also: 5
of: 5
an: 4
including: 4
if: 4
you: 4
*: 4
Please: 4

〜省略〜

PySparkで使うPythonの実行パスを指定する

先ほどのpysparkスクリプトを見ると気づくのですが、PySparkで使うPythonの実行パスを環境変数で指定することが
できます。

PYSPARK_PYTHONを使うと、まるっと切り替えられるようですね。これ、ドキュメントには記載がなさそうな感じです。

試しにと、Python 3.9をインストール。

$ sudo apt install python3.9

バージョン確認。

$ python3.9 -V
Python 3.9.5

PYSPARK_PYTHONpython3.9を指定して実行。

$ PYSPARK_PYTHON=python3.9 bin/pyspark

すると、Pythonのバージョンが変わりました。

Python 3.9.5 (default, May 19 2021, 11:32:47) 
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
21/09/06 22:28:30 WARN Utils: Your hostname, ubuntu2004.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.121.6 instead (on interface eth0)
21/09/06 22:28:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/path/to/spark-3.1.2-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/09/06 22:28:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.9.5 (default, May 19 2021 11:32:47)
Spark context Web UI available at http://192.168.121.6:4040
Spark context available as 'sc' (master = local[*], app id = local-1630934912371).
SparkSession available as 'spark'.

sys.versionでも確認。

>>> import sys
>>> sys.version
'3.9.5 (default, May 19 2021, 11:32:47) \n[GCC 9.3.0]'

ちなみに、タスクを実行するExecutorが使用するPythonと、タスクのアサインを指示するDriverが使用するPython
別々のものを指定することができます。

Executorの方はPYSPARK_PYTHON環境変数で、Driverの方はPYSPARK_DRIVER_PYTHON環境変数ですね。

$ export PYSPARK_PYTHON=...
$ export PYSPARK_DRIVER_PYTHON=...

PYSPARK_DRIVER_PYTHON環境変数を指定していない場合は、PYSPARK_PYTHON環境変数の値が使われます。
そして、PYSPARK_PYTHONのデフォルト値はPySparkの起動スクリプトの中でpython3となります。

コメントを見るとわかるのですが、PYSPARK_DRIVER_PYTHONの方はipythonなどでの指定を想定していそうな
感じですね。

https://github.com/apache/spark/blob/v3.1.2/bin/pyspark#L27-L31

ただし、ExecutorとDriverそれぞれで使うPythonのメジャーバージョン、マイナーバージョンが異なることは許容されないので
その点には注意です。

https://github.com/apache/spark/blob/v3.1.2/python/pyspark/worker.py#L472-L477

pipでインストールした場合は?

ここまでtar.gzファイルを使ってインストールしたApache Sparkで話をしてきましたが、pipでインストールした場合は
どうでしょう?

試してみます。

$ pip3 install pyspark==3.1.2

この場合、pysparkコマンドにパスが通っているので、そのまま起動できます。

$ pyspark
Python 3.8.10 (default, Jun  2 2021, 10:49:15) 
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
21/09/06 23:07:28 WARN Utils: Your hostname, ubuntu2004.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.121.6 instead (on interface eth0)
21/09/06 23:07:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/path/to/venv/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/09/06 23:07:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.8.10 (default, Jun  2 2021 10:49:15)
Spark context Web UI available at http://192.168.121.6:4040
Spark context available as 'sc' (master = local[*], app id = local-1630937250516).
SparkSession available as 'spark'.
>>> 

ですが、PYSPARK_PYTHON環境変数を使うとSPARK_HOMEの探索がずれるみたいでうまく動きません。
※仮想環境(venv)で動かしています

$ PYSPARK_PYTHON=python3.9 pyspark
Traceback (most recent call last):
  File "path/to/venv/bin/find_spark_home.py", line 86, in <module>
    print(_find_spark_home())
  File "path/to/venv/bin/find_spark_home.py", line 52, in _find_spark_home
    module_home = os.path.dirname(find_spec("pyspark").origin)
AttributeError: 'NoneType' object has no attribute 'origin'
/path/to/venv/bin/pyspark: 行 24: /bin/load-spark-env.sh: そのようなファイルやディレクトリはありません
/path/to/venv/bin/pyspark: 行 68: /bin/spark-submit: そのようなファイルやディレクトリはありません

どうしたものかな?と思ったのですが、そもそもこのケースの場合は使うpipに使うPython自体を切り替えればいい
気がしますね。

$ sudo apt install python3.9-venv
$ . venv/bin/activate

pip3.9でPySparkをインストール。

$ pip3.9 install pyspark==3.1.2

これで、切り替わりました。

$ pyspark
Python 3.9.5 (default, May 19 2021, 11:32:47) 
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
21/09/06 23:18:13 WARN Utils: Your hostname, ubuntu2004.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.121.6 instead (on interface eth0)
21/09/06 23:18:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/path/to/venv/lib/python3.9/site-packages/pyspark/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/09/06 23:18:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.9.5 (default, May 19 2021 11:32:47)
Spark context Web UI available at http://192.168.121.6:4040
Spark context available as 'sc' (master = local[*], app id = local-1630937895507).
SparkSession available as 'spark'.
>>>