CLOVER🍀

That was when it all began.

Apache Spark 3.1から、Amazon S3互換のオブジェクトストレージMinIOにアクセスする

これは、なにをしたくて書いたもの?

前にAmazon S3互換のオブジェクトストレージ、MinIOを試してみました。

Amazon S3互換のオブジェクトストレージ、MinIOを試す - CLOVER🍀

今回は、こちらにApache Sparkからアクセスしてみたいと思います。

環境

今回の環境は、こちら。Apache Sparkのプログラムは、Pythonで書くことにします。

$ python3 -V
Python 3.8.10

Apache Sparkは3.1.2 - Apache Hadoop 3.2を使います。
マスターノード1台、ワーカーノード3台のスタンドアロンクラスター構成で動作させます。特にプログラムには
表現されませんが。

ドライバーからマスターノードへは、ローカル接続でアクセスします。

MinIOのバージョンは、こちら。

$ minio --version
minio version RELEASE.2021-09-18T18-09-59Z

MinIOは、172.19.0.6で動作しているものとします。

準備とお題

PySparkをインストール。

$ pip3 install pyspark==3.1.2

お題としては、Apache SparkのREADME.mdを使ってWord Countすることにします。

https://github.com/apache/spark/blob/v3.1.2/README.md

Examples | Apache Spark

README.mdcurlでダウンロード。

$ curl -OL https://github.com/apache/spark/raw/v3.1.2/README.md

続いてMinIOにバケットを作成します。

$ export AWS_ACCESS_KEY_ID=minioadmin
$ export AWS_SECRET_ACCESS_KEY=minioadmin
$ export AWS_DEFAULT_REGION=us-east-1
$ export MINIO_ENDPOINT=http://172.19.0.6:9000

README.mdを配置する、入力用のバケットを作成。

$ aws --endpoint-url $MINIO_ENDPOINT s3 mb s3://input-bucket
make_bucket: input-bucket

こちらにREADME.mdをアップロードします。

$ aws --endpoint-url $MINIO_ENDPOINT s3 cp ./README.md s3://input-bucket/spark/README.md
upload: ./README.md to s3://input-bucket/spark/README.md

続いて、出力用のバケットを作成。

$ aws --endpoint-url $MINIO_ENDPOINT s3 mb s3://output-bucket
make_bucket: output-bucket

これで、準備は完了です。

MinIOのバケットにアクセスするWord Countプログラムを作成する

では、MinIOのバケットにアクセスするWord Countプログラムを作成します。

作成したプログラムは、こちらに。

word_count.py

from datetime import datetime

from pyspark.rdd import RDD
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StringType, LongType

spark: SparkSession = SparkSession.builder.getOrCreate()

hadoop_configuration = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_configuration.set('fs.s3a.endpoint', 'http://172.19.0.6:9000')
hadoop_configuration.set('fs.s3a.connection.ssl.enabled', 'false')

df: DataFrame = spark.read.text('s3a://input-bucket/spark/README.md')
splitted: RDD = df.rdd.map(lambda x: x[0]).flatMap(lambda x: x.split(' '))
words: RDD = splitted.map(lambda x: (x, 1))
results: RDD = words.reduceByKey(lambda a, b: a + b)

wordCountSchema: StructType = StructType().add('word', StringType()).add('count', LongType())

output_dir = datetime.now().strftime('%Y%m%d%H%M%S')

results.toDF(wordCountSchema).select('word', 'count').write.format('csv').save(f's3a://output-bucket/word-count-results-{output_dir}')

プログラム上のポイントは、こちらですね。

hadoop_configuration = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_configuration.set('fs.s3a.endpoint', 'http://172.19.0.6:9000')
hadoop_configuration.set('fs.s3a.connection.ssl.enabled', 'false')

Amazon S3のエンドポイントをMinIOに向け、SSLTLSでのアクセスを無効に設定します。

これは、Hadoop-AWSというモジュールの設定になります。

Apache Hadoop Amazon Web Services support – Hadoop-AWS module: Integration with Amazon Web Services

このモジュールは、実行時に--packagesオプションで追加します。

AWSのアクセスキーとリージョンは、環境変数で指定することにしましょう。

これらの設定は、Apache Sparkの各ノードでconf/spark-defaults.confファイルに設定することもできそうな感じですが、
今回はこの方法をとります。

Spark Configuration / Custom Hadoop/Hive Configuration

結果、spark-submitで実行する時のコマンドはこうなりました。

$ AWS_ACCESS_KEY_ID=minioadmin AWS_SECRET_ACCESS_KEY=minioadmin spark-submit --master spark://localhost:7077 --packages org.apache.hadoop:hadoop-aws:3.2.0 word_count.py

MinIOのクレデンシャルを、環境変数で設定。

AWS_ACCESS_KEY_ID=minioadmin AWS_SECRET_ACCESS_KEY=minioadmin

--packagesオプションで、Hadoop-AWSを追加します。

--packages org.apache.hadoop:hadoop-aws:3.2.0

ここで指定した依存ライブラリは、Apache Ivyでダウンロードされるようです。

:: loading settings :: url = jar:file:/path/to/venv/lib/python3.8/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: $HOME/.ivy2/cache
The jars for the packages stored in: $HOME/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6834f39e-f1dc-408a-9fd0-ef5fcbe83f30;1.0
    confs: [default]
    found org.apache.hadoop#hadoop-aws;3.2.0 in central
    found com.amazonaws#aws-java-sdk-bundle;1.11.375 in central
:: resolution report :: resolve 265ms :: artifacts dl 9ms
    :: modules in use:
    com.amazonaws#aws-java-sdk-bundle;1.11.375 from central in [default]
    org.apache.hadoop#hadoop-aws;3.2.0 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-6834f39e-f1dc-408a-9fd0-ef5fcbe83f30
    confs: [default]
    0 artifacts copied, 2 already retrieved (0kB/8ms)

実行結果の例。

$ aws --endpoint-url $MINIO_ENDPOINT s3 cp s3://output-bucket/word-count-results-20210921000225/part-00000-12d8cf19-18a9-4601-98fc-c0086e1aa44e-c000.csv - | head -n 10
"#",1
Apache,1
Spark,14
"",73
is,7
a,9
unified,1
analytics,1
engine,2
for,12

OKそうですね。

ところで、こちらのSparkContextに出てくる_jscとはなんでしょう?

hadoop_configuration = spark.sparkContext._jsc.hadoopConfiguration()

これは、JavaSparkContextオブジェクトのようです。

jsc : :py:class:py4j.java_gateway.JavaObject, optional The JavaSparkContext instance. This is only used internally.

内部利用のみ、と書かれている割には、コード例を検索するとそこそこ見つかるんですよね…。PySparkで扱える範囲が
限定的だからということでしょうか。

pyspark.context — PySpark 3.1.2 documentation

https://github.com/apache/spark/blob/v3.1.2/python/pyspark/context.py

まあ、今回はこれでOKとしましょう。

まとめ

Apache Spark 3.1から、Amazon S3互換のオブジェクトストレージであるMinIOにアクセスしてみました。

Hadoop-AWSに関する設定などがよくわからなくて戸惑いましたが、最終的にはなんとかなってよかったです。

今度は、Azure Storageのエミュレーターなどでも試してみたいですね。

追記:

と思ったのですが、Apache SparkからAzure StorageのエミュレーターAzuriteはできなさそうです…。

Unable to access the blob storage files from Apache spark. · Issue #757 · Azure/Azurite · GitHub