CLOVER🍀

That was when it all began.

Jest+Groovyで、ElasticsearchにBulk Load

ちょっとタイトルと中身が微妙に合わないのですが…。
※読むとなんとなくわかるかもしれませんが、別にこの用途で必ずしもBulkである必要はなかったり…

例えば、こういうJSONファイルをElasticsearchに一括ロードしたいと思いまして。
data.json

[
  {
    "isbn": "978-4774161631",
    "title": "[改訂新版] Apache Solr入門 〜オープンソース全文検索エンジン",
    "price": 3600,
    "publish_date": "20131129",
    "author": ["大谷 純", "阿部 慎一朗", "大須賀 稔", "北野 太郎", "鈴木 教嗣", "平賀 一昭", "株式会社リクルートテクノロジーズ"],
    "tag": ["Java", "Lucene", "Solr", "全文検索"]
  },
  {
    "isbn": "978-4048662024",
    "title": "高速スケーラブル検索エンジン ElasticSearch Server",
    "price": 2800,
    "publish_date": "20140321",
    "author": ["Rafal Kuc", "Marek Rogozinski", "大岩 達也", "大谷 純", "兼山 元太", "水戸 祐介", "守谷 純之介", "株式会社リクルートテクノロジーズ"],
    "tag": ["Java", "Lucene", "Elasticsearch", "全文検索"]
  },
  {
    "isbn": "978-4774127804",
    "title": "Apache Lucene 入門 〜Java・オープンソース・全文検索システムの構築",
    "price": 3200,
    "publish_date": "20060517",
    "author": [" 関口 宏司"],
    "tag": ["Java", "Lucene", "全文検索"]
  },
  {
    "isbn": "978-4774127804",
    "title": "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava",
    "price": 3200,
    "publish_date": "20060517",
    "author": ["Antonio Goncalves", "日本オラクル株式会社", "株式会社プロシステムエルオーシー"],
    "tag": ["Java", "Java EE"]
  },
  {
    "isbn": "978-4777518654",
    "title": "はじめてのSpring Boot―「Spring Framework」で簡単Javaアプリ開発",
    "price": 2500,
    "publish_date": "20141101",
    "author": ["槇 俊明"],
    "tag": ["Java", "Spring", "Spring Boot"]
  },
  {
    "isbn": "978-4798121963",
    "title": "エリック・エヴァンスのドメイン駆動設計",
    "price": 5200,
    "publish_date": "20110409",
    "author": ["エリック・エヴァンス", "今関 剛", "和智 右桂", "牧野 祐子"],
    "tag": ["DDD"]
  },
  {
    "isbn": "978-4798131610",
    "title": "実践ドメイン駆動設計",
    "price": 5200,
    "publish_date": "20150317",
    "author": ["ヴァーン・ヴァーノン", "高木 正弘"],
    "tag": ["DDD"]
  }
]

Elasticsearchだと、こういうケースはBulk APIを使うらしいのですが
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

Bulk APIの場合は「JSONは、ひとつの命令につき1行!」となっているようなので

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }

先ほどのフォーマットだと使えないので、ちょっと辛い感じです。Solrではこのまま取り込めたのに。

そんなことを言っている方も、過去にいらっしゃったようです。
elasticsearchのBulk APIでは改行してはならない - sora_sakakiのブログ

Bulk APIで定義しているJSONの形式が、パフォーマンス上でポイントだということはわかりました。

あと、stream2esというものもあるみたいですが、こちらも1行1ドキュメントっぽいのでパス。
GitHub - elastic/stream2es: Stream data into ES (Wikipedia, Twitter, stdin, or other ESes)

ですけどね、自分は今回、「とりあえず最初に提示したフォーマットで一括ロードできればよい」という観点でのみ考えています。このブログでElasticsearchを使ってちょっと遊びたいだけなので、それほど速度なりにこだわりはありません。

というわけで、JSONファイルを入力とするスクリプトを書くことにしました。Groovyで。

公式クライアントとして、Groovyのものもあるみたいなのですが
Groovy API [2.2] | Elastic

NodeClientのラッパーみたいですし、クラスタ名まで書きたくないので、今回は試してみる目的も含めてJestを利用することにしました。

GitHub - searchbox-io/Jest: Elasticsearch Java Rest Client.

で、作ったのがこちら。
bulk-insert.groovy

@Grab('io.searchbox:jest:2.0.0')
import io.searchbox.client.JestClientFactory
import io.searchbox.client.config.HttpClientConfig
import io.searchbox.core.Bulk
import io.searchbox.core.Index

import com.google.gson.Gson

def uri = args[0]
def matcher = (uri =~ /(https?:\/\/[^\/]+\/)([^\/]+)\/([^\/]+)\/?.*/)

def elasticsearchUri = matcher[0][1]
def index = matcher[0][2]
def type = matcher[0][3]
def jsonFile = args[1]
def idField
if (args.length > 2) {
  idField = args[2]
}

def gson = new Gson()
def sources = gson.fromJson(new File(jsonFile).getText('UTF-8'), ArrayList)

def clientFactory = new JestClientFactory()
clientFactory.httpClientConfig =
  new HttpClientConfig.Builder(elasticsearchUri)
  .multiThreaded(true)
  .build()

def client = clientFactory.object

def bulkBuilder =
  new Bulk.Builder()
  .defaultIndex(index)
  .defaultType(type)

sources.each { source ->
  def indexBuilder = new Index.Builder(source)
  if (idField) {
    indexBuilder.id(source[idField])
  }

  bulkBuilder.addAction(indexBuilder.build())
}

client.execute(bulkBuilder.build())

client.shutdownClient()

Jestの依存関係に入っているGsonを使ってJSONファイルをパースして、あとはBulkに放り込みます、と。メモリに展開してしまう系なので、とりあえず動作確認などの用途で簡単に使えればいい、くらいしか考えていません。

使い方としては、typeまでを含めたURLとJSONファイルのパスを渡します。

$ groovy bulk-insert.groovy http://localhost:9200/myindex/mytype data.json

スクリプト内で、URLを接続先(スキーム〜ポート)とインデックス名、タイプ名に分解します。

また、第3引数にJSONに書いたドキュメントの属性名を指定すると、それをidとして使用するようになります。以下の例だと「isbn」の値をidとして使います。

$ groovy bulk-insert.groovy http://localhost:9200/myindex/mytype data.json isbn

コード上で指定しているのは、ここですね。

sources.each { source ->
  def indexBuilder = new Index.Builder(source)
  if (idField) {
    indexBuilder.id(source[idField])
  }

完全に個人的な用途ですが、自分はこれでデータ登録とかしていこうかなと思います。