CLOVER🍀

That was when it all began.

Apache Spark 3.1(PySpark)で、Pythonの実行パスを指定する

これは、なにをしたくて書いたもの?

Apache SparkでPythonプログラムを扱う時(要はPySpark)に、どのPythonを使用するのかがちょっと気になりまして。

調べてみることにしました。

環境

今回の環境は、こちら。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.3 LTS
Release:    20.04
Codename:   focal


$ uname -srvmpio
Linux 5.4.0-81-generic #91-Ubuntu SMP Thu Jul 15 19:09:17 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux


$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)


$ python3 -V
Python 3.8.10

Ubuntu Linux 20.04 LTSです。

Apache Sparkは3.1.2を使います。

また、pythonというコマンドはインストールされていません。

$ python

コマンド 'python' が見つかりません。もしかして:

  command 'python3' from deb python3
  command 'python' from deb python-is-python3

Apache Sparkをインストールする

まずは、Apache Sparkをインストールします。Pythonで扱う場合はpipでもインストール可能ですが、今回はふつうに
tar.gzファイルをダウンロードしてくることにします。

$ curl -OL https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
$ tar xf spark-3.1.2-bin-hadoop3.2.tgz
$ cd spark-3.1.2-bin-hadoop3.2

PySparkを起動。

$ bin/pyspark
Python 3.8.10 (default, Jun  2 2021, 10:49:15) 
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
21/09/06 22:21:10 WARN Utils: Your hostname, ubuntu2004.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.121.6 instead (on interface eth0)
21/09/06 22:21:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/path/to/spark-3.1.2-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/09/06 22:21:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.8.10 (default, Jun  2 2021 10:49:15)
Spark context Web UI available at http://192.168.121.6:4040
Spark context available as 'sc' (master = local[*], app id = local-1630934476222).
SparkSession available as 'spark'.
>>> 

この時点でpython3コマンドを使っているような雰囲気がありますね。

バージョンを確認。

>>> import sys
>>> sys.version
'3.8.10 (default, Jun  2 2021, 10:49:15) \n[GCC 9.4.0]'

実際、このあたりを見ているとpython3コマンドを使うようになっているみたいですね。

https://github.com/apache/spark/blob/v3.1.2/bin/pyspark#L42

https://github.com/apache/spark/blob/v3.1.2/bin/find-spark-home#L38

とりあえず、README.mdでWord Countして動作確認。

>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.read.text('README.md')
>>> counts = df.rdd.map(lambda x: x[0]).flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b).collect()
>>> for (word, count) in sorted(counts, reverse=True, key=lambda x: x[1]):
...   print(f'{word}: {count}')

結果の抜粋。

: 73
the: 23
to: 16
Spark: 14
for: 12
a: 9
and: 9
##: 9
is: 7
on: 7
run: 7
can: 6
in: 5
also: 5
of: 5
an: 4
including: 4
if: 4
you: 4
*: 4
Please: 4

〜省略〜

PySparkで使うPythonの実行パスを指定する

先ほどのpysparkスクリプトを見ると気づくのですが、PySparkで使うPythonの実行パスを環境変数で指定することが
できます。

PYSPARK_PYTHONを使うと、まるっと切り替えられるようですね。これ、ドキュメントには記載がなさそうな感じです。

試しにと、Python 3.9をインストール。

$ sudo apt install python3.9

バージョン確認。

$ python3.9 -V
Python 3.9.5

PYSPARK_PYTHONpython3.9を指定して実行。

$ PYSPARK_PYTHON=python3.9 bin/pyspark

すると、Pythonのバージョンが変わりました。

Python 3.9.5 (default, May 19 2021, 11:32:47) 
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
21/09/06 22:28:30 WARN Utils: Your hostname, ubuntu2004.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.121.6 instead (on interface eth0)
21/09/06 22:28:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/path/to/spark-3.1.2-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/09/06 22:28:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.9.5 (default, May 19 2021 11:32:47)
Spark context Web UI available at http://192.168.121.6:4040
Spark context available as 'sc' (master = local[*], app id = local-1630934912371).
SparkSession available as 'spark'.

sys.versionでも確認。

>>> import sys
>>> sys.version
'3.9.5 (default, May 19 2021, 11:32:47) \n[GCC 9.3.0]'

