What prompted me to write this?
I wanted to try building a cluster with Apache Spark's standalone mode, and since I also keep forgetting what `local[…]` means when running Spark locally, I figured I'd make a note of it.
Environment
Here is the environment used this time.
```
$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK-11.0.11+9 (build 11.0.11+9, mixed mode)

$ python3 -V
Python 3.8.10
```
Matching those versions, install PySpark with pip.

```
$ pip3 install pyspark==3.1.2
```
We will also use the Apache Spark distribution itself.

```
$ curl -LO https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
```

The Apache Spark version is 3.1.2.
Standalone mode
First of all, what is standalone mode? It is one of the cluster managers, and it is included in Apache Spark itself.
Cluster Mode Overview - Spark 3.1.2 Documentation
Available cluster managers include standalone, Hadoop YARN, Kubernetes, and Apache Mesos.
The documentation for standalone mode is here.
Spark Standalone Mode - Spark 3.1.2 Documentation
It uses a master/worker configuration and can be run in either of the following forms:
- build a cluster from master and worker nodes
- run the daemons on a single machine (for testing purposes)
A Spark application, in turn, runs in one of two deploy modes: client mode or cluster mode.
Spark Standalone Mode / Launching Spark Applications
Client mode runs the driver, which creates the SparkContext, locally; cluster mode runs it on the Spark cluster side.
In client mode, the driver is launched in the same process as the client that submits the Spark application, and the client waits for the application to finish.
In cluster mode, the client exits as soon as it has submitted the Spark application to the cluster, without waiting for completion.
For how to choose between them, see the following.
Submitting Applications / Launching Applications with spark-submit
Incidentally, with Apache Spark at the time of writing, it seems that Python applications cannot be deployed in cluster mode.
Running Spark locally with PySpark
First, let's run Spark locally using the Apache Spark installed with pip, and check the meaning of the `local[*]` that often appears at that point.
For example, run the `pyspark` command.

```
$ pyspark
```
Apache Spark starts up and prints its logs, and among them you will see a line like this.

```
Spark context available as 'sc' (master = local[*], app id = local-1631357850789).
```
You may also have seen examples that specify the master in the program itself.

```python
spark = SparkSession.builder.master('local[2]').getOrCreate()
```
This corresponds to the `spark.master` setting, which is documented here.
Spark Configuration / Application Properties
The values that can be set for `spark.master` are listed here.
Submitting Applications / Master URLs
Of those, here are the entries related to `local[…]`.
Format | Meaning |
---|---|
local | Run Spark locally with one worker thread |
local[K] | Run Spark locally with K worker threads |
local[K, F] | Run Spark locally with K worker threads, tolerating up to F - 1 task failures |
local[*] | Run Spark locally with as many worker threads as the machine has cores |
local[*, F] | Run Spark locally with as many worker threads as the machine has cores, tolerating up to F - 1 task failures |
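To keep these forms straight, here is a small pure-Python sketch that parses a `local[…]` master URL into its thread count and max-failures parts. This is a hypothetical helper for illustration only, not a Spark API; it just mirrors the table above, approximating the core count with `os.cpu_count()`.

```python
import os
import re

def parse_local_master(master: str):
    """Parse a local[...] master URL into (threads, max_failures).

    Hypothetical helper mirroring the table above:
      local        -> 1 thread
      local[K]     -> K threads
      local[K, F]  -> K threads, up to F - 1 task failures tolerated
      local[*]     -> one thread per core
    """
    if master == 'local':
        return 1, 1
    m = re.fullmatch(r'local\[([^,\]]+)(?:,\s*(\d+))?\]', master)
    if m is None:
        raise ValueError(f'not a local master URL: {master}')
    threads = os.cpu_count() if m.group(1).strip() == '*' else int(m.group(1))
    failures = int(m.group(2)) if m.group(2) else 1
    return threads, failures

print(parse_local_master('local[2]'))     # (2, 1)
print(parse_local_master('local[4, 3]'))  # (4, 3)
```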
So in the earlier example, `spark.master` when `pyspark` starts is `local[*]`, which means running locally with as many worker threads as the machine has cores.
The `spark.master` setting can be specified with the `--master` option of `pyspark`, `spark-submit`, `spark-shell`, and so on.
```
## with pyspark
$ pyspark --master local[2]
>>> spark.conf.get('spark.master')
'local[2]'

## with spark-submit
$ spark-submit --master local[2] [Application]
```
This is a digression from the pip-installed Apache Spark, but if you have installed from the distribution, you can also write default values in the `conf/spark-defaults.conf` file.
Spark Configuration / Dynamically Loading Spark Properties
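For instance, a `conf/spark-defaults.conf` entry setting a default master might look like this (a sketch; `spark.master` is the real property name, the value is just an example):

```
spark.master    local[2]
```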
Let's check this. Prepare a Word Count program along these lines.
word_count.py
```python
from pyspark.rdd import RDD
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StringType, LongType

spark: SparkSession = SparkSession.builder.getOrCreate()

print(f"spark.master = {spark.conf.get('spark.master')}")

df: DataFrame = spark.read.text('README.md')

splitted: RDD = df.rdd.map(lambda x: x[0]).flatMap(lambda x: x.split(' '))
words: RDD = splitted.map(lambda x: (x, 1))
results: RDD = words.reduceByKey(lambda a, b: a + b)

wordCountSchema: StructType = StructType().add('word', StringType()).add('count', LongType())

results.toDF(wordCountSchema).select('word', 'count').write.format('csv').save('/tmp/word_count_results')

print('end!!')
```
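As an aside, the RDD chain in this program (flatMap over split lines, map to pairs, reduceByKey with addition) is the classic word count. A plain-Python sketch of the same logic, without Spark, would be:

```python
from collections import defaultdict

def word_count(lines):
    """Pure-Python equivalent of the RDD pipeline above:
    flatMap(split) -> map((word, 1)) -> reduceByKey(a + b)."""
    counts = defaultdict(int)
    for line in lines:
        for word in line.split(' '):
            counts[word] += 1  # plays the role of reduceByKey(lambda a, b: a + b)
    return dict(counts)

print(word_count(['Apache Spark', 'Spark is fast']))
# {'Apache': 1, 'Spark': 2, 'is': 1, 'fast': 1}
```

Spark distributes exactly this computation across worker threads (or worker nodes), which is why the thread count in `local[…]` matters.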
Partway through, it prints the value of `spark.master`.

```python
print(f"spark.master = {spark.conf.get('spark.master')}")
```
The Word Count target is Apache Spark's `README.md`.

```
$ curl -LO https://github.com/apache/spark/raw/v3.1.2/README.md
```
Run it.

```
$ spark-submit word_count.py
```
At this point, `spark.master` is `local[*]`.

```
spark.master = local[*]
```
With this program, the results are written to the `/tmp/word_count_results` directory. An example of the results:

```
$ head -n 10 /tmp/word_count_results/part-00000-b64679f0-8e18-4c2e-880c-f7d57db2b3fc-c000.csv
"#",1
Apache,1
Spark,14
"",73
is,7
a,9
unified,1
analytics,1
engine,2
for,12
```
When rerunning, the job fails if the output directory already exists, so it is a good idea to delete the directory first.

```
$ rm -rf /tmp/word_count_results
```
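If you would rather do that cleanup from Python, a minimal sketch using only the standard library (the path is the one this program writes to):

```python
import shutil
from pathlib import Path

output_dir = Path('/tmp/word_count_results')

# Equivalent of `rm -rf`: ignore_errors=True makes this a no-op
# when the directory does not exist yet.
shutil.rmtree(output_dir, ignore_errors=True)
```

Alternatively, Spark's DataFrameWriter supports `.mode('overwrite')`, which replaces existing output instead of failing.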
Run with `local[2]`.

```
$ spark-submit --master local[2] word_count.py
```
You can confirm that it is reflected in the setting.

```
spark.master = local[2]
```
As mentioned above, `spark.master` can also be written in the program itself, but it is usually better to specify it with a command-line option or a configuration file.

```python
spark: SparkSession = SparkSession.builder.master('local[2]').getOrCreate()
```
Building a standalone cluster
Next, let's build a standalone cluster.
I will prepare four nodes, 172.17.0.2 through 172.17.0.5, with 172.17.0.2 as the master and the rest as workers.
Install Apache Spark on each node.
```
$ curl -LO https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
$ tar xf spark-3.1.2-bin-hadoop3.2.tgz
$ cd spark-3.1.2-bin-hadoop3.2
```
The cluster is built following these instructions and scripts.
Spark Standalone Mode / Starting a Cluster Manually
Spark Standalone Mode / Cluster Launch Scripts
There is apparently a way to launch all the nodes at once over SSH, but setting up SSH is a hassle… so this time I will start them individually.
The master is started with `start-master.sh`.

```
$ sbin/start-master.sh
```
The hostname can be specified with the `-h` or `--host` option. This time I will specify it at startup.

```
$ sbin/start-master.sh -h 172.17.0.2
```
Let's look at the Web UI by accessing `http://[master IP address]:8080`.
At this point there are no worker nodes.
Next, start the workers. Run `start-worker.sh` with the master node's URL, once on each worker node.

```
$ sbin/start-worker.sh spark://172.17.0.2:7077
$ sbin/start-worker.sh spark://172.17.0.2:7077
$ sbin/start-worker.sh spark://172.17.0.2:7077
```
After they start, the Web UI shows that the workers have been added.
Now, run `spark-submit` with `--master` specified. If you forget the `--master` option here, the job simply runs locally…

```
$ bin/spark-submit --master spark://172.17.0.2:7077 /host/word_count.py
```
Running.
Finished.
With this implementation, the result files appear to be written to one of the worker nodes. This time they were on the second worker node I started.

```
$ head -n 10 /tmp/word_count_results/_temporary/0/task_202109111327078853503780902620221_0001_m_000000/part-00000-1bdd6215-c471-4fb4-a0ea-ca8d32416b7d-c000.csv
"#",1
Apache,1
Spark,14
"",73
is,7
a,9
unified,1
analytics,1
engine,2
for,12
```
If it behaves like this, in practice you would normally save the results to a data store or object storage.
The commands to stop the workers and the master are these.

```
$ sbin/stop-worker.sh
$ sbin/stop-master.sh
```
With that, I have covered everything I wanted to check this time.
Summary
I played around a bit with Apache Spark's standalone cluster manager.
I was able to confirm how things behave when running locally and when running on a cluster.