これは、なにをしたくて書いたもの?
Elasticsearchに、CSVファイルで用意されたデータをロードする、簡単なスクリプトを書いてみようかなぁと。
こういうのは、バルクローダー的なものでやったり、なにかツールを使ったりすることが多いように思いますが、とりあえず
さくっとやるのなら、簡単なスクリプトで済ませるのもありなのかなぁと。
まあ、勉強がてらに。言語はPythonを使用します。
お題
以下のバリエーションでやります。
内容は、書籍で。
また、CSVファイルの読み込み方法は、Pythonの標準モジュールとPandasでやりたいと思います。CSVファイルの内容を、
辞書形式にしてElasticsearchに登録するようにします。この時、idはisbnを使うようにしましょう。
というわけで、主に使うのはこのあたりの情報ですね。
14.1. csv --- CSV ファイルの読み書き — Python 3.6.10 ドキュメント
Python Data Analysis Library — pandas: Python Data Analysis Library
Python Elasticsearch Client — Elasticsearch 7.0.0 documentation
GitHub - elastic/elasticsearch-py: Official Python low-level client for Elasticsearch.
環境
今回の環境は、こちら。
$ uname -srvmpio Linux 4.15.0-72-generic #81-Ubuntu SMP Tue Nov 26 12:20:02 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux $ python3 -V Python 3.6.9
ElasticsearchないしPandasを使用するので、両方インストール。標準ライブラリのCSVモジュールを使う場合は、Pandasは不要です。
$ pip3 install pandas elasticsearch
バージョン。
$ pip3 freeze elasticsearch==7.1.0 numpy==1.18.0 pandas==0.25.3 pkg-resources==0.0.0 python-dateutil==2.8.1 pytz==2019.3 six==1.13.0 urllib3==1.25.7
Elasticsearch。
$ java --version openjdk 11.0.5 2019-10-15 OpenJDK Runtime Environment (build 11.0.5+10-post-Ubuntu-0ubuntu1.118.04) OpenJDK 64-Bit Server VM (build 11.0.5+10-post-Ubuntu-0ubuntu1.118.04, mixed mode, sharing) $ curl localhost:9200 { "name" : "ubuntu1804.localdomain", "cluster_name" : "elasticsearch", "cluster_uuid" : "UBHfURUYRZW_SJOnTXfXwA", "version" : { "number" : "7.5.1", "build_flavor" : "default", "build_type" : "deb", "build_hash" : "3ae9ac9a93c95bd0cdc054951cf95d88e1e18d96", "build_date" : "2019-12-16T22:57:37.835892Z", "build_snapshot" : false, "lucene_version" : "8.3.0", "minimum_wire_compatibility_version" : "6.8.0", "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" }
Elasticsearchが動作しているサーバーのIPアドレスは、192.168.33.13とします。
データ
用意したデータは、こちら。
isbn,title,price 978-4295003915,Elasticsearch実践ガイド,3080 978-4774192185,データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化],3278 978-4844378341,Elastic Stackで作るBI環境 Ver.7.4対応改訂版,2200 978-4844398981,Elasticsearch NEXT STEP,1980
ヘッダーなしのCSV。
books_without_header.csv
978-4295003915,Elasticsearch実践ガイド,3080 978-4774192185,データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化],3278 978-4844378341,Elastic Stackで作るBI環境 Ver.7.4対応改訂版,2200 978-4844398981,Elasticsearch NEXT STEP,1980
Elasticsearchで使うAPI
ElasticsearchへのアクセスはPythonクライアントを使うわけですが、単純にドキュメントを登録するケースと、バルクAPIを
使うケースの両方を試してみたいと思います。
このあたりですね。
CSVモジュールを使う
では、最初にPython標準ライブラリに入っている、CSVモジュールを使います。
ヘッダーがあることを考えると、DictReaderを使うのがよいのでしょう。
DictReaderを使うと、CSVファイルの内容を辞書形式(正確にはOrderedDict)で読み込むことができます。
作成したプログラムは、こんな感じ。 load_csv.py
from elasticsearch import Elasticsearch from elasticsearch import helpers from csv import DictReader csv_file = "books.csv" #csv_file = "books_without_header.csv" elasticsearch_host = "192.168.33.13" index_name = "book" es = Elasticsearch([f"http://{elasticsearch_host}:9200"]) ### simple with open(csv_file, "rt", encoding = "utf-8") as file: reader = DictReader(file) #reader = DictReader(file, fieldnames = ["isbn", "title", "price"]) # without Header for row in reader: doc = row isbn = doc["isbn"] es.index(index = index_name, id = isbn, body = doc) ### buik """ def doc_generator(index_name, csv_file): with open(csv_file, "rt", encoding = "utf-8") as file: reader = DictReader(file) #reader = DictReader(file, fieldnames = ["isbn", "title", "price"]) # without Header for row in reader: doc = row isbn = doc["isbn"] yield { "_index": index_name, "_id": doc["isbn"], "_source": doc } helpers.bulk(es, doc_generator(index_name, csv_file)) """
このまま動かすと、CSVファイルはヘッダーありを前提にして、読み込んだデータを1件1件登録するようになります。
要するに、これが動きます、と。
### simple with open(csv_file, "rt", encoding = "utf-8") as file: reader = DictReader(file) #reader = DictReader(file, fieldnames = ["isbn", "title", "price"]) # without Header for row in reader: doc = row isbn = doc["isbn"] es.index(index = index_name, id = isbn, body = doc)
読み込んだ辞書をそのままbodyに渡し、idにはisbnを指定します。
es.index(index = index_name, id = isbn, body = doc)
読み込ませるファイルを変えて、ヘッダーなしにした場合は
csv_file = "books_without_header.csv"
DictReaderの作成方法を変え、fieldnamesでカラム名を与える必要があります。
reader = DictReader(file, fieldnames = ["isbn", "title", "price"]) # without Header
バルクAPIを使う場合は、こちら。最初のソースコードでは、コメントアウトしてあります。
### buik def doc_generator(index_name, csv_file): with open(csv_file, "rt", encoding = "utf-8") as file: reader = DictReader(file) #reader = DictReader(file, fieldnames = ["isbn", "title", "price"]) # without Header for row in reader: doc = row isbn = doc["isbn"] yield { "_index": index_name, "_id": doc["isbn"], "_source": doc } helpers.bulk(es, doc_generator(index_name, csv_file))
ElasticsearchのPythonクライアントでバルクAPIを使う場合は、heplers#bulkを使うのが良さそうです。
この関数には、イテレーターやジェネレーターを渡すこともできるので、今回はジェネレーターを渡すようにしました。
これらを実行して
$ python3 load_csv.py
Elasticsearchに取り込んだ結果は、こちら。
$ curl localhost:9200/book/_search?pretty { "took" : 132, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 4, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "book", "_type" : "_doc", "_id" : "978-4295003915", "_score" : 1.0, "_source" : { "isbn" : "978-4295003915", "title" : "Elasticsearch実践ガイド", "price" : "3080" } }, { "_index" : "book", "_type" : "_doc", "_id" : "978-4774192185", "_score" : 1.0, "_source" : { "isbn" : "978-4774192185", "title" : "データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化]", "price" : "3278" } }, { "_index" : "book", "_type" : "_doc", "_id" : "978-4844378341", "_score" : 1.0, "_source" : { "isbn" : "978-4844378341", "title" : "Elastic Stackで作るBI環境 Ver.7.4対応改訂版", "price" : "2200" } }, { "_index" : "book", "_type" : "_doc", "_id" : "978-4844398981", "_score" : 1.0, "_source" : { "isbn" : "978-4844398981", "title" : "Elasticsearch NEXT STEP", "price" : "1980" } } ] } }
Pandasを使う
続いて、Pandasを使う場合。
今回のお題だと、標準のCSVモジュールでも十分なのですが、今後のことを考えるとPandasも扱えるようになっていた方がいいのかなと。
PandasでCSVファイルを読み込むには、read_csv関数を使用します。
作成したスクリプトは、こちら。
load_csv_pandas.py
from elasticsearch import Elasticsearch from elasticsearch import helpers import pandas as pd csv_file = "books.csv" #csv_file = "books_without_header.csv" elasticsearch_host = "192.168.33.13" index_name = "book" es = Elasticsearch([f"http://{elasticsearch_host}:9200"]) df = pd.read_csv(csv_file, encoding = "utf-8") #df = pd.read_csv(csv_file, encoding = "utf-8", header = None, names = ["isbn", "title", "price"]) # without Header ### simple for row in df.iterrows(): # DataFrame to (index, Series) pair doc = row[1].to_dict() # Series to dictionary isbn = doc["isbn"] es.index(index = index_name, id = isbn, body = doc) ### buik """ def doc_generator(index_name, df): docs = [row[1].to_dict() for row in df.iterrows()] for doc in docs: yield { "_index": index_name, "_id": doc["isbn"], "_source": doc } helpers.bulk(es, doc_generator(index_name, df)) """
標準ライブラリを使った時と同様、このまま動かすと、CSVファイルはヘッダーありを前提にして、読み込んだデータを1件1件登録
するようになります。
CSVファイルを読み込んでいる箇所は、こちら。
df = pd.read_csv(csv_file, encoding = "utf-8") #df = pd.read_csv(csv_file, encoding = "utf-8", header = None, names = ["isbn", "title", "price"]) # without Header
コメントアウトしている箇所は、CSVファイルの1行目がヘッダーでない場合は明示的にカラム名を与えてあげればOKです。
次に、DataFrameをどうやってElasticsearchに登録するかですが、
for row in df.iterrows(): # DataFrame to (index, Series) pair doc = row[1].to_dict() # Series to dictionary isbn = doc["isbn"] es.index(index = index_name, id = isbn, body = doc)
まずDataFrame#iterrowsで、インデックスとSeriesのペアから成るイテレーターに変換します。
次に、Series#to_dictで、Seriesを辞書に変換します。
for row in df.iterrows(): # DataFrame to (index, Series) pair doc = row[1].to_dict() # Series to dictionary
あとは、Elasticsearchに登録すればOKです。
es.index(index = index_name, id = isbn, body = doc)
バルクAPIを使う場合。
### buik def doc_generator(index_name, df): docs = [row[1].to_dict() for row in df.iterrows()] for doc in docs: yield { "_index": index_name, "_id": doc["isbn"], "_source": doc } helpers.bulk(es, doc_generator(index_name, df))
使うAPIは、ここまでにすべて登場しているので、説明は割愛。
実行結果についても、標準ライブラリを使った場合と同じになります。
$ python3 load_csv_pandas.py
なので、Elasticsearchに登録されたデータの確認結果は割愛します。
とりあえず、こんな感じで書けていければまずはいいのではないでしょうか。