これは、なにをしたくて書いたもの?
前にAmazon S3互換のオブジェクトストレージ、MinIOを試してみました。
Amazon S3互換のオブジェクトストレージ、MinIOを試す - CLOVER🍀
今回は、こちらにApache Sparkからアクセスしてみたいと思います。
環境
今回の環境は、こちら。Apache Sparkのプログラムは、Pythonで書くことにします。
$ python3 -V Python 3.8.10
Apache Sparkは3.1.2 - Apache Hadoop 3.2を使います。
マスターノード1台、ワーカーノード3台のスタンドアロンクラスター構成で動作させます。特にプログラムには
表現されませんが。
ドライバーからマスターノードへは、ローカル接続でアクセスします。
MinIOのバージョンは、こちら。
$ minio --version minio version RELEASE.2021-09-18T18-09-59Z
MinIOは、172.19.0.6で動作しているものとします。
準備とお題
PySparkをインストール。
$ pip3 install pyspark==3.1.2
お題としては、Apache SparkのREADME.md
を使ってWord Countすることにします。
https://github.com/apache/spark/blob/v3.1.2/README.md
README.md
をcurl
でダウンロード。
$ curl -OL https://github.com/apache/spark/raw/v3.1.2/README.md
続いてMinIOにバケットを作成します。
$ export AWS_ACCESS_KEY_ID=minioadmin $ export AWS_SECRET_ACCESS_KEY=minioadmin $ export AWS_DEFAULT_REGION=us-east-1 $ export MINIO_ENDPOINT=http://172.19.0.6:9000
README.md
を配置する、入力用のバケットを作成。
$ aws --endpoint-url $MINIO_ENDPOINT s3 mb s3://input-bucket make_bucket: input-bucket
こちらにREADME.md
をアップロードします。
$ aws --endpoint-url $MINIO_ENDPOINT s3 cp ./README.md s3://input-bucket/spark/README.md upload: ./README.md to s3://input-bucket/spark/README.md
続いて、出力用のバケットを作成。
$ aws --endpoint-url $MINIO_ENDPOINT s3 mb s3://output-bucket make_bucket: output-bucket
これで、準備は完了です。
MinIOのバケットにアクセスするWord Countプログラムを作成する
では、MinIOのバケットにアクセスするWord Countプログラムを作成します。
作成したプログラムは、こちらに。
word_count.py
from datetime import datetime from pyspark.rdd import RDD from pyspark.sql import SparkSession, DataFrame from pyspark.sql.types import StructType, StringType, LongType spark: SparkSession = SparkSession.builder.getOrCreate() hadoop_configuration = spark.sparkContext._jsc.hadoopConfiguration() hadoop_configuration.set('fs.s3a.endpoint', 'http://172.19.0.6:9000') hadoop_configuration.set('fs.s3a.connection.ssl.enabled', 'false') df: DataFrame = spark.read.text('s3a://input-bucket/spark/README.md') splitted: RDD = df.rdd.map(lambda x: x[0]).flatMap(lambda x: x.split(' ')) words: RDD = splitted.map(lambda x: (x, 1)) results: RDD = words.reduceByKey(lambda a, b: a + b) wordCountSchema: StructType = StructType().add('word', StringType()).add('count', LongType()) output_dir = datetime.now().strftime('%Y%m%d%H%M%S') results.toDF(wordCountSchema).select('word', 'count').write.format('csv').save(f's3a://output-bucket/word-count-results-{output_dir}')
プログラム上のポイントは、こちらですね。
hadoop_configuration = spark.sparkContext._jsc.hadoopConfiguration() hadoop_configuration.set('fs.s3a.endpoint', 'http://172.19.0.6:9000') hadoop_configuration.set('fs.s3a.connection.ssl.enabled', 'false')
Amazon S3のエンドポイントをMinIOに向け、SSL/TLSでのアクセスを無効に設定します。
これは、Hadoop-AWSというモジュールの設定になります。
Apache Hadoop Amazon Web Services support – Hadoop-AWS module: Integration with Amazon Web Services
このモジュールは、実行時に--packages
オプションで追加します。
AWSのアクセスキーとリージョンは、環境変数で指定することにしましょう。
これらの設定は、Apache Sparkの各ノードでconf/spark-defaults.conf
ファイルに設定することもできそうな感じですが、
今回はこの方法をとります。
Spark Configuration / Custom Hadoop/Hive Configuration
結果、spark-submit
で実行する時のコマンドはこうなりました。
$ AWS_ACCESS_KEY_ID=minioadmin AWS_SECRET_ACCESS_KEY=minioadmin spark-submit --master spark://localhost:7077 --packages org.apache.hadoop:hadoop-aws:3.2.0 word_count.py
MinIOのクレデンシャルを、環境変数で設定。
AWS_ACCESS_KEY_ID=minioadmin AWS_SECRET_ACCESS_KEY=minioadmin
--packages
オプションで、Hadoop-AWSを追加します。
--packages org.apache.hadoop:hadoop-aws:3.2.0
ここで指定した依存ライブラリは、Apache Ivyでダウンロードされるようです。
:: loading settings :: url = jar:file:/path/to/venv/lib/python3.8/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml Ivy Default Cache set to: $HOME/.ivy2/cache The jars for the packages stored in: $HOME/.ivy2/jars org.apache.hadoop#hadoop-aws added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent-6834f39e-f1dc-408a-9fd0-ef5fcbe83f30;1.0 confs: [default] found org.apache.hadoop#hadoop-aws;3.2.0 in central found com.amazonaws#aws-java-sdk-bundle;1.11.375 in central :: resolution report :: resolve 265ms :: artifacts dl 9ms :: modules in use: com.amazonaws#aws-java-sdk-bundle;1.11.375 from central in [default] org.apache.hadoop#hadoop-aws;3.2.0 from central in [default] --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 2 | 0 | 0 | 0 || 2 | 0 | --------------------------------------------------------------------- :: retrieving :: org.apache.spark#spark-submit-parent-6834f39e-f1dc-408a-9fd0-ef5fcbe83f30 confs: [default] 0 artifacts copied, 2 already retrieved (0kB/8ms)
実行結果の例。
$ aws --endpoint-url $MINIO_ENDPOINT s3 cp s3://output-bucket/word-count-results-20210921000225/part-00000-12d8cf19-18a9-4601-98fc-c0086e1aa44e-c000.csv - | head -n 10 "#",1 Apache,1 Spark,14 "",73 is,7 a,9 unified,1 analytics,1 engine,2 for,12
OKそうですね。
ところで、こちらのSparkContext
に出てくる_jsc
とはなんでしょう?
hadoop_configuration = spark.sparkContext._jsc.hadoopConfiguration()
これは、JavaのSparkContext
オブジェクトのようです。
jsc : :py:class:
py4j.java_gateway.JavaObject
, optional The JavaSparkContext instance. This is only used internally.
内部利用のみ、と書かれている割には、コード例を検索するとそこそこ見つかるんですよね…。PySparkで扱える範囲が
限定的だからということでしょうか。
pyspark.context — PySpark 3.1.2 documentation
https://github.com/apache/spark/blob/v3.1.2/python/pyspark/context.py
まあ、今回はこれでOKとしましょう。
まとめ
Apache Spark 3.1から、Amazon S3互換のオブジェクトストレージであるMinIOにアクセスしてみました。
Hadoop-AWSに関する設定などがよくわからなくて戸惑いましたが、最終的にはなんとかなってよかったです。
今度は、Azure Storageのエミュレーターなどでも試してみたいですね。
追記:
と思ったのですが、Apache SparkからAzure StorageのエミュレーターAzuriteはできなさそうです…。
Unable to access the blob storage files from Apache spark. · Issue #757 · Azure/Azurite · GitHub