CLOVER🍀

That was when it all began.

ElasticsearchにCSVファイルの内容をロードする(CSVモジュール & Pandas)

これは、なにをしたくて書いたもの?

Elasticsearchに、CSVファイルで用意されたデータをロードする、簡単なスクリプトを書いてみようかなぁと。

こういうのは、バルクローダー的なものでやったり、なにかツールを使ったりすることが多いように思いますが、とりあえず
さくっとやるのなら、簡単なスクリプトで済ませるのもありなのかなぁと。

まあ、勉強がてらに。言語はPythonを使用します。

お題

以下のバリエーションでやります。

  • ヘッダーありのCSVファイル
  • ヘッダーなしのCSVファイル

内容は、書籍で。

また、CSVファイルの読み込み方法は、Pythonの標準モジュールとPandasでやりたいと思います。CSVファイルの内容を、
辞書形式にしてElasticsearchに登録するようにします。この時、idはisbnを使うようにしましょう。

というわけで、主に使うのはこのあたりの情報ですね。

14.1. csv --- CSV ファイルの読み書き — Python 3.6.10 ドキュメント

Python Data Analysis Library — pandas: Python Data Analysis Library

Python Elasticsearch Client — Elasticsearch 7.0.0 documentation

GitHub - elastic/elasticsearch-py: Official Python low-level client for Elasticsearch.

環境

今回の環境は、こちら。

$ uname -srvmpio
Linux 4.15.0-72-generic #81-Ubuntu SMP Tue Nov 26 12:20:02 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux


$ python3 -V
Python 3.6.9

ElasticsearchないしPandasを使用するので、両方インストール。標準ライブラリのCSVモジュールを使う場合は、Pandasは不要です。

$ pip3 install pandas elasticsearch

バージョン。

$ pip3 freeze
elasticsearch==7.1.0
numpy==1.18.0
pandas==0.25.3
pkg-resources==0.0.0
python-dateutil==2.8.1
pytz==2019.3
six==1.13.0
urllib3==1.25.7

Elasticsearch。

$ java --version
openjdk 11.0.5 2019-10-15
OpenJDK Runtime Environment (build 11.0.5+10-post-Ubuntu-0ubuntu1.118.04)
OpenJDK 64-Bit Server VM (build 11.0.5+10-post-Ubuntu-0ubuntu1.118.04, mixed mode, sharing)


$ curl localhost:9200
{
  "name" : "ubuntu1804.localdomain",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "UBHfURUYRZW_SJOnTXfXwA",
  "version" : {
    "number" : "7.5.1",
    "build_flavor" : "default",
    "build_type" : "deb",
    "build_hash" : "3ae9ac9a93c95bd0cdc054951cf95d88e1e18d96",
    "build_date" : "2019-12-16T22:57:37.835892Z",
    "build_snapshot" : false,
    "lucene_version" : "8.3.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}

Elasticsearchが動作しているサーバーのIPアドレスは、192.168.33.13とします。

データ

用意したデータは、こちら。

ヘッダーありのCSV。
books.csv

isbn,title,price
978-4295003915,Elasticsearch実践ガイド,3080
978-4774192185,データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化],3278
978-4844378341,Elastic Stackで作るBI環境 Ver.7.4対応改訂版,2200
978-4844398981,Elasticsearch NEXT STEP,1980

ヘッダーなしのCSV。
books_without_header.csv

978-4295003915,Elasticsearch実践ガイド,3080
978-4774192185,データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化],3278
978-4844378341,Elastic Stackで作るBI環境 Ver.7.4対応改訂版,2200
978-4844398981,Elasticsearch NEXT STEP,1980

Elasticsearchで使うAPI

ElasticsearchへのアクセスはPythonクライアントを使うわけですが、単純にドキュメントを登録するケースと、バルクAPIを
使うケースの両方を試してみたいと思います。

このあたりですね。

Elasticsearch / index

Hepers / bulk

Elasticsearch / bulk