ちなみに、タスクを実行するExecutorが使用するPythonと、タスクのアサインを指示するDriverが使用するPython
別々のものを指定することができます。

Executorの方はPYSPARK_PYTHON環境変数で、Driverの方はPYSPARK_DRIVER_PYTHON環境変数ですね。

$ export PYSPARK_PYTHON=...
$ export PYSPARK_DRIVER_PYTHON=...

PYSPARK_DRIVER_PYTHON環境変数を指定していない場合は、PYSPARK_PYTHON環境変数の値が使われます。
そして、PYSPARK_PYTHONのデフォルト値はPySparkの起動スクリプトの中でpython3となります。

コメントを見るとわかるのですが、PYSPARK_DRIVER_PYTHONの方はipythonなどでの指定を想定していそうな
感じですね。

https://github.com/apache/spark/blob/v3.1.2/bin/pyspark#L27-L31

ただし、ExecutorとDriverそれぞれで使うPythonのメジャーバージョン、マイナーバージョンが異なることは許容されないので
その点には注意です。

https://github.com/apache/spark/blob/v3.1.2/python/pyspark/worker.py#L472-L477

pipでインストールした場合は?

ここまでtar.gzファイルを使ってインストールしたApache Sparkで話をしてきましたが、pipでインストールした場合は
どうでしょう?

試してみます。

$ pip3 install pyspark==3.1.2

この場合、pysparkコマンドにパスが通っているので、そのまま起動できます。

$ pyspark
Python 3.8.10 (default, Jun  2 2021, 10:49:15) 
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
21/09/06 23:07:28 WARN Utils: Your hostname, ubuntu2004.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.121.6 instead (on interface eth0)
21/09/06 23:07:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/path/to/venv/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/09/06 23:07:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.8.10 (default, Jun  2 2021 10:49:15)
Spark context Web UI available at http://192.168.121.6:4040
Spark context available as 'sc' (master = local[*], app id = local-1630937250516).
SparkSession available as 'spark'.
>>> 

ですが、PYSPARK_PYTHON環境変数を使うとSPARK_HOMEの探索がずれるみたいでうまく動きません。
※仮想環境(venv)で動かしています

$ PYSPARK_PYTHON=python3.9 pyspark
Traceback (most recent call last):
  File "path/to/venv/bin/find_spark_home.py", line 86, in <module>
    print(_find_spark_home())
  File "path/to/venv/bin/find_spark_home.py", line 52, in _find_spark_home
    module_home = os.path.dirname(find_spec("pyspark").origin)
AttributeError: 'NoneType' object has no attribute 'origin'
/path/to/venv/bin/pyspark: 行 24: /bin/load-spark-env.sh: そのようなファイルやディレクトリはありません
/path/to/venv/bin/pyspark: 行 68: /bin/spark-submit: そのようなファイルやディレクトリはありません

どうしたものかな?と思ったのですが、そもそもこのケースの場合は使うpipに使うPython自体を切り替えればいい
気がしますね。

$ sudo apt install python3.9-venv
$ . venv/bin/activate

pip3.9でPySparkをインストール。

$ pip3.9 install pyspark==3.1.2

これで、切り替わりました。

$ pyspark
Python 3.9.5 (default, May 19 2021, 11:32:47) 
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
21/09/06 23:18:13 WARN Utils: Your hostname, ubuntu2004.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.121.6 instead (on interface eth0)
21/09/06 23:18:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/path/to/venv/lib/python3.9/site-packages/pyspark/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/09/06 23:18:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.9.5 (default, May 19 2021 11:32:47)
Spark context Web UI available at http://192.168.121.6:4040
Spark context available as 'sc' (master = local[*], app id = local-1630937895507).
SparkSession available as 'spark'.
>>> 

LocalStackでAmazon API Gateway+AWS Lambdaを動かしてみる

これは、なにをしたくて書いたもの?

LocalStackの機能を見ていると、Amazon API Gatewayが使えそうなので、AWS Lambdaと組み合わせてLocalStack上で
動かしてみようかな、と。

環境

今回の環境は、こちらです。

$ localstack --version
0.12.17.5

AWS CLIは、LocalStack提供のもの+本家のものを使います。

$ awslocal --version
aws-cli/2.2.35 Python/3.8.8 Linux/5.4.0-81-generic exe/x86_64.ubuntu.20 prompt/off

LocalStackは、LAMBDA_EXECUTORdocker-reuseとしておきました。

$ LAMBDA_EXECUTOR=docker-reuse localstack start

参考するドキュメント

AWS Lambda関数は、Pythonで作成することにします。ランタイムはPython 3.8を使用。

Lambda ランタイム - AWS Lambda

ドキュメントは、全体を通してこのあたりを参考にしています。

チュートリアル: Python 3.8 での Lambda 関数の作成 - AWS Lambda

Amazon API GatewayREST APIの方で作ることにします。よって、ドキュメントはこちらを参照。

API Gateway で Lambda プロキシ統合を設定する - Amazon API Gateway

AWS Lambda関数を作成してデプロイする

それでは、まずはAWS Lambda関数を作成します。

こちらのドキュメントを見ていると、Amazon API Gatewayの背後の配置するAWS Lambda関数の入出力には規定が
あるようです。

API Gateway が Lambda 出力を API レスポンスとしてクライアントに渡すには、Lambda 関数は結果をこの形式で返す必要があります。

API Gateway Lambda プロキシの統合について理解する

具体的には、こちらのドキュメントですね。

プロキシ統合のための Lambda 関数の入力形式

プロキシ統合のための Lambda 関数の出力形式

この形式に則った形で、AWS Lambda関数を作成。

my_function.py

import json

def handler(event, context):

    print(f'event = {event}')

    path = event['path']
    http_method = event['httpMethod']

    word = None

    if 'queryStringParameters' in event:
       if 'word' in event['queryStringParameters']:
           word = event['queryStringParameters']['word']

    if 'body' in event and len(event['body']) != 0:
        body = json.loads(event['body'])
        if 'word' in body:
            word = body['word']

    if word is None:
        word = 'World'
        
    return {
        'statusCode': 200,
        'body': {
            'message': f'Hello {word}!!',
            'path': path,
            'http_method': http_method
        }
    }

今回は、メッセージをQueryStringまたはHTTPボディで受け取り、その内容をレスポンスのメッセージに組み込んで返すように
しましょう。

作成したPythonスクリプトをzipにアーカイブ

$ zip my_function_package.zip my_function.py

デプロイ。

$ awslocal lambda create-function \
  --function-name my_function \
  --zip-file fileb://my_function_package.zip \
  --handler my_function.handler \
  --runtime python3.8 \
  --role test-role

更新する場合は、本来はupdate-function-codeなのですが…LocalStackだとうまくいかないことがあり。

$ awslocal lambda update-function-code --function-name my_function --zip-file fileb://my_function_package.zip

削除して、もう1度登録した方がよいかもしれません。

$ awslocal lambda delete-function --function-name my_function

Amazon API Gatewayのリソースを作成して、AWS Lambdaと統合する

続いて、Amazon API Gatewayの方の作業に進みます。ドキュメントは、こちらを参考に。

API Gateway で Lambda プロキシ統合を設定する - Amazon API Gateway

まずは、REST APIを作成。

$ awslocal apigateway create-rest-api --name 'My Rest API Gateway'

コマンドの実行結果。

{
    "id": ".....",
    "name": ".....",
    "createdDate": ".....,
    "version": "V1",
    "binaryMediaTypes": [],
    "apiKeySource": "HEADER",
    "endpointConfiguration": {
        "types": [
            "EDGE"
        ]
    },
    "tags": {},
    "disableExecuteApiEndpoint": false
}

このレスポンスに含まれるidの値を覚えておきます。今回はREST_API_IDという変数に保存しておきました。

$ REST_API_ID=[idの値を指定]

次に、ルートリソースを確認します。

$ awslocal apigateway get-resources --rest-api-id $REST_API_ID

レスポンス。

{
    "items": [
        {
            "id": ".....",
            "path": "/"
        }
    ]
}

ここで、ルートリソース(path /id)の値を覚えておきます。RESOURCE_ROOT_IDという変数に保存しておきました。

$ RESOURCE_ROOT_ID=[path / のidの値を指定]

ルートリソースを親として、新しくリソースを作成します。

$ awslocal apigateway create-resource \
  --rest-api-id $REST_API_ID \
  --parent-id $RESOURCE_ROOT_ID \
  --path-part {proxy+}

レスポンス。

{
    "id": ".....",
    "parentId": ".....",
    "pathPart": "{proxy+}",
    "path": "/{proxy+}"
}

ここで得られたリソースのidも覚えておきます。RESOURCE_IDという変数に保存しておきました。

$ RESOURCE_ID=[create-resourceの結果で得られたidの値]

作成したリソースに対してメソッドを追加。

$ awslocal apigateway put-method \
  --rest-api-id $REST_API_ID \
  --resource-id $RESOURCE_ID \
  --http-method ANY \
  --authorization-type "NONE" 

レスポンス。

{
    "httpMethod": "ANY",
    "authorizationType": "NONE",
    "apiKeyRequired": false
}

続いてはAWS Lambda関数とのインテグレーションを行うのですが、この時に作成したAWS Lambda関数のARNが必要に
なるので取得しておきます。

$ awslocal lambda get-function --function-name my_function --query 'Configuration.FunctionArn'
"arn:aws:lambda:us-east-1:000000000000:function:my_function"


$ LAMBDA_ARN=arn:aws:lambda:us-east-1:000000000000:function:my_function

取得したAWS Lambda関数のARN、REST APIのID、リソースのIDを指定して、インテグレーションの設定を行います。

$ awslocal apigateway put-integration \
  --rest-api-id $REST_API_ID \
  --resource-id $RESOURCE_ID \
  --http-method ANY \
  --type AWS_PROXY \
  --integration-http-method POST \
  --uri arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/$LAMBDA_ARN/invocations \
  --credentials arn:aws:iam::000000000000:role/apigAwsProxyRole

--uriの部分ですが、AWS LambdaのARNの後に/invocationsが付くのは固定みたいですね。

レスポンス。

{
    "type": "AWS_PROXY",
    "httpMethod": "POST",
    "uri": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:000000000000:function:my_function/invocations",
    "requestParameters": {},
    "passthroughBehavior": "WHEN_NO_MATCH",
    "cacheNamespace": ".....",
    "cacheKeyParameters": []
}

ステージを指定して、デプロイメントの作成。

$ awslocal apigateway create-deployment --rest-api-id $REST_API_ID --stage-name test
{
    "id": "cznsonlidb",
    "description": "",
    "createdDate": "2021-09-05T15:21:19+09:00"
}

ここまでやって、AWS Lambda関数をAmazon API Gateway経由で呼び出せるようになります。

ところで、本来のAmazon API Gatewayへアクセスするにはxxxxx.execute-api.us-east-1.amazonaws.comのような
ドメインでアクセスするのですが、LocalStackの場合はそうはいきません。どうするのでしょう。

README.mdに回答が書いてありました。

Invoking API Gateway

以下のURLでアクセスするようです。

http://localhost:4566/restapis/<apiId>/<stage>/_user_request_/<methodPath>

_user_request_という部分は、固定で入ります。

確認してみましょう。

HTTP GETでアクセス。

$ curl http://localhost:4566/restapis/$REST_API_ID/test/_user_request_/request_from_query?word=WordFromQuery
{"message": "Hello WordFromQuery!!", "path": "/request_from_query", "http_method": "GET"}

ルートリソースを親にしてpathを{proxy+}にしているので、この状態だとどのパスでもアクセスできるようです。

POSTでアクセス。

$ curl -XPOST http://localhost:4566/restapis/$REST_API_ID/test/_user_request_/request_post/json -d '{"word": "Lambda"}'
{"message": "Hello Lambda!!", "path": "/request_post/json", "http_method": "POST"}

なんとなく、HTTPヘッダーも付けてみたり。

$ curl -XPOST -H 'Content-Type: application/json' http://localhost:4566/restapis/$REST_API_ID/test/_user_request_/request_post/json -d '{"word": "Lambda"}'
{"message": "Hello Lambda!!", "path": "/request_post/json", "http_method": "POST"}

こんなところでしょうか。動作確認はOKです。

まとめ

LocalStackで、Amazon API GatewayAWS Lambdaを動かしてみました。

Amazon API Gatewayが初見だったのですが、用語と操作でいろいろ戸惑う感じがします…。

用語は、このあたりを見ておさえていきましょうか。

Amazon API Gateway の概念 - Amazon API Gateway

とりあえず、動かせるところまでいけたのでOKです。