CSVモジュールを使う

では、最初にPython標準ライブラリに入っている、CSVモジュールを使います。

ヘッダーがあることを考えると、DictReaderを使うのがよいのでしょう。

class csv.DictReader

DictReaderを使うと、CSVファイルの内容を辞書形式(正確にはOrderedDict)で読み込むことができます。

作成したプログラムは、こんな感じ。 load_csv.py

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from csv import DictReader

csv_file = "books.csv"
#csv_file = "books_without_header.csv"
elasticsearch_host = "192.168.33.13"
index_name = "book"

es = Elasticsearch([f"http://{elasticsearch_host}:9200"])


### simple
with open(csv_file, "rt", encoding = "utf-8") as file:
    reader = DictReader(file)
    #reader = DictReader(file, fieldnames = ["isbn", "title", "price"])  # without Header

    for row in reader:
        doc = row
        isbn = doc["isbn"]

        es.index(index = index_name, id = isbn, body = doc)


### buik
"""
def doc_generator(index_name, csv_file):
    with open(csv_file, "rt", encoding = "utf-8") as file:
        reader = DictReader(file)
        #reader = DictReader(file, fieldnames = ["isbn", "title", "price"])  # without Header

        for row in reader:
            doc = row
            isbn = doc["isbn"]
    
            yield {
                "_index": index_name,
                "_id": doc["isbn"],
                "_source": doc
            }

helpers.bulk(es, doc_generator(index_name, csv_file))
"""

このまま動かすと、CSVファイルはヘッダーありを前提にして、読み込んだデータを1件1件登録するようになります。

要するに、これが動きます、と。

### simple
with open(csv_file, "rt", encoding = "utf-8") as file:
    reader = DictReader(file)
    #reader = DictReader(file, fieldnames = ["isbn", "title", "price"])  # without Header

    for row in reader:
        doc = row
        isbn = doc["isbn"]

        es.index(index = index_name, id = isbn, body = doc)

読み込んだ辞書をそのままbodyに渡し、idにはisbnを指定します。

        es.index(index = index_name, id = isbn, body = doc)

読み込ませるファイルを変えて、ヘッダーなしにした場合は

csv_file = "books_without_header.csv"

DictReaderの作成方法を変え、fieldnamesでカラム名を与える必要があります。

    reader = DictReader(file, fieldnames = ["isbn", "title", "price"])  # without Header

バルクAPIを使う場合は、こちら。最初のソースコードでは、コメントアウトしてあります。

### buik
def doc_generator(index_name, csv_file):
    with open(csv_file, "rt", encoding = "utf-8") as file:
        reader = DictReader(file)
        #reader = DictReader(file, fieldnames = ["isbn", "title", "price"])  # without Header

        for row in reader:
            doc = row
            isbn = doc["isbn"]
    
            yield {
                "_index": index_name,
                "_id": doc["isbn"],
                "_source": doc
            }

helpers.bulk(es, doc_generator(index_name, csv_file))

ElasticsearchのPythonクライアントでバルクAPIを使う場合は、heplers#bulkを使うのが良さそうです。

Hepers / bulk

この関数には、イテレーターやジェネレーターを渡すこともできるので、今回はジェネレーターを渡すようにしました。

これらを実行して

$ python3 load_csv.py

Elasticsearchに取り込んだ結果は、こちら。

$ curl localhost:9200/book/_search?pretty
{
  "took" : 132,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "book",
        "_type" : "_doc",
        "_id" : "978-4295003915",
        "_score" : 1.0,
        "_source" : {
          "isbn" : "978-4295003915",
          "title" : "Elasticsearch実践ガイド",
          "price" : "3080"
        }
      },
      {
        "_index" : "book",
        "_type" : "_doc",
        "_id" : "978-4774192185",
        "_score" : 1.0,
        "_source" : {
          "isbn" : "978-4774192185",
          "title" : "データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化]",
          "price" : "3278"
        }
      },
      {
        "_index" : "book",
        "_type" : "_doc",
        "_id" : "978-4844378341",
        "_score" : 1.0,
        "_source" : {
          "isbn" : "978-4844378341",
          "title" : "Elastic Stackで作るBI環境 Ver.7.4対応改訂版",
          "price" : "2200"
        }
      },
      {
        "_index" : "book",
        "_type" : "_doc",
        "_id" : "978-4844398981",
        "_score" : 1.0,
        "_source" : {
          "isbn" : "978-4844398981",
          "title" : "Elasticsearch NEXT STEP",
          "price" : "1980"
        }
      }
    ]
  }
}

Pandasを使う

続いて、Pandasを使う場合。

今回のお題だと、標準のCSVモジュールでも十分なのですが、今後のことを考えるとPandasも扱えるようになっていた方がいいのかなと。

PandasでCSVファイルを読み込むには、read_csv関数を使用します。

pandas.read_csv

作成したスクリプトは、こちら。
load_csv_pandas.py

from elasticsearch import Elasticsearch
from elasticsearch import helpers
import pandas as pd

csv_file = "books.csv"
#csv_file = "books_without_header.csv"
elasticsearch_host = "192.168.33.13"
index_name = "book"

es = Elasticsearch([f"http://{elasticsearch_host}:9200"])

df = pd.read_csv(csv_file, encoding = "utf-8")
#df = pd.read_csv(csv_file, encoding = "utf-8", header = None, names = ["isbn", "title", "price"])  # without Header


### simple
for row in df.iterrows():  # DataFrame to (index, Series) pair
    doc = row[1].to_dict()  # Series to dictionary
    isbn = doc["isbn"]

    es.index(index = index_name, id = isbn, body = doc)


### buik
"""
def doc_generator(index_name, df):
    docs = [row[1].to_dict() for row in df.iterrows()]
    for doc in docs:
        yield {
            "_index": index_name,
            "_id": doc["isbn"],
            "_source": doc
        }

helpers.bulk(es, doc_generator(index_name, df))
"""

標準ライブラリを使った時と同様、このまま動かすと、CSVファイルはヘッダーありを前提にして、読み込んだデータを1件1件登録
するようになります。

CSVファイルを読み込んでいる箇所は、こちら。

df = pd.read_csv(csv_file, encoding = "utf-8")
#df = pd.read_csv(csv_file, encoding = "utf-8", header = None, names = ["isbn", "title", "price"])  # without Header

コメントアウトしている箇所は、CSVファイルの1行目がヘッダーでない場合は明示的にカラム名を与えてあげればOKです。

次に、DataFrameをどうやってElasticsearchに登録するかですが、

for row in df.iterrows():  # DataFrame to (index, Series) pair
    doc = row[1].to_dict()  # Series to dictionary
    isbn = doc["isbn"]

    es.index(index = index_name, id = isbn, body = doc)

まずDataFrame#iterrowsで、インデックスとSeriesのペアから成るイテレーターに変換します。

pandas.DataFrame.iterrows

次に、Series#to_dictで、Seriesを辞書に変換します。

pandas.Series.to_dict

for row in df.iterrows():  # DataFrame to (index, Series) pair
    doc = row[1].to_dict()  # Series to dictionary

あとは、Elasticsearchに登録すればOKです。

    es.index(index = index_name, id = isbn, body = doc)

バルクAPIを使う場合。

### buik
def doc_generator(index_name, df):
    docs = [row[1].to_dict() for row in df.iterrows()]
    for doc in docs:
        yield {
            "_index": index_name,
            "_id": doc["isbn"],
            "_source": doc
        }

helpers.bulk(es, doc_generator(index_name, df))

使うAPIは、ここまでにすべて登場しているので、説明は割愛。

実行結果についても、標準ライブラリを使った場合と同じになります。

$ python3 load_csv_pandas.py

なので、Elasticsearchに登録されたデータの確認結果は割愛します。

とりあえず、こんな感じで書けていければまずはいいのではないでしょうか